[rabbitmq-discuss] dead-letter-exchange behaviour

cogitate monish.unni at gmail.com
Wed Aug 1 17:00:00 BST 2012


hi mathias:

i am trying to get dead-letter-exchange work.. 
 steps below: 
[0] s2.direct is an exchange that declares a dead-letter-exchange(DLE)
called s2.dlx ( rabbitmq ) 
[1] setup DLE consumer -  declares a DLQ (pyDlxQueue) binds to DLE(s2.dlx) 
[2] setup a consumer listening to queue (EchoPayMe) binds to
s2.direct(exchange) 
    queue EchoPayMe setup with arguments x-message-ttl=10 and DLE=s2.dlx 
    the consumer also sleeps for 100 seconds ( to simulate
long-running/hanging service) 
    the idea is a producer puts 10 messages in it's queue will only have
message 1 served, 
    the rest of the messages should go to DLE(s2.dlx) when the DLE consumer
replies. 
[3] the producer - publish messages to s2.direct exchange w/ rk="EchoPayMe" 

the problem is no messages are received by the DLQ or get routed to the DLE
consumer. 
the code for the DLE consumer, App consumer and App Producer in python/pika
below. 

i am sure there's a fundamental error in what i am doing, cause, i can get
it working (somewhat) with java 
code (the x-message-ttl = 10 doesn't really hold the messages for 10 ms only
before returning, so i am somewhat puzzled by that, but at least the DLE
consumer gets the message) 

regards, 
-monish 


-- DLE Consumer-- 
connection = pika.BlockingConnection(pika.ConnectionParameters( 
        host='localhost')) 

channel = connection.channel() 

result = channel.queue_declare(queue='pyDlxQueue') 
queue_name = result.method.queue 
channel.queue_bind(exchange='s2.dlx', 
                   queue=queue_name) 
                   #routing_key='pyDlxQueue') 

def on_request(ch, method, props, body): 
    print " [x] pyDLX Got %r " % (body,) 
    response = "ERROR FROM pyDLX" 

    ch.basic_publish(exchange='', 
                     routing_key=props.reply_to, 
                     properties=pika.BasicProperties(correlation_id = \ 
                                                     props.correlation_id), 
                     body=str(response)) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

channel.basic_consume(on_request, queue='pyDlxQueue',no_ack=False) 

print " [x] pyDLX consumer started " 
channel.start_consuming() 

--  Consumer --- 
connection = pika.BlockingConnection(pika.ConnectionParameters( 
        host='localhost')) 
channel = connection.channel() 
result =
channel.queue_declare(queue='EchoPayMe',exclusive=False,auto_delete=True,durable=False,arguments={'x-dead-letter-exchange':'s2.dlx','x-message-ttl':10}) 

queue_name = result.method.queue 
channel.queue_bind(exchange='s2.direct', 
                   queue=queue_name, 
                   routing_key='EchoPayMe') 

def on_request(ch, method, props, body): 
    sleep_time = 100 
    print " [x] Got %r with corr_id=%r \n ======Sleeping for %s seconds
=============" % (body,props.correlation_id,sleep_time) 
    time.sleep(sleep_time) 
    response = "Got "+ body 
    print "[x] Returning response %r" %(response,) 

    ch.basic_publish(exchange='', 
                     routing_key=props.reply_to, 
                     properties=pika.BasicProperties(correlation_id = \ 
                                                     props.correlation_id), 
                     body=str(response)) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

#channel.basic_qos(prefetch_count=1) 
channel.basic_consume(on_request, queue='EchoPayMe') 

print " [x] Awaiting service requests" 
channel.start_consuming() 


-- Producer -- 
class EchoClient(object): 
    def __init__(self): 
        self.connection = pika.BlockingConnection(pika.ConnectionParameters( 
                host='localhost')) 
        self.channel = self.connection.channel() 

    def call(self, request): 
        self.response = None 
        self.corr_id = str(uuid.uuid4()) 
        self.channel.basic_publish(exchange='s2.direct', 
                                   routing_key='EchoPayMe', 
                                   properties=pika.BasicProperties( 
                                   correlation_id=self.corr_id,), 
                                   body=request) 

echoClient = EchoClient() 
total_count = 10 
for i in xrange(total_count): 
    print " [x] Calling echoService %d"%(i,) 
    request=str(i) 
    echoClient.call(request) 




--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/dead-letter-exchange-behaviour-tp21167.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


More information about the rabbitmq-discuss mailing list