[rabbitmq-discuss] New user : Help needed in understanding memory usage

Alexandru Scvorţov alexandru at rabbitmq.com
Tue Aug 10 10:31:42 BST 2010


Hi Avanti,

If the memory usage is steadily increasing, you're probably not ack'ing
the messages.

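Try running:
% rabbitmqctl list_queues name messages

That should list your queues and tell you how many messages each one
holds (the "messages" column is the sum of ready and unacknowledged
messages).  To see the two counts separately, ask for messages_ready
and messages_unacknowledged instead.  If the number of messages is
steadily increasing, you're not ack'ing properly.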
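
If you'd rather not eyeball the numbers, here is a rough sketch of the
"steadily increasing" test applied to counts you have sampled by hand at
intervals.  QueueGrowthCheck and the sample figures are made up for
illustration; nothing here talks to the broker.

```java
/** Hypothetical helper: decides whether sampled queue depths keep climbing. */
final class QueueGrowthCheck {

    /** True when there are at least two samples and each is larger than the one before. */
    static boolean isSteadilyIncreasing(long[] samples) {
        if (samples.length < 2) {
            return false; // a single reading says nothing about the trend
        }
        for (int i = 1; i < samples.length; i++) {
            if (samples[i] <= samples[i - 1]) {
                return false; // the queue drained or held level at least once
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not measurements from a real broker.
        long[] climbing = {1200, 2500, 4100, 5600};
        long[] draining = {40, 12, 55, 0};
        System.out.println(isSteadilyIncreasing(climbing)); // true
        System.out.println(isSteadilyIncreasing(draining)); // false
    }
}
```

A queue whose count goes up and down under load is fine; one that only
ever goes up is the case to worry about.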

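If that's the case, you can either set noAck to true (i.e. the broker
treats a message as done as soon as it is delivered, so it doesn't need
to be acknowledged), or explicitly ack the messages (like you're doing
now); just make sure that the basicAck call is actually reached for
every delivery and is passed that delivery's tag.

It's also worth checking the queue names: your publisher declares and
binds "testQueueNew" while your consumer reads from "testQueue".  If
both are bound to testExchange with the same routing key, anything
routed to "testQueueNew" will sit there with nobody consuming it.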
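
To make the difference between the two modes concrete, here is a toy
model of the bookkeeping involved.  It is only a sketch: AckModel and
its methods are invented for illustration and are not the RabbitMQ
client API or the broker's actual implementation.  The point is simply
that, without noAck, every delivered message stays tracked until its
tag is acked.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory model of unacked-message tracking; NOT the RabbitMQ API. */
final class AckModel {
    private final boolean noAck;
    private final Map<Long, byte[]> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    AckModel(boolean noAck) {
        this.noAck = noAck;
    }

    /** Hands a message to the consumer and returns its delivery tag. */
    long deliver(byte[] body) {
        long tag = nextTag++;
        if (!noAck) {
            unacked.put(tag, body); // held until the consumer acks this tag
        }
        return tag;
    }

    /** Acks one tag, or every tag up to and including it when multiple is true. */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.keySet().removeIf(t -> t <= tag);
        } else {
            unacked.remove(tag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        byte[] body = new byte[2000];

        AckModel neverAcked = new AckModel(false);
        for (int i = 0; i < 1000; i++) {
            neverAcked.deliver(body);
        }
        System.out.println("never acked: " + neverAcked.unackedCount()); // 1000

        AckModel ackedEach = new AckModel(false);
        for (int i = 0; i < 1000; i++) {
            ackedEach.ack(ackedEach.deliver(body), false);
        }
        System.out.println("acked each:  " + ackedEach.unackedCount()); // 0

        AckModel autoAck = new AckModel(true);
        for (int i = 0; i < 1000; i++) {
            autoAck.deliver(body);
        }
        System.out.println("noAck=true:  " + autoAck.unackedCount()); // 0
    }
}
```

With 2000-byte bodies like yours, the first case is the one that makes
memory climb with every message published.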

Cheers,
Alex

On Mon, Aug 09, 2010 at 04:59:15PM -0700, Avanti Nadgir wrote:
> I am experimenting with RabbitMQ (ver 1.8.1) on a RHEL VM.
> I have started the broker and am able to publish and consume events until the resident memory usage of the broker peaks at about 400M (it keeps growing with the number of messages published, even though the consumer is consuming events). After this, the broker needs to be restarted to bring the memory usage back to 0 before more messages can be sent.
> 
> Attached is the code snippet of the publisher and consumer.  
> =================================================================
> Publisher :
>                 factory.setHost("localhost");
>                 factory.setPort(5672);
>                 Connection conn = factory.newConnection();
> 
>                 Channel channel = conn.createChannel();
> 
>                 System.out.println("done creating conn");
>                 String exchangeName = "testExchange";
>                 String queueName = "testQueueNew";
>                 String routingKey = "";
>                 channel.exchangeDeclare(exchangeName, "direct", true);
>                 channel.queueDeclare(queueName, true, false, true, null);
>                 channel.queueBind(queueName, exchangeName, routingKey);
> 
>                 byte[] messageBodyBytes = new byte[2000];
>                 for(int j = 0;j<2000;j++) {
>                         messageBodyBytes[j] = 'A';
>                 }
>                 for(int i = 0; i < numMessages; i++) {
>                         channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
>                         if(sleepInterval != 0) {
>                                 if (i % sleepInterval == 0) {
>                                         Thread.sleep(timeToSleep);
>                                         System.out.println("Published Message " + i);
>                                 }
>                         }
>                 }
> 
>                 channel.close();
>                 conn.close();
> ==================================================================
> Consumer :
>                 ConnectionFactory factory = new ConnectionFactory();
>                 factory.setHost("localhost");
>                 factory.setPort(5672);
>                 Connection conn = factory.newConnection();
>                 Channel channel = conn.createChannel();
> 
>                 String exchangeName = "testExchange";
>                 String routingKey = "";
>                 String queueName = "testQueue";
>                 channel.exchangeDeclare(exchangeName, "direct", true);
>                 channel.queueBind(queueName, exchangeName, routingKey);
> 
>                 boolean noAck = false;
>                 QueueingConsumer consumer = new QueueingConsumer(channel);
>                 channel.basicConsume(queueName, noAck, consumer);
>                 int count = 0;
>                 while (count < numMessages) {
>                     count++;
>                     QueueingConsumer.Delivery delivery;
>                     try {
>                         delivery = consumer.nextDelivery();
>                     } catch (InterruptedException ie) {
>                         continue;
>                     }
>                     // (process the message components ...)
> 
>                     byte[] body = delivery.getBody();
>                     String resp = new String(body);
>                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
>                 }
> 
>                 channel.close();
>                 conn.close();
> 
> ===================================================================================
> Any help is appreciated.
> 
> Thanks,
> Avanti

