[rabbitmq-discuss] RabbitMQ 1.7.0 close the cnx when consumer don't ack deliveries during 30s

Vincent Barat vbarat at ubikod.com
Thu Jul 15 15:35:59 BST 2010

  Hi Simon,

I got my fix: here is what I do:

1- I use basicQos() to be sure that the server will not send "too 
many" messages (actually, I have checked that basicQos() does not 
offers a guarantee on the number of un-acknoledged messages sent, 
since I actually receive much more than the parameter I give, but it 
seems at least to limit the amount of messages, maybe there is a bug 
here ?).
2- I use an unlimited LinkedBlockingQueue to buffer my messages and 
to be sure that handleDelivery will never block
3- then, I use another thread to read this queue and write my 
message to the database

It works perfectly.

Here is the code:

public abstract class BufferedQueueConsumer extends QueueConsumer
   private static final Log LOG = 

   /** Maximum time during which the messages will be buffered 
before triggering a delivery */
   private int pace = 1000;

   /** Maximum number of messages being buffered before triggering a 
delivered */
   private int capacity = 1000;

   /** Thread used to consume buffered messages */
   private final Thread consumerThread;

    * The message buffer. A queue is used to store the buffered 
messages and to synchronize the
    * message producer and consumer. This queue must not have a 
maximum capacity, otherwise it could
    * block the producer in handleDelivery() and then brake the AMQP 
connection. Instead, we rely on
    * AMQP's QOS parameter which allow to configure the server to 
send not more than a given amount
    * of unacknowledged deliveries. Tests have shown that this 
parameter is not a guarantee (we can
    * receive more unacknowledged messages than expected) but it has 
a clear effect preventing memory
    * overflows.
   private final BlockingQueue<Delivery> deliveryQueue = new 

   public BufferedQueueConsumer(final String queueName, String 
exchange, String routingKey,
     String mode, boolean durable)
     /* Create the parent queue consumer */
     super(queueName, exchange, routingKey, mode, durable);

     /* Create the thread consuming delivered messages */
     consumerThread = new Thread(queueName + "'s consumer")
       public void run()
         LOG.info("Thread " + this.getName() + " started");

         /* Create a delivery list used to extract deliveries from 
the delivery queue */
         final List<Delivery> deliveryList = new ArrayList<Delivery>();
         final List<Delivery> readOnlyDeliveryList = 

         /* Initialize last delivery time */
         long lastDeliveryTime = System.currentTimeMillis();

         while (true)
           long currentTime = System.currentTimeMillis();

            * If we have polled enough messages from the delivery 
queue, or we have polled the
            * delivery queue long enough
           if (deliveryList.size() == capacity
             || ((currentTime >= lastDeliveryTime + pace) && 
             /* Ask subclass to handle all buffered deliveries */

             /* Acknowledge all buffered deliveries */

             /* Forget all buffered deliveries */

             /* Reset last delivery time */
             lastDeliveryTime = System.currentTimeMillis();

            * If we need to poll more messages from the delivery 
queue, or we haven't polled the
            * delivery queue long enough
                * If the delivery list is empty, we need to wait 
forever for one message before being
                * able to deliver something
               if (deliveryList.isEmpty())

                * If the delivery list is not empty, we only need to 
wait for the time remaining to
                * poll
                 Delivery delivery = 
deliveryQueue.poll(lastDeliveryTime + pace - currentTime,
                 if (delivery != null)
             catch (InterruptedException ie)
               LOG.warn("Thread " + this.getName() + " for " + 
queueName + " interrupted");

   protected void onNewChannel(Channel channel) throws IOException

      * Configure the channel to send not more than a given amount 
of unacknowledged messages. This
      * avoid at the same time to buffer too many messages (which 
could exhaust memory) and to block
      * in handleDelivery() (which could brake the connection).

   public void handleDelivery(String consumerTag, Envelope envelope, 
BasicProperties properties,
     byte[] body)
       /* Store the delivery in the message buffer */
       deliveryQueue.put(new Delivery(envelope, properties, body));
     catch (InterruptedException ie)
       /* Due to program being exited, do nothing */

    * Override this method to be notified of buffered deliveries. 
Once this method has run, messages
    * will be acknowledged.
    * @param stack unmodifiable list of deliveries.
   public abstract void handleBufferedDeliveries(List<Delivery> stack);

Le 14/07/10 18:53, Simon MacMullen a écrit :
> Hi Vincent. Yes, I think you could do that, although you have to 
> think about what to do when there aren't 1000 messages to deliver 
> - would you just wait indefinitely?
> It does sound like you'd be building something like a fixed-length 
> QueueingConsumer though - I don't really see what that would gain 
> you.
> Cheers, Simon
> On 14/07/10 17:38, Vincent Barat wrote:
>> Concerning this issue, I've read about the basicQos() call.
>> Do you think that I can use this call to force the server to send 
>> me a
>> maximum of, say, 1000 messages that I could store in an array in
>> handleDelivery() (without blocking), and, once I've counted 1000
>> messages exactly, I trigger another thread to store all messages and
>> then ack all messages ?
>> Le 14/07/10 11:30, Simon MacMullen a écrit :
>>> On 13/07/10 16:00, Vincent Barat wrote:
>>> <snip>
>>>> But from time to time, the database blocks during 30s to 
>>>> 1minute and
>>>> thus I don't acknowledge during the same time.
>>>> It seems that the RabbitMQ server closes the connection. Is 
>>>> there a
>>>> timeout (or a setting) I can use to fix this issue ?
>>> Hi Vincent. Not acking for 30s certainly should not close the
>>> connection. When the server closes a connection due to client 
>>> error it
>>> sends an error message; you should see this as an IOException in 
>>> the
>>> Java client. Check the ShutdownSignalException inside the 
>>> IOException
>>> to get the error code / message. Alternatively you could look in 
>>> the
>>> server logs, sometimes there's more detail there.
>>> One way this could be happening is:
>>> * You have heartbeating turned on (I think some of the client 
>>> version
>>> have this turned on by default). Heartbeating makes the server
>>> periodically check the client is still alive and responding.
>>> * You block the connection main loop, say by implementing your own
>>> Consumer rather than using QueueingConsumer, and doing something 
>>> slow
>>> / expensive in Consumer.handleDelivery().
>>> Does this sound plausible? If not I would look for error 
>>> messages as
>>> above.
>>> Cheers, Simon
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-discuss at lists.rabbitmq.com
>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

More information about the rabbitmq-discuss mailing list