[rabbitmq-discuss] Handling Consumer Cancel Notifications in Java Client

Chris stuff at moesel.net
Fri Aug 9 14:19:04 BST 2013


I'm not sure if I can send attachments in this mailing list, so I'll inline
the data in the message.  Please let me know if there are any issues or
questions.  I am using RabbitMQ Java client 3.0.1 w/ RabbitMQ server 3.1.1.

*Thread Dump*

I did a thread dump on a simple app and here is what seems to be the
relevant portion when I try to reconsume (full thread dump at end of
message):

"AMQP Connection 10.106.76.31:5672" prio=5 tid=0x00007fde3383e000 nid=0x6203 in Object.wait() [0x00000001104bc000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at java.lang.Object.wait(Object.java:503)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50)
- locked <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:89)
- locked <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:972)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:941)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:933)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:926)
at com.avid.acs.example.rabbitmq.SimpleConsumerExample$SimpleConsumer.handleCancel(SimpleConsumerExample.java:50)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:395)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)


*Simple App to Reproduce*

I *can* reproduce this in a simple Java app, which I will copy below.  Here
is how I reproduce it:

   - You need a running active/active cluster with a policy to mirror the
   queue you will test with
   - Create the queue on node1 so that is where its master resides (with
   slave on node2)
   - Run the SimpleConsumerExample so it connects to node2
   - Restart RabbitMQ on node1 so that the master dies and the slave on
   node2 is promoted.

At this point, handleCancel should be invoked (it is), and the consumer
should then re-consume from the channel (this is where it hangs).

*The Java Example Code*

package example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumerExample {
    private static final String DEFAULT_URI = "amqp://localhost";
    private static final String DEFAULT_QUEUE = "test_queue";

    public static void main(String[] args) throws Exception {
        String uri = args.length > 0 ? args[0] : DEFAULT_URI;
        String queueName = args.length > 1 ? args[1] : DEFAULT_QUEUE;
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(uri);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
        String cTag = channel.basicConsume(queue, true, new SimpleConsumer(channel, queue));
        System.out.println("basicConsume: " + cTag);
    }

    public static class SimpleConsumer extends DefaultConsumer {
        private String queue;

        public SimpleConsumer(Channel channel, String queue) {
            super(channel);
            this.queue = queue;
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("handleDelivery: " + consumerTag);
        }

        @Override
        public void handleCancel(String consumerTag) throws IOException {
            System.out.println("handleCancel: " + consumerTag);
            super.handleCancel(consumerTag);

            String cTag = getChannel().basicConsume(queue, this);
            System.out.println("basic(Re)Consume: " + cTag); // <-- Never gets here
        }
    }
}
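
One possible reading of the dump, offered as an assumption rather than a confirmed diagnosis: handleCancel is running on the "AMQP Connection" thread (the frames run from AMQConnection$MainLoop.run through ChannelN.processAsync into handleCancel), and basicConsume then waits in BlockingRpcContinuation.getReply for a reply that would presumably have to be read by that same thread. The sketch below models only that pattern using JDK classes. SelfWaitModel, blockingRpc, ioLoop, and worker are hypothetical names for illustration and are not part of the RabbitMQ client, and a timeout stands in for the indefinite hang.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model; none of these names come from the RabbitMQ client.
public class SelfWaitModel {
    // Stand-in for the single connection thread that reads replies and runs callbacks.
    private final ExecutorService ioLoop = Executors.newSingleThreadExecutor();
    // Separate thread on which a callback may block without stalling the I/O loop.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Blocking RPC whose reply can only be delivered by the I/O loop thread.
    public String blockingRpc(long timeoutMs) throws Exception {
        CompletableFuture<String> reply = new CompletableFuture<>();
        ioLoop.execute(() -> reply.complete("consume-ok"));
        return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // The callback blocks on the I/O loop itself, so the reply task queued
    // behind it cannot run; the timeout stands in for the indefinite hang.
    public String callFromIoLoop(long timeoutMs) throws Exception {
        Callable<String> callback = () -> {
            try {
                return blockingRpc(timeoutMs);
            } catch (TimeoutException e) {
                return "timed out";
            }
        };
        return ioLoop.submit(callback).get();
    }

    // The callback only hands the RPC to the worker and returns, leaving the
    // I/O loop free to deliver the reply.
    public String callFromWorker(long timeoutMs) throws Exception {
        Callable<String> rpc = () -> blockingRpc(timeoutMs);
        Callable<Future<String>> callback = () -> worker.submit(rpc);
        Future<String> pending = ioLoop.submit(callback).get();
        return pending.get();
    }

    public void shutdown() {
        ioLoop.shutdownNow();
        worker.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        SelfWaitModel model = new SelfWaitModel();
        try {
            System.out.println("from I/O loop: " + model.callFromIoLoop(200));
            System.out.println("from worker: " + model.callFromWorker(2000));
        } finally {
            model.shutdown();
        }
    }
}
```

If this reading is right, the analogous change in SimpleConsumer would be to have handleCancel pass the basicConsume call to a separate executor and return immediately. That is untested against client 3.0.1 and is only a suggestion.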


*The Thread Dump*

Full thread dump OpenJDK 64-Bit Server VM (24.0-b49 mixed mode):

"DestroyJavaVM" prio=5 tid=0x00007fde35802000 nid=0x1d03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-1-thread-1" prio=5 tid=0x00007fde338a7800 nid=0x6603 waiting on condition [0x000000011072d000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000007d57e8fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

"pool-2-thread-1" prio=5 tid=0x00007fde3504d800 nid=0x6403 waiting on condition [0x000000011062a000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000007d5c4f300> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

"AMQP Connection 10.106.76.31:5672" prio=5 tid=0x00007fde3383e000 nid=0x6203 in Object.wait() [0x00000001104bc000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at java.lang.Object.wait(Object.java:503)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50)
- locked <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:89)
- locked <0x00000007d5bfb028> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:972)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:941)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:933)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:926)
at com.avid.acs.example.rabbitmq.SimpleConsumerExample$SimpleConsumer.handleCancel(SimpleConsumerExample.java:50)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:395)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

"Monitor Ctrl-Break" daemon prio=5 tid=0x00007fde34009000 nid=0x6003 runnable [0x000000011035c000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:150)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
- locked <0x00000007d5657698> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:154)
at java.io.BufferedReader.readLine(BufferedReader.java:317)
- locked <0x00000007d5657698> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:382)
at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:85)
at java.lang.Thread.run(Thread.java:724)

"Service Thread" daemon prio=5 tid=0x00007fde3308f000 nid=0x5c03 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" daemon prio=5 tid=0x00007fde3308e800 nid=0x5a03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" daemon prio=5 tid=0x00007fde3308d000 nid=0x5803 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=5 tid=0x00007fde33035000 nid=0x5603 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=5 tid=0x00007fde33818800 nid=0x4603 in Object.wait() [0x000000010f9f2000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007d5505448> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
- locked <0x00000007d5505448> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189)

"Reference Handler" daemon prio=5 tid=0x00007fde33817800 nid=0x4403 in Object.wait() [0x000000010f8ef000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007d5504fd0> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:503)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
- locked <0x00000007d5504fd0> (a java.lang.ref.Reference$Lock)

"VM Thread" prio=5 tid=0x00007fde33815000 nid=0x4203 runnable

"GC task thread#0 (ParallelGC)" prio=5 tid=0x00007fde3380e800 nid=0x3203 runnable

"GC task thread#1 (ParallelGC)" prio=5 tid=0x00007fde3303a800 nid=0x3403 runnable

"GC task thread#2 (ParallelGC)" prio=5 tid=0x00007fde3303b000 nid=0x3603 runnable

"GC task thread#3 (ParallelGC)" prio=5 tid=0x00007fde3303b800 nid=0x3803 runnable

"GC task thread#4 (ParallelGC)" prio=5 tid=0x00007fde3303c000 nid=0x3a03 runnable

"GC task thread#5 (ParallelGC)" prio=5 tid=0x00007fde3303d000 nid=0x3c03 runnable

"GC task thread#6 (ParallelGC)" prio=5 tid=0x00007fde3303d800 nid=0x3e03 runnable

"GC task thread#7 (ParallelGC)" prio=5 tid=0x00007fde3303e000 nid=0x4003 runnable

"VM Periodic Task Thread" prio=5 tid=0x00007fde33086000 nid=0x5e03 waiting on condition

JNI global references: 161

Heap
 PSYoungGen      total 38912K, used 10145K [0x00000007d5500000, 0x00000007d8000000, 0x0000000800000000)
  eden space 33792K, 30% used [0x00000007d5500000,0x00000007d5ee8558,0x00000007d7600000)
  from space 5120K, 0% used [0x00000007d7b00000,0x00000007d7b00000,0x00000007d8000000)
  to   space 5120K, 0% used [0x00000007d7600000,0x00000007d7600000,0x00000007d7b00000)
 ParOldGen       total 87040K, used 0K [0x0000000780000000, 0x0000000785500000, 0x00000007d5500000)
  object space 87040K, 0% used [0x0000000780000000,0x0000000780000000,0x0000000785500000)
 PSPermGen       total 21504K, used 4385K [0x000000077ae00000, 0x000000077c300000, 0x0000000780000000)
  object space 21504K, 20% used [0x000000077ae00000,0x000000077b248528,0x000000077c300000)


*Conclusion*
Thanks for any help you can give!

-Chris


On Fri, Aug 9, 2013 at 3:19 AM, Michael Klishin <mklishin at gopivotal.com> wrote:

> Chris:
>
> > Whenever I try to do anything with the channel in my "handleCancel"
> > method implementation, it blocks forever.  I'm guessing there is a lock on
> > the channel while I'm in that method, and that lock prevents me from
> > redeclaring queues or consuming from them?
>
> Chris,
>
> There should be no lock that prevents operations from handleCancel.
>
> Can you use VisualVM or jstack to take a thread dump and see what method
> hangs?
> Do you see any alarms mentioned in RabbitMQ log?
>
> Is there a way you can reproduce this issue with a code sample you can
> post?
> --
> MK