[rabbitmq-discuss] Pika: Error when doing a stop_consuming() from a timeout callback

Christian Haintz christian.haintz at gmail.com
Thu Jul 28 23:45:40 BST 2011


Hi,

I want to stop the consuming loop of a BlockingConnection after a
timeout, using the code below. No producer is running, because I want
to simulate the timeout. I am using the current pika version from GitHub.

--------8<--------- snip ----------8<-----------
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
                                 host='localhost'))

channel = connection.channel()

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

def on_response(ch, method, props, body):
    #do some processing
    channel.stop_consuming()

# Normally I publish a message here, but that is not needed
# to reproduce the error

channel.basic_consume(on_response, no_ack=True,
    queue=callback_queue)

def panic():
    channel.stop_consuming()

connection.add_timeout(1, panic)
channel.start_consuming()

--------8<------- snip end --------8<-----------

But I get the following error:

--------8<--------- snip ----------8<-----------
$ python pika_test.py
/Users/christian/devel/pika/pika/callback.py:69: UserWarning: CallbackManager.add: Duplicate callback found for "1:Basic.CancelOk"
   (self.__class__.__name__, prefix, key))
Traceback (most recent call last):
   File "pika_test.py", line 21, in <module>
     channel.start_consuming()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 307, in start_consuming
     self.transport.connection.process_data_events()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 104, in process_data_events
     self.process_timeouts()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 164, in process_timeouts
     self._timeouts[timeout_id]['handler']()
   File "pika_test.py", line 18, in panic
     channel.stop_consuming()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 318, in stop_consuming
     self.basic_cancel(consumer_tag)
   File "/Users/christian/devel/pika/pika/channel.py", line 333, in basic_cancel
     self._on_cancel_ok, [spec.Basic.CancelOk])
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 220, in rpc
     self.send_method(method, None, wait)
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 249, in send_method
     self.connection.process_data_events()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 104, in process_data_events
     self.process_timeouts()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 164, in process_timeouts
     self._timeouts[timeout_id]['handler']()
   File "pika_test.py", line 18, in panic
     channel.stop_consuming()
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 318, in stop_consuming
     self.basic_cancel(consumer_tag)
   File "/Users/christian/devel/pika/pika/channel.py", line 333, in basic_cancel
     self._on_cancel_ok, [spec.Basic.CancelOk])
   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 209, in rpc
     self._on_rpc_complete)
TypeError: 'NoneType' object is not iterable
--------8<------- snip end --------8<-----------


What am I doing wrong? Are timeout callbacks not able to stop the
consumer?
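
One thing the traceback does show is that panic() is entered twice: the first
stop_consuming() call ends up in process_timeouts() again while waiting for the
reply, and the same handler fires a second time. A possible workaround, sketched
here against a toy model rather than pika itself, is to make the handler
ignore re-entrant calls. ToyConnection, cancel_calls and the state dict are
hypothetical names for illustration only, and the assumption that the handler
is still registered while it runs is inferred from the traceback, not from
pika's source.

```python
import time


class ToyConnection(object):
    """Simplified stand-in for the timeout handling seen in the traceback.

    This is NOT pika's implementation; it only mimics the call pattern
    process_timeouts -> handler -> stop_consuming -> process_timeouts.
    """

    def __init__(self):
        self._timeouts = {}
        self.cancel_calls = 0

    def add_timeout(self, delay, handler):
        timeout_id = len(self._timeouts) + 1
        self._timeouts[timeout_id] = {
            'deadline': time.monotonic() + delay,
            'handler': handler,
        }
        return timeout_id

    def process_timeouts(self):
        for timeout_id in list(self._timeouts):
            entry = self._timeouts.get(timeout_id)
            if entry is not None and entry['deadline'] <= time.monotonic():
                # The handler runs while its entry is still registered,
                # so a nested process_timeouts() call will see it again.
                entry['handler']()
                self._timeouts.pop(timeout_id, None)

    def stop_consuming(self):
        # Stand-in for the cancel request: count it, then poll timeouts
        # again, as the blocking wait for the reply does in the traceback.
        self.cancel_calls += 1
        self.process_timeouts()


conn = ToyConnection()
state = {'stopping': False}


def panic():
    # Guard: ignore the re-entrant call made while the first one is waiting.
    if state['stopping']:
        return
    state['stopping'] = True
    conn.stop_consuming()


conn.add_timeout(0, panic)
conn.process_timeouts()
```

With the guard in place the toy model issues the cancel only once. Whether the
same guard is enough against the real BlockingConnection is untested here.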


Regards,
Christian

