Hi all!<br><br>I'm running some HA tests that stress my cluster, and I'm seeing lost messages ONLY when the publisher loses its connection because its node was killed with kill -9.<br><br><b>Test OK</b>: Kill node while consuming:<br>1 - Set up a clean 3-node cluster<br>
2 - Run the producer, connected to node A, publishing 10,000 messages<br>3 - Wait for the producer to finish<br>4 - Run the consumer, connected to node A<br>5 - While the consumer is running, kill node A<br><br>Result: The consumer reconnects to another node and consumes the remaining messages. All 10,000 messages were consumed.<br>
<br><b>Test FAILED</b>: Kill node while producing:<br>
1 - Set up a clean 3-node cluster<br>2 - Start the consumer, listening on node A<br>3 - Run the producer, connected to node A, publishing 10,000 messages<br>
4 - While the producer is running, kill node A<br>
<br>
Result: The producer reconnects fine (so does the consumer) and keeps publishing, but some of the messages<b> already published</b> to node A are lost.<br><br><br>These are my settings:<br><br><span style="font-family:courier new,monospace">rabbitmqctl status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Status of node 'rabbit@i-00000007-asm' ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{pid,11339},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {running_applications,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {amqp_client,"RabbitMQ AMQP Client","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {rabbit,"RabbitMQ","2.7.1"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {os_mon,"CPO CXC 138 46","2.2.4"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {sasl,"SASL CXC 138 11","2.1.8"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {inets,"INETS CXC 138 49","5.2"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {mnesia,"MNESIA CXC 138 12","4.4.12"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {stdlib,"ERTS CXC 138 10","1.16.4"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {kernel,"ERTS CXC 138 10","2.13.4"}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {os,{unix,linux}},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {erlang_version,</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {memory,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> [{total,92565608},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {processes,4004968},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {processes_used,3996224},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {system,88560640},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {atom,1322033},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {atom_used,1291462},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {binary,32496},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {code,15264387},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {ets,1174192}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {vm_memory_high_watermark,0.3999999999362281},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {vm_memory_limit,2508940902}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br style="font-family:courier new,monospace"><br><br><br><span style="font-family:courier new,monospace">rabbitmqctl cluster_status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Cluster status of node 'rabbit@i-00000007-asm' ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {ram,['rabbit@i-00000009-asm']}]},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br><br><br><b><br>Java amqp-client 2.7.1</b><br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">// Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.init = { channel -></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue, mirrored on all nodes (HA policy)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> channel.queueBind('myQueue', 'myExchange', '')</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> channel.confirmSelect()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">10000.times { idx -></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> rabbit.publish { channel -></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> def msg = "Message $idx"</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> channel.basicPublish('myExchange', '', properties, msg.getBytes())</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> println msg</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.close()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br>
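Two things in Producer.groovy look relevant to the confirms question. First, <code>new RabbitHA(config)</code> already calls <code>connectChannel()</code> from the constructor, before <code>rabbit.init</code> is assigned, so the first channel appears never to get <code>confirmSelect()</code>; only channels opened after a reconnect run the init closure (passing it to the two-argument constructor would avoid that). Second, <code>close()</code> only waits for confirms on the current channel, so nothing re-sends messages that were published on the channel that died. A minimal synchronous pattern is to publish in batches and call <code>waitForConfirmsOrDie()</code> after each batch, resending the whole batch if a publish or the wait fails. The sketch below models that in plain Java: <code>ConfirmChannel</code>, <code>FakeBroker</code> and <code>FakeChannel</code> are hypothetical in-memory stand-ins, not the RabbitMQ client, and the batch size (100) and crash point (after 1470 publishes) are arbitrary.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: publish in batches and treat a batch as safe only once the broker has
 * confirmed it; if the connection dies first, open a new channel and resend the batch.
 * ConfirmChannel, FakeBroker and FakeChannel are hypothetical stand-ins, not client API.
 */
public class BatchConfirmSketch {

    /** Hypothetical stand-in for a channel in confirm mode. */
    interface ConfirmChannel {
        void publish(String body) throws IOException;   // plays the role of basicPublish
        void waitForConfirmsOrDie() throws IOException; // plays the role of Channel.waitForConfirmsOrDie
    }

    /** Simulated broker: a message is stored only once it has been confirmed. */
    static class FakeBroker {
        final List<String> stored = new ArrayList<>();
        int publishesBeforeCrash; // crash once after this many publishes; -1 = no crash

        FakeBroker(int publishesBeforeCrash) {
            this.publishesBeforeCrash = publishesBeforeCrash;
        }
    }

    static class FakeChannel implements ConfirmChannel {
        private final FakeBroker broker;
        private final List<String> unconfirmed = new ArrayList<>();

        FakeChannel(FakeBroker broker) {
            this.broker = broker;
        }

        @Override
        public void publish(String body) throws IOException {
            if (broker.publishesBeforeCrash == 0) {
                broker.publishesBeforeCrash = -1; // crash only once
                // Whatever is still unconfirmed on this channel is lost with the node.
                throw new IOException("node killed");
            }
            if (broker.publishesBeforeCrash > 0) {
                broker.publishesBeforeCrash--;
            }
            unconfirmed.add(body);
        }

        @Override
        public void waitForConfirmsOrDie() {
            broker.stored.addAll(unconfirmed); // the broker confirms everything outstanding
            unconfirmed.clear();
        }
    }

    /** Publishes all messages in batches, resending any batch that was not confirmed. */
    static void publishAll(List<String> messages, int batchSize, FakeBroker broker) {
        ConfirmChannel channel = new FakeChannel(broker);
        for (int start = 0; start < messages.size(); start += batchSize) {
            List<String> batch = messages.subList(start, Math.min(start + batchSize, messages.size()));
            boolean confirmed = false;
            while (!confirmed) {
                try {
                    for (String body : batch) {
                        channel.publish(body);
                    }
                    channel.waitForConfirmsOrDie();
                    confirmed = true;
                } catch (IOException e) {
                    // No confirm for this batch: "reconnect" and resend all of it.
                    channel = new FakeChannel(broker);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            messages.add("Message " + i);
        }
        FakeBroker broker = new FakeBroker(1470); // hypothetical: node dies mid-run
        publishAll(messages, 100, broker);
        System.out.println(broker.stored.size() + " of " + messages.size() + " messages confirmed");
    }
}
```

Running it prints "10000 of 10000 messages confirmed". With a real broker, a resent batch can produce duplicates, because some of its messages may have been stored even though the confirm never arrived, so the consumer side should tolerate redelivery.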
<br><br>- Consumer.groovy<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"></span><span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> // Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">// Connect</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">rabbit.onDelivery('myQueue'){ delivery, channel -></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> def msg = new String(delivery.body)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> println msg</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> // Manual ack</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> channel.basicAck(delivery.envelope.deliveryTag, false)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">/**</span><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> * RabbitMQ highly available proxy.</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> * Basic implementation of a subscriber/publisher with reconnect logic.</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> */</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">class RabbitHA {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> ConnectionFactory connectionFactory</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> Address[] addresses</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> Closure init</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> Connection connection</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> Channel channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> QueueingConsumer consumer</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> public RabbitHA(Map config) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> this(config, null)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> public RabbitHA(Map config, Closure init){</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> this.addresses = Address.parseAddresses(config.addresses)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> this.init = init</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> void onDelivery(String queueName, Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> basicConsume(queueName)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> int i = 0</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> while(true) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> closure(delivery, channel)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> // Only handle connection-related exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">e.printStackTrace()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> connectChannel()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> basicConsume(queueName)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> void publish(Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> int i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">boolean retry = true</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> while(retry) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> closure(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> i = 0</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">retry = false</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> // Only handle connection-related exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">retry = true</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> e.printStackTrace()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> void connectChannel() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> connection = connectionFactory.newConnection(addresses)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> channel = connection.createChannel()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> println "Succesfully connected to $connection.address"</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> if(init) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> init(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> void basicConsume(queueName) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> consumer = new QueueingConsumer(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> channel.basicConsume(queueName, false, consumer)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> void close() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">channel.waitForConfirmsOrDie()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">channel.close()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> </span><span style="font-family:courier new,monospace">connection.close()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br><br><br>Output:<br>
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat consumer-output.txt |grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> 9957 19914 128330</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat producer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> 10000 20000 128890</span><br>
<br><b>Note that the consumer lost 43 messages (10,000 published vs. 9,957 consumed).</b><br><br>Output from the producer:<br><span style="font-family:courier new,monospace">...</span><br><span style="font-family:courier new,monospace"><span style="color:rgb(0,0,0)">Message 1466</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1467</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1468</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1469</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/<a href="http://172.16.158.46">172.16.158.46</a><br>Message 1471<br>
Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">...</span><br><br><br><b>Note that messages [1471..1476] were consumed OK, but [1466..1470] are missing from the consumer output.</b><br>
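The missing run [1466..1470] sits right before the reconnect, which is consistent with (though not proof of) those messages having been published on the old channel but never confirmed. A common remedy is to keep every message the broker has not yet acked, keyed by its publish sequence number, and republish whatever is left once <code>connectChannel()</code> has opened a new channel. In the Java client the sequence number would come from <code>Channel.getNextPublishSeqNo()</code> just before each <code>basicPublish</code>, and acks arrive through a <code>ConfirmListener</code> (<code>handleAck(long deliveryTag, boolean multiple)</code>). The class below is a hypothetical, broker-free sketch of that bookkeeping; how RabbitHA would wire it in is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical bookkeeping for asynchronous publisher confirms.
 * Assumption: seqNo comes from channel.getNextPublishSeqNo() before each basicPublish,
 * and handleAck is called from a ConfirmListener. A nack needs no handler here:
 * the entry simply stays outstanding and is republished after reconnecting.
 */
public class UnconfirmedTracker {
    // Sequence number -> body for every publish the broker has not acked yet.
    // Concurrent, since confirm callbacks typically arrive on a different thread.
    private final ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message right before publishing it under the given sequence number. */
    public void recordPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Broker acked seqNo, or (if multiple) every sequence number up to and including it. */
    public void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /**
     * Call after reconnecting. Sequence numbers are per channel, so the old ones mean
     * nothing on the new channel: return the unconfirmed bodies in publish order and
     * forget them; the caller republishes each one and records it again.
     */
    public List<String> drainForRepublish() {
        List<String> pending = new ArrayList<>(outstanding.values());
        outstanding.clear();
        return pending;
    }

    public int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        // Hypothetical run: Message 1460..1470 published as seqNos 1..11 on the first channel.
        for (int i = 0; i <= 10; i++) {
            tracker.recordPublish(i + 1, "Message " + (1460 + i));
        }
        tracker.handleAck(6, true); // broker confirms seqNos 1..6 (Message 1460..1465)
        // The node is killed here, before seqNos 7..11 are confirmed.
        System.out.println(tracker.drainForRepublish());
        // prints [Message 1466, Message 1467, Message 1468, Message 1469, Message 1470]
    }
}
```

As with any confirm-based retry, this gives at-least-once delivery: a message the broker stored but could not confirm before dying will be sent twice.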
<br>The complete outputs are at <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>. There you can see the reconnection logs of both sides.<br>
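<code>grep Message | wc</code> counts lines, not distinct messages. Because the consumer acks manually, messages it had received but not acked when node A died may be redelivered after the reconnect, so the 9,957 consumer lines could include duplicates and the real number of missing messages could be higher than 43. A set difference on the message numbers in the complete outputs linked above is more precise; the small program below is one way to do it (file names taken from the commands above).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Lists the "Message N" numbers the producer printed but the consumer never printed. */
public class MissingMessages {
    private static final Pattern MESSAGE = Pattern.compile("Message (\\d+)");

    /** Collects every message number appearing in the given log lines (duplicates collapse). */
    static TreeSet<Integer> numbers(List<String> lines) {
        TreeSet<Integer> result = new TreeSet<>();
        for (String line : lines) {
            Matcher m = MESSAGE.matcher(line);
            if (m.find()) {
                result.add(Integer.parseInt(m.group(1)));
            }
        }
        return result;
    }

    /** Message numbers present in the producer log but absent from the consumer log. */
    static TreeSet<Integer> missing(List<String> producerLines, List<String> consumerLines) {
        TreeSet<Integer> result = numbers(producerLines);
        result.removeAll(numbers(consumerLines));
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: java MissingMessages producer-output.txt consumer-output.txt");
            return;
        }
        List<String> produced = Files.readAllLines(Paths.get(args[0]));
        List<String> consumed = Files.readAllLines(Paths.get(args[1]));
        TreeSet<Integer> lost = missing(produced, consumed);
        System.out.println(lost.size() + " missing: " + lost);
    }
}
```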
I have a strong feeling that publisher confirms are not configured correctly.<br><br>Could anyone shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PS: I'd also like to take the opportunity to share a lot of Groovy examples I've been working on over the last few days: <a href="https://github.com/pablomolnar/rabbitmq_samples">https://github.com/pablomolnar/rabbitmq_samples</a><br>