[rabbitmq-discuss] Pika for Asynchronous Consuming and Publishing using multiple queues
Charles Law
charles.law at openx.com
Tue Jun 26 23:56:19 BST 2012
This code works now!
I needed the latest pika code, which I got from github. I originally
installed pika using pip, which got a version from 3/2011. There have been
tons of commits since then.
On Tue, Jun 19, 2012 at 12:23 PM, Charles Law <charles.law at openx.com> wrote:
> I have found a couple quirks:
> - publishing a single message to a single queue works
> - publishing multiple messages to a single queue still crashes the code
> Still no fix to the problem though.
>
>
> On Mon, Jun 18, 2012 at 10:26 AM, Charles Law <charles.law at openx.com>wrote:
>
>> I am writing a consumer (and producer) using Pika. I want to write
>> asynchronous code, but I'm having trouble getting everything working. I
>> started with regular I pulled off the Pika docs, then made it work with
>> multiple queues. To get it working with multiple queues I had to add some
>> checks that make sure all the queues are declared and bound before trying
>> to publish any messages. That worked very well.
>>
>>
>> Our code also performs some time consuming logic, and we want our
>> consumer to publish to another queue that will go to some logger. I tried
>> to add this on the same channel that was consuming code, but that did not
>> work. I wasn't sure if I needed a separate channel, so I tried using 2
>> channels on the same connections - 1 channel to consume, and 1 channel to
>> produce, but that is not working. I notice when the code is trying to
>> publish the 2nd log message that everything seems to stop. Can anyone give
>> me any insight into what's going on?
>>
>>
>> Attached are sections of my code:
>>
>>
>> This code is setup so a lot of the multiple queue setup is handled
>> without duplicating any code:
>>
>> from pika import spec
>>
>>
>>
>> EXCHANGE = 'customers'
>>
>> EXCHANGEACK = 'logs' #keep this seperate for now
>>
>>
>>
>> class ChannelContainer(object):
>>
>> def __init__(self, channel, exchange, append, queue_list, callback):
>>
>> self.channel = channel
>>
>> self.exchange = exchange
>>
>> self.append_str = append
>>
>> self.queue_list = queue_list
>>
>> self.goodQueues = 0
>>
>> self.channelReady = False
>>
>> self.try_to_continue = callback
>>
>>
>>
>>
>>
>> def exchange_declare(self):
>>
>> self.channel.exchange_declare(exchange=self.exchange,
>>
>> type='direct', \
>>
>> callback=self
>> .on_exchange_declare)
>>
>>
>>
>> # Step #4
>>
>> def on_exchange_declare(self, frame):
>>
>> """
>>
>> Called when our exchange has been created
>>
>> """
>>
>> for queue_name in self.queue_list:
>>
>> full_queue_name = queue_name + self.append_str
>>
>>
>>
>> self.channel.queue_declare(queue=full_queue_name, durable=
>> True, \
>>
>> exclusive=False, auto_delete=False,
>> \
>>
>> callback=self.on_queue_declared)
>>
>>
>>
>>
>>
>> # Step #5
>>
>> def on_queue_declared(self, frame):
>>
>> """
>>
>> Bind the queues to the channel
>>
>> """
>>
>> if type(frame.method) == spec.Queue.DeclareOk:
>>
>> # Get the queue name
>>
>> queue_name = frame.method.queue
>>
>>
>>
>> print self.append_str, "binding", queue_name
>>
>>
>>
>> self.channel.queue_bind(exchange=self.exchange,
>>
>> queue=queue_name,
>>
>> routing_key=queue_name,
>>
>> callback=self.on_queue_bound)
>>
>>
>>
>> # Step #6
>>
>> def on_queue_bound(self, frame):
>>
>> """
>>
>> Called when RabbitMQ has told us our Queue has been declared,
>>
>> frame is the response from RabbitMQ
>>
>> """
>>
>> print "bound queue"
>>
>> self.goodQueues += 1
>>
>>
>>
>> if self.goodQueues == len(self.queue_list):
>>
>> self.channelReady = True
>>
>>
>>
>> #tell owner to try to continue
>>
>> self.try_to_continue()
>>
>>
>>
>> class TxChannel(ChannelContainer):
>>
>>
>> def __init__(self, channel, exchange, append, queue_list, callback):
>>
>> ChannelContainer.__init__(self, channel, exchange, append,
>> queue_list, callback)
>>
>>
>>
>>
>>
>> def acknowledge(self, queue_name, body):
>>
>> """
>>
>> Send the action and object to the acknowledgement queue.
>>
>> """
>>
>> full_queue_name = '%s--ack' % queue_name
>>
>> print "sending ack", full_queue_name
>>
>>
>>
>> self.channel.basic_publish(exchange=self.exchange,
>>
>> routing_key=full_queue_name,
>>
>> body=body)
>>
>>
>>
>>
>>
>> class RxChannel(ChannelContainer):
>>
>>
>> def __init__(self, channel, exchange, append, queue_list, callback):
>>
>> ChannelContainer.__init__(self, channel, exchange, append,
>> queue_list, callback)
>>
>>
>>
>> # Step #6
>>
>> def consume(self, callback):
>>
>> """
>>
>> Called when RabbitMQ has told us our Queue has been declared,
>>
>> frame is the response from RabbitMQ
>>
>> """
>>
>>
>> #consume on all queues now
>>
>> for queue_name in self.queue_list:
>>
>> self.channel.basic_consume(callback, queue=queue_name)
>>
>>
>>
>>
>>
>>
>> Here is the consumer code. I tried to take out everything irrelevant:
>>
>> import pika
>>
>> from pika.adapters import SelectConnection
>>
>>
>> from ChannelContainer import RxChannel, TxChannel
>>
>>
>>
>> EXCHANGE = 'customers'
>>
>> EXCHANGEACK = 'logs' #keep this seperate for now
>>
>>
>>
>>
>> class RabbitConsumer(object):
>>
>>
>>
>> def __init__(self):
>>
>> #initialize some variables
>>
>> self.queue_list = []
>>
>> self.goodQueues = 0
>>
>> self.channel_rx = None
>>
>> self.channel_tx = None
>>
>> self.connection = None
>>
>>
>>
>> self.parse_config()
>>
>>
>>
>> #get the queue_list
>>
>> self.gather_queue_names()
>>
>>
>>
>>
>>
>>
>>
>> """
>>
>> Functions to parse data and load config
>>
>> """
>>
>>
>> def parse_config(self):
>>
>> """
>>
>> Parse the variables in the config file into the instance.
>>
>> """
>>
>> self.rabbitmq_host = 'localhost'
>>
>>
>>
>>
>>
>> def gather_queue_names(self):
>>
>> """
>>
>> Gather the names of the queues.
>>
>> """
>>
>> #These are just for testing
>>
>> self.queue_list = ['1a', '2b', '3c-4d-5e-6f-7g']
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> """
>>
>> Functions to connect to RabbitMQ
>>
>> """
>>
>> # Step #2
>>
>> def on_connected(self, connection):
>>
>> """
>>
>> Called when we are fully connected to RabbitMQ
>>
>> """
>>
>> # Open a channel for tx and rx
>>
>> self.connection.channel(self.on_channel_open)
>>
>> self.connection.channel(self.on_channel_open)
>>
>>
>>
>>
>>
>> # Step #3
>>
>> def on_channel_open(self, new_channel):
>>
>> """
>>
>> Called when our channel has opened
>>
>> """
>>
>> if self.channel_rx is None:
>>
>> #rx channel
>>
>> self.channel_rx = RxChannel(new_channel, EXCHANGE,
>>
>> '', self.queue_list, self
>> .start_listening)
>>
>> self.channel_rx.exchange_declare()
>>
>> else:
>>
>> self.channel_tx = TxChannel(new_channel, EXCHANGEACK,
>>
>> '--ack', self.queue_list, self
>> .start_listening)
>>
>> self.channel_tx.exchange_declare()
>>
>>
>>
>>
>>
>> # Step #7
>>
>> def start_listening(self):
>>
>> """
>>
>> Called when all queues for a channel are declared
>>
>> """
>>
>> print "test_allset"
>>
>>
>>
>> if self.channel_rx.channelReady and self.channel_tx.channelReady:
>>
>> #both channels are ready, try to continue
>>
>> self.channel_rx.consume(self.handle_consume)
>>
>>
>>
>>
>>
>> def handle_consume(self, channel, method, properties, body):
>>
>> print "received message"
>>
>> queue_name = method.routing_key
>>
>>
>>
>> # Insert some long and complicated code goes here
>>
>> result = 'I finished!'
>>
>>
>>
>> #have the tx channel send an ack
>>
>> self.channel_tx.acknowledge(queue_name, result)
>>
>>
>>
>> # Step #1
>>
>> def start_consumer(self):
>>
>> # Step #1: Connect to RabbitMQ
>>
>> self.connection = SelectConnection(pika.ConnectionParameters(\
>>
>> host=self.rabbitmq_host), self.on_connected)
>>
>>
>>
>> try:
>>
>> # Loop so we can communicate with RabbitMQ
>>
>> self.connection.ioloop.start()
>>
>> except KeyboardInterrupt:
>>
>> # Gracefully close the connection
>>
>> self.connection.close()
>>
>> # Loop until we're fully closed, will stop on its own
>>
>> self.connection.ioloop.start()
>>
>>
>>
>>
>> if __name__ == '__main__':
>>
>> consumer = RabbitConsumer()
>>
>> consumer.start_consumer()
>>
>> Thanks!
>>
>> --
>> Charles Law
>> Watch how we make online advertising simple: http://bit.ly/Ent_vid
>> www.openx.com <http://www.openx.org/> | follow us on: Twitter<http://twitter.com/openx>
>> Facebook <http://www.facebook.com/OpenX> LinkedIn<http://www.linkedin.com/company/openx/products>
>>
>>
>
>
> --
> Charles Law Software Developer
> Watch how we make online advertising simple: http://bit.ly/Ent_vid
> www.openx.com <http://www.openx.org/> | follow us on: Twitter<http://twitter.com/openx>
> Facebook <http://www.facebook.com/OpenX> LinkedIn<http://www.linkedin.com/company/openx/products>
>
>
--
Charles Law Software Developer
Watch how we make online advertising simple: http://bit.ly/Ent_vid
www.openx.com <http://www.openx.org/> | follow us on:
Twitter<http://twitter.com/openx>
Facebook <http://www.facebook.com/OpenX>
LinkedIn<http://www.linkedin.com/company/openx/products>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120626/83beda82/attachment.htm>
More information about the rabbitmq-discuss
mailing list