[rabbitmq-discuss] Publish on a queue, Wait for Reply on another, add time out for the monitor (Select Connection)

Ask Solem ask at rabbitmq.com
Fri Dec 2 14:42:27 GMT 2011

On 29 Nov 2011, at 21:33, Christopher Lefevre wrote:

> Hello,
> I’m trying to do EXACTLY THIS:
> http://stackoverflow.com/questions/2799731/wait-for-a-single-rabbitmq-message-with-a-timeout
> However I’m using Pika for all of my RabbitMQ communications, and unfortunately the “Answer” is just “Fixed” which isn’t really applicable to use in my situation.

The 'fixed' refers to amqplib; the answer doesn't mention pika.

> For a quick summary, I’m trying to send a message to RabbitMQ, then wait on a reply from the workers on the other side of the RabbitMQ queue on an exclusive queue. I would like to have a timeout setting on the monitor to wait for the message, just in case the server is down, or taking an absurd amount of time.

I don't have much experience using pika, but I'll try
to provide a general answer.

You shouldn't be blocking waiting for a single message
if your program is async.  Instead you should use basic_consume
for every reply you are interested in, with a callback for each.

Something like:

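from uuid import uuid4

reply_exchange = "replies"

def handle_reply(message):
   # Placeholder callback: the reply id would be read from the message properties.
   reply_id = message.properties

def add_reply_callback(channel, publisher_id, reply_id, callback):
   # Route replies published to reply_exchange under reply_id
   # into this publisher's queue, then consume from that queue.
   channel.queue_bind(publisher_id, reply_exchange, reply_id)

   channel.basic_consume(callback, queue=publisher_id)

def send_rpc(message, callback, producer_channel,
       consumer_channel, publisher_id):
   reply_id = str(uuid4())  # unique id for this request

   # Register the consumer before publishing so a fast reply is not missed.
   add_reply_callback(consumer_channel, publisher_id, reply_id, callback)
   producer_channel.basic_publish(message, properties={
       "reply_to": reply_id})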

This assumes you have a 'thread' draining events from the consumer_channel.
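
As a rough illustration of such a draining 'thread', here is a minimal
sketch. Note that drain_once is a hypothetical stand-in for whatever call
your client library offers to process pending events on the consumer
channel; it is not a real pika function, and it is assumed to return
within a bounded time so the stop flag gets checked.

```python
import threading

def start_drain_thread(drain_once, stop_event):
    """Run drain_once() in a background thread until stop_event is set.

    drain_once is a hypothetical callable that processes one batch of
    incoming events and then returns.
    """
    def loop():
        # Check the stop flag before every call so shutdown is prompt.
        while not stop_event.is_set():
            drain_once()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

To shut down, set stop_event and join the returned thread.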

If you need to add a timeout on top of this, then the best route
depends on which async framework you are using.
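
Independent of the framework, one possible shape for this is to keep a
deadline next to each pending callback and expire overdue entries from
whatever loop drains your events. The sketch below is only an
illustration: PendingReplies and its method names are made up for this
example, nothing in it comes from pika, and it is not thread-safe, so
all calls should come from the same thread.

```python
import time

class PendingReplies:
    """Tracks callbacks awaiting a reply, each with a deadline (hypothetical helper)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._pending = {}  # reply_id -> (deadline, on_reply, on_timeout)

    def add(self, reply_id, on_reply, on_timeout, timeout):
        """Register callbacks for reply_id; timeout is in seconds."""
        deadline = self._clock() + timeout
        self._pending[reply_id] = (deadline, on_reply, on_timeout)

    def resolve(self, reply_id, message):
        """Deliver a reply. Returns False if it is unknown or already expired."""
        entry = self._pending.pop(reply_id, None)
        if entry is None:
            return False
        _, on_reply, _ = entry
        on_reply(message)
        return True

    def expire(self):
        """Call on_timeout(reply_id) for every overdue request; returns their ids."""
        now = self._clock()
        overdue = [rid for rid, (deadline, _, _) in self._pending.items()
                   if deadline <= now]
        for rid in overdue:
            _, _, on_timeout = self._pending.pop(rid)
            on_timeout(rid)
        return overdue
```

You would call add() when sending the request, resolve() from the reply
callback, and expire() periodically, for example from a timer provided
by your framework. A reply that arrives after its deadline is simply
dropped, since resolve() no longer finds the entry.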
