[rabbitmq-discuss] FW: RabbitMQ

Marek Majkowski majek04 at gmail.com
Fri Oct 29 15:56:56 BST 2010


Feng,

On Thu, Oct 28, 2010 at 19:11, Feng Tian <ftian at vmware.com> wrote:
> I am using pika to write a multi-threaded Python program.  From what I have learned so far, BlockingConnection is a better fit for my needs.  At least to me, asyncore does not seem to be designed for multi-threaded apps.  I modified test_threads.py so that it sort of "works", except that I have the following:
>
>            while True:
>                ch_a.basic_consume(hdler, queue=q)
>                print "Wait for recv ..."
>                time.sleep(5)
>
> This has some problems that I cannot figure out how to resolve:
> 1. No one wants to call sleep :-)
> 2. basic_consume seems to make a round trip to the server, even when there is nothing to consume.
>
> That is exactly what puzzled me in the first place. With amqplib, I can do
>
> ch.basic_consume ...
> while True:
>     ch.wait()
>
> What can I do with pika to get this effect?  And of course,  ch.wait(timeout) would be even better.  Any clue how to do this with pika?

In all of the AMQP clients, `basic_consume` takes a callback function
as an argument. This function is executed later, whenever the client
receives a message. But for this to work, someone, somewhere, needs to
sit on the socket descriptor and wait for incoming data from Rabbit.
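
To make that concrete, here is a minimal sketch of the idea using only
the Python standard library, not pika or py-amqplib. The `wait` helper,
the `handle_delivery` callback and the socket pair standing in for the
broker connection are all made up for illustration; real clients also
parse AMQP frames before calling you back.

```python
import select
import socket

def wait(sock, callback, timeout=None):
    """Block until sock is readable, then pass the data to callback.

    Returns True if the callback ran, False if the timeout expired first.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        return False
    callback(sock.recv(4096))
    return True

# Hypothetical stand-in for the broker connection: a local socket pair.
broker_side, client_side = socket.socketpair()

received = []

def handle_delivery(body):
    # Registered once; only runs when wait() sees incoming data.
    received.append(body)

# Nothing has been sent yet, so a short wait simply times out.
idle = wait(client_side, handle_delivery, timeout=0.1)

# Once the "broker" writes, the same call dispatches to the callback.
broker_side.sendall(b"hello")
delivered = wait(client_side, handle_delivery, timeout=1.0)

broker_side.close()
client_side.close()
```

Registering the callback does nothing by itself; messages only reach it
while something is inside `wait`.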

That's usually done in some sort of `wait` function. As you mentioned,
py-amqplib exposes a `wait` function. Pika uses an event loop instead.

With AsyncoreConnection, the main asyncore event loop is used.
With BlockingConnection, the same role is played by the `mainloop`
function. For example:

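import sys
import pika

qname = 'test'  # placeholder queue name; use your own

# Broker host comes from the first command-line argument, if given.
host = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'

conn = pika.BlockingConnection(pika.ConnectionParameters(
        host,
        credentials = pika.PlainCredentials('guest', 'guest'),
        heartbeat = 0))

ch = conn.channel()
ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False)

# Called by pika for every message delivered to this consumer.
def handle_delivery(ch, method, header, body):
    print("method=%r" % (method,))
    print("header=%r" % (header,))
    print("  body=%r" % (body,))
    ch.basic_ack(delivery_tag = method.delivery_tag)

# Register the callback once, outside of any loop.
ch.basic_consume(handle_delivery, queue = qname)

# Blocks here, waiting on the socket and dispatching deliveries.
conn.mainloop()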


Like its asyncore counterpart, this function blocks.
If you need to do something every few seconds, the best
options are probably to:
 - spawn a separate thread for that work
 - use the blocking connection and call drain_events with a timeout parameter
 - use the asyncore loop with its 'timeout' parameter.
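
As a rough sketch of the first option, something like the following
would run a function every few seconds while the main thread stays in
the blocking loop. It uses only the standard library; the `PeriodicTask`
class and its names are invented here and are not part of pika, and the
sleep at the end merely stands in for the blocking call.

```python
import threading
import time

class PeriodicTask(threading.Thread):
    """Call func every `interval` seconds until stop() is called."""

    def __init__(self, interval, func):
        threading.Thread.__init__(self)
        self.daemon = True  # do not keep the process alive on exit
        self.interval = interval
        self.func = func
        self._stop_requested = threading.Event()

    def run(self):
        # Event.wait returns False on timeout and True once the event
        # is set, so this loop sleeps without a bare time.sleep and
        # exits promptly when stop() is called.
        while not self._stop_requested.wait(self.interval):
            self.func()

    def stop(self):
        self._stop_requested.set()
        self.join()

ticks = []
task = PeriodicTask(0.05, lambda: ticks.append(time.time()))
task.start()

# In a real program the main thread would now block in the event loop;
# a short sleep stands in for that here.
time.sleep(0.3)

task.stop()
```

Whether it is safe for the periodic function to touch the connection or
channel from that second thread depends on the client library, so I
would keep such work independent of the connection unless you know
otherwise.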


Marek

