<html><head></head><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; color: rgb(0, 0, 0); font-size: 14px; font-family: Calibri, sans-serif; "><div><div>I'm writing some basic throughput tests to verify that RabbitMQ will scale to our needs in HA environments. In one scenario I'm finding that consuming messages as quickly as possible slows down consistently over time.</div><div><br></div><div>My environment:</div><ul><li>Ubuntu 10.04 with RabbitMQ 2.71.</li><li>Erlang R13B03</li><li>Broker instances are all disk nodes on a dedicated VM running nothing else.</li><li>Queues are declared as durable and x-ha-policy:all</li><li>Clients are written in Python using Pika 0.9.5.</li><li>For this test a single broker is used, but I have the same effect with mirrored queues.</li></ul><div>The test code is extremely simple. It simply reads or writes messages (up to 100,000) as quickly as it can, and shows the throughput rate at 1000 message intervals.</div></div><div><br></div><div>I've noticed that when I use transactions with basic_ack, e.g.</div><div><br></div><div><div>&nbsp; &nbsp; &nbsp; &nbsp; channel.tx_select()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; channel.basic_ack(delivery_tag=tag)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; channel.tx_commit()</div></div><div><br></div><div>that the rate I can consume messages drops steadily. Each time I restart my reading app with a full queue, I see a consistent slowdown in my rate as measured every 1000 messages:</div><div><br></div><div>(Rate for last 1000 messages | Average rate for all messages since app start)</div><div><div>Read 597.502915814 messages/sec | Average: 597.502915814</div><div>Read 431.665048642 messages/sec | Average: 501.222607245</div><div>Read 447.932745586 messages/sec | Average: 482.104211325</div><div>Read 377.776468153 messages/sec | Average: 450.969014032</div><div>Read 394.784579872 messages/sec | Average: 438.488176956</div><div>Read 346.943823897 messages/sec | Average: 420.01726691</div><div>Read 301.813628274 messages/sec | Average: 397.762754973</div><div>Read 313.625720320 messages/sec | Average: 384.856956366</div><div>Read 266.972510375 messages/sec | Average: 366.858074188</div><div>Read 254.247975169 messages/sec | Average: 351.298554027</div><div>Read 251.812483302 messages/sec | Average: 339.118646053</div><div>Read 246.701903922 messages/sec | Average: 328.85271509</div><div>Read 222.953957109 messages/sec | Average: 317.260966009</div></div><div><br></div><div>If I simply remove the tx_select()/tx_commit, the rate stays reasonably stable:</div><div><br></div><div>(Rate for last 1000 messages | Average rate for all messages since app start</div><div><div>Read 1228.77445722 messages/sec | Average: 1343.23656881</div><div>Read 1063.88750879 messages/sec | Average: 1235.13209738</div><div>Read 1368.73427666 messages/sec | Average: 1266.02631056</div><div>Read 1423.23835115 messages/sec | Average: 1294.62742565</div><div>Read 1438.56741702 messages/sec | Average: 1316.58317701</div><div>Read 1393.69714404 messages/sec | Average: 1327.07283563</div><div>Read 991.831277598 messages/sec | Average: 1273.27648897</div><div>Read 1097.96119586 messages/sec | Average: 1251.08044697</div><div>Read 1174.27676294 messages/sec | Average: 1242.95091463</div><div>Read 1307.42368241 messages/sec | Average: 1248.54813715</div><div>Read 1491.50216641 messages/sec | Average: 1265.7295908</div></div><div><br></div><div>One thing I have noticed is that in my client app machine, the CPU percentage generally climbs over time, with the occasional brief drop back to a prior %. This might point to a Pika issue in the transaction related code, but it's not obvious to me from inspection of the code if so.</div><div><br></div><div>Here's the relevant snippets of my code:</div><div><br></div><div><div><div>def do_read(clientlib):</div><div>&nbsp; &nbsp; for i in range(100000):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; _, ack_info = clientlib.read_message(QUEUENAME)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if ack_info == None:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print "No messages to read - Exiting"</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return</div><div>&nbsp; &nbsp; &nbsp; &nbsp; clientlib.acknowledge_message(ack_info)</div><div><br></div></div></div><div><br></div><div>class Clientlib</div><div><div>&nbsp; &nbsp; def connect(self, hostname, use_transactions):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=5672))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.channel = self.connection.channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.use_transactions = use_transactions</div><div><br></div><div>&nbsp; &nbsp; def read_message(self, queue_name):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; method_frame, _, body = self.channel.basic_get(queue=queue_name)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; return body, (self.channel, method_frame.delivery_tag)</div><div><br></div><div>&nbsp; &nbsp; def acknowledge_message(self, acknowledge_info):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; channel = acknowledge_info[0]</div><div>&nbsp; &nbsp; &nbsp; &nbsp; tag = acknowledge_info[1]</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; if self.use_transactions:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel.tx_select()</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; channel.basic_ack(delivery_tag=tag)</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; if self.use_transactions:</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel.tx_commit()</div></div><div><br></div><div>Any ideas what the issue might be, or what I could be doing wrong?</div><div><br></div><div>Thanks,</div><div><br></div><div>Matt</div></body></html>