[rabbitmq-discuss] Duplicate Messages received after basicRecoveryAsync() is called.

John Mann jmann at versatile.com
Fri Feb 26 15:10:23 GMT 2010


On Feb 26, 2010, at 6:42 AM, Matthias Radestock wrote:

> John Mann wrote:
>> From the client perspective, receiving messages that I've already
>> acknowledged is confusing.
> 
> Here is a possible trace, as observed at the server, with delivery tags and acks included:
> 
> -> publish(A)
> <- deliver(1, A)
> -> publish(B)
> <- deliver(2, B)
> -> recover
> <- deliver(3, A)
> <- deliver(4, B)
> -> ack(2)
> -> ack(3)
> 
> As you can see, the ack(2) isn't received by the server until after it has re-delivered both messages.

Ah!  I get it.

> 
>> Is there a way to recover unacknowledged messages without also
>> receiving previously acknowledged messages?
> 
> 'recover' should be a synchronous operation, and such an operation is introduced in the 0-9-1 version of the AMQP spec, which rabbit will implement soon.
> 
> The recover-ok reply of the synchronous operation acts as a marker in the delivery stream - all unacked messages received by the client before it are recovered.
> 
> There is still a problem though: there is no easy way to tell in the client API which messages were received before or after the recover-ok. I will raise a bug for that.

Yup, that's my problem.  I knew that the basicRecoverAsync method was asynchronous (the name helps! :) ), but I didn't know what to wait on.

> Can you elaborate on what you are trying to accomplish?

I have a component (RabbitMQConsumer) that is responsible for listening for messages from a RabbitMQ queue.  Upon receiving a message, the RabbitMQConsumer invokes a method on a configurable listener class that implements the RawConsumer interface. 

Here is the meat of the RabbitMQConsumer class:

...
while (isRunning()) {
  try {
     deliverToRawConsumer(consumer.nextDelivery());
  } catch (InterruptedException e) {
     break;
  }
}
...

private void deliverToRawConsumer(QueueingConsumer.Delivery delivery) {
  String rawMessage = new String(delivery.getBody());

  try {
     rawConsumer.consume(rawMessage);

     acknowledgeMessageDelivery(delivery);
  }
  catch (Exception e) {
     LOG.error("Problem consuming message.", e);
     LOG.info("Not acknowledging message delivery.");
  }
}

By design, the RabbitMQConsumer does not acknowledge a message if the RawConsumer throws an exception. 

The problem is that the behavior of the RawConsumer class can change during runtime.  This means that it can throw an exception while consuming some messages, then later in time it can "recover".  When it recovers, I need to be able to retry all of the previously unacknowledged messages.

> Would closing and re-opening the channel be an option?

I think I can do that.  Are there any pitfalls in this approach.  I don't want to lose any messages.

> PS: could we continue this discussion on rabbitmq-discuss?

Good idea.  Done.

Thanks Matthias.

-John





More information about the rabbitmq-discuss mailing list