[rabbitmq-discuss] pika DLE, DLQ setup

cogitate monish.unni at gmail.com
Tue Jul 31 16:19:15 BST 2012


I am trying to get a dead-letter exchange working with pika.
The steps are below:
[0] s2.direct is an exchange with a dead-letter-exchange (DLE)
    called s2.dlx (declared in RabbitMQ)
[1] set up the DLE consumer - it declares a DLQ (pyDlxQueue) and binds it to
    the DLE (s2.dlx)
[2] set up a consumer listening on a queue (EchoPayMe) bound to
    s2.direct (exchange)
    the queue EchoPayMe is set up with arguments x-message-ttl=10 and DLE=s2.dlx
    the consumer also sleeps for 100 seconds (to simulate a
    long-running/hanging service)
    the idea is that when a producer puts 10 messages in its queue, only
    message 1 is served,
    and the rest of the messages should go to the DLE (s2.dlx), where the
    DLE consumer replies.
[3] the producer - publishes messages to the s2.direct exchange w/ rk="EchoPayMe"
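
The queue arguments described in step [2] can be sketched as a small helper. The function name dlx_queue_arguments is hypothetical (it is not part of pika); it only builds the dict that would be passed as arguments= to queue_declare. Note that x-message-ttl is expressed in milliseconds, so 10 means 10 ms.

```python
def dlx_queue_arguments(dead_letter_exchange, ttl_ms):
    """Build the `arguments` dict for queue_declare (hypothetical helper)."""
    if ttl_ms < 0:
        raise ValueError("x-message-ttl must be a non-negative integer")
    return {
        "x-dead-letter-exchange": dead_letter_exchange,
        "x-message-ttl": int(ttl_ms),  # milliseconds, not seconds
    }


# The values used for the EchoPayMe queue in this post.
args = dlx_queue_arguments("s2.dlx", 10)
print(args)
```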

The problem is that no messages are received by the DLQ or routed to the DLE
consumer.
The code for the DLE consumer, app consumer and app producer in python/pika is
below.

I am sure there's a fundamental error in what I am doing, because I can get
it working (somewhat) with Java
code (there, x-message-ttl = 10 doesn't really hold the messages for only 10 ms
before returning them, so I am somewhat puzzled by that, but at least the DLE
consumer gets the message).
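
One thing that may be worth checking, offered as an assumption rather than a diagnosis: a per-queue TTL applies to messages while they wait in the queue, not to messages already handed to a consumer. A toy model of the scenario above (pure Python, no broker involved; simulate and its parameters are made up for illustration) shows how a prefetch limit changes the outcome:

```python
def simulate(num_messages, ttl_ms, service_ms, limit_prefetch):
    """Toy model of one queue with a per-queue TTL and one slow consumer.

    Returns (served, dead_lettered) as lists of message ids.
    """
    if not limit_prefetch:
        # No basic_qos limit: every message is handed to the consumer at
        # once, so none of them waits in the queue long enough to expire.
        return list(range(num_messages)), []

    served, dead_lettered = [], []
    now_ms = 0  # all messages are assumed to be enqueued at time 0
    for msg_id in range(num_messages):
        if now_ms > ttl_ms:
            dead_lettered.append(msg_id)  # waited past its TTL
        else:
            served.append(msg_id)
            now_ms += service_ms          # consumer is busy this long
    return served, dead_lettered


print(simulate(10, ttl_ms=10, service_ms=100000, limit_prefetch=True))
print(simulate(10, ttl_ms=10, service_ms=100000, limit_prefetch=False))
```

In this model, only the first message is served when the consumer takes one message at a time, and nothing is dead-lettered when all ten are delivered up front.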

regards,
-monish


-- DLE Consumer--
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

result = channel.queue_declare(queue='pyDlxQueue')
queue_name = result.method.queue
channel.queue_bind(exchange='s2.dlx',
                   queue=queue_name)
                   #routing_key='pyDlxQueue')

def on_request(ch, method, props, body):
    print " [x] pyDLX Got %r " % (body,)
    response = "ERROR FROM pyDLX"

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='pyDlxQueue',no_ack=False)

print " [x] pyDLX consumer started "
channel.start_consuming()
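
A note on the binding above, as a sketch under two assumptions that the post does not confirm: that s2.dlx is a direct exchange, and that no x-dead-letter-routing-key is set on the EchoPayMe queue. In that case a dead-lettered message keeps the routing key it was published with (EchoPayMe), and a direct exchange only delivers to bindings whose key matches exactly. The function direct_exchange_routes is a hypothetical stand-in for that matching rule, not a pika call:

```python
def direct_exchange_routes(bindings, routing_key):
    """Return the queues whose binding key equals the message routing key.

    `bindings` is a list of (queue_name, binding_key) pairs.
    """
    return sorted(queue for queue, key in bindings if key == routing_key)


# Binding created without an explicit routing_key (modelled here as an
# empty key): the dead-lettered message does not match it.
no_key = direct_exchange_routes([("pyDlxQueue", "")], "EchoPayMe")

# Binding that uses the original routing key: the message is routed.
with_key = direct_exchange_routes([("pyDlxQueue", "EchoPayMe")], "EchoPayMe")

print(no_key, with_key)
```

If s2.dlx is a fanout exchange instead, the binding key is ignored and this does not apply.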

--  Consumer ---
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
# x-message-ttl is in milliseconds
result = channel.queue_declare(queue='EchoPayMe',
                               exclusive=False,
                               auto_delete=True,
                               durable=False,
                               arguments={'x-dead-letter-exchange': 's2.dlx',
                                          'x-message-ttl': 10})

queue_name = result.method.queue
channel.queue_bind(exchange='s2.direct',
                   queue=queue_name,
                   routing_key='EchoPayMe')

def on_request(ch, method, props, body):
    sleep_time = 100
    print (" [x] Got %r with corr_id=%r \n ======Sleeping for %s seconds "
           "=============" % (body, props.correlation_id, sleep_time))
    time.sleep(sleep_time)
    response = "Got "+ body
    print "[x] Returning response %r" %(response,)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='EchoPayMe')

print " [x] Awaiting service requests"
channel.start_consuming()


-- Producer --
import uuid

import pika

class EchoClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()

    def call(self, request):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='s2.direct',
                                   routing_key='EchoPayMe',
                                   properties=pika.BasicProperties(
                                   correlation_id=self.corr_id,),
                                   body=request)

echoClient = EchoClient()
total_count = 10
for i in xrange(total_count):
    print " [x] Calling echoService %d"%(i,)
    request=str(i)
    echoClient.call(request)



