I have found a couple quirks:<div><meta charset="utf-8">- publishing a single message to a single queue works</div><div>- publishing multiple messages to a single queue still crashes the code</div><div>Still no fix to the problem though.<br>
<br><div class="gmail_quote">On Mon, Jun 18, 2012 at 10:26 AM, Charles Law <span dir="ltr">&lt;<a href="mailto:charles.law@openx.com" target="_blank">charles.law@openx.com</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">








<p>I am writing a consumer (and producer) using Pika.� I want to write asynchronous code, but I&#39;m having trouble getting everything working.� I started with regular I pulled off the Pika docs, then made it work with multiple queues.� To get it working with multiple queues I had to add some checks that make sure all the queues are declared and bound before trying to publish any messages.� That worked very well.</p>


<p><br></p>
<p>Our code also performs some time consuming logic, and we want our consumer to publish to another queue that will go to some logger.� I tried to add this on the same channel that was consuming code, but that did not work.� I wasn&#39;t sure if I needed a separate channel, so I tried using 2 channels on the same connections - 1 channel to consume, and 1 channel to produce, but that is not working.� I notice when the code is trying to publish the 2nd log message that everything seems to stop.� Can anyone give me any insight into what&#39;s going on?</p>


<p><br></p>
<p>Attached are sections of my code:</p>
<p><br></p>
<p>This code is setup so a lot of the multiple queue setup is handled without duplicating any code:</p>
<p><font color="#000000"><span>from</span> pika <span>import</span> spec</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000">EXCHANGE = <span>&#39;customers&#39;</span></font></p>
<p><font color="#000000"><span>EXCHANGEACK = </span><span>&#39;logs&#39;</span><span> </span>#keep this seperate for now</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span><span> </span>ChannelContainer<span>(object):</span></font></p>
<p><font color="#000000">� � <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000">� � � � <span>self</span>.channel = channel</font></p>
<p><font color="#000000">� � � � <span>self</span>.exchange = exchange</font></p>
<p><font color="#000000">� � � � <span>self</span>.append_str = append</font></p>
<p><font color="#000000">� � � � <span>self</span>.queue_list = queue_list</font></p>
<p><font color="#000000">� � � � <span>self</span>.goodQueues = <span>0</span></font></p>
<p><font color="#000000">� � � � <span>self</span>.channelReady = <span>False</span></font></p>
<p><font color="#000000">� � � � <span>self</span>.try_to_continue = callback</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>exchange_declare<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000">� � � � <span>self</span>.channel.exchange_declare(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000">� � � � � � � � � � � � � � � � � � � � � type=<span>&#39;direct&#39;</span>, \</font></p>
<p><font color="#000000">� � � � � � � � � � � � � � � � � � � � � callback=<span>self</span>.on_exchange_declare)</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000"><span>� � </span># Step #4</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>on_exchange_declare<span>(</span><span>self</span><span>, frame):</span></font></p>

<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when our exchange has been created</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;<span> � � � �</span></font></p>
<p><font color="#000000">� � � � <span>for</span> queue_name <span>in</span> <span>self</span>.queue_list:</font></p>
<p><font color="#000000">� � � � � � full_queue_name = queue_name + <span>self</span>.append_str</font></p>
<p><font color="#000000">�� � � � � �</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel.queue_declare(queue=full_queue_name, durable=<span>True</span>, \</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � � � exclusive=<span>False</span>, auto_delete=<span>False</span>, \</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � � � callback=<span>self</span>.on_queue_declared)</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span># Step #5</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>on_queue_declared<span>(</span><span>self</span><span>, frame):</span></font></p>

<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Bind the queues to the channel</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000">� � � � <span>if</span> type(frame.method) == spec.Queue.DeclareOk:</font></p>
<p><font color="#000000"><span>� � � � � � </span># Get the queue name</font></p>
<p><font color="#000000">� � � � � � queue_name = frame.method.queue</font></p>
<p><font color="#000000">�� � � � � �</font></p>
<p><font color="#000000">� � � � � � <span>print</span> <span>self</span>.append_str, <span>&quot;binding&quot;</span>, queue_name</font></p>
<p><font color="#000000">�� � � � � �</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel.queue_bind(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000">� � � � � � � � � � � � � � � � � � queue=queue_name,</font></p>
<p><font color="#000000">� � � � � � � � � � � � � � � � � � routing_key=queue_name,�</font></p>
<p><font color="#000000">� � � � � � � � � � � � � � � � � � callback=<span>self</span>.on_queue_bound)</font></p>
<p><font color="#000000">�� � � � � �</font></p>
<p><font color="#000000"><span>� � </span># Step #6</font></p>
<p><font color="#000000">� � <span>def</span> <span>on_queue_bound</span>(<span>self</span>, frame):</font></p>
<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when RabbitMQ has told us our Queue has been declared,</font></p>
<p><font color="#000000">� � � � � � frame is the response from RabbitMQ</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000"><span>� � � � </span><span>print</span><span> </span>&quot;bound queue&quot;</font></p>
<p><font color="#000000">� � � � <span>self</span>.goodQueues += <span>1</span></font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">� � � � <span>if</span> <span>self</span>.goodQueues == len(<span>self</span>.queue_list):</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channelReady = <span>True</span></font></p>
<p><font color="#000000">�� � � � � �</font></p>
<p><font color="#000000"><span>� � � � � � </span>#tell owner to try to continue</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.try_to_continue()</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span> <span>TxChannel</span>(ChannelContainer):</font></p>
<p><br></p>
<p><font color="#000000">� � <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000">� � � � ChannelContainer.__init__(<span>self</span>, channel, exchange, append, queue_list, callback)</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">� � <span>def</span> <span>acknowledge</span>(<span>self</span>, queue_name, body):</font></p>
<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Send the action and object to the acknowledgement queue.</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000">� � � � full_queue_name = <span>&#39;%s--ack&#39;</span> % queue_name</font></p>
<p><font color="#000000">� � � � <span>print</span> <span>&quot;sending ack&quot;</span>, full_queue_name</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">� � � � <span>self</span>.channel.basic_publish(exchange=<span>self</span>.exchange,</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � routing_key=full_queue_name,</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � body=body)</font></p>
<p><br></p>
<p><font color="#000000">�� � � �</font></p>
<p><br></p>
<p><font color="#000000"><span>class</span> <span>RxChannel</span>(ChannelContainer):</font></p>
<p><br></p>
<p><font color="#000000">� � <span>def</span> <span>__init__</span>(<span>self</span>, channel, exchange, append, queue_list, callback):</font></p>
<p><font color="#000000">� � � � ChannelContainer.__init__(<span>self</span>, channel, exchange, append, queue_list, callback)</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>� � </span># Step #6</font></p>
<p><font color="#000000">� � <span>def</span> <span>consume</span>(<span>self</span>, callback):</font></p>
<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when RabbitMQ has told us our Queue has been declared,</font></p>
<p><font color="#000000">� � � � � � frame is the response from RabbitMQ</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><br></p>
<p><font color="#000000"><span>� � � � </span>#consume on all queues now</font></p>
<p><font color="#000000">� � � � <span>for</span> queue_name <span>in</span> <span>self</span>.queue_list:</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel.basic_consume(callback, queue=queue_name)</font></p>
<p>�� �</p>
<p><br></p>
<p><br></p>
<p><br></p>
<p>Here is the consumer code. �I tried to take out everything irrelevant:</p>
<p><font color="#000000">import<span> pika</span></font></p>
<p><font color="#000000"><span>from</span> pika.adapters <span>import</span> SelectConnection</font></p>
<p><br></p>
<p><font color="#000000"><span>from</span> ChannelContainer <span>import</span> <span>RxChannel</span>, <span>TxChannel</span></font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000">EXCHANGE = <span>&#39;customers&#39;</span></font></p>
<p><font color="#000000"><span>EXCHANGEACK = </span><span>&#39;logs&#39;</span><span> </span>#keep this seperate for now</font></p>
<p><br></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>class</span><span> </span>RabbitConsumer<span>(object):</span></font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>__init__<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000"><span>� � � � </span>#initialize some variables</font></p>
<p><font color="#000000">� � � � <span>self</span>.queue_list = []</font></p>
<p><font color="#000000">� � � � <span>self</span>.goodQueues = <span>0</span></font></p>
<p><font color="#000000">� � � � <span>self</span>.channel_rx = <span>None</span></font></p>
<p><font color="#000000">� � � � <span>self</span>.channel_tx = <span>None</span></font></p>
<p><font color="#000000">� � � � <span>self</span>.connection = <span>None</span></font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">� � � � <span>self</span>.parse_config()</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000"><span>� � � � </span>#get the queue_list</font></p>
<p><font color="#000000">� � � � <span>self</span>.gather_queue_names()</font></p>
<p><font color="#000000">� � � �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">� � �</font></p>
<p><font color="#000000">� � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � Functions to parse data and load config</font></p>
<p><font color="#000000">� � &quot;&quot;&quot;</font></p>
<p><br></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>parse_config<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Parse the variables in the config file into the instance.</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000">� � � � <span>self</span>.rabbitmq_host = <span>&#39;localhost&#39;</span></font></p>
<p><font color="#000000">�� � � � � � � �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>gather_queue_names<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Gather the names of the queues.</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000"><span>� � � � </span>#These are just for testing</font></p>
<p><font color="#000000">� � � � <span>self</span>.queue_list = [<span>&#39;1a&#39;</span>, <span>&#39;2b&#39;</span>, <span>&#39;3c-4d-5e-6f-7g&#39;</span>]</font></p>

<p><br></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">� � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � Functions to connect to RabbitMQ</font></p>
<p><font color="#000000">� � &quot;&quot;&quot;</font></p>
<p><font color="#000000"><span>� � </span># Step #2</font></p>
<p><font color="#000000">� � <span>def</span> <span>on_connected</span>(<span>self</span>, connection):</font></p>
<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when we are fully connected to RabbitMQ</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000"><span>� � � � </span># Open a channel for tx and rx</font></p>
<p><font color="#000000">� � � � <span>self</span>.connection.channel(<span>self</span>.on_channel_open)</font></p>
<p><font color="#000000">� � � � <span>self</span>.connection.channel(<span>self</span>.on_channel_open)</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span># Step #3</font></p>
<p><font color="#000000">� � <span>def</span> <span>on_channel_open</span>(<span>self</span>, new_channel):</font></p>
<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when our channel has opened</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000">� � � � <span>if</span> <span>self</span>.channel_rx <span>is</span> <span>None</span>:</font></p>
<p><font color="#000000">� � � � � � <span>#rx channel</span></font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel_rx = RxChannel(new_channel, EXCHANGE,�</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � � <span>&#39;&#39;</span>, <span>self</span>.queue_list, <span>self</span>.start_listening)</font></p>

<p><font color="#000000">� � � � � � <span>self</span>.channel_rx.exchange_declare()</font></p>
<p><font color="#000000">� � � � <span>else</span>:</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel_tx = TxChannel(new_channel, EXCHANGEACK,�</font></p>
<p><font color="#000000">�� � � � � � � � � � � � � � � � � � <span>&#39;--ack&#39;</span>, <span>self</span>.queue_list, <span>self</span>.start_listening)</font></p>

<p><font color="#000000">� � � � � � <span>self</span>.channel_tx.exchange_declare()</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000"><span>� � </span># Step #7</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>start_listening<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000">� � � � <span>&quot;&quot;&quot;</span></font></p>
<p><font color="#000000">� � � � Called when all queues for a channel are declared</font></p>
<p><font color="#000000">� � � � &quot;&quot;&quot;</font></p>
<p><font color="#000000"><span>� � � � </span><span>print</span><span> </span>&quot;test_allset&quot;</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">� � � � <span>if</span> <span>self</span>.channel_rx.channelReady <span>and</span> <span>self</span>.channel_tx.channelReady:</font></p>

<p><font color="#000000"><span>� � � � � � </span>#both channels are ready, try to continue</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.channel_rx.consume(<span>self</span>.handle_consume)</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">�� �</font></p>
<p><font color="#000000">� � <span>def</span> <span>handle_consume</span>(<span>self</span>, channel, method, properties, body):</font></p>
<p><font color="#000000"><span>� � � � </span><span>print</span><span> </span>&quot;received message&quot;</font></p>
<p><font color="#000000">� � � � queue_name = method.routing_key</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000"><span>� � � � </span># Insert some long and complicated code goes here</font></p>
<p><font color="#000000">� � � � result = <span>&#39;I finished!&#39;</span></font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000"><span>� � � � </span>#have the tx channel send an ack</font></p>
<p><font color="#000000">� � � � <span>self</span>.channel_tx.acknowledge(queue_name, result)</font></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>� � </span># Step #1</font></p>
<p><font color="#000000"><span>� � </span><span>def</span><span> </span>start_consumer<span>(</span><span>self</span><span>):</span></font></p>

<p><font color="#000000"><span>� � � � </span># Step #1: Connect to RabbitMQ</font></p>
<p><font color="#000000">� � � � <span>self</span>.connection = SelectConnection(pika.ConnectionParameters(\</font></p>
<p><font color="#000000">� � � � � � � � host=<span>self</span>.rabbitmq_host), <span>self</span>.on_connected)</font></p>
<p><font color="#000000">�� � � �</font></p>
<p><font color="#000000">� � � � <span>try</span>:</font></p>
<p><font color="#000000"><span>� � � � � � </span># Loop so we can communicate with RabbitMQ</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.connection.ioloop.start()</font></p>
<p><font color="#000000">� � � � <span>except</span> KeyboardInterrupt:</font></p>
<p><font color="#000000"><span>� � � � � � </span># Gracefully close the connection</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.connection.close()</font></p>
<p><font color="#000000"><span>� � � � � � </span># Loop until we&#39;re fully closed, will stop on its own</font></p>
<p><font color="#000000">� � � � � � <span>self</span>.connection.ioloop.start()</font></p>
<p><br></p>
<p><br></p>
<p><br></p>
<p><font color="#000000"><span>if</span> __name__ == <span>&#39;__main__&#39;</span>:</font></p>
<p><font color="#000000">� � consumer = RabbitConsumer()</font></p>
<p><font color="#000000">� � consumer.start_consumer()</font></p><div><br></div><div>Thanks!</div><span class="HOEnZb"><font color="#888888"><div><br></div>-- <br><span style="font-family:Helvetica,Arial,sans-serif;font-size:14px;line-height:16px">Charles Law</span><div>

Watch how we make online advertising simple:�<a href="http://bit.ly/Ent_vid" style="color:rgb(0,114,188);text-decoration:none" target="_blank">http://bit.ly/Ent_vid</a></div><div><a href="http://www.openx.org/" style="color:rgb(0,114,188);text-decoration:none" target="_blank">www.openx.com</a>���|���follow us on:���<a href="http://twitter.com/openx" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Twitter</a>���<a href="http://www.facebook.com/OpenX" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Facebook</a>����<a href="http://www.linkedin.com/company/openx/products" style="color:rgb(0,114,188);text-decoration:none" target="_blank">LinkedIn</a>
</div><br>
</font></span></blockquote></div><br><br clear="all"><div><br></div>-- <br><span style="font-family:Helvetica,Arial,sans-serif;font-size:14px;line-height:16px">Charles Law</span>���<span style="font-family:Helvetica,Arial,sans-serif;font-size:11px;font-style:italic;line-height:13px">Software Developer</span><div>
Watch how we make online advertising simple:�<a href="http://bit.ly/Ent_vid" style="color:rgb(0,114,188);text-decoration:none" target="_blank">http://bit.ly/Ent_vid</a></div><div><a href="http://www.openx.org/" style="color:rgb(0,114,188);text-decoration:none" target="_blank">www.openx.com</a>���|���follow us on:���<a href="http://twitter.com/openx" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Twitter</a>���<a href="http://www.facebook.com/OpenX" style="color:rgb(0,114,188);text-decoration:none" target="_blank">Facebook</a>����<a href="http://www.linkedin.com/company/openx/products" style="color:rgb(0,114,188);text-decoration:none" target="_blank">LinkedIn</a>
</div><br>
</div>