[rabbitmq-discuss] HA - missing or incompletely replicated queues

Matthew Sackman matthew at rabbitmq.com
Wed Nov 30 14:51:21 GMT 2011


On Wed, Nov 30, 2011 at 02:22:16PM +0000, Matthew Sackman wrote:
> 1. Make sure you set basic.qos, but don't set it to a very low number.
> Something around 100 to 1000 often works well. Call this number N.
> 
> 2. Rather than acking every single message, instead only ack every N / 2
> (for example), and when you do ack, turn on the multiple flag.

For example:

-module(test).
-compile([export_all]).

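-include_lib("amqp_client/include/amqp_client.hrl").

%% Queue name used for the test (placeholder; any name will do).
mirror_queue() -> <<"test.mirror_queue">>.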

mirror_create_delete(Count, N, Delay) ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    #'queue.declare_ok' { queue = Q } =
        amqp_channel:call(
          Chan,
          #'queue.declare'{ queue     = mirror_queue(),
                            durable   = true,
                            exclusive = false,
                            arguments = [{<<"x-ha-policy">>, longstr, <<"all">>}]}),

    Msg = #amqp_msg { props = #'P_basic'{ delivery_mode = 1 }, payload = <<"hello">> },
    Publish = #'basic.publish'{ routing_key = Q },

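    %% Ack once every M deliveries, i.e. every half prefetch window.
    %% max/2 guards against M = 0 (and a badarith in rem) when N < 2.
    M = max(1, N div 2),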

    spawn(fun () ->
                  {ok, Chan2} = amqp_connection:open_channel(Conn),

                  #'basic.qos_ok'{} =
                      amqp_channel:call(Chan2, #'basic.qos'{ prefetch_count = N }),
                  #'basic.consume_ok' { consumer_tag = Tag } =
                      amqp_channel:subscribe(Chan2, #'basic.consume' { queue = Q }, self()),
                  [begin
                       timer:sleep(Delay),
                       receive
                           {#'basic.deliver'{ delivery_tag = AckTag }, _Content} ->
                               case C rem M of
                                   0 ->
                                       %% multiple = true acks every delivery up to AckTag
                                       amqp_channel:cast(Chan2, #'basic.ack'{ delivery_tag = AckTag,
                                                                              multiple = true });
                                   _ -> ok
                               end
                       after 30000 ->
                               amqp_channel:call(Chan2, #'basic.cancel'{ consumer_tag = Tag }),
                               amqp_channel:close(Chan2),
                               exit(normal)
                       end
                   end || C <- lists:seq(1, Count)]
          end),

    [amqp_channel:cast(Chan, Publish, Msg) || _ <- lists:seq(1, Count)],

    timer:sleep(10000),

    amqp_channel:call(Chan, #'queue.delete'{ queue = Q }),
    amqp_connection:close(Conn).
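
To make the acking pattern in the consumer loop concrete, here is a minimal sketch of a pure function that lists the delivery counts at which an ack (with the multiple flag set) would be sent. The module and function names are made up for illustration, and it assumes N >= 2:

```erlang
-module(ack_points).
-export([ack_tags/2, ack_count/2]).

%% Delivery counts (1-based) at which the consumer loop would send a
%% basic.ack with multiple = true, for Count deliveries and a prefetch
%% of N. Assumes N >= 2 so that N div 2 is non-zero.
ack_tags(Count, N) when N >= 2 ->
    M = N div 2,
    [C || C <- lists:seq(1, Count), C rem M =:= 0].

%% Total number of acks sent for Count deliveries.
ack_count(Count, N) when N >= 2 ->
    Count div (N div 2).
```

So ack_points:ack_tags(2000, 1000) is [500,1000,1500,2000], and for the 500000 message runs with N = 1000 the consumer sends 1000 acks rather than 500000.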



If you try running this with too small a Delay then you'll find a large
backlog builds up:

(amqp_client@hazel)2> test:mirror_create_delete(500000, 1000, 0).

This does not perform well at all: the queue constantly reports that it's
empty because whenever it receives a message, it sends it out to the
consumer, and almost as soon as it has sent out the 1000th msg, it
gets an ack back for the first 500 msgs, so it has no "spare" time to
process its backlog. Queue ingress and egress rates don't get above 1kHz
for me, and are frequently down at 100Hz.

However, if you simulate the consumer doing something a bit more complex
by increasing the delay then you get much better performance:

test:mirror_create_delete(500000, 1000, 5). 

Once the queue has sent out the first 1000 msgs to the consumer,
there'll then be a helpful gap before the first ack arrives. During this
time, the queue churns through a good chunk of its backlog. After this,
egress stays around 100Hz (not surprising - a 5ms processing time per
message would give a max of 200Hz egress), but ingress jumps up to
around 10kHz, and everything's much happier.
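
The 200Hz ceiling is just the reciprocal of the per-message delay. As a rough sketch (module and function names are made up for illustration, and it assumes a single consumer that does nothing but sleep for the delay on each message):

```erlang
-module(egress_bound).
-export([max_egress_hz/1]).

%% Upper bound on messages per second for a single consumer that spends
%% DelayMs milliseconds on each message. Real egress will be lower,
%% since timer:sleep/1 sleeps for at least the requested time and the
%% receive and ack take time too.
max_egress_hz(DelayMs) when DelayMs > 0 ->
    1000 / DelayMs.
```

egress_bound:max_egress_hz(5) gives 200.0, and egress_bound:max_egress_hz(1) gives 1000.0.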

Even with

test:mirror_create_delete(500000, 1000, 1).

things are happier - the delay is sufficient. I see peaks of ingress at
20kHz and egress at 500Hz.


Matthew

