I have found a couple of quirks:<div>- publishing a single message to a single queue works</div><div>- publishing multiple messages to a single queue still crashes the code</div><div>Still no fix for the problem, though.<br>
<br><div class="gmail_quote">On Mon, Jun 18, 2012 at 10:26 AM, Charles Law <span dir="ltr">&lt;<a href="mailto:charles.law@openx.com" target="_blank">charles.law@openx.com</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<p>I am writing a consumer (and producer) using Pika. I want to write asynchronous code, but I'm having trouble getting everything working. I started with an example I pulled from the Pika docs, then made it work with multiple queues. To get it working with multiple queues, I had to add checks that make sure all the queues are declared and bound before trying to publish any messages. That worked very well.</p>
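<p>The "declared and bound before publishing" check can be sketched without Pika at all. This is a minimal illustration, not the code from this post: ReadinessGate and its method names are hypothetical, and the snippet is written for Python 3. It tracks which queues are still waiting for a bind confirmation and fires a callback once, when none are left.</p>

```python
class ReadinessGate(object):
    """Fire on_ready exactly once, after every expected queue is bound.

    Hypothetical helper for illustration; not part of Pika.
    """

    def __init__(self, queue_names, on_ready):
        self.pending = set(queue_names)   # queues still waiting for a bind confirmation
        self.on_ready = on_ready
        self.ready = False

    def queue_bound(self, queue_name):
        """Record one bind confirmation; duplicates are ignored."""
        self.pending.discard(queue_name)
        if not self.pending and not self.ready:
            self.ready = True
            self.on_ready()


fired = []
gate = ReadinessGate(['1a', '2b'], lambda: fired.append('ready'))
gate.queue_bound('1a')      # still waiting on '2b'
gate.queue_bound('2b')      # last queue: callback fires
gate.queue_bound('2b')      # late duplicate: callback does not fire again
```

<p>Tracking names instead of a bare counter is a design choice in this sketch. An AMQP Bind-Ok frame does not carry the queue name, so a real bind callback would have to capture the name itself, for example with functools.partial.</p>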
<p><br></p>
<p>Our code also performs some time-consuming logic, and we want our consumer to publish to another queue that feeds a logger. I tried to add this on the same channel the consuming code uses, but that did not work. I wasn't sure if I needed a separate channel, so I tried using 2 channels on the same connection (1 channel to consume and 1 channel to produce), but that is not working either. When the code tries to publish the 2nd log message, everything seems to stop. Can anyone give me any insight into what's going on?</p>
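<p>One way to narrow this down is to run the consume-then-publish flow against a stand-in channel, so the handler logic can be exercised for several messages with no broker involved. The sketch below assumes nothing about Pika's internals: RecordingChannel and handle_message are hypothetical names, and only the basic_publish keyword arguments (exchange, routing_key, body) and the '--ack' naming are taken from the code in this post. It is written for Python 3.</p>

```python
class RecordingChannel(object):
    """Stand-in for a channel: records publishes instead of sending them."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


def handle_message(tx_channel, source_queue, body):
    """Process one consumed message, then publish a log entry for it."""
    result = 'I finished!'                      # placeholder for the slow work on body
    tx_channel.basic_publish(exchange='logs',
                             routing_key='%s--ack' % source_queue,
                             body=result)


tx = RecordingChannel()
for queue_name in ['1a', '1a', '2b']:           # several messages, one queue repeated
    handle_message(tx, queue_name, 'payload')
```

<p>If the handler behaves for repeated messages here, that would suggest looking at how the real connection and its I/O loop are driven rather than at the handler logic itself.</p>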
<p><br></p>
<p>Attached are sections of my code:</p>
<p><br></p>
<p>This code is set up so that most of the multiple-queue setup is handled without duplicating any code:</p>
<p><font color="#000000"><span>from</span> pika <span>import</span> spec</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000">EXCHANGE = <span>'customers'</span></font></p>
<p><font color="#000000"><span>EXCHANGEACK = </span><span>'logs'</span><span> </span>#keep this separate for now</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span><span> </span>ChannelContainer<span>(object):</span></font></p>
<p><font color="#000000"> <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000"> <span>self</span>.channel = channel</font></p>
<p><font color="#000000"> <span>self</span>.exchange = exchange</font></p>
<p><font color="#000000"> <span>self</span>.append_str = append</font></p>
<p><font color="#000000"> <span>self</span>.queue_list = queue_list</font></p>
<p><font color="#000000"> <span>self</span>.goodQueues = <span>0</span></font></p>
<p><font color="#000000"> <span>self</span>.channelReady = <span>False</span></font></p>
<p><font color="#000000"> <span>self</span>.try_to_continue = callback</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>exchange_declare<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"> <span>self</span>.channel.exchange_declare(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000"> type=<span>'direct'</span>,</font></p>
<p><font color="#000000"> callback=<span>self</span>.on_exchange_declare)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Step #4</font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>on_exchange_declare<span>(</span><span>self</span><span>, frame):</span></font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Called when our exchange has been created</font></p>
<p><font color="#000000"> """<span> </span></font></p>
<p><font color="#000000"> <span>for</span> queue_name <span>in</span> <span>self</span>.queue_list:</font></p>
<p><font color="#000000"> full_queue_name = queue_name + <span>self</span>.append_str</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>self</span>.channel.queue_declare(queue=full_queue_name, durable=<span>True</span>,</font></p>
<p><font color="#000000"> exclusive=<span>False</span>, auto_delete=<span>False</span>,</font></p>
<p><font color="#000000"> callback=<span>self</span>.on_queue_declared)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Step #5</font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>on_queue_declared<span>(</span><span>self</span><span>, frame):</span></font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Bind the newly declared queue to the exchange</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"> <span>if</span> isinstance(frame.method, spec.Queue.DeclareOk):</font></p>
<p><font color="#000000"><span> </span># Get the queue name</font></p>
<p><font color="#000000"> queue_name = frame.method.queue</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>print</span> <span>self</span>.append_str, <span>"binding"</span>, queue_name</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>self</span>.channel.queue_bind(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000"> queue=queue_name,</font></p>
<p><font color="#000000"> routing_key=queue_name, </font></p>
<p><font color="#000000"> callback=<span>self</span>.on_queue_bound)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Step #6</font></p>
<p><font color="#000000"> <span>def</span> <span>on_queue_bound</span>(<span>self</span>, frame):</font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Called when RabbitMQ has told us our queue has been bound;</font></p>
<p><font color="#000000"> frame is the response from RabbitMQ</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"><span> </span><span>print</span><span> </span>"bound queue"</font></p>
<p><font color="#000000"> <span>self</span>.goodQueues += <span>1</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>if</span> <span>self</span>.goodQueues == len(<span>self</span>.queue_list):</font></p>
<p><font color="#000000"> <span>self</span>.channelReady = <span>True</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span>#tell owner to try to continue</font></p>
<p><font color="#000000"> <span>self</span>.try_to_continue()</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span> <span>TxChannel</span>(ChannelContainer):</font></p>
<p><br></p>
<p><font color="#000000"> <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000"> ChannelContainer.__init__(<span>self</span>, channel, exchange, append, queue_list, callback)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>def</span> <span>acknowledge</span>(<span>self</span>, queue_name, body):</font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Send the action and object to the acknowledgement queue.</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"> full_queue_name = <span>'%s--ack'</span> % queue_name</font></p>
<p><font color="#000000"> <span>print</span> <span>"sending ack"</span>, full_queue_name</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>self</span>.channel.basic_publish(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000"> routing_key=full_queue_name,</font></p>
<p><font color="#000000"> body=body)</font></p>
<p><br></p>
<p><font color="#000000"> </font></p>
<p><br></p>
<p><font color="#000000"><span>class</span> <span>RxChannel</span>(ChannelContainer):</font></p>
<p><br></p>
<p><font color="#000000"> <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000"> ChannelContainer.__init__(<span>self</span>, channel, exchange, append, queue_list, callback)</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span> </span># Step #8</font></p>
<p><font color="#000000"> <span>def</span> <span>consume</span>(<span>self</span>, callback):</font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Start consuming on every queue in queue_list;</font></p>
<p><font color="#000000"> callback is invoked for each delivered message</font></p>
<p><font color="#000000"> """</font></p>
<p><br></p>
<p><font color="#000000"><span> </span>#consume on all queues now</font></p>
<p><font color="#000000"> <span>for</span> queue_name <span>in</span> <span>self</span>.queue_list:</font></p>
<p><font color="#000000"> <span>self</span>.channel.basic_consume(callback, queue=queue_name)</font></p>
<p> </p>
<p><br></p>
<p><br></p>
<p><br></p>
<p>Here is the consumer code. I tried to take out everything irrelevant:</p>
<p><font color="#000000">import<span> pika</span></font></p>
<p><font color="#000000"><span>from</span> pika.adapters <span>import</span> SelectConnection</font></p>
<p><br></p>
<p><font color="#000000"><span>from</span> ChannelContainer <span>import</span> <span>RxChannel</span>, <span>TxChannel</span></font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000">EXCHANGE = <span>'customers'</span></font></p>
<p><font color="#000000"><span>EXCHANGEACK = </span><span>'logs'</span><span> </span>#keep this separate for now</font></p>
<p><br></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span><span> </span>RabbitConsumer<span>(object):</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>__init__<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"><span> </span>#initialize some variables</font></p>
<p><font color="#000000"> <span>self</span>.queue_list = []</font></p>
<p><font color="#000000"> <span>self</span>.goodQueues = <span>0</span></font></p>
<p><font color="#000000"> <span>self</span>.channel_rx = <span>None</span></font></p>
<p><font color="#000000"> <span>self</span>.channel_tx = <span>None</span></font></p>
<p><font color="#000000"> <span>self</span>.connection = <span>None</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>self</span>.parse_config()</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span>#get the queue_list</font></p>
<p><font color="#000000"> <span>self</span>.gather_queue_names()</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Functions to parse data and load config</font></p>
<p><font color="#000000"> """</font></p>
<p><br></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>parse_config<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Parse the variables in the config file into the instance.</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"> <span>self</span>.rabbitmq_host = <span>'localhost'</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>gather_queue_names<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Gather the names of the queues.</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"><span> </span>#These are just for testing</font></p>
<p><font color="#000000"> <span>self</span>.queue_list = [<span>'1a'</span>, <span>'2b'</span>, <span>'3c-4d-5e-6f-7g'</span>]</font></p>
<p><br></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Functions to connect to RabbitMQ</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"><span> </span># Step #2</font></p>
<p><font color="#000000"> <span>def</span> <span>on_connected</span>(<span>self</span>, connection):</font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Called when we are fully connected to RabbitMQ</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"><span> </span># Open a channel for tx and rx</font></p>
<p><font color="#000000"> <span>self</span>.connection.channel(<span>self</span>.on_channel_open)</font></p>
<p><font color="#000000"> <span>self</span>.connection.channel(<span>self</span>.on_channel_open)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Step #3</font></p>
<p><font color="#000000"> <span>def</span> <span>on_channel_open</span>(<span>self</span>, new_channel):</font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Called when our channel has opened</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"> <span>if</span> <span>self</span>.channel_rx <span>is</span> <span>None</span>:</font></p>
<p><font color="#000000"> <span>#rx channel</span></font></p>
<p><font color="#000000"> <span>self</span>.channel_rx = RxChannel(new_channel, EXCHANGE, </font></p>
<p><font color="#000000"> <span>''</span>, <span>self</span>.queue_list, <span>self</span>.start_listening)</font></p>
<p><font color="#000000"> <span>self</span>.channel_rx.exchange_declare()</font></p>
<p><font color="#000000"> <span>else</span>:</font></p>
<p><font color="#000000"> <span>self</span>.channel_tx = TxChannel(new_channel, EXCHANGEACK, </font></p>
<p><font color="#000000"> <span>'--ack'</span>, <span>self</span>.queue_list, <span>self</span>.start_listening)</font></p>
<p><font color="#000000"> <span>self</span>.channel_tx.exchange_declare()</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Step #7</font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>start_listening<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"> <span>"""</span></font></p>
<p><font color="#000000"> Called when all queues for a channel are declared</font></p>
<p><font color="#000000"> """</font></p>
<p><font color="#000000"><span> </span><span>print</span><span> </span>"test_allset"</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>if</span> <span>self</span>.channel_rx.channelReady <span>and</span> <span>self</span>.channel_tx.channelReady:</font></p>
<p><font color="#000000"><span> </span>#both channels are ready, try to continue</font></p>
<p><font color="#000000"> <span>self</span>.channel_rx.consume(<span>self</span>.handle_consume)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>def</span> <span>handle_consume</span>(<span>self</span>, channel, method, properties, body):</font></p>
<p><font color="#000000"><span> </span><span>print</span><span> </span>"received message"</font></p>
<p><font color="#000000"> queue_name = method.routing_key</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span># Some long and complicated code goes here</font></p>
<p><font color="#000000"> result = <span>'I finished!'</span></font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"><span> </span>#have the tx channel send an ack</font></p>
<p><font color="#000000"> <span>self</span>.channel_tx.acknowledge(queue_name, result)</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span> </span># Step #1</font></p>
<p><font color="#000000"><span> </span><span>def</span><span> </span>start_consumer<span>(</span><span>self</span><span>):</span></font></p>
<p><font color="#000000"><span> </span># Step #1: Connect to RabbitMQ</font></p>
<p><font color="#000000"> <span>self</span>.connection = SelectConnection(pika.ConnectionParameters(</font></p>
<p><font color="#000000"> host=<span>self</span>.rabbitmq_host), <span>self</span>.on_connected)</font></p>
<p><font color="#000000"> </font></p>
<p><font color="#000000"> <span>try</span>:</font></p>
<p><font color="#000000"><span> </span># Loop so we can communicate with RabbitMQ</font></p>
<p><font color="#000000"> <span>self</span>.connection.ioloop.start()</font></p>
<p><font color="#000000"> <span>except</span> KeyboardInterrupt:</font></p>
<p><font color="#000000"><span> </span># Gracefully close the connection</font></p>
<p><font color="#000000"> <span>self</span>.connection.close()</font></p>
<p><font color="#000000"><span> </span># Loop until we're fully closed, will stop on its own</font></p>
<p><font color="#000000"> <span>self</span>.connection.ioloop.start()</font></p>
<p><br></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>if</span> __name__ == <span>'__main__'</span>:</font></p>
<p><font color="#000000"> consumer = RabbitConsumer()</font></p>
<p><font color="#000000"> consumer.start_consumer()</font></p><div><br></div><div>Thanks!</div><span class="HOEnZb"><font color="#888888"><div><br></div>-- <br><span style="font-family:Helvetica,Arial,sans-serif;font-size:14px;line-height:16px">Charles Law</span><div>
Watch how we make online advertising simple: <a href="http://bit.ly/Ent_vid" style="color:rgb(0,114,188);text-decoration:none" target="_blank">http://bit.ly/Ent_vid</a></div><div><a href="http://www.openx.org/" style="color:rgb(0,114,188);text-decoration:none" target="_blank">www.openx.com</a> | follow us on: <a href="http://twitter.com/openx" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Twitter</a> <a href="http://www.facebook.com/OpenX" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Facebook</a> <a href="http://www.linkedin.com/company/openx/products" style="color:rgb(0,114,188);text-decoration:none" target="_blank">LinkedIn</a>
</div><br>
</font></span></blockquote></div><br><br clear="all"><div><br></div>-- <br><span style="font-family:Helvetica,Arial,sans-serif;font-size:14px;line-height:16px">Charles Law</span> <span style="font-family:Helvetica,Arial,sans-serif;font-size:11px;font-style:italic;line-height:13px">Software Developer</span><br>
</div>