[rabbitmq-discuss] Limit QueueingConsumer Memory Usage

Lars George lars at worldlingo.com
Tue Jul 14 16:15:34 BST 2009


Hi Ben,

The code is really straightforward. Here is the Listener:

  public static void main(String[] args) {
    try {
      ConnectionParameters params = new ConnectionParameters();
      params.setUsername("guest");
      params.setPassword("guest");
      params.setVirtualHost("/");
      params.setRequestedHeartbeat(0);
      ConnectionFactory factory = new ConnectionFactory(params);
      Connection conn = factory.newConnection("queue1.internal");
      Channel channel = conn.createChannel();
     
      channel.basicQos(1);   // prefetch window: ask the broker for at most 1 unacked message
     
      int wait = -1;
      if (args.length > 0) wait = Integer.parseInt(args[0]);
      boolean noAck = false;  // manual acknowledgements, no auto-ack
      boolean exit = false;
      TestConsumer consumer = new TestConsumer(channel);
      channel.basicConsume("test", noAck, consumer);
      while (!exit) {
        TestConsumer.Delivery delivery;
        try {
          delivery = consumer.nextDelivery();
        } catch (InterruptedException ie) {
          continue;
        }
        byte[] body = delivery.getBody();
        String msg = new String(body);
        System.out.println("msg -> " + msg);
        // (process the message components ...)
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        exit = msg != null && msg.startsWith("STOP");
        if (wait > -1) {
          System.out.println("waiting secs -> " + wait);
          Thread.sleep(wait * 1000);
        }
      }
      channel.close();
      conn.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }


And the Publisher looks like this:

  public static void main(String[] args) {
    try {
      ConnectionParameters params = new ConnectionParameters();
      params.setUsername("guest");
      params.setPassword("guest");
      params.setVirtualHost("/");
      params.setRequestedHeartbeat(0);
      ConnectionFactory factory = new ConnectionFactory(params);
      Connection conn = factory.newConnection("queue1.internal");
      Channel channel = conn.createChannel();
 
      // declare a fanout exchange and bind the "test" queue to it
      // (the routing key is ignored for fanout exchanges)
      channel.exchangeDeclare("test-rabbitmq", "fanout");
      channel.queueDeclare("test");
      channel.queueBind("test", "test-rabbitmq", "test");

      int num = 1;
      if (args.length > 1) num = Integer.parseInt(args[1]);
      for (int n = 0; n < num; n++) {
        String msg = args[0] + "-" + (n+1) + "/" + num;
        byte[] body = msg.getBytes();
        channel.basicPublish("test-rabbitmq", "test", null, body);
        System.out.println("Sending -> " + msg);
      }
     
      channel.close();
      conn.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

So the way I test this is to run three Listeners, one with an additional 
"5" as the parameter to make it sleep 5 seconds between deliveries:

java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener
java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\bin;\workspace\test-rabbitmq\lib\commons-io-1.4.jar Listener 5

Then I start the Publisher and push 10000 messages:

java -cp \workspace\test-rabbitmq\lib\rabbitmq-client.jar;\workspace\test-rabbitmq\lib\commons-io-1.4.jar;\workspace\test-rabbitmq\bin Publisher test 10000

The idea is that the one Listener that waits 5 seconds should only ever 
receive 1 message (or 30, or whatever the prefetch size is set to) at a 
time. But I have instrumented the QueueingConsumer with logging, named it 
TestConsumer, and added just this

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
      AMQP.BasicProperties properties, byte[] body) throws IOException {
    // enqueue the delivery exactly as QueueingConsumer does, then log the backlog size
    _queue.add(ValueOrException.<Delivery, ShutdownSignalException>
      makeValue(new Delivery(envelope, properties, body)));
    System.out.println("new queue size -> " + _queue.size());
  }

to print out the size of the in-memory queue. And it grows without 
bounds. If you like I can send you all three classes directly. Let me know.
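
As an aside, the hard client-side cap I was after in my original mail would 
look roughly like the sketch below (BoundedConsumer is just a made-up name, 
not something shipped with the client). The obvious catch is the one I 
mentioned back then: when the buffer is full, handleDelivery() blocks, and 
since the client calls it from the connection's main loop that also stalls 
heartbeats etc.

  import java.io.IOException;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;

  import com.rabbitmq.client.AMQP;
  import com.rabbitmq.client.Channel;
  import com.rabbitmq.client.DefaultConsumer;
  import com.rabbitmq.client.Envelope;

  // Consumer with a hard client-side cap: at most 'capacity' deliveries are
  // buffered in memory at any time. When the buffer is full, handleDelivery()
  // blocks until the application thread drains a message.
  public class BoundedConsumer extends DefaultConsumer {

    public static class Delivery {
      public final Envelope envelope;
      public final AMQP.BasicProperties properties;
      public final byte[] body;
      Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
        this.envelope = envelope;
        this.properties = properties;
        this.body = body;
      }
    }

    private final BlockingQueue<Delivery> queue;

    public BoundedConsumer(Channel channel, int capacity) {
      super(channel);
      this.queue = new ArrayBlockingQueue<Delivery>(capacity);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
        AMQP.BasicProperties properties, byte[] body) throws IOException {
      try {
        queue.put(new Delivery(envelope, properties, body)); // blocks when full
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }

    // Application threads call this to take the next buffered delivery.
    public Delivery nextDelivery() throws InterruptedException {
      return queue.take();
    }
  }

With basicQos() doing what the description further down says it should, none 
of this ought to be necessary, which is why the unbounded growth above 
surprises me.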

Regards,
Lars


Ben Hood wrote:
> Lars,
>
> On 14 Jul 2009, at 07:06, Lars George <lars at worldlingo.com> wrote:
>
>> Hi Amit,
>>
>> We upgraded to 1.6.0 and I added for example channel.basicQos(30); 
>> but to no avail. My test program still fills the local queue way past 
>> the 30 message limit using the QueueingConsumer and noAck set to false.
>>
>> Is this in a later version? Am I doing something wrong?
>>
>
> Interesting. Can you send a cut-down version of the code that 
> reproduces the behaviour, and can you make sure that you are definitely 
> not auto-acking anything?
>
> Ben
>
>> Regards,
>> Lars
>>
>>
>> amit bhatnagar wrote:
>>> Check out Channel.BasicQos() and set a prefetch count.
>>>
>>> Have a look here for a good description of the prefetch:
>>> http://hopper.squarespace.com/blog/2009/1/26/work-distribution-in-rabbitmq.html 
>>>
>>>
>>> From the article:
>>>
>>> "This command allows a consumer to choose a prefetch window that
>>> specifies the amount of unacknowledged messages it is prepared to
>>> receive. By setting the prefetch count to a non-zero value, the broker
>>> will not deliver any messages to the consumer that would breach that
>>> limit. To move the window forwards, the consumer has to acknowledge
>>> the receipt of a message (or a group of messages). By acknowledging a
>>> message, the consumer gains credit in the broker which makes it
>>> eligible to receive more messages. This credit-based flow control
>>> allows the broker to distribute work proportional to the individual
>>> processing ability of each worker, as opposed to a simple round robin
>>> mechanism."
>>>
>>>
>>> On Jul 10, 4:32 am, Lars George <l... at worldlingo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to use a blocking provider, but the QueueingConsumer is
>>>> somewhat limiting as it receives whatever the queue sends and caches it
>>>> locally, in the app server's memory. If that is a very large number then
>>>> you can quickly run out of memory and kill the Java process with an OOME.
>>>>
>>>> Is there a way to implement a Consumer that, say, only receives N queue
>>>> items before it waits until they get processed locally? I mean, with the
>>>> internal BlockingQueue and handleDelivery() this is doable, but then you
>>>> would block the main loop of the AMQP connection - and miss heartbeats
>>>> etc.?
>>>>
>>>> Is there a better way, or do I have to go with a dumb while
>>>> (true) { channel.basicGet() } loop? With that you have the issue of
>>>> somehow gracefully handling the null delivery and not creating a loop
>>>> that consumes all CPU cycles with no actual work being done.
>>>>
>>>> Thanks,
>>>> Lars
>>>>
>>>
>
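
P.S. For completeness, the "dumb" polling loop from my original question would 
be something along these lines (a fragment in the style of the Listener above, 
so channel and the surrounding try/catch are assumed; the queue name and the 
100 ms back-off are just placeholders):

      boolean noAck = false;
      boolean exit = false;
      while (!exit) {
        // basicGet() returns null when the queue is currently empty,
        // so back off briefly instead of spinning the CPU
        GetResponse response = channel.basicGet("test", noAck);
        if (response == null) {
          Thread.sleep(100);
          continue;
        }
        String msg = new String(response.getBody());
        System.out.println("msg -> " + msg);
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        exit = msg.startsWith("STOP");
      }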