[rabbitmq-discuss] Pika: Error when doing a stop_consuming() from a timeout callback

Christian Haintz christian.haintz at gmail.com
Fri Jul 29 00:10:10 BST 2011


On 29/7/11 1:05, Gavin M. Roy wrote:
> I'll take a look shortly, mind opening a issue on github for me to track
> with?

Opened an issue on github (https://github.com/pika/pika/issues/75)

Thank you for your help!

Best Regards,
Christian


>
> On Thursday, July 28, 2011 at 7:04 PM, Christian Haintz wrote:
>
>> On 29/7/11 24:54, Gavin M. Roy wrote:
>>>
>>> On Thursday, July 28, 2011 at 6:45 PM, Christian Haintz wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to stop the consuming loop of a BlockingConnection after some
>>>> timeout. I used the following code. Producer is not running because I
>>>> want to simulate the timeout. Current pika version is used from github.
>>> I'd not consider the master branch on Pika stable, which is one of the
>>> reasons I've not tagged 0.9.6. I'd do a pip install of 0.9.5 and see if
>>> that solves your problem.
>>>
>>> Gavin
>>
>> I tried it first with 0.9.5 but that leads to a different error which
>> seams to be fixed in the master branch (used the same test code as in
>> the previous e-mail):
>>
>> --------8<--------- snip ----------8<-----------
>> python pika_test.py
>> Traceback (most recent call last):
>> File "pika_test.py", line 21, in <module>
>> channel.start_consuming()
>> File
>> "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py",
>>
>> line 293, in start_consuming
>> File
>> "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py",
>>
>> line 103, in process_data_events
>> File
>> "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py",
>>
>> line 157, in process_timeouts
>> NameError: global name 'log' is not defined
>> --------8<------- snip end --------8<-----------
>>
>> Best Regards,
>> Christian
>>
>>
>>>
>>>     --------8<--------- snip ----------8<-----------
>>>     import pika
>>>     connection = pika.BlockingConnection(pika.ConnectionParameters(
>>>     host='localhost'))
>>>
>>>     channel = connection.channel()
>>>
>>>     result = channel.queue_declare(exclusive=True)
>>>     callback_queue = result.method.queue
>>>
>>>     def on_response(self, ch, method, props, body):
>>>     #do some processing
>>>     channel.stop_consuming()
>>>
>>>     # normaly i do some publish here but that's not relevant
>>>     # to reproduce the error
>>>
>>>     channel.basic_consume(on_response, no_ack=True,
>>>     queue=callback_queue)
>>>
>>>     def panic():
>>>     channel.stop_consuming()
>>>
>>>     connection.add_timeout(1, panic)
>>>     channel.start_consuming()
>>>
>>>     --------8<------- snip end --------8<-----------
>>>
>>>     But I get the following error:
>>>
>>>     --------8<--------- snip ----------8<-----------
>>>     $ python pika_test.py
>>>     /Users/christian/devel/pika/pika/callback.py:69: UserWarning:
>>>     CallbackManager.add: Duplicate callback found for "1:Basic.CancelOk"
>>>     (self.__class__.__name__, prefix, key))
>>>     Traceback (most recent call last):
>>>     File "pika_test.py", line 21, in <module>
>>>     channel.start_consuming()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     307, in start_consuming
>>>     self.transport.connection.process_data_events()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     104, in process_data_events
>>>     self.process_timeouts()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     164, in process_timeouts
>>>     self._timeouts[timeout_id]['handler']()
>>>     File "pika_test.py", line 18, in panic
>>>     channel.stop_consuming()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     318, in stop_consuming
>>>     self.basic_cancel(consumer_tag)
>>>     File "/Users/christian/devel/pika/pika/channel.py", line 333, in
>>>     basic_cancel
>>>     self._on_cancel_ok, [spec.Basic.CancelOk])
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     220, in rpc
>>>     self.send_method(method, None, wait)
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     249, in send_method
>>>     self.connection.process_data_events()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     104, in process_data_events
>>>     self.process_timeouts()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     164, in process_timeouts
>>>     self._timeouts[timeout_id]['handler']()
>>>     File "pika_test.py", line 18, in panic
>>>     channel.stop_consuming()
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     318, in stop_consuming
>>>     self.basic_cancel(consumer_tag)
>>>     File "/Users/christian/devel/pika/pika/channel.py", line 333, in
>>>     basic_cancel
>>>     self._on_cancel_ok, [spec.Basic.CancelOk])
>>>     File
>>>     "/Users/christian/devel/pika/pika/adapters/blocking_connection.py",
>>>     line
>>>     209, in rpc
>>>     self._on_rpc_complete)
>>>     TypeError: 'NoneType' object is not iterable
>>>
>>>
>>>     What I am doing wrong? Are the timeout callbacks not capable of
>>>     stopping
>>>     the consumer?
>>>
>>>
>>>     Regards,
>>>     Christian
>>>     _______________________________________________
>>>     rabbitmq-discuss mailing list
>>>     rabbitmq-discuss at lists.rabbitmq.com
>>>     <mailto:rabbitmq-discuss at lists.rabbitmq.com>
>>>     <mailto:rabbitmq-discuss at lists.rabbitmq.com>
>>>     https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>



More information about the rabbitmq-discuss mailing list