[rabbitmq-discuss] RabbitMQ 1.7.0 close the cnx when consumer don't ack deliveries during 30s

Vincent Barat vbarat at ubikod.com
Thu Jul 15 15:35:59 BST 2010


  Hi Simon,

I got my fix. Here is what I do:

1- I use basicQos() to make sure that the server will not send "too 
many" messages. (I have checked that basicQos() does not guarantee 
the number of unacknowledged messages sent, since I receive many 
more than the parameter I give, but it at least seems to limit the 
number of messages. Maybe there is a bug here?)
2- I use an unbounded LinkedBlockingQueue to buffer my messages, so 
that handleDelivery() never blocks.
3- I use another thread to read this queue and write my messages to 
the database.

It works perfectly.

Here is the code:

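/* Imports inferred from the identifiers used below; commons-logging is assumed for Log.
 * QueueConsumer (the parent class), its ackDeliveries() method and Delivery are not shown here. */
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;

public abstract class BufferedQueueConsumer extends QueueConsumer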
{
   private static final Log LOG = LogFactory.getLog(BufferedQueueConsumer.class);

   /** Maximum time (in milliseconds) during which messages are buffered before triggering a delivery */
   private int pace = 1000;

   /** Maximum number of messages buffered before triggering a delivery */
   private int capacity = 1000;

   /** Thread used to consume buffered messages */
   private final Thread consumerThread;

   /**
    * The message buffer. A queue is used to store the buffered messages and to synchronize the
    * message producer and consumer. This queue must not have a maximum capacity, otherwise it could
    * block the producer in handleDelivery() and then break the AMQP connection. Instead, we rely on
    * AMQP's QoS parameter, which lets us configure the server to send no more than a given number
    * of unacknowledged deliveries. Tests have shown that this parameter is not a guarantee (we can
    * receive more unacknowledged messages than expected), but it clearly helps prevent memory
    * exhaustion.
    */
   private final BlockingQueue<Delivery> deliveryQueue = new LinkedBlockingQueue<Delivery>();

   public BufferedQueueConsumer(final String queueName, String exchange, String routingKey,
     String mode, boolean durable)
   {
     /* Create the parent queue consumer */
     super(queueName, exchange, routingKey, mode, durable);

     /* Create the thread consuming delivered messages */
     consumerThread = new Thread(queueName + "'s consumer")
     {
       @Override
       public void run()
       {
         LOG.info("Thread " + this.getName() + " started");

         /* Create a delivery list used to extract deliveries from the delivery queue */
         final List<Delivery> deliveryList = new ArrayList<Delivery>();
         final List<Delivery> readOnlyDeliveryList = Collections.unmodifiableList(deliveryList);

         /* Initialize last delivery time */
         long lastDeliveryTime = System.currentTimeMillis();

         while (true)
         {
           long currentTime = System.currentTimeMillis();

           /*
            * If we have polled enough messages from the delivery queue, or we have polled the
            * delivery queue long enough
            */
           if (deliveryList.size() == capacity
             || ((currentTime >= lastDeliveryTime + pace) && !deliveryList.isEmpty()))
           {
             /* Ask subclass to handle all buffered deliveries */
             handleBufferedDeliveries(readOnlyDeliveryList);

             /* Acknowledge all buffered deliveries */
             ackDeliveries(readOnlyDeliveryList);

             /* Forget all buffered deliveries */
             deliveryList.clear();

             /* Reset last delivery time */
             lastDeliveryTime = System.currentTimeMillis();
           }

           /*
            * If we need to poll more messages from the delivery queue, or we haven't polled the
            * delivery queue long enough
            */
           else
           {
             try
             {
               /*
                * If the delivery list is empty, we need to wait forever for one message before being
                * able to deliver something
                */
               if (deliveryList.isEmpty())
                 deliveryList.add(deliveryQueue.take());

               /*
                * If the delivery list is not empty, we only need to wait for the time remaining to
                * poll
                */
               else
               {
                 Delivery delivery = deliveryQueue.poll(lastDeliveryTime + pace - currentTime,
                   TimeUnit.MILLISECONDS);
                 if (delivery != null)
                   deliveryList.add(delivery);
               }
             }
             catch (InterruptedException ie)
             {
               LOG.warn("Thread " + this.getName() + " for " + queueName + " interrupted");
               return;
             }
           }
         }
       }
     };
     consumerThread.setDaemon(true);
     consumerThread.start();
   }

   @Override
   protected void onNewChannel(Channel channel) throws IOException
   {
     super.onNewChannel(channel);

     /*
      * Configure the channel to send no more than a given number of unacknowledged messages. This
      * avoids both buffering too many messages (which could exhaust memory) and blocking
      * in handleDelivery() (which could break the connection).
      */
     channel.basicQos(capacity);
   }

   @Override
   public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
     byte[] body)
   {
     try
     {
       /* Store the delivery in the message buffer */
       deliveryQueue.put(new Delivery(envelope, properties, body));
     }
     catch (InterruptedException ie)
     {
       /* The program is exiting: restore the interrupt flag and drop the delivery */
       Thread.currentThread().interrupt();
     }
   }

   /**
    * Override this method to be notified of buffered deliveries. Once this method has run, messages
    * will be acknowledged.
    * @param stack unmodifiable list of deliveries.
    */
   public abstract void handleBufferedDeliveries(List<Delivery> stack);
}
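
For readers who want to try the batching rule without a broker, here is a minimal sketch of the "flush when full or when the pace has elapsed" loop, using only java.util.concurrent. The class name BatchDrainSketch and the method nextBatch() are hypothetical and are not part of the RabbitMQ client or of the class above. One deliberate simplification: the pace is measured from the moment the first message of the batch is taken, whereas the class above measures it from the previous flush. The sketch also assumes capacity is at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* Hypothetical helper, for illustration only */
final class BatchDrainSketch
{
  /**
   * Blocks until at least one item is available, then keeps collecting until either
   * capacity items are buffered or paceMillis have elapsed since the first item was taken.
   */
  static <T> List<T> nextBatch(BlockingQueue<T> queue, int capacity, long paceMillis)
    throws InterruptedException
  {
    List<T> batch = new ArrayList<T>();

    /* Wait as long as needed for the first item */
    batch.add(queue.take());
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(paceMillis);

    while (batch.size() < capacity)
    {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0)
        break;

      /* Wait only for the time remaining before the batch must be flushed */
      T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
      if (item == null)
        break;
      batch.add(item);
    }
    return batch;
  }

  public static void main(String[] args) throws InterruptedException
  {
    /* Unbounded queue, so the producer side never blocks */
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
    for (int i = 0; i < 5; i++)
      queue.add(i);

    System.out.println(nextBatch(queue, 3, 200)); /* full batch: [0, 1, 2] */
    System.out.println(nextBatch(queue, 3, 200)); /* partial batch once the pace elapses: [3, 4] */
  }
}
```

In a real consumer, the thread calling nextBatch() would hand each batch to the database writer and acknowledge it afterwards, as the class above does.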


On 14/07/10 18:53, Simon MacMullen wrote:
> Hi Vincent. Yes, I think you could do that, although you have to 
> think about what to do when there aren't 1000 messages to deliver 
> - would you just wait indefinitely?
>
> It does sound like you'd be building something like a fixed-length 
> QueueingConsumer though - I don't really see what that would gain 
> you.
>
> Cheers, Simon
>
> On 14/07/10 17:38, Vincent Barat wrote:
>> Concerning this issue, I've read about the basicQos() call.
>>
>> Do you think that I can use this call to force the server to send me a
>> maximum of, say, 1000 messages that I could store in an array in
>> handleDelivery() (without blocking), and, once I've counted 1000
>> messages exactly, I trigger another thread to store all messages and
>> then ack all messages ?
>>
>> On 14/07/10 11:30, Simon MacMullen wrote:
>>> On 13/07/10 16:00, Vincent Barat wrote:
>>> <snip>
>>>> But from time to time, the database blocks for 30s to 1 minute and
>>>> thus I don't acknowledge during the same time.
>>>>
>>>> It seems that the RabbitMQ server closes the connection. Is there a
>>>> timeout (or a setting) I can use to fix this issue ?
>>>
>>> Hi Vincent. Not acking for 30s certainly should not close the
>>> connection. When the server closes a connection due to client error it
>>> sends an error message; you should see this as an IOException in the
>>> Java client. Check the ShutdownSignalException inside the IOException
>>> to get the error code / message. Alternatively you could look in the
>>> server logs, sometimes there's more detail there.
>>>
>>> One way this could be happening is:
>>>
>>> * You have heartbeating turned on (I think some of the client versions
>>> have this turned on by default). Heartbeating makes the server
>>> periodically check the client is still alive and responding.
>>>
>>> * You block the connection main loop, say by implementing your own
>>> Consumer rather than using QueueingConsumer, and doing something slow
>>> / expensive in Consumer.handleDelivery().
>>>
>>> Does this sound plausible? If not I would look for error messages as
>>> above.
>>>
>>> Cheers, Simon
>>>
>
>

