[rabbitmq-discuss] Pika Client: Bad performance for basic_publish to durable mirror queue

linbo liao llbgurs at gmail.com
Fri Jul 5 17:11:20 BST 2013


Hi,

I encounter a bad performance issue about pika, yesterday it works fine, 
but today performance of basci_publish got bad, it cost 2 or 3 second to 
finish one message, sometimes down to 7 seconds.

Pika is 0.9.13
RabbitMQ 3.1.0, Erlang R14B04   Queue is Durable
Python: 2.6
Platform: CentOS release 6.3 (Final) X64


My application deployed by django + uwsgi, django receiver the request and 
publish the request to rabbitmq.


*Simple Pika Client wrapper class*

import time
import logging

import pika
from pika import exceptions

logger = logging.getLogger('error')
logger.setLevel(logging.DEBUG)


class RabbitMQ:
    def __init__(self, host,  port,  username,  password):
        self._channel = None
        cred = pika.PlainCredentials(username, password)
        self._args = dict(host=host, port=port, credentials=cred, 
heartbeat_interval=25)
        try:
            self.reconnect()
        except exceptions.AMQPConnectionError, e:
            logger.error("Connection to MQ error")
            self._conn = None
            self._channel = None
            raise

    def close(self):
        if getattr(self, '_channel', None) is not None:
            self._channel.close()
            self._channel = None
        if getattr(self, '_conn', None) is not None:
            self._conn.close()
            self._conn = None

    def close_channel(self):
        if getattr(self, '_conn', None) is not None:
            self._channel.close()
            self._channel = None

    def reconnect(self):
        logger.error("reconnect to MQ ")
        self.close()
        self._conn = 
pika.BlockingConnection(pika.ConnectionParameters(**self._args))
        self._channel = self._conn.channel()

    def connect_isopen(self):
        return self._conn and self._conn.is_open

    def channel_isopen(self):
        return self._channel and self._channel.is_open

    def ensure_channel(self):
        if not self.connect_isopen or not self.channel_isopen():
            self.reconnect()

    def exchange_declare(self, name,  ex_type):
        self.ensure_channel()
        self._channel.exchange_declare(exchange=name, exchange_type=ex_type)

    def bind(self, ex,  queue,  routing_key):
        self.ensure_channel()
        self._channel.queue_bind(exchange=ex, queue=queue, 
routing_key=routing_key)

    def declare_queue(self, queue_name,  durable=True, exclusive=False, 
auto_delete=False, arguments=None):
        self.ensure_channel()
        self._channel.queue_declare(queue=queue_name, durable=durable, 
exclusive=exclusive, 
                                    auto_delete=auto_delete, 
arguments=arguments) 

    def confirm_delivery(self):
        self.ensure_channel()
        self._channel.confirm_delivery()  

    def force_data(self):
        self.ensure_channel()
        self._channel.force_data_events(True)  

    def publish_setup(self, ex_name, ex_type, queue_name, routing_key, 
              durable, exclusive, auto_delete, arguments):
        try:
            self.exchange_declare(ex_name, ex_type)
            self.declare_queue(queue_name, durable, exclusive, auto_delete, 
arguments)
            self.bind(ex_name, queue_name, routing_key)
            self.force_data()
            self.confirm_delivery()
        except Exception, e:
            logger.exception("Setup error %s" %str(e))
            raise

    def _publish(self, exchange, routing_key, body, delivery_mode=2):
        start = time.time()
        self.ensure_channel()

        # Send a message
        if self._channel.basic_publish(exchange=exchange,   
#pika.exceptions.ConnectionClosed:
                                       routing_key=routing_key,
                                       body=body,
                                      
 properties=pika.BasicProperties(content_type='text/plain',
                                       delivery_mode=delivery_mode)): 
 #durable=True and delivery_mode=2 is persistent
            end = time.time()
            logger.info('cost time %f' % (end-start))
            return 0 #'Message publish was confirmed'
        else:
            logger.info('publish error')
            return -1 # 'Message publish was not confirmed'

    def publish(self, exchange, routing_key, body):
        try:
            return self._publish(exchange, routing_key, body)
        except Exception,e:
            logger.exception("Publish error %s" % str(e))
            raise

    def setup_consume(self, callback, queue_name):
        self.ensure_channel()
        self._channel.basic_consume(callback, queue_name)

    def start_consume(self):
        self.ensure_channel()
        self._channel.start_consuming()

    def stop_consume(self):
        self.ensure_channel()
        self._channel.stop_consuming()

    def __del__(self):
        self.close()


*Django code here is :*

queue_args = {"x-ha-policy" : "all" }

global g_mq = None
try:     
    g_mq = RabbitMQ(host=cf['host'], port=cf['port'],
                            username=cf['username'], 
password=cf['password'])
                 
    g_mq.publish_setup(ex_name=cf['exchange'], ex_type=cf['ex_type'], 
                      queue_name=cf['queue'], routing_key=cf['route_key'],
                      durable=True, exclusive=False, auto_delete=False,
                      arguments=queue_args)
except Exception,e:
   ERR_LOG.info(cf['exchange'])
   ERR_LOG.exception("Setup rabbitmq error %s" % str(e))
   g_mq = None
   return
                     
try:         
    return g_mq.publish(ex_name, route_key,msg)
except Exception,e:
    ERR_LOG.exception("Publish rabbitmq error %s" % str(e))
    g_mq = None
 

*I enable the uwsgi profiler, saw some information:*

[uWSGI Python profiler 10124] CALL: 
build/bdist.linux-x86_64/egg/pika/adapters/blocking_connection.py (line 
344) -> _flush_outbound 1 args, stacksize 5


Any solution about the issue?

Thanks.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130705/1e466fc9/attachment.htm>


More information about the rabbitmq-discuss mailing list