[rabbitmq-discuss] Pika - how to properly publish from a periodic background thread?
Aaron Voelker
aaron at contextlogic.com
Thu Sep 22 21:26:31 BST 2011
I'm using Python 2.6.6, Ubuntu 10.10, and Pika 0.9.5.
I have a situation where I want to publish to a queue from a
background thread at a periodic interval. The main thread is consuming
on some different callback.
I assumed (incorrectly) that wrapping my callback and publish in a
mutex would solve this problem. I ran into some weird behaviour before
I realized that maybe this isn't the correct approach.
What happens is that the worker I publish to from the background
thread will send work back to the callback for the main thread.
Normally, this is fine, but in weird situations (race condition?),
after a few hours of running my application, it will get Pika into an
unrecoverable state. Some sample code which duplicates this problem,
along with the resulting error messages, is included in this message
in case someone happens to Google for this error message (which got me
next to nowhere).
What is the correct way to do this? Note that both the callback and
the scheduler have to have shared access to the same variables (hence
the mutex), and performance does NOT matter (the scheduler runs once
per minute, and doesn't do too much.
The worker->producer looks something like this (run this first and
keep it running):
#!/usr/bin/python
import pika
import time
######### Consumer->Producer Code
connection =
pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test1')
channel.queue_declare(queue='test2')
def callback(ch, method, properties, body):
print "In Callback", method.routing_key, body
channel.basic_publish(exchange='',
routing_key=properties.reply_to, body=body)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=5)
channel.basic_consume(callback, 'test1')
channel.basic_consume(callback, 'test2')
channel.start_consuming()
-----------------------------------------------------
And the periodic producer looks something like this:
#!/usr/bin/python
import pika
import time
import threading
######### Producer->Consumer Code
mutex = threading.Lock()
connection =
pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test1')
channel.queue_declare(queue='test2')
channel.queue_declare(queue='return_test1')
channel.queue_declare(queue='return_test2')
def send_msg(life):
if life <= 0: return
print "Publishing"
channel.basic_publish(exchange='', routing_key='test1',
properties=pika.BasicProperties(reply_to='return_test1'),
body=str(life))
def callback(ch, method, properties, body):
mutex.acquire()
print "In Callback", method.routing_key, body
send_msg(int(body)-1)
ch.basic_ack(delivery_tag = method.delivery_tag)
mutex.release()
def background_periodic_scheduler():
for i in range(2): # only need two iterations for this test
time.sleep(2)
mutex.acquire()
for j in range(50):
send_msg(3)
mutex.release()
threading.Thread(target=background_periodic_scheduler).start()
channel.basic_qos(prefetch_count=5)
channel.basic_consume(callback, 'return_test1')
channel.basic_consume(callback, 'return_test2')
channel.start_consuming()
-----------------------------------------------------
Then upon running this (may take a couple tries!) I get the following
error message(s):
/usr/local/lib/python2.6/dist-packages/pika/callback.py:69:
UserWarning: CallbackManager.add: Duplicate callback found for
"1:Basic.CancelOk"
(self.__class__.__name__, prefix, key))
Traceback (most recent call last):
File "./rabbittests2a.py", line 47, in <module>
channel.start_consuming()
File "/usr/local/lib/python2.6/dist-packages/pika/adapters/
blocking_connection.py", line 293, in start_consuming
self.transport.connection.process_data_events()
File "/usr/local/lib/python2.6/dist-packages/pika/adapters/
blocking_connection.py", line 94, in process_data_events
self._handle_read()
File "/usr/local/lib/python2.6/dist-packages/pika/adapters/
base_connection.py", line 162, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.6/dist-packages/pika/connection.py",
line 589, in _on_data_available
frame) # Args
File "/usr/local/lib/python2.6/dist-packages/pika/callback.py", line
124, in process
callback(*args, **keywords)
File "/usr/local/lib/python2.6/dist-packages/pika/connection.py",
line 427, in _on_remote_close
self.close(frame.method.reply_code, frame.method.reply_text)
File "/usr/local/lib/python2.6/dist-packages/pika/adapters/
blocking_connection.py", line 48, in close
BaseConnection.close(self, code, text)
File "/usr/local/lib/python2.6/dist-packages/pika/connection.py",
line 383, in close
self._channels[channel_number].close(code, text)
File "/usr/local/lib/python2.6/dist-packages/pika/channel.py", line
275, in close
self.basic_cancel(consumer_tag)
File "/usr/local/lib/python2.6/dist-packages/pika/channel.py", line
336, in basic_cancel
self._on_cancel_ok, [spec.Basic.CancelOk])
File "/usr/local/lib/python2.6/dist-packages/pika/adapters/
blocking_connection.py", line 205, in rpc
self._on_rpc_complete)
TypeError: 'NoneType' object is not iterable
ERROR:pika:BlockingConnection: Socket Error on 7: 32
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 1599
bytes and an estimated 10 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 1634
bytes and an estimated 10 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 1673
bytes and an estimated 10 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
...
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 10033
bytes and an estimated 65 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 10072
bytes and an estimated 65 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642:
UserWarning: Pika: Write buffer exceeded warning threshold at 10541
bytes and an estimated 68 frames behind
warn(message % (self.outbound_buffer.size, est_frames_behind))
...
More information about the rabbitmq-discuss
mailing list