Hi all!<br><br>I&#39;m doing some HA tests stressing my cluster and I&#39;m experimenting lost messages ONLY when the publisher lost the connection due a kill -9.<br><br><b>Test OK</b>: Kill node while consuming:<br>1 - Setup a clean 3 node&#39;s cluster<br>
2 - Execute producer with 10.000 messages connected to node A<br>3 - Wait producer to finish<br>4 - Execute consumer connected to node A<br>5 - While consumer is running kill node A<br><br>Result: The consumer reconnects to another node and consume the rest of messages ok. All 10.000 messages were consumed ok.<br>
<br><b>Test FAILED</b>: Kill node while producing:<br>
1 - Setup a clean 3 node&#39;s cluster<br>2 - Execute consumer to start listening connected node A<br>3 - Execute producer with 10.000 messages connected to node A<br>
4 - While producer is running kill node A<br>
<br>
Result: The producer reconnects fine (the consumer too) and keep publishing but some of the messages<b> already published</b> to node A are lost.<br><br><br>These are my settings:<br><br><span style="font-family:courier new,monospace">rabbitmqctl status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{pid,11339},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�{running_applications,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">���� [{rabbitmq_management,&quot;RabbitMQ Management Console&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {rabbitmq_management_agent,&quot;RabbitMQ Management Agent&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {amqp_client,&quot;RabbitMQ AMQP Client&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {rabbit,&quot;RabbitMQ&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {os_mon,&quot;CPO� CXC 138 46&quot;,&quot;2.2.4&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {sasl,&quot;SASL� CXC 138 11&quot;,&quot;2.1.8&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {rabbitmq_mochiweb,&quot;RabbitMQ Mochiweb Embedding&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {webmachine,&quot;webmachine&quot;,&quot;1.7.0-rmq2.7.1-hg&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {mochiweb,&quot;MochiMedia Web Server&quot;,&quot;1.3-rmq2.7.1-git&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {inets,&quot;INETS� CXC 138 49&quot;,&quot;5.2&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {mnesia,&quot;MNESIA� CXC 138 12&quot;,&quot;4.4.12&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {stdlib,&quot;ERTS� CXC 138 10&quot;,&quot;1.16.4&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {kernel,&quot;ERTS� CXC 138 10&quot;,&quot;2.13.4&quot;}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�{os,{unix,linux}},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�{erlang_version,</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">���� &quot;Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�{memory,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">���� [{total,92565608},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {processes,4004968},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {processes_used,3996224},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {system,88560640},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {atom,1322033},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {atom_used,1291462},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {binary,32496},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����� {code,15264387},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����� {ets,1174192}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�{vm_memory_high_watermark,0.3999999999362281},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�{vm_memory_limit,2508940902}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br style="font-family:courier new,monospace"><br><br><br><span style="font-family:courier new,monospace">rabbitmqctl cluster_status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Cluster status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{nodes,[{disc,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�������� {ram,[&#39;rabbit@i-00000009-asm&#39;]}]},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�{running_nodes,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br><br><br><b><br>Java amqp-client 2.7.1</b><br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">// Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.init = { channel -&gt;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� channel.exchangeDeclare(&#39;myExchange&#39;, &quot;direct&quot;, true) // Durable exchange</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� channel.queueDeclare(&#39;myQueue&#39;, true, false, false, [&quot;x-ha-policy&quot;: &quot;all&quot;]) // Durable exchange and HA policy</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� channel.queueBind(&#39;myQueue&#39;, &#39;myExchange&#39;, &#39;&#39;)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� channel.confirmSelect()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">10000.times { idx -&gt;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� rabbit.publish { channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� def msg = &quot;Message $idx&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� channel.basicPublish(&#39;myExchange&#39;, &#39;&#39;, properties, msg.getBytes())</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� println msg</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.close()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br>
<br><br>- Consumer.groovy<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"></span><span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� // Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� </span><span style="font-family:courier new,monospace">// Connect</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� </span><span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� </span><span style="font-family:courier new,monospace">rabbit.onDelivery(&#39;myQueue&#39;){ delivery, channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� def msg = new String(delivery.body)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� println msg</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� // Manual ack</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">� channel.basicAck(delivery.envelope.deliveryTag, false)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">/**</span><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�* RabbitMQ highly available proxy.</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�* Basic implementation of a basic suscriber/publisher with reconnect logic.</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">�*</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�*/</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">class RabbitHA {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� ConnectionFactory connectionFactory</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� Address[] addresses</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� Closure init</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� Connection connection</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� Channel channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� QueueingConsumer consumer</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� public RabbitHA(Map config) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� this(config, null)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� public RabbitHA(Map config, Closure init){</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� this.addresses = Address.parseAddresses(config.addresses)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� this.init = init</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� void onDelivery(String queueName, Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� basicConsume(queueName)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� int i = 0</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� while(true) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����������� try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� closure(delivery, channel)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����������� } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� // Only handle exceptions</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� </span><span style="font-family:courier new,monospace">e.printStackTrace()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� connectChannel()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� basicConsume(queueName)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����������� }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� void publish(Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� int i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� </span><span style="font-family:courier new,monospace">boolean retry = true</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� while(retry) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����������� try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� closure(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� i = 0</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� </span><span style="font-family:courier new,monospace">retry = false</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">����������� } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� // Only handle exceptions</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� </span><span style="font-family:courier new,monospace">retry = true</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� e.printStackTrace()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��������������� println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��������������� connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����������� }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� void connectChannel() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� connection = connectionFactory.newConnection(addresses)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� channel = connection.createChannel()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� println &quot;Succesfully connected to $connection.address&quot;</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� if(init) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">����������� init(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� void basicConsume(queueName) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� consumer = new QueueingConsumer(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� channel.basicConsume(queueName, false, consumer)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� void close() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� </span><span style="font-family:courier new,monospace">channel.waitForConfirmsOrDie()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� </span><span style="font-family:courier new,monospace">channel.close()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� </span><span style="font-family:courier new,monospace">connection.close()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br><br><br>Output:<br>
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat consumer-output.txt |grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">�� 9957�� 19914� 128330</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat producer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">� 10000�� 20000� 128890</span><br>
<br><b>Note that consumer lost 43 messages</b><br><br>Output from producer :<br><span style="font-family:courier new,monospace">...</span><br><span style="font-family:courier new,monospace"><span style="color:rgb(0,0,0)">Message 1466</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1467</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1468</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1469</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/<a href="http://172.16.158.46">172.16.158.46</a><br>Message 1471<br>
Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">...</span><br><br><b><br>Note messages [1471..1476] were consumed ok, but [1466..1470] are missing in consumer output.</b><br>
<br>The complete outputs are in <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>. There you can see reconnection log of both parts.<br>
I&#39;ve a strong feeling the publisher confirms is not well configured.<br><br>Please anyone could shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PD: Also I take the opportunity to share a lot of groovy examples I&#39;ve been working in the last days: <a href="https://github.com/pablomolnar/rabbitmq_samples">https://github.com/pablomolnar/rabbitmq_samples</a><br>