[rabbitmq-discuss] Getting many messages, ack() failing if total messages not equal a 10 increment

Matthew Taft matthewetaft at gmail.com
Sat Jun 22 15:32:11 BST 2013


Hi everyone,

Hoping someone can help me with this problem.  After going through a few 
tutorials, I've pieced together some code to produce and consume messages. 
 I'm using kombu.  I'll paste the code below.  But the idea is to be able
to send many messages to the queue, possibly hundreds, then have a periodic 
task that wakes up and consumes all messages.  Right now I haven't set up 
the periodic task.  It's just a simple Python file.  When passed "produce," 
it produces some random messages.  When passed "consume," it consumes them.

I've found that when the total messages is in increments of 10 (90, 100, 
510, etc.), all messages get processed correctly and the qsize value shows 
there are 0 messages remaining.  However -- and this is better explained by 
example - if the queue contains 78 messages, after processing all of them, 
the queue reports 8 remaining, but the consumer can no longer get them 
(times out).  Any time the total qsize at the start is anything other than 
a 10 increment (129, 523, 234, etc.), the part beyond the latest 10 
increment is left remaining (9, 3, 4, etc.). 

I've added some print statements to confirm that each message is actually 
consumed correctly, and the ack() is being called for each one.  However, I 
noticed that the qsize value is only being updated every 10 messages. 
 Here's the log for some of them to give you a better idea. The first 
number is the counter.  It's shown three times for each message because I'm 
printing before the get, after the get, then after the ack().  Notice the 
qsize value only updates every 10 messages:

--------------------------------------------------------------------------------
Total Messages: 78
--------------------------------------------------------------------------------
Counter | Status
0: amount before queue.get: 78
0: amount after queue.get, before ack(): 78
0: amount after queue.get, after ack(): 78
1: amount before queue.get: 78
1: amount after queue.get, before ack(): 78
1: amount after queue.get, after ack(): 78
2: amount before queue.get: 78
2: amount after queue.get, before ack(): 78
2: amount after queue.get, after ack(): 78
3: amount before queue.get: 78
3: amount after queue.get, before ack(): 78
3: amount after queue.get, after ack(): 78
4: amount before queue.get: 78
4: amount after queue.get, before ack(): 78
4: amount after queue.get, after ack(): 78
5: amount before queue.get: 78
5: amount after queue.get, before ack(): 78
5: amount after queue.get, after ack(): 78
6: amount before queue.get: 78
6: amount after queue.get, before ack(): 78
6: amount after queue.get, after ack(): 78
7: amount before queue.get: 78
7: amount after queue.get, before ack(): 78
7: amount after queue.get, after ack(): 78
8: amount before queue.get: 78
8: amount after queue.get, before ack(): 78
8: amount after queue.get, after ack(): 78
9: amount before queue.get: 78
9: amount after queue.get, before ack(): 68
9: amount after queue.get, after ack(): 68

Is it just some configuration I have to set?  Or is there something wrong 
in the code?  Here's the code in full:

import sys
from contextlib import closing
from django.conf import settings
from django.dispatch import Signal
from django.dispatch import receiver
from kombu import BrokerConnection
from Queue import Empty

# Test Stuff:
queue_name = "my.messages"
test_messages_to_produce = 78
consumer_batch_size = 100
consumer_timeout = 5


class UserMessenger(object):
    """
    Produces and consumes messages on a single named queue.

    Thin wrapper around kombu's SimpleQueue: `produce_message` publishes
    a dict payload, `consume_messages` drains payloads in batches and
    acks each one.
    """
    def __init__(self, connection, queue_name, serializer="json",
                 compression=None):
        # SimpleQueue declares the queue on the broker if necessary.
        self.queue = connection.SimpleQueue(queue_name)
        self.serializer = serializer
        self.compression = compression

    def produce_message(self, context=None):
        """Publish one message; `context` defaults to an empty dict.

        BUG FIX: the default used to be the mutable literal `{}`, which
        Python evaluates once and shares across every call — a caller
        mutating it would leak state into later publishes.
        """
        if context is None:
            context = {}
        self.queue.put(context,
                       serializer=self.serializer,
                       compression=self.compression)

    def consume_messages(self, callback, messages_to_get=1, timeout=1):
        """Fetch up to `messages_to_get` messages.

        Each payload is passed to `callback`, then the message is acked.
        Stops early when the queue yields nothing for `timeout` seconds.
        """
        for _ in range(messages_to_get):
            try:
                before_get = self.get_message_count()
                message = self.queue.get(block=True, timeout=timeout)
                callback(message.payload)

                after_get_before_ack = self.get_message_count()
                message.ack()  # remove message from queue
                after_get_after_ack = self.get_message_count()

                # Debug tracing only: qsize() is a broker-side estimate
                # and can lag behind acks (it may update in prefetch-sized
                # steps rather than per message), so these counts are
                # informational, not authoritative.
                user_id = message.payload["user_id"]
                print("%d: amount before queue.get: %d" % (user_id,
                                                           before_get))
                print("%d: amount after queue.get, before ack(): %d" %
                      (user_id, after_get_before_ack))
                print("%d: amount after queue.get, after ack(): %d" %
                      (user_id, after_get_after_ack))
            except Empty:
                # Queue drained (or timed out) — stop the batch.
                break

    def get_message_count(self):
        """Return the broker's estimate of messages still queued."""
        return self.queue.qsize()

    def purge_messages(self):
        """Delete every message currently in the queue."""
        self.queue.clear()

    def close(self):
        """Release the underlying channel; enables contextlib.closing()."""
        self.queue.close()


def produce():
    """Publish a batch of fixture messages for manual testing."""
    with BrokerConnection(settings.BROKER_URL) as connection:
        with closing(UserMessenger(connection, queue_name)) as messenger:
            # One dummy "user changed" event per fake user id.
            for user_id in range(test_messages_to_produce):
                payload = {
                    "user_id": user_id,
                    "changed_fields": {
                        'first_name': {
                            'before': 'albert',
                            'after': 'Albert',
                        },
                    },
                }
                messenger.produce_message(payload)


def consume():
    """Drain the queue in batches and print everything collected."""
    collected = []

    def on_payload(payload):
        """Per-message callback handed to consume_messages; just
        accumulates the payloads so they can be printed afterwards."""
        if payload is not None:
            collected.append(payload)

    with BrokerConnection(settings.BROKER_URL) as connection:
        with closing(UserMessenger(connection, queue_name)) as messenger:
            total = messenger.get_message_count()

            # Debug: how many messages are waiting?
            print("-" * 80)
            print("Total Messages: %d" % total)
            print("-" * 80)

            # One consume_messages call per batch of consumer_batch_size.
            for _ in range(0, total, consumer_batch_size):
                messenger.consume_messages(
                    on_payload,
                    messages_to_get=consumer_batch_size,
                    timeout=consumer_timeout,
                )

            # Debug: print the results.
            print("Should be done now.  Let's see what we have:")
            for payload in collected:
                print(payload)

def purge():
    """
    Delete messages (for debugging)
    """
    with BrokerConnection(settings.BROKER_URL) as connection:
        messenger = UserMessenger(connection, queue_name)
        try:
            messenger.purge_messages()
        finally:
            # Explicit close instead of contextlib.closing().
            messenger.close()

def status():
    """
    Get messages on queue (for debugging)
    """
    with BrokerConnection(settings.BROKER_URL) as connection:
        with closing(UserMessenger(connection, queue_name)) as messenger:
            # Use the messenger's own accessor rather than reaching
            # into its queue attribute directly.
            remaining = messenger.get_message_count()
            print("Remaining messages: %d" % remaining)


if __name__ == "__main__":
    if sys.argv[0].startswith("python"):
        option_index = 2
    else:
        option_index = 1
    option = sys.argv[option_index]

    if option == "produce":
        produce()
    elif option == "consume":
        consume()
    elif option == "purge":
        purge()
    elif option == "status":
        status()
    else:
        print "Unknown option '%s'; exiting ..." % option
        sys.exit(1)

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130622/79132aca/attachment.htm>


More information about the rabbitmq-discuss mailing list