[rabbitmq-discuss] Rabbitmq Java Client Memory Leak?

drenz david.renz at gmail.com
Sun Dec 12 15:58:12 GMT 2010


Hello,

I have a pretty simple set up where I use the RabbitMQ client library
to pull objects from a RabbitMQ server.

Unfortunately, there seems to be some memory leak in my program
because it quickly consumes all of the heap memory with byte arrays.
I've increased the max memory for the JVM to 2 GB and it consumes all
of that too.

Here is the code I use to pull data from the queue.  Having used a few
Java profilers, it looks like the leak is associated with they
QueueingConsumer.Delivery object, because each Delivery object has a
member byte[] called _body.

Does anyone have any idea how I can ensure these arrays are garbage
collected / destroyed?

Thanks in advance!!!

Property values:
queue.user=guest
queue.password=guest
queue.virtualHost=/
queue.host=127.0.0.1
queue.hostPort=5672
queue.heartBeat=0
queue.exchange=myExchange
queue.queueName=dtt
queue.routingKey=dttRoute

Code:

	public void consume() throws IOException, ClassNotFoundException {

		ConnectionFactory factory = new ConnectionFactory();
		factory.setUsername(PropertyUtil
				.getProperyFile().getProperty("queue.user"));
		factory.setPassword(PropertyUtil
				.getProperyFile().getProperty("queue.password"));
		factory.setVirtualHost(PropertyUtil
				.getProperyFile().getProperty("queue.virtualHost"));
		factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
				.getProperyFile().getProperty("queue.heartBeat")));
		factory.setHost(PropertyUtil
				.getProperyFile().getProperty("queue.host"));
		factory.setPort(Integer.parseInt(PropertyUtil
				.getProperyFile().getProperty("queue.hostPort")));

		boolean durable = true;
		boolean noAck = false;

		Connection conn = factory.newConnection();

		Channel channel = conn.createChannel();

		channel.basicQos(2);
		channel.exchangeDeclare(getExchangeName(), "direct", durable);
		channel.queueDeclare(getQueueName(), durable, false, false, null);
		channel.queueBind(getQueueName(), getExchangeName(),
getRoutingKey());

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(getQueueName(), noAck, consumer);

		boolean runInfinite = true;

		java.sql.Connection sqlConn = null;
		sqlConn = DatabaseUtil.getConnection();

		long start = 0;
		long end = 0;

		while (runInfinite) {
			start = System.currentTimeMillis();
			QueueingConsumer.Delivery delivery = null;
			try {
				delivery = consumer.nextDelivery();

			} catch (Exception ie) {
				log.error(ie.getMessage(), ie);
				continue;
			}

			byte[] body = delivery.getBody();

			ObjectInputStream in = new ObjectInputStream(
					new ByteArrayInputStream(body));

			Status s = (Status) in.readObject();
			in.close();
			in = null;

			try {

				try {
					process(s, sqlConn);

				} catch (Exception e) {
					try{
						sqlConn = DatabaseUtil.getConnection();
					} catch (Exception e1){
						log.error(e.getMessage(), e1);
						throw e1;
					}
				}

				// only send an acknowledgement when the processing completed
				// successfully
				channel
						.basicAck(delivery.getEnvelope().getDeliveryTag(),
								false);

			} catch (Exception e) {
				log.error(e.getMessage(), e);
			}


			end = System.currentTimeMillis();
			delivery = null;

			log.debug("Processing time: "+ (end - start) +" ms");

		}
		try {
			sqlConn.close();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			log.error(e.getMessage(), e);
		}

		channel.close();
		conn.close();

	}




More information about the rabbitmq-discuss mailing list