[rabbitmq-discuss] High volume message consumption slows down consistently over time when using Pika and transactions

Matt Pietrek mpietrek at skytap.com
Fri Feb 3 21:48:37 GMT 2012


I'm writing some basic throughput tests to verify that RabbitMQ will scale
to our needs in HA environments. In one scenario I'm finding that consuming
messages as quickly as possible slows down consistently over time.

My environment:
* Ubuntu 10.04 with RabbitMQ 2.71.
* Erlang R13B03
* Broker instances are all disk nodes on a dedicated VM running nothing
else.
* Queues are declared as durable and x-ha-policy:all
* Clients are written in Python using Pika 0.9.5.
* For this test a single broker is used, but I have the same effect with
mirrored queues.
The test code is extremely simple. It simply reads or writes messages (up to
100,000) as quickly as it can, and shows the throughput rate at 1000 message
intervals.

I've noticed that when I use transactions with basic_ack, e.g.

        channel.tx_select()
        channel.basic_ack(delivery_tag=tag)
        channel.tx_commit()

that the rate I can consume messages drops steadily. Each time I restart my
reading app with a full queue, I see a consistent slowdown in my rate as
measured every 1000 messages:

(Rate for last 1000 messages | Average rate for all messages since app
start)
Read 597.502915814 messages/sec | Average: 597.502915814
Read 431.665048642 messages/sec | Average: 501.222607245
Read 447.932745586 messages/sec | Average: 482.104211325
Read 377.776468153 messages/sec | Average: 450.969014032
Read 394.784579872 messages/sec | Average: 438.488176956
Read 346.943823897 messages/sec | Average: 420.01726691
Read 301.813628274 messages/sec | Average: 397.762754973
Read 313.625720320 messages/sec | Average: 384.856956366
Read 266.972510375 messages/sec | Average: 366.858074188
Read 254.247975169 messages/sec | Average: 351.298554027
Read 251.812483302 messages/sec | Average: 339.118646053
Read 246.701903922 messages/sec | Average: 328.85271509
Read 222.953957109 messages/sec | Average: 317.260966009

If I simply remove the tx_select()/tx_commit, the rate stays reasonably
stable:

(Rate for last 1000 messages | Average rate for all messages since app start
Read 1228.77445722 messages/sec | Average: 1343.23656881
Read 1063.88750879 messages/sec | Average: 1235.13209738
Read 1368.73427666 messages/sec | Average: 1266.02631056
Read 1423.23835115 messages/sec | Average: 1294.62742565
Read 1438.56741702 messages/sec | Average: 1316.58317701
Read 1393.69714404 messages/sec | Average: 1327.07283563
Read 991.831277598 messages/sec | Average: 1273.27648897
Read 1097.96119586 messages/sec | Average: 1251.08044697
Read 1174.27676294 messages/sec | Average: 1242.95091463
Read 1307.42368241 messages/sec | Average: 1248.54813715
Read 1491.50216641 messages/sec | Average: 1265.7295908

One thing I have noticed is that in my client app machine, the CPU
percentage generally climbs over time, with the occasional brief drop back
to a prior %. This might point to a Pika issue in the transaction related
code, but it's not obvious to me from inspection of the code if so.

Here's the relevant snippets of my code:

def do_read(clientlib):
    for i in range(100000):
        _, ack_info = clientlib.read_message(QUEUENAME)
        if ack_info == None:
            print "No messages to read - Exiting"
            return
        clientlib.acknowledge_message(ack_info)


class Clientlib
    def connect(self, hostname, use_transactions):
        self.connection =
pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=5672))
        self.channel = self.connection.channel()
        self.use_transactions = use_transactions

    def read_message(self, queue_name):
        method_frame, _, body = self.channel.basic_get(queue=queue_name)
        return body, (self.channel, method_frame.delivery_tag)

    def acknowledge_message(self, acknowledge_info):
        channel = acknowledge_info[0]
        tag = acknowledge_info[1]

        if self.use_transactions:
            channel.tx_select()

        channel.basic_ack(delivery_tag=tag)

        if self.use_transactions:
            channel.tx_commit()

Any ideas what the issue might be, or what I could be doing wrong?

Thanks,

Matt


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120203/7e0c4525/attachment.htm>


More information about the rabbitmq-discuss mailing list