[rabbitmq-discuss] Lost messages in HA tests in a cluster

Simone Busoli simone.busoli at gmail.com
Wed Feb 1 06:13:18 GMT 2012


The name of the example is probably a little misleading: it only lets you
know that messages may have been lost, but does nothing to recover the
messages that actually were lost.
You do indeed have to implement a confirm listener and set up republishing
yourself. Also, by definition, republishing unconfirmed messages may lead to
duplicate messages.
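The bookkeeping described above can be sketched in Java, the client that the Groovy scripts in this thread run on. This is a minimal sketch under assumptions: the class `OutstandingConfirms` and its methods are hypothetical. Only the meaning of the `deliveryTag`/`multiple` arguments follows the Java client's `ConfirmListener.handleAck`/`handleNack` callbacks, and the sequence numbers are assumed to come from `Channel.getNextPublishSeqNo()`. How a listener is attached to a channel differs between client versions, so check the API of the version you use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper, not part of the RabbitMQ client: remembers every message
// published on a confirm-mode channel until the broker acks or nacks it.
final class OutstandingConfirms {
    // Keyed by publish sequence number and kept sorted, so a "multiple"
    // confirm can drop a whole prefix of the map at once.
    private final ConcurrentSkipListMap<Long, String> unconfirmed =
            new ConcurrentSkipListMap<Long, String>();

    // Call right before basicPublish, passing channel.getNextPublishSeqNo().
    void track(long seqNo, String body) {
        unconfirmed.put(seqNo, body);
    }

    // Same contract as ConfirmListener.handleAck: when multiple is true,
    // every sequence number up to and including deliveryTag is confirmed.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            unconfirmed.headMap(deliveryTag, true).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    // Same contract as ConfirmListener.handleNack: remove the nacked messages
    // and hand their bodies back, in publish order, so they can be sent again.
    List<String> handleNack(long deliveryTag, boolean multiple) {
        List<String> toRepublish = new ArrayList<String>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = unconfirmed.headMap(deliveryTag, true);
            toRepublish.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = unconfirmed.remove(deliveryTag);
            if (body != null) {
                toRepublish.add(body);
            }
        }
        return toRepublish;
    }

    // Number of messages still waiting for a confirm.
    int pendingCount() {
        return unconfirmed.size();
    }
}
```

Republishing is what makes duplicates possible: a message can reach the queue even though its confirm never makes it back to the publisher, and sending it again then delivers it twice.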
On Feb 1, 2012 1:27 AM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:

> OK... so maybe I'm confused. Basically, I followed this example:
>
>
> http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java
>
> I use the calls ch.confirmSelect() and ch.waitForConfirmsOrDie(), all
> on a durable queue.
>
> Doesn't this example cover republishing nacks? Do you have an example? Do I
> have to implement a ConfirmListener?
>
> Thanks for helping Simone!
>
> On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli at gmail.com> wrote:
>
>> Hi Pablo, looking at your code I can't see where you are republishing
>> messages that have not been confirmed by the broker. If you want to make
>> sure that published messages are eventually delivered, you have to keep
>> track of them and republish them if no ack arrives from the broker, which
>> usually happens when the broker dies.
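The re-issue step described above can be sketched in Java under stated assumptions: `SeqChannel` is a hypothetical stand-in for a confirm-mode channel (with the Java client, `nextSeqNo()` would wrap `Channel.getNextPublishSeqNo()` and `publish()` would wrap `basicPublish`, rethrowing its `IOException` as an unchecked exception), and `ReconnectRepublisher` is a hypothetical helper. Publish sequence numbers start again at 1 on each new channel, so after a reconnect the old keys are meaningless; the helper re-sends every still-unconfirmed body on the new channel and tracks it under the new number.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Hypothetical stand-in for a confirm-mode channel. With the Java client these
// would wrap channel.getNextPublishSeqNo() and channel.basicPublish(...).
interface SeqChannel {
    long nextSeqNo();

    void publish(String body); // assumed to throw an unchecked exception on failure
}

// Hypothetical helper that keeps unconfirmed messages across reconnects.
final class ReconnectRepublisher {
    // Unconfirmed messages on the current channel, keyed by its sequence numbers.
    private final TreeMap<Long, String> unconfirmed = new TreeMap<Long, String>();
    // Bodies still waiting to be sent on a new channel, oldest first.
    private final Deque<String> backlog = new ArrayDeque<String>();

    // Publish and track one message. Holding the monitor across both steps means
    // a synchronized handleAck cannot run before the entry exists.
    synchronized void publish(SeqChannel channel, String body) {
        long seqNo = channel.nextSeqNo();
        channel.publish(body);
        unconfirmed.put(seqNo, body);
    }

    // Positive confirm: forget the message, or everything up to deliveryTag.
    synchronized void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            unconfirmed.headMap(deliveryTag, true).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    // Call once a new channel is connected and in confirm mode. Move every
    // unconfirmed body to the front of the backlog (keeping publish order),
    // then send the whole backlog on the new channel.
    synchronized void republishAll(SeqChannel newChannel) {
        for (String body : unconfirmed.descendingMap().values()) {
            backlog.addFirst(body);
        }
        unconfirmed.clear();
        while (!backlog.isEmpty()) {
            publish(newChannel, backlog.peekFirst()); // if this throws, the body stays queued
            backlog.pollFirst();
        }
    }

    // Messages not yet confirmed, whether sent or still waiting in the backlog.
    synchronized int pendingCount() {
        return unconfirmed.size() + backlog.size();
    }
}
```

A plain `publish` that throws leaves its message untracked, so the caller still has to retry that one message, as the `RabbitHA.publish` loop in this thread already does. Anything `republishAll` sends may already be in the queue, so the consumer can see duplicates.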
>> On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:
>>
>>> Hi all!
>>>
>>> I'm running some HA tests that stress my cluster, and I'm experiencing lost
>>> messages ONLY when the publisher loses its connection due to a kill -9.
>>>
>>> *Test OK*: kill a node while consuming:
>>> 1 - Set up a clean 3-node cluster
>>> 2 - Run the producer with 10,000 messages, connected to node A
>>> 3 - Wait for the producer to finish
>>> 4 - Run the consumer, connected to node A
>>> 5 - While the consumer is running, kill node A
>>>
>>> Result: the consumer reconnects to another node and consumes the rest of
>>> the messages. All 10,000 messages were consumed.
>>>
>>> *Test FAILED*: kill a node while producing:
>>> 1 - Set up a clean 3-node cluster
>>> 2 - Start the consumer, listening on node A
>>> 3 - Run the producer with 10,000 messages, connected to node A
>>> 4 - While the producer is running, kill node A
>>>
>>> Result: the producer reconnects fine (so does the consumer) and keeps
>>> publishing, but some of the messages *already published* to node A are
>>> lost.
>>>
>>>
>>> These are my settings:
>>>
>>> rabbitmqctl status
>>> Status of node 'rabbit@i-00000007-asm' ...
>>> [{pid,11339},
>>>  {running_applications,
>>>      [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
>>>       {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
>>>       {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
>>>       {rabbit,"RabbitMQ","2.7.1"},
>>>       {os_mon,"CPO  CXC 138 46","2.2.4"},
>>>       {sasl,"SASL  CXC 138 11","2.1.8"},
>>>       {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
>>>       {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
>>>       {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
>>>       {inets,"INETS  CXC 138 49","5.2"},
>>>       {mnesia,"MNESIA  CXC 138 12","4.4.12"},
>>>       {stdlib,"ERTS  CXC 138 10","1.16.4"},
>>>       {kernel,"ERTS  CXC 138 10","2.13.4"}]},
>>>  {os,{unix,linux}},
>>>  {erlang_version,
>>>      "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
>>>  {memory,
>>>      [{total,92565608},
>>>       {processes,4004968},
>>>       {processes_used,3996224},
>>>       {system,88560640},
>>>       {atom,1322033},
>>>       {atom_used,1291462},
>>>       {binary,32496},
>>>       {code,15264387},
>>>       {ets,1174192}]},
>>>  {vm_memory_high_watermark,0.3999999999362281},
>>>  {vm_memory_limit,2508940902}]
>>> ...done.
>>>
>>>
>>>
>>> rabbitmqctl cluster_status
>>> Cluster status of node 'rabbit@i-00000007-asm' ...
>>> [{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
>>>          {ram,['rabbit@i-00000009-asm']}]},
>>>  {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
>>> ...done.
>>>
>>>
>>> *Java amqp-client 2.7.1*
>>>
>>>
>>> - Producer.groovy (Java amqp-client 2.7.1)
>>>
>>> import com.rabbitmq.client.*
>>> try{
>>>
>>> // Get rabbitmq config
>>> def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>>
>>> // Pass init to the constructor so it also runs on the first connection;
>>> // assigning rabbit.init after construction skips the initial channel
>>> def rabbit = new RabbitHA(config, { channel ->
>>>   channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
>>>   channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue and HA policy
>>>   channel.queueBind('myQueue', 'myExchange', '')
>>>   channel.confirmSelect()
>>> })
>>>
>>> 10000.times { idx ->
>>>   rabbit.publish { channel ->
>>>     def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent
>>>     def msg = "Message $idx"
>>>     channel.basicPublish('myExchange', '', properties, msg.getBytes())
>>>     println msg
>>>   }
>>> }
>>>
>>> rabbit.close()
>>>
>>> } catch(e){e.printStackTrace()}
>>>
>>>
>>> - Consumer.groovy
>>>
>>> import com.rabbitmq.client.*
>>>
>>> try{
>>>
>>>   // Get rabbitmq config
>>>   def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>>
>>>   // Connect
>>>   def rabbit = new RabbitHA(config)
>>>   rabbit.onDelivery('myQueue'){ delivery, channel ->
>>>     def msg = new String(delivery.body)
>>>     println msg
>>>
>>>     // Manual ack
>>>     channel.basicAck(delivery.envelope.deliveryTag, false)
>>>   }
>>>
>>> } catch(e){e.printStackTrace()}
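Because republishing unconfirmed messages, as suggested in the replies above, can deliver the same message twice, a consumer like this one may want to skip repeats. A minimal Java sketch, assuming every message carries a unique ID (in this test the body "Message $idx" is already unique; a real producer might set the AMQP `messageId` property instead). `DuplicateFilter` is hypothetical, and it only remembers the most recent `capacity` IDs, so a duplicate arriving after that many newer messages would get through.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical consumer-side filter for redelivered or republished messages.
final class DuplicateFilter {
    private final Map<String, Boolean> recentIds;

    DuplicateFilter(final int capacity) {
        // Insertion-ordered map that evicts the oldest ID once it holds more
        // than `capacity` entries, so memory use stays bounded.
        this.recentIds = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen and false for a repeat.
    synchronized boolean firstTime(String messageId) {
        return recentIds.put(messageId, Boolean.TRUE) == null;
    }
}
```

In the Groovy consumer this would mean checking `firstTime(msg)` before processing and still acking the delivery either way, so a skipped duplicate does not stay unacknowledged.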
>>>
>>>
>>> - RabbitHA.groovy
>>>
>>> import com.rabbitmq.client.*
>>>
>>> /**
>>>  *
>>>  * RabbitMQ highly available proxy.
>>>  * Basic implementation of a subscriber/publisher with reconnect logic.
>>>  *
>>>  */
>>> class RabbitHA {
>>>     ConnectionFactory connectionFactory
>>>     Address[] addresses
>>>     Closure init
>>>
>>>     Connection connection
>>>     Channel channel
>>>     QueueingConsumer consumer
>>>
>>>     public RabbitHA(Map config) {
>>>         this(config, null)
>>>     }
>>>
>>>     public RabbitHA(Map config, Closure init){
>>>         this.connectionFactory = new ConnectionFactory([
>>>                 username: config.username,
>>>                 password: config.password,
>>>                 virtualHost: config.virtualHost])
>>>         this.addresses = Address.parseAddresses(config.addresses)
>>>         this.init = init
>>>         connectChannel()
>>>     }
>>>
>>>     void onDelivery(String queueName, Closure closure) {
>>>         basicConsume(queueName)
>>>         int i = 0
>>>
>>>         while(true) {
>>>             try {
>>>                 QueueingConsumer.Delivery delivery = consumer.nextDelivery()
>>>                 closure(delivery, channel)
>>>                 i = 0
>>>             } catch(e) {
>>>                 // Only handle connection-related exceptions; rethrow anything else
>>>                 if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
>>>
>>>                 i++
>>>                 e.printStackTrace()
>>>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>>>                 connectChannel()
>>>                 basicConsume(queueName)
>>>             }
>>>         }
>>>     }
>>>
>>>     void publish(Closure closure) {
>>>         int i = 0
>>>         boolean retry = true
>>>         while(retry) {
>>>             try {
>>>                 closure(channel)
>>>                 i = 0
>>>                 retry = false
>>>             } catch(e) {
>>>                 // Only handle connection-related exceptions; rethrow anything else
>>>                 if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
>>>
>>>                 i++
>>>                 retry = true
>>>                 e.printStackTrace()
>>>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>>>                 connectChannel()
>>>             }
>>>         }
>>>     }
>>>
>>>
>>>     void connectChannel() {
>>>         connection = connectionFactory.newConnection(addresses)
>>>         channel = connection.createChannel()
>>>
>>>         println "Succesfully connected to $connection.address"
>>>
>>>         if(init) {
>>>             init(channel)
>>>         }
>>>     }
>>>
>>>     void basicConsume(queueName) {
>>>         consumer = new QueueingConsumer(channel)
>>>         channel.basicConsume(queueName, false, consumer)
>>>     }
>>>
>>>     void close() {
>>>         channel.waitForConfirmsOrDie()
>>>         channel.close()
>>>         connection.close()
>>>     }
>>> }
>>>
>>>
>>>
>>> Output:
>>>
>>> cat consumer-output.txt |grep Message | wc
>>>    9957   19914  128330
>>>
>>> cat producer-output.txt | grep Message | wc
>>>   10000   20000  128890
>>>
>>> *Note that the consumer lost 43 messages (10000 - 9957).*
>>>
>>> Output from the producer:
>>> ...
>>> Message 1466
>>> Message 1467
>>> Message 1468
>>> Message 1469
>>> Message 1470
>>> ShutdownSignalException recieved! Reconnection attempt #1
>>> Succesfully connected to i-0000001a-zsm/172.16.158.46
>>> Message 1471
>>> Message 1472
>>> Message 1473
>>> Message 1474
>>> Message 1475
>>> Message 1476
>>> ...
>>>
>>> *Note that messages [1471..1476] were consumed OK, but [1466..1470] are
>>> missing from the consumer output.*
>>>
>>> The complete outputs are in
>>> https://github.com/pablomolnar/rabbitmq_samples/tree/master/out
>>> There you can see the reconnection logs of both sides.
>>> I have a strong feeling that publisher confirms are not configured correctly.
>>>
>>> Could anyone please shed some light on the issue?
>>>
>>> Cheers,
>>> Pablo Molnar
>>>
>>> PS: I'd also like to take the opportunity to share a number of Groovy
>>> examples I've been working on over the last few days:
>>> https://github.com/pablomolnar/rabbitmq_samples
>>>
>>>
>>>
>

