Hi all!<br><br>I'm running some HA tests to stress my cluster, and I'm seeing lost messages ONLY when the publisher loses its connection because its node is killed with kill -9.<br><br><b>Test OK</b>: Kill node while consuming:<br>1 - Set up a clean 3-node cluster<br>
2 - Run the producer against node A, publishing 10,000 messages<br>3 - Wait for the producer to finish<br>4 - Run the consumer against node A<br>5 - Kill node A while the consumer is running<br><br>Result: The consumer reconnects to another node and consumes the rest of the messages. All 10,000 messages were consumed.<br>
<br><b>Test FAILED</b>: Kill node while producing:<br>
1 - Set up a clean 3-node cluster<br>2 - Start the consumer, listening on node A<br>3 - Run the producer against node A, publishing 10,000 messages<br>
4 - Kill node A while the producer is running<br>
<br>
Result: The producer reconnects fine (and so does the consumer) and keeps publishing, but some of the messages <b>already published</b> to node A are lost.<br><br><br>These are my settings:<br><br><span style="font-family:courier new,monospace">rabbitmqctl status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Status of node 'rabbit@i-00000007-asm' ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{pid,11339},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;{running_applications,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp; [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {amqp_client,"RabbitMQ AMQP Client","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbit,"RabbitMQ","2.7.1"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {os_mon,"CPO&nbsp; CXC 138 46","2.2.4"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {sasl,"SASL&nbsp; CXC 138 11","2.1.8"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {inets,"INETS&nbsp; CXC 138 49","5.2"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {mnesia,"MNESIA&nbsp; CXC 138 12","4.4.12"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {stdlib,"ERTS&nbsp; CXC 138 10","1.16.4"},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {kernel,"ERTS&nbsp; CXC 138 10","2.13.4"}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;{os,{unix,linux}},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;{erlang_version,</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp; "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;{memory,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp; [{total,92565608},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {processes,4004968},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {processes_used,3996224},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {system,88560640},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {atom,1322033},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {atom_used,1291462},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {binary,32496},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {code,15264387},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {ets,1174192}]},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;{vm_memory_high_watermark,0.3999999999362281},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;{vm_memory_limit,2508940902}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br style="font-family:courier new,monospace"><br><br><br><span style="font-family:courier new,monospace">rabbitmqctl cluster_status</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">Cluster status of node 'rabbit@i-00000007-asm' ...</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {ram,['rabbit@i-00000009-asm']}]},</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;{running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">...done.</span><br><br><br><b><br>Java amqp-client 2.7.1</b><br><br><br>- Producer.groovy (java amqp-client 2.7.1)<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">// Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.init = { channel -></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue and HA policy</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; channel.queueBind('myQueue', 'myExchange', '')</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; channel.confirmSelect()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">10000.times { idx -></span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; rabbit.publish { channel -></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build() // Delivery mode 2: persistent</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; def msg = "Message $idx"</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; channel.basicPublish('myExchange', '', properties, msg.getBytes())</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; println msg</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">rabbit.close()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br>
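Reading the code, one plausible explanation (an assumption I have not confirmed against the broker) is that RabbitHA.publish() only retries the closure that actually threw. A message whose basicPublish() returned just before node A died, but whose confirm never arrived, is never sent again, and close() only waits for confirms on the channel that is open at the very end. Also, Producer.groovy assigns rabbit.init after new RabbitHA(config) has already called connectChannel(). As far as I can tell, the first channel (the one on node A) therefore never runs confirmSelect(). Passing the closure to the existing two-argument constructor, new RabbitHA(config, { channel -> ... }), would avoid that.

The sketch below keeps every published message until the broker confirms it. UnconfirmedTracker is a hypothetical helper, not part of amqp-client. It assumes you call track(channel.getNextPublishSeqNo(), body) right before basicPublish() and forward the client's ConfirmListener.handleAck(deliveryTag, multiple) to ack(). How the listener is registered depends on the client version (I believe 2.x uses setConfirmListener), so check the Channel javadoc.

```groovy
import java.util.concurrent.ConcurrentSkipListMap

/**
 * Hypothetical helper (not part of amqp-client): remembers each message
 * published on the current channel until the broker confirms it.
 */
class UnconfirmedTracker {
    // publish sequence number -> message body; kept sorted so a "multiple"
    // ack can drop every entry up to and including the acked tag
    private final ConcurrentSkipListMap<Long, byte[]> pending = new ConcurrentSkipListMap<Long, byte[]>()

    // Call right before basicPublish(), passing channel.getNextPublishSeqNo()
    void track(long seqNo, byte[] body) {
        pending.put(seqNo, body)
    }

    // Forward ConfirmListener.handleAck(deliveryTag, multiple) here
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear()
        } else {
            pending.remove(deliveryTag)
        }
    }

    int size() {
        pending.size()
    }

    // After a reconnect the new channel numbers its publishes from 1 again,
    // so hand back everything still unconfirmed (oldest first) and start empty
    List<byte[]> drainForRepublish() {
        List<byte[]> leftovers = new ArrayList<byte[]>(pending.values())
        pending.clear()
        leftovers
    }
}
```

After connectChannel() succeeds, the bodies returned by drainForRepublish() would be published again on the new channel. Nacked messages (ConfirmListener.handleNack) need the same treatment, and this sketch leaves that out. A message may already have reached the mirrored queue even though its confirm was lost along with node A. This approach therefore gives at-least-once delivery, and the consumer should expect the occasional duplicate.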
<br><br>- Consumer.groovy<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace"></span><span style="font-family:courier new,monospace">try{</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; // Get rabbitmq config</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; </span><span style="font-family:courier new,monospace">// Connect</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; </span><span style="font-family:courier new,monospace">def rabbit = new RabbitHA(config)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp; </span><span style="font-family:courier new,monospace">rabbit.onDelivery('myQueue'){ delivery, channel -></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; def msg = new String(delivery.body)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; println msg</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; // Manual ack</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; channel.basicAck(delivery.envelope.deliveryTag, false)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">} catch(e){e.printStackTrace()}</span><br><br><br>- RabbitHA.groovy<br><br><span style="font-family:courier new,monospace">import com.rabbitmq.client.*</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">/**</span><span style="font-family:courier new,monospace"></span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;*</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;* RabbitMQ highly available proxy.</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;* Basic subscriber/publisher implementation with reconnect logic.</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;*</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;*/</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">class RabbitHA {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; ConnectionFactory connectionFactory</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; Address[] addresses</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; Closure init</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; Connection connection</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; Channel channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; QueueingConsumer consumer</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; public RabbitHA(Map config) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this(config, null)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; public RabbitHA(Map config, Closure init){</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.connectionFactory = new ConnectionFactory([username: config.username, password: config.password, virtualHost: config.virtualHost])</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.addresses = Address.parseAddresses(config.addresses)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.init = init</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; void onDelivery(String queueName, Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; basicConsume(queueName)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int i = 0</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while(true) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; QueueingConsumer.Delivery delivery = consumer.nextDelivery()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; closure(delivery, channel)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i = 0</span><br style="font-family:courier new,monospace">
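<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // Only handle connection-related exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">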
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">e.printStackTrace()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; basicConsume(queueName)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; void publish(Closure closure) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; int i = 0</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">boolean retry = true</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while(retry) {</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; closure(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i = 0</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">retry = false</span><br style="font-family:courier new,monospace">
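<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } catch(e) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // Only handle connection-related exceptions; rethrow anything else</span><br style="font-family:courier new,monospace">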
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; i++</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">retry = true</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; e.printStackTrace()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "ShutdownSignalException recieved! Reconnection attempt #$i"</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connectChannel()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; void connectChannel() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection = connectionFactory.newConnection(addresses)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; channel = connection.createChannel()</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; println "Succesfully connected to $connection.address"</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(init) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; init(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; void basicConsume(queueName) {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; consumer = new QueueingConsumer(channel)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; channel.basicConsume(queueName, false, consumer)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; void close() {</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">channel.waitForConfirmsOrDie()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">channel.close()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </span><span style="font-family:courier new,monospace">connection.close()</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">&nbsp;&nbsp;&nbsp; }</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">}</span><br style="font-family:courier new,monospace"><br><br><br>Output:<br>
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat consumer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp;&nbsp; 9957&nbsp;&nbsp; 19914&nbsp; 128330</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">cat producer-output.txt | grep Message | wc</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">&nbsp; 10000&nbsp;&nbsp; 20000&nbsp; 128890</span><br>
<br><b>Note that the consumer is missing 43 messages (10,000 published, 9,957 consumed).</b><br><br>Output from producer:<br><span style="font-family:courier new,monospace">...</span><br><span style="font-family:courier new,monospace"><span style="color:rgb(0,0,0)">Message 1466</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1467</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1468</span><br style="color:rgb(0,0,0)"><span style="color:rgb(0,0,0)">Message 1469</span><br style="color:rgb(0,0,0)">
<span style="color:rgb(0,0,0)">Message 1470</span><br>ShutdownSignalException recieved! Reconnection attempt #1<br>Succesfully connected to i-0000001a-zsm/172.16.158.46<br>Message 1471<br>
Message 1472<br>Message 1473<br>Message 1474<br>Message 1475<br>Message 1476</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">...</span><br><br><b><br>Note that messages [1471..1476] were consumed, but [1466..1470] are missing from the consumer output.</b><br>
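A line count only gives the net difference, and a message that the consumer printed twice after a redelivery would hide a missing one. Comparing the two outputs by message number instead lists exactly which indices never arrived. The Groovy sketch below assumes both files contain lines of the form "Message N", as shown above. The names messageIds, missingRanges and formatRange are made up for this example.

```groovy
// Collect N from every "Message N" line; the TreeSet sorts the ids and
// collapses duplicates (e.g. a message the consumer printed twice)
SortedSet<Integer> messageIds(String text) {
    List<Integer> ids = text.findAll(/Message (\d+)/) { String full, String n -> n as Integer }
    return new TreeSet<Integer>(ids)
}

// Ids the producer printed but the consumer never did, as "first..last" ranges
List<String> missingRanges(String producerText, String consumerText) {
    SortedSet<Integer> missing = messageIds(producerText)
    missing.removeAll(messageIds(consumerText))

    List<String> ranges = []
    Integer start = null
    Integer prev = null
    for (Integer id : missing) {
        if (start == null) {
            start = id
        } else if (id != prev + 1) {
            // gap found: close the current run and start a new one
            ranges << formatRange(start, prev)
            start = id
        }
        prev = id
    }
    if (start != null) {
        ranges << formatRange(start, prev)
    }
    return ranges
}

// A single id prints as "N", a run as "first..last"
String formatRange(Integer first, Integer last) {
    first == last ? "${first}" : "${first}..${last}"
}
```

For the run above, it would be called as missingRanges(new File('producer-output.txt').text, new File('consumer-output.txt').text). Going by the note above, the result should include 1466..1470.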
<br>The complete outputs are at <a href="https://github.com/pablomolnar/rabbitmq_samples/tree/master/out">https://github.com/pablomolnar/rabbitmq_samples/tree/master/out</a>, where you can also see the reconnection logs of both the producer and the consumer.<br>
I have a strong feeling that publisher confirms are not configured correctly.<br><br>Could anyone shed some light on the issue?<br><br>Cheers,<br>Pablo Molnar<br><br>PS: I'd also like to take the opportunity to share a lot of Groovy examples I've been working on over the last few days: <a href="https://github.com/pablomolnar/rabbitmq_samples">https://github.com/pablomolnar/rabbitmq_samples</a><br>