[rabbitmq-discuss] Cluster Pathology
Drew Smathers
drew.smathers at gmail.com
Thu Feb 12 19:31:42 GMT 2009
On Thu, Feb 12, 2009 at 6:17 AM, Ben Hood <0x6e6562 at gmail.com> wrote:
> Drew,
>
> On Thu, Feb 12, 2009 at 5:18 AM, Drew Smathers <drew.smathers at gmail.com> wrote:
>> Our experience has been the messages are lost (they're also marked
>> persistent via delivery_mode=2 and queues are durable).
>
> Interesting. And you definitely have at least one route to at least one
> queue for each routing key?
>
> Ben
>
Yes, the publisher in our case preemptively declares and binds the
queue to ensure messages are not lost. (The consumers make the same
queue declaration.) Below I've included a stripped-down example of a
publisher and consumer (using py-amqplib) that can be used to
reproduce the issue.
Steps to reproduce:

1. Run publisher and consumer against one node to ensure the queue is
   created there:

   $ python publisher.py hostB 5
   $ python consumer.py hostB   # Ctrl-C after receiving 5 messages

2. Run publisher/consumer against the other node, hostA:

   $ python publisher.py hostA 20
   $ python consumer.py hostA

3. Before the publisher from step 2 has finished, bring down rabbitmq
   on hostB:

   hostB $ rabbitmqctl stop

4. After the publisher from step 2 has finished, restart the consumer:

   $ python consumer.py hostA

Notice that messages published after hostB was brought down were not
delivered.
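To see exactly which messages went missing, the output of the two
scripts can be compared. The following is only a minimal sketch, not
part of the original test case: it assumes the printed lines keep the
"published message <body>" / "got message <body>" form used by the
scripts below, and the helper names parse_bodies and missing_messages
are made up for illustration.

```python
import re

# Matches the lines printed by publisher.py and consumer.py, e.g.
# "published message 1234467102000-3" or "got message 1234467102000-3".
LINE_RE = re.compile(r'^(published|got) message (\d+)-(\d+)\s*$')


def parse_bodies(lines, kind):
    """Return the message bodies of the given kind ('published' or 'got')."""
    bodies = []
    for line in lines:
        match = LINE_RE.match(line)
        if match and match.group(1) == kind:
            bodies.append('%s-%s' % (match.group(2), match.group(3)))
    return bodies


def missing_messages(published_lines, consumed_lines):
    """Return published bodies that never show up in the consumer output.

    The result keeps the order in which the messages were published.
    """
    received = set(parse_bodies(consumed_lines, 'got'))
    return [body for body in parse_bodies(published_lines, 'published')
            if body not in received]


# Hypothetical usage, assuming the output of each script was saved to a
# file (the file names are placeholders):
#
#     published = open('published.log').readlines()
#     consumed = open('consumed.log').readlines()
#     for body in missing_messages(published, consumed):
#         print(body)
```

For step 4, the consumer output from both runs against hostA would need
to be combined before the comparison, since some messages are consumed
before the restart.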
==== publisher.py ==============
import sys
from time import sleep, time

from amqplib import client_0_8 as amqp

host = sys.argv[1]
message_count = int(sys.argv[2])

SLEEP = 2  # seconds to pause between publishes
EXCHANGE = 'publisher_test'
QUEUE = 'queue1'

connection = amqp.Connection(host, userid='guest', password='guest',
                             insist=True, connect_timeout=5)
channel = connection.channel()
channel.access_request('/data', active=True, read=True, write=True)

# Declare and bind the durable queue up front so every message has a route.
channel.exchange_declare(EXCHANGE, 'direct',
                         auto_delete=False, durable=True)
channel.queue_declare(QUEUE, auto_delete=False, durable=True)
channel.queue_bind(QUEUE, EXCHANGE, routing_key='messages')

for ct in range(message_count):
    # Body is "<milliseconds since epoch>-<sequence number>";
    # delivery_mode=2 marks the message persistent.
    message = amqp.Message('%d-%d' % (int(1000 * time()), ct),
                           content_type='text/plain', delivery_mode=2)
    channel.basic_publish(message, EXCHANGE,
                          routing_key='messages', mandatory=True)
    print 'published message', message.body
    sleep(SLEEP)

channel.close()
connection.close()
==== consumer.py ===============
import sys

from amqplib import client_0_8 as amqp

host = sys.argv[1]

EXCHANGE = 'publisher_test'
QUEUE = 'queue1'

connection = amqp.Connection(host, userid='guest', password='guest',
                             insist=True, connect_timeout=5)
channel = connection.channel()
channel.access_request('/data', active=True, read=True, write=True)

# Same declarations as the publisher, so either side can start first.
channel.exchange_declare(EXCHANGE, 'direct',
                         auto_delete=False, durable=True)
channel.queue_declare(QUEUE, auto_delete=False, durable=True)
channel.queue_bind(QUEUE, EXCHANGE, routing_key='messages')


def handle_message(message):
    # Print the body, then acknowledge the delivery.
    print 'got message', message.body
    channel.basic_ack(message.delivery_tag)


tag = channel.basic_consume(
    QUEUE, callback=handle_message)

# Process deliveries until interrupted with Ctrl-C.
while channel.callbacks:
    try:
        channel.wait()
    except KeyboardInterrupt:
        break

channel.close()
connection.close()