[rabbitmq-discuss] Re-subscribing to a queue in a cluster

Adam Brightwell abrightwell at LCE.com
Tue Oct 9 17:07:06 BST 2012


Hey Emile,

Thanks for the response.

So, I think I found my issue.  Or at least it resolved my issue.

It would appear that in order to "re-subscribe" I had to handle this
action in a separate thread.  If I use the following "test" code in my
consumer's "handleCancel()" method it would not work:

public void handleCancel(String consumerTag) {
    Channel channel = super.getChannel();
    TestConsumer consumer = new TestConsumer();
    channel.basicConsume(this.queue, consumer);
}

However, if I put the "basicConsume" off on its own thread, like the
following, I have success:

public void handleCancel(String consumerTag) {
    Channel channel = super.getChannel();
    TestConsumer consumer = new TestConsumer();

    Thread t = new Thread() {
        public void run() {
            try {
                channel.basicConsume(queue, consumer);
            } catch (IOException exception) {
                exception.printStackTrace();
            }
        }
    };
}

Digging further into the threading/concurrency model of RabbitMQ, I found
that I obviously have to be careful about concurrently executing
operations on a single channel.  And I also found that the connection
operations are apparently thread-safe except the ones that involve RPC
calls.  I understand that the "basicConsume" just so happens to be an RPC
based command.

So, with the above in mind, what I can only assume is happening is that
the RPC call involved with the basicConsume is blocking other Channel
based operations.  This is why I suspect that the "basicAck" was unable to
execute successfully and that the "handleDelivery" was never being reached.

I hope this makes sense, but to sum it up it seems that I simply had a
threading problem.  I'm in the process of updating our consumer and
application to handle this properly, but I was curious if there are any
established patterns or guidelines the community might suggest?  For
instance, having a thread pool dedicated for such operations, utilizing
java's ExecutorService or is there something already built in to the
client api that will provide me a way to run this code if I wrapped it in
a Runnable, Callable, etc.?

Also, as a side note, has any consideration been given to using java's
concurrency model more widely in the java client api?  For instance,
synchronization of methods or code blocks to possibly help avoid such
situations?

As for my version, I am currently locked in to 2.8.1.  Unfortunately, I
cannot upgrade yet.  However, I have validated this same behavior in 2.8.7.

Thanks again for all your help, it is greatly appreciated.

Thanks,
Adam



On 10/8/12 7:23 AM, "Emile Joubert" <emile at rabbitmq.com> wrote:

>
>Hi Adam,
>
>On 05/10/12 00:13, Adam Brightwell wrote:
>> 1. Create new consumer and call basicConsume on the original channel.
>> 2. Close original channel, recreate channel and call basicConsume with
>>the
>> new channel.
>> 3. Close the original channel, close the original Connection.
>>Reconnect,
>> recreate channel, call basicConsume with the new channel.
>
>All three options should work, but it is not necessary to re-establish a
>new consumer, channel or connection unless directly connected to the
>failing node and therefore all options do more work than necessary. The
>only required action is to resubscribe the existing consumer (and that
>consumer must be prepared to receive duplicate messages).
>
>> OR they will be consumed and unacknowledged. In the latter case, they
>> are not being processed by my consumer or at least not making it to
>> the "handleDelivery()" method.
>
>So messages are consumed without acknowledgement, but don't make it to
>the handleDelivery method? Can you explain what you mean by that?
>
>Also, what version of RabbitMQ are you using? You are advised to use the
>latest version.
>
>
>-Emile
>
>



More information about the rabbitmq-discuss mailing list