[rabbitmq-discuss] .NET API - Using PublishConfirms to get Reliability
Marty Wasznicky
marty.wasznicky at neudesic.com
Wed Feb 26 02:03:50 GMT 2014
Hello,
I'm using Publish Confirms and am trying to achieve zero message loss when the RabbitMQ instances go down.
I have 2 instances of RabbitMQ on my local desktop, clustered, with all queues mirrored.
I then use Publish Confirms to send a batch of about 50,000 messages.
I've implemented duplicate message detection in my consumer, and on the publisher I store every message that is sent. I wired in the following events and remove the stored messages as Acks come in, i.e.:
this.model.BasicAcks += this.MessageAcknowledged;
this.model.BasicNacks += this.MessageNotAcknowledged;
this.model.BasicReturn += this.MessageReturned;
this.model.FlowControl += this.FlowControlChanged;
this.model.CallbackException += ReportCallbackException;
Fairly straightforward, I thought. On the consumer side I set the QoS prefetch for consuming messages and read them in a tight loop like so:
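consumerModel = this.connection.CreateModel();
// Prefetch up to 10,000 unacknowledged messages on this channel.
consumerModel.BasicQos(0, 10000, false);
var consumer = new QueueingBasicConsumer(consumerModel);
// Second argument is noAck = false, so every delivery must be acked explicitly.
var consumerTag = consumerModel.BasicConsume(
    this.queueName, false,
    consumer);
while (true)
{
    BasicDeliverEventArgs item = null;
    try
    {
        // Wait up to 3 seconds for a delivery; on timeout item stays null.
        if (!consumer.Queue.Dequeue(3000, out item))
        {
            if (null == item) continue;
        }
        // do stuff
        consumerModel.BasicAck(item.DeliveryTag, false);
    }
    // (catch block omitted from this snippet)
}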
My first finding was that performance was very slow. With 250 byte messages, I'm getting about 500 to 600 msg/sec delivered.
Regardless, my testing was around recoverability. What I'm doing is sending groups of messages, about 50,000 at a time. Then, while they are sending, I take down both instances of my RabbitMQ cluster to simulate a failure, like so using PowerShell:
&.\rabbitmqctl.bat -n cluster1 stop_app
&.\rabbitmqctl.bat -n cluster2 stop_app
My results are always the same. There are several thousand messages still in my internal queue on the publisher side for which I never received Acks. There are also pending messages that were written to disk, which I don't expect my consumer to get until one of the cluster instances starts back up.
Hence, I start up the main cluster instance followed by the second.
My first observation is that the messages pending in the RabbitMQ queue do indeed get delivered to the consumer. Great.
I then resubmit all the messages in my internal queue on the publisher side. The result is always the same. I'm always 3 or 4 messages short!
I then retested without a cluster and without shutting anything down. After my test run, and after the consumer successfully gets all the messages, I find the same thing: sometimes I have thousands of unacknowledged messages left in my publisher queue. I'd come back 5 minutes later and they were still there.
In short, I'm seeing 2 serious issues, plus two intermittent ones.
First one: RabbitMQ is losing a few messages if both servers in the cluster are shut down.
Second one: Acks/Nacks seem to just get lost by RabbitMQ.
Third one, which only happens now and again: the producer actually receives Acks for delivery tags/messages that don't exist in its internal queue.
Fourth one: when both servers go down, sometimes, but not always, the consumer will not throw an exception when it tries to read the message in the while loop, i.e. at if (!consumer.Queue.Dequeue(3000, out item)). The item comes back null, but if I look in the debugger, the consumer's and connection's IsOpen property is true and the CloseReason is null.
Does anyone have any ideas, or has anyone experienced this?
How I'm sending the message is pretty straightforward:
messageProperties.MessageId = System.Guid.NewGuid().ToString();
lock (this.activeMessagesLock)
{
    // Sequence number that the next publish on this channel will be assigned.
    var deliveryTag = this.model.NextPublishSeqNo;
    this.model.BasicPublish(
        this.exchangeName,
        message.Header.Topic,
        this.properties.Durable,
        immediate,
        messageProperties,
        body);
    // Remember the message until its confirm arrives.
    this.activeMessages[deliveryTag] = message;
}
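For reference, here is a minimal sketch of the publisher-side bookkeeping described above. It is an illustration under stated assumptions, not the code from this post: the class name ConfirmTracker and its members are hypothetical and are not part of the RabbitMQ client. The one protocol fact it relies on is that a confirm can carry the multiple flag, in which case it covers every sequence number up to and including the delivery tag, so a handler that removes only the single tag it is given would leave earlier entries behind.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, NOT part of the RabbitMQ client library.
// Keeps published messages keyed by publish sequence number until confirmed.
public sealed class ConfirmTracker<T>
{
    private readonly object gate = new object();
    private readonly SortedDictionary<ulong, T> pending =
        new SortedDictionary<ulong, T>();

    // Record a message under the sequence number it was published with.
    public void Track(ulong seqNo, T message)
    {
        lock (gate) { pending[seqNo] = message; }
    }

    // Remove confirmed entries and return how many were removed.
    // multiple == false: only deliveryTag itself is confirmed.
    // multiple == true:  every tracked sequence number <= deliveryTag is confirmed.
    public int Confirm(ulong deliveryTag, bool multiple)
    {
        lock (gate)
        {
            if (!multiple)
                return pending.Remove(deliveryTag) ? 1 : 0;

            // Keys enumerate in ascending order; copy them before removing.
            List<ulong> confirmed =
                pending.Keys.TakeWhile(k => k <= deliveryTag).ToList();
            foreach (ulong key in confirmed)
                pending.Remove(key);
            return confirmed.Count;
        }
    }

    // Number of messages still waiting for a confirm.
    public int PendingCount
    {
        get { lock (gate) { return pending.Count; } }
    }
}
```

In a BasicAcks handler this would be called with the DeliveryTag and Multiple values from the event arguments. Taking the same lock around the publish and the Track call keeps an early confirm from arriving before the entry exists.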
I'm using the message ID as the key in a ConcurrentDictionary on the consumer side so I can detect duplicates. The message is added to this duplicate detection dictionary before the Ack is sent back to RabbitMQ.
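As an illustration of that duplicate check, a minimal sketch follows. The DuplicateDetector class and its TryMarkSeen method are hypothetical names chosen for this example; only ConcurrentDictionary.TryAdd comes from the .NET framework. The sketch keeps every ID in memory forever, which is an assumption a long-running consumer would need to revisit.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper for illustration only.
public sealed class DuplicateDetector
{
    // The value is unused; the dictionary serves as a thread-safe set of IDs.
    private readonly ConcurrentDictionary<string, byte> seen =
        new ConcurrentDictionary<string, byte>();

    // Returns true the first time a message ID is seen and false for a repeat.
    // TryAdd is atomic, so two threads cannot both claim the same ID.
    public bool TryMarkSeen(string messageId)
    {
        return seen.TryAdd(messageId, 0);
    }
}
```

A consumer would call TryMarkSeen with the MessageId from the delivery's properties, process the message only when it returns true, and ack in both cases so that the duplicate is not redelivered.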
Any help would be appreciated.
By the way, if I use TxCommit and TxRollback instead, I don't have any issues when I resend everything in the producer's internal queue. Message loss only seems to happen with Publish Confirms.