<div dir="ltr"><p class="MsoPlainText">Hello,<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I'm using Publish Confirms and am trying to achieve zero
message loss when the RabbitMQ instances go down.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I have 2 instances of RabbitMQ running on my local desktop,
clustered, with all queues mirrored.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I then use Publish Confirms to send a batch of about
50,000 messages.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I've implemented duplicate message detection in my
consumer, and the publisher stores every message it sends. I
wired in the following events and remove the stored messages as Acks come in, i.e.:<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
this.model.BasicAcks += this.MessageAcknowledged;<o:p></o:p></p>
<p class="MsoPlainText">
this.model.BasicNacks += this.MessageNotAcknowledged;<o:p></o:p></p>
<p class="MsoPlainText">
this.model.BasicReturn += this.MessageReturned;<o:p></o:p></p>
<p class="MsoPlainText">
this.model.FlowControl += this.FlowControlChanged;<o:p></o:p></p>
<p class="MsoPlainText">
this.model.CallbackException += ReportCallbackException;<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Fairly straightforward, I thought. On the consumer side I set the QoS
prefetch for consuming messages and then read them
in a tight loop like so:<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
consumerModel = this.connection.CreateModel();<o:p></o:p></p>
<p class="MsoPlainText">
consumerModel.BasicQos(0, 10000, false);<o:p></o:p></p>
<p class="MsoPlainText"> var
consumer = new QueueingBasicConsumer(consumerModel);<o:p></o:p></p>
<p class="MsoPlainText"> <o:p></o:p></p>
<p class="MsoPlainText"> var
consumerTag = consumerModel.BasicConsume(<o:p></o:p></p>
<p class="MsoPlainText">
this.queueName,false,<o:p></o:p></p>
<p class="MsoPlainText">
consumer);<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
while (true)<o:p></o:p></p>
<p class="MsoPlainText"> {<o:p></o:p></p>
<p class="MsoPlainText">
BasicDeliverEventArgs item = null;<o:p></o:p></p>
<p class="MsoPlainText">
try<o:p></o:p></p>
<p class="MsoPlainText">
{<o:p></o:p></p>
<p class="MsoPlainText">
if (!consumer.Queue.Dequeue(3000, out item))<o:p></o:p></p>
<p class="MsoPlainText">
{<o:p></o:p></p>
<p class="MsoPlainText"> if (null == item)
continue;<o:p></o:p></p>
<p class="MsoPlainText">
}<o:p></o:p></p>
<p class="MsoPlainText"> // do stuff<o:p></o:p></p>
<p class="MsoPlainText">
consumerModel.BasicAck(item.DeliveryTag, false);<o:p></o:p></p>
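<p class="MsoPlainText">
}<o:p></o:p></p>
<p class="MsoPlainText">
catch (System.IO.EndOfStreamException) // assumed handler; the catch block was not included in the snippet<o:p></o:p></p>
<p class="MsoPlainText">
{<o:p></o:p></p>
<p class="MsoPlainText">
break; // the consumer's queue was closed<o:p></o:p></p>
<p class="MsoPlainText">
}<o:p></o:p></p>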
<p class="MsoPlainText"> }<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">My first finding was that performance was very slow. With 250-byte messages, I'm getting about 500
to 600 msg/sec delivered.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Regardless, my testing was focused on
recoverability. What I'm doing is
sending groups of messages, about 50,000 at a time. Then, while they are being sent, I take down both
instances of my RabbitMQ cluster to simulate a failure, like so, using
PowerShell:<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
&.\rabbitmqctl.bat -n cluster1 stop_app<o:p></o:p></p>
<p class="MsoPlainText">
&.\rabbitmqctl.bat -n cluster2 stop_app<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">My results are always the same. Several thousand messages
that I never received Acks for are still in my internal queue on the publisher
side. There are also pending messages
that were written to disk, which I don't expect my consumer to get
until one of the cluster instances starts back up.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Hence, I start up the main cluster instance followed by
the second. <o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">My first observation is that the messages pending in the
RabbitMQ queue do indeed get delivered to the consumer. Great.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I then resubmit all the messages in my internal queue on
the publisher side. The result is always
the same. I'm always 3 or 4 messages
short!<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I then retested without a cluster and without shutting
anything down. After the test run, and after the
consumer has successfully received all the messages, I keep finding the same thing:
sometimes there are thousands of unacknowledged messages left in my publisher
queue. If I come back 5 minutes
later, they are still there.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">In short, I'm seeing four issues.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">First one: RabbitMQ is losing a few messages if
both servers in the cluster are shut down.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Second one: Acks/Nacks seem to just get lost by
RabbitMQ.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Third one (only happens now and again): the producer
actually receives Acks for delivery tags/messages that don't exist in its
internal queue.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Fourth one: when both servers go down, sometimes, but
not always, the consumer does not throw an exception when it tries to read a
message in the while loop, i.e. at if (!consumer.Queue.Dequeue(3000, out
item)). The item comes back null, but if
I look in the debugger, the consumer's and connection's IsOpen property is
true and the CloseReason is null.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Does anyone have any ideas or have experienced this?<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">How I'm sending the messages is pretty straightforward:<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
messageProperties.MessageId = System.Guid.NewGuid().ToString();<o:p></o:p></p>
<p class="MsoPlainText">
lock (this.activeMessagesLock)<o:p></o:p></p>
<p class="MsoPlainText">
{<o:p></o:p></p>
<p class="MsoPlainText"> var deliveryTag =
0UL;<o:p></o:p></p>
<p class="MsoPlainText"> deliveryTag =
this.model.NextPublishSeqNo;<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
this.model.BasicPublish(<o:p></o:p></p>
<p class="MsoPlainText">
this.exchangeName,<o:p></o:p></p>
<p class="MsoPlainText">
message.Header.Topic,<o:p></o:p></p>
<p class="MsoPlainText">
this.properties.Durable,<o:p></o:p></p>
<p class="MsoPlainText"> immediate,<o:p></o:p></p>
<p class="MsoPlainText">
messageProperties,<o:p></o:p></p>
<p class="MsoPlainText"> body);<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">
this.activeMessages[deliveryTag] = message;<o:p></o:p></p>
<p class="MsoPlainText">
}<o:p></o:p></p>
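<p class="MsoPlainText">The bodies of my Ack/Nack handlers are not shown above. As a minimal sketch of the bookkeeping they need to do, assuming the handler honors the Multiple flag on BasicAckEventArgs (an Ack with Multiple set confirms every sequence number up to and including DeliveryTag): OutstandingConfirms is a hypothetical helper name, not part of the RabbitMQ client. Note that publish sequence numbers are per channel, so after a reconnect a new channel would presumably start numbering again and the map would need to be drained first.<o:p></o:p></p>

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the RabbitMQ client): keeps published
// messages keyed by publish sequence number until the broker confirms them.
public sealed class OutstandingConfirms<T>
{
    private readonly object gate = new object();
    private readonly SortedDictionary<ulong, T> pending =
        new SortedDictionary<ulong, T>();

    // Record a message under the sequence number read from
    // NextPublishSeqNo just before BasicPublish.
    public void Add(ulong sequenceNumber, T message)
    {
        lock (gate) { pending[sequenceNumber] = message; }
    }

    // Call from the BasicAcks / BasicNacks handlers with e.DeliveryTag and
    // e.Multiple. Removes and returns every message the confirm settles.
    public IList<T> Settle(ulong deliveryTag, bool multiple)
    {
        lock (gate)
        {
            // Keys enumerate in ascending order, so TakeWhile picks up
            // everything up to and including deliveryTag.
            List<ulong> keys = multiple
                ? pending.Keys.TakeWhile(k => k <= deliveryTag).ToList()
                : pending.Keys.Where(k => k == deliveryTag).ToList();

            var settled = new List<T>(keys.Count);
            foreach (ulong key in keys)
            {
                settled.Add(pending[key]);
                pending.Remove(key);
            }
            return settled;
        }
    }

    public int Count
    {
        get { lock (gate) { return pending.Count; } }
    }
}
```

<p class="MsoPlainText">With something like this, an Ack handler would call Settle(e.DeliveryTag, e.Multiple) and a Nack handler would do the same and then republish whatever comes back. An empty result would correspond to an Ack for a tag that is no longer in the map.<o:p></o:p></p>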
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">I'm using the message ID as the key in a ConcurrentDictionary
on the consumer side so I can detect duplicates. The message ID is added to this duplicate
detection dictionary before the Ack is sent back to RabbitMQ.<o:p></o:p></p>
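<p class="MsoPlainText">Roughly, the duplicate check looks like the sketch below. DuplicateDetector and TryMarkSeen are illustrative names for this post, not the exact code in my consumer, and the sketch assumes message IDs are never evicted from the dictionary.<o:p></o:p></p>

```csharp
using System.Collections.Concurrent;

// Illustrative sketch: remembers every message ID that has been processed.
public sealed class DuplicateDetector
{
    private readonly ConcurrentDictionary<string, bool> seen =
        new ConcurrentDictionary<string, bool>();

    // Returns true the first time an ID is offered and false on any repeat.
    // TryAdd is atomic, so two threads cannot both claim the same ID.
    public bool TryMarkSeen(string messageId)
    {
        return seen.TryAdd(messageId, true);
    }

    public int Count
    {
        get { return seen.Count; }
    }
}
```

<p class="MsoPlainText">In the consumer loop this would be called with item.BasicProperties.MessageId before BasicAck; a false result means the message is a resend and only needs to be acknowledged, not processed again.<o:p></o:p></p>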
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">Any help would be appreciated.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p>
<p class="MsoPlainText">By the way, if I use TxCommit and TxRollback instead, I
don't have any issues: resending everything in the producer's
internal queue recovers all the messages. Message loss only seems to happen with Publish Confirms.<o:p></o:p></p>
<p class="MsoPlainText"><o:p> </o:p></p></div>