[rabbitmq-discuss] Pika - how to properly publish from a periodic background thread?

Aaron Voelker aaron at contextlogic.com
Thu Sep 22 21:26:31 BST 2011


I'm using Python 2.6.6, Ubuntu 10.10, and Pika 0.9.5.

I want to publish to a queue from a background thread at a periodic
interval. The main thread, meanwhile, consumes messages through a
callback on the same channel.

I assumed (incorrectly) that wrapping my callback and publish in a
mutex would solve this problem. I ran into some weird behaviour before
I realized that maybe this isn't the correct approach.

The worker that I publish to from the background thread sends work
back to the main thread's callback. Normally this is fine, but
occasionally (a race condition?), after my application has been
running for a few hours, Pika ends up in an unrecoverable state.
Sample code that reproduces the problem, along with the resulting
error messages, is included below in case someone else Googles for
this error message (which got me next to nowhere).

What is the correct way to do this? Note that both the callback and
the scheduler have to have shared access to the same variables (hence
the mutex), and performance does NOT matter (the scheduler runs once
per minute and doesn't do much).

The worker->producer looks something like this (run this first and
keep it running):

#!/usr/bin/python

import pika
import time

######### Consumer->Producer Code

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test1')
channel.queue_declare(queue='test2')

def callback(ch, method, properties, body):
    print "In Callback", method.routing_key, body
    channel.basic_publish(exchange='',
                          routing_key=properties.reply_to,
                          body=body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=5)
channel.basic_consume(callback, 'test1')
channel.basic_consume(callback, 'test2')
channel.start_consuming()

-----------------------------------------------------

And the periodic producer looks something like this:

#!/usr/bin/python

import pika
import time
import threading

######### Producer->Consumer Code

mutex = threading.Lock()

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test1')
channel.queue_declare(queue='test2')
channel.queue_declare(queue='return_test1')
channel.queue_declare(queue='return_test2')

def send_msg(life):
    if life <= 0: return
    print "Publishing"
    channel.basic_publish(exchange='',
                          routing_key='test1',
                          properties=pika.BasicProperties(reply_to='return_test1'),
                          body=str(life))

def callback(ch, method, properties, body):
    mutex.acquire()

    print "In Callback", method.routing_key, body
    send_msg(int(body)-1)
    ch.basic_ack(delivery_tag = method.delivery_tag)

    mutex.release()

def background_periodic_scheduler():
    for i in range(2): # only need two iterations for this test
        time.sleep(2)

        mutex.acquire()
        for j in range(50):
            send_msg(3)
        mutex.release()

threading.Thread(target=background_periodic_scheduler).start()

channel.basic_qos(prefetch_count=5)
channel.basic_consume(callback, 'return_test1')
channel.basic_consume(callback, 'return_test2')
channel.start_consuming()

-----------------------------------------------------

When I run this (it may take a couple of tries!), I get the following
error messages:

/usr/local/lib/python2.6/dist-packages/pika/callback.py:69: UserWarning: CallbackManager.add: Duplicate callback found for "1:Basic.CancelOk"
  (self.__class__.__name__, prefix, key))
Traceback (most recent call last):
  File "./rabbittests2a.py", line 47, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python2.6/dist-packages/pika/adapters/blocking_connection.py", line 293, in start_consuming
    self.transport.connection.process_data_events()
  File "/usr/local/lib/python2.6/dist-packages/pika/adapters/blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "/usr/local/lib/python2.6/dist-packages/pika/adapters/base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python2.6/dist-packages/pika/connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "/usr/local/lib/python2.6/dist-packages/pika/callback.py", line 124, in process
    callback(*args, **keywords)
  File "/usr/local/lib/python2.6/dist-packages/pika/connection.py", line 427, in _on_remote_close
    self.close(frame.method.reply_code, frame.method.reply_text)
  File "/usr/local/lib/python2.6/dist-packages/pika/adapters/blocking_connection.py", line 48, in close
    BaseConnection.close(self, code, text)
  File "/usr/local/lib/python2.6/dist-packages/pika/connection.py", line 383, in close
    self._channels[channel_number].close(code, text)
  File "/usr/local/lib/python2.6/dist-packages/pika/channel.py", line 275, in close
    self.basic_cancel(consumer_tag)
  File "/usr/local/lib/python2.6/dist-packages/pika/channel.py", line 336, in basic_cancel
    self._on_cancel_ok, [spec.Basic.CancelOk])
  File "/usr/local/lib/python2.6/dist-packages/pika/adapters/blocking_connection.py", line 205, in rpc
    self._on_rpc_complete)
TypeError: 'NoneType' object is not iterable
ERROR:pika:BlockingConnection: Socket Error on 7: 32
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 1599 bytes and an estimated 10 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 1634 bytes and an estimated 10 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 1673 bytes and an estimated 10 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed

...

ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 10033 bytes and an estimated 65 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 10072 bytes and an estimated 65 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))
ERROR:pika:BlockingConnection: Socket is closed
/usr/local/lib/python2.6/dist-packages/pika/connection.py:642: UserWarning: Pika: Write buffer exceeded warning threshold at 10541 bytes and an estimated 68 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))

...
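
One direction that might avoid the problem, sketched here under the
assumption that a Pika connection and its channels should only ever be
touched by the thread that created them: the background thread never
calls basic_publish itself. Instead it puts publish requests on a
thread-safe queue, and the main thread drains that queue. The names
`outbox`, `publish`, and `drain_outbox` are hypothetical, and `publish`
is a stand-in that only records messages so the sketch runs without a
broker. How the main thread gets a chance to call `drain_outbox` while
it is consuming is left open. The sketch is written for Python 3; on
Python 2.6 the `queue` module is named `Queue`.

```python
import queue
import threading
import time

# Hypothetical stand-in for channel.basic_publish. It only records the
# message, so this sketch runs without RabbitMQ or Pika installed.
published = []

def publish(routing_key, body):
    published.append((routing_key, body))

# Thread-safe hand-off between the scheduler thread and the thread that
# owns the connection (assumption: only the owner may use the channel).
outbox = queue.Queue()

def background_periodic_scheduler(iterations, interval):
    """Queue publish requests instead of touching the channel directly."""
    for _ in range(iterations):
        time.sleep(interval)
        for _ in range(50):
            outbox.put(('test1', '3'))

def drain_outbox():
    """Publish everything queued so far.

    Meant to be called only from the thread that owns the connection.
    Returns the number of messages handed to publish().
    """
    count = 0
    while True:
        try:
            routing_key, body = outbox.get_nowait()
        except queue.Empty:
            return count
        publish(routing_key, body)
        count += 1

# Demonstration only: run the scheduler to completion, then drain once
# from the main thread. A real consumer would drain periodically instead.
worker = threading.Thread(target=background_periodic_scheduler,
                          args=(2, 0.01))
worker.start()
worker.join()
sent = drain_outbox()
```

In the real program `publish` would wrap `channel.basic_publish`, and
the mutex would then guard only the shared variables, not the channel.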

