[rabbitmq-discuss] Pika Client: Bad performance for basic_publish to durable mirror queue
linbo liao
llbgurs at gmail.com
Fri Jul 5 17:11:20 BST 2013
Hi,
I encounter a bad performance issue about pika, yesterday it works fine,
but today performance of basci_publish got bad, it cost 2 or 3 second to
finish one message, sometimes down to 7 seconds.
Pika is 0.9.13
RabbitMQ 3.1.0, Erlang R14B04 Queue is Durable
Python: 2.6
Platform: CentOS release 6.3 (Final) X64
My application deployed by django + uwsgi, django receiver the request and
publish the request to rabbitmq.
*Simple Pika Client wrapper class*
import time
import logging
import pika
from pika import exceptions
logger = logging.getLogger('error')
logger.setLevel(logging.DEBUG)
class RabbitMQ:
def __init__(self, host, port, username, password):
self._channel = None
cred = pika.PlainCredentials(username, password)
self._args = dict(host=host, port=port, credentials=cred,
heartbeat_interval=25)
try:
self.reconnect()
except exceptions.AMQPConnectionError, e:
logger.error("Connection to MQ error")
self._conn = None
self._channel = None
raise
def close(self):
if getattr(self, '_channel', None) is not None:
self._channel.close()
self._channel = None
if getattr(self, '_conn', None) is not None:
self._conn.close()
self._conn = None
def close_channel(self):
if getattr(self, '_conn', None) is not None:
self._channel.close()
self._channel = None
def reconnect(self):
logger.error("reconnect to MQ ")
self.close()
self._conn =
pika.BlockingConnection(pika.ConnectionParameters(**self._args))
self._channel = self._conn.channel()
def connect_isopen(self):
return self._conn and self._conn.is_open
def channel_isopen(self):
return self._channel and self._channel.is_open
def ensure_channel(self):
if not self.connect_isopen or not self.channel_isopen():
self.reconnect()
def exchange_declare(self, name, ex_type):
self.ensure_channel()
self._channel.exchange_declare(exchange=name, exchange_type=ex_type)
def bind(self, ex, queue, routing_key):
self.ensure_channel()
self._channel.queue_bind(exchange=ex, queue=queue,
routing_key=routing_key)
def declare_queue(self, queue_name, durable=True, exclusive=False,
auto_delete=False, arguments=None):
self.ensure_channel()
self._channel.queue_declare(queue=queue_name, durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments)
def confirm_delivery(self):
self.ensure_channel()
self._channel.confirm_delivery()
def force_data(self):
self.ensure_channel()
self._channel.force_data_events(True)
def publish_setup(self, ex_name, ex_type, queue_name, routing_key,
durable, exclusive, auto_delete, arguments):
try:
self.exchange_declare(ex_name, ex_type)
self.declare_queue(queue_name, durable, exclusive, auto_delete,
arguments)
self.bind(ex_name, queue_name, routing_key)
self.force_data()
self.confirm_delivery()
except Exception, e:
logger.exception("Setup error %s" %str(e))
raise
def _publish(self, exchange, routing_key, body, delivery_mode=2):
start = time.time()
self.ensure_channel()
# Send a message
if self._channel.basic_publish(exchange=exchange,
#pika.exceptions.ConnectionClosed:
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=delivery_mode)):
#durable=True and delivery_mode=2 is persistent
end = time.time()
logger.info('cost time %f' % (end-start))
return 0 #'Message publish was confirmed'
else:
logger.info('publish error')
return -1 # 'Message publish was not confirmed'
def publish(self, exchange, routing_key, body):
try:
return self._publish(exchange, routing_key, body)
except Exception,e:
logger.exception("Publish error %s" % str(e))
raise
def setup_consume(self, callback, queue_name):
self.ensure_channel()
self._channel.basic_consume(callback, queue_name)
def start_consume(self):
self.ensure_channel()
self._channel.start_consuming()
def stop_consume(self):
self.ensure_channel()
self._channel.stop_consuming()
def __del__(self):
self.close()
*Django code here is :*
queue_args = {"x-ha-policy" : "all" }
global g_mq = None
try:
g_mq = RabbitMQ(host=cf['host'], port=cf['port'],
username=cf['username'],
password=cf['password'])
g_mq.publish_setup(ex_name=cf['exchange'], ex_type=cf['ex_type'],
queue_name=cf['queue'], routing_key=cf['route_key'],
durable=True, exclusive=False, auto_delete=False,
arguments=queue_args)
except Exception,e:
ERR_LOG.info(cf['exchange'])
ERR_LOG.exception("Setup rabbitmq error %s" % str(e))
g_mq = None
return
try:
return g_mq.publish(ex_name, route_key,msg)
except Exception,e:
ERR_LOG.exception("Publish rabbitmq error %s" % str(e))
g_mq = None
*I enable the uwsgi profiler, saw some information:*
[uWSGI Python profiler 10124] CALL:
build/bdist.linux-x86_64/egg/pika/adapters/blocking_connection.py (line
344) -> _flush_outbound 1 args, stacksize 5
Any solution about the issue?
Thanks.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130705/1e466fc9/attachment.htm>
More information about the rabbitmq-discuss
mailing list