[rabbitmq-discuss] Lost messages in HA tests in a cluster

Pablo Molnar pablomolnar at gmail.com
Wed Feb 1 00:27:04 GMT 2012


Ok... so maybe I'm confused. Basically I followed this example:

http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

I'm using the statements ch.confirmSelect() and ch.waitForConfirmsOrDie(),
all against a durable queue.

Doesn't this example cover republishing nacked messages? Do you have an
example? Do I have to implement a ConfirmListener?
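
To make the question concrete, here is a minimal sketch of the bookkeeping I understand is needed: remember every publish by its sequence number, forget it on ack, and hand it back for republishing on nack or when the connection dies. This is only an illustration under assumptions, not code from the linked example. The class OutstandingConfirms and its method names are hypothetical, and it deliberately avoids the client library so it runs standalone. In a real publisher I assume the sequence number would come from channel.getNextPublishSeqNo() just before basicPublish, and handleAck/handleNack would be called from a ConfirmListener registered on the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks published-but-unconfirmed messages by sequence number. */
class OutstandingConfirms {
    // Sorted by sequence number so "everything up to N" is a cheap head view.
    private final ConcurrentNavigableMap<Long, byte[]> pending = new ConcurrentSkipListMap<>();

    /** Call just before publishing (seqNo assumed to come from the channel). */
    void track(long seqNo, byte[] body) {
        pending.put(seqNo, body);
    }

    /** Broker confirmed seqNo, and everything up to it when multiple is true. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /** Broker nacked seqNo; returns the bodies that should be published again. */
    List<byte[]> handleNack(long seqNo, boolean multiple) {
        List<byte[]> toRepublish = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, byte[]> nacked = pending.headMap(seqNo, true);
            toRepublish.addAll(nacked.values());
            nacked.clear();
        } else {
            byte[] body = pending.remove(seqNo);
            if (body != null) {
                toRepublish.add(body);
            }
        }
        return toRepublish;
    }

    /** Connection lost: everything still pending is unconfirmed and must be resent. */
    List<byte[]> drainAll() {
        List<byte[]> toRepublish = new ArrayList<>(pending.values());
        pending.clear();
        return toRepublish;
    }

    /** Number of messages still waiting for a confirm. */
    int size() {
        return pending.size();
    }
}
```

With something like this, the reconnect path would call drainAll() and publish those bodies again on the new channel, tracking them under the new channel's sequence numbers, since as far as I understand the numbering starts over after confirmSelect() on a fresh channel. Republishing this way can produce duplicates, so the consumer would need to tolerate them.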

Thanks for helping, Simone!

On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli at gmail.com> wrote:

> Hi Pablo, looking at your code I can't see where you are republishing
> messages which have not been confirmed by the broker. If you want to make
> sure that publishes will eventually be delivered you have to keep track of
> them and re-issue them in case an ack doesn't arrive from the broker, which
> usually happens when the broker dies.
> On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:
>
>> Hi all!
>>
>> I'm doing some HA tests stressing my cluster and I'm experiencing lost
>> messages ONLY when the publisher loses its connection due to a kill -9.
>>
>> *Test OK*: Kill node while consuming:
>> 1 - Set up a clean 3-node cluster
>> 2 - Run the producer with 10.000 messages, connected to node A
>> 3 - Wait for the producer to finish
>> 4 - Run the consumer, connected to node A
>> 5 - While the consumer is running, kill node A
>>
>> Result: The consumer reconnects to another node and consumes the rest of
>> the messages. All 10.000 messages were consumed ok.
>>
>> *Test FAILED*: Kill node while producing:
>> 1 - Set up a clean 3-node cluster
>> 2 - Run the consumer so it starts listening, connected to node A
>> 3 - Run the producer with 10.000 messages, connected to node A
>> 4 - While the producer is running, kill node A
>>
>> Result: The producer reconnects fine (the consumer too) and keeps
>> publishing, but some of the messages *already published* to node A are
>> lost.
>>
>>
>> These are my settings:
>>
>> rabbitmqctl status
>> Status of node 'rabbit at i-00000007-asm' ...
>> [{pid,11339},
>>  {running_applications,
>>      [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
>>       {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
>>       {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
>>       {rabbit,"RabbitMQ","2.7.1"},
>>       {os_mon,"CPO  CXC 138 46","2.2.4"},
>>       {sasl,"SASL  CXC 138 11","2.1.8"},
>>       {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
>>       {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
>>       {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
>>       {inets,"INETS  CXC 138 49","5.2"},
>>       {mnesia,"MNESIA  CXC 138 12","4.4.12"},
>>       {stdlib,"ERTS  CXC 138 10","1.16.4"},
>>       {kernel,"ERTS  CXC 138 10","2.13.4"}]},
>>  {os,{unix,linux}},
>>  {erlang_version,
>>      "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2]
>> [async-threads:0] [hipe] [kernel-poll:false]\n"},
>>  {memory,
>>      [{total,92565608},
>>       {processes,4004968},
>>       {processes_used,3996224},
>>       {system,88560640},
>>       {atom,1322033},
>>       {atom_used,1291462},
>>       {binary,32496},
>>       {code,15264387},
>>       {ets,1174192}]},
>>  {vm_memory_high_watermark,0.3999999999362281},
>>  {vm_memory_limit,2508940902}]
>> ...done.
>>
>>
>>
>> rabbitmqctl cluster_status
>> Cluster status of node 'rabbit at i-00000007-asm' ...
>> [{nodes,[{disc,['rabbit at i-0000001a-zsm','rabbit at i-00000007-asm']},
>>          {ram,['rabbit at i-00000009-asm']}]},
>>  {running_nodes,['rabbit at i-0000001a-zsm','rabbit at i-00000007-asm']}]
>> ...done.
>>
>>
>> *Java amqp-client 2.7.1*
>>
>>
>> - Producer.groovy (java amqp-client 2.7.1)
>>
>> import com.rabbitmq.client.*
>> try{
>>
>> // Get rabbitmq config
>> def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>
>> def rabbit = new RabbitHA(config)
>> rabbit.init = { channel ->
>>   channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
>>   channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
>>   channel.queueBind('myQueue', 'myExchange', '')
>>   channel.confirmSelect()
>> }
>>
>> 10000.times { idx ->
>>   rabbit.publish { channel ->
>>     // Delivery mode 2: persistent
>>     def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build()
>>     def msg = "Message $idx"
>>     channel.basicPublish('myExchange', '', properties, msg.getBytes())
>>     println msg
>>   }
>> }
>>
>> rabbit.close()
>>
>> } catch(e){e.printStackTrace()}
>>
>>
>> - Consumer.groovy
>>
>> import com.rabbitmq.client.*
>>
>> try{
>>
>>   // Get rabbitmq config
>>   def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>
>>   // Connect
>>   def rabbit = new RabbitHA(config)
>>   rabbit.onDelivery('myQueue') { delivery, channel ->
>>     def msg = new String(delivery.body)
>>     println msg
>>
>>     // Manual ack
>>     channel.basicAck(delivery.envelope.deliveryTag, false)
>>   }
>>
>> } catch(e){e.printStackTrace()}
>>
>>
>> - RabbitHA.groovy
>>
>> import com.rabbitmq.client.*
>>
>> /**
>>  * RabbitMQ highly available proxy.
>>  * Basic implementation of a subscriber/publisher with reconnect logic.
>>  */
>> class RabbitHA {
>>     ConnectionFactory connectionFactory
>>     Address[] addresses
>>     Closure init
>>
>>     Connection connection
>>     Channel channel
>>     QueueingConsumer consumer
>>
>>     public RabbitHA(Map config) {
>>         this(config, null)
>>     }
>>
>>     public RabbitHA(Map config, Closure init){
>>         this.connectionFactory = new ConnectionFactory([username: config.username,
>>             password: config.password, virtualHost: config.virtualHost])
>>         this.addresses = Address.parseAddresses(config.addresses)
>>         this.init = init
>>         connectChannel()
>>     }
>>
>>     void onDelivery(String queueName, Closure closure) {
>>         basicConsume(queueName)
>>         int i = 0
>>
>>         while(true) {
>>             try {
>>                 QueueingConsumer.Delivery delivery = consumer.nextDelivery()
>>                 closure(delivery, channel)
>>                 i = 0
>>             } catch(e) {
>>                 // Only handle connection-related exceptions; rethrow anything else
>>                 if(!(e in ShutdownSignalException || e in IOException ||
>>                      e in AlreadyClosedException)) throw e
>>
>>                 i++
>>                 e.printStackTrace()
>>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>>                 connectChannel()
>>                 basicConsume(queueName)
>>             }
>>         }
>>     }
>>
>>     void publish(Closure closure) {
>>         int i = 0
>>         boolean retry = true
>>         while(retry) {
>>             try {
>>                 closure(channel)
>>                 i = 0
>>                 retry = false
>>             } catch(e) {
>>                 // Only handle connection-related exceptions; rethrow anything else
>>                 if(!(e in ShutdownSignalException || e in IOException ||
>>                      e in AlreadyClosedException)) throw e
>>
>>                 i++
>>                 retry = true
>>                 e.printStackTrace()
>>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>>                 connectChannel()
>>             }
>>         }
>>     }
>>
>>
>>     void connectChannel() {
>>         connection = connectionFactory.newConnection(addresses)
>>         channel = connection.createChannel()
>>
>>         println "Succesfully connected to $connection.address"
>>
>>         if(init) {
>>             init(channel)
>>         }
>>     }
>>
>>     void basicConsume(queueName) {
>>         consumer = new QueueingConsumer(channel)
>>         channel.basicConsume(queueName, false, consumer)
>>     }
>>
>>     void close() {
>>         channel.waitForConfirmsOrDie()
>>         channel.close()
>>         connection.close()
>>     }
>> }
>>
>>
>>
>> Output:
>>
>> cat consumer-output.txt |grep Message | wc
>>    9957   19914  128330
>>
>> cat producer-output.txt | grep Message | wc
>>   10000   20000  128890
>>
>> *Note that the consumer lost 43 messages*
>>
>> Output from producer :
>> ...
>> Message 1466
>> Message 1467
>> Message 1468
>> Message 1469
>> Message 1470
>> ShutdownSignalException recieved! Reconnection attempt #1
>> Succesfully connected to i-0000001a-zsm/172.16.158.46
>> Message 1471
>> Message 1472
>> Message 1473
>> Message 1474
>> Message 1475
>> Message 1476
>> ...
>>
>> *Note that messages [1471..1476] were consumed ok, but [1466..1470] are
>> missing from the consumer output.*
>>
>> The complete outputs are in
>> https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There
>> you can see the reconnection logs of both sides.
>> I have a strong feeling that publisher confirms are not configured
>> correctly.
>>
>> Could anyone please shed some light on the issue?
>>
>> Cheers,
>> Pablo Molnar
>>
>> PS: I'd also like to take the opportunity to share a lot of Groovy
>> examples I've been working on over the last few days:
>> https://github.com/pablomolnar/rabbitmq_samples
>>