[rabbitmq-discuss] Rabbitmq - retrieving messages concurrently

Steve Powell steve at rabbitmq.com
Fri Mar 30 14:09:32 BST 2012


The critical point to realise is that to get multiple consumers running
concurrently in the same Java Client, you need to use multiple channels.

Your code so far has 20 threads (potentially) able to process concurrent
consumers; if you want 20 consumers to be concurrently receiving
messages from the same queue then create 20 channels on that connection
and put one consumer on each channel.
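
A minimal sketch of the one-consumer-per-channel arrangement described above. It is illustrative only: the queue name "work-queue", the host "localhost", and the class name are assumptions, the queue is assumed to exist already, and the processing step is left as a comment.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiChannelConsumers {
    // Hypothetical values, chosen for illustration only.
    private static final String QUEUE_NAME = "work-queue";
    private static final int CONSUMER_COUNT = 20;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: local broker, default credentials

        // One thread per consumer so all callbacks can run at the same time.
        ExecutorService es = Executors.newFixedThreadPool(CONSUMER_COUNT);
        Connection conn = factory.newConnection(es);

        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // Each consumer gets its own channel on the shared connection.
            final Channel channel = conn.createChannel();
            // autoAck = false: each message is acknowledged explicitly below.
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    // Application-specific processing of 'body' would go here.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
        // The program keeps running because the consumers stay attached.
    }
}
```

Here each consumer is a separate anonymous instance bound to its own channel, which avoids sharing state between callbacks.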

The consumers need not be instantiated each time (unless they have
private state), and the consumer callbacks (handleDelivery(), for
example) are passed the consumerTag used when attaching the consumer
(with basicConsume()) as a parameter, so the same code can, in
principle, service many of the consumer callbacks. Be aware that if you
share the Consumer objects in this way, the code you write must be
thread-safe.

As for assigning a number to each message, this is going to be a little
tricky. However, using a shared AtomicInteger will permit you to
generate and increment a number in the handleDelivery callbacks, without
the possibility of accidentally using the same number. What you do with
that number is then up to you.
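
The shared-counter idea can be sketched with the JDK alone. In the sketch below the class and method names are hypothetical, and plain pool threads stand in for the handleDelivery callbacks; in a real consumer, nextNumber() would be called from inside handleDelivery().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MessageNumbering {
    // A single counter shared by every consumer callback.
    private final AtomicInteger counter = new AtomicInteger();

    // Atomically increments the counter and returns the new value,
    // so no two callers can ever receive the same number.
    public int nextNumber() {
        return counter.incrementAndGet();
    }

    // Simulates concurrent callbacks and returns how many distinct numbers were issued.
    public static int countDistinct(int threads, int callsPerThread) {
        MessageNumbering numbering = new MessageNumbering();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    seen.add(numbering.nextNumber());
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 20 threads x 1000 calls each: every call yields a different number.
        System.out.println(countDistinct(20, 1000)); // prints 20000
    }
}
```

At 5000 messages per second an int counter wraps around after roughly five days, so an AtomicLong may be the safer choice for a long-running consumer.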

You have correctly created an ExecutorService to increase the number of
threads from the default (5). In this case you should ensure that the
ExecutorService is shut down after the connection is closed, or else you
might find that there are non-daemon threads left in the JVM that might
prevent an orderly shutdown.
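
One possible shape for that cleanup step, using only the JDK. The helper name shutdownAndAwait and the timeout are assumptions; in the real application it would be called after conn.close().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorCleanup {
    // Stops the executor and waits for its threads to finish.
    // Returns true if the executor terminated within the allowed time.
    public static boolean shutdownAndAwait(ExecutorService es, long timeoutSeconds) {
        es.shutdown(); // accept no new tasks; let running ones complete
        try {
            if (!es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                es.shutdownNow(); // interrupt tasks that are still running
                return es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(20);
        // In the real application: conn.close() comes first, then this call.
        boolean terminated = shutdownAndAwait(es, 10);
        System.out.println("executor terminated: " + terminated);
    }
}
```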

I hope this helps you. The details are, of course, very application-specific.

Steve Powell
steve at rabbitmq.com
[wrk: +44-2380-111-528] [mob: +44-7815-838-558]

On 30 Mar 2012, at 12:51, Santhoshsd wrote:

> I want to retrieve 5000 messages/second from a single RabbitMQ server
> and for each message assign a number and then push it into redis.
> I want to use a single queue multiple consumer approach in the
> rabbitmq client. Please let me know the way to proceed after this
> point in Java.
> ConnectionFactory factory = new ConnectionFactory();
>        factory.setUsername(Constants.USERNAME);
>        factory.setPassword(Constants.PASSWORD);
>        factory.setVirtualHost(Constants.VIRTUALHOST);
>        factory.setHost(Constants.HOSTNAME);
>        factory.setPort(Constants.PORTNUMBER);
>        ExecutorService es = Executors.newFixedThreadPool(20);
>        Connection conn = factory.newConnection(es);