Hi all!<br><br>I&#39;m doing some HA tests stressing my cluster and I&#39;m experimenting lost messages ONLY when the publisher lost the connection due a kill -9.<br><br><b>Test OK</b>: Kill node while consuming:<br>1 - Setup a clean 3 node&#39;s cluster<br>
2 - Execute producer with 10.000 messages connected to node A<br>3 - Wait producer to finish<br>4 - Execute consumer connected to node A<br>5 - While consumer is running kill node A<br><br>Result: The consumer reconnects to another node and consume the rest of messages ok. All 10.000 messages were consumed ok.<br>
<br><b>Test FAILED</b>: Kill node while producing:<br>
1 - Setup a clean 3 node&#39;s cluster<br>2 - Execute consumer to start listening connected node A<br>3 - Execute producer with 10.000 messages connected to node A<br>
4 - While producer is running kill node A<br>
<br>
Result: The producer reconnects fine (the consumer too) and keep publishing but some of the messages<b> already published</b> to node A are lost.<br><br><br>These are my settings:<br><br><span style="font-family:courier new,monospace">rabbitmqctl status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{pid,11339},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {running_applications,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">     [{rabbitmq_management,&quot;RabbitMQ Management Console&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {rabbitmq_management_agent,&quot;RabbitMQ Management Agent&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {amqp_client,&quot;RabbitMQ AMQP Client&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {rabbit,&quot;RabbitMQ&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {os_mon,&quot;CPO  CXC 138 46&quot;,&quot;2.2.4&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {sasl,&quot;SASL  CXC 138 11&quot;,&quot;2.1.8&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {rabbitmq_mochiweb,&quot;RabbitMQ Mochiweb Embedding&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {webmachine,&quot;webmachine&quot;,&quot;1.7.0-rmq2.7.1-hg&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {mochiweb,&quot;MochiMedia Web Server&quot;,&quot;1.3-rmq2.7.1-git&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {inets,&quot;INETS  CXC 138 49&quot;,&quot;5.2&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {mnesia,&quot;MNESIA  CXC 138 12&quot;,&quot;4.4.12&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {stdlib,&quot;ERTS  CXC 138 10&quot;,&quot;1.16.4&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {kernel,&quot;ERTS  CXC 138 10&quot;,&quot;2.13.4&quot;}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {os,{unix,linux}},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {erlang_version,</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">     &quot;Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n&quot;},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {memory,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">     [{total,92565608},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {processes,4004968},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {processes_used,3996224},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {system,88560640},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {atom,1322033},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {atom_used,1291462},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {binary,32496},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">      {code,15264387},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {ets,1174192}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> {vm_memory_high_watermark,0.3999999999362281},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {vm_memory_limit,2508940902}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br style="font-family:courier new,monospace"><br><br><br><span style="font-family:courier new,monospace">rabbitmqctl cluster_status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Cluster status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{nodes,[{disc,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">         {ram,[&#39;rabbit@i-00000009-asm&#39;]}]},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {running_nodes,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br><br><br><b><br>Java amqp-client 2.7.1</b><br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">// Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.init = { channel -&gt;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  channel.exchangeDeclare(&#39;myExchange&#39;, &quot;direct&quot;, true) // Durable exchange</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  channel.queueDeclare(&#39;myQueue&#39;, true, false, false, [&quot;x-ha-policy&quot;: &quot;all&quot;]) // Durable exchange and HA policy</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  channel.queueBind(&#39;myQueue&#39;, &#39;myExchange&#39;, &#39;&#39;)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  channel.confirmSelect()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">10000.times { idx -&gt;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  rabbit.publish { channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    def msg = &quot;Message $idx&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    channel.basicPublish(&#39;myExchange&#39;, &#39;&#39;, properties, msg.getBytes())</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    println msg</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.close()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br>
<br><br>- Consumer.groovy<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"></span><span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  // Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">// Connect</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">rabbit.onDelivery(&#39;myQueue&#39;){ delivery, channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  def msg = new String(delivery.body)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  println msg</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  // Manual ack</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">  channel.basicAck(delivery.envelope.deliveryTag, false)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">/**</span><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> * RabbitMQ highly available proxy.</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> * Basic implementation of a basic suscriber/publisher with reconnect logic.</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> */</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">class RabbitHA {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    ConnectionFactory connectionFactory</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    Address[] addresses</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    Closure init</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    Connection connection</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    Channel channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    QueueingConsumer consumer</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    public RabbitHA(Map config) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        this(config, null)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    public RabbitHA(Map config, Closure init){</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        this.addresses = Address.parseAddresses(config.addresses)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        this.init = init</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void onDelivery(String queueName, Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        basicConsume(queueName)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        int i = 0</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        while(true) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">            try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                closure(delivery, channel)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">            } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                // Only handle exceptions</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">e.printStackTrace()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                connectChannel()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                basicConsume(queueName)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    void publish(Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        int i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">boolean retry = true</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        while(retry) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">            try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                closure(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                i = 0</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">retry = false</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">            } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                // Only handle exceptions</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">retry = true</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                e.printStackTrace()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">                connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void connectChannel() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        connection = connectionFactory.newConnection(addresses)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        channel = connection.createChannel()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        println &quot;Succesfully connected to $connection.address&quot;</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        if(init) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            init(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    void basicConsume(queueName) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        consumer = new QueueingConsumer(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        channel.basicConsume(queueName, false, consumer)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void close() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">channel.waitForConfirmsOrDie()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">channel.close()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">connection.close()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br><br><br>Output:<br>
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat consumer-output.txt |grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">   9957   19914  128330</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat producer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  10000   20000  128890</span><br>
<br><b>Note that consumer lost 43 messages</b><br><br>Output from producer :<br><span style="font-family:courier new,monospace">...</span><br><span style="font-family:courier new,monospace"><span style="color:rgb(0,0,0)">Message 1466</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1467</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1468</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1469</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/<a href="http://172.16.158.46">172.16.158.46</a><br>Message 1471<br>
Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">...</span><br><br><b><br>Note messages [1471..1476] were consumed ok, but [1466..1470] are missing in consumer output.</b><br>
<br>The complete outputs are in <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>. There you can see reconnection log of both parts.<br>
I&#39;ve a strong feeling the publisher confirms is not well configured.<br><br>Please anyone could shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PD: Also I take the opportunity to share a lot of groovy examples I&#39;ve been working in the last days: <a href="https://github.com/pablomolnar/rabbitmq_samples">https://github.com/pablomolnar/rabbitmq_samples</a><br>