[rabbitmq-discuss] Rabbitmq Java Client Memory Leak?

drenz david.renz at gmail.com
Sun Dec 12 19:36:49 GMT 2010


I've updated the code to explicitly set body = null after I am done
using it. See below.

However, the memory continues to build up. This is the only part of
the code that runs, so it has to be something in here.  It almost
looks like the QueueingConsumer object might be keeping these
references around.  Is that possible?

Thanks

byte[] body = delivery.getBody();

ObjectInputStream in = new ObjectInputStream(new
ByteArrayInputStream(body));
Status s = (Status) in.readObject();

body = null;

in.close();

in = null;





On Dec 12, 12:51 pm, Benjamin Bennett <benbenn... at gmail.com> wrote:
> Having a quick look at it make sure the line is not holding a
> reference , considering you are saying the profiler is saying that a
> reference is out there.
>  byte[] body = delivery.getBody();
> de references the delivery.getBody();
>
> if the body variable is referencing the delivery.getBody() it will
> prevent it from being gc'ed .
>
>
>
>
>
>
>
>
>
> On Sun, Dec 12, 2010 at 9:58 AM, drenz <david.r... at gmail.com> wrote:
> > Hello,
>
> > I have a pretty simple set up where I use the RabbitMQ client library
> > to pull objects from a RabbitMQ server.
>
> > Unfortunately, there seems to be some memory leak in my program
> > because it quickly consumes all of the heap memory with byte arrays.
> > I've increased the max memory for the JVM to 2 GB and it consumes all
> > of that too.
>
> > Here is the code I use to pull data from the queue.  Having used a few
> > Java profilers, it looks like the leak is associated with they
> > QueueingConsumer.Delivery object, because each Delivery object has a
> > member byte[] called _body.
>
> > Does anyone have any idea how I can ensure these arrays are garbage
> > collected / destroyed?
>
> > Thanks in advance!!!
>
> > Property values:
> > queue.user=guest
> > queue.password=guest
> > queue.virtualHost=/
> > queue.host=127.0.0.1
> > queue.hostPort=5672
> > queue.heartBeat=0
> > queue.exchange=myExchange
> > queue.queueName=dtt
> > queue.routingKey=dttRoute
>
> > Code:
>
> >        public void consume() throws IOException, ClassNotFoundException {
>
> >                ConnectionFactory factory = new ConnectionFactory();
> >                factory.setUsername(PropertyUtil
> >                                .getProperyFile().getProperty("queue.user"));
> >                factory.setPassword(PropertyUtil
> >                                .getProperyFile().getProperty("queue.password"));
> >                factory.setVirtualHost(PropertyUtil
> >                                .getProperyFile().getProperty("queue.virtualHost"));
> >                factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
> >                                .getProperyFile().getProperty("queue.heartBeat")));
> >                factory.setHost(PropertyUtil
> >                                .getProperyFile().getProperty("queue.host"));
> >                factory.setPort(Integer.parseInt(PropertyUtil
> >                                .getProperyFile().getProperty("queue.hostPort")));
>
> >                boolean durable = true;
> >                boolean noAck = false;
>
> >                Connection conn = factory.newConnection();
>
> >                Channel channel = conn.createChannel();
>
> >                channel.basicQos(2);
> >                channel.exchangeDeclare(getExchangeName(), "direct", durable);
> >                channel.queueDeclare(getQueueName(), durable, false, false, null);
> >                channel.queueBind(getQueueName(), getExchangeName(),
> > getRoutingKey());
>
> >                QueueingConsumer consumer = new QueueingConsumer(channel);
> >                channel.basicConsume(getQueueName(), noAck, consumer);
>
> >                boolean runInfinite = true;
>
> >                java.sql.Connection sqlConn = null;
> >                sqlConn = DatabaseUtil.getConnection();
>
> >                long start = 0;
> >                long end = 0;
>
> >                while (runInfinite) {
> >                        start = System.currentTimeMillis();
> >                        QueueingConsumer.Delivery delivery = null;
> >                        try {
> >                                delivery = consumer.nextDelivery();
>
> >                        } catch (Exception ie) {
> >                                log.error(ie.getMessage(), ie);
> >                                continue;
> >                        }
>
> >                        byte[] body = delivery.getBody();
>
> >                        ObjectInputStream in = new ObjectInputStream(
> >                                        new ByteArrayInputStream(body));
>
> >                        Status s = (Status) in.readObject();
> >                        in.close();
> >                        in = null;
>
> >                        try {
>
> >                                try {
> >                                        process(s, sqlConn);
>
> >                                } catch (Exception e) {
> >                                        try{
> >                                                sqlConn = DatabaseUtil.getConnection();
> >                                        } catch (Exception e1){
> >                                                log.error(e.getMessage(), e1);
> >                                                throw e1;
> >                                        }
> >                                }
>
> >                                // only send an acknowledgement when the processing completed
> >                                // successfully
> >                                channel
> >                                                .basicAck(delivery.getEnvelope().getDeliveryTag(),
> >                                                                false);
>
> >                        } catch (Exception e) {
> >                                log.error(e.getMessage(), e);
> >                        }
>
> >                        end = System.currentTimeMillis();
> >                        delivery = null;
>
> >                        log.debug("Processing time: "+ (end - start) +" ms");
>
> >                }
> >                try {
> >                        sqlConn.close();
> >                } catch (SQLException e) {
> >                        // TODO Auto-generated catch block
> >                        log.error(e.getMessage(), e);
> >                }
>
> >                channel.close();
> >                conn.close();
>
> >        }
>
> > _______________________________________________
> > rabbitmq-discuss mailing list
> > rabbitmq-disc... at lists.rabbitmq.com
> >https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


More information about the rabbitmq-discuss mailing list