[rabbitmq-discuss] High volume message consumption slows down consistently over time when using Pika and transactions
Emile Joubert
emile at rabbitmq.com
Fri Feb 10 13:33:56 GMT 2012
Hi Matt,
Thanks for reporting this. I've been able to reproduce the problem with
a non-mirrorred queue on a single node, using Pika. However I can't
reproduce the problem using the Java client, so this looks like a
possible resource leak in the Pika client. I'll report this to the Pika
maintainer.
-Emile
On 03/02/12 21:48, Matt Pietrek wrote:
> I'm writing some basic throughput tests to verify that RabbitMQ will
> scale to our needs in HA environments. In one scenario I'm finding that
> consuming messages as quickly as possible slows down consistently over time.
>
> My environment:
>
> * Ubuntu 10.04 with RabbitMQ 2.71.
> * Erlang R13B03
> * Broker instances are all disk nodes on a dedicated VM running
> nothing else.
> * Queues are declared as durable and x-ha-policy:all
> * Clients are written in Python using Pika 0.9.5.
> * For this test a single broker is used, but I have the same effect
> with mirrored queues.
>
> The test code is extremely simple. It simply reads or writes messages
> (up to 100,000) as quickly as it can, and shows the throughput rate at
> 1000 message intervals.
>
> I've noticed that when I use transactions with basic_ack, e.g.
>
> channel.tx_select()
> channel.basic_ack(delivery_tag=tag)
> channel.tx_commit()
>
> that the rate I can consume messages drops steadily. Each time I restart
> my reading app with a full queue, I see a consistent slowdown in my rate
> as measured every 1000 messages:
>
> (Rate for last 1000 messages | Average rate for all messages since app
> start)
> Read 597.502915814 messages/sec | Average: 597.502915814
> Read 431.665048642 messages/sec | Average: 501.222607245
> Read 447.932745586 messages/sec | Average: 482.104211325
> Read 377.776468153 messages/sec | Average: 450.969014032
> Read 394.784579872 messages/sec | Average: 438.488176956
> Read 346.943823897 messages/sec | Average: 420.01726691
> Read 301.813628274 messages/sec | Average: 397.762754973
> Read 313.625720320 messages/sec | Average: 384.856956366
> Read 266.972510375 messages/sec | Average: 366.858074188
> Read 254.247975169 messages/sec | Average: 351.298554027
> Read 251.812483302 messages/sec | Average: 339.118646053
> Read 246.701903922 messages/sec | Average: 328.85271509
> Read 222.953957109 messages/sec | Average: 317.260966009
>
> If I simply remove the tx_select()/tx_commit, the rate stays reasonably
> stable:
>
> (Rate for last 1000 messages | Average rate for all messages since app start
> Read 1228.77445722 messages/sec | Average: 1343.23656881
> Read 1063.88750879 messages/sec | Average: 1235.13209738
> Read 1368.73427666 messages/sec | Average: 1266.02631056
> Read 1423.23835115 messages/sec | Average: 1294.62742565
> Read 1438.56741702 messages/sec | Average: 1316.58317701
> Read 1393.69714404 messages/sec | Average: 1327.07283563
> Read 991.831277598 messages/sec | Average: 1273.27648897
> Read 1097.96119586 messages/sec | Average: 1251.08044697
> Read 1174.27676294 messages/sec | Average: 1242.95091463
> Read 1307.42368241 messages/sec | Average: 1248.54813715
> Read 1491.50216641 messages/sec | Average: 1265.7295908
>
> One thing I have noticed is that in my client app machine, the CPU
> percentage generally climbs over time, with the occasional brief drop
> back to a prior %. This might point to a Pika issue in the transaction
> related code, but it's not obvious to me from inspection of the code if so.
>
> Here's the relevant snippets of my code:
>
> def do_read(clientlib):
> for i in range(100000):
> _, ack_info = clientlib.read_message(QUEUENAME)
> if ack_info == None:
> print "No messages to read - Exiting"
> return
> clientlib.acknowledge_message(ack_info)
>
>
> class Clientlib
> def connect(self, hostname, use_transactions):
> self.connection =
> pika.BlockingConnection(pika.ConnectionParameters(host=hostname, port=5672))
> self.channel = self.connection.channel()
> self.use_transactions = use_transactions
>
> def read_message(self, queue_name):
> method_frame, _, body = self.channel.basic_get(queue=queue_name)
> return body, (self.channel, method_frame.delivery_tag)
>
> def acknowledge_message(self, acknowledge_info):
> channel = acknowledge_info[0]
> tag = acknowledge_info[1]
>
> if self.use_transactions:
> channel.tx_select()
>
> channel.basic_ack(delivery_tag=tag)
>
> if self.use_transactions:
> channel.tx_commit()
More information about the rabbitmq-discuss
mailing list