Hi everyone,<div><br></div><div>Hoping someone can help me with this problem. &nbsp;After going through a few tutorials, I've pieced together some code the produce and consume messages. &nbsp;I'm using kombu. &nbsp;I'lll past the code below. &nbsp;But the idea is to be able to send many messages to the queue, possibly hundreds, then have a periodic task that wakes up and consumes all messages. &nbsp;Right now I haven't set up the periodic task. &nbsp;It's just a simply python file. &nbsp;When passed "produce," it produces some random messages. &nbsp;When passed "consume," it consumes them.</div><div><br></div><div>I've found that when the total messages is in increments of 10 (90, 100, 510, etc.), all messages get processed correctly and the qsize value shows there are 0 messages remaining. &nbsp;However -- and this is better explained by example - if the queue contains 78 messages, after processing all of them, the queue reports 8 remaining, but the consumer can no longer get them (times out). &nbsp;Any time the total qsize at the start is anything other than a 10 increment (129, 523, 234, etc.), the part beyond the latest 10 increment is left remaining (9, 3, 4, etc.).&nbsp;</div><div><br></div><div>I've added some print statements to confirm that each message is actually consumed correctly, and the ack() is being called for each one. &nbsp;However, I noticed that the qsize value is only being updated every 10 messages. &nbsp;Here's the log for some of them to give you a better idea. The first number is the counter. &nbsp;It's shown three times for each message because I'm printing before the get, after the get, then after the ack(). &nbsp;Notice the qsize value only updates every 10 messages:</div><div><br></div><div><div>--------------------------------------------------------------------------------</div><div>Total Messages: 78</div><div>--------------------------------------------------------------------------------</div><div>Counter | Status</div><div>0: amount before queue.get: 78</div><div>0: amount after queue.get, before ack(): 78</div><div>0: amount after queue.get, after ack(): 78</div><div>1: amount before queue.get: 78</div><div>1: amount after queue.get, before ack(): 78</div><div>1: amount after queue.get, after ack(): 78</div><div>2: amount before queue.get: 78</div><div>2: amount after queue.get, before ack(): 78</div><div>2: amount after queue.get, after ack(): 78</div><div>3: amount before queue.get: 78</div><div>3: amount after queue.get, before ack(): 78</div><div>3: amount after queue.get, after ack(): 78</div><div>4: amount before queue.get: 78</div><div>4: amount after queue.get, before ack(): 78</div><div>4: amount after queue.get, after ack(): 78</div><div>5: amount before queue.get: 78</div><div>5: amount after queue.get, before ack(): 78</div><div>5: amount after queue.get, after ack(): 78</div><div>6: amount before queue.get: 78</div><div>6: amount after queue.get, before ack(): 78</div><div>6: amount after queue.get, after ack(): 78</div><div>7: amount before queue.get: 78</div><div>7: amount after queue.get, before ack(): 78</div><div>7: amount after queue.get, after ack(): 78</div><div>8: amount before queue.get: 78</div><div>8: amount after queue.get, before ack(): 78</div><div>8: amount after queue.get, after ack(): 78</div><div>9: amount before queue.get: 78</div><div>9: amount after queue.get, before ack(): 68</div><div>9: amount after queue.get, after ack(): 68</div></div><div><br></div><div>Is it just some configuration I have to set? &nbsp;Or is there something wrong in the code? &nbsp;Here's the code in full:</div><div><br></div><div><div>import sys</div><div>from contextlib import closing</div><div>from django.conf import settings</div><div>from django.dispatch import Signal</div><div>from django.dispatch import receiver</div><div>from kombu import BrokerConnection</div><div>from Queue import Empty</div><div><br></div><div># Test Stuff:</div><div>queue_name = "my.messages"</div><div>test_messages_to_produce = 78</div><div>consumer_batch_size = 100</div><div>consumer_timeout = 5</div><div><br></div><div><br></div><div>class UserMessenger(object):</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; Class for producing and consuming messages</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; def __init__(self, connection, queue_name, serializer="json", compression=None):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.queue = connection.SimpleQueue(queue_name)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.serializer = serializer</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.compression = compression</div><div><br></div><div>&nbsp; &nbsp; def produce_message(self, context={}):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.queue.put(context,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;serializer=self.serializer,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;compression=self.compression)</div><div><br></div><div>&nbsp; &nbsp; def consume_messages(self, callback, messages_to_get=1, timeout=1):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; for i in range(messages_to_get):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; before_get = self.get_message_count()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message = self.queue.get(block=True, timeout=timeout)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; callback(message.payload)</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; after_get_before_ack = self.get_message_count()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; message.ack() # remove message from queue</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; after_get_after_ack = self.get_message_count()</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deserialized = message.payload</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; user_id = deserialized["user_id"]</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("%d: amount before queue.get: %d" % (user_id, before_get))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("%d: amount after queue.get, before ack(): %d" % (user_id, after_get_before_ack))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("%d: amount after queue.get, after ack(): %d" % (user_id, after_get_after_ack))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except Empty as e:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break;</div><div><br></div><div>&nbsp; &nbsp; def get_message_count(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; return self.queue.qsize()</div><div><br></div><div>&nbsp; &nbsp; def purge_messages(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.queue.clear()</div></div><div><br></div><div><div>&nbsp; &nbsp; def close(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.queue.close()</div><div><br></div><div><br></div><div>def produce():</div><div>&nbsp; &nbsp; with BrokerConnection(settings.BROKER_URL) as connection:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; with closing(UserMessenger(connection, queue_name)) as messenger:</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; #Just produce some random messages for now:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for i in range(test_messages_to_produce):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; context = {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "user_id": i,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "changed_fields": {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'first_name': {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'before':'albert',</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 'after':'Albert'</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;}}}</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messenger.produce_message(context)</div><div><br></div><div><br></div><div>def consume():</div><div>&nbsp; &nbsp; messages = []</div><div><br></div><div>&nbsp; &nbsp; def consume_message_callback(message):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; """</div><div>&nbsp; &nbsp; &nbsp; &nbsp; This callback passed to the messenger consume_messages method, which calls it for each message.</div><div>&nbsp; &nbsp; &nbsp; &nbsp; Just append to messages for now.</div><div>&nbsp; &nbsp; &nbsp; &nbsp; """</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if message is not None:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messages.append(message)</div><div><br></div><div><br></div><div>&nbsp; &nbsp; with BrokerConnection(settings.BROKER_URL) as connection:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messages_count = messenger.get_message_count()</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # Debug: How many total?</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print "-" * 80; print "Total Messages: %d" % messages_count; print "-" * 80</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for i in range(0, messages_count, consumer_batch_size):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messenger.consume_messages(<br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consume_message_callback,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messages_to_get = consumer_batch_size,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timeout = consumer_timeout,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # Debug: Print the results.</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print "Should be done now. &nbsp;Let's see what we have:"</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for message in messages:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print message</div><div><br></div><div>def purge():</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; Delete messages (for debugging)</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; with BrokerConnection(settings.BROKER_URL) as connection:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messenger.purge_messages()</div></div><div><br></div><div><div>def status():</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; Get messages on queue (for debugging)</div><div>&nbsp; &nbsp; """</div><div>&nbsp; &nbsp; with BrokerConnection(settings.BROKER_URL) as connection:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("Remaining messages: %d" % messenger.queue.qsize())</div><div><br></div><div><br></div><div>if __name__ == "__main__":</div><div>&nbsp; &nbsp; if sys.argv[0].startswith("python"):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; option_index = 2</div><div>&nbsp; &nbsp; else:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; option_index = 1</div><div>&nbsp; &nbsp; option = sys.argv[option_index]</div><div><br></div><div>&nbsp; &nbsp; if option == "produce":</div><div>&nbsp; &nbsp; &nbsp; &nbsp; produce()</div><div>&nbsp; &nbsp; elif option == "consume":</div><div>&nbsp; &nbsp; &nbsp; &nbsp; consume()</div><div>&nbsp; &nbsp; elif option == "purge":</div><div>&nbsp; &nbsp; &nbsp; &nbsp; purge()</div><div>&nbsp; &nbsp; elif option == "status":</div><div>&nbsp; &nbsp; &nbsp; &nbsp; status()</div><div>&nbsp; &nbsp; else:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; print "Unknown option '%s'; exiting ..." % option</div><div>&nbsp; &nbsp; &nbsp; &nbsp; sys.exit(1)</div></div><div><br></div>