[rabbitmq-discuss] .NET API - Using PublishConfirms to get Reliability

Martywaz marty.wasznicky at neudesic.com
Wed Feb 26 05:08:21 GMT 2014


hello,

I'm using Publish Confirms and am trying to achieve zero message loss when 
the Rabbit MQ instances go down.

I have 2 instances of Rabbit MQ, clustered with all Queues mirrored on my 
local desktop.

I think use Publish Confirms to send a batch of about 50,000 messages.

I've implemented duplicate message detection in my consumer, and implemented 
the storage of all messages sent on the publisher. I wired in the following 
events and remove the messages as Acks come in i.e.:

            this.model.BasicAcks -= this.MessageAcknowledged;
            this.model.BasicNacks -= this.MessageNotAcknowledged;
            this.model.BasicReturn -= this.MessageReturned;
            this.model.FlowControl -= this.FlowControlChanged;
            this.model.CallbackException -= ReportCallbackException;

Fairly straight forward I thought.  On the consumer side I did from the QOS 
setting for consuming messages.  I do this in a tight loop like so:

                consumerModel = this.connection.CreateModel();
                consumerModel.BasicQos(0, 10000, false);
                var consumer = new QueueingBasicConsumer(consumerModel);
               
                var consumerTag = consumerModel.BasicConsume(
                    this.queueName,false,
                    consumer);

                while (true)
                {
                    BasicDeliverEventArgs item = null;
                    try
                    {
                        if (!consumer.Queue.Dequeue(3000, out item))
                        {
                            if (null == item) continue;
                        }
                        // do stuff
                        consumerModel.BasicAck(item.DeliveryTag, false);
                    }
                 }

My first finding was that performance was very slow.  with 250 byte 
messages, I'm getting about 500 to 600 msg/sec delivered.  

Regardless though, my testing was around recover-ability.  What I'm doing is 
sending groups of messages...about 50,000 at time.  Then while they are 
sending, I take down both instances of my Rabbit MQ cluster to simulate a 
failure like so using Powershell:

     &.\rabbitmqctl.bat -n cluster1 stop_app
     &.\rabbitmqctl.bat -n cluster2 stop_app

My results are always the same.  There appear to be several thousand 
messages that I never received Acks for still in my internal queue on the 
Publisher side.  There are also pending messages to be delivered that 
written to disk...that I don't expect my consumer to get until one of the 
cluster instances start back up.

Hence, I start up the main cluster instance followed by the second. 

First observation is that the messages pending in the Rabbit MQ queue do 
indeed get delivered to the consumer.  great.

I then resubmit all the messages in my internal queue on the publisher side.  
The result is always the same.  I'm always 3 or 4 messages short!

I then retested without a cluster and without shutting down.  After my test 
run..and after the consumer successfully gets all the messages I always find 
the same thing. sometimes I have thousands of unacknowledged messages left 
in my publisher queue.  I'd come back 5 minutes later....still there.

In short, 2 serious issues I'm seeing.  

First one is that Rabbit MQ is losing a few messages if both servers in the 
cluster are shut down.

Second one, Acks/Nacks seem to just get lost by Rabbit MQ.

Third one...only happens now again...The producer actually receives Acks for 
delivery tags/messages that don't exist in the its internal queue. 

Fourth one....when both servers go down, sometimes, but not always, the 
consumer will not throw an exception when it tries to read the message in 
the while loop. i.e : if (!consumer.Queue.Dequeue(3000, out item)).  The 
item comes back null, but If I look in the debugger, the consumer's and 
connection's isopen property is true...and the CloseReason is null.

Does anyone have any ideas or have experienced this?

How I'm sending the message is pretty straight forward:


                       messageProperties.MessageId = 
System.Guid.NewGuid().ToString()
                       lock (this.activeMessagesLock)
                        {
                            var deliveryTag = 0UL;
                            deliveryTag = this.model.NextPublishSeqNo;

                            this.model.BasicPublish(
                                this.exchangeName,
                                message.Header.Topic,
                                this.properties.Durable,
                                immediate,
                                messageProperties,
                                body);

                            this.activeMessages[deliveryTag] = message;
                        }

I'm using the message ID to add to a concurrentdictionary collection on the 
consumer side so I can detect duplicates.  The message is added to this 
duplicate detection dictionary before the Ack is sent back to Rabbit MQ

Any help would be appreciated.

by the way, if I use the TxCommit and TxRollback....I don't have any issues 
on resending by resending everything in the producer's internal queue. 
Message loss only seems to happen with PublishConfirms




More information about the rabbitmq-discuss mailing list