[rabbitmq-discuss] Lost messages in HA tests in a cluster
Simone Busoli
simone.busoli at gmail.com
Wed Feb 1 06:13:18 GMT 2012
The name of the example is probably a little misleading: it only lets you
know whether messages may have been lost, but does nothing to recover
messages that actually were lost.
You do indeed have to implement a confirm listener and set up republishing
yourself. Also, republishing unconfirmed messages can, by definition, lead
to duplicate messages.
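Because a republished message may already have reached the queue before the node died, the consumer has to tolerate duplicates. A minimal sketch, assuming the publisher stamps every message with a unique id (for instance through the messageId property of AMQP.BasicProperties) and reuses that same id when it republishes. The DuplicateFilter class and its bounded window are hypothetical, not part of the RabbitMQ client:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical consumer-side duplicate filter (not part of the RabbitMQ client).
 * Remembers the most recent message ids in insertion order and forgets the
 * oldest one once the window is full, so memory stays bounded.
 */
public class DuplicateFilter {
    private final Map<String, Boolean> seen;

    public DuplicateFilter(final int window) {
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > window;
            }
        };
    }

    /** Returns true the first time an id is seen (process, then ack), false for a duplicate (just ack). */
    public synchronized boolean firstDelivery(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        DuplicateFilter filter = new DuplicateFilter(10000);
        System.out.println(filter.firstDelivery("msg-1467")); // true
        System.out.println(filter.firstDelivery("msg-1467")); // false: the republished copy
    }
}
```

The window size trades memory for protection: a duplicate that arrives after its id has been evicted slips through, so it should comfortably exceed the number of messages that can be in flight around a failover.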
On Feb 1, 2012 1:27 AM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:
> OK... so maybe I'm confused. Basically I followed this example:
>
>
> http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java
>
> It uses the statements ch.confirmSelect() and ch.waitForConfirmsOrDie(),
> all on a durable queue.
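For reference, a call like ch.waitForConfirmsOrDie() only reports that something published since the last wait was not confirmed; it does not say which message. One way to turn it into recovery is to wait after every batch and republish the whole batch on failure. A minimal sketch of that pattern: ConfirmingChannel is a hypothetical stand-in for the two Channel operations used (so the example runs without a broker), a nack or dead connection is modeled as an IOException, and the connect supplier is assumed to return a fresh channel that is already in confirm mode:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class BatchRepublisher {

    /** Hypothetical stand-in for the two Channel operations used below. */
    public interface ConfirmingChannel {
        void publish(String body) throws IOException;

        /** Blocks until the batch is confirmed; a nack or a dead connection is modeled as IOException. */
        void waitForConfirmsOrDie() throws IOException;
    }

    /**
     * Publishes the batch and waits for its confirms. On failure it reconnects and
     * republishes the WHOLE batch, because it cannot tell which messages made it.
     * Returns the attempt number that succeeded.
     */
    public static int publishBatch(List<String> batch, Supplier<ConfirmingChannel> connect,
                                   int maxAttempts) throws IOException {
        ConfirmingChannel channel = connect.get();
        for (int attempt = 1; ; attempt++) {
            try {
                for (String body : batch) {
                    channel.publish(body);
                }
                channel.waitForConfirmsOrDie();
                return attempt;
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                channel = connect.get(); // assumed: fresh channel, already in confirm mode
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> confirmed = new ArrayList<>();
        int[] connections = {0};
        Supplier<ConfirmingChannel> connect = () -> {
            final boolean dies = connections[0]++ == 0; // the first node gets kill -9'd
            return new ConfirmingChannel() {
                private final List<String> pending = new ArrayList<>();

                public void publish(String body) {
                    pending.add(body);
                }

                public void waitForConfirmsOrDie() throws IOException {
                    if (dies) {
                        throw new IOException("connection lost before confirms arrived");
                    }
                    confirmed.addAll(pending);
                    pending.clear();
                }
            };
        };
        int attempts = publishBatch(Arrays.asList("Message 1466", "Message 1467"), connect, 3);
        System.out.println(attempts + " " + confirmed); // 2 [Message 1466, Message 1467]
    }
}
```

Smaller batches mean fewer messages resent (and fewer potential duplicates) per failure, at the cost of more round trips spent waiting for confirms.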
>
> Doesn't this example cover republishing nacked messages? Do you have an
> example? Do I have to implement a ConfirmListener?
>
> Thanks for helping Simone!
>
> On Tue, Jan 31, 2012 at 8:43 PM, Simone Busoli <simone.busoli at gmail.com> wrote:
>
>> Hi Pablo, looking at your code I can't see where you republish messages
>> that have not been confirmed by the broker. If you want to make sure that
>> publishes are eventually delivered, you have to keep track of them and
>> re-issue them if an ack doesn't arrive from the broker, which usually
>> happens when the broker dies.
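Keeping track of unconfirmed publishes can be done with an ordered map keyed by publish sequence number. A minimal sketch under these assumptions: the caller records each message under the channel's next sequence number (the Java client exposes it as Channel.getNextPublishSeqNo()) just before publishing, and forwards the deliveryTag/multiple arguments of a ConfirmListener's handleAck and handleNack to this class. OutstandingConfirms itself is hypothetical and deliberately does not implement the client interface, so that it runs stand-alone:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical tracker for publisher confirms (not part of the RabbitMQ client).
 * Each message is recorded under its publish sequence number before it is sent;
 * acks remove it, while nacks and connection loss hand it back for republishing.
 */
public class OutstandingConfirms {
    private final TreeMap<Long, String> outstanding = new TreeMap<>();

    /** Call just before basicPublish with the sequence number the message will get. */
    public synchronized void recordPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Mirrors ConfirmListener.handleAck(deliveryTag, multiple). */
    public synchronized void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // "multiple" confirms every sequence number up to and including deliveryTag.
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    /** Mirrors ConfirmListener.handleNack(deliveryTag, multiple); returns bodies to republish. */
    public synchronized List<String> handleNack(long deliveryTag, boolean multiple) {
        NavigableMap<Long, String> nacked = multiple
                ? outstanding.headMap(deliveryTag, true)
                : outstanding.subMap(deliveryTag, true, deliveryTag, true);
        List<String> toRepublish = new ArrayList<>(nacked.values());
        nacked.clear(); // clearing the view removes the entries from the backing map
        return toRepublish;
    }

    /** Once the connection is gone, no outstanding message can still be confirmed. */
    public synchronized List<String> drainOnConnectionLoss() {
        List<String> toRepublish = new ArrayList<>(outstanding.values());
        outstanding.clear();
        return toRepublish;
    }

    public static void main(String[] args) {
        OutstandingConfirms confirms = new OutstandingConfirms();
        for (long seq = 1; seq <= 5; seq++) {
            confirms.recordPublish(seq, "Message " + (1465 + seq));
        }
        confirms.handleAck(2, true);  // broker confirms sequence numbers 1 and 2
        confirms.handleAck(4, false); // ...and 4 on its own
        // The node is killed here: 3 and 5 were never confirmed.
        System.out.println(confirms.drainOnConnectionLoss()); // [Message 1468, Message 1470]
    }
}
```

Sequence numbers are per channel, so after a reconnect the drained messages should be republished through the new channel and recorded again under its own sequence numbers. A message the broker persisted but never got to ack before dying is exactly the case that produces duplicates.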
>> On Jan 31, 2012 11:52 PM, "Pablo Molnar" <pablomolnar at gmail.com> wrote:
>>
>>> Hi all!
>>>
>>> I'm running some HA tests that stress my cluster, and I'm experiencing lost
>>> messages ONLY when the publisher loses its connection due to a kill -9.
>>>
>>> *Test OK*: Kill a node while consuming:
>>> 1 - Set up a clean 3-node cluster
>>> 2 - Run the producer with 10,000 messages, connected to node A
>>> 3 - Wait for the producer to finish
>>> 4 - Run the consumer, connected to node A
>>> 5 - While the consumer is running, kill node A
>>>
>>> Result: The consumer reconnects to another node and consumes the rest of
>>> the messages. All 10,000 messages were consumed.
>>>
>>> *Test FAILED*: Kill a node while producing:
>>> 1 - Set up a clean 3-node cluster
>>> 2 - Run the consumer, listening on node A
>>> 3 - Run the producer with 10,000 messages, connected to node A
>>> 4 - While the producer is running, kill node A
>>>
>>> Result: The producer reconnects fine (as does the consumer) and keeps
>>> publishing, but some of the messages *already published* to node A are
>>> lost.
>>>
>>>
>>> These are my settings:
>>>
>>> rabbitmqctl status
>>> Status of node 'rabbit@i-00000007-asm' ...
>>> [{pid,11339},
>>>  {running_applications,
>>>      [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
>>>       {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
>>>       {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
>>>       {rabbit,"RabbitMQ","2.7.1"},
>>>       {os_mon,"CPO CXC 138 46","2.2.4"},
>>>       {sasl,"SASL CXC 138 11","2.1.8"},
>>>       {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
>>>       {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
>>>       {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
>>>       {inets,"INETS CXC 138 49","5.2"},
>>>       {mnesia,"MNESIA CXC 138 12","4.4.12"},
>>>       {stdlib,"ERTS CXC 138 10","1.16.4"},
>>>       {kernel,"ERTS CXC 138 10","2.13.4"}]},
>>>  {os,{unix,linux}},
>>>  {erlang_version,
>>>      "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
>>>  {memory,
>>>      [{total,92565608},
>>>       {processes,4004968},
>>>       {processes_used,3996224},
>>>       {system,88560640},
>>>       {atom,1322033},
>>>       {atom_used,1291462},
>>>       {binary,32496},
>>>       {code,15264387},
>>>       {ets,1174192}]},
>>>  {vm_memory_high_watermark,0.3999999999362281},
>>>  {vm_memory_limit,2508940902}]
>>> ...done.
>>>
>>>
>>>
>>> rabbitmqctl cluster_status
>>> Cluster status of node 'rabbit@i-00000007-asm' ...
>>> [{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
>>>          {ram,['rabbit@i-00000009-asm']}]},
>>>  {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
>>> ...done.
>>>
>>>
>>> *Java amqp-client 2.7.1*
>>>
>>>
>>> - Producer.groovy (Java amqp-client 2.7.1)
>>>
>>> import com.rabbitmq.client.*
>>>
>>> try {
>>>     // Get the RabbitMQ config
>>>     def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>>
>>>     // Declarations and confirm mode are passed to the constructor so they also
>>>     // run on the very first connection, not only after a reconnect
>>>     def rabbit = new RabbitHA(config, { channel ->
>>>         channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
>>>         channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
>>>         channel.queueBind('myQueue', 'myExchange', '')
>>>         channel.confirmSelect() // Put the channel into publisher-confirm mode
>>>     })
>>>
>>>     10000.times { idx ->
>>>         rabbit.publish { channel ->
>>>             // Delivery mode 2: persistent
>>>             def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build()
>>>             def msg = "Message $idx"
>>>             channel.basicPublish('myExchange', '', properties, msg.getBytes())
>>>             println msg
>>>         }
>>>     }
>>>
>>>     rabbit.close()
>>>
>>> } catch (e) { e.printStackTrace() }
>>>
>>>
>>> - Consumer.groovy
>>>
>>> import com.rabbitmq.client.*
>>>
>>> try {
>>>     // Get the RabbitMQ config
>>>     def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
>>>
>>>     // Connect
>>>     def rabbit = new RabbitHA(config)
>>>     rabbit.onDelivery('myQueue') { delivery, channel ->
>>>         def msg = new String(delivery.body)
>>>         println msg
>>>
>>>         // Manual ack
>>>         channel.basicAck(delivery.envelope.deliveryTag, false)
>>>     }
>>>
>>> } catch (e) { e.printStackTrace() }
>>>
>>>
>>> - RabbitHA.groovy
>>>
>>> import com.rabbitmq.client.*
>>>
>>> /**
>>>  * RabbitMQ highly available proxy.
>>>  * Basic implementation of a subscriber/publisher with reconnect logic.
>>>  */
>>> class RabbitHA {
>>>     ConnectionFactory connectionFactory
>>>     Address[] addresses
>>>     Closure init
>>>
>>>     Connection connection
>>>     Channel channel
>>>     QueueingConsumer consumer
>>>
>>>     public RabbitHA(Map config) {
>>>         this(config, null)
>>>     }
>>>
>>>     public RabbitHA(Map config, Closure init) {
>>>         this.connectionFactory = new ConnectionFactory([username: config.username,
>>>                 password: config.password, virtualHost: config.virtualHost])
>>>         this.addresses = Address.parseAddresses(config.addresses)
>>>         this.init = init
>>>         connectChannel()
>>>     }
>>>
>>>     void onDelivery(String queueName, Closure closure) {
>>>         basicConsume(queueName)
>>>         int i = 0
>>>
>>>         while (true) {
>>>             try {
>>>                 QueueingConsumer.Delivery delivery = consumer.nextDelivery()
>>>                 closure(delivery, channel)
>>>                 i = 0
>>>             } catch (e) {
>>>                 // Only handle connection-related exceptions; rethrow anything else
>>>                 if (!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
>>>
>>>                 i++
>>>                 e.printStackTrace()
>>>                 println "ShutdownSignalException received! Reconnection attempt #$i"
>>>                 connectChannel()
>>>                 basicConsume(queueName)
>>>             }
>>>         }
>>>     }
>>>
>>>     void publish(Closure closure) {
>>>         int i = 0
>>>         boolean retry = true
>>>         while (retry) {
>>>             try {
>>>                 closure(channel)
>>>                 i = 0
>>>                 retry = false
>>>             } catch (e) {
>>>                 // Only handle connection-related exceptions; rethrow anything else
>>>                 if (!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
>>>
>>>                 i++
>>>                 e.printStackTrace()
>>>                 println "ShutdownSignalException received! Reconnection attempt #$i"
>>>                 // Only the publish that threw is retried; earlier publishes that
>>>                 // were never confirmed are not resent
>>>                 connectChannel()
>>>             }
>>>         }
>>>     }
>>>
>>>     void connectChannel() {
>>>         connection = connectionFactory.newConnection(addresses)
>>>         channel = connection.createChannel()
>>>
>>>         println "Successfully connected to $connection.address"
>>>
>>>         if (init) {
>>>             init(channel)
>>>         }
>>>     }
>>>
>>>     void basicConsume(queueName) {
>>>         consumer = new QueueingConsumer(channel)
>>>         channel.basicConsume(queueName, false, consumer) // autoAck = false: manual acks
>>>     }
>>>
>>>     void close() {
>>>         // Assumes init() put the channel into confirm mode
>>>         channel.waitForConfirmsOrDie()
>>>         channel.close()
>>>         connection.close()
>>>     }
>>> }
>>>
>>>
>>>
>>> Output:
>>>
>>> cat consumer-output.txt |grep Message | wc
>>> 9957 19914 128330
>>>
>>> cat producer-output.txt | grep Message | wc
>>> 10000 20000 128890
>>>
>>> *Note that the consumer lost 43 messages.*
>>>
>>> Output from producer :
>>> ...
>>> Message 1466
>>> Message 1467
>>> Message 1468
>>> Message 1469
>>> Message 1470
>>> ShutdownSignalException recieved! Reconnection attempt #1
>>> Succesfully connected to i-0000001a-zsm/172.16.158.46
>>> Message 1471
>>> Message 1472
>>> Message 1473
>>> Message 1474
>>> Message 1475
>>> Message 1476
>>> ...
>>>
>>> *Note that messages [1471..1476] were consumed fine, but [1466..1470] are
>>> missing from the consumer output.*
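To list exactly which messages the consumer never saw, rather than only counting lines with wc, the two logs can be compared line by line. A small sketch, assuming each log holds one "Message N" line per message as in the excerpts shown; MissingMessages is a hypothetical helper, not part of the samples repository:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingMessages {

    /** Returns the "Message N" lines the producer printed but the consumer never did, in producer order. */
    public static List<String> missing(List<String> producerLines, List<String> consumerLines) {
        Set<String> consumed = new HashSet<>();
        for (String line : consumerLines) {
            String msg = line.trim();
            if (msg.startsWith("Message ")) {
                consumed.add(msg);
            }
        }
        List<String> result = new ArrayList<>();
        for (String line : producerLines) {
            String msg = line.trim();
            if (msg.startsWith("Message ") && !consumed.contains(msg)) {
                result.add(msg);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java MissingMessages producer-output.txt consumer-output.txt
        List<String> lost = missing(Files.readAllLines(Paths.get(args[0])),
                                    Files.readAllLines(Paths.get(args[1])));
        System.out.println(lost.size() + " missing: " + lost);
    }
}
```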
>>>
>>> The complete outputs are in
>>> https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There
>>> you can see the reconnection logs of both sides.
>>> I have a strong feeling that publisher confirms are not configured correctly.
>>>
>>> Could anyone please shed some light on the issue?
>>>
>>> Cheers,
>>> Pablo Molnar
>>>
>>> PS: I'd also like to take the opportunity to share a lot of Groovy
>>> examples I've been working on over the last few days:
>>> https://github.com/pablomolnar/rabbitmq_samples
>>>
>>> _______________________________________________
>>> rabbitmq-discuss mailing list
>>> rabbitmq-discuss at lists.rabbitmq.com
>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>>
>>>
>