[rabbitmq-discuss] Federation and upstream cluster

Vladislav Pernin vladislav.pernin at gmail.com
Fri Dec 28 09:32:43 GMT 2012


Hi,


>> I'm running RabbitMQ 3.0.1 on two clusters of Linux servers.
>>
>> Let's name the two clusters :
>> - downstream cluster running a federation to get messages from upstream
>> cluster
>> - upstream cluster
>>
>> The documentation explains well that if a node fails, links to upstream
>> exchanges will be recreated on a surviving node.
>> There is no problem for the "client" side of the federation.
>>
>> I cannot use a load balancer in failover mode to get high availability
>> of the upstream cluster.
>>
>> What would be the recommended solution in this case ?
>>
>>
> I'm struggling to understand what the question is here Vladislav. The
> 'failover' that is being described in the federation plugin documentation
> is applied when using federation in a cluster, so if the node on which the
> downstream link is running dies, then another downstream node will take
> over (i.e., re-establish the links). There is a choice between clustering
> (i.e., ha/mirror queues) and federation - you do not get 'ha of the
> upstream cluster' in the same sense that mirror queues in a cluster are
> 'ha'. You have federated exchanges which copy data using AMQP (with ACKs
> enabled and some other guarantees) and the ability to try and re-establish
> links and so on. Federation, however, provides only the Availability and
> Partition tolerance parts of the CAP theorem, not the same Consistency
> guarantees as clustering/ha.


I did get that. There is no problem on the downstream side, which holds the
federation; it works well.
The real question is : I have two nodes in the "remote" or upstream cluster,
I want to get the messages of one exchange in a reliable way, and the network
connection has to be established by the downstream cluster. How can I
tolerate the failure of one remote node ? That is what I called high
availability of the upstream cluster, but only with regard to the
transmission of the exchange's messages to the downstream cluster.


>
>
>> I have tried to set up two upstreams and group them in an upstream set,
>>
>
> Can you post the configuration you're using to do that?


The configuration has been done using the HTTP API.

# Policy: federate "downstream-exchange" using the upstream set "upstreamset-test"
curl -i -w '%{http_code}' -k -u "XXX:XXXX" -XPUT \
  -H "content-type:application/json" \
  -d '{"pattern":"downstream-exchange","definition":{"federation-upstream-set":"upstreamset-test"}}' \
  https://localhost:15671/api/policies/%2f/federate-me

# Name under which this (downstream) broker identifies itself to the upstreams
curl -i -w '%{http_code}' -k -u "XXX:XXXX" -XPUT \
  -H "content-type:application/json" \
  -d '{"name":"local-nodename","value":"federation-local"}' \
  https://localhost:15671/api/parameters/federation/%2f/local-nodename

# One upstream per node of the remote cluster
curl -i -w '%{http_code}' -u "XXX:XXXX" -k -s -XPUT \
  -H "content-type:application/json" \
  -d '{"value":{"uri":"amqps://XXX:XXX@remote-server1?certfile=XXXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false"}}' \
  https://localhost:15671/api/parameters/federation-upstream/%2f/upstream1

curl -i -w '%{http_code}' -u "XXX:XXXX" -k -s -XPUT \
  -H "content-type:application/json" \
  -d '{"value":{"uri":"amqps://XXX:XXX@remote-server2?certfile=XXXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false"}}' \
  https://localhost:15671/api/parameters/federation-upstream/%2f/upstream2

# Upstream set grouping both upstreams, each pointing at "upstream-exchange"
curl -i -w '%{http_code}' -k -u "XXX:XXXX" -XPUT \
  -H "content-type:application/json" \
  -d '{"value":[{"upstream":"upstream1","exchange":"upstream-exchange"},{"upstream":"upstream2","exchange":"upstream-exchange"}]}' \
  https://localhost:15671/api/parameters/federation-upstream-set/%2f/upstreamset-test
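
For illustration only, here is a minimal sketch of how the body of the
upstream set above could be generated for any number of upstream nodes,
which avoids hand-editing the JSON when a node is added. The function
name upstream_set_body is hypothetical (it is not part of RabbitMQ or of
the setup described here), and it assumes the names contain no characters
that need JSON escaping.

```shell
#!/bin/sh
# Hypothetical helper: print the JSON body of a federation-upstream-set
# parameter listing every given upstream for one upstream exchange.
# Usage: upstream_set_body EXCHANGE [UPSTREAM...]
# Assumption: names contain no characters that need JSON escaping.
upstream_set_body() {
    exchange=$1
    shift
    sep=''
    printf '{"value":['
    for name in "$@"; do
        # Emit one {"upstream":...,"exchange":...} entry, comma-separated.
        printf '%s{"upstream":"%s","exchange":"%s"}' "$sep" "$name" "$exchange"
        sep=','
    done
    printf ']}\n'
}

# Print the body for the two upstreams used in this thread.
upstream_set_body upstream-exchange upstream1 upstream2
```

The output could then be piped to curl, which reads the request body from
standard input when given "-d @-", using the same placeholder credentials
and URL as in the commands above.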


>
>
>> but I have the following problem :
>> - when I shut down one of the nodes, the federation status shows the
>> matching upstream down as expected, but after having restarted the first
>> one, if I shut down the other one, the federation status shows both
>> upstreams down
>>
>
> Just to confirm: you're saying that
>
> 1. you shut down one of the two upstream nodes
> 2. that node shows up dead in the web interface
> 3. you re-start that node
> 4. that node shows up alive in the web interface
> 5. you shut down the other upstream node
> 6. both nodes show up as dead in the web interface *but*
> 7. one of the upstream nodes *is* alive despite what the web admin says
>
> Have I understood that correctly?


Absolutely. You can find an extract of the downstream node log
(rabbit@XXXX.log) at the end of this mail.
It is not easy to read, but I think the explanation might be in there.

Case 2 :
web admin says :
- upstream1 : running
- upstream2 : error (econnrefused)
Everything is OK

Case 6 :
web admin says :
- upstream1 : error (econnrefused)
- upstream2 : shutdown (server_initiated_close,404,<<"NOT_FOUND ...
upstream1 (remote-server1) has been shut down, but upstream2
(remote-server2) has not.


>
>
>> - so I tried to add a policy with ha-mode set to all on the federated
>> queue; it is now possible to shut down either one or the other node,
>>
>
> I'm not sure I understand this at all. Are you saying it was not possible
> to shut down one or both of the upstream nodes before? That seems different
> from your earlier comment.


My bad ! I did not explain it properly. When I said it was "not possible to
shut down", I meant that I could not shut down a remote node and still get
the correct status in the federation.


>
>
>> but it seems that I'm losing some messages.
>>
>>
> When you say 'the federated queue' do you mean the queue created in the
> upstream exchange's broker? Why would you want to add an ha-mode policy to
> that? The upstream queue is internal to the federation mechanism so you
> should be binding to the downstream exchange only. Or are you saying that
> you've bound a queue to the downstream exchange and made that ha-enabled?
> Because in the latter case, that will make no difference to reliability
> if, for example, both upstream nodes go down before messages are delivered
> and ack'ed by the downstream.
>

Yes, "federated queue" is the queue created in the upstream exchange's
broker. So, yes, it does not really make sense to add an ha policy to it;
that was just an attempt to investigate a little bit further.
And yes, I did bind an ha queue to the downstream exchange, but I agree, it
has nothing to do with the subject.


>
> I'd be interested to hear how you've set this ha-mode policy and why and
> also how you've determined that there was message loss? I suspect that you
> have assumed expectations about the reliability of federation (in the face
> of node failures) that do not hold. If your messages sat in an exchange on
> an upstream node (or pair of exchanges/nodes, etc) and both nodes die
> before successfully transmitting the messages, then they will not arrive at
> the downstream exchange. The guarantees about message delivery for
> ha/mirror queues apply to nodes in *that* cluster only. The federation
> guarantees are different and orthogonal to ha/clustering.
>

I understand. I just want to make sure that the messages will arrive at the
downstream exchange, without duplicates and without loss :
- if one upstream node dies
- if there is a network failure between downstream and upstream nodes
- if upstream nodes fail and come back again

Thanks.
Vlad

*Extract of logs for case 2* :

=WARNING REPORT==== 28-Dec-2012::10:27:08 ===
Connection (<0.14803.351>) closing: received hard error {'connection.close',
                                                         320,
                                                         <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
                                                         0,0} from server

=ERROR REPORT==== 28-Dec-2012::10:27:08 ===
** Generic server <0.14803.351> terminating
** Last message in was {#Ref<0.0.127.4786>,{error,closed}}
** When Server state == {state,amqp_network_connection,
                            {state,
                                {ssl_socket,#Port<0.46383>,
                                    {sslsocket,new_ssl,<0.14806.351>}},
                                600,<0.14808.351>,131072,
                                {server_initiated_close,320,
                                    <<"CONNECTION_FORCED - broker forced
connection closure with reason 'shutdown'">>},
                                false},
                            <0.14802.351>,<0.14805.351>,
                            {amqp_params_network,<<"XXXX">>,
                                <<"XXXX">>,<<"/">>,"remote-server2",5671,0,
                                0,0,infinity,
                                [{fail_if_no_peer_cert,false},
                                 {verify,verify_none},
                                 {keyfile,
                                     "XXXX"},
                                 {certfile,
                                     "XXXX"}],
                                [#Fun<amqp_uri.7.123484526>,
                                 #Fun<amqp_uri.7.123484526>],
                                [],[]},
                            0,
                            [{<<"capabilities">>,table,
                              [{<<"publisher_confirms">>,bool,true},
                               {<<"exchange_exchange_bindings">>,bool,true},
                               {<<"basic.nack">>,bool,true},
                               {<<"consumer_cancel_notify">>,bool,true}]},
                             {<<"copyright">>,longstr,
                              <<"Copyright (C) 2007-2012 VMware, Inc.">>},
                             {<<"information">>,longstr,
                              <<"Licensed under the MPL.  See
http://www.rabbitmq.com/">>},
                             {<<"platform">>,longstr,<<"Erlang/OTP">>},
                             {<<"product">>,longstr,<<"RabbitMQ">>},
                             {<<"version">>,longstr,<<"3.0.1">>}],
                            #Fun<amqp_connection_sup.0.39273983>,
                            #Fun<amqp_connection_sup.2.54430129>,
                            {closing,server_initiated_close,
                                {'connection.close',320,
                                    <<"CONNECTION_FORCED - broker forced
connection closure with reason 'shutdown'">>,
                                    0,0},
                                none}}
** Reason for termination ==
** socket_closed_unexpectedly

=INFO REPORT==== 28-Dec-2012::10:27:08 ===
Federation exchange 'downstream-exchange' in vhost '/' disconnected from
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{upstream_channel_down,
    {connection_closing,
        {server_initiated_close,320,
            <<"CONNECTION_FORCED - broker forced connection closure with
reason 'shutdown'">>}}}

=WARNING REPORT==== 28-Dec-2012::10:27:08 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{error,econnrefused}

==> rabbit@XXXX-sasl.log <==

=CRASH REPORT==== 28-Dec-2012::10:27:08 ===
  crasher:
    initial call: amqp_gen_connection:init/1
    pid: <0.14803.351>
    registered_name: []
    exception exit: socket_closed_unexpectedly
      in function  gen_server:terminate/6 (gen_server.erl, line 747)
    ancestors: [<0.14802.351>,amqp_sup,<0.49.0>]
    messages: []
    links: [<0.14802.351>]
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 2584
    stack_size: 24
    reductions: 1786
  neighbours:

=SUPERVISOR REPORT==== 28-Dec-2012::10:27:08 ===
     Supervisor: {<0.14802.351>,amqp_connection_sup}
     Context:    child_terminated
     Reason:     socket_closed_unexpectedly
     Offender:   [{pid,<0.14803.351>},
                  {name,connection},
                  {mfa,
                      {amqp_gen_connection,start_link,
                          [amqp_network_connection,
                           {amqp_params_network,<<"XXXX">>,
                               <<"XXXX">>,<<"/">>,"remote-server2",5671,0,0,
                               0,infinity,
                               [{fail_if_no_peer_cert,false},
                                {verify,verify_none},
                                {keyfile,
                                    "XXXX"},
                                {certfile,
                                    "XXXX"}],
                               [#Fun<amqp_uri.7.123484526>,
                                #Fun<amqp_uri.7.123484526>],
                               [],[]},
                           #Fun<amqp_connection_sup.0.39273983>,
                           #Fun<amqp_connection_sup.2.54430129>,[]]}},
                  {restart_type,intrinsic},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 28-Dec-2012::10:27:08 ===
     Supervisor: {<0.14802.351>,amqp_connection_sup}
     Context:    shutdown
     Reason:     reached_max_restart_intensity
     Offender:   [{pid,<0.14803.351>},
                  {name,connection},
                  {mfa,
                      {amqp_gen_connection,start_link,
                          [amqp_network_connection,
                           {amqp_params_network,<<"XXXX">>,
                               <<"XXXX">>,<<"/">>,"remote-server2",5671,0,0,
                               0,infinity,
                               [{fail_if_no_peer_cert,false},
                                {verify,verify_none},
                                {keyfile,
                                    "XXXX"},
                                {certfile,
                                    "XXXX"}],
                               [#Fun<amqp_uri.7.123484526>,
                                #Fun<amqp_uri.7.123484526>],
                               [],[]},
                           #Fun<amqp_connection_sup.0.39273983>,
                           #Fun<amqp_connection_sup.2.54430129>,[]]}},
                  {restart_type,intrinsic},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


==> rabbit@XXXX.log <==

=WARNING REPORT==== 28-Dec-2012::10:27:10 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{error,econnrefused}


*Extract of logs for case 6* :

=WARNING REPORT==== 28-Dec-2012::10:16:42 ===
Connection (<0.32151.341>) closing: received hard error {'connection.close',
                                                         320,
                                                         <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
                                                         0,0} from server

=ERROR REPORT==== 28-Dec-2012::10:16:42 ===
** Generic server <0.32151.341> terminating
** Last message in was {#Ref<0.0.123.217100>,{error,closed}}
** When Server state == {state,amqp_network_connection,
                            {state,
                                {ssl_socket,#Port<0.45661>,
                                    {sslsocket,new_ssl,<0.32161.341>}},
                                600,<0.32170.341>,131072,
                                {server_initiated_close,320,
                                    <<"CONNECTION_FORCED - broker forced
connection closure with reason 'shutdown'">>},
                                false},
                            <0.32149.341>,<0.32154.341>,
                            {amqp_params_network,<<"XXXX">>,
                                <<"XXXX">>,<<"/">>,"remote-server1",5671,0,
                                0,0,infinity,
                                [{fail_if_no_peer_cert,false},
                                 {verify,verify_none},
                                 {keyfile,
                                     "XXXX"},
                                 {certfile,
                                     "XXXX"}],
                                [#Fun<amqp_uri.7.123484526>,
                                 #Fun<amqp_uri.7.123484526>],
                                [],[]},
                            0,
                            [{<<"capabilities">>,table,
                              [{<<"publisher_confirms">>,bool,true},
                               {<<"exchange_exchange_bindings">>,bool,true},
                               {<<"basic.nack">>,bool,true},
                               {<<"consumer_cancel_notify">>,bool,true}]},
                             {<<"copyright">>,longstr,
                              <<"Copyright (C) 2007-2012 VMware, Inc.">>},
                             {<<"information">>,longstr,
                              <<"Licensed under the MPL.  See
http://www.rabbitmq.com/">>},
                             {<<"platform">>,longstr,<<"Erlang/OTP">>},
                             {<<"product">>,longstr,<<"RabbitMQ">>},
                             {<<"version">>,longstr,<<"3.0.1">>}],
                            #Fun<amqp_connection_sup.0.39273983>,
                            #Fun<amqp_connection_sup.2.54430129>,
                            {closing,server_initiated_close,
                                {'connection.close',320,
                                    <<"CONNECTION_FORCED - broker forced
connection closure with reason 'shutdown'">>,
                                    0,0},
                                none}}
** Reason for termination ==
** socket_closed_unexpectedly

=ERROR REPORT==== 28-Dec-2012::10:16:42 ===
** Generic server <0.32126.341> terminating
** Last message in was {'DOWN',#Ref<0.0.123.217187>,process,<0.32193.341>,
                               shutdown}
** When Server state == {state,
                         {upstream,
                          {amqp_params_network,<<"XXXX">>,
                           <<"XXXX">>,<<"/">>,"remote-server1",undefined,0,
                           0,0,infinity,
                           [{fail_if_no_peer_cert,false},
                            {verify,verify_none},
                            {keyfile,
                             "XXXX"},
                            {certfile,
                             "XXXX"}],
                           [#Fun<amqp_uri.7.123484526>,
                            #Fun<amqp_uri.7.123484526>],
                           [],[]},
                          <<"amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false">>,
                          {exchange,
                           {resource,<<"/">>,exchange,<<"upstream-exchange">>},
                           direct,true,false,false,[],undefined,
                           [{vhost,<<"/">>},
                            {name,<<"federate-me">>},
                            {pattern,<<"downstream-exchange">>},
                            {definition,
                             [{<<"federation-upstream-set">>,
                               <<"upstreamset-test">>}]},
                            {priority,0}]},
                          1000,1,1,none,none,false,none,<<"upstream1">>},
                         <0.32151.341>,<0.32193.341>,
                         <<"amq.ctag-iCWCgBnLBU7S3cWTi06V1A">>,
                         <<"federation: upstream-exchange ->
federation-local:downstream-exchange">>,
                         <<"federation: upstream-exchange ->
federation-local:downstream-exchange B">>,
                         {0,nil},
                         1,
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                            [[{<<"test">>,[]}|
                              {set,1,16,16,8,80,48,
                               {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
                                []},
                               {{[],[],[],[],
                                 [{resource,<<"/">>,queue,
                                   <<"downstream-queue">>}],
                                 [],[],[],[],[],[],[],[],[],[],[]}}}]]}}},
                         <0.32129.341>,<0.32141.341>,
                         {resource,<<"/">>,exchange,<<"downstream-exchange">>},
                         {0,nil}}
** Reason for termination ==
** {upstream_channel_down,shutdown}

=INFO REPORT==== 28-Dec-2012::10:16:42 ===
Federation exchange 'downstream-exchange' in vhost '/' received
'basic.cancel'

=WARNING REPORT==== 28-Dec-2012::10:16:42 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{error,econnrefused}

==> rabbit@XXXX-sasl.log <==

=CRASH REPORT==== 28-Dec-2012::10:16:42 ===
  crasher:
    initial call: amqp_gen_connection:init/1
    pid: <0.32151.341>
    registered_name: []
    exception exit: socket_closed_unexpectedly
      in function  gen_server:terminate/6 (gen_server.erl, line 747)
    ancestors: [<0.32149.341>,amqp_sup,<0.49.0>]
    messages: [socket_closed]
    links: [<0.32149.341>]
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 2584
    stack_size: 24
    reductions: 1794
  neighbours:

=SUPERVISOR REPORT==== 28-Dec-2012::10:16:42 ===
     Supervisor: {<0.32149.341>,amqp_connection_sup}
     Context:    child_terminated
     Reason:     socket_closed_unexpectedly
     Offender:   [{pid,<0.32151.341>},
                  {name,connection},
                  {mfa,
                      {amqp_gen_connection,start_link,
                          [amqp_network_connection,
                           {amqp_params_network,<<"XXXX">>,
                               <<"XXXX">>,<<"/">>,"remote-server1",5671,0,0,
                               0,infinity,
                               [{fail_if_no_peer_cert,false},
                                {verify,verify_none},
                                {keyfile,
                                    "XXXX"},
                                {certfile,
                                    "XXXX"}],
                               [#Fun<amqp_uri.7.123484526>,
                                #Fun<amqp_uri.7.123484526>],
                               [],[]},
                           #Fun<amqp_connection_sup.0.39273983>,
                           #Fun<amqp_connection_sup.2.54430129>,[]]}},
                  {restart_type,intrinsic},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 28-Dec-2012::10:16:42 ===
     Supervisor: {<0.32149.341>,amqp_connection_sup}
     Context:    shutdown
     Reason:     reached_max_restart_intensity
     Offender:   [{pid,<0.32151.341>},
                  {name,connection},
                  {mfa,
                      {amqp_gen_connection,start_link,
                          [amqp_network_connection,
                           {amqp_params_network,<<"XXXX">>,
                               <<"XXXX">>,<<"/">>,"remote-server1",5671,0,0,
                               0,infinity,
                               [{fail_if_no_peer_cert,false},
                                {verify,verify_none},
                                {keyfile,
                                    "XXXX"},
                                {certfile,
                                    "XXXX"}],
                               [#Fun<amqp_uri.7.123484526>,
                                #Fun<amqp_uri.7.123484526>],
                               [],[]},
                           #Fun<amqp_connection_sup.0.39273983>,
                           #Fun<amqp_connection_sup.2.54430129>,[]]}},
                  {restart_type,intrinsic},
                  {shutdown,brutal_kill},
                  {child_type,worker}]


=CRASH REPORT==== 28-Dec-2012::10:16:42 ===
  crasher:
    initial call: gen:init_it/6
    pid: <0.32126.341>
    registered_name: []
    exception exit: {upstream_channel_down,shutdown}
      in function  gen_server2:terminate/3
    ancestors: [<0.32125.341>,<0.218.0>,rabbit_federation_link_sup_sup,
                  rabbit_federation_sup,rabbit_sup,<0.165.0>]
    messages: [{'DOWN',#Ref<0.0.123.217038>,process,<0.32141.341>,normal}]
    links: [<0.32125.341>]
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 1597
    stack_size: 24
    reductions: 2291192
  neighbours:

==> rabbit@XXXX.log <==

=WARNING REPORT==== 28-Dec-2012::10:16:44 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{error,econnrefused}

=WARNING REPORT==== 28-Dec-2012::10:16:46 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{{shutdown,{server_initiated_close,404,
                                   <<"NOT_FOUND - home node 'rabbit@remote-server1' of durable queue 'federation: upstream-exchange -> federation-local:downstream-exchange' in vhost '/' is down or inaccessible">>}},
 {gen_server,call,
             [<0.7847.351>,
              {call,{'queue.declare',0,
                                     <<"federation: upstream-exchange ->
federation-local:downstream-exchange">>,
                                     false,true,false,false,false,[]},
                    none,<0.7815.351>},
              infinity]}}

=WARNING REPORT==== 28-Dec-2012::10:16:48 ===
Federation exchange 'downstream-exchange' in vhost '/' did not connect to
exchange 'upstream-exchange' in vhost '/' on
amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
{error,econnrefused}