[rabbitmq-discuss] Lost messages in HA tests in a cluster

Simone Busoli simone.busoli at gmail.com
Tue Jan 31 23:43:53 GMT 2012


Hi Pablo, looking at your code I can't see where you republish messages
that the broker has not confirmed. Calling confirmSelect() only asks the
broker to send acks; it does not resend anything for you. If you want to
make sure that every publish is eventually delivered, you have to keep track
of outstanding messages and re-issue any for which an ack never arrives,
which is usually what happens when the broker dies.
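
A minimal sketch of that bookkeeping, in plain Java with no broker involved.
The class UnconfirmedTracker and its method names are hypothetical, not part
of the RabbitMQ client. The assumed wiring is: call record() with the value of
channel.getNextPublishSeqNo() just before each basicPublish, call handleAck()
from a ConfirmListener registered on the channel, and call
drainForRepublish() after reconnecting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers every publish until the broker confirms it. */
class UnconfirmedTracker {
    // Publish sequence number -> message body, kept in publish order.
    private final ConcurrentNavigableMap<Long, String> outstanding =
            new ConcurrentSkipListMap<>();

    /** Remember a message under the sequence number it is about to be published with. */
    void record(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /**
     * The broker confirmed seqNo. With multiple=true it confirmed every
     * message up to and including seqNo, so all of those can be forgotten.
     */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /**
     * After the connection is lost: return everything still unconfirmed, in
     * publish order, and reset the tracker (sequence numbers start again
     * on the new channel, so the caller records the messages afresh).
     */
    List<String> drainForRepublish() {
        List<String> toResend = new ArrayList<>(outstanding.values());
        outstanding.clear();
        return toResend;
    }

    /** Number of messages published but not yet confirmed. */
    int pending() {
        return outstanding.size();
    }
}
```

Republishing this way can deliver a message twice when the broker stored it
but died before the ack reached the publisher, so the consumer should be
prepared to tolerate duplicates.
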
On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:

> Hi all!
>
> I'm doing some HA tests stressing my cluster and I'm experiencing lost
> messages ONLY when the publisher loses its connection due to a kill -9.
>
> *Test OK*: Kill node while consuming:
> 1 - Set up a clean 3-node cluster
> 2 - Run the producer, connected to node A, with 10.000 messages
> 3 - Wait for the producer to finish
> 4 - Run the consumer, connected to node A
> 5 - While the consumer is running, kill node A
>
> Result: The consumer reconnects to another node and consumes the rest of
> the messages. All 10.000 messages were consumed.
>
> *Test FAILED*: Kill node while producing:
> 1 - Set up a clean 3-node cluster
> 2 - Run the consumer, connected to node A, so it starts listening
> 3 - Run the producer, connected to node A, with 10.000 messages
> 4 - While the producer is running, kill node A
>
> Result: The producer reconnects fine (the consumer too) and keeps
> publishing, but some of the messages *already published* to node A are
> lost.
>
>
> These are my settings:
>
> rabbitmqctl status
> Status of node 'rabbit@i-00000007-asm' ...
> [{pid,11339},
>  {running_applications,
>      [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
>       {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
>       {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
>       {rabbit,"RabbitMQ","2.7.1"},
>       {os_mon,"CPO  CXC 138 46","2.2.4"},
>       {sasl,"SASL  CXC 138 11","2.1.8"},
>       {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
>       {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
>       {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
>       {inets,"INETS  CXC 138 49","5.2"},
>       {mnesia,"MNESIA  CXC 138 12","4.4.12"},
>       {stdlib,"ERTS  CXC 138 10","1.16.4"},
>       {kernel,"ERTS  CXC 138 10","2.13.4"}]},
>  {os,{unix,linux}},
>  {erlang_version,
>      "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2]
> [async-threads:0] [hipe] [kernel-poll:false]\n"},
>  {memory,
>      [{total,92565608},
>       {processes,4004968},
>       {processes_used,3996224},
>       {system,88560640},
>       {atom,1322033},
>       {atom_used,1291462},
>       {binary,32496},
>       {code,15264387},
>       {ets,1174192}]},
>  {vm_memory_high_watermark,0.3999999999362281},
>  {vm_memory_limit,2508940902}]
> ...done.
>
>
>
> rabbitmqctl cluster_status
> Cluster status of node 'rabbit@i-00000007-asm' ...
> [{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
>          {ram,['rabbit@i-00000009-asm']}]},
>  {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
> ...done.
>
>
> *Java amqp-client 2.7.1*
>
>
> - Producer.groovy (java amqp-client 2.7.1)
>
> import com.rabbitmq.client.*
> try{
>
> // Get rabbitmq config
> def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>
> def rabbit = new RabbitHA(config)
> rabbit.init = { channel ->
>   channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
>   channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
>   channel.queueBind('myQueue', 'myExchange', '')
>   channel.confirmSelect()
> }
>
> 10000.times { idx ->
>   rabbit.publish { channel ->
>     // Delivery mode 2: persistent
>     def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build()
>     def msg = "Message $idx"
>     channel.basicPublish('myExchange', '', properties, msg.getBytes())
>     println msg
>   }
> }
>
> rabbit.close()
>
> } catch(e){e.printStackTrace()}
>
>
> - Consumer.groovy
>
> import com.rabbitmq.client.*
>
> try{
>
>   // Get rabbitmq config
>   def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>
>   // Connect
>   def rabbit = new RabbitHA(config)
>   rabbit.onDelivery('myQueue') { delivery, channel ->
>     def msg = new String(delivery.body)
>     println msg
>
>     // Manual ack
>     channel.basicAck(delivery.envelope.deliveryTag, false)
>   }
>
> } catch(e){e.printStackTrace()}
>
>
> - RabbitHA.groovy
>
> import com.rabbitmq.client.*
>
> /**
>  *
>  * RabbitMQ highly available proxy.
>  * Basic implementation of a subscriber/publisher with reconnect logic.
>  *
>  */
> class RabbitHA {
>     ConnectionFactory connectionFactory
>     Address[] addresses
>     Closure init
>
>     Connection connection
>     Channel channel
>     QueueingConsumer consumer
>
>     public RabbitHA(Map config) {
>         this(config, null)
>     }
>
>     public RabbitHA(Map config, Closure init){
>         this.connectionFactory = new ConnectionFactory([username:
> config.username, password: config.password, virtualHost:
> config.virtualHost])
>         this.addresses = Address.parseAddresses(config.addresses)
>         this.init = init
>         connectChannel()
>     }
>
>     void onDelivery(String queueName, Closure closure) {
>         basicConsume(queueName)
>         int i = 0
>
>         while(true) {
>             try {
>                 QueueingConsumer.Delivery delivery =
> consumer.nextDelivery()
>                 closure(delivery, channel)
>                 i = 0
>             } catch(e) {
>                 // Only handle connection failures; rethrow anything else
>                 if(!(e in ShutdownSignalException || e in IOException ||
>                      e in AlreadyClosedException)) throw e
>
>                 i++
>                 e.printStackTrace()
>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>                 connectChannel()
>                 basicConsume(queueName)
>             }
>         }
>     }
>
>     void publish(Closure closure) {
>         int i = 0
>         boolean retry = true
>         while(retry) {
>             try {
>                 closure(channel)
>                 i = 0
>                 retry = false
>             } catch(e) {
>                 // Only handle connection failures; rethrow anything else
>                 if(!(e in ShutdownSignalException || e in IOException ||
>                      e in AlreadyClosedException)) throw e
>
>                 i++
>                 retry = true
>                 e.printStackTrace()
>                 println "ShutdownSignalException recieved! Reconnection attempt #$i"
>                 connectChannel()
>             }
>         }
>     }
>
>
>     void connectChannel() {
>         connection = connectionFactory.newConnection(addresses)
>         channel = connection.createChannel()
>
>         println "Succesfully connected to $connection.address"
>
>         if(init) {
>             init(channel)
>         }
>     }
>
>     void basicConsume(queueName) {
>         consumer = new QueueingConsumer(channel)
>         channel.basicConsume(queueName, false, consumer)
>     }
>
>     void close() {
>         channel.waitForConfirmsOrDie()
>         channel.close()
>         connection.close()
>     }
> }
>
>
>
> Output:
>
> cat consumer-output.txt |grep Message | wc
>    9957   19914  128330
>
> cat producer-output.txt | grep Message | wc
>   10000   20000  128890
>
> *Note that the consumer lost 43 messages*
>
> Output from producer:
> ...
> Message 1466
> Message 1467
> Message 1468
> Message 1469
> Message 1470
> ShutdownSignalException recieved! Reconnection attempt #1
> Succesfully connected to i-0000001a-zsm/172.16.158.46
> Message 1471
> Message 1472
> Message 1473
> Message 1474
> Message 1475
> Message 1476
> ...
>
> *Note: messages [1471..1476] were consumed OK, but [1466..1470] are missing
> from the consumer output.*
>
> The complete outputs are at
> https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There
> you can see the reconnection logs of both sides.
> I have a strong feeling that publisher confirms are not configured correctly.
>
> Could anyone please shed some light on the issue?
>
> Cheers,
> Pablo Molnar
>
> PS: I'd also like to take the opportunity to share a lot of Groovy examples
> I've been working on over the last few days:
> https://github.com/pablomolnar/rabbitmq_samples