[rabbitmq-discuss] [java-client] Parallelizing message consumption from a single queue

Steve Powell steve at rabbitmq.com
Tue Mar 27 17:28:00 BST 2012


Hi Josh,

> This is the approach I've taken so far. I have a single Connection,
> Channel and Consumer and a single thread that loop on
> consumer.nextDelivery(). As soon as consumer.nextDelivery() returns, the
> message is acknowledged and handed off to a separate thread pool for
> processing, allowing the thread to move on to the next delivery.

Since you mention nextDelivery() this means you are using our
QueueingConsumer implementation. If all your single thread is doing is
to hand the message to another thread(pool) for processing, then you
don't need QueueingConsumer. Instead, define a DefaultConsumer extension
and acknowledge and hand-over the message in the handleDelivery()
method.

Here is what it might look like:

  Channel channel ...
  Consumer cons = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag,
                                 Envelope envelope,
                                 AMQP.BasicProperties properties,
                                 byte[] body)
          throws IOException
      {
              // acknowledge the message

              // pass it to your thread process
      }
  }

and of course you need to attach it to your queue...

  basicConsume(queueName, false, cons);

This will eliminate the internal queue set up by QueueingConsumer and
allow you greater control over error conditions (just add more callback
methods to your Consumer). The callback methods are called serially, and
you can issue channel method calls in the handleDelivery() body without
deadlocking.

You may want to use autoAck=true if you are *not* deferring the
acknowledgement until after processing. Only do this if you really do
want to acknowledge everything and have no limit on the messages handed
off to the thread pool.

Be aware that the basicQos() prefetch-count setting on the channel
(together with autoAck=false) will permit the delivery of multiple
messages before you have to acknowledge any of them, so it is possible
to defer the acknowledgement until the end of processing, if the
prefetch-count is larger than one, and still permit the concurrent
processing of up-to prefetch-count messages.

Please let us know how you get on.

Steve Powell  (a happy bunny)
----------some more definitions from the SPD----------
chinchilla (n.) Cooling device for the lower jaw.
socialcast (n.) Someone to whom everyone is speaking but nobody likes.
literacy (n.) A textually transmitted disease usually contracted in childhood.

On 27 Mar 2012, at 00:49, Josh Stone wrote:
> ...many lines


More information about the rabbitmq-discuss mailing list