[rabbitmq-discuss] [Minimum Air Induction] Introducing Shovel: An AMQP Relay

Valentino Volonghi dialtone at gmail.com
Sun Sep 21 19:53:21 BST 2008


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On Sep 21, 2008, at 9:17 AM, Ben Hood wrote:

> Sure, but I think these days people are going for 1.5PC instead of 2PC
> - it's just a cost-benefit comparison.

Will need to dig into that, seems interesting.

> I shouldn't imagine that this will cost you an order of magnitude -
> the persister is write-optimized after all, though it does have to
> synchronously write to disk.

I made my measurements and the results are not bad given the test  
platform.

Producer running alone:
Transaction size    -   1K msgs rate
====================================
500                     1538.1 msg/sec
1000                    1919.1 msg/sec
1500                    2145.8 msg/sec
2000                    2390.2 msg/sec
2500                    2459.5 msg/sec
5000                    2740.6 msg/sec
10000                   3014.7 msg/sec
20000                   3075.2 msg/sec

Producer running alone with delivery_mode=0 and 20000 transaction size:
3419.17693154 msg/sec

Producer running alone with delivery_mode=0 and 500 transaction size:
3372.17693154 msg/sec

Consumer running alone:
2734.8 msg/sec

Considering that I'm using an old and slow macbook pro on 10.5.5 I  
think those
numbers are basically what I wanted to see. Transactions without  
persistent
messages are a lot cheaper of course.

Running alone means that the producer/consumer was running only with  
rabbitmq
and nothing else on the other side, the reason for this is to reduce  
the bottleneck
caused by having 2 python processes running (producer and consumer).

So basically this is enough proof for me that this is fast enough for  
what I need.
(attached are the 2 test programs I used).

One thing I noticed is the following errors:

The following on rabbitmq:

=INFO REPORT==== 21-Sep-2008::11:14:37 ===
Rolling persister log to "/Users/dialtone/dev/mnesia/ 
rabbit_persister.LOG.previous"

=ERROR REPORT==== 21-Sep-2008::11:14:42 ===
connection <0.297.0> (running), channel 1 - error:
{commit_failed,
     [{exit,
          {timeout,
              {gen_server,call,[<0.131.0>,{commit,{{26,<0.303.0>}, 
1003}}]}}}]}

=ERROR REPORT==== 21-Sep-2008::11:14:42 ===
Non-AMQP exit reason '{commit_failed,
                           [{exit,
                                {timeout,
                                    {gen_server,call,
                                        [<0.131.0>,
                                         {commit,{{26,<0.303.0>}, 
1003}}]}}}]}'

And this on the client side:

Traceback (most recent call last):
   File "my_send.py", line 34, in <module>
     main()
   File "my_send.py", line 23, in main
     ch.tx_commit()
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
3336, in tx_commit
     (90, 21),    # Channel.tx_commit_ok
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
183, in wait
     frame_type, payload = self._next_frame()
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
123, in _next_frame
     return self.connection._wait_channel(self.channel_id)
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
430, in _wait_channel
     self.wait()
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
203, in wait
     return self._dispatch(method_sig, args, content)
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
115, in _dispatch
     return amqp_method(self, args)
   File "/Users/dialtone/dev/py-amqplib/amqplib/client_0_8.py", line  
563, in _close
     raise AMQPConnectionException(reply_code, reply_text, (class_id,  
method_id))
amqplib.client_0_8.AMQPConnectionException: (541, u'INTERNAL_ERROR',  
(0, 0), '')


Sometimes (happened twice) transactions failed with this error output  
and another weird
thing is that message order is changed during rolling. The test  
programs always sent 20001
messages but sometimes I would receive only 19802 before the 'quit'  
message (usually
the last one) and then after restarting the receiver another 199  
messages (that sum
to 20001 so it's fine anyway).

For my usecase message order has absolutely no importance but I guess  
for something
else like real time data it is.

>> They look OK in the sense that even persistent messages can reach  
>> those
>> rates?
>
> I guess the IO bandwidth of your filesystem would be an important
> factor in this equation.

Yeah, absolutely.

Hope this measurements where useful for you :)

Here are the 2 scripts, can be used like this:

python my_send.py MESSAGES_NUMBER TRANSACTION_SIZE
python my_receive_direct.py QUEUE_NAME


# my_send.py
#!/usr/bin/env python
import sys
import time

import amqplib.client_0_8 as amqp

MAX = int(sys.argv[1])

def main():
     msg_body = '0'*1000 # 1k

     conn = amqp.Connection("localhost", userid="guest",  
password="guest")
     ch = conn.channel()
     ch.access_request('/data', active=True, write=True)
     ch.exchange_declare('X', type='direct', auto_delete=True)

     msg = amqp.Message(msg_body)

     t = time.time()
     ch.tx_select()
     interval = (int(sys.argv[2]) or MAX-1)
     for i in xrange(MAX):
         if i % interval == 0:
             ch.tx_commit()
             ch.tx_select()
         ch.basic_publish(msg, 'X')
     ch.basic_publish(amqp.Message('quit'), 'X')
     ch.tx_commit()
     ch.close()
     conn.close()

     print MAX/(time.time()-t), "KB/sec (or msg/sec)"

if __name__ == '__main__':
     main()
# end my_send.py

# my_receive_direct.py
#!/usr/bin/env python
import time
import sys
import amqplib.client_0_8 as amqp

counter = 0

def callback(msg):
     global counter
     counter += 1
     msg.channel.basic_ack(msg.delivery_tag)
     if msg.body == 'quit':
         msg.channel.basic_cancel(msg.consumer_tag)
     if counter % 100 == 0:
         sys.stdout.write(".")
         sys.stdout.flush()
     if counter % 1000 == 0:
         sys.stdout.write(str(counter))
         sys.stdout.flush()

def main():
     realm = '/data'
     exchange = 'X'
     queue = sys.argv[1]
     conn = amqp.Connection("localhost", userid="guest",  
password="guest")

     ch = conn.channel()
     ch.access_request(realm, active=True, passive=True, read=True)
     ch.exchange_declare(exchange, type='direct', durable=True,  
auto_delete=False)
     qname, _, _ = ch.queue_declare(queue, durable=True,  
auto_delete=False)
     ch.queue_bind(queue, exchange)
     ch.basic_consume(queue, callback=callback)

     try:
         t = time.time()
         while ch.callbacks:
             ch.wait()
     except:
         import traceback
         print traceback.format_exc()
     finally:
         #ch.queue_delete(qname)
         ch.close()
         conn.close()

     print
     print "Total Messages:", counter
     print counter / (time.time() - t), "KB/sec (or msg/sec)"


if __name__ == '__main__':
     main()
# end my_receive_direct.py

- --
Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it
http://www.adroll.com

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Darwin)

iEYEARECAAYFAkjWmCEACgkQ9Llz28widGX2yQCcCQtB5/eZIUf16cLVarCtxsmo
sIEAoMerWqfI6ljsuXW0lfXtY716CtTd
=7wug
-----END PGP SIGNATURE-----




More information about the rabbitmq-discuss mailing list