[rabbitmq-discuss] Rabbitmq Java Client Memory Leak?

Benjamin Bennett benbennett at gmail.com
Sun Dec 12 17:51:48 GMT 2010


Having had a quick look at it: make sure the line below is not holding a
reference, since you say the profiler reports that a reference is still
out there.
 byte[] body = delivery.getBody();
i.e. make sure the code dereferences the result of delivery.getBody();


If the body variable is still referencing the array returned by
delivery.getBody(), it will prevent that array from being gc'ed.



On Sun, Dec 12, 2010 at 9:58 AM, drenz <david.renz at gmail.com> wrote:
> Hello,
>
> I have a pretty simple set up where I use the RabbitMQ client library
> to pull objects from a RabbitMQ server.
>
> Unfortunately, there seems to be some memory leak in my program
> because it quickly consumes all of the heap memory with byte arrays.
> I've increased the max memory for the JVM to 2 GB and it consumes all
> of that too.
>
> Here is the code I use to pull data from the queue.  Having used a few
> Java profilers, it looks like the leak is associated with the
> QueueingConsumer.Delivery object, because each Delivery object has a
> member byte[] called _body.
>
> Does anyone have any idea how I can ensure these arrays are garbage
> collected / destroyed?
>
> Thanks in advance!!!
>
> Property values:
> queue.user=guest
> queue.password=guest
> queue.virtualHost=/
> queue.host=127.0.0.1
> queue.hostPort=5672
> queue.heartBeat=0
> queue.exchange=myExchange
> queue.queueName=dtt
> queue.routingKey=dttRoute
>
> Code:
>
>        public void consume() throws IOException, ClassNotFoundException {
>                /*
>                 * Configures a ConnectionFactory from the queue.* properties read
>                 * through PropertyUtil, opens a connection and channel, declares and
>                 * binds the exchange and queue, then loops: take the next delivery,
>                 * deserialize its body into a Status, pass it to process() together
>                 * with the SQL connection, and ack the delivery.
>                 * The deserialization step sits outside every try block, so its
>                 * IOException / ClassNotFoundException propagate to the caller.
>                 */
>                ConnectionFactory factory = new ConnectionFactory();
>                factory.setUsername(PropertyUtil
>                                .getProperyFile().getProperty("queue.user"));
>                factory.setPassword(PropertyUtil
>                                .getProperyFile().getProperty("queue.password"));
>                factory.setVirtualHost(PropertyUtil
>                                .getProperyFile().getProperty("queue.virtualHost"));
>                factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
>                                .getProperyFile().getProperty("queue.heartBeat")));
>                factory.setHost(PropertyUtil
>                                .getProperyFile().getProperty("queue.host"));
>                factory.setPort(Integer.parseInt(PropertyUtil
>                                .getProperyFile().getProperty("queue.hostPort")));
>                // durable is passed to both exchangeDeclare and queueDeclare; noAck is passed to basicConsume
>                boolean durable = true;
>                boolean noAck = false;
>
>                Connection conn = factory.newConnection();
>
>                Channel channel = conn.createChannel();
>                // presumably caps un-acked deliveries in flight at 2 — TODO confirm against the client docs
>                channel.basicQos(2);
>                channel.exchangeDeclare(getExchangeName(), "direct", durable);
>                channel.queueDeclare(getQueueName(), durable, false, false, null);
>                channel.queueBind(getQueueName(), getExchangeName(),
> getRoutingKey());
>                // consumer is registered with noAck == false; deliveries are acked explicitly in the loop below
>                QueueingConsumer consumer = new QueueingConsumer(channel);
>                channel.basicConsume(getQueueName(), noAck, consumer);
>
>                boolean runInfinite = true; // never set to false anywhere in this method
>
>                java.sql.Connection sqlConn = null;
>                sqlConn = DatabaseUtil.getConnection();
>
>                long start = 0;
>                long end = 0;
>
>                while (runInfinite) {
>                        start = System.currentTimeMillis();
>                        QueueingConsumer.Delivery delivery = null;
>                        try {
>                                delivery = consumer.nextDelivery();
>
>                        } catch (Exception ie) {
>                                log.error(ie.getMessage(), ie);
>                                continue; // nothing was received; log and try again
>                        }
>
>                        byte[] body = delivery.getBody();
>                        /*
>                         * Deserialize the message body as a Status. Exceptions thrown here
>                         * are not caught inside the loop and leave consume() without
>                         * reaching the close() calls at the bottom.
>                         * NOTE(review): this is native Java deserialization of bytes taken
>                         * from the queue — only safe if every publisher is trusted; confirm.
>                         */
>                        ObjectInputStream in = new ObjectInputStream(
>                                        new ByteArrayInputStream(body));
>
>                        Status s = (Status) in.readObject();
>                        in.close();
>                        in = null;
>
>                        try {
>                                /*
>                                 * NOTE(review): when process() throws, the inner catch only
>                                 * fetches a new sqlConn (overwriting the old one without
>                                 * closing it); the original exception is neither logged nor
>                                 * rethrown, so control falls through to basicAck and the
>                                 * delivery is acked even though processing failed. That
>                                 * contradicts the "only send an acknowledgement" comment
>                                 * below. The ack is skipped only when getConnection() itself
>                                 * throws, because e1 is rethrown to the outer catch.
>                                 */
>                                try {
>                                        process(s, sqlConn);
>
>                                } catch (Exception e) {
>                                        try{
>                                                sqlConn = DatabaseUtil.getConnection();
>                                        } catch (Exception e1){
>                                                log.error(e.getMessage(), e1); // NOTE(review): message text is from e, stack trace from e1
>                                                throw e1;
>                                        }
>                                }
>
>                                // only send an acknowledgement when the processing completed
>                                // successfully
>                                channel
>                                                .basicAck(delivery.getEnvelope().getDeliveryTag(),
>                                                                false);
>
>                        } catch (Exception e) {
>                                log.error(e.getMessage(), e);
>                        }
>
>
>                        end = System.currentTimeMillis();
>                        delivery = null; // clears this local only; body and s are loop-scoped locals and are not cleared explicitly
>
>                        log.debug("Processing time: "+ (end - start) +" ms");
>
>                }
>                try { // cleanup below runs only if the loop above ever exits normally
>                        sqlConn.close();
>                } catch (SQLException e) {
>                        // TODO Auto-generated catch block
>                        log.error(e.getMessage(), e);
>                }
>
>                channel.close();
>                conn.close();
>
>        }
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>


More information about the rabbitmq-discuss mailing list