[rabbitmq-discuss] Lost messages in HA tests in a cluster

Pablo Molnar pablomolnar at gmail.com
Tue Jan 31 22:51:44 GMT 2012


Hi all!

I'm running some HA stress tests against my cluster and I'm seeing lost
messages ONLY when the publisher loses its connection because of a kill -9.

*Test OK*: Kill node while consuming:
1 - Set up a clean 3-node cluster
2 - Run the producer with 10,000 messages, connected to node A
3 - Wait for the producer to finish
4 - Run the consumer, connected to node A
5 - While the consumer is running, kill node A

Result: The consumer reconnects to another node and consumes the remaining
messages. All 10,000 messages were consumed.

*Test FAILED*: Kill node while producing:
1 - Set up a clean 3-node cluster
2 - Run the consumer, connected to node A, so it starts listening
3 - Run the producer with 10,000 messages, connected to node A
4 - While the producer is running, kill node A

Result: The producer reconnects fine (the consumer too) and keeps publishing,
but some of the messages *already published* to node A are lost.


These are my settings:

rabbitmqctl status
Status of node 'rabbit@i-00000007-asm' ...
[{pid,11339},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
      {amqp_client,"RabbitMQ AMQP Client","2.7.1"},
      {rabbit,"RabbitMQ","2.7.1"},
      {os_mon,"CPO  CXC 138 46","2.2.4"},
      {sasl,"SASL  CXC 138 11","2.1.8"},
      {rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
      {webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
      {mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
      {inets,"INETS  CXC 138 49","5.2"},
      {mnesia,"MNESIA  CXC 138 12","4.4.12"},
      {stdlib,"ERTS  CXC 138 10","1.16.4"},
      {kernel,"ERTS  CXC 138 10","2.13.4"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2]
[async-threads:0] [hipe] [kernel-poll:false]\n"},
 {memory,
     [{total,92565608},
      {processes,4004968},
      {processes_used,3996224},
      {system,88560640},
      {atom,1322033},
      {atom_used,1291462},
      {binary,32496},
      {code,15264387},
      {ets,1174192}]},
 {vm_memory_high_watermark,0.3999999999362281},
 {vm_memory_limit,2508940902}]
...done.



rabbitmqctl cluster_status
Cluster status of node 'rabbit@i-00000007-asm' ...
[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
         {ram,['rabbit@i-00000009-asm']}]},
 {running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
...done.


*Java amqp-client 2.7.1*


- Producer.groovy (java amqp-client 2.7.1)

import com.rabbitmq.client.*
try{

// Get rabbitmq config
def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

def rabbit = new RabbitHA(config)
rabbit.init = { channel ->
  channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
  // Durable queue with HA policy "all" (mirrored on every node)
  channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"])
  channel.queueBind('myQueue', 'myExchange', '')
  channel.confirmSelect()
}

10000.times { idx ->
  rabbit.publish { channel ->
    // Delivery mode 2: persistent
    def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build()
    def msg = "Message $idx"
    channel.basicPublish('myExchange', '', properties, msg.getBytes())
    println msg
  }
}

rabbit.close()

} catch(e){e.printStackTrace()}


- Consumer.groovy

import com.rabbitmq.client.*

try{

  // Get rabbitmq config
  def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())

  // Connect
  def rabbit = new RabbitHA(config)
  rabbit.onDelivery('myQueue') { delivery, channel ->
    def msg = new String(delivery.body)
    println msg

    // Manual ack
    channel.basicAck(delivery.envelope.deliveryTag, false)
  }

} catch(e){e.printStackTrace()}


- RabbitHA.groovy

import com.rabbitmq.client.*

/**
 *
 * RabbitMQ highly available proxy.
 * Basic subscriber/publisher implementation with reconnect logic.
 *
 */
class RabbitHA {
    ConnectionFactory connectionFactory
    Address[] addresses
    Closure init

    Connection connection
    Channel channel
    QueueingConsumer consumer

    public RabbitHA(Map config) {
        this(config, null)
    }

    public RabbitHA(Map config, Closure init){
        this.connectionFactory = new ConnectionFactory(
            [username: config.username, password: config.password, virtualHost: config.virtualHost])
        this.addresses = Address.parseAddresses(config.addresses)
        this.init = init
        connectChannel()
    }

    void onDelivery(String queueName, Closure closure) {
        basicConsume(queueName)
        int i = 0

        while(true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery()
                closure(delivery, channel)
                i = 0
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
                basicConsume(queueName)
            }
        }
    }

    void publish(Closure closure) {
        int i = 0
        boolean retry = true
        while(retry) {
            try {
                closure(channel)
                i = 0
                retry = false
            } catch(e) {
                // Only handle connection-related exceptions; rethrow anything else
                if(!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e

                i++
                retry = true
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
            }
        }
    }


    void connectChannel() {
        connection = connectionFactory.newConnection(addresses)
        channel = connection.createChannel()

        println "Succesfully connected to $connection.address"

        if(init) {
            init(channel)
        }
    }

    void basicConsume(queueName) {
        consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, false, consumer)
    }

    void close() {
        channel.waitForConfirmsOrDie()
        channel.close()
        connection.close()
    }
}



Output:

cat consumer-output.txt |grep Message | wc
   9957   19914  128330

cat producer-output.txt | grep Message | wc
  10000   20000  128890

*Note that the consumer lost 43 messages*
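
Since wc only gives totals, here is a rough sketch of how the missing lines
could be listed by diffing the two outputs. It is plain Java with a
hypothetical MissingMessages helper (the class and method names are made up
for illustration), and the sample data in main is only the excerpt around the
reconnection, not the full files:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: lists produced messages that the consumer never printed.
class MissingMessages {

    // Keeps only lines starting with "Message " (roughly what `grep Message` does
    // for these logs) and returns the producer lines absent from the consumer
    // lines, in producer order.
    static List<String> missing(List<String> producerLines, List<String> consumerLines) {
        Set<String> consumed = new HashSet<>();
        for (String line : consumerLines) {
            if (line.startsWith("Message ")) consumed.add(line);
        }
        List<String> lost = new ArrayList<>();
        for (String line : producerLines) {
            if (line.startsWith("Message ") && !consumed.contains(line)) lost.add(line);
        }
        return lost;
    }

    public static void main(String[] args) {
        // In-memory sample; real use would read producer-output.txt and
        // consumer-output.txt line by line instead.
        List<String> produced = new ArrayList<>();
        for (int i = 1466; i <= 1470; i++) produced.add("Message " + i);
        produced.add("ShutdownSignalException recieved! Reconnection attempt #1");
        for (int i = 1471; i <= 1476; i++) produced.add("Message " + i);

        List<String> consumed = new ArrayList<>();
        for (int i = 1471; i <= 1476; i++) consumed.add("Message " + i);

        System.out.println(missing(produced, consumed));
    }
}
```

Using a set of consumed lines means the order in which the consumer printed
them does not matter, only whether each produced message shows up at all.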

Output from producer :
...
Message 1466
Message 1467
Message 1468
Message 1469
Message 1470
ShutdownSignalException recieved! Reconnection attempt #1
Succesfully connected to i-0000001a-zsm/172.16.158.46
Message 1471
Message 1472
Message 1473
Message 1474
Message 1475
Message 1476
...

*Note that messages [1471..1476] were consumed, but [1466..1470] are missing
from the consumer output.*

The complete outputs are in
https://github.com/pablomolnar/rabbitmq_samples/tree/master/out. There you
can see the reconnection logs of both sides.
I have a strong feeling that publisher confirms are not set up correctly.
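
To make that suspicion concrete: as far as I can tell, confirmSelect() only
turns confirms on, and my publish loop never checks which messages were
actually confirmed before the connection dropped. Below is a minimal sketch of
the bookkeeping I think is missing, in plain Java with no broker involved.
UnconfirmedTracker and its method names are hypothetical; my assumption is
that in real code the sequence number would come from
channel.getNextPublishSeqNo() and acked() would be called from a
ConfirmListener's handleAck(deliveryTag, multiple).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper, not part of the RabbitMQ client: remembers every
// published body until the broker has confirmed it.
class UnconfirmedTracker {
    // Publish sequence number -> message body, ordered by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    // Call just before publishing, with the sequence number the message will get.
    synchronized void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    // Call when the broker acks. With multiple=true the ack covers every
    // sequence number up to and including seqNo.
    synchronized void acked(long seqNo, boolean multiple) {
        if (multiple) {
            pending.headMap(seqNo, true).clear();
        } else {
            pending.remove(seqNo);
        }
    }

    // Call after reconnecting: returns the bodies that were never confirmed,
    // oldest first, and forgets them (they get new sequence numbers when
    // they are published again on the new channel).
    synchronized List<String> drainForRepublish() {
        List<String> bodies = new ArrayList<>(pending.values());
        pending.clear();
        return bodies;
    }

    synchronized int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        UnconfirmedTracker tracker = new UnconfirmedTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "Message " + seq);
        }
        tracker.acked(3, true); // broker confirmed 1..3, then the node died
        System.out.println(tracker.drainForRepublish()); // messages 4 and 5
    }
}
```

Republishing like this could deliver a message twice if the broker stored it
but the ack never reached the producer, so the consumer would have to tolerate
duplicates.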

Could anyone please shed some light on the issue?

Cheers,
Pablo Molnar

PS: I'd also like to take the opportunity to share a lot of Groovy examples
I've been working on over the last few days: https://github.com/pablomolnar/rabbitmq_samples