[rabbitmq-discuss] Pika for Asynchronous Consuming and Publishing using multiple queues

Charles Law charles.law at openx.com
Mon Jun 18 18:26:44 BST 2012


I am writing a consumer (and producer) using Pika.  I want to write
asynchronous code, but I'm having trouble getting everything working.  I
started with regular I pulled off the Pika docs, then made it work with
multiple queues.  To get it working with multiple queues I had to add some
checks that make sure all the queues are declared and bound before trying
to publish any messages.  That worked very well.


Our code also performs some time consuming logic, and we want our consumer
to publish to another queue that will go to some logger.  I tried to add
this on the same channel that was consuming code, but that did not work.  I
wasn't sure if I needed a separate channel, so I tried using 2 channels on
the same connections - 1 channel to consume, and 1 channel to produce, but
that is not working.  I notice when the code is trying to publish the 2nd
log message that everything seems to stop.  Can anyone give me any insight
into what's going on?


Attached are sections of my code:


This code is setup so a lot of the multiple queue setup is handled without
duplicating any code:

from pika import spec



EXCHANGE = 'customers'

EXCHANGEACK = 'logs' #keep this seperate for now



class ChannelContainer(object):

    def __init__(self, channel, exchange, append, queue_list, callback):

        self.channel = channel

        self.exchange = exchange

        self.append_str = append

        self.queue_list = queue_list

        self.goodQueues = 0

        self.channelReady = False

        self.try_to_continue = callback





    def exchange_declare(self):

        self.channel.exchange_declare(exchange=self.exchange,

                                          type='direct', \

                                          callback=self.on_exchange_declare)



    # Step #4

    def on_exchange_declare(self, frame):

        """

        Called when our exchange has been created

        """

        for queue_name in self.queue_list:

            full_queue_name = queue_name + self.append_str



            self.channel.queue_declare(queue=full_queue_name, durable=True,
\

                                       exclusive=False, auto_delete=False, \

                                       callback=self.on_queue_declared)





    # Step #5

    def on_queue_declared(self, frame):

        """

        Bind the queues to the channel

        """

        if type(frame.method) == spec.Queue.DeclareOk:

            # Get the queue name

            queue_name = frame.method.queue



            print self.append_str, "binding", queue_name



            self.channel.queue_bind(exchange=self.exchange,

                                    queue=queue_name,

                                    routing_key=queue_name,

                                    callback=self.on_queue_bound)



    # Step #6

    def on_queue_bound(self, frame):

        """

        Called when RabbitMQ has told us our Queue has been declared,

            frame is the response from RabbitMQ

        """

        print "bound queue"

        self.goodQueues += 1



        if self.goodQueues == len(self.queue_list):

            self.channelReady = True



            #tell owner to try to continue

            self.try_to_continue()



class TxChannel(ChannelContainer):


    def __init__(self, channel, exchange, append, queue_list, callback):

        ChannelContainer.__init__(self, channel, exchange, append,
queue_list, callback)





    def acknowledge(self, queue_name, body):

        """

        Send the action and object to the acknowledgement queue.

        """

        full_queue_name = '%s--ack' % queue_name

        print "sending ack", full_queue_name



        self.channel.basic_publish(exchange=self.exchange,

                                   routing_key=full_queue_name,

                                   body=body)





class RxChannel(ChannelContainer):


    def __init__(self, channel, exchange, append, queue_list, callback):

        ChannelContainer.__init__(self, channel, exchange, append,
queue_list, callback)



    # Step #6

    def consume(self, callback):

        """

        Called when RabbitMQ has told us our Queue has been declared,

            frame is the response from RabbitMQ

        """


        #consume on all queues now

        for queue_name in self.queue_list:

            self.channel.basic_consume(callback, queue=queue_name)






Here is the consumer code.  I tried to take out everything irrelevant:

import pika

from pika.adapters import SelectConnection


from ChannelContainer import RxChannel, TxChannel



EXCHANGE = 'customers'

EXCHANGEACK = 'logs' #keep this seperate for now




class RabbitConsumer(object):



    def __init__(self):

        #initialize some variables

        self.queue_list = []

        self.goodQueues = 0

        self.channel_rx = None

        self.channel_tx = None

        self.connection = None



        self.parse_config()



        #get the queue_list

        self.gather_queue_names()







    """

    Functions to parse data and load config

    """


    def parse_config(self):

        """

        Parse the variables in the config file into the instance.

        """

        self.rabbitmq_host = 'localhost'





    def gather_queue_names(self):

        """

        Gather the names of the queues.

        """

        #These are just for testing

        self.queue_list = ['1a', '2b', '3c-4d-5e-6f-7g']










    """

    Functions to connect to RabbitMQ

    """

    # Step #2

    def on_connected(self, connection):

        """

        Called when we are fully connected to RabbitMQ

        """

        # Open a channel for tx and rx

        self.connection.channel(self.on_channel_open)

        self.connection.channel(self.on_channel_open)





    # Step #3

    def on_channel_open(self, new_channel):

        """

        Called when our channel has opened

        """

        if self.channel_rx is None:

            #rx channel

            self.channel_rx = RxChannel(new_channel, EXCHANGE,

                                     '', self.queue_list, self
.start_listening)

            self.channel_rx.exchange_declare()

        else:

            self.channel_tx = TxChannel(new_channel, EXCHANGEACK,

                                     '--ack', self.queue_list, self
.start_listening)

            self.channel_tx.exchange_declare()





    # Step #7

    def start_listening(self):

        """

        Called when all queues for a channel are declared

        """

        print "test_allset"



        if self.channel_rx.channelReady and self.channel_tx.channelReady:

            #both channels are ready, try to continue

            self.channel_rx.consume(self.handle_consume)





    def handle_consume(self, channel, method, properties, body):

        print "received message"

        queue_name = method.routing_key



        # Insert some long and complicated code goes here

        result = 'I finished!'



        #have the tx channel send an ack

        self.channel_tx.acknowledge(queue_name, result)



    # Step #1

    def start_consumer(self):

        # Step #1: Connect to RabbitMQ

        self.connection = SelectConnection(pika.ConnectionParameters(\

                host=self.rabbitmq_host), self.on_connected)



        try:

            # Loop so we can communicate with RabbitMQ

            self.connection.ioloop.start()

        except KeyboardInterrupt:

            # Gracefully close the connection

            self.connection.close()

            # Loop until we're fully closed, will stop on its own

            self.connection.ioloop.start()




if __name__ == '__main__':

    consumer = RabbitConsumer()

    consumer.start_consumer()

Thanks!

-- 
Charles Law
Watch how we make online advertising simple: http://bit.ly/Ent_vid
www.openx.com <http://www.openx.org/>   |   follow us on:
Twitter<http://twitter.com/openx>
   Facebook <http://www.facebook.com/OpenX>
LinkedIn<http://www.linkedin.com/company/openx/products>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120618/8848d6cb/attachment.htm>


More information about the rabbitmq-discuss mailing list