[rabbitmq-discuss] Limit QueueingConsumer Memory Usage
Lars George
lars at worldlingo.com
Tue Jul 14 16:15:34 BST 2009
Hi Ben,

The code is really straightforward. Here is the Listener:
    public static void main(String[] args) {
        try {
            ConnectionParameters params = new ConnectionParameters();
            params.setUsername("guest");
            params.setPassword("guest");
            params.setVirtualHost("/");
            params.setRequestedHeartbeat(0);
            ConnectionFactory factory = new ConnectionFactory(params);
            Connection conn = factory.newConnection("queue1.internal");
            Channel channel = conn.createChannel();
            channel.basicQos(1);
            int wait = -1;
            if (args.length > 0) wait = Integer.parseInt(args[0]);
            boolean noAck = false;
            boolean exit = false;
            TestConsumer consumer = new TestConsumer(channel);
            channel.basicConsume("test", noAck, consumer);
            while (!exit) {
                TestConsumer.Delivery delivery;
                try {
                    delivery = consumer.nextDelivery();
                } catch (InterruptedException ie) {
                    continue;
                }
                byte[] body = delivery.getBody();
                String msg = new String(body);
                System.out.println("msg -> " + msg);
                // (process the message components ...)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                exit = msg != null && msg.startsWith("STOP");
                if (wait > -1) {
                    System.out.println("waiting secs -> " + wait);
                    Thread.sleep(wait * 1000);
                }
            }
            channel.close();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

And the Publisher looks like this:
    public static void main(String[] args) {
        try {
            ConnectionParameters params = new ConnectionParameters();
            params.setUsername("guest");
            params.setPassword("guest");
            params.setVirtualHost("/");
            params.setRequestedHeartbeat(0);
            ConnectionFactory factory = new ConnectionFactory(params);
            Connection conn = factory.newConnection("queue1.internal");
            Channel channel = conn.createChannel();
            channel.exchangeDeclare("test-rabbitmq", "fanout");
            channel.queueDeclare("test");
            channel.queueBind("test", "test-rabbitmq", "test");
            int num = 1;
            if (args.length > 1) num = Integer.parseInt(args[1]);
            for (int n = 0; n < num; n++) {
                String msg = args[0] + "-" + (n+1) + "/" + num;
                byte[] body = msg.getBytes();
                channel.basicPublish("test-rabbitmq", "test", null, body);
                System.out.println("Sending -> " + msg);
            }
            channel.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

So the way I test this is to run three Listeners, one of them with an
additional "5" as the parameter to make it sleep 5 seconds between takes.
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener 5
Then I start the Publisher and push 10000 messages:
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\lib\commons-io-1.4.jar;\workspace\test-rabbitmq\bin Publisher test 10000
The idea is that the one Listener that waits 5 seconds should never hold
more messages at a time than the prefetch count (1, or 30, or whatever it
is set to). But I have instrumented the QueueingConsumer with logging and
named it TestConsumer, just adding this:
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        _queue.add(ValueOrException.<Delivery, ShutdownSignalException>
            makeValue(new Delivery(envelope, properties, body)));
        System.out.println("new queue size -> " + _queue.size());
    }

to print out the size of the in-memory queue. And it grows without
bound. If you like, I can send you all three classes directly. Let me know.
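
To make the expectation concrete, here is a minimal sketch of how I understand the prefetch window is supposed to bound the local queue. It is a pure in-memory model and does not use the RabbitMQ client at all: the class name PrefetchWindowSketch, the method maxLocalQueueSize and the "burst" parameter (how many messages the broker can push while the consumer handles one) are all made up for illustration, and a prefetch of 0 is assumed to mean "no limit".

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a broker honouring a prefetch window.
// This is NOT the RabbitMQ client API, only an illustration of the idea.
class PrefetchWindowSketch {

    /**
     * Simulates one slow consumer with manual acks.
     *
     * @param total    number of messages waiting on the broker
     * @param prefetch allowed unacknowledged messages (0 = unlimited, an assumption)
     * @param burst    messages the broker can push while one message is processed
     * @return the largest size the consumer's local queue ever reached
     */
    static int maxLocalQueueSize(int total, int prefetch, int burst) {
        if (burst < 1) {
            throw new IllegalArgumentException("burst must be >= 1");
        }
        Queue<Integer> localQueue = new ArrayDeque<>(); // stands in for the consumer's _queue
        int nextToSend = 0; // next message the broker would deliver
        int unacked = 0;    // delivered to the consumer but not yet acknowledged
        int maxSize = 0;

        while (nextToSend < total || !localQueue.isEmpty()) {
            // Broker side: deliver only while the window has credit left.
            int pushed = 0;
            while (nextToSend < total && pushed < burst
                    && (prefetch == 0 || unacked < prefetch)) {
                localQueue.add(nextToSend++);
                unacked++;
                pushed++;
            }
            maxSize = Math.max(maxSize, localQueue.size());

            // Consumer side: take one message, process it, then ack it.
            // The ack returns one unit of credit to the broker.
            if (!localQueue.isEmpty()) {
                localQueue.remove();
                unacked--;
            }
        }
        return maxSize;
    }

    public static void main(String[] args) {
        System.out.println("prefetch=1  -> max queue size " + maxLocalQueueSize(10000, 1, 50));
        System.out.println("prefetch=30 -> max queue size " + maxLocalQueueSize(10000, 30, 50));
        System.out.println("prefetch=0  -> max queue size " + maxLocalQueueSize(10000, 0, 50));
    }
}
```

In this model the local queue never exceeds the prefetch count when one is set, and it only grows far past that when the window is unlimited. That second case is what my real test looks like, even though basicQos is set and noAck is false.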
Regards,
Lars
Ben Hood wrote:
> Lars,
>
> On 14 Jul 2009, at 07:06, Lars George <lars at worldlingo.com> wrote:
>
>> Hi Amit,
>>
>> We upgraded to 1.6.0 and I added for example channel.basicQos(30);
>> but to no avail. My test program still fills the local queue way past
>> the 30 message limit using the QueueingConsumer and noAck set to false.
>>
>> Is this in a later version? Am I doing something wrong?
>>
>
> Interesting. Can you send a cut-down version of the code that
> reproduces the behaviour, and make sure that you are definitely
> not auto-acking anything?
>
> Ben
>
>> Regards,
>> Lars
>>
>>
>> amit bhatnagar wrote:
>>> Check out Channel.BasicQos() and set a prefetch count.
>>>
>>> Have a look here for a good description of the prefetch:
>>> http://hopper.squarespace.com/blog/2009/1/26/work-distribution-in-rabbitmq.html
>>>
>>>
>>> fta:
>>>
>>> "This command allows a consumer to choose a prefetch window that
>>> specifies the amount of unacknowledged messages it is prepared to
>>> receive. By setting the prefetch count to a non-zero value, the broker
>>> will not deliver any messages to the consumer that would breach that
>>> limit. To move the window forwards, the consumer has to acknowledge
>>> the receipt of a message (or a group of messages). By acknowledging a
>>> message, the consumer gains credit in the broker which makes it
>>> eligible to receive more messages. This credit-based flow control
>>> allows the broker to distribute work proportional to the individual
>>> processing ability of each worker, as opposed to a simple round robin
>>> mechanism."
>>>
>>>
>>> On Jul 10, 4:32 am, Lars George <l... at worldlingo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to use a blocking provider, but the QueueingConsumer is
>>>> somewhat limiting as it receives whatever the queue sends and caches it
>>>> locally, in the app server's memory. If that is a very large number then
>>>> you can quickly run out of memory and kill the Java process with an
>>>> OOME.
>>>>
>>>> Is there a way to implement a Consumer that, say, only receives N queue
>>>> items before it waits until they get processed locally? I mean, from the
>>>> internal BlockingQueue and using handleDelivery() this is doable, but
>>>> then you would block the main loop in the AMQP connection and miss
>>>> heartbeats etc.?
>>>>
>>>> Is there a better way to do this, or do I have to go with a dumb while
>>>> (true) { channel.basicGet() } loop? With that you have the issue of
>>>> somehow gracefully handling the null delivery without creating a loop
>>>> that consumes all CPU cycles with no actual work being done.
>>>>
>>>> Thanks,
>>>> Lars
>>>>
>>>> _______________________________________________
>>>> rabbitmq-discuss mailing list
>>>> rabbitmq-disc... at lists.rabbitmq.com
>>>> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>>>
>>>>
>>>
>>>
>>>
>