[rabbitmq-discuss] Non-blocking Consumers approach

Mahesh Viraktamath yuva670 at gmail.com
Mon Aug 27 19:30:58 BST 2012


Hi,

As per your suggestion, I created 5 channels as follows:

for (int i = 0; i < 5; i++) {
    final Channel channel = connection.createChannel();
    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    channel.basicQos(1); // at most one unacknowledged message per channel

    log.info(" [x] Awaiting RPC requests");

    boolean autoAck = false;
    channel.basicConsume(RPC_QUEUE_NAME, autoAck, "watermark" + i,
            new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
                        throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    // process the message here (this is where `response` is produced)
                    channel.basicPublish("", properties.getReplyTo(),
                            new BasicProperties.Builder()
                                    .correlationId(properties.getCorrelationId())
                                    .build(),
                            response.getBytes("UTF-8"));
                    // acknowledge only after the reply has been published
                    channel.basicAck(deliveryTag, false);
                }
            });
}

But the consumers don't seem to listen to the queue: as soon as I run
this application, it exits. Earlier I used a while(true) loop to get the
messages. How do I make sure that the consumers keep listening to the
intended queue and process messages?


-Mahesh

On Mon, Aug 27, 2012 at 6:54 PM, Matthias Radestock
<matthias at rabbitmq.com> wrote:

> Mahesh,
>
>
> On 27/08/12 13:37, Mahesh Viraktamath wrote:
>
>> I read the API guide and am still sticking with the QueueingConsumer. For
>> each incoming message I create a new service thread (with a common channel;
>> without that the service won't know which message to acknowledge). But
>> the guide says that a channel should be created for each thread. Now, my
>> question is: does the guide assume that we run different consumer
>> threads? *I can't understand the concept of different consumer threads*.
>> I run a single consumer and use basicConsume() to listen for messages,
>> and each message (nextDelivery()) gives rise to a new service thread.
>> Again, each message creating a thread is scary!
>>
>> If it helps, I am running the consumer as a java application and use
>> basicConsume() and nextDelivery() in a while loop to process the
>> messages. Our application sends messages to these non-stop. So, I am
>> looking for a consumer which will not block the incoming messages when
>> it is processing a large message.
>>
>
> I suggest you create N channels and for each channel:
> - set the basic.qos prefetch to 1 or some other low-ish value (for fair
> dispatch)
> - create a consumer as shown in
>   http://www.rabbitmq.com/api-guide.html#consuming
>
> You write above that you only have one channel because "without that the
> service won't know which message to acknowledge", but the channel is
> accessible via the context (as shown in the example), so there is no
> ambiguity.
>
> Obviously message ordering goes out of the window, but that's the case for
> any scheme where messages are handled by multiple threads.
>
> Regards,
>
> Matthias.
>
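
The scheme Matthias describes (N channels, each with a prefetch of 1 and its
own consumer) can be illustrated without a broker. In the rough sketch below, a
shared in-memory queue stands in for the RabbitMQ queue and each worker thread
stands in for one channel. The names FairDispatchSketch, run and POISON are
made up for illustration and are not part of the RabbitMQ client; this is an
analogy under those assumptions, not how the client is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Broker-free analogy: the shared queue plays the role of the RabbitMQ queue,
// each worker thread plays the role of one channel with prefetch 1.
class FairDispatchSketch {

    // Hypothetical stop marker so workers can shut down; not a RabbitMQ concept.
    private static final int POISON = -1;

    // Processes message i by sleeping costsMillis[i] milliseconds and returns
    // the message indices in the order in which they finished.
    static List<Integer> run(int workers, int[] costsMillis) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> completed = new CopyOnWriteArrayList<>();
        for (int i = 0; i < costsMillis.length; i++) {
            queue.add(i);
        }
        for (int w = 0; w < workers; w++) {
            queue.add(POISON); // one stop marker per worker, after all messages
        }

        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int msg = queue.take();          // one message at a time
                        if (msg == POISON) {
                            return;
                        }
                        Thread.sleep(costsMillis[msg]);  // "process the message"
                        completed.add(msg);              // done; ready for the next one
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + w);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Message 0 is slow, the others are fast.
        System.out.println(run(3, new int[] {500, 10, 10, 10, 10}));
    }
}
```

With a single worker the completion order matches the queue order. With three
workers the slow first message no longer holds up the fast ones, and it will
normally finish last, which is the loss of ordering Matthias mentions.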