[rabbitmq-discuss] Cluster Pathology

Drew Smathers drew.smathers at gmail.com
Thu Feb 12 19:31:42 GMT 2009


On Thu, Feb 12, 2009 at 6:17 AM, Ben Hood <0x6e6562 at gmail.com> wrote:
> Drew,
>
> On Thu, Feb 12, 2009 at 5:18 AM, Drew Smathers <drew.smathers at gmail.com> wrote:
>> Our experience has been the messages are lost (they're also marked
>> persistent via delivery_mode=2 and queues are durable).
>
> Interesting. And you definitely have at least one route to at least on
> queue for each routing key?
>
> Ben
>

Yes, the publisher in our case preemptively declares and binds queues
- to ensure messages are not lost.  (The consumers also make the same
queue declaration).  Below I've included a stripped down example of a
publisher and consumer (using py-amqplib) which can be used to
reproduce the issue.

Steps to reproduce:

1. run publisher and consumer against one node to ensure queue is created there:

 $ python publisher.py hostB 5
 $ python consumer.py hostB # CTL-C after receiving 5 messages

2. run publisher/consumer against other node - hostA

 $ python publisher.py hostA 20
 $ python consumer.py hostA

3. Before publisher from step 2 has finished, bring down rabbitmq on hostB

  hostB $ rabbitmqctl stop

4. After publisher from step 2 has finished, restart consumer:

  $ python consumer.py hostA

Notice messages delivered after hostB was brought down were not delivered.

==== publisher.py ==============
import sys
from time import sleep, time
from amqplib import client_0_8 as amqp

host = sys.argv[1]
message_count = int(sys.argv[2])

SLEEP = 2
EXCHANGE = 'publisher_test'
QUEUE = 'queue1'

connection = amqp.Connection(host, userid='guest', password='guest',
        insist=True, connect_timeout=5)
channel = connection.channel()
channel.access_request('/data', active=True, read=True, write=True)
channel.exchange_declare(EXCHANGE, 'direct',
            auto_delete=False, durable=True)
channel.queue_declare(QUEUE, auto_delete=False, durable=True)
channel.queue_bind(QUEUE, EXCHANGE, routing_key='messages')

for ct in range(message_count):
    message = amqp.Message('%d-%d' % (int(1000 * time()), ct),
                           content_type='text/plain', delivery_mode=2)
    channel.basic_publish(message, EXCHANGE,
        routing_key='messages', mandatory=True)
    print 'published message', message.body
    sleep(SLEEP)

channel.close()
connection.close()


==== python consumer.py  =======================

import sys
from amqplib import client_0_8 as amqp

host = sys.argv[1]

EXCHANGE = 'publisher_test'
QUEUE = 'queue1'

connection = amqp.Connection(host, userid='guest', password='guest',
        insist=True, connect_timeout=5)
channel = connection.channel()
channel.access_request('/data', active=True, read=True, write=True)
channel.exchange_declare(EXCHANGE, 'direct',
            auto_delete=False, durable=True)
channel.queue_declare(QUEUE, auto_delete=False, durable=True)
channel.queue_bind(QUEUE, EXCHANGE, routing_key='messages')

def handle_message(message):
    print 'got message', message.body
    channel.basic_ack(message.delivery_tag)

tag = channel.basic_consume(
    QUEUE, callback=handle_message)
while channel.callbacks:
    try:
        channel.wait()
    except KeyboardInterrupt:
        break

channel.close()
connection.close()




More information about the rabbitmq-discuss mailing list