[rabbitmq-discuss] Rabbitmq Java Client Memory Leak?

drenz david.renz at gmail.com
Mon Jan 3 22:56:10 GMT 2011


Thanks for the help guys! I've resolved the issue.

Not surprisingly, this was my own fault. I had a long-running database
connection in another layer of the code that was eating up memory.
Nothing was wrong with RabbitMQ.

Thanks,

Dave
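
A note on why the body = null change quoted below made no difference:
assigning null only clears one local variable, and an object stays on
the heap for as long as anything else still references it. The sketch
below is not code from this thread. RetainingCache and ReachabilityDemo
are hypothetical names standing in for whichever long-lived layer is
actually holding the data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for any long-lived object that keeps message bodies.
class RetainingCache {
    private final List<byte[]> retained = new ArrayList<>();

    void remember(byte[] body) {
        retained.add(body);
    }

    int size() {
        return retained.size();
    }

    long retainedBytes() {
        long total = 0;
        for (byte[] b : retained) {
            total += b.length;
        }
        return total;
    }

    void clear() {
        retained.clear();
    }
}

class ReachabilityDemo {
    // Simulates a consume loop that hands each body to a long-lived holder.
    static long simulate(RetainingCache cache, int messages, int bodySize) {
        for (int i = 0; i < messages; i++) {
            byte[] body = new byte[bodySize];
            cache.remember(body);
            body = null; // clears only this local; the cache still references the array
        }
        return cache.retainedBytes();
    }

    public static void main(String[] args) {
        RetainingCache cache = new RetainingCache();
        System.out.println(simulate(cache, 100, 1024)); // prints 102400
        cache.clear(); // only now do the arrays become unreachable
        System.out.println(cache.retainedBytes()); // prints 0
    }
}
```

In a real program the holder is found by asking a profiler or heap dump
which objects reference the byte arrays, rather than by nulling locals.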


On Dec 12 2010, 3:02 pm, Benjamin Bennett <benbenn... at gmail.com>
wrote:
> I don't know if it is possible, but I would attach the source to your
> debugger. That might give you some idea of what is going on.
> I had a quick look at the QueueingConsumer code and it looks pretty
> straightforward.
> What profiler are you using? I am sure YourKit's memory inspections
> would give you some idea of which class is holding the reference.
> Something is holding onto the bytes, so they cannot be gc'd.
> Another item is the new ByteArrayInputStream(): I wonder if you can
> assign it to a variable and set that to null after you read. Looking at
> the javadocs, though, that shouldn't be the issue.
> I would still try it, to be 100% sure that isn't the issue.
>
> On Sun, Dec 12, 2010 at 1:36 PM, drenz <david.r... at gmail.com> wrote:
> > I've updated the code to explicitly set body = null after I am done
> > using it. See below.
>
> > However, the memory continues to build up. This is the only part of
> > the code that runs, so it has to be something in here.  It almost
> > looks like the QueueingConsumer object might be keeping these
> > references around.  Is that possible?
>
> > Thanks
>
> > byte[] body = delivery.getBody();
>
> > ObjectInputStream in = new ObjectInputStream(new
> > ByteArrayInputStream(body));
> > Status s = (Status) in.readObject();
>
> > body = null;
>
> > in.close();
>
> > in = null;
>
> > On Dec 12, 12:51 pm, Benjamin Bennett <benbenn... at gmail.com> wrote:
> >> Having had a quick look at it: make sure this line is not holding a
> >> reference, considering you say the profiler shows that a
> >> reference is still out there.
> >>  byte[] body = delivery.getBody();
> >> holds a reference to the array returned by delivery.getBody().
>
> >> If the body variable is still referencing that array, it will
> >> prevent it from being gc'ed.
>
> >> On Sun, Dec 12, 2010 at 9:58 AM, drenz <david.r... at gmail.com> wrote:
> >> > Hello,
>
> >> > I have a pretty simple setup where I use the RabbitMQ client library
> >> > to pull objects from a RabbitMQ server.
>
> >> > Unfortunately, there seems to be some memory leak in my program
> >> > because it quickly consumes all of the heap memory with byte arrays.
> >> > I've increased the max memory for the JVM to 2 GB and it consumes all
> >> > of that too.
>
> >> > Here is the code I use to pull data from the queue.  Having used a few
> >> > Java profilers, it looks like the leak is associated with the
> >> > QueueingConsumer.Delivery object, because each Delivery object has a
> >> > member byte[] called _body.
>
> >> > Does anyone have any idea how I can ensure these arrays are garbage
> >> > collected / destroyed?
>
> >> > Thanks in advance!!!
>
> >> > Property values:
> >> > queue.user=guest
> >> > queue.password=guest
> >> > queue.virtualHost=/
> >> > queue.host=127.0.0.1
> >> > queue.hostPort=5672
> >> > queue.heartBeat=0
> >> > queue.exchange=myExchange
> >> > queue.queueName=dtt
> >> > queue.routingKey=dttRoute
>
> >> > Code:
>
> >> >        public void consume() throws IOException, ClassNotFoundException {
>
> >> >                ConnectionFactory factory = new ConnectionFactory();
> >> >                factory.setUsername(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.user"));
> >> >                factory.setPassword(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.password"));
> >> >                factory.setVirtualHost(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.virtualHost"));
> >> >                factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.heartBeat")));
> >> >                factory.setHost(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.host"));
> >> >                factory.setPort(Integer.parseInt(PropertyUtil
> >> >                                .getProperyFile().getProperty("queue.hostPort")));
>
> >> >                boolean durable = true;
> >> >                boolean noAck = false;
>
> >> >                Connection conn = factory.newConnection();
>
> >> >                Channel channel = conn.createChannel();
>
> >> >                channel.basicQos(2);
> >> >                channel.exchangeDeclare(getExchangeName(), "direct", durable);
> >> >                channel.queueDeclare(getQueueName(), durable, false, false, null);
> >> >                channel.queueBind(getQueueName(), getExchangeName(),
> >> >                                getRoutingKey());
>
> >> >                QueueingConsumer consumer = new QueueingConsumer(channel);
> >> >                channel.basicConsume(getQueueName(), noAck, consumer);
>
> >> >                boolean runInfinite = true;
>
> >> >                java.sql.Connection sqlConn = null;
> >> >                sqlConn = DatabaseUtil.getConnection();
>
> >> >                long start = 0;
> >> >                long end = 0;
>
> >> >                while (runInfinite) {
> >> >                        start = System.currentTimeMillis();
> >> >                        QueueingConsumer.Delivery delivery = null;
> >> >                        try {
> >> >                                delivery = consumer.nextDelivery();
>
> >> >                        } catch (Exception ie) {
> >> >                                log.error(ie.getMessage(), ie);
> >> >                                continue;
> >> >                        }
>
> >> >                        byte[] body = delivery.getBody();
>
> >> >                        ObjectInputStream in = new ObjectInputStream(
> >> >                                        new ByteArrayInputStream(body));
>
> >> >                        Status s = (Status) in.readObject();
> >> >                        in.close();
> >> >                        in = null;
>
> >> >                        try {
>
> >> >                                try {
> >> >                                        process(s, sqlConn);
>
> >> >                                } catch (Exception e) {
> >> >                                        try{
> >> >                                                sqlConn = DatabaseUtil.getConnection();
> >> >                                        } catch (Exception e1){
> >> >                                                log.error(e1.getMessage(), e1);
> >> >                                                throw e1;
> >> >                                        }
> >> >                                }
>
> >> >                                // only send an acknowledgement when the processing completed
> >> >                                // successfully
> >> >                                channel
> >> >                                                .basicAck(delivery.getEnvelope().getDeliveryTag(),
> >> >                                                                false);
>
> >> >                        } catch (Exception e) {
> >> >                                log.error(e.getMessage(), e);
> >> >                        }
>
> >> >                        end = System.currentTimeMillis();
> >> >                        delivery = null;
>
> >> >                        log.debug("Processing time: "+ (end - start) +" ms");
>
> >> >                }
> >> >                try {
> >> >                        sqlConn.close();
> >> >                } catch (SQLException e) {
> >> >                        // TODO Auto-generated catch block
> >> >                        log.error(e.getMessage(), e);
> >> >                }
>
> >> >                channel.close();
> >> >                conn.close();
>
> >> >        }
>
> >> > _______________________________________________
> >> > rabbitmq-discuss mailing list
> >> > rabbitmq-disc... at lists.rabbitmq.com
> >> > https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


More information about the rabbitmq-discuss mailing list
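
The deserialization step in the quoted code closes the ObjectInputStream
by hand, so close() is skipped whenever readObject() throws. That is
harmless for an in-memory stream, but the same pattern matters for
streams backed by files or sockets. Below is a minimal sketch of that
step using try-with-resources, assuming Java 7 or later. StatusMessage
and BodyCodec are hypothetical names, since the thread does not show the
Status class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the poster's Status class, whose fields are not shown.
class StatusMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String text;

    StatusMessage(String text) {
        this.text = text;
    }
}

class BodyCodec {
    // Serializes a value into the kind of byte[] a message body would carry.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } // out is closed (and flushed) here, before the buffer is read
        return buffer.toByteArray();
    }

    // Deserializes a message body; the stream is closed even if readObject() throws.
    static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = toBytes(new StatusMessage("hello"));
        StatusMessage decoded = (StatusMessage) fromBytes(body);
        System.out.println(decoded.text); // prints "hello"
    }
}
```

Because the stream is scoped to the try block, no explicit in = null or
body = null is needed; both become unreachable once the method returns.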