Hi everyone,<div><br></div><div>Hoping someone can help me with this problem. After going through a few tutorials, I've pieced together some code to produce and consume messages. I'm using kombu. I'll paste the code below. The idea is to be able to send many messages to the queue, possibly hundreds, then have a periodic task that wakes up and consumes all messages. Right now I haven't set up the periodic task. It's just a simple Python file. When passed "produce," it produces some random messages. When passed "consume," it consumes them.</div><div><br></div><div>I've found that when the total number of messages is a multiple of 10 (90, 100, 510, etc.), all messages get processed correctly and the qsize value shows there are 0 messages remaining. However (and this is better explained by example), if the queue contains 78 messages, then after processing all of them the queue reports 8 remaining, but the consumer can no longer get them (it times out). Any time the total qsize at the start is not a multiple of 10 (129, 523, 234, etc.), the part beyond the last multiple of 10 is left remaining (9, 3, 4, etc.). </div><div><br></div><div>I've added some print statements to confirm that each message is actually consumed correctly and that ack() is being called for each one. However, I noticed that the qsize value is only being updated every 10 messages. Here's the log for some of them to give you a better idea. The first number is the counter. It's shown three times for each message because I'm printing before the get, after the get, and then after the ack(). 
Notice the qsize value only updates every 10 messages:</div><div><br></div><div><div>--------------------------------------------------------------------------------</div><div>Total Messages: 78</div><div>--------------------------------------------------------------------------------</div><div>Counter | Status</div><div>0: amount before queue.get: 78</div><div>0: amount after queue.get, before ack(): 78</div><div>0: amount after queue.get, after ack(): 78</div><div>1: amount before queue.get: 78</div><div>1: amount after queue.get, before ack(): 78</div><div>1: amount after queue.get, after ack(): 78</div><div>2: amount before queue.get: 78</div><div>2: amount after queue.get, before ack(): 78</div><div>2: amount after queue.get, after ack(): 78</div><div>3: amount before queue.get: 78</div><div>3: amount after queue.get, before ack(): 78</div><div>3: amount after queue.get, after ack(): 78</div><div>4: amount before queue.get: 78</div><div>4: amount after queue.get, before ack(): 78</div><div>4: amount after queue.get, after ack(): 78</div><div>5: amount before queue.get: 78</div><div>5: amount after queue.get, before ack(): 78</div><div>5: amount after queue.get, after ack(): 78</div><div>6: amount before queue.get: 78</div><div>6: amount after queue.get, before ack(): 78</div><div>6: amount after queue.get, after ack(): 78</div><div>7: amount before queue.get: 78</div><div>7: amount after queue.get, before ack(): 78</div><div>7: amount after queue.get, after ack(): 78</div><div>8: amount before queue.get: 78</div><div>8: amount after queue.get, before ack(): 78</div><div>8: amount after queue.get, after ack(): 78</div><div>9: amount before queue.get: 78</div><div>9: amount after queue.get, before ack(): 68</div><div>9: amount after queue.get, after ack(): 68</div></div><div><br></div><div>Is it just some configuration I have to set? Or is there something wrong in the code? 
Here's the code in full:</div><div><br></div><div><div>import sys</div><div>from contextlib import closing</div><div>from django.conf import settings</div><div>from django.dispatch import Signal</div><div>from django.dispatch import receiver</div><div>from kombu import BrokerConnection</div><div>from Queue import Empty</div><div><br></div><div># Test Stuff:</div><div>queue_name = "my.messages"</div><div>test_messages_to_produce = 78</div><div>consumer_batch_size = 100</div><div>consumer_timeout = 5</div><div><br></div><div><br></div><div>class UserMessenger(object):</div><div>    """</div><div>    Class for producing and consuming messages</div><div>    """</div><div>    def __init__(self, connection, queue_name, serializer="json", compression=None):</div><div>        self.queue = connection.SimpleQueue(queue_name)</div><div>        self.serializer = serializer</div><div>        self.compression = compression</div><div><br></div><div>    def produce_message(self, context={}):</div><div>        self.queue.put(context,</div><div>                       serializer=self.serializer,</div><div>                       compression=self.compression)</div><div><br></div><div>    def consume_messages(self, callback, messages_to_get=1, timeout=1):</div><div>        for i in range(messages_to_get):</div><div>            try:</div><div>                before_get = self.get_message_count()</div><div>                message = self.queue.get(block=True, timeout=timeout)</div><div>                callback(message.payload)</div><div><br></div><div>                after_get_before_ack = self.get_message_count()</div><div>                message.ack() # remove message from queue</div><div>                after_get_after_ack = self.get_message_count()</div><div><br></div><div>                deserialized = message.payload</div><div>                user_id = deserialized["user_id"]</div><div>                print("%d: amount before queue.get: %d" % (user_id, before_get))</div><div>                print("%d: amount after queue.get, before ack(): %d" % (user_id, after_get_before_ack))</div><div>                print("%d: amount after queue.get, after ack(): %d" % (user_id, after_get_after_ack))</div><div>            except Empty as e:</div><div>                
break</div><div><br></div><div>    def get_message_count(self):</div><div>        return self.queue.qsize()</div><div><br></div><div>    def purge_messages(self):</div><div>        self.queue.clear()</div></div><div><br></div><div><div>    def close(self):</div><div>        self.queue.close()</div><div><br></div><div><br></div><div>def produce():</div><div>    with BrokerConnection(settings.BROKER_URL) as connection:</div><div>        with closing(UserMessenger(connection, queue_name)) as messenger:</div><div><br></div><div>            # Just produce some random messages for now:</div><div>            for i in range(test_messages_to_produce):</div><div>                context = {</div><div>                    "user_id": i,</div><div>                    "changed_fields": {</div><div>                        'first_name': {</div><div>                            'before':'albert',</div><div>                            'after':'Albert'</div><div>                }}}</div><div><br></div><div>                messenger.produce_message(context)</div><div><br></div><div><br></div><div>def consume():</div><div>    messages = []</div><div><br></div><div>    def consume_message_callback(message):</div><div>        """</div><div>        This callback is passed to the messenger consume_messages method, which calls it for each message.</div><div>        Just append to messages for now.</div><div>        """</div><div>        if message is not None:</div><div>            messages.append(message)</div><div><br></div><div><br></div><div>    with BrokerConnection(settings.BROKER_URL) as connection:</div><div>        with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>            messages_count = messenger.get_message_count()</div><div><br></div><div>            # Debug: How many total?</div><div>            print "-" * 80; print "Total Messages: %d" % messages_count; print "-" * 80</div><div><br></div><div>            for i in range(0, messages_count, consumer_batch_size):</div><div>                messenger.consume_messages(</div><div>                    consume_message_callback,</div><div>                    messages_to_get = consumer_batch_size,</div><div>                    timeout = consumer_timeout,</div><div>                )</div><div><br></div><div>            # Debug: Print the results.</div><div>            print "Should be done now. 
Let's see what we have:"</div><div>            for message in messages:</div><div>                print message</div><div><br></div><div>def purge():</div><div>    """</div><div>    Delete messages (for debugging)</div><div>    """</div><div>    with BrokerConnection(settings.BROKER_URL) as connection:</div><div>        with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>            messenger.purge_messages()</div></div><div><br></div><div><div>def status():</div><div>    """</div><div>    Get messages on queue (for debugging)</div><div>    """</div><div>    with BrokerConnection(settings.BROKER_URL) as connection:</div><div>        with closing(UserMessenger(connection, queue_name)) as messenger:</div><div>            print("Remaining messages: %d" % messenger.queue.qsize())</div><div><br></div><div><br></div><div>if __name__ == "__main__":</div><div>    if sys.argv[0].startswith("python"):</div><div>        option_index = 2</div><div>    else:</div><div>        option_index = 1</div><div>    option = sys.argv[option_index]</div><div><br></div><div>    if option == "produce":</div><div>        produce()</div><div>    elif option == "consume":</div><div>        consume()</div><div>    elif option == "purge":</div><div>        purge()</div><div>    elif option == "status":</div><div>        status()</div><div>    else:</div><div>        print "Unknown option '%s'; exiting ..." % option</div><div>        sys.exit(1)</div></div><div><br></div>
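<div>To make the pattern concrete, here is a small broker-free sketch of what I'm seeing. It only restates the arithmetic from my observations and is not kombu code. The step of 10 is an assumption taken from the log above (I don't know which setting, if any, it corresponds to), and the names STEP, expected_leftover and reported_qsize are made up for illustration.</div>

```python
# Broker-free model of the observed behaviour; nothing here talks to kombu.
# ASSUMPTION: the step of 10 is inferred from the log, not from any known setting.
STEP = 10


def expected_leftover(total, step=STEP):
    """Messages still reported by qsize after consuming 'everything'."""
    return total % step


def reported_qsize(total, consumed, step=STEP):
    """qsize value observed after `consumed` messages have been fetched.

    The value only drops once a full group of `step` messages has been fetched.
    """
    return total - step * (consumed // step)


if __name__ == "__main__":
    # Totals taken from the examples in this post.
    for total in (78, 129, 523, 234, 100):
        print("%d messages -> %d left over" % (total, expected_leftover(total)))
```

<div>With 78 messages, reported_qsize(78, 9) stays at 78 and reported_qsize(78, 10) drops to 68, which matches the log, and expected_leftover(78) gives the 8 messages I can't get at.</div>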