I wrote some test code to understand how amqplib and RabbitMQ work together, and I ran into an issue. I'm trying to figure out whether it is a bug or user error on my part.<div><br></div><div>The scenario: I wanted to simulate an RPC client/server in which the client sends a message to the server and the server bounces an answer back to the reply-to address.</div>
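<div><br></div><div>As a rough illustration of the round trip I mean, here is a minimal in-memory sketch. It does not use amqplib or RabbitMQ at all: the queues dict, the queue names, and both function names are made up for illustration only.</div>

```python
from collections import deque

# Hypothetical in-memory stand-in for the broker: queue name -> pending messages.
queues = {"rpc_requests": deque(), "reply.client-1": deque()}


def client_send(body, reply_to):
    # The request carries the name of the queue where the answer should go.
    queues["rpc_requests"].append({"body": body, "reply_to": reply_to})


def server_handle_one():
    # The server takes one request and bounces an answer to its reply_to address.
    request = queues["rpc_requests"].popleft()
    answer = {"body": "echo:" + request["body"]}
    queues[request["reply_to"]].append(answer)


client_send("ping", reply_to="reply.client-1")
server_handle_one()
response = queues["reply.client-1"].popleft()
print(response["body"])
```

<div>In the real test the two deques are RabbitMQ queues and the two functions live in separate processes.</div>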
<div><br></div><div>To simulate the RPC, I created a client-side queue that I cache and reuse for all of my requests, since I found that making a new queue for each call is much slower.</div><div><br></div><div>Everything works well on the first pass, where I send 100k request/response pairs and get on the order of 4-5k RPC calls per second. To give amqplib blocking semantics, I registered a callback with basic_consume that deposits each received message into a blocking queue, which the client thread then reads from. This seems to work OK as far as I can tell.</div>
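<div><br></div><div>The blocking hand-off described above looks roughly like the sketch below. It is not my actual client code: BlockingReplyBox, fake_broker_delivery, and the timeout value are hypothetical, and a plain thread stands in for amqplib invoking the basic_consume callback. It is written for Python 3, where the standard-library module is named queue (Queue on Python 2).</div>

```python
import queue
import threading


class BlockingReplyBox:
    """Hands messages from a consumer callback to a blocked caller."""

    def __init__(self):
        self._replies = queue.Queue()

    def on_message(self, msg):
        # This method is what would be registered as the consume callback.
        self._replies.put(msg)

    def wait_for_reply(self, timeout=5.0):
        # Blocks until the callback has delivered a message;
        # raises queue.Empty if nothing arrives within the timeout.
        return self._replies.get(True, timeout)


def fake_broker_delivery(box, body):
    # Hypothetical stand-in for the library dispatching a delivery.
    box.on_message(body)


box = BlockingReplyBox()
deliverer = threading.Thread(target=fake_broker_delivery, args=(box, "pong"))
deliverer.start()
reply = box.wait_for_reply()
deliverer.join()
print(reply)
```

<div>The timeout is there so that a lost reply shows up as an exception instead of a caller that blocks forever.</div>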
<div><br></div><div>The problem I'm seeing is that on the second run of the program I get the following errors:</div><div><br></div><div>==============client error=================</div><div><div>Traceback (most recent call last):</div>
<div> File "amqp_auth_client.py", line 58, in <module></div><div> client.close()</div><div> File "/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py", line 171, in close</div><div>
self.chan.basic_cancel(queue_name)</div><div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py", line 1704, in basic_cancel</div><div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line 105, in wait</div>
<div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py", line 273, in _close</div><div>amqplib.client_0_8.exceptions.AMQPChannelException: (406, u'PRECONDITION_FAILED - timeout waiting for channel.flow_ok{active=false}', (0, 0), '')</div>
</div><div>=======================================</div><div><br></div><div>============== matching server error ================</div><div><div>Traceback (most recent call last):</div><div> File "amqp_auth_server.py", line 28, in <module></div>
<div> client.waitOnCallback()</div><div> File "/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py", line 144, in waitOnCallback</div><div> self.chan.wait()</div><div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line 89, in wait</div>
<div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py", line 218, in _wait_method</div><div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line 105, in wait</div>
<div> File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py", line 367, in _close</div><div>amqplib.client_0_8.exceptions.AMQPConnectionException: (503, u'COMMAND_INVALID - basic.publish received after channel.flow_ok{active=false}', (60, 40), 'Channel.basic_publish')</div>
</div><div>==============================================</div><div><br></div><div><br></div><div>To run the test, I start RabbitMQ, start my server process, and then start the client process. The server stays running for both passes of the test. The client runs and exits; when I run it a second time, it gets the errors above.</div>
<div><br></div><div>I'm hoping someone can point me to the probable cause. Here are some details on the code that is run:</div><div><br></div><div>The queue that receives the response is declared as follows:</div><div>========CLIENT QUEUE FOR RPC RESPONSE=============</div>
<div><div>retval = self.chan.queue_declare(durable=False, </div><div> exclusive=True, </div><div> auto_delete=True)</div><div>queue_name = retval[0]</div>
<div>self.chan.queue_bind(queue=queue_name, </div><div> exchange=BaseMessageClient.exchange_name, </div><div> routing_key=queue_name)</div><div>=====================</div>
<div><br></div><div>The queue on the server side is declared as follows:</div><div><br></div><div>==================SERVER QUEUE ======================</div><div><div>retval = self.chan.queue_declare(</div><div> durable=True, </div>
<div> exclusive=False, </div><div> auto_delete=False</div><div> )</div><div>queue_name = retval[0]</div><div>self.subjectToQueue[subject] = queue_name</div><div>self.chan.queue_bind(queue=self.subjectToQueue[subject], </div>
<div> exchange=BaseMessageClient.exchange_name, </div><div> routing_key=subject)</div></div><div>=====================================================</div><div>
<br></div><div><br></div><div>The server method in the stack trace is pretty simple; it just does this:</div><div><br></div><div><div> def waitOnCallback(self):</div><div> # Waits on whatever callbacks are registered for this client</div>
<div> # allowed_methods=[callback]</div><div> while True:</div><div> self.chan.wait()</div><div><br></div><div><br></div><div>The client method does this:</div><div><div> def close(self):</div>
<div> with self._responseQueueLock:</div><div> while not self._responseQueueCache.empty():</div><div> (message_queue, </div><div> queue_name) = self._responseQueueCache.get(True)</div>
<div> #self.chan.queue_delete(queue=queue_name)</div><div> self.chan.basic_cancel(queue_name)</div><div> self.chan.close()</div><div> self.conn.close()</div><div><br></div><div>
Here _responseQueueCache is just a structure that holds the response queues so they can be reused. Under normal conditions only one of these is in use, since I use a single thread to do the processing on the client. Since this works on the first pass, I'm not sure whether I need to do additional cleanup or whether something is wrong in the close method. I did notice that the client seems to hang for a long time when shutting down, maybe 3-4 seconds, at the end of the first pass.</div>
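<div><br></div><div>One thing I have not ruled out is the basic_cancel call itself. As far as I understand the AMQP spec, basic.cancel takes the consumer tag returned by basic_consume rather than a queue name, so unless the two happen to be the same string, close would need to cache the tag as well. I do not know whether this is related to the flow_ok errors. Below is a minimal sketch of the idea. FakeChannel is a hypothetical stand-in that only records calls (it is not amqplib's Channel), and close_client and the (queue_name, consumer_tag) pairs are likewise made up for illustration.</div>

```python
class FakeChannel:
    """Hypothetical stand-in for an AMQP channel; it only records calls."""

    def __init__(self):
        self._next_id = 0
        self.consumers = {}  # consumer tag -> queue name
        self.closed = False

    def basic_consume(self, queue, callback):
        # Returns a generated consumer tag, which differs from the queue name.
        self._next_id += 1
        tag = "ctag-%d" % self._next_id
        self.consumers[tag] = queue
        return tag

    def basic_cancel(self, consumer_tag):
        # Raises KeyError when given something that is not a known tag.
        del self.consumers[consumer_tag]

    def close(self):
        self.closed = True


def close_client(chan, cached):
    """Cancel every cached consumer by tag, then close the channel."""
    while cached:
        queue_name, consumer_tag = cached.pop()
        chan.basic_cancel(consumer_tag)
    chan.close()


chan = FakeChannel()
tag = chan.basic_consume("reply-queue-1", callback=lambda msg: None)
cached = [("reply-queue-1", tag)]
close_client(chan, cached)
print(chan.consumers, chan.closed)
```

<div>With this stand-in, passing the queue name to basic_cancel would raise KeyError, which is the kind of mismatch I want to check for in my real close method.</div>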
<div><br></div><div><br></div><div>Thank you; any advice is appreciated.</div><div><br></div><div>Jared</div></div><div><br></div></div></div>