Hi,<div><br></div><div>I encounter a bad performance issue about pika, yesterday it works fine, but today performance of basci_publish got bad, it cost 2 or 3 second to finish one message, sometimes down to 7 seconds.</div><div><br></div><div><font face="arial, sans-serif">Pika is 0.9.13</font></div><div><font face="arial, sans-serif"><span style="color: rgb(68, 68, 68); font-size: 12px; text-align: right;">RabbitMQ 3.1.0,&nbsp;</span><acronym title="Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [kernel-poll:true]" style="border-bottom-width: 1px; border-style: none none dotted; background-image: none; color: rgb(68, 68, 68); border-top-left-radius: 2px; border-top-right-radius: 2px; border-bottom-right-radius: 2px; border-bottom-left-radius: 2px; font-size: 12px; text-align: right;">Erlang R14B04 &nbsp; Queue is Durable</acronym><br></font></div><div><acronym title="Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [kernel-poll:true]" style="border-bottom-width: 1px; border-style: none none dotted; background-image: none; color: rgb(68, 68, 68); border-top-left-radius: 2px; border-top-right-radius: 2px; border-bottom-right-radius: 2px; border-bottom-left-radius: 2px; font-size: 12px; text-align: right;"><font face="arial, sans-serif">Python: 2.6</font></acronym></div><div><acronym title="Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [kernel-poll:true]" style="border-bottom-width: 1px; border-style: none none dotted; background-image: none; color: rgb(68, 68, 68); border-top-left-radius: 2px; border-top-right-radius: 2px; border-bottom-right-radius: 2px; border-bottom-left-radius: 2px; font-size: 12px; text-align: right;"><font face="arial, sans-serif">Platform:&nbsp;</font></acronym><font color="#444444" face="arial, sans-serif"><span style="font-size: 12px;">CentOS release 6.3 (Final) X64</span></font></div><div><font color="#444444" face="arial, sans-serif"><span style="font-size: 12px;"><br></span></font></div><div><br></div><div><acronym title="Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [kernel-poll:true]" style="border-bottom-width: 1px; border-style: none none dotted; background-image: none; color: rgb(68, 68, 68); border-top-left-radius: 2px; border-top-right-radius: 2px; border-bottom-right-radius: 2px; border-bottom-left-radius: 2px; font-size: 12px; text-align: right;"><font face="arial, sans-serif">My application deployed by django + uwsgi, django receiver the request and publish the request to rabbitmq.</font></acronym></div><div><br></div><div><br></div><div><b>Simple Pika Client wrapper class</b></div><div><br></div><div>import time</div><div>import logging</div><div><br></div><div>import pika</div><div>from pika import exceptions</div><div><br></div><div>logger = logging.getLogger('error')</div><div>logger.setLevel(logging.DEBUG)</div><div><br></div><div><br></div><div>class RabbitMQ:</div><div>&nbsp; &nbsp; def __init__(self, host, &nbsp;port, &nbsp;username, &nbsp;password):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel = None</div><div>&nbsp; &nbsp; &nbsp; &nbsp; cred = pika.PlainCredentials(<wbr>username, password)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._args = dict(host=host, port=port, credentials=cred, heartbeat_interval=25)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; try:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.reconnect()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; except exceptions.<wbr>AMQPConnectionError, e:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logger.error("Connection to MQ error")</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._conn = None</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._channel = None</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise</div><div><br></div><div>&nbsp; &nbsp; def close(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if getattr(self, '_channel', None) is not None:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._channel.close()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._channel = None</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if getattr(self, '_conn', None) is not None:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._conn.close()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._conn = None</div><div><br></div><div>&nbsp; &nbsp; def close_channel(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if getattr(self, '_conn', None) is not None:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._channel.close()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self._channel = None</div><div><br></div><div>&nbsp; &nbsp; def reconnect(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; logger.error("reconnect to MQ ")</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.close()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._conn = pika.BlockingConnection(pika.<wbr>ConnectionParameters(**self._<wbr>args))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel = self._conn.channel()</div><div><br></div><div>&nbsp; &nbsp; def connect_isopen(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; return self._conn and self._conn.is_open</div><div><br></div><div>&nbsp; &nbsp; def channel_isopen(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; return self._channel and self._channel.is_open</div><div><br></div><div>&nbsp; &nbsp; def ensure_channel(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if not self.connect_isopen or not self.channel_isopen():</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.reconnect()</div><div><br></div><div>&nbsp; &nbsp; def exchange_declare(self, name, &nbsp;ex_type):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.exchange_<wbr>declare(exchange=name, exchange_type=ex_type)</div><div><br></div><div>&nbsp; &nbsp; def bind(self, ex, &nbsp;queue, &nbsp;routing_key):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.queue_bind(<wbr>exchange=ex, queue=queue, routing_key=routing_key)</div><div><br></div><div>&nbsp; &nbsp; def declare_queue(self, queue_name, &nbsp;durable=True, exclusive=False, auto_delete=False, arguments=None):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.queue_declare(<wbr>queue=queue_name, durable=durable, exclusive=exclusive,&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; auto_delete=auto_delete, arguments=arguments)&nbsp;</div><div><br></div><div>&nbsp; &nbsp; def confirm_delivery(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.confirm_<wbr>delivery() &nbsp;</div><div><br></div><div>&nbsp; &nbsp; def force_data(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.force_data_<wbr>events(True) &nbsp;</div><div><br></div><div>&nbsp; &nbsp; def publish_setup(self, ex_name, ex_type, queue_name, routing_key,&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; durable, exclusive, auto_delete, arguments):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; try:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.exchange_declare(ex_name, ex_type)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.declare_queue(queue_name, durable, exclusive, auto_delete, arguments)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.bind(ex_name, queue_name, routing_key)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.force_data()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.confirm_delivery()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; except Exception, e:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logger.exception("Setup error %s" %str(e))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise</div><div><br></div><div>&nbsp; &nbsp; def _publish(self, exchange, routing_key, body, delivery_mode=2):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; start = time.time()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; # Send a message</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if self._channel.basic_publish(<wbr>exchange=exchange, &nbsp; #pika.exceptions.<wbr>ConnectionClosed:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;routing_key=routing_key,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;body=body,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;properties=pika.<wbr>BasicProperties(content_type='<wbr>text/plain',</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;delivery_mode=delivery_mode))<wbr>: &nbsp;#durable=True and delivery_mode=2 is persistent</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; end = time.time()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;<a href="http://logger.info/" target="_blank" style="cursor: pointer;">logger.info</a>('cost time %f' % (end-start))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 0 #'Message publish was confirmed'</div><div>&nbsp; &nbsp; &nbsp; &nbsp; else:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;<a href="http://logger.info/" target="_blank" style="cursor: pointer;">logger.info</a>('publish error')</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return -1 # 'Message publish was not confirmed'</div><div><br></div><div>&nbsp; &nbsp; def publish(self, exchange, routing_key, body):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; try:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return self._publish(exchange, routing_key, body)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; except Exception,e:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logger.exception("Publish error %s" % str(e))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise</div><div><br></div><div>&nbsp; &nbsp; def setup_consume(self, callback, queue_name):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.basic_consume(<wbr>callback, queue_name)</div><div><br></div><div>&nbsp; &nbsp; def start_consume(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.start_consuming(<wbr>)</div><div><br></div><div>&nbsp; &nbsp; def stop_consume(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.ensure_channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self._channel.stop_consuming()</div><div><br></div><div>&nbsp; &nbsp; def __del__(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.close()</div><div><br></div><div><br></div><div><b>Django code here is :</b></div><div><br></div><div>queue_args = {"x-ha-policy" : "all" }<br></div><div><br></div><div>global g_mq = None</div><div><div>try: &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; g_mq = RabbitMQ(host=cf['host'], port=cf['port'],</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; username=cf['username'], password=cf['password'])</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;</div><div>&nbsp; &nbsp; g_mq.publish_setup(ex_name=cf[<wbr>'exchange'], ex_type=cf['ex_type'],&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; queue_name=cf['queue'], routing_key=cf['route_key'],</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; durable=True, exclusive=False, auto_delete=False,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; arguments=queue_args)</div><div>except Exception,e:</div><div>&nbsp; &nbsp;ERR_LOG.info(cf['exchange'])</div><div>&nbsp; &nbsp;ERR_LOG.exception("Setup rabbitmq error %s" % str(e))</div><div>&nbsp; &nbsp;g_mq = None</div><div>&nbsp; &nbsp;return</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;</div><div>try: &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; return g_mq.publish(ex_name, route_key,msg)</div><div>except Exception,e:</div><div>&nbsp; &nbsp; ERR_LOG.exception("Publish rabbitmq error %s" % str(e))</div><div>&nbsp; &nbsp; g_mq = None</div></div><div>&nbsp;</div><div><br></div><div><b>I enable the uwsgi profiler, saw some information:</b></div><div><br></div><div>[uWSGI Python profiler 10124] CALL: build/bdist.linux-x86_64/egg/<wbr>pika/adapters/blocking_<wbr>connection.py (line 344) -&gt; _flush_outbound 1 args, stacksize 5</div><div><br></div><div><br></div><div>Any solution about the issue?</div><div><br></div><div>Thanks.</div>