[rabbitmq-discuss] .NET API - Using PublishConfirms to get Reliability
martywaz
marty.wasznicky at neudesic.com
Wed Feb 26 05:07:51 GMT 2014
Hello,
I'm using publisher confirms and am trying to achieve zero message loss when
the RabbitMQ instances go down.
I have 2 instances of RabbitMQ, clustered, with all queues mirrored, on my
local desktop.
I then use publisher confirms to send a batch of about 50,000 messages.
I've implemented duplicate message detection in my consumer, and
implemented storage of all sent messages on the publisher. I wired up
the following events, and I remove messages from storage as acks come in, i.e.:
this.model.BasicAcks += this.MessageAcknowledged;
this.model.BasicNacks += this.MessageNotAcknowledged;
this.model.BasicReturn += this.MessageReturned;
this.model.FlowControl += this.FlowControlChanged;
this.model.CallbackException += ReportCallbackException;
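One detail worth spelling out about the BasicAcks handler: when an ack arrives with Multiple set, the broker is confirming every outstanding delivery tag up to and including DeliveryTag in a single event, so all of those entries have to be removed, not just the one tag. A minimal sketch of that bookkeeping (names are illustrative, not my exact handler):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class AckBookkeeping
{
    // Remove confirmed messages from the outstanding-message map.
    // When multiple == true, the broker is confirming every delivery
    // tag up to and including deliveryTag in one event.
    public static void HandleAck(
        IDictionary<ulong, object> activeMessages,
        ulong deliveryTag,
        bool multiple)
    {
        if (!multiple)
        {
            activeMessages.Remove(deliveryTag);
            return;
        }
        // ToList() snapshots the keys so we can mutate while iterating.
        foreach (var tag in activeMessages.Keys.Where(t => t <= deliveryTag).ToList())
        {
            activeMessages.Remove(tag);
        }
    }
}
```

If only the single tag were removed on a Multiple ack, entries would pile up in the map even though the broker had confirmed them.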
Fairly straightforward, I thought. On the consumer side, I set the QoS
for consuming messages, then consume in a tight loop like so:
consumerModel = this.connection.CreateModel();
consumerModel.BasicQos(0, 10000, false);
var consumer = new QueueingBasicConsumer(consumerModel);
var consumerTag = consumerModel.BasicConsume(this.queueName, false, consumer);

while (true)
{
    BasicDeliverEventArgs item;
    if (!consumer.Queue.Dequeue(3000, out item))
    {
        continue; // dequeue timed out with no message
    }

    // do stuff

    consumerModel.BasicAck(item.DeliveryTag, false);
}
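Part of what makes the loop awkward is that a false return from Dequeue is ambiguous: it can mean a plain timeout, or that the connection died underneath the consumer. The same ambiguity can be sketched with a plain BlockingCollection standing in for the consumer's shared queue (all names here are illustrative, and this does not use the RabbitMQ client at all): TryTake returns false in both cases, and a separate flag has to be consulted to tell them apart.

```csharp
using System;
using System.Collections.Concurrent;

static class DequeueSketch
{
    // Distinguish "nothing arrived within the timeout" from
    // "the queue was closed underneath us" -- TryTake alone
    // returns false in both cases, much like a null item from
    // QueueingBasicConsumer.Queue.Dequeue.
    public static string DequeueOnce(BlockingCollection<string> queue, int timeoutMs)
    {
        string item;
        if (queue.TryTake(out item, timeoutMs))
            return item;
        return queue.IsAddingCompleted ? "closed" : "timeout";
    }
}
```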
My first finding was that performance was very slow: with 250-byte
messages, I'm getting about 500 to 600 msg/sec delivered.
Regardless, my testing was around recoverability. What I'm doing
is sending groups of messages, about 50,000 at a time. Then, while they are
sending, I take down both instances of my RabbitMQ cluster to simulate a
failure, using PowerShell like so:
& .\rabbitmqctl.bat -n cluster1 stop_app
& .\rabbitmqctl.bat -n cluster2 stop_app
My results are always the same. There appear to be several thousand
messages still in my internal queue on the publisher side that I never
received acks for. There are also pending messages to be delivered that
are written to disk, which I don't expect my consumer to get until one of the
cluster instances starts back up.
Hence, I start up the main cluster instance, followed by the second.
First observation: the messages pending in the RabbitMQ queue do
indeed get delivered to the consumer. Great.
I then resubmit all the messages in my internal queue on the publisher
side. The result is always the same: I'm always 3 or 4 messages short!
I then retested without a cluster and without shutting anything down. After my
test run, and after the consumer successfully gets all the messages, I always
find the same thing: sometimes I have thousands of unacknowledged messages
left in my publisher queue. I come back 5 minutes later... still there.
In short, the serious issues I'm seeing:
First: RabbitMQ is losing a few messages if both servers in the
cluster are shut down.
Second: acks/nacks seem to just get lost by RabbitMQ.
Third (only happens now and again): the producer actually receives acks
for delivery tags/messages that don't exist in its internal queue.
Fourth: when both servers go down, sometimes, but not always, the
consumer will not throw an exception when it tries to read a message in
the while loop, i.e. in if (!consumer.Queue.Dequeue(3000, out item)). The
item comes back null, but if I look in the debugger, the consumer's and
connection's IsOpen property is true, and the CloseReason is null.
Does anyone have any ideas or have experienced this?
How I'm sending the message is pretty straightforward:
messageProperties.MessageId = System.Guid.NewGuid().ToString();

lock (this.activeMessagesLock)
{
    var deliveryTag = this.model.NextPublishSeqNo;
    this.model.BasicPublish(
        this.exchangeName,
        message.Header.Topic,
        this.properties.Durable,
        immediate,
        messageProperties,
        body);
    this.activeMessages[deliveryTag] = message;
}
I'm using the message ID as the key in a ConcurrentDictionary on the
consumer side so I can detect duplicates. The message is added to this
duplicate-detection dictionary before the ack is sent back to RabbitMQ.
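The duplicate detection itself is just a TryAdd keyed on MessageId; a minimal sketch (class and method names are illustrative):

```csharp
using System.Collections.Concurrent;

class DuplicateDetector
{
    private readonly ConcurrentDictionary<string, byte> seen =
        new ConcurrentDictionary<string, byte>();

    // Returns true the first time a message ID is observed,
    // false for any redelivery of the same ID. TryAdd is atomic,
    // so concurrent redeliveries can't both pass the check.
    public bool IsFirstDelivery(string messageId)
    {
        return this.seen.TryAdd(messageId, 0);
    }
}
```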
Any help would be appreciated.
By the way, if I use TxCommit and TxRollback, I don't have any issues:
resending everything in the producer's internal queue works.
Message loss only seems to happen with publisher confirms.