Message deleted at subscription?

Matthias Radestock matthias at lshift.net
Sun Jan 18 09:41:02 GMT 2009


Julien Genestoux wrote:
> A process puts messages in a queue every second roughly. Atfer some 
> time, I check the queue.
>  > rabbitmqctl list_queues messages name consumers
> Listing queues ...
> 508    /queue/MyQueue    0
> Now, I start a consumer with the following code :
> queue_client.subscribe("/queue/FeedsToParse", {:durable => true, 
> "auto-delete".intern => false}) do |message|
>   sleep 600
>   puts "done!"
> end
> As you can see, when starting this client, the number of elements in my 
> queue should decrease pretty slowly (1 every 10 minutes)... but here is 
> what happens if I check the queue again:
>  > rabbitmqctl list_queues messages name consumers
> Listing queues ...
> 417    /queue/MyQueue    1
> So it seems that starting a consumer deletes some messages in the queue. 
> I have no idea why. Can anyone explain?

The 'messages' count is the sum of:

- messages_ready - Number of ready messages (i.e. messages ready to be 
delivered to client)

- messages_unacknowledged - Number of unacknowledged messages.

- messages_uncommitted - Number of uncommitted messages (i.e. messages 
published in a transaction but not yet committed)

Since your example does not involve transactions, only messages_ready + 
messages_unacknowledged come into play.

On publication, messages_ready increases.

When delivering messages to a client, messages_ready decreases and 
messages_unacknowledged increases by the same amount, so the sum remains 
the same.

When an acknowledgement is received from the client, 
messages_unacknowledged decreases, and so does the sum.

Now ...

I am familiar with Aman's ruby client, but looking at the code and docs 
it appears that by default the subscribe method creates a consumer from 
which the server does not expect acknowledgements (and indeed there is 
no explicit ack in the code above). Here's what ruby client's docs say;

     # == Options
     # * :ack => true | false (default false)
     # If this field is set to false the server does not expect 
     # for messages. That is, when a message is delivered to the client
     # the server automatically and silently acknowledges it on behalf
     # of the client. This functionality increases performance but at
     # the cost of reliability. Messages can get lost if a client dies
     # before it can deliver them to the application.

So as soon as your code creates the subscription, the server will start 
sending it the queue's messages (resulting in messages_ready to 
decrease) and automatically acknowledge all such messages (resulting in 
messages_unacknowledged to remain unchanged), and thus the 'messages' 
count decreases. This is an asynchronous operation; so the client will 
receive a stream of messages without having to do anything. The stream 
is only paused by network/tcp congestion, and basic.qos (search the 
archives for discussions on the latter).

If that is not what you want I suggest you change the flag to true and 
explicitly acknowledge messages in your code.



