[rabbitmq-discuss] Pika for Asynchronous Consuming and Publishing using multiple queues

Charles Law charles.law at openx.com
Tue Jun 26 23:56:19 BST 2012


This code works now!

I needed the latest pika code, which I got from GitHub.  I originally
installed pika using pip, which pulled a release from March 2011.  There have
been tons of commits since then.
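
For anyone who hits the same thing: a quick way to see which pika release pip
actually installed is to ask the package metadata.  This is only a sketch,
assuming Python 3.8 or newer (for importlib.metadata); installed_version is a
hypothetical helper name, not part of pika.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "pika" is the distribution name; None means it is not installed here.
    print("pika version:", installed_version("pika"))
```

If the version printed is older than the code on GitHub, the pip release may
simply be missing the fixes.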

On Tue, Jun 19, 2012 at 12:23 PM, Charles Law <charles.law at openx.com> wrote:

> I have found a couple quirks:
> - publishing a single message to a single queue works
> - publishing multiple messages to a single queue still crashes the code
> Still no fix to the problem though.
>
>
> On Mon, Jun 18, 2012 at 10:26 AM, Charles Law <charles.law at openx.com> wrote:
>
>> I am writing a consumer (and producer) using Pika.  I want to write
>> asynchronous code, but I'm having trouble getting everything working.  I
>> started with the regular example I pulled off the Pika docs, then made it work with
>> multiple queues.  To get it working with multiple queues I had to add some
>> checks that make sure all the queues are declared and bound before trying
>> to publish any messages.  That worked very well.
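
The gating described above can be sketched without pika at all: count the bind
confirmations and only continue once every queue has reported in.  ReadyGate,
mark_done and demo are hypothetical names used for illustration, and the None
passed in stands in for the frame a broker callback would receive.

```python
class ReadyGate(object):
    """Call `on_ready` once, after `expected` completions have been reported."""

    def __init__(self, expected, on_ready):
        self.expected = expected
        self.done = 0
        self.on_ready = on_ready

    def mark_done(self, *_args):
        # Accepts and ignores whatever argument the caller's callback passes.
        self.done += 1
        if self.done == self.expected:
            self.on_ready()


def demo():
    events = []
    queues = ['1a', '2b', '3c-4d-5e-6f-7g']
    gate = ReadyGate(len(queues), lambda: events.append('publish'))
    for name in queues:
        events.append('bound ' + name)
        gate.mark_done(None)  # stand-in for a bind confirmation frame
    return events
```

Here demo() records 'publish' only after all three queues have been marked as
bound, which is the same ordering the goodQueues counter enforces in the code
further down.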
>>
>>
>> Our code also performs some time consuming logic, and we want our
>> consumer to publish to another queue that will go to some logger.  I tried
>> to add this on the same channel that was consuming, but that did not
>> work.  I wasn't sure if I needed a separate channel, so I tried using 2
>> channels on the same connection - 1 channel to consume, and 1 channel to
>> produce, but that is not working.  I notice when the code is trying to
>> publish the 2nd log message that everything seems to stop.  Can anyone give
>> me any insight into what's going on?
>>
>>
>> Attached are sections of my code:
>>
>>
>> This code is set up so that most of the multiple-queue setup is handled
>> without duplicating any code:
>>
>> from pika import spec
>>
>> EXCHANGE = 'customers'
>> EXCHANGEACK = 'logs'  # keep this separate for now
>>
>>
>> class ChannelContainer(object):
>>
>>     def __init__(self, channel, exchange, append, queue_list, callback):
>>         self.channel = channel
>>         self.exchange = exchange
>>         self.append_str = append
>>         self.queue_list = queue_list
>>         self.goodQueues = 0
>>         self.channelReady = False
>>         self.try_to_continue = callback
>>
>>     def exchange_declare(self):
>>         self.channel.exchange_declare(exchange=self.exchange,
>>                                       type='direct',
>>                                       callback=self.on_exchange_declare)
>>
>>     # Step #4
>>     def on_exchange_declare(self, frame):
>>         """
>>         Called when our exchange has been created
>>         """
>>         for queue_name in self.queue_list:
>>             full_queue_name = queue_name + self.append_str
>>
>>             self.channel.queue_declare(queue=full_queue_name, durable=True,
>>                                        exclusive=False, auto_delete=False,
>>                                        callback=self.on_queue_declared)
>>
>>     # Step #5
>>     def on_queue_declared(self, frame):
>>         """
>>         Called when a queue has been declared; bind it to our exchange
>>         """
>>         if type(frame.method) == spec.Queue.DeclareOk:
>>             # Get the queue name
>>             queue_name = frame.method.queue
>>
>>             print self.append_str, "binding", queue_name
>>
>>             self.channel.queue_bind(exchange=self.exchange,
>>                                     queue=queue_name,
>>                                     routing_key=queue_name,
>>                                     callback=self.on_queue_bound)
>>
>>     # Step #6
>>     def on_queue_bound(self, frame):
>>         """
>>         Called when RabbitMQ has told us a queue has been bound,
>>             frame is the response from RabbitMQ
>>         """
>>         print "bound queue"
>>         self.goodQueues += 1
>>
>>         if self.goodQueues == len(self.queue_list):
>>             self.channelReady = True
>>
>>             #tell owner to try to continue
>>             self.try_to_continue()
>>
>>
>> class TxChannel(ChannelContainer):
>>
>>     def __init__(self, channel, exchange, append, queue_list, callback):
>>         ChannelContainer.__init__(self, channel, exchange, append,
>>                                   queue_list, callback)
>>
>>     def acknowledge(self, queue_name, body):
>>         """
>>         Send the action and object to the acknowledgement queue.
>>         """
>>         full_queue_name = '%s--ack' % queue_name
>>         print "sending ack", full_queue_name
>>
>>         self.channel.basic_publish(exchange=self.exchange,
>>                                    routing_key=full_queue_name,
>>                                    body=body)
>>
>>
>> class RxChannel(ChannelContainer):
>>
>>     def __init__(self, channel, exchange, append, queue_list, callback):
>>         ChannelContainer.__init__(self, channel, exchange, append,
>>                                   queue_list, callback)
>>
>>     # Called from Step #7 (start_listening)
>>     def consume(self, callback):
>>         """
>>         Start consuming on every queue in queue_list,
>>             callback is invoked for each delivered message
>>         """
>>         #consume on all queues now
>>         for queue_name in self.queue_list:
>>             self.channel.basic_consume(callback, queue=queue_name)
>>
>>
>> Here is the consumer code.  I tried to take out everything irrelevant:
>>
>> import pika
>> from pika.adapters import SelectConnection
>>
>> from ChannelContainer import RxChannel, TxChannel
>>
>> EXCHANGE = 'customers'
>> EXCHANGEACK = 'logs'  # keep this separate for now
>>
>>
>> class RabbitConsumer(object):
>>
>>     def __init__(self):
>>         #initialize some variables
>>         self.queue_list = []
>>         self.goodQueues = 0
>>         self.channel_rx = None
>>         self.channel_tx = None
>>         self.connection = None
>>
>>         self.parse_config()
>>
>>         #get the queue_list
>>         self.gather_queue_names()
>>
>>     """
>>     Functions to parse data and load config
>>     """
>>
>>     def parse_config(self):
>>         """
>>         Parse the variables in the config file into the instance.
>>         """
>>         self.rabbitmq_host = 'localhost'
>>
>>     def gather_queue_names(self):
>>         """
>>         Gather the names of the queues.
>>         """
>>         #These are just for testing
>>         self.queue_list = ['1a', '2b', '3c-4d-5e-6f-7g']
>>
>>     """
>>     Functions to connect to RabbitMQ
>>     """
>>
>>     # Step #2
>>     def on_connected(self, connection):
>>         """
>>         Called when we are fully connected to RabbitMQ
>>         """
>>         # Open a channel for tx and rx
>>         self.connection.channel(self.on_channel_open)
>>         self.connection.channel(self.on_channel_open)
>>
>>     # Step #3
>>     def on_channel_open(self, new_channel):
>>         """
>>         Called when our channel has opened
>>         """
>>         if self.channel_rx is None:
>>             #rx channel
>>             self.channel_rx = RxChannel(new_channel, EXCHANGE, '',
>>                                         self.queue_list,
>>                                         self.start_listening)
>>             self.channel_rx.exchange_declare()
>>         else:
>>             self.channel_tx = TxChannel(new_channel, EXCHANGEACK, '--ack',
>>                                         self.queue_list,
>>                                         self.start_listening)
>>             self.channel_tx.exchange_declare()
>>
>>     # Step #7
>>     def start_listening(self):
>>         """
>>         Called when all queues for a channel are declared and bound
>>         """
>>         print "test_allset"
>>
>>         if self.channel_rx.channelReady and self.channel_tx.channelReady:
>>             #both channels are ready, try to continue
>>             self.channel_rx.consume(self.handle_consume)
>>
>>     def handle_consume(self, channel, method, properties, body):
>>         print "received message"
>>         queue_name = method.routing_key
>>
>>         # Some long and complicated code goes here
>>         result = 'I finished!'
>>
>>         #have the tx channel send an ack
>>         self.channel_tx.acknowledge(queue_name, result)
>>
>>     # Step #1
>>     def start_consumer(self):
>>         # Step #1: Connect to RabbitMQ
>>         self.connection = SelectConnection(
>>             pika.ConnectionParameters(host=self.rabbitmq_host),
>>             self.on_connected)
>>
>>         try:
>>             # Loop so we can communicate with RabbitMQ
>>             self.connection.ioloop.start()
>>         except KeyboardInterrupt:
>>             # Gracefully close the connection
>>             self.connection.close()
>>             # Loop until we're fully closed, will stop on its own
>>             self.connection.ioloop.start()
>>
>>
>> if __name__ == '__main__':
>>     consumer = RabbitConsumer()
>>     consumer.start_consumer()
>>
>> Thanks!
>>
>> --
>> Charles Law
>> Watch how we make online advertising simple: http://bit.ly/Ent_vid
>> www.openx.com <http://www.openx.org/>   |   follow us on:   Twitter<http://twitter.com/openx>
>>    Facebook <http://www.facebook.com/OpenX>    LinkedIn<http://www.linkedin.com/company/openx/products>
>>
>>
>
>
>
>


-- 
Charles Law   Software Developer