[rabbitmq-discuss] High volume message consumption slows down consistently over time when using Pika and transactions

Emile Joubert emile at rabbitmq.com
Fri Feb 10 13:33:56 GMT 2012


Hi Matt,

Thanks for reporting this. I've been able to reproduce the problem with
a non-mirrored queue on a single node, using Pika. However, I can't
reproduce the problem using the Java client, so this looks like a
possible resource leak in the Pika client. I'll report this to the Pika
maintainer.


-Emile


On 03/02/12 21:48, Matt Pietrek wrote:
> I'm writing some basic throughput tests to verify that RabbitMQ will
> scale to our needs in HA environments. In one scenario I'm finding that
> consuming messages as quickly as possible slows down consistently over time.
> 
> My environment:
> 
>   * Ubuntu 10.04 with RabbitMQ 2.7.1.
>   * Erlang R13B03
>   * Broker instances are all disk nodes on a dedicated VM running
>     nothing else.
>   * Queues are declared as durable and x-ha-policy:all
>   * Clients are written in Python using Pika 0.9.5.
>   * For this test a single broker is used, but I have the same effect
>     with mirrored queues.
> 
> The test code is extremely simple. It simply reads or writes messages
> (up to 100,000) as quickly as it can, and shows the throughput rate at
> 1000 message intervals.
> 
> I've noticed that when I use transactions with basic_ack, e.g.
> 
>         channel.tx_select()
>         channel.basic_ack(delivery_tag=tag)
>         channel.tx_commit()
> 
> the rate at which I can consume messages drops steadily. Each time I restart
> my reading app with a full queue, I see a consistent slowdown in my rate
> as measured every 1000 messages:
> 
> (Rate for last 1000 messages | Average rate for all messages since app
> start)
> Read 597.502915814 messages/sec | Average: 597.502915814
> Read 431.665048642 messages/sec | Average: 501.222607245
> Read 447.932745586 messages/sec | Average: 482.104211325
> Read 377.776468153 messages/sec | Average: 450.969014032
> Read 394.784579872 messages/sec | Average: 438.488176956
> Read 346.943823897 messages/sec | Average: 420.01726691
> Read 301.813628274 messages/sec | Average: 397.762754973
> Read 313.625720320 messages/sec | Average: 384.856956366
> Read 266.972510375 messages/sec | Average: 366.858074188
> Read 254.247975169 messages/sec | Average: 351.298554027
> Read 251.812483302 messages/sec | Average: 339.118646053
> Read 246.701903922 messages/sec | Average: 328.85271509
> Read 222.953957109 messages/sec | Average: 317.260966009
> 
> If I simply remove the tx_select()/tx_commit, the rate stays reasonably
> stable:
> 
> (Rate for last 1000 messages | Average rate for all messages since app start)
> Read 1228.77445722 messages/sec | Average: 1343.23656881
> Read 1063.88750879 messages/sec | Average: 1235.13209738
> Read 1368.73427666 messages/sec | Average: 1266.02631056
> Read 1423.23835115 messages/sec | Average: 1294.62742565
> Read 1438.56741702 messages/sec | Average: 1316.58317701
> Read 1393.69714404 messages/sec | Average: 1327.07283563
> Read 991.831277598 messages/sec | Average: 1273.27648897
> Read 1097.96119586 messages/sec | Average: 1251.08044697
> Read 1174.27676294 messages/sec | Average: 1242.95091463
> Read 1307.42368241 messages/sec | Average: 1248.54813715
> Read 1491.50216641 messages/sec | Average: 1265.7295908
> 
> One thing I have noticed is that on my client app machine, the CPU
> percentage generally climbs over time, with the occasional brief drop
> back to a prior %. This might point to a Pika issue in the transaction
> related code, but it's not obvious to me from inspection of the code if so.
> 
> Here's the relevant snippets of my code:
> 
> def do_read(clientlib):
>     for i in range(100000):
>         _, ack_info = clientlib.read_message(QUEUENAME)
>         if ack_info is None:
>             print "No messages to read - Exiting"
>             return
>         clientlib.acknowledge_message(ack_info)
> 
> 
> class Clientlib:
>     def connect(self, hostname, use_transactions):
>         self.connection = pika.BlockingConnection(
>             pika.ConnectionParameters(host=hostname, port=5672))
>         self.channel = self.connection.channel()
>         self.use_transactions = use_transactions
> 
>     def read_message(self, queue_name):
>         method_frame, _, body = self.channel.basic_get(queue=queue_name)
>         return body, (self.channel, method_frame.delivery_tag)
> 
>     def acknowledge_message(self, acknowledge_info):
>         channel = acknowledge_info[0]
>         tag = acknowledge_info[1]
> 
>         if self.use_transactions:
>             channel.tx_select()
> 
>         channel.basic_ack(delivery_tag=tag)
> 
>         if self.use_transactions:
>             channel.tx_commit()
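
A note on the quoted acknowledge_message() code: it sends tx_select() before every ack. In AMQP 0-9-1, Tx.Select only has to be sent once per channel. After each commit or rollback the channel starts a new transaction on its own. The sketch below shows that arrangement. The class name TransactionalAcker and its method names are made up for illustration and are not part of Pika. It assumes only that the channel object offers tx_select(), basic_ack(delivery_tag=...) and tx_commit(), as used in the code above. Whether this changes the slowdown seen with Pika 0.9.5 has not been tested here.

```python
class TransactionalAcker(object):
    """Acknowledge messages on one channel, optionally inside transactions.

    Hypothetical helper: `channel` is any object that provides tx_select(),
    basic_ack(delivery_tag=...) and tx_commit(), such as the Pika channel
    used in the quoted code.
    """

    def __init__(self, channel, use_transactions):
        self.channel = channel
        self.use_transactions = use_transactions
        if use_transactions:
            # Put the channel into transactional mode once, up front,
            # rather than once per message.
            channel.tx_select()

    def acknowledge(self, delivery_tag):
        self.channel.basic_ack(delivery_tag=delivery_tag)
        if self.use_transactions:
            # Commit the ack. The channel stays in transactional mode, so
            # the next ack belongs to a new transaction automatically.
            self.channel.tx_commit()
```

With this in place, connect() would build one TransactionalAcker per channel, and do_read() would call acker.acknowledge(tag) for each message instead of calling tx_select() each time.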

