[rabbitmq-discuss] Pika: Error when doing a stop_consuming() from a timeout callback
Christian Haintz
christian.haintz at gmail.com
Fri Jul 29 00:10:10 BST 2011
On 29/7/11 1:05, Gavin M. Roy wrote:
> I'll take a look shortly. Would you mind opening an issue on GitHub for
> me to track it with?
I opened an issue on GitHub (https://github.com/pika/pika/issues/75).
Thank you for your help!
Best Regards,
Christian
>
> On Thursday, July 28, 2011 at 7:04 PM, Christian Haintz wrote:
>
>> On 29/7/11 24:54, Gavin M. Roy wrote:
>>>
>>> On Thursday, July 28, 2011 at 6:45 PM, Christian Haintz wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to stop the consuming loop of a BlockingConnection after a
>>>> timeout. I used the following code. The producer is not running
>>>> because I want to simulate the timeout. I am using the current pika
>>>> version from GitHub.
>>> I'd not consider the master branch on Pika stable, which is one of the
>>> reasons I've not tagged 0.9.6. I'd do a pip install of 0.9.5 and see if
>>> that solves your problem.
>>>
>>> Gavin
>>
>> I tried it first with 0.9.5, but that leads to a different error, which
>> seems to be fixed in the master branch (I used the same test code as in
>> the previous e-mail):
>>
>> --------8<--------- snip ----------8<-----------
>> python pika_test.py
>> Traceback (most recent call last):
>>   File "pika_test.py", line 21, in <module>
>>     channel.start_consuming()
>>   File "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py", line 293, in start_consuming
>>   File "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py", line 103, in process_data_events
>>   File "build/bdist.macosx-10.6-universal/egg/pika/adapters/blocking_connection.py", line 157, in process_timeouts
>> NameError: global name 'log' is not defined
>> --------8<------- snip end --------8<-----------
>>
>> Best Regards,
>> Christian
>>
>>
>>>
>>> --------8<--------- snip ----------8<-----------
>>> import pika
>>> connection = pika.BlockingConnection(pika.ConnectionParameters(
>>>     host='localhost'))
>>>
>>> channel = connection.channel()
>>>
>>> result = channel.queue_declare(exclusive=True)
>>> callback_queue = result.method.queue
>>>
>>> def on_response(ch, method, props, body):
>>>     # do some processing, then stop the consuming loop
>>>     channel.stop_consuming()
>>>
>>> # normally I publish a message here, but that's not relevant
>>> # to reproducing the error
>>>
>>> channel.basic_consume(on_response, no_ack=True,
>>>                       queue=callback_queue)
>>>
>>> def panic():
>>>     channel.stop_consuming()
>>>
>>> connection.add_timeout(1, panic)
>>> channel.start_consuming()
>>>
>>> --------8<------- snip end --------8<-----------
>>>
>>> But I get the following error:
>>>
>>> --------8<--------- snip ----------8<-----------
>>> $ python pika_test.py
>>> /Users/christian/devel/pika/pika/callback.py:69: UserWarning: CallbackManager.add: Duplicate callback found for "1:Basic.CancelOk"
>>>   (self.__class__.__name__, prefix, key))
>>> Traceback (most recent call last):
>>>   File "pika_test.py", line 21, in <module>
>>>     channel.start_consuming()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 307, in start_consuming
>>>     self.transport.connection.process_data_events()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 104, in process_data_events
>>>     self.process_timeouts()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 164, in process_timeouts
>>>     self._timeouts[timeout_id]['handler']()
>>>   File "pika_test.py", line 18, in panic
>>>     channel.stop_consuming()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 318, in stop_consuming
>>>     self.basic_cancel(consumer_tag)
>>>   File "/Users/christian/devel/pika/pika/channel.py", line 333, in basic_cancel
>>>     self._on_cancel_ok, [spec.Basic.CancelOk])
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 220, in rpc
>>>     self.send_method(method, None, wait)
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 249, in send_method
>>>     self.connection.process_data_events()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 104, in process_data_events
>>>     self.process_timeouts()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 164, in process_timeouts
>>>     self._timeouts[timeout_id]['handler']()
>>>   File "pika_test.py", line 18, in panic
>>>     channel.stop_consuming()
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 318, in stop_consuming
>>>     self.basic_cancel(consumer_tag)
>>>   File "/Users/christian/devel/pika/pika/channel.py", line 333, in basic_cancel
>>>     self._on_cancel_ok, [spec.Basic.CancelOk])
>>>   File "/Users/christian/devel/pika/pika/adapters/blocking_connection.py", line 209, in rpc
>>>     self._on_rpc_complete)
>>> TypeError: 'NoneType' object is not iterable
>>>
>>>
>>> What am I doing wrong? Are timeout callbacks not able to stop the
>>> consumer?
>>>
>>>
>>> Regards,
>>> Christian
>>> _______________________________________________
>>> rabbitmq-discuss mailing list
>>> rabbitmq-discuss at lists.rabbitmq.com
>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>
>
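
In the traceback from the master branch, panic() appears twice on the stack: stop_consuming() sends a blocking Basic.Cancel, which calls process_data_events() again, and process_timeouts() then invokes the same handler a second time before the first call has returned. The sketch below is a toy model of that re-entrancy only, written for Python 3. It is not pika's code: ToyLoop, blocking_rpc, once and run are hypothetical names made up for illustration, and every registered timeout is assumed to be already due.

```python
class ToyLoop:
    """Toy stand-in for a blocking event loop. Not pika's implementation."""

    def __init__(self, pop_before_call):
        # If True, a timeout is unregistered before its handler runs.
        self.pop_before_call = pop_before_call
        self.timeouts = {}

    def add_timeout(self, handler):
        timeout_id = len(self.timeouts)
        self.timeouts[timeout_id] = handler
        return timeout_id

    def process_timeouts(self):
        # Assumption: every registered timeout has already expired.
        for timeout_id in list(self.timeouts):
            handler = self.timeouts.get(timeout_id)
            if handler is None:
                continue  # already removed by a nested call
            if self.pop_before_call:
                del self.timeouts[timeout_id]
            handler()
            # No-op if the entry is already gone.
            self.timeouts.pop(timeout_id, None)

    def blocking_rpc(self):
        # Stands in for a synchronous request that pumps the loop while waiting.
        self.process_timeouts()


def once(func):
    """Wrap func so that any call after the first one does nothing."""
    fired = False

    def wrapper():
        nonlocal fired
        if fired:
            return
        fired = True
        func()

    return wrapper


def run(pop_before_call, guard=False):
    """Return how many times the body of the timeout handler executed."""
    loop = ToyLoop(pop_before_call)
    calls = []

    def panic():
        calls.append(1)
        if len(calls) < 3:  # cap the recursion so the demo terminates
            loop.blocking_rpc()

    loop.add_timeout(once(panic) if guard else panic)
    loop.process_timeouts()
    return len(calls)


print(run(pop_before_call=False))              # handler is re-entered
print(run(pop_before_call=True))               # loop removes the timeout first
print(run(pop_before_call=False, guard=True))  # caller-side guard
```

In this model the three calls print 3, 1 and 1: the handler is re-entered until the artificial cap stops it, unless either the loop unregisters the timeout before calling it or the caller wraps the handler in a run-once guard. Whether a guard like once() is enough to avoid the error in the actual pika master branch is not something this sketch can confirm.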