[rabbitmq-discuss] Getting many messages, ack() failing if total messages not equal a 10 increment

Ask Solem ask at rabbitmq.com
Fri Jun 28 12:28:05 BST 2013


On Jun 22, 2013, at 3:32 PM, Matthew Taft <matthewetaft at gmail.com> wrote:

> Hi everyone,
> 
> Hoping someone can help me with this problem.  After going through a few tutorials, I've pieced together some code to produce and consume messages.  I'm using kombu.  I'll paste the code below.  But the idea is to be able to send many messages to the queue, possibly hundreds, then have a periodic task that wakes up and consumes all messages.  Right now I haven't set up the periodic task.  It's just a simple Python file.  When passed "produce," it produces some random messages.  When passed "consume," it consumes them.
> 
> I've found that when the total number of messages is a multiple of 10 (90, 100, 510, etc.), all messages get processed correctly and the qsize value shows there are 0 messages remaining.  However (and this is better explained by example), if the queue contains 78 messages, after processing all of them, the queue reports 8 remaining, but the consumer can no longer get them (times out).  Any time the total qsize at the start is anything other than a multiple of 10 (129, 523, 234, etc.), the part beyond the last multiple of 10 is left remaining (9, 3, 4, etc.).
> 
> I've added some print statements to confirm that each message is actually consumed correctly, and the ack() is being called for each one.  However, I noticed that the qsize value is only being updated every 10 messages.  Here's the log for some of them to give you a better idea. The first number is the counter.  It's shown three times for each message because I'm printing before the get, after the get, then after the ack().  Notice the qsize value only updates every 10 messages:
> 
> Is it just some configuration I have to set?  Or is there something wrong in the code?  Here's the code in full:
> 

Welcome Matthew!

It reports correctly for me here:

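from kombu import Connection

def get(q):
    # Print the reported queue size before each get, then ack the message.
    for i in range(1000):
        print(len(q))
        m = q.get()
        m.ack()

def put(q):
    # Publish 1000 test messages.
    for i in range(1000):
        q.put('hello')

if __name__ == '__main__':
    conn = Connection('amqp://')
    q = conn.SimpleQueue('squ')
    # Deliver at most one unacknowledged message at a time.
    q.channel.basic_qos(prefetch_count=1)
    put(q)
    get(q)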


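If you raise the prefetch count the reported size may skip numbers, because messages already delivered to the consumer but not yet acknowledged are no longer counted as ready, but it always decreases.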
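
For the "wake up and consume everything" task described in the question, a drain loop could look roughly like the sketch below. It is only a sketch under assumptions: drain() is a hypothetical helper name, the one-second timeout is arbitrary, and it relies on SimpleQueue.get() accepting block/timeout and raising the standard library's queue.Empty once nothing arrives in time.

```python
from queue import Empty


def drain(q, timeout=1.0):
    """Consume and ack messages until none arrives within `timeout` seconds.

    `q` is assumed to behave like a kombu SimpleQueue: get(block, timeout)
    returns a message with an ack() method, or raises queue.Empty.
    Returns the number of messages that were acknowledged.
    """
    handled = 0
    while True:
        try:
            message = q.get(block=True, timeout=timeout)
        except Empty:
            # Nothing was delivered in time; treat the queue as drained.
            return handled
        message.ack()
        handled += 1
```

With the script above this would be called as drain(q) in place of get(q). If it returns while the queue still reports messages, those messages are probably held, unacknowledged, by some other consumer.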


Maybe you have a left-over consumer still running that you did not terminate correctly?
Take a look at the consumer count by running the command:

 rabbitmqctl list_queues -p $your_vhost name consumers
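
If you would rather run that check from Python, something like the following sketch may help. The helper names parse_consumer_counts() and consumer_counts() are made up for illustration, and the parsing assumes rabbitmqctl prints one tab-separated "name<TAB>consumers" line per queue, with any banner or header lines skipped; verify that against the output of your own RabbitMQ version.

```python
import subprocess


def parse_consumer_counts(output):
    """Map queue name -> consumer count from tab-separated rabbitmqctl output."""
    counts = {}
    for line in output.splitlines():
        fields = line.split("\t")
        # Skip banner lines such as "Listing queues ..." and any header row.
        if len(fields) != 2 or not fields[1].strip().isdigit():
            continue
        counts[fields[0]] = int(fields[1])
    return counts


def consumer_counts(vhost="/"):
    """Run the rabbitmqctl command shown above and parse its output."""
    output = subprocess.check_output(
        ["rabbitmqctl", "list_queues", "-p", vhost, "name", "consumers"],
        universal_newlines=True,
    )
    return parse_consumer_counts(output)
```

Calling consumer_counts() before starting your consumer should show 0 for the queue; anything higher points at a consumer that is still attached.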



