[rabbitmq-discuss] Getting many messages, ack() failing if total messages not equal a 10 increment
Matthew Taft
matthewetaft at gmail.com
Sat Jun 22 15:32:11 BST 2013
Hi everyone,
Hoping someone can help me with this problem. After going through a few
tutorials, I've pieced together some code the produce and consume messages.
I'm using kombu. I'lll past the code below. But the idea is to be able
to send many messages to the queue, possibly hundreds, then have a periodic
task that wakes up and consumes all messages. Right now I haven't set up
the periodic task. It's just a simply python file. When passed "produce,"
it produces some random messages. When passed "consume," it consumes them.
I've found that when the total messages is in increments of 10 (90, 100,
510, etc.), all messages get processed correctly and the qsize value shows
there are 0 messages remaining. However -- and this is better explained by
example - if the queue contains 78 messages, after processing all of them,
the queue reports 8 remaining, but the consumer can no longer get them
(times out). Any time the total qsize at the start is anything other than
a 10 increment (129, 523, 234, etc.), the part beyond the latest 10
increment is left remaining (9, 3, 4, etc.).
I've added some print statements to confirm that each message is actually
consumed correctly, and the ack() is being called for each one. However, I
noticed that the qsize value is only being updated every 10 messages.
Here's the log for some of them to give you a better idea. The first
number is the counter. It's shown three times for each message because I'm
printing before the get, after the get, then after the ack(). Notice the
qsize value only updates every 10 messages:
--------------------------------------------------------------------------------
Total Messages: 78
--------------------------------------------------------------------------------
Counter | Status
0: amount before queue.get: 78
0: amount after queue.get, before ack(): 78
0: amount after queue.get, after ack(): 78
1: amount before queue.get: 78
1: amount after queue.get, before ack(): 78
1: amount after queue.get, after ack(): 78
2: amount before queue.get: 78
2: amount after queue.get, before ack(): 78
2: amount after queue.get, after ack(): 78
3: amount before queue.get: 78
3: amount after queue.get, before ack(): 78
3: amount after queue.get, after ack(): 78
4: amount before queue.get: 78
4: amount after queue.get, before ack(): 78
4: amount after queue.get, after ack(): 78
5: amount before queue.get: 78
5: amount after queue.get, before ack(): 78
5: amount after queue.get, after ack(): 78
6: amount before queue.get: 78
6: amount after queue.get, before ack(): 78
6: amount after queue.get, after ack(): 78
7: amount before queue.get: 78
7: amount after queue.get, before ack(): 78
7: amount after queue.get, after ack(): 78
8: amount before queue.get: 78
8: amount after queue.get, before ack(): 78
8: amount after queue.get, after ack(): 78
9: amount before queue.get: 78
9: amount after queue.get, before ack(): 68
9: amount after queue.get, after ack(): 68
Is it just some configuration I have to set? Or is there something wrong
in the code? Here's the code in full:
import sys
from contextlib import closing
from django.conf import settings
from django.dispatch import Signal
from django.dispatch import receiver
from kombu import BrokerConnection
from Queue import Empty
# Test Stuff:
queue_name = "my.messages"
test_messages_to_produce = 78
consumer_batch_size = 100
consumer_timeout = 5
class UserMessenger(object):
"""
Class for producing and consuming messages
"""
def __init__(self, connection, queue_name, serializer="json",
compression=None):
self.queue = connection.SimpleQueue(queue_name)
self.serializer = serializer
self.compression = compression
def produce_message(self, context={}):
self.queue.put(context,
serializer=self.serializer,
compression=self.compression)
def consume_messages(self, callback, messages_to_get=1, timeout=1):
for i in range(messages_to_get):
try:
before_get = self.get_message_count()
message = self.queue.get(block=True, timeout=timeout)
callback(message.payload)
after_get_before_ack = self.get_message_count()
message.ack() # remove message from queue
after_get_after_ack = self.get_message_count()
deserialized = message.payload
user_id = deserialized["user_id"]
print("%d: amount before queue.get: %d" % (user_id,
before_get))
print("%d: amount after queue.get, before ack(): %d" %
(user_id, after_get_before_ack))
print("%d: amount after queue.get, after ack(): %d" %
(user_id, after_get_after_ack))
except Empty as e:
break;
def get_message_count(self):
return self.queue.qsize()
def purge_messages(self):
self.queue.clear()
def close(self):
self.queue.close()
def produce():
with BrokerConnection(settings.BROKER_URL) as connection:
with closing(UserMessenger(connection, queue_name)) as messenger:
#Just produce some random messages for now:
for i in range(test_messages_to_produce):
context = {
"user_id": i,
"changed_fields": {
'first_name': {
'before':'albert',
'after':'Albert'
}}}
messenger.produce_message(context)
def consume():
messages = []
def consume_message_callback(message):
"""
This callback passed to the messenger consume_messages method,
which calls it for each message.
Just append to messages for now.
"""
if message is not None:
messages.append(message)
with BrokerConnection(settings.BROKER_URL) as connection:
with closing(UserMessenger(connection, queue_name)) as messenger:
messages_count = messenger.get_message_count()
# Debug: How many total?
print "-" * 80; print "Total Messages: %d" % messages_count;
print "-" * 80
for i in range(0, messages_count, consumer_batch_size):
messenger.consume_messages(
consume_message_callback,
messages_to_get = consumer_batch_size,
timeout = consumer_timeout,
)
# Debug: Print the results.
print "Should be done now. Let's see what we have:"
for message in messages:
print message
def purge():
"""
Delete messages (for debugging)
"""
with BrokerConnection(settings.BROKER_URL) as connection:
with closing(UserMessenger(connection, queue_name)) as messenger:
messenger.purge_messages()
def status():
"""
Get messages on queue (for debugging)
"""
with BrokerConnection(settings.BROKER_URL) as connection:
with closing(UserMessenger(connection, queue_name)) as messenger:
print("Remaining messages: %d" % messenger.queue.qsize())
if __name__ == "__main__":
if sys.argv[0].startswith("python"):
option_index = 2
else:
option_index = 1
option = sys.argv[option_index]
if option == "produce":
produce()
elif option == "consume":
consume()
elif option == "purge":
purge()
elif option == "status":
status()
else:
print "Unknown option '%s'; exiting ..." % option
sys.exit(1)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130622/79132aca/attachment.htm>
More information about the rabbitmq-discuss
mailing list