[rabbitmq-discuss] Proposed change to shovel

Fred Dushin fred at dushin.net
Thu Apr 18 16:04:57 BST 2013


Hi,

I would like to propose a change to the shovel plugin that allows the shovel to fail over more predictably across the list of broker endpoints in the rabbitmq_shovel configuration.  Essentially, the idea is to try each endpoint in the order of declaration, and to use the first endpoint to which a connection can be made, instead of selecting an endpoint at random.

This does slightly violate the spirit of Erlang's "let it crash" philosophy, and I am not sure whether the random endpoint selection in the current code is really designed more for clustered environments.  Perhaps it would make more sense to make the iterative failover strategy an "opt-in" feature, if only for the sake of backwards compatibility.

I am providing a patch, but I am not suggesting that this actual change go into production code, as I really made the change only for testing purposes.  I'm more interested in whether others would find the patch useful, and whether there would be a way for us to fold a feature like this into the shovel.  If there is interest, I would be happy to contribute something more production-worthy for review.

Thanks,

-Fred

Here is the patch (based off 3.0.1 tag):

# HG changeset patch
# User Fred Dushin <fred at dushin.net>
# Date 1366295416 14400
# Branch fdushin-failover
# Node ID 0436511e18908fd8d38d5f56a1a3b0e5af807206
# Parent  cdf80b9ac08a61de718314baaff13f8ca5740c4c
Added changes to make failover to additional endpoints deterministic.

diff --git a/src/rabbit_shovel_worker.erl b/src/rabbit_shovel_worker.erl
--- a/src/rabbit_shovel_worker.erl
+++ b/src/rabbit_shovel_worker.erl
@@ -53,9 +53,9 @@
   random:seed(A, B, C),
   #shovel{sources = Sources, destinations = Destinations} = Config,
   {InboundConn, InboundChan, InboundParams} =
-        make_conn_and_chan(Sources#endpoint.amqp_params),
+        make_conn_and_chan(lists:reverse(Sources#endpoint.amqp_params)),
   {OutboundConn, OutboundChan, OutboundParams} =
-        make_conn_and_chan(Destinations#endpoint.amqp_params),
+        make_conn_and_chan(lists:reverse(Destinations#endpoint.amqp_params)),

   create_resources(InboundChan,
                    Sources#endpoint.resource_declarations),
@@ -240,11 +240,27 @@
       amqp_channel:call(Chan, #'channel.flow'{active = Active}),
   ok.

-make_conn_and_chan(AmqpParams) ->
-    AmqpParam = lists:nth(random:uniform(length(AmqpParams)), AmqpParams),
+amqp_params_to_string(#amqp_params_network{host=Host, port=Port, virtual_host=VirtualHost}) ->
+    io_lib:format("Host: ~p Port: ~p VirtualHost: ~p", [Host, Port, VirtualHost]);
+amqp_params_to_string(#amqp_params_direct{virtual_host=VirtualHost}) ->
+    io_lib:format("(Direct) Virtual Host: ~p", [VirtualHost]).
+
+make_conn_and_chan([]) ->
+    throw(no_endpoints);
+make_conn_and_chan([AmqpParam|Rest]) ->
+    try
+        get_conn_and_chan(AmqpParam)
+    catch
+        Type:Reason ->
+            error_logger:error_msg("Shovel failed to connect to ~s: ~p:~p", [amqp_params_to_string(AmqpParam), Type, Reason]),
+            make_conn_and_chan(Rest)
+    end.
+
+get_conn_and_chan(AmqpParam) ->
   {ok, Conn} = amqp_connection:start(AmqpParam),
   link(Conn),
   {ok, Chan} = amqp_connection:open_channel(Conn),
+    error_logger:info_msg("Shovel connected to ~s.", [amqp_params_to_string(AmqpParam)]),
   {Conn, Chan, AmqpParam}.

create_resources(Chan, Declarations) ->

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20130418/ef9dd494/attachment.htm>


More information about the rabbitmq-discuss mailing list