[rabbitmq-discuss] Rabbitmq Java Client Memory Leak?

Benjamin Bennett benbennett at gmail.com
Sun Dec 12 20:02:16 GMT 2010


Don't know if it is possible, I would attach the source to your
debugger.That might give you some idea what is going on.
I had  quick look at the QueueingConsumer code and it looks pretty
strait forward.
 What profiler are you using ? I am yourkit memory inspections would
give you some idea what class is holding the reference.
 Something is holding onto the bytes and it cannot be gc'd.
Other item is the new ByteStreamInputArray() wonder if you can set
that as a variable and set it to null after you read. But looking at
the javadocs that shouldn't be the issue.
I would still try it to 100% that isn't the issue.



On Sun, Dec 12, 2010 at 1:36 PM, drenz <david.renz at gmail.com> wrote:
> I've updated the code to explicitly set body = null after I am done
> using it. See below.
>
> However, the memory continues to build up. This is the only part of
> the code that runs, so it has to be something in here.  It almost
> looks like the QueueingConsumer object might be keeping these
> references around.  Is that possible?
>
> Thanks
>
> byte[] body = delivery.getBody();
>
> ObjectInputStream in = new ObjectInputStream(new
> ByteArrayInputStream(body));
> Status s = (Status) in.readObject();
>
> body = null;
>
> in.close();
>
> in = null;
>
>
>
>
>
> On Dec 12, 12:51 pm, Benjamin Bennett <benbenn... at gmail.com> wrote:
>> Having a quick look at it make sure the line is not holding a
>> reference , considering you are saying the profiler is saying that a
>> reference is out there.
>>  byte[] body = delivery.getBody();
>> de references the delivery.getBody();
>>
>> if the body variable is referencing the delivery.getBody() it will
>> prevent it from being gc'ed .
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sun, Dec 12, 2010 at 9:58 AM, drenz <david.r... at gmail.com> wrote:
>> > Hello,
>>
>> > I have a pretty simple set up where I use the RabbitMQ client library
>> > to pull objects from a RabbitMQ server.
>>
>> > Unfortunately, there seems to be some memory leak in my program
>> > because it quickly consumes all of the heap memory with byte arrays.
>> > I've increased the max memory for the JVM to 2 GB and it consumes all
>> > of that too.
>>
>> > Here is the code I use to pull data from the queue.  Having used a few
>> > Java profilers, it looks like the leak is associated with they
>> > QueueingConsumer.Delivery object, because each Delivery object has a
>> > member byte[] called _body.
>>
>> > Does anyone have any idea how I can ensure these arrays are garbage
>> > collected / destroyed?
>>
>> > Thanks in advance!!!
>>
>> > Property values:
>> > queue.user=guest
>> > queue.password=guest
>> > queue.virtualHost=/
>> > queue.host=127.0.0.1
>> > queue.hostPort=5672
>> > queue.heartBeat=0
>> > queue.exchange=myExchange
>> > queue.queueName=dtt
>> > queue.routingKey=dttRoute
>>
>> > Code:
>>
>> >        public void consume() throws IOException, ClassNotFoundException {
>>
>> >                ConnectionFactory factory = new ConnectionFactory();
>> >                factory.setUsername(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.user"));
>> >                factory.setPassword(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.password"));
>> >                factory.setVirtualHost(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.virtualHost"));
>> >                factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.heartBeat")));
>> >                factory.setHost(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.host"));
>> >                factory.setPort(Integer.parseInt(PropertyUtil
>> >                                .getProperyFile().getProperty("queue.hostPort")));
>>
>> >                boolean durable = true;
>> >                boolean noAck = false;
>>
>> >                Connection conn = factory.newConnection();
>>
>> >                Channel channel = conn.createChannel();
>>
>> >                channel.basicQos(2);
>> >                channel.exchangeDeclare(getExchangeName(), "direct", durable);
>> >                channel.queueDeclare(getQueueName(), durable, false, false, null);
>> >                channel.queueBind(getQueueName(), getExchangeName(),
>> > getRoutingKey());
>>
>> >                QueueingConsumer consumer = new QueueingConsumer(channel);
>> >                channel.basicConsume(getQueueName(), noAck, consumer);
>>
>> >                boolean runInfinite = true;
>>
>> >                java.sql.Connection sqlConn = null;
>> >                sqlConn = DatabaseUtil.getConnection();
>>
>> >                long start = 0;
>> >                long end = 0;
>>
>> >                while (runInfinite) {
>> >                        start = System.currentTimeMillis();
>> >                        QueueingConsumer.Delivery delivery = null;
>> >                        try {
>> >                                delivery = consumer.nextDelivery();
>>
>> >                        } catch (Exception ie) {
>> >                                log.error(ie.getMessage(), ie);
>> >                                continue;
>> >                        }
>>
>> >                        byte[] body = delivery.getBody();
>>
>> >                        ObjectInputStream in = new ObjectInputStream(
>> >                                        new ByteArrayInputStream(body));
>>
>> >                        Status s = (Status) in.readObject();
>> >                        in.close();
>> >                        in = null;
>>
>> >                        try {
>>
>> >                                try {
>> >                                        process(s, sqlConn);
>>
>> >                                } catch (Exception e) {
>> >                                        try{
>> >                                                sqlConn = DatabaseUtil.getConnection();
>> >                                        } catch (Exception e1){
>> >                                                log.error(e.getMessage(), e1);
>> >                                                throw e1;
>> >                                        }
>> >                                }
>>
>> >                                // only send an acknowledgement when the processing completed
>> >                                // successfully
>> >                                channel
>> >                                                .basicAck(delivery.getEnvelope().getDeliveryTag(),
>> >                                                                false);
>>
>> >                        } catch (Exception e) {
>> >                                log.error(e.getMessage(), e);
>> >                        }
>>
>> >                        end = System.currentTimeMillis();
>> >                        delivery = null;
>>
>> >                        log.debug("Processing time: "+ (end - start) +" ms");
>>
>> >                }
>> >                try {
>> >                        sqlConn.close();
>> >                } catch (SQLException e) {
>> >                        // TODO Auto-generated catch block
>> >                        log.error(e.getMessage(), e);
>> >                }
>>
>> >                channel.close();
>> >                conn.close();
>>
>> >        }
>>
>> > _______________________________________________
>> > rabbitmq-discuss mailing list
>> > rabbitmq-disc... at lists.rabbitmq.com
>> >https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>


More information about the rabbitmq-discuss mailing list