[rabbitmq-discuss] [PATCH] Suggestions for improving the RabbitMQ Erlang Client

Jesper Louis Andersen jesper.louis.andersen at gmail.com
Tue Sep 3 13:35:33 BST 2013


# Optimizing the Erlang RabbitMQ Client

I originally planned this as a blog post, but since I don't really have
time to finish it up and polish it, I'll just write it here on the
mailing list instead. There are some patches in the mail you may want to
grab and apply to the RabbitMQ Erlang client. In my benchmarks, they
squeezed more performance out of my systems, in the ballpark of
15-40% more messages per second.

My benchmark is quite synthetic and bad since it goes to great lengths to
play on all of AMQP's weaknesses. But the patches are probably quite
sound and pick the low-hanging fruit there is in there. At the end, I
have written up some of the ideas I have for further improvement of the
client. I did not have the time to implement them, and currently the client
is fast enough for our purposes, our load being well under an order of
magnitude lower than the limits it imposes on systems.

In any case, you may want to grab the patches I made since they would
probably help any Erlang RabbitMQ client user. This message was due to me
complaining to Alvaro `gifsockets` Videla about the Erlang client, and he
urged me to write up my findings. So here goes nothing:

## The setup

The current setup is like this:

* Run a RabbitMQ server version 3.1.5.
* Run a fairly recent RabbitMQ Erlang client. We use version 3.1.1, but
there are so few changes to it that it doesn't matter that it is not the
newest version.
* We run everything on localhost.
* We run with Erlang/OTP R16B01
* We make two connections to RMQ
* One connection manages a "sender" channel, run by a single "sender"
process.
* The other connection runs 10 channels, each channel having a "Handler
function"

The setup is somewhat suboptimal. The same machine runs everything, so
certain nice optimizations, like setting scheduler bind types and so on,
won't really help that much. Furthermore, we don't tune memory
allocators, which I have a hunch could help in these situations.

A test is the following:

* We spawn 100 processes. These send 10000 messages each and wait for a
reply.
* The Handlers reply via reply_to queues.
* All messages are very small. A few bytes. We want to test messaging
capability.
* We thus run one million (10⁶) RPC queries. Each query is two messages: A
request and a reply.
* The single "sender" multiplexes and demultiplexes the 100 worker
processes. This is a known bottleneck in the code base.

## Initial state

I have a Core i7-3630QM based laptop. Making sure it is not running on
battery, we can process roughly 12.5k RPS (RPCs per second) on this setup.
When running, it seems we max out all cores in the machine. Roughly 3 cores
go to RMQ, 4 cores go to our test case and the last core does other stuff
like playing Spotify. My benchmark surely runs in a highly controlled
environment :)

What is odd though, is the load of 400% on the Client Erlang node. Why on
*earth* is the client doing more work than the server?

When debugging stuff like this, the first thing to look at is the bottom of
the stack. We must make sure the kernel and the network are behaving like we
expect them to. Old trusty *tcpdump* always comes to the rescue for these
kinds of things. We capture a typical run to disk just to see what is
happening.

    tcpdump -i lo -w tcp.dump 'port 5672'

Given such a trace, we can analyze it in tcptrace and in Wireshark. They can
tell us if there is anything odd to look for in the handling of the
low-level communication. Here is a simple breakdown of the packet sizes:

    Size        Count    Percent
    40-79      417900     12.38%
    [⋯]
    160-329   2814258     83.37%

This is *very* odd. A localhost interface has an MTU of 64 kilobytes. Yet
most packets are extremely small. RabbitMQ disables Nagle's algorithm
by setting *TCP_NODELAY*, and it does not look like this is handled
well. Most of the packets we transfer are pretty small. This is
quite nasty for performance.

Wireshark tells us we can push around 65.5 megabit through the link. This
isn't a lot on a localhost interface. I had a hunch something might be
wrong with the TCP window, but it never closes according to Wireshark, so
this can not be the case.

The major problem here is that we are not filling up packets for some
reason and are sending way too small IP datagrams for this to ever be fast.
We are dying by processing small packets.

## Eprof

Another useful approach is to use the *eprof* profiler. We hook amqp_client
and change the main application start callback such that it will profile
its own tree:

    start(_StartType, _StartArgs) ->
        eprof:start_profiling([self()]),
        amqp_sup:start_link().

In our test case, we can now run a call to `eprof:stop_profiling()`
followed by a call to `eprof:log/1` and `eprof:analyze()`. This only
profiles the amqp_client, not our own test code which we use to load it.
Hence, all the time reported is spent in the AMQP side of things. We could
also try to optimize the test code, but for now, the focus is on the
`amqp_client` side of things. We get a lot of interesting output, most
notably one for a process doing a lot of work:

    ****** Process <0.184.0>    -- 18.45 % of profiled time ***
    FUNCTION                           CALLS      %      TIME  [uS / CALLS]
    --------                           -----    ---      ----  [----------]
    [⋯]
    amqp_main_reader:init/1                1   0.00         1  [      1.00]
    [⋯]
    erlang:port_control/3            6000087   4.06   2124919  [      0.35]
    prim_inet:ctl_cmd/3              6000087   4.16   2177725  [      0.36]
    gen_server:dispatch/3            6000086   4.24   2223023  [      0.37]
    prim_inet:enc_time/1             6000087   5.07   2656325  [      0.44]
    erlang:send/3                    3000040   6.39   3349936  [      1.12]
    amqp_main_reader:handle_info/2   6000086   7.10   3717176  [      0.62]
    erts_internal:port_control/3     6000087  38.26  20042440  [      3.34]

This process runs around 20% of the load and runs roughly 6 million calls
to `erts_internal:port_control/3`. This is odd. We would expect there to be
fewer calls. Since we are passing both the request and the reply through
this area in the code base, it seems odd that we are making 6 calls per
RPC. In fact, the number shouldn't even be two! We are passing a *lot* of
data through the socket so we expect Erlang to be able to pick up a lot of
data and process it with a single call to the underlying socket. Not a call
per message which is very expensive.

Looking into the `amqp_main_reader` module, we see that it handles the
underlying socket in a very odd way. First, it reads exactly 7 bytes off
the socket for the AMQP frame header. From this it obtains the payload
length, and then it reads the payload itself. This means two context
switches to get a single frame, which is why we are spending all the time
in port control.

The optimization is straightforward: read as much data as possible from
the underlying socket per call. Then process as much data as you can before
going to the socket again. There are basically two scenarios. Either the
socket is not loaded at all, and then there is no reason to even think
about optimization. Or the socket is loaded, in which case we should
optimize for the fast path. The fast path in this case is that all data is
available for a message. If not, we can just append data to the old data

    <<OldData/binary, NewData/binary>>

since in Erlang releases after OTP R12B this will use the extra capacity
allocated for binaries and avoid copying in most cases anyhow. Also, when
the socket is running slowly, a bit of extra copying like this is not going
to be a problem. Whereas if the socket processes lots of data, we can
fast-path the code. In other words, the natural implementation is
reasonably fast. Implementing this change immediately pushes messages per
second from roughly 25k (12.5k RPS at two messages each) to 31k, so we are
up 6000 messages per second, or around 3000 RPCs per second.

It also pushes the 6 million calls down to 6 * 10⁵, shaving off an order of
magnitude. And it shuffles the load in the system to other parts for
now. The main problem now is the small packets being pushed all the time,
which has to do with the send path in RabbitMQ.

[
https://github.com/issuu/amqp_client/commit/d8b64beb31f0aa63df7624e240bc324a0ed3f8f8.patch
]
 [
https://github.com/issuu/amqp_client/commit/5f01fd355e82a4e6555e646d4f9fb6a521ef1911.patch
]
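
To make the read-as-much-as-possible idea concrete, here is a minimal sketch of a buffering frame splitter. It is not the code from the patches above; the module name `frame_buffer` and the return shape are made up for illustration. It assumes the AMQP 0-9-1 frame layout: a 7 byte header (1 byte type, 2 byte channel, 4 byte payload size), the payload, and one frame-end byte `16#CE`.

```erlang
-module(frame_buffer).
-export([parse/1]).

%% Frame-end marker in AMQP 0-9-1.
-define(FRAME_END, 16#CE).

%% Split a buffer into all complete frames it contains plus the leftover
%% bytes. Returns {Frames, Rest}; each frame is {Type, Channel, Payload}.
%% Hypothetical helper, not part of amqp_client.
-spec parse(binary()) -> {[{byte(), non_neg_integer(), binary()}], binary()}.
parse(Buffer) ->
    parse(Buffer, []).

%% Fast path: header, full payload and frame-end byte are all present.
parse(<<Type:8, Channel:16, Size:32, Payload:Size/binary,
        ?FRAME_END, Rest/binary>>, Acc) ->
    parse(Rest, [{Type, Channel, Payload} | Acc]);
%% Otherwise the remaining bytes are an incomplete frame: hand them back
%% so the caller can append the next chunk from the socket. Note that a
%% corrupt frame-end byte also lands here; a real reader must detect it.
parse(Rest, Acc) ->
    {lists:reverse(Acc), Rest}.
```

A reader process running the socket in `{active, once}` mode would keep `Rest` in its state and call `frame_buffer:parse(<<Rest/binary, NewData/binary>>)` on the next chunk, so it goes to the port once per chunk rather than twice per frame.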

## Next optimizations

After these optimizations we have the following breakdown:

    amqp_channels_manager 9.91% time

It uses its time for two things: first, decoding incoming frames, and
second, looking up and updating a `gb_trees` structure mapping to the
channel.

    amqp_main_reader 8.64% time
    amqp_main_reader 7.43% time

Most of the time is now spent decoding and analyzing frames in the two main
reader processes. They are not working on the socket and using around 20%
cpu each. Rather, they are spending the time in the frame decoder loop,
which is exactly where we want them to spend their time. Optimizing further
is definitely a great idea, but Amdahl tells us there are other culprits
where we can gain more by optimizing.

    amqp_channels_manager 8.81% time
    amqp_direct_consumer 4.32% time
    amqp_channel 13.11% time
    rabbit_writer 14.45% time

This is around 66.6% of all time spent for the selected processes and
their underlying modules. The remaining 33% is in the same modules, but
for processes that spend less than 2% time each. For the second round, the
major processes to look into are the channel itself and the `rabbit_writer`
process. Perhaps looking at the latter can tell us why packets are so
small. And looking into the former can explain why we spend so much time
handling channels.

Let us look at what the `rabbit_writer` is doing. It manages writing of
frames to the underlying socket. It can write in two modes, which are the
ubiquitous async and sync variants. The idea is that you can buffer up
writes before pushing them onto the socket, which is running in TCP_NODELAY
mode. This means that we can get low latency operation, but it also
requires us to handle buffering on our end. Otherwise the packets will get
small. Who calls `rabbit_writer`?

    # `gg` is an alias for [git grep -n "$@"]
    ; gg rabbit_writer:
    src/amqp_channel.erl:784:              {network, none, _}  -> rabbit_writer:send_command_sync(W, Method);
    src/amqp_channel.erl:785:              {network, _, _}     -> rabbit_writer:send_command_sync(W, Method,
    src/amqp_network_connection.erl:56:    catch rabbit_writer:send_command_sync(Writer, Method).

Ouch! All writes are synchronous! This means we are forcing a flush to the
underlying socket whenever we write something on it. And we are forcing a
context switch from the `amqp_channel` code to the `rabbit_writer` all the
time, even though we don't need to do so. This accounts for the small
packets we are seeing when we are writing. The response is written on the
socket and then it immediately pushes the result because of TCP_NODELAY. It
also means each write results in a write to the underlying Erlang `port()`,
which is quite expensive.

[
https://github.com/issuu/amqp_client/commit/42856090936e7196eab6c8e71107a479883f97a9.patch
]

RabbitMQ itself does not use the sync writes for anything but closing down
the line. So we can probably safely change to use async writes on the
socket. The change severely cuts down on the number of calls to the
`port()` since they can now happen in larger batches.

The change does not help too much on the packet sizes however. Most packets
are still small due to TCP_NODELAY and due to how the processing is
happening on the interface. The main loop looks like this:

    mainloop1(State) ->
        receive
            Message -> ?MODULE:mainloop1(handle_message(Message, State))
        after 0 ->
            ?MODULE:mainloop1(internal_flush(State))
        end.

So if the mailbox is usually small, then we will often push out small
packets. Looking at a running system through the `observer` application
shows us that the main users of reductions in the system are currently
`amqp_channels_manager` and the muxing client we use for the test.
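
The batching effect of the `after 0` clause can be seen in isolation with a small sketch. This is not `rabbit_writer` itself; `batch_writer`, `drain/1` and the `{send, Data}` message shape are hypothetical names. The function collects whatever is already sitting in the mailbox and returns it as one iolist, which is roughly what a single flush would write.

```erlang
-module(batch_writer).
-export([drain/1]).

%% Collect every {send, Data} message already queued in the calling
%% process's mailbox, in arrival order, and return them as one iolist.
%% `after 0` fires as soon as no matching message is left, so the batch
%% is exactly as large as the backlog at that moment.
-spec drain(iodata()) -> iodata().
drain(Pending) ->
    receive
        {send, Data} -> drain([Pending, Data])
    after 0 ->
        Pending
    end.
```

With two messages queued before the call, `drain([])` returns both in one iolist; with an empty mailbox it returns `[]` right away. If the writer keeps up with its senders, every batch holds a single frame, and that is where the small packets come from.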

## Two major problems

The main problems at this point are twofold: We spend a lot of time
decoding and encoding packets. And we spend an extremely high amount of
time finding the correct channel endpoint for a message. The reason for the
latter is simply the fact that every time we find an appropriate channel
process, we also update the appropriate assembler state. And we run the
decoder loop in the channels manager.

The assembler state is a protocol verifier. It tracks the protocol version
and it checks the next frame matches the expected frame in question. This
is of course a nice and necessary thing to do. But keeping all channels in
the same process and managing the decoder loop like this is odd. We get no
parallel execution on the connection and we have to update a state map for
the assembler state all the time. Furthermore, by decoding first and then
passing data, we will have to send more complex messages rather than simply
a binary reference.

I would probably have chosen a design based on an ETS lookup table, and
then have kept the assembler state in the channel process itself to
maximize parallel behaviour. In turn, we can avoid spending a lot of time
updating internal memory state in a process on the critical path, which
would speed up the system by quite a lot. Also, the reductions would then
be paid by the busy channels themselves, which will help fairness.

Also, this means we can do away with the channels_manager process entirely.
There is no need since a decoded message can just be forwarded to the
channel directly, skipping an unneeded message pass on the critical decoder
path.
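
A rough sketch of what such an ETS-based lookup could look like follows. None of this exists in the client; `channel_table` and its functions, as well as the `{frame, ChannelNo, Payload}` message, are hypothetical. The reader would own the table, look up the channel pid per frame and forward the still-encoded payload, leaving decoding and assembler state to the channel process.

```erlang
-module(channel_table).
-export([new/0, add/3, remove/2, route/3]).

%% Create the lookup table. It is public so channel processes can add
%% and remove themselves; read_concurrency favours the frequent lookups.
new() ->
    ets:new(channel_table, [set, public, {read_concurrency, true}]).

%% Map a channel number to the process handling that channel.
add(Tab, ChannelNo, Pid) when is_pid(Pid) ->
    true = ets:insert(Tab, {ChannelNo, Pid}),
    ok.

remove(Tab, ChannelNo) ->
    true = ets:delete(Tab, ChannelNo),
    ok.

%% Forward an undecoded frame payload straight to the channel process.
%% No per-channel state is touched here, so the caller stays cheap.
route(Tab, ChannelNo, Payload) when is_binary(Payload) ->
    case ets:lookup(Tab, ChannelNo) of
        [{ChannelNo, Pid}] ->
            Pid ! {frame, ChannelNo, Payload},
            ok;
        [] ->
            {error, unknown_channel}
    end.
```

Since the payload is sent as a binary, large frames are passed by reference rather than copied, which is the "simply a binary reference" point made earlier.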

## More controversial patches to `rabbit_writer`

The writer process inside RabbitMQ is something I wanted to optimize as
well, so I tried changing two things in it. First, I avoided a
recomputation of `iolist_size/1` on every received message. Second, I
dropped the small 1414 byte packet size limit. The rationale is
this: either we will be running at a high pace in the writer code and then
easily exceed the 1414 byte packet size. In this case, bumping it to 64
kilobytes and making few calls to the underlying port seems to be the right
thing to do. Otherwise, we will be running slowly, which means we will
eventually be able to empty the mailbox of the writer process. When this
happens, we will flush whatever is in our buffer anyway. The controversy is
the fact that if we get into a situation where the writer is kept buffering
all the time, then the 64 kilobyte window is quite large and may take time
to fill. But I guess this won't happen since writing like this will be
awfully fast and it is unlikely that you will be overwhelmed with exactly
that kind of slow-but-steady trickling of messages in practice.

[
https://github.com/issuu/rabbit_common/commit/62104657aaf2189e35eb28f93fa2033b28973dde.patch
]
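
As an illustration of the first point, a buffer can carry its own byte count so that only the newly added frame is measured. This is a sketch of the idea and not the contents of the patch; the module `write_buffer`, its record and its return values are assumptions made for the example.

```erlang
-module(write_buffer).
-export([new/1, add/2, flush/1]).

%% pending: buffered iodata, size: its byte count, limit: flush threshold.
-record(buf, {pending = [], size = 0, limit}).

%% Limit is the number of buffered bytes at which add/2 asks for a flush,
%% for example 65536 for the 64 kilobyte case discussed above.
new(Limit) when is_integer(Limit), Limit > 0 ->
    #buf{limit = Limit}.

%% Add one frame. Only the new frame is measured; the running total is
%% kept in the record instead of calling iolist_size/1 on the whole buffer.
%% Returns {ok, Buf} or, once the limit is reached, {flush, IoData, Buf}.
add(Frame, #buf{pending = Pending, size = Size, limit = Limit} = Buf) ->
    NewSize = Size + iolist_size(Frame),
    NewPending = [Pending, Frame],
    case NewSize >= Limit of
        true  -> {flush, NewPending, Buf#buf{pending = [], size = 0}};
        false -> {ok, Buf#buf{pending = NewPending, size = NewSize}}
    end.

%% Called when the mailbox runs empty: hand out whatever is buffered.
flush(#buf{pending = Pending} = Buf) ->
    {Pending, Buf#buf{pending = [], size = 0}}.
```

The caller would pass the returned iodata to the socket in one go and call `flush/1` from its `after 0` branch.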

While here, I don't know why we are producing an iolist() in the reverse
direction of what we need. Rather, we can just build up an iolist() tree
and then send it out over the line. This avoids a reverse on the critical
path, and there is no reason to do that reverse:

[
https://github.com/issuu/rabbit_common/commit/15c270fcb75ab758d312cf0e4d1d5864127f5244.patch
]
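
The difference between the two ways of accumulating can be shown with a tiny sketch. Both functions are illustrative and not taken from the patch. They produce the same bytes, but the second one nests the accumulator on the left and therefore never needs `lists:reverse/1`.

```erlang
-module(iolist_order).
-export([prepend_then_reverse/1, append_nested/1]).

%% Cons each frame onto the front, then reverse before sending.
prepend_then_reverse(Frames) ->
    lists:reverse(lists:foldl(fun(F, Acc) -> [F | Acc] end, [], Frames)).

%% Build a left-nested iolist: [[[[], F1], F2], F3]. Already in send
%% order, and each step is a constant-time two-element list.
append_nested(Frames) ->
    lists:foldl(fun(F, Acc) -> [Acc, F] end, [], Frames).
```

Both results can be handed to the port as they are, since a port flattens iodata itself.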

This patch is also slightly controversial since it may not be what you guys
want to do. Furthermore, it did not increase the send and receive rates in
a significant way in my tests.


-- 
J.