<p>Hi Pablo, looking at your code, I can&#39;t see where you republish messages that the broker has not confirmed. If you want to make sure that every publish is eventually delivered, you have to keep track of each one and re-issue it if its ack never arrives from the broker, which usually happens when the broker dies.</p>
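<p>As a rough sketch of that bookkeeping (not taken from your repository; the class and method names are made up for illustration), you could keep a map from publish sequence number to message body, drop entries as acks arrive, and re-publish whatever is left after a reconnect. With the Java client, the sequence number would come from channel.getNextPublishSeqNo(), read just before basicPublish, and the acks from a ConfirmListener registered on the channel. The sketch leaves the client library out so that it runs on its own.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers every publish until the broker confirms it. */
final class UnconfirmedPublishes {
    // Publish sequence number -> message body, ordered by sequence number.
    private final ConcurrentNavigableMap<Long, String> pending =
            new ConcurrentSkipListMap<>();

    /** Call just before publishing, with the channel's next publish sequence number. */
    void track(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Call from the confirm listener's ack callback. */
    void ack(long seqNo, boolean multiple) {
        if (multiple) {
            // "multiple" means everything up to and including seqNo is confirmed.
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    /**
     * Call after the connection drops: returns everything still unconfirmed,
     * oldest first, and forgets it. Assumes no publishes are in flight
     * while this runs.
     */
    List<String> drain() {
        List<String> leftovers = new ArrayList<>(pending.values());
        pending.clear();
        return leftovers;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        UnconfirmedPublishes unconfirmed = new UnconfirmedPublishes();
        for (long seq = 1; seq <= 5; seq++) {
            unconfirmed.track(seq, "Message " + (seq - 1));
        }
        unconfirmed.ack(3, true); // broker confirmed 1..3 in a single ack
        // The broker dies here: sequence numbers 4 and 5 are never confirmed.
        System.out.println(unconfirmed.drain()); // prints [Message 3, Message 4]
    }
}
```

<p>After a reconnect, each body returned by drain() would be published again on the new channel and tracked under a fresh sequence number, since numbering restarts at 1 once confirmSelect() is called on a new channel. Re-publishing can produce duplicates when the broker stored a message but died before acking it, so the consumer should tolerate seeing a message twice.</p>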

<div class="gmail_quote">On Jan 31, 2012 11:52 PM, &quot;Pablo Molnar&quot; &lt;<a href="mailto:pablomolnar@gmail.com">pablomolnar@gmail.com</a>&gt; wrote:<br type="attribution"><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
Hi all!<br><br>I&#39;m running some HA tests that stress my cluster, and I&#39;m losing messages ONLY when the publisher loses its connection due to a kill -9.<br><br><b>Test OK</b>: Kill node while consuming:<br>1 - Set up a clean 3-node cluster<br>

2 - Execute the producer with 10,000 messages, connected to node A<br>3 - Wait for the producer to finish<br>4 - Execute the consumer, connected to node A<br>5 - While the consumer is running, kill node A<br><br>Result: The consumer reconnects to another node and consumes the rest of the messages OK. All 10,000 messages were consumed.<br>

<br><b>Test FAILED</b>: Kill node while producing:<br>
1 - Set up a clean 3-node cluster<br>2 - Execute the consumer, connected to node A, so it starts listening<br>3 - Execute the producer with 10,000 messages, connected to node A<br>
4 - While the producer is running, kill node A<br>
<br>
Result: The producer reconnects fine (the consumer too) and keeps publishing, but some of the messages<b> already published</b> to node A are lost.<br><br><br>These are my settings:<br><br><span style="font-family:courier new,monospace">rabbitmqctl status</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">Status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{pid,11339},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> {running_applications,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">     [{rabbitmq_management,&quot;RabbitMQ Management Console&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {rabbitmq_management_agent,&quot;RabbitMQ Management Agent&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {amqp_client,&quot;RabbitMQ AMQP Client&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {rabbit,&quot;RabbitMQ&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {os_mon,&quot;CPO  CXC 138 46&quot;,&quot;2.2.4&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {sasl,&quot;SASL  CXC 138 11&quot;,&quot;2.1.8&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {rabbitmq_mochiweb,&quot;RabbitMQ Mochiweb Embedding&quot;,&quot;2.7.1&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {webmachine,&quot;webmachine&quot;,&quot;1.7.0-rmq2.7.1-hg&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {mochiweb,&quot;MochiMedia Web Server&quot;,&quot;1.3-rmq2.7.1-git&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {inets,&quot;INETS  CXC 138 49&quot;,&quot;5.2&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {mnesia,&quot;MNESIA  CXC 138 12&quot;,&quot;4.4.12&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {stdlib,&quot;ERTS  CXC 138 10&quot;,&quot;1.16.4&quot;},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {kernel,&quot;ERTS  CXC 138 10&quot;,&quot;2.13.4&quot;}]},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> {os,{unix,linux}},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {erlang_version,</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">     &quot;Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n&quot;},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> {memory,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">     [{total,92565608},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {processes,4004968},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {processes_used,3996224},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {system,88560640},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {atom,1322033},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {atom_used,1291462},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {binary,32496},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">      {code,15264387},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">      {ets,1174192}]},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> {vm_memory_high_watermark,0.3999999999362281},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {vm_memory_limit,2508940902}]</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">...done.</span><br style="font-family:courier new,monospace"><br><br><br><span style="font-family:courier new,monospace">rabbitmqctl cluster_status</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">Cluster status of node &#39;rabbit@i-00000007-asm&#39; ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{nodes,[{disc,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]},</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">         {ram,[&#39;rabbit@i-00000009-asm&#39;]}]},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> {running_nodes,[&#39;rabbit@i-0000001a-zsm&#39;,&#39;rabbit@i-00000007-asm&#39;]}]</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">...done.</span><br><br><br><b><br>Java amqp-client 2.7.1</b><br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">// Get rabbitmq config</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.init = { channel -&gt;</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  channel.exchangeDeclare(&#39;myExchange&#39;, &quot;direct&quot;, true) // Durable exchange</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  channel.queueDeclare(&#39;myQueue&#39;, true, false, false, [&quot;x-ha-policy&quot;: &quot;all&quot;]) // Durable queue and HA policy</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  channel.queueBind(&#39;myQueue&#39;, &#39;myExchange&#39;, &#39;&#39;)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  channel.confirmSelect()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">10000.times { idx -&gt;</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  rabbit.publish { channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    def msg = &quot;Message $idx&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    channel.basicPublish(&#39;myExchange&#39;, &#39;&#39;, properties, msg.getBytes())</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    println msg</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.close()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br>

<br><br>- Consumer.groovy<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"></span><span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  // Get rabbitmq config</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  def config = new ConfigSlurper().parse(new File(&#39;../rabbitmq.properties&#39;).toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">// Connect</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  </span><span style="font-family:courier new,monospace">rabbit.onDelivery(&#39;myQueue&#39;){ delivery, channel -&gt;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  def msg = new String(delivery.body)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  println msg</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  // Manual ack</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">  channel.basicAck(delivery.envelope.deliveryTag, false)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  }</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">/**</span><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> * RabbitMQ highly available proxy.</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> * Basic implementation of a subscriber/publisher with reconnect logic.</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace"> *</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"> */</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">class RabbitHA {</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    ConnectionFactory connectionFactory</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    Address[] addresses</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    Closure init</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    Connection connection</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    Channel channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    QueueingConsumer consumer</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    public RabbitHA(Map config) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        this(config, null)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    public RabbitHA(Map config, Closure init){</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        this.addresses = Address.parseAddresses(config.addresses)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        this.init = init</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void onDelivery(String queueName, Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        basicConsume(queueName)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        int i = 0</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        while(true) {</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">            try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                closure(delivery, channel)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i = 0</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">            } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                // Only handle connection-level exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">e.printStackTrace()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                connectChannel()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                basicConsume(queueName)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            }</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    void publish(Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        int i = 0</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">boolean retry = true</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        while(retry) {</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">            try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                closure(channel)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                i = 0</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">retry = false</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">            } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                // Only handle connection-level exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                </span><span style="font-family:courier new,monospace">retry = true</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                e.printStackTrace()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">                println &quot;ShutdownSignalException recieved! Reconnection attempt #$i&quot;</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">                connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            }</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void connectChannel() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        connection = connectionFactory.newConnection(addresses)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        channel = connection.createChannel()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        println &quot;Succesfully connected to $connection.address&quot;</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        if(init) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">            init(channel)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    void basicConsume(queueName) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        consumer = new QueueingConsumer(channel)</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        channel.basicConsume(queueName, false, consumer)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">    void close() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">channel.waitForConfirmsOrDie()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">channel.close()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">        </span><span style="font-family:courier new,monospace">connection.close()</span><br style="font-family:courier new,monospace">

<span style="font-family:courier new,monospace">    }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br><br><br>
Output:<br>
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat consumer-output.txt |grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">   9957   19914  128330</span><br style="font-family:courier new,monospace">

<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat producer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">  10000   20000  128890</span><br>

<br><b>Note that the consumer lost 43 messages</b><br><br>Output from producer:<br><span style="font-family:courier new,monospace">...</span><br><span style="font-family:courier new,monospace"><span style>Message 1466</span><br style>

<span style>Message 1467</span><br style><span style>Message 1468</span><br style><span style>Message 1469</span><br style>
<span style>Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/172.16.158.46<br>Message 1471<br>

Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">...</span><br><br><b><br>Note that messages [1471..1476] were consumed OK, but [1466..1470] are missing from the consumer output.</b><br>

<br>The complete outputs are in <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out" target="_blank">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>. There you can see the reconnection logs of both sides.<br>

I&#39;ve a strong feeling that publisher confirms are not configured correctly.<br><br>Could anyone please shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PS: I&#39;d also like to take the opportunity to share a lot of Groovy examples I&#39;ve been working on over the last few days: <a href="https://github.com/pablomolnar/rabbitmq_samples" target="_blank">https://github.com/pablomolnar/rabbitmq_samples</a><br>

<br>_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
<br></blockquote></div>