[rabbitmq-discuss] Lost messages in HA tests in a cluster
Pablo Molnar
pablomolnar at gmail.com
Tue Jan 31 22:51:44 GMT 2012
Hi all!
I'm running some HA tests that stress my cluster, and I'm experiencing lost
messages ONLY when the publisher loses its connection because the node was killed with kill -9.
*Test OK*: Kill node while consuming:
1 - Set up a clean 3-node cluster
2 - Run the producer (connected to node A) to publish 10,000 messages
3 - Wait for the producer to finish
4 - Run the consumer, connected to node A
5 - While the consumer is running, kill node A
Result: The consumer reconnects to another node and consumes the rest of
the messages. All 10,000 messages were consumed.
*Test FAILED*: Kill node while producing:
1 - Set up a clean 3-node cluster
2 - Start the consumer, listening on node A
3 - Run the producer (connected to node A) to publish 10,000 messages
4 - While the producer is running, kill node A
Result: The producer reconnects fine (the consumer too) and keeps publishing,
but some of the messages *already published* to node A are lost.
These are my settings:
rabbitmqctl status
Status of node 'rabbit@i-00000007-asm' ...
[{pid,11339},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","2.7.1"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","2.7.1"},
{amqp_client,"RabbitMQ AMQP Client","2.7.1"},
{rabbit,"RabbitMQ","2.7.1"},
{os_mon,"CPO CXC 138 46","2.2.4"},
{sasl,"SASL CXC 138 11","2.1.8"},
{rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.7.1"},
{webmachine,"webmachine","1.7.0-rmq2.7.1-hg"},
{mochiweb,"MochiMedia Web Server","1.3-rmq2.7.1-git"},
{inets,"INETS CXC 138 49","5.2"},
{mnesia,"MNESIA CXC 138 12","4.4.12"},
{stdlib,"ERTS CXC 138 10","1.16.4"},
{kernel,"ERTS CXC 138 10","2.13.4"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]\n"},
{memory,
[{total,92565608},
{processes,4004968},
{processes_used,3996224},
{system,88560640},
{atom,1322033},
{atom_used,1291462},
{binary,32496},
{code,15264387},
{ets,1174192}]},
{vm_memory_high_watermark,0.3999999999362281},
{vm_memory_limit,2508940902}]
...done.
rabbitmqctl cluster_status
Cluster status of node 'rabbit@i-00000007-asm' ...
[{nodes,[{disc,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']},
{ram,['rabbit@i-00000009-asm']}]},
{running_nodes,['rabbit@i-0000001a-zsm','rabbit@i-00000007-asm']}]
...done.
*Java amqp-client 2.7.1*
- Producer.groovy (Java amqp-client 2.7.1)
import com.rabbitmq.client.*

try {
    // Get rabbitmq config
    def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
    def rabbit = new RabbitHA(config)
    rabbit.init = { channel ->
        channel.exchangeDeclare('myExchange', "direct", true) // Durable exchange
        channel.queueDeclare('myQueue', true, false, false, ["x-ha-policy": "all"]) // Durable queue with HA policy
        channel.queueBind('myQueue', 'myExchange', '')
        channel.confirmSelect()
    }
    10000.times { idx ->
        rabbit.publish { channel ->
            // Delivery mode 2: persistent
            def properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build()
            def msg = "Message $idx"
            channel.basicPublish('myExchange', '', properties, msg.getBytes())
            println msg
        }
    }
    rabbit.close()
} catch(e){e.printStackTrace()}
- Consumer.groovy
import com.rabbitmq.client.*

try {
    // Get rabbitmq config
    def config = new ConfigSlurper().parse(new File('../rabbitmq.properties').toURL())
    // Connect
    def rabbit = new RabbitHA(config)
    rabbit.onDelivery('myQueue') { delivery, channel ->
        def msg = new String(delivery.body)
        println msg
        // Manual ack
        channel.basicAck(delivery.envelope.deliveryTag, false)
    }
} catch(e){e.printStackTrace()}
- RabbitHA.groovy
import com.rabbitmq.client.*

/**
 * RabbitMQ highly available proxy.
 * Basic implementation of a subscriber/publisher with reconnect logic.
 */
class RabbitHA {
    ConnectionFactory connectionFactory
    Address[] addresses
    Closure init
    Connection connection
    Channel channel
    QueueingConsumer consumer

    public RabbitHA(Map config) {
        this(config, null)
    }

    public RabbitHA(Map config, Closure init) {
        this.connectionFactory = new ConnectionFactory([username: config.username,
                password: config.password, virtualHost: config.virtualHost])
        this.addresses = Address.parseAddresses(config.addresses)
        this.init = init
        connectChannel()
    }

    void onDelivery(String queueName, Closure closure) {
        basicConsume(queueName)
        int i = 0
        while (true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery()
                closure(delivery, channel)
                i = 0
            } catch (e) {
                // Only handle connection failures; rethrow anything else
                if (!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
                i++
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
                basicConsume(queueName)
            }
        }
    }

    void publish(Closure closure) {
        int i = 0
        boolean retry = true
        while (retry) {
            try {
                closure(channel)
                i = 0
                retry = false
            } catch (e) {
                // Only handle connection failures; rethrow anything else
                if (!(e in ShutdownSignalException || e in IOException || e in AlreadyClosedException)) throw e
                i++
                retry = true
                e.printStackTrace()
                println "ShutdownSignalException recieved! Reconnection attempt #$i"
                connectChannel()
            }
        }
    }

    void connectChannel() {
        connection = connectionFactory.newConnection(addresses)
        channel = connection.createChannel()
        println "Succesfully connected to $connection.address"
        if (init) {
            init(channel)
        }
    }

    void basicConsume(queueName) {
        consumer = new QueueingConsumer(channel)
        channel.basicConsume(queueName, false, consumer)
    }

    void close() {
        channel.waitForConfirmsOrDie()
        channel.close()
        connection.close()
    }
}
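In RabbitHA.publish(), only the message whose publish call actually threw is retried after reconnecting. basicPublish returns once the frame is handed to the connection, not when the broker has accepted it, so messages written to the old channel just before node A died (Message 1466..1470 in the log below) count as "done" on the client side and are never resent. A common remedy is to keep each message in an outstanding map keyed by its publish sequence number until it is confirmed, and to republish whatever is left after a reconnect. The sketch below shows only that bookkeeping, in plain Java so it runs without a broker; UnconfirmedTracker and its methods are hypothetical names. It assumes the 2.7.1 client's Channel.getNextPublishSeqNo() supplies the key and that a ConfirmListener forwards its handleAck/handleNack(deliveryTag, multiple) calls here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper: remembers every message published on a confirm-mode channel
// until the broker confirms it. Sequence numbers are per channel, so the map must be
// drained after each reconnect and the bodies re-tracked under the new channel's numbers.
class UnconfirmedTracker {
    private final ConcurrentSkipListMap<Long, byte[]> outstanding = new ConcurrentSkipListMap<>();

    // Call right before basicPublish, passing channel.getNextPublishSeqNo() (assumed API).
    void track(long seqNo, byte[] body) {
        outstanding.put(seqNo, body);
    }

    // Mirrors ConfirmListener.handleAck: multiple=true confirms every tag <= deliveryTag.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    // Mirrors ConfirmListener.handleNack: removes and returns the nacked bodies so the
    // caller can republish them right away on the current channel.
    List<byte[]> nack(long deliveryTag, boolean multiple) {
        ConcurrentNavigableMap<Long, byte[]> range = multiple
                ? outstanding.headMap(deliveryTag, true)
                : outstanding.subMap(deliveryTag, true, deliveryTag, true);
        List<byte[]> toResend = new ArrayList<>(range.values());
        range.clear();
        return toResend;
    }

    int size() {
        return outstanding.size();
    }

    // After reconnecting: everything still unconfirmed, in publish order; clears the map.
    List<byte[]> drainForReplay() {
        List<byte[]> pending = new ArrayList<>(outstanding.values());
        outstanding.clear();
        return pending;
    }
}
```

Replaying unconfirmed messages trades loss for possible duplicates: a message the broker did enqueue, but whose confirm never arrived before the connection died, will be delivered twice, so the consumer needs to tolerate that.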
Output:
cat consumer-output.txt |grep Message | wc
9957 19914 128330
cat producer-output.txt | grep Message | wc
10000 20000 128890
*Note that the consumer lost 43 messages (10,000 - 9,957).*
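Counting lines with wc shows how many messages are missing but not which ones. A minimal Java sketch that diffs the two logs and returns the exact indices follows; it assumes every relevant line has the form `Message <n>`, as in the outputs in this post, and skips everything else (reconnection notices, stack traces). MissingMessages is a hypothetical name.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MissingMessages {
    private static final Pattern MESSAGE = Pattern.compile("^Message (\\d+)$");

    // Numeric index of every "Message <n>" line; other log lines are ignored.
    static Set<Integer> indices(List<String> lines) {
        Set<Integer> result = new TreeSet<>();
        for (String line : lines) {
            Matcher m = MESSAGE.matcher(line.trim());
            if (m.matches()) {
                result.add(Integer.parseInt(m.group(1)));
            }
        }
        return result;
    }

    // Indices the producer printed but the consumer never received, ascending.
    static Set<Integer> missing(List<String> producerLines, List<String> consumerLines) {
        Set<Integer> lost = new TreeSet<>(indices(producerLines));
        lost.removeAll(indices(consumerLines));
        return lost;
    }
}
```

For example, `MissingMessages.missing(Files.readAllLines(Paths.get("producer-output.txt")), Files.readAllLines(Paths.get("consumer-output.txt")))`. Because it works on sets, a message delivered twice is counted once, which keeps the diff meaningful once the publisher starts resending unconfirmed messages.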
Output from the producer:
...
Message 1466
Message 1467
Message 1468
Message 1469
Message 1470
ShutdownSignalException recieved! Reconnection attempt #1
Succesfully connected to i-0000001a-zsm/172.16.158.46
Message 1471
Message 1472
Message 1473
Message 1474
Message 1475
Message 1476
...
*Note that messages [1471..1476] were consumed OK, but [1466..1470] are missing
from the consumer output.*
The complete outputs are at
https://github.com/pablomolnar/rabbitmq_samples/tree/master/out, where you
can see the reconnection logs of both sides.
I have a strong feeling that publisher confirms are not configured correctly.
Could anyone please shed some light on the issue?
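On the confirm configuration: as written, Producer.groovy assigns rabbit.init only after the one-argument RabbitHA constructor has already called connectChannel(), so confirmSelect() (and the declarations) run only on channels opened by a reconnect, and close() waits for confirms on the last channel only. Passing the closure to the two-argument constructor would apply it to the first channel as well. Beyond that, confirms are never consulted while publishing. One common pattern is to publish in small batches and wait for each batch's confirms before continuing, resending the whole batch if the connection fails first. The Java sketch below models only that control flow: BatchSender and BatchPublisher are hypothetical names, with sendAndConfirm standing in for "basicPublish each message, then waitForConfirmsOrDie()" on a confirm-mode channel and reconnect standing in for RabbitHA.connectChannel().

```java
import java.util.List;

// Hypothetical abstraction over a confirm-mode channel: sendAndConfirm returns only
// after the broker has confirmed the whole batch and throws if the channel fails.
interface BatchSender {
    void sendAndConfirm(List<String> batch) throws Exception;
    void reconnect() throws Exception;
}

class BatchPublisher {
    // Publishes in fixed-size batches; a batch that was not confirmed is resent in full
    // after reconnecting, up to maxAttempts tries per batch.
    static void publishAll(List<String> messages, int batchSize, int maxAttempts,
                           BatchSender sender) throws Exception {
        for (int start = 0; start < messages.size(); start += batchSize) {
            List<String> batch = messages.subList(start, Math.min(start + batchSize, messages.size()));
            int attempt = 0;
            while (true) {
                try {
                    sender.sendAndConfirm(batch);
                    break; // every message in this batch is confirmed
                } catch (Exception e) {
                    attempt++;
                    if (attempt >= maxAttempts) throw e;
                    sender.reconnect(); // resend the whole batch on a fresh channel
                }
            }
        }
    }
}
```

The batch size trades throughput against the amount resent after a failure, and the result is at-least-once delivery: part of a failed batch may already be enqueued on a mirror and will then be delivered twice.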
Cheers,
Pablo Molnar
PS: I'd also like to take this opportunity to share the many Groovy examples I've been
working on over the last few days: https://github.com/pablomolnar/rabbitmq_samples