I wrote some test code to understand how amqplib and RabbitMQ work together and I ran into an issue I&#39;m trying to figure out if this is a bug or user error on my part.<div><br></div><div>Scenario is that I wanted to simulate an RPC client/Server the client sends a message to the server and the server will bounce back an answer in the reply to address.</div>
<div><br></div><div>To simulate the RPC I created a queue client side that I cache for all of my requests as I found making a new queue for each call makes things much slower. �</div><div><br></div><div>Everything works good for the first pass where I send 100k request/response pairs and I&#39;m getting on the order of 4-5k rpc calls per second. �To make amqplib have blocking semantics I registered a callback with basic_consume and have that deposit messages that are read into a blocking queue which the client thread reads from this; it seems to work ok as far as I can tell.</div>
<div><br></div><div>Problem I&#39;m seeing is that on the second run of the program I get the following errors:</div><div><br></div><div>==============client error=================</div><div><div>Traceback (most recent call last):</div>
<div>��File &quot;amqp_auth_client.py&quot;, line 58, in &lt;module&gt;</div><div>�� �client.close()</div><div>��File &quot;/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py&quot;, line 171, in close</div><div>
�� �self.chan.basic_cancel(queue_name)</div><div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py&quot;, line 1704, in basic_cancel</div><div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py&quot;, line 105, in wait</div>
<div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py&quot;, line 273, in _close</div><div>amqplib.client_0_8.exceptions.AMQPChannelException: (406, u&#39;PRECONDITION_FAILED - timeout waiting for channel.flow_ok{active=false}&#39;, (0, 0), &#39;&#39;)</div>
</div><div>=======================================</div><div><br></div><div>============== matching server error ================</div><div><div>Traceback (most recent call last):</div><div>��File &quot;amqp_auth_server.py&quot;, line 28, in &lt;module&gt;</div>
<div>�� �client.waitOnCallback()</div><div>��File &quot;/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py&quot;, line 144, in waitOnCallback</div><div>�� �self.chan.wait()</div><div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py&quot;, line 89, in wait</div>
<div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py&quot;, line 218, in _wait_method</div><div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py&quot;, line 105, in wait</div>
<div>��File &quot;build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py&quot;, line 367, in _close</div><div>amqplib.client_0_8.exceptions.AMQPConnectionException: (503, u&#39;COMMAND_INVALID - basic.publish received after channel.flow_ok{active=false}&#39;, (60, 40), &#39;Channel.basic_publish&#39;)</div>
</div><div>==============================================</div><div><br></div><div><br></div><div>When this is run I start RabbitMQ, start my server process and then I start the client process. �The server stays running for both passes of the test and the client runs and then exits and then I run it a second time and it gets the above errors.</div>
<div><br></div><div>Hoping someone can point me to the probable cause here. �Here are some details on the code that is run:</div><div><br></div><div>The queue to receive the response is declared as:</div><div>========CLIENT QUEUE FOR RPC RESPONSE=============</div>
<div><div>retval = self.chan.queue_declare(durable=False,�</div><div>�� � � � � � � � � � � � � � � � � � � � � � exclusive=True,�</div><div>�� � � � � � � � � � � � � � � � � � � � � � auto_delete=True)</div><div>queue_name = retval[0]</div>
<div>self.chan.queue_bind(queue=queue_name,�</div><div>�� � � � � � � � � � � � � � � � exchange=BaseMessageClient.exchange_name,�</div><div>�� � � � � � � � � � � � � � � � routing_key=queue_name)</div><div>=====================</div>
<div><br></div><div>The queue on the server side is declared as follows:</div><div><br></div><div>==================SERVER QUEUE ======================</div><div><div>retval = self.chan.queue_declare(</div><div>�� � � � � � � �durable=True,�</div>
<div>�� � � � � � � �exclusive=False,�</div><div>�� � � � � � � �auto_delete=False</div><div>�� � � � � �)</div><div>queue_name = retval[0]</div><div>self.subjectToQueue[subject] = queue_name</div><div>self.chan.queue_bind(queue=self.subjectToQueue[subject],�</div>
<div>�� � � � � � � � � � � � � � � � exchange=BaseMessageClient.exchange_name,�</div><div>�� � � � � � � � � � � � � � � � routing_key=subject)</div></div><div>=====================================================</div><div>
<br></div><div><br></div><div>Server method in the stack is pretty simple just doing this:</div><div><br></div><div><div>�� �def waitOnCallback(self):</div><div>�� � � �# Waits on whatever callbacks are registered for this client</div>
<div>�� � � �# allowed_methods=[callback]</div><div>�� � � �while True:</div><div>�� � � � � �self.chan.wait()</div><div><br></div><div><br></div><div>Client method is doing this:</div><div><div>�� �def close(self):</div>
<div>�� � � �with self._responseQueueLock:</div><div>�� � � � � �while not self._responseQueueCache.empty():</div><div>�� � � � � � � �(message_queue,�</div><div>�� � � � � � � � queue_name) = self._responseQueueCache.get(True)</div>
<div>�� � � � � � � �#self.chan.queue_delete(queue=queue_name)</div><div>�� � � � � � � �self.chan.basic_cancel(queue_name)</div><div>�� � � �self.chan.close()</div><div>�� � � �self.conn.close()</div><div><br></div><div>
Where _responseQueueCache is just a structure that holds the response queues so they can be re-used. �In normal condition there is just one of these used as I use a single thread to do the processing on the client. � Since this works on the first pass not sure if I need to do additional clean up or if something is wrong in the close method? �I did notice that it does seem to hang for what seems like a long time when it is shutting down maybe 3-4 seconds at the end of the first pass.</div>
<div><br></div><div><br></div><div>Thank you any advice is appreciated,</div><div><br></div><div>Jared</div></div><div><br></div></div></div>