[rabbitmq-discuss] Pika for Asynchronous Consuming and Publishing using multiple queues

Charles Law charles.law at openx.com
Mon Jun 18 18:26:44 BST 2012

I am writing a consumer (and producer) using Pika.  I want to write
asynchronous code, but I'm having trouble getting everything working.  I
started with the regular example code from the Pika docs, then made it work
with multiple queues.  To do that I had to add some checks that make sure
all the queues are declared and bound before trying to publish any
messages.  That worked very well.
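
The "declared and bound before publishing" check can be sketched on its own, without a broker. This is only a minimal illustration under assumptions: QueueReadiness and mark_bound are hypothetical names, not part of Pika, and the real code would call mark_bound from its Queue.BindOk callback.

```python
class QueueReadiness(object):
    """Track pending queue bindings and fire a callback once all are done.

    Hypothetical helper for illustration; it is not part of Pika.
    """

    def __init__(self, queue_names, on_ready):
        self.pending = set(queue_names)  # queues still waiting for a bind
        self.on_ready = on_ready         # called once, when nothing is pending
        self.ready = False

    def mark_bound(self, queue_name):
        """Record that one queue is bound; fire on_ready at most once."""
        self.pending.discard(queue_name)
        if not self.pending and not self.ready:
            self.ready = True
            self.on_ready()


fired = []
gate = QueueReadiness(['1a', '2b'], lambda: fired.append('ready'))
gate.mark_bound('1a')  # one queue still pending, nothing fires
gate.mark_bound('2b')  # last queue bound, the callback fires
gate.mark_bound('2b')  # a duplicate notification is ignored
```

Tracking names in a set rather than a counter means a repeated notification for the same queue cannot trip the gate early.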

Our code also performs some time-consuming logic, and we want our consumer
to publish to another queue that feeds a logger.  I tried to publish on the
same channel that was consuming, but that did not work.  I wasn't sure
whether I needed a separate channel, so I tried using 2 channels on the
same connection (1 channel to consume and 1 channel to produce), but that
is not working either.  When the code tries to publish the 2nd log message,
everything seems to stop.  Can anyone give me any insight into what's going
on?
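
The intended flow (consume on one channel, publish the result on a second) can be sketched without a broker. Everything here is an assumption made for illustration: RecordingChannel and Delivery are hypothetical stand-ins that only mimic the basic_publish and basic_ack method names, and the explicit basic_ack is an assumption, since the excerpts in this message do not show one. It does not reproduce or explain the stall.

```python
class RecordingChannel(object):
    """Hypothetical stand-in for a channel; it only records calls."""

    def __init__(self):
        self.published = []
        self.acked = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class Delivery(object):
    """Stand-in for the method frame handed to a consumer callback."""

    def __init__(self, routing_key, delivery_tag):
        self.routing_key = routing_key
        self.delivery_tag = delivery_tag


def make_handler(tx_channel, log_exchange='logs'):
    """Build a consumer callback that publishes its result on tx_channel."""

    def handle(rx_channel, method, properties, body):
        result = 'I finished!'  # placeholder for the slow work
        # Publish the log message on the separate transmit channel.
        tx_channel.basic_publish(exchange=log_exchange,
                                 routing_key='%s--ack' % method.routing_key,
                                 body=result)
        # Acknowledge on the channel the message arrived on (assumed step).
        rx_channel.basic_ack(delivery_tag=method.delivery_tag)

    return handle


rx, tx = RecordingChannel(), RecordingChannel()
handler = make_handler(tx)
handler(rx, Delivery('1a', 1), None, 'payload')
handler(rx, Delivery('2b', 2), None, 'payload')
```

With the stand-ins, tx records two log messages and rx records two acknowledgements, which is the behaviour the real consumer is expected to show for two deliveries.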

Attached are sections of my code:

This code is set up so that most of the multiple-queue setup is handled
without duplicating any code:

from pika import spec

EXCHANGE = 'customers'

EXCHANGEACK = 'logs' #keep this separate for now

class ChannelContainer(object):

    def __init__(self, channel, exchange, append, queue_list, callback):

        self.channel = channel

        self.exchange = exchange

        self.append_str = append

        self.queue_list = queue_list

        self.goodQueues = 0

        self.channelReady = False

        self.try_to_continue = callback

    def exchange_declare(self):


                                          type='direct', \


    # Step #4

    def on_exchange_declare(self, frame):
        """
        Called when our exchange has been created
        """

        for queue_name in self.queue_list:

            full_queue_name = queue_name + self.append_str

            self.channel.queue_declare(queue=full_queue_name, durable=True,

                                       exclusive=False, auto_delete=False, \


    # Step #5

    def on_queue_declared(self, frame):
        """
        Bind the queues to the channel
        """

        if type(frame.method) == spec.Queue.DeclareOk:

            # Get the queue name

            queue_name = frame.method.queue

            print self.append_str, "binding", queue_name





    # Step #6

    def on_queue_bound(self, frame):
        """
        Called when RabbitMQ has told us our Queue has been bound,
            frame is the response from RabbitMQ
        """

        print "bound queue"

        self.goodQueues += 1

        if self.goodQueues == len(self.queue_list):

            self.channelReady = True

            #tell owner to try to continue


class TxChannel(ChannelContainer):

    def __init__(self, channel, exchange, append, queue_list, callback):

        ChannelContainer.__init__(self, channel, exchange, append,
                                  queue_list, callback)

    def acknowledge(self, queue_name, body):
        """
        Send the action and object to the acknowledgement queue.
        """

        full_queue_name = '%s--ack' % queue_name

        print "sending ack", full_queue_name




class RxChannel(ChannelContainer):

    def __init__(self, channel, exchange, append, queue_list, callback):

        ChannelContainer.__init__(self, channel, exchange, append,
                                  queue_list, callback)

    # Step #6

    def consume(self, callback):
        """
        Called when RabbitMQ has told us our Queue has been declared,
            frame is the response from RabbitMQ
        """

        #consume on all queues now

        for queue_name in self.queue_list:

            self.channel.basic_consume(callback, queue=queue_name)

Here is the consumer code.  I tried to take out everything irrelevant:

import pika

from pika.adapters import SelectConnection

from ChannelContainer import RxChannel, TxChannel

EXCHANGE = 'customers'

EXCHANGEACK = 'logs' #keep this separate for now

class RabbitConsumer(object):

    def __init__(self):

        #initialize some variables

        self.queue_list = []

        self.goodQueues = 0

        self.channel_rx = None

        self.channel_tx = None

        self.connection = None


        #get the queue_list



    # Functions to parse data and load config


    def parse_config(self):
        """
        Parse the variables in the config file into the instance.
        """

        self.rabbitmq_host = 'localhost'

    def gather_queue_names(self):
        """
        Gather the names of the queues.
        """

        #These are just for testing

        self.queue_list = ['1a', '2b', '3c-4d-5e-6f-7g']


    # Functions to connect to RabbitMQ


    # Step #2

    def on_connected(self, connection):
        """
        Called when we are fully connected to RabbitMQ
        """

        # Open a channel for tx and rx



    # Step #3

    def on_channel_open(self, new_channel):
        """
        Called when our channel has opened
        """

        if self.channel_rx is None:

            #rx channel

            self.channel_rx = RxChannel(new_channel, EXCHANGE,

                                     '', self.queue_list, self



            self.channel_tx = TxChannel(new_channel, EXCHANGEACK,

                                     '--ack', self.queue_list, self


    # Step #7

    def start_listening(self):
        """
        Called when all queues for a channel are declared
        """

        print "test_allset"

        if self.channel_rx.channelReady and self.channel_tx.channelReady:

            #both channels are ready, try to continue


    def handle_consume(self, channel, method, properties, body):

        print "received message"

        queue_name = method.routing_key

        # Some long and complicated code goes here

        result = 'I finished!'

        #have the tx channel send an ack

        self.channel_tx.acknowledge(queue_name, result)

    # Step #1

    def start_consumer(self):

        # Step #1: Connect to RabbitMQ

        self.connection = SelectConnection(pika.ConnectionParameters(\

                host=self.rabbitmq_host), self.on_connected)


            # Loop so we can communicate with RabbitMQ


        except KeyboardInterrupt:

            # Gracefully close the connection


            # Loop until we're fully closed, will stop on its own


if __name__ == '__main__':

    consumer = RabbitConsumer()



Charles Law