<html><head></head><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; color: rgb(0, 0, 0); font-size: 14px; font-family: Calibri, sans-serif; "><div>I'm writing some basic throughput tests to verify that RabbitMQ will scale to our needs in HA environments. In one scenario I'm finding substantial message loss. It may be Pika clientlib behavior, but even if so, it's very odd what I'm seeing.</div><div><br></div><div>My environment:</div><ul><li>Ubuntu 10.04 with RabbitMQ 2.71.</li><li>Broker instances are all disk nodes.</li><li>Queues are declared as durable and x-ha-policy:all</li><li>Clients are written in Python using Pika 0.9.5.</li><li>For this test, no transactions or publisher-confirms are used.</li></ul><div>The test code is extremely simple. It simply reads or writes messages (up to 100,000) as quickly as it can, and shows the throughput rate at 1000 message intervals.</div><div><br></div><div>If I run my "write" test against a single broker instance, I rapidly write all the messages and after the app exits, I see 100,000 messages in the queue, as expected.</div><div><br></div><div>However, if I add two more brokers to the cluster (such that the queue is now mirrored), the identical write test yields only ~25K messages in the queue. (They are the first 25K messages written, FWIW.)</div><div><br></div><div>My app calls channel.disconnect() after doing all the writes, so I'd expect that it would flush its internal buffers before exiting.</div><div><br></div><div>I can supply my entire app (about 150 lines of Python) if desired, but for now, here are the relevant pieces:</div><div><br></div><div><div>def do_write(clientlib):</div><div>&nbsp; &nbsp; for i in range(100000):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; clientlib.write_message(QUEUENAME, "abc_%s" % (i))</div><div>&nbsp; &nbsp; clientlib.disconnect()</div><div><br></div></div><div><br></div><div>class Clientlib(object):</div><div><div>&nbsp; &nbsp; def connect(self, hostname, use_transactions):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.connection = pika.BlockingConnection(</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pika.ConnectionParameters(host=hostname, port=5672))</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.channel = self.connection.channel()</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.use_transactions = use_transactions &nbsp;# This is False for this test</div></div><div><br></div><div><div>&nbsp; &nbsp; def write_message(self, queue_name, json_string):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.channel.basic_publish(exchange='',</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; routing_key=queue_name,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; body=json_string,</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; properties=pika.BasicProperties(delivery_mode=2))</div><div><br></div><div><div>&nbsp; &nbsp; def disconnect(self):</div><div>&nbsp; &nbsp; &nbsp; &nbsp; self.connection.close()</div><div><br></div></div></div><div><br></div><div><br></div></body></html>