[rabbitmq-discuss] Pika for Asynchronous Consuming and Publishing using multiple queues
Charles Law
charles.law at openx.com
Tue Jun 19 20:23:21 BST 2012
I have found a couple of quirks:
- publishing a single message to a single queue works
- publishing multiple messages to a single queue still crashes the code
Still no fix for the problem, though.
On Mon, Jun 18, 2012 at 10:26 AM, Charles Law <charles.law at openx.com> wrote:
> I am writing a consumer (and producer) using Pika. I want to write
> asynchronous code, but I'm having trouble getting everything working. I
> started with the regular example I pulled off the Pika docs, then made it work with
> multiple queues. To get it working with multiple queues I had to add some
> checks that make sure all the queues are declared and bound before trying
> to publish any messages. That worked very well.
>
>
> Our code also performs some time-consuming logic, and we want our consumer
> to publish to another queue that will go to some logger. I tried to add
> this on the same channel that was consuming, but that did not work. I
> wasn't sure if I needed a separate channel, so I tried using 2 channels on
> the same connection - 1 channel to consume, and 1 channel to produce, but
> that is not working. I notice when the code is trying to publish the 2nd
> log message that everything seems to stop. Can anyone give me any insight
> into what's going on?
>
>
> Attached are sections of my code:
>
>
> This code is set up so a lot of the multiple queue setup is handled without
> duplicating any code:
>
> from pika import spec
>
>
>
> EXCHANGE = 'customers'
>
> EXCHANGEACK = 'logs' #keep this separate for now
>
>
>
> class ChannelContainer(object):
>
> def __init__(self, channel, exchange, append, queue_list, callback):
>
> self.channel = channel
>
> self.exchange = exchange
>
> self.append_str = append
>
> self.queue_list = queue_list
>
> self.goodQueues = 0
>
> self.channelReady = False
>
> self.try_to_continue = callback
>
>
>
>
>
> def exchange_declare(self):
>
> self.channel.exchange_declare(exchange=self.exchange,
>
> type='direct', \
>
> callback=self
> .on_exchange_declare)
>
>
>
> # Step #4
>
> def on_exchange_declare(self, frame):
>
> """
>
> Called when our exchange has been created
>
> """
>
> for queue_name in self.queue_list:
>
> full_queue_name = queue_name + self.append_str
>
>
>
> self.channel.queue_declare(queue=full_queue_name, durable=True,
> \
>
> exclusive=False, auto_delete=False,
> \
>
> callback=self.on_queue_declared)
>
>
>
>
>
> # Step #5
>
> def on_queue_declared(self, frame):
>
> """
>
> Bind the declared queue to the exchange
>
> """
>
> if type(frame.method) == spec.Queue.DeclareOk:
>
> # Get the queue name
>
> queue_name = frame.method.queue
>
>
>
> print self.append_str, "binding", queue_name
>
>
>
> self.channel.queue_bind(exchange=self.exchange,
>
> queue=queue_name,
>
> routing_key=queue_name,
>
> callback=self.on_queue_bound)
>
>
>
> # Step #6
>
> def on_queue_bound(self, frame):
>
> """
>
> Called when RabbitMQ has told us our Queue has been bound,
>
> frame is the response from RabbitMQ
>
> """
>
> print "bound queue"
>
> self.goodQueues += 1
>
>
>
> if self.goodQueues == len(self.queue_list):
>
> self.channelReady = True
>
>
>
> #tell owner to try to continue
>
> self.try_to_continue()
>
>
>
> class TxChannel(ChannelContainer):
>
>
> def __init__(self, channel, exchange, append, queue_list, callback):
>
> ChannelContainer.__init__(self, channel, exchange, append,
> queue_list, callback)
>
>
>
>
>
> def acknowledge(self, queue_name, body):
>
> """
>
> Send the action and object to the acknowledgement queue.
>
> """
>
> full_queue_name = '%s--ack' % queue_name
>
> print "sending ack", full_queue_name
>
>
>
> self.channel.basic_publish(exchange=self.exchange,
>
> routing_key=full_queue_name,
>
> body=body)
>
>
>
>
>
> class RxChannel(ChannelContainer):
>
>
> def __init__(self, channel, exchange, append, queue_list, callback):
>
> ChannelContainer.__init__(self, channel, exchange, append,
> queue_list, callback)
>
>
>
> # Step #6
>
> def consume(self, callback):
>
> """
>
> Called once all queues are ready; starts consuming on every queue
>
> in the queue list, passing callback to basic_consume for each one
>
> """
>
>
> #consume on all queues now
>
> for queue_name in self.queue_list:
>
> self.channel.basic_consume(callback, queue=queue_name)
>
>
>
>
>
>
> Here is the consumer code. I tried to take out everything irrelevant:
>
> import pika
>
> from pika.adapters import SelectConnection
>
>
> from ChannelContainer import RxChannel, TxChannel
>
>
>
> EXCHANGE = 'customers'
>
> EXCHANGEACK = 'logs' #keep this separate for now
>
>
>
>
> class RabbitConsumer(object):
>
>
>
> def __init__(self):
>
> #initialize some variables
>
> self.queue_list = []
>
> self.goodQueues = 0
>
> self.channel_rx = None
>
> self.channel_tx = None
>
> self.connection = None
>
>
>
> self.parse_config()
>
>
>
> #get the queue_list
>
> self.gather_queue_names()
>
>
>
>
>
>
>
> """
>
> Functions to parse data and load config
>
> """
>
>
> def parse_config(self):
>
> """
>
> Parse the variables in the config file into the instance.
>
> """
>
> self.rabbitmq_host = 'localhost'
>
>
>
>
>
> def gather_queue_names(self):
>
> """
>
> Gather the names of the queues.
>
> """
>
> #These are just for testing
>
> self.queue_list = ['1a', '2b', '3c-4d-5e-6f-7g']
>
>
>
>
>
>
>
>
>
>
> """
>
> Functions to connect to RabbitMQ
>
> """
>
> # Step #2
>
> def on_connected(self, connection):
>
> """
>
> Called when we are fully connected to RabbitMQ
>
> """
>
> # Open a channel for tx and rx
>
> self.connection.channel(self.on_channel_open)
>
> self.connection.channel(self.on_channel_open)
>
>
>
>
>
> # Step #3
>
> def on_channel_open(self, new_channel):
>
> """
>
> Called when our channel has opened
>
> """
>
> if self.channel_rx is None:
>
> #rx channel
>
> self.channel_rx = RxChannel(new_channel, EXCHANGE,
>
> '', self.queue_list, self
> .start_listening)
>
> self.channel_rx.exchange_declare()
>
> else:
>
> self.channel_tx = TxChannel(new_channel, EXCHANGEACK,
>
> '--ack', self.queue_list, self
> .start_listening)
>
> self.channel_tx.exchange_declare()
>
>
>
>
>
> # Step #7
>
> def start_listening(self):
>
> """
>
> Called when all queues for a channel are declared
>
> """
>
> print "test_allset"
>
>
>
> if self.channel_rx.channelReady and self.channel_tx.channelReady:
>
> #both channels are ready, try to continue
>
> self.channel_rx.consume(self.handle_consume)
>
>
>
>
>
> def handle_consume(self, channel, method, properties, body):
>
> print "received message"
>
> queue_name = method.routing_key
>
>
>
> # Some long and complicated code goes here
>
> result = 'I finished!'
>
>
>
> #have the tx channel send an ack
>
> self.channel_tx.acknowledge(queue_name, result)
>
>
>
> # Step #1
>
> def start_consumer(self):
>
> # Step #1: Connect to RabbitMQ
>
> self.connection = SelectConnection(pika.ConnectionParameters(\
>
> host=self.rabbitmq_host), self.on_connected)
>
>
>
> try:
>
> # Loop so we can communicate with RabbitMQ
>
> self.connection.ioloop.start()
>
> except KeyboardInterrupt:
>
> # Gracefully close the connection
>
> self.connection.close()
>
> # Loop until we're fully closed, will stop on its own
>
> self.connection.ioloop.start()
>
>
>
>
> if __name__ == '__main__':
>
> consumer = RabbitConsumer()
>
> consumer.start_consumer()
>
> Thanks!
>
> --
> Charles Law
> Watch how we make online advertising simple: http://bit.ly/Ent_vid
> www.openx.com <http://www.openx.org/> | follow us on: Twitter<http://twitter.com/openx>
> Facebook <http://www.facebook.com/OpenX> LinkedIn<http://www.linkedin.com/company/openx/products>
>
>
--
Charles Law Software Developer
Watch how we make online advertising simple: http://bit.ly/Ent_vid
www.openx.com <http://www.openx.org/> | follow us on:
Twitter<http://twitter.com/openx>
Facebook <http://www.facebook.com/OpenX>
LinkedIn<http://www.linkedin.com/company/openx/products>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120619/27d81051/attachment.htm>
More information about the rabbitmq-discuss
mailing list