[rabbitmq-discuss] New user : Help needed in understanding memory usage

Avanti Nadgir avanti at yahoo-inc.com
Tue Aug 10 00:59:15 BST 2010


I am experimenting with RabbitMQ on a rhel vm (ver 1.8.1)
Have started the broker and am able to publish and consume events till the resident memory usage of the broker peaks at about 400M (it keeps growing with the number of messages published even though the consumer is consuming events) After this the broker needs to be restarted to bring back the memory usage to 0 before more messages can be sent.

Attached is the code snippet of the publisher and consumer.  
=================================================================
Publisher :
                factory.setHost("localhost");
                factory.setPort(5672);
                Connection conn = factory.newConnection();

                Channel channel = conn.createChannel();

                System.out.println("done creating conn");
                String exchangeName = "testExchange";
                String queueName = "testQueueNew";
                String routingKey = "";
                channel.exchangeDeclare(exchangeName, "direct", true);
                channel.queueDeclare(queueName, true, false, true, null);
                channel.queueBind(queueName, exchangeName, routingKey);

                byte[] messageBodyBytes = new byte[2000];
                for(int j = 0;j<2000;j++) {
                        messageBodyBytes[j] = 'A';
                }
                for(int i = 0; i < numMessages; i++) {
                        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
                        if(sleepInterval != 0) {
                                if (i % sleepInterval == 0) {
                                        Thread.sleep(timeToSleep);
                                        System.out.println("Published Message " + i);
                                }
                        }
                }

                channel.close();
                conn.close();
==================================================================
Consumer :
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                factory.setPort(5672);
                Connection conn = factory.newConnection();
                Channel channel = conn.createChannel();

                String exchangeName = "testExchange";
                String routingKey = "";
                String queueName = "testQueue";
                channel.exchangeDeclare(exchangeName, "direct", true);
                channel.queueBind(queueName, exchangeName, routingKey);

                boolean noAck = false;
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queueName, noAck, consumer);
                int count = 0;
                while (count < numMessages) {
                    count++;
                    QueueingConsumer.Delivery delivery;
                    try {
                        delivery = consumer.nextDelivery();
                    } catch (InterruptedException ie) {
                        continue;
                    }
                    // (process the message components ...)

                        byte[] body = delivery.getBody();
                        String resp = new String(body);
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }

                channel.close();
                conn.close();

===================================================================================
Any help is appreciated.

Thanks,
Avanti


More information about the rabbitmq-discuss mailing list