[rabbitmq-discuss] Ending a computation when the queue runs out of items

Ask Solem ask at rabbitmq.com
Tue Jun 26 13:17:03 BST 2012


On 26 Jun 2012, at 11:35, Simon MacMullen wrote:

> On 25/06/12 12:27, Rafael Calsaverini wrote:
>> I want the process to stop when I run out of items
> 
> I assume you mean "when the queue becomes empty" - obviously there's nothing to stop a publisher publishing more work even after a queue has been drained.
> 
> There's no way to get immediately notified that a queue is empty as a client. The best you can do is have your consumer keep redeclaring the queue occasionally, and looking at the queue.declare-ok to see how many messages the queue claims to have.
> 

This isn't necessarily true; it depends on what he means by 'when it runs out of items'.
For example, the following is sometimes used in Kombu:

import socket
from kombu import Connection, Consumer, Exchange, Queue, eventloop

queue = Queue('q', exchange=Exchange('q'), routing_key='q')

def on_message(body, message):
    print(body)
    message.ack()

def drain_queue(queue, callback):
    with Connection('amqp://guest:guest@localhost//') as connection:
        with Consumer(connection, [queue], callbacks=[callback]):
            try:
                # eventloop is a generator, so it must be iterated to drain events
                for _ in eventloop(connection, limit=None, timeout=1):
                    pass
            except socket.timeout:
                return  # no more messages

drain_queue(queue, on_message)

The important part of the example is to keep draining events from the
connection until it raises a timeout.  It waits for 1 second in this example,
and there are cases where the socket recv times out *even if there
are more messages*, but it works well enough for applications where
this is not important.

(You can also set the timeout to 0.0, but that increases the chance of a false
positive, and you have to catch EAGAIN or EINTR as well).
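
A rough sketch of what catching those errors could look like. The helper
name drain_nonblocking and its drain_once argument are made up for this
illustration and are not part of Kombu; drain_once stands for any callable
that handles one event or raises when the socket has nothing to read.

```python
import errno
import socket

def drain_nonblocking(drain_once):
    """Call drain_once() until the socket reports nothing left to read.

    Returns the number of calls that completed without raising.
    """
    handled = 0
    while True:
        try:
            drain_once()
        except socket.timeout:
            return handled  # same stop signal as with a positive timeout
        except OSError as exc:
            if exc.errno == errno.EINTR:
                continue  # interrupted by a signal: just try again
            if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return handled  # non-blocking read found nothing pending
            raise  # anything else is a real error
        handled += 1
```

With Kombu this might be called as
drain_nonblocking(lambda: connection.drain_events(timeout=0.0)),
assuming the transport in use treats a zero timeout as a non-blocking read.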

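For comparison, Simon's suggestion of redeclaring the queue and reading the
count from queue.declare-ok could be sketched roughly as below. The function
names are invented for this example, and it assumes the declare reply is a
tuple of (queue name, message count, consumer count), which may differ
between transports. The count covers messages that are ready for delivery,
not ones already delivered and waiting for an ack.

```python
import time

def message_count(queue, connection):
    """Ask the broker how many messages the queue claims to hold.

    Assumes `queue` is a kombu Queue and `connection` a kombu Connection.
    passive=True only inspects the queue; it does not create or modify it.
    """
    declare_ok = queue(connection.default_channel).queue_declare(passive=True)
    return declare_ok[1]  # assumed layout: (name, message_count, consumer_count)

def wait_until_empty(get_count, poll_interval=5.0, sleep=time.sleep):
    """Poll get_count() until it reports zero; return how many times we slept."""
    polls = 0
    while get_count() > 0:
        polls += 1
        sleep(poll_interval)
    return polls
```

Usage would be something like
wait_until_empty(lambda: message_count(queue, connection)).
A publisher can still add more work right after the count reaches zero, so
this only says the queue was empty at the moment of the last check.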
