[rabbitmq-discuss] High volume message consumption slows down consistently over time when using Pika and transactions

Matt Pietrek mpietrek at skytap.com
Tue Feb 14 23:14:54 GMT 2012


Gavin,

Just wanted to follow up on this thread and gently inquire if you've had
time to look at Emile's resource leak finding?


I understand you're probably busy with other more important things. I'm
just trying to get a sense of whether I need to keep digging into this
problem, or if you're already reasonably confident that it's something on
the Pika side of things.

Thanks much,

Matt


On 2/10/12 5:33 AM, "Emile Joubert" <emile at rabbitmq.com> wrote:

>
>Hi Matt,
>
>Thanks for reporting this. I've been able to reproduce the problem with
>a non-mirrored queue on a single node, using Pika. However, I can't
>reproduce the problem using the Java client, so this looks like a
>possible resource leak in the Pika client. I'll report this to the Pika
>maintainer.
>
>
>-Emile
>
>
>On 03/02/12 21:48, Matt Pietrek wrote:
>> I'm writing some basic throughput tests to verify that RabbitMQ will
>> scale to our needs in HA environments. In one scenario I'm finding that
>> consuming messages as quickly as possible slows down consistently over
>> time.
>> 
>> My environment:
>> 
>>   * Ubuntu 10.04 with RabbitMQ 2.7.1.
>>   * Erlang R13B03
>>   * Broker instances are all disk nodes on a dedicated VM running
>>     nothing else.
>>   * Queues are declared as durable and x-ha-policy:all
>>   * Clients are written in Python using Pika 0.9.5.
>>   * For this test a single broker is used, but I have the same effect
>>     with mirrored queues.
>> 
>> The test code is extremely simple: it reads or writes messages (up to
>> 100,000) as quickly as it can and prints the throughput rate at
>> 1000-message intervals.
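The "Average" column in the figures below is consistent with total messages read divided by total elapsed time, not the arithmetic mean of the per-interval rates: 1000 messages at 597.50/s (about 1.674 s) plus 1000 at 431.67/s (about 2.317 s) gives 2000 / 3.990 s, roughly 501.2/s, which is the second reported average. Below is a minimal sketch of a meter that produces both numbers. It is not the original test code; the names ThroughputMeter and tick() are hypothetical, and it assumes Python 3 (time.monotonic), whereas the test in this thread ran on Python 2.

```python
import time


class ThroughputMeter:
    """Hypothetical helper: per-interval rate plus running average.

    The running average is total messages / total elapsed time, which
    is not the same as averaging the per-interval rates.
    """

    def __init__(self, interval=1000, clock=time.monotonic):
        self.interval = interval
        self.clock = clock          # injectable so the math can be checked
        self.start = clock()        # time the run began
        self.batch_start = self.start
        self.count = 0

    def tick(self):
        """Record one message.

        Returns (rate_for_last_interval, average_since_start) on every
        `interval`-th call, otherwise None.
        """
        self.count += 1
        if self.count % self.interval:
            return None
        now = self.clock()
        batch_rate = self.interval / (now - self.batch_start)
        average_rate = self.count / (now - self.start)
        self.batch_start = now
        return batch_rate, average_rate
```

Calling tick() once per acknowledged message in a loop like do_read() returns None except on every 1000th call, when it returns the pair that would be printed as "Read X messages/sec | Average: Y".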
>> 
>> I've noticed that when I use transactions with basic_ack, e.g.
>> 
>>         channel.tx_select()
>>         channel.basic_ack(delivery_tag=tag)
>>         channel.tx_commit()
>> 
>> the rate at which I can consume messages drops steadily. Each time I
>> restart my reading app with a full queue, I see a consistent slowdown
>> in my rate as measured every 1000 messages:
>> 
>> (Rate for last 1000 messages | Average rate for all messages since app
>> start)
>> Read 597.502915814 messages/sec | Average: 597.502915814
>> Read 431.665048642 messages/sec | Average: 501.222607245
>> Read 447.932745586 messages/sec | Average: 482.104211325
>> Read 377.776468153 messages/sec | Average: 450.969014032
>> Read 394.784579872 messages/sec | Average: 438.488176956
>> Read 346.943823897 messages/sec | Average: 420.01726691
>> Read 301.813628274 messages/sec | Average: 397.762754973
>> Read 313.625720320 messages/sec | Average: 384.856956366
>> Read 266.972510375 messages/sec | Average: 366.858074188
>> Read 254.247975169 messages/sec | Average: 351.298554027
>> Read 251.812483302 messages/sec | Average: 339.118646053
>> Read 246.701903922 messages/sec | Average: 328.85271509
>> Read 222.953957109 messages/sec | Average: 317.260966009
>> 
>> If I simply remove the tx_select()/tx_commit() calls, the rate stays
>> reasonably stable:
>> 
>> (Rate for last 1000 messages | Average rate for all messages since app
>> start)
>> Read 1228.77445722 messages/sec | Average: 1343.23656881
>> Read 1063.88750879 messages/sec | Average: 1235.13209738
>> Read 1368.73427666 messages/sec | Average: 1266.02631056
>> Read 1423.23835115 messages/sec | Average: 1294.62742565
>> Read 1438.56741702 messages/sec | Average: 1316.58317701
>> Read 1393.69714404 messages/sec | Average: 1327.07283563
>> Read 991.831277598 messages/sec | Average: 1273.27648897
>> Read 1097.96119586 messages/sec | Average: 1251.08044697
>> Read 1174.27676294 messages/sec | Average: 1242.95091463
>> Read 1307.42368241 messages/sec | Average: 1248.54813715
>> Read 1491.50216641 messages/sec | Average: 1265.7295908
>> 
>> One thing I have noticed is that on my client machine, the CPU
>> percentage generally climbs over time, with the occasional brief drop
>> back to a prior level. This might point to an issue in Pika's
>> transaction-related code, but inspecting the code hasn't made it
>> obvious to me whether that's the case.
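One way to test Emile's suspicion of a client-side resource leak is to check whether the Python process keeps accumulating allocations from one 1000-message interval to the next. The sketch below uses the standard library's tracemalloc module, which requires Python 3.4 or later (newer than the Python 2 used in this thread). allocation_growth is a hypothetical helper, and tracemalloc only sees memory allocated through Python's own allocators.

```python
import tracemalloc


def allocation_growth(previous, limit=5):
    """Hypothetical helper: snapshot now and diff against `previous`.

    Returns (current_snapshot, diffs), where diffs are StatisticDiff
    entries for the `limit` source lines whose traced memory changed
    most, sorted by absolute size change, largest first.
    """
    current = tracemalloc.take_snapshot()
    diffs = current.compare_to(previous, "lineno")[:limit]
    return current, diffs


# Sketch of use inside a read loop such as do_read():
#
#     tracemalloc.start()
#     snapshot = tracemalloc.take_snapshot()
#     for i in range(100000):
#         ...read and acknowledge one message...
#         if (i + 1) % 1000 == 0:
#             snapshot, diffs = allocation_growth(snapshot)
#             for stat in diffs:
#                 print(stat)  # file:line with size and count changes
```

If the same Pika source lines keep showing a positive size_diff interval after interval while the transactional loop runs, that would support the leak hypothesis; a flat profile would suggest looking elsewhere.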
>> 
>> Here are the relevant snippets of my code:
>> 
>> import pika
>> 
>> 
>> def do_read(clientlib):
>>     for i in range(100000):
>>         _, ack_info = clientlib.read_message(QUEUENAME)
>>         if ack_info is None:
>>             print("No messages to read - Exiting")
>>             return
>>         clientlib.acknowledge_message(ack_info)
>> 
>> 
>> class Clientlib:
>>     def connect(self, hostname, use_transactions):
>>         self.connection = pika.BlockingConnection(
>>             pika.ConnectionParameters(host=hostname, port=5672))
>>         self.channel = self.connection.channel()
>>         self.use_transactions = use_transactions
>> 
>>     def read_message(self, queue_name):
>>         method_frame, _, body = self.channel.basic_get(queue=queue_name)
>>         # An empty queue yields no delivery tag (depending on the Pika
>>         # version, method_frame is None or a Basic.GetEmpty method), so
>>         # report "nothing to acknowledge" instead of raising.
>>         if method_frame is None or isinstance(method_frame,
>>                                               pika.spec.Basic.GetEmpty):
>>             return None, None
>>         return body, (self.channel, method_frame.delivery_tag)
>> 
>>     def acknowledge_message(self, acknowledge_info):
>>         channel, tag = acknowledge_info
>> 
>>         # tx_select() puts the channel into transactional mode; the ack
>>         # only takes effect on the broker when tx_commit() is called.
>>         if self.use_transactions:
>>             channel.tx_select()
>> 
>>         channel.basic_ack(delivery_tag=tag)
>> 
>>         if self.use_transactions:
>>             channel.tx_commit()



