[rabbitmq-discuss] RabbitMQ + amqplib (python)

Jared Smith jaredtsmith at gmail.com
Mon Aug 2 17:19:14 BST 2010


I wrote some test code to understand how amqplib and RabbitMQ work together
and I ran into an issue I'm trying to figure out if this is a bug or user
error on my part.

Scenario is that I wanted to simulate an RPC client/Server the client sends
a message to the server and the server will bounce back an answer in the
reply to address.

To simulate the RPC I created a queue client side that I cache for all of my
requests as I found making a new queue for each call makes things much
slower.

Everything works good for the first pass where I send 100k request/response
pairs and I'm getting on the order of 4-5k rpc calls per second.  To make
amqplib have blocking semantics I registered a callback with basic_consume
and have that deposit messages that are read into a blocking queue which the
client thread reads from this; it seems to work ok as far as I can tell.

Problem I'm seeing is that on the second run of the program I get the
following errors:

==============client error=================
Traceback (most recent call last):
  File "amqp_auth_client.py", line 58, in <module>
    client.close()
  File "/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py",
line 171, in close
    self.chan.basic_cancel(queue_name)
  File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py", line
1704, in basic_cancel
  File
"build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line
105, in wait
  File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/channel.py", line
273, in _close
amqplib.client_0_8.exceptions.AMQPChannelException: (406,
u'PRECONDITION_FAILED - timeout waiting for channel.flow_ok{active=false}',
(0, 0), '')
=======================================

============== matching server error ================
Traceback (most recent call last):
  File "amqp_auth_server.py", line 28, in <module>
    client.waitOnCallback()
  File "/andorian2/xsa/mgmt-plane/xsa/mgmt/configd/base_amqp_client.py",
line 144, in waitOnCallback
    self.chan.wait()
  File
"build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line
89, in wait
  File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py", line
218, in _wait_method
  File
"build/bdist.linux-x86_64/egg/amqplib/client_0_8/abstract_channel.py", line
105, in wait
  File "build/bdist.linux-x86_64/egg/amqplib/client_0_8/connection.py", line
367, in _close
amqplib.client_0_8.exceptions.AMQPConnectionException: (503,
u'COMMAND_INVALID - basic.publish received after
channel.flow_ok{active=false}', (60, 40), 'Channel.basic_publish')
==============================================


When this is run I start RabbitMQ, start my server process and then I start
the client process.  The server stays running for both passes of the test
and the client runs and then exits and then I run it a second time and it
gets the above errors.

Hoping someone can point me to the probable cause here.  Here are some
details on the code that is run:

The queue to receive the response is declared as:
========CLIENT QUEUE FOR RPC RESPONSE=============
retval = self.chan.queue_declare(durable=False,
                                             exclusive=True,
                                             auto_delete=True)
queue_name = retval[0]
self.chan.queue_bind(queue=queue_name,
                                 exchange=BaseMessageClient.exchange_name,
                                 routing_key=queue_name)
=====================

The queue on the server side is declared as follows:

==================SERVER QUEUE ======================
retval = self.chan.queue_declare(
                durable=True,
                exclusive=False,
                auto_delete=False
            )
queue_name = retval[0]
self.subjectToQueue[subject] = queue_name
self.chan.queue_bind(queue=self.subjectToQueue[subject],
                                 exchange=BaseMessageClient.exchange_name,
                                 routing_key=subject)
=====================================================


Server method in the stack is pretty simple just doing this:

    def waitOnCallback(self):
        # Waits on whatever callbacks are registered for this client
        # allowed_methods=[callback]
        while True:
            self.chan.wait()


Client method is doing this:
    def close(self):
        with self._responseQueueLock:
            while not self._responseQueueCache.empty():
                (message_queue,
                 queue_name) = self._responseQueueCache.get(True)
                #self.chan.queue_delete(queue=queue_name)
                self.chan.basic_cancel(queue_name)
        self.chan.close()
        self.conn.close()

Where _responseQueueCache is just a structure that holds the response queues
so they can be re-used.  In normal condition there is just one of these used
as I use a single thread to do the processing on the client.   Since this
works on the first pass not sure if I need to do additional clean up or if
something is wrong in the close method?  I did notice that it does seem to
hang for what seems like a long time when it is shutting down maybe 3-4
seconds at the end of the first pass.


Thank you any advice is appreciated,

Jared
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20100802/dd50b0ba/attachment-0001.htm>


More information about the rabbitmq-discuss mailing list