[rabbitmq-discuss] pika DLE, DLQ setup
cogitate
monish.unni at gmail.com
Tue Jul 31 16:19:15 BST 2012
i am trying to get dead-letter-exchange work with pika.
steps below:
[0] s2.direct is an exchange that declares a dead-letter-exchange(DLE)
called s2.dlx ( rabbitmq )
[1] setup DLE consumer - declares a DLQ (pyDlxQueue) binds to DLE(s2.dlx)
[2] setup a consumer listening to queue (EchoPayMe) binds to
s2.direct(exchange)
queue EchoPayMe setup with arguments x-message-ttl=10 and DLE=s2.dlx
the consumer also sleeps for 100 seconds ( to simulate
long-running/hanging service)
the idea is a producer puts 10 messages in it's queue will only have
message 1 served,
the rest of the messages should go to DLE(s2.dlx) when the DLE consumer
replies.
[3] the producer - publish messages to s2.direct exchange w/ rk="EchoPayMe"
the problem is no messages are received by the DLQ or get routed to the DLE
consumer.
the code for the DLE consumer, App consumer and App Producer in python/pika
below.
i am sure there's a fundamental error in what i am doing, cause, i can get
it working (somewhat) with java
code (the x-message-ttl = 10 doesn't really hold the messages for 10 ms only
before returning, so i am somewhat puzzled by that, but at least the DLE
consumer gets the message)
regards,
-monish
-- DLE Consumer--
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
result = channel.queue_declare(queue='pyDlxQueue')
queue_name = result.method.queue
channel.queue_bind(exchange='s2.dlx',
queue=queue_name)
#routing_key='pyDlxQueue')
def on_request(ch, method, props, body):
print " [x] pyDLX Got %r " % (body,)
response = "ERROR FROM pyDLX"
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(on_request, queue='pyDlxQueue',no_ack=False)
print " [x] pyDLX consumer started "
channel.start_consuming()
-- Consumer ---
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
result =
channel.queue_declare(queue='EchoPayMe',exclusive=False,auto_delete=True,durable=False,arguments={'x-dead-letter-exchange':'s2.dlx','x-message-ttl':10})
queue_name = result.method.queue
channel.queue_bind(exchange='s2.direct',
queue=queue_name,
routing_key='EchoPayMe')
def on_request(ch, method, props, body):
sleep_time = 100
print " [x] Got %r with corr_id=%r \n ======Sleeping for %s seconds
=============" % (body,props.correlation_id,sleep_time)
time.sleep(sleep_time)
response = "Got "+ body
print "[x] Returning response %r" %(response,)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='EchoPayMe')
print " [x] Awaiting service requests"
channel.start_consuming()
-- Producer --
class EchoClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
def call(self, request):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='s2.direct',
routing_key='EchoPayMe',
properties=pika.BasicProperties(
correlation_id=self.corr_id,),
body=request)
echoClient = EchoClient()
total_count = 10
for i in xrange(total_count):
print " [x] Calling echoService %d"%(i,)
request=str(i)
echoClient.call(request)
--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/pika-DLE-DLQ-setup-tp21135.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list