I&#39;ve been drilling into an issue for a few days where my Pika channel is abruptly closed. I&#39;m pretty sure I&#39;ve found either:<br><ul><li>A large gap in my understanding</li><li>A misuse of the Rabbit/Pika APIs</li>
<li>A serious bug in Pika.</li></ul><p>My original understanding about transactions is that calling tx_select() is a synchronous operation, and that when it returns, whatever actions you&#39;ve sent to the broker are committed.</p>
<p>However, in stepping through the Pika tx_commit() code, it seems like it all it does is add the Tx.Commit message to an outgoing buffer, and that the tx_commit() call returns without anything actually sent to the broker. (I&#39;ve set breakpoints in the debugger at strategic points to verify this.)</p>
<p>In my message handler, I&#39;m writing a reply to the incoming message, using the same channel. What I&#39;m seeing (verified via WireShark) is that the Tx.Commit message is followed by some number of Basic.Publish, Content-Header, and Content-Body records corresponding to my reply message writes. This violates the AMQP standard, which says that nothing should be sent to the broker after sending a Tx.Commit and before receiving a Tx.Commit-Ok reply. (If it helps understanding, my incoming message queue has a few messages in it already, so I see a series of incoming messages immediately after starting.)<br>
</p><p>I see that the tx_commit() method takes a callback method, but if I were to simply block in my message handler, waiting for the callback to be invoked, I&#39;d deadlock because Pika&#39;s buffer processing code doesn&#39;t get a chance to run.<br>
</p>So given the above, is it me? Is it Pika?<br><br>My code looks like this:<br><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">def echo_on_connected(connection):</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� &quot;&quot;&quot;Called when we are fully connected to RabbitMQ&quot;&quot;&quot;</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� # Open a channel</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� connection.channel(echo_on_channel_open)</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace"># Step #3</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">def echo_on_channel_open(channel):</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� channel.queue_declare(queue=echo_queue_name, durable=True, arguments={&quot;x-ha-policy&quot;: &quot;all&quot;})</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� channel.tx_select()</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� channel.basic_consume(echo_handle_delivery, queue=echo_queue_name)</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">def echo_handle_delivery(channel, method, header, body):</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� print &quot;Received message %s&quot; % (body)</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� # Write a reply message on the same channel</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� channel.basic_publish(exchange=&#39;&#39;,</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� routing_key=&#39;reply_queue&#39;,</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">������� body=&quot;replying to %s&quot; % (body),</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">������� properties=BasicProperties(delivery_mode=2))</span><br style="font-family:courier new,monospace"><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� # Ack the incoming message, then force the write to be flushed.</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� channel.basic_ack(delivery_tag=method.delivery_tag)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� channel.tx_commit()</span><br style="font-family:courier new,monospace">
<br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">def echo_thread():</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� connection_parameters = ConnectionParameters(host=broker_host)</span><br style="font-family:courier new,monospace">
<span style="font-family:courier new,monospace">��� echo_connection = SelectConnection(connection_parameters, echo_on_connected)</span><br style="font-family:courier new,monospace"><span style="font-family:courier new,monospace">��� echo_connection.ioloop.start()</span><br>