[rabbitmq-discuss] Federation and upstream cluster
Tim Watson
tim at rabbitmq.com
Fri Dec 28 11:04:59 GMT 2012
Vladislav,
Is the upstream exchange you're federating durable? If not, it's going to be deleted when you shut the broker down, which would explain the NOT_FOUND error.
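One way to check this is to ask the management HTTP API on the upstream broker for the exchange and look at its `durable` field. Below is a minimal sketch in Python, assuming the management plugin is reachable on the upstream side and that the exchange lives in the default vhost. The helper names, base URL, and credentials are placeholders for illustration and are not part of any RabbitMQ client library.

```python
import base64
import json
import urllib.parse
import urllib.request


def exchange_url(base_url, vhost, exchange):
    """Build the management API URL for one exchange (names are percent-encoded)."""
    quote = lambda s: urllib.parse.quote(s, safe="")
    return f"{base_url.rstrip('/')}/api/exchanges/{quote(vhost)}/{quote(exchange)}"


def is_durable(exchange_info):
    """Return True if the exchange description reports durable=true."""
    return bool(exchange_info.get("durable", False))


def fetch_exchange(base_url, vhost, exchange, user, password):
    """GET the exchange description; urllib raises HTTPError if it does not exist."""
    request = urllib.request.Request(exchange_url(base_url, vhost, exchange))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Hypothetical usage (host, port, and credentials are assumptions):
# info = fetch_exchange("http://remote-server1:15672", "/", "upstream-exchange", "XXX", "XXXX")
# print(is_durable(info))
```
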
Cheers,
Tim
On 28 Dec 2012, at 09:32, Vladislav Pernin wrote:
> Hi,
>
>
> I'm running RabbitMQ 3.0.1 on two clusters of Linux servers.
>
> Let's name the two clusters :
> - downstream cluster running a federation to get messages from upstream
> cluster
> - upstream cluster
>
> The documentation explains well that if a node fails, links to upstream
> exchanges will be recreated on a surviving node.
> There is no problem for the "client" side of the federation.
>
> I cannot use a load balancer in failover mode to provide high availability
> of the upstream cluster.
>
> What would be the recommended solution in this case ?
>
>
> I'm struggling to understand what the question is here, Vladislav. The 'failover' described in the federation plugin documentation applies when using federation in a cluster: if the node on which the downstream link is running dies, another downstream node will take over (i.e., re-establish the links). There is a choice between clustering (i.e., ha/mirror queues) and federation; you do not get 'ha of the upstream cluster' in the same sense that mirror queues in a cluster are 'ha'. You have federated exchanges which copy data using AMQP (with ACKs enabled and some other guarantees) and the ability to try to re-establish links and so on. Federation, however, provides only the Availability and Partition tolerance parts of the CAP theorem, not the same Consistency guarantees as clustering/ha.
>
> I did get that; there is no problem for the downstream side, which holds the federation. It works well.
> The question is really: I have two nodes in the "remote" or upstream cluster, I want to get the messages of one exchange in a reliable way, and the network stream has to be established by the downstream cluster; how can I be tolerant to the failure of one remote node? That is what I have called high availability on the upstream cluster, but only regarding the transmission of the exchange messages to the downstream cluster.
>
>
>
> I have tried to set up two upstreams and group them in an upstream set,
>
> Can you post the configuration you're using to do that?
>
> The configuration has been done using the HTTP API.
>
> curl -i -w %{http_code} -k -u "XXX:XXXX" -XPUT -H "content-type:application/json" -d '{
> "pattern":"downstream-exchange","definition":{"federation-upstream-set":"upstreamset-test"}
> }' https://localhost:15671/api/policies/%2f/federate-me
>
> curl -i -w %{http_code} -k -u "XXX:XXXX" -XPUT -H "content-type:application/json" -d '{
> "name":"local-nodename","value":"federation-local"
> }' https://localhost:15671/api/parameters/federation/%2f/local-nodename
>
> curl -i -w %{http_code} -u "XXX:XXXX" -k -s -XPUT -H "content-type:application/json" -d '{
> "value":{
> "uri":"amqps://XXX:XXX@remote-server1?certfile=XXXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false"
> }
> }' https://localhost:15671/api/parameters/federation-upstream/%2f/upstream1
>
> curl -i -w %{http_code} -u "XXX:XXXX" -k -s -XPUT -H "content-type:application/json" -d '{
> "value":{
> "uri":"amqps://XXX:XXX@remote-server2?certfile=XXXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false"
> }
> }' https://localhost:15671/api/parameters/federation-upstream/%2f/upstream2
>
> curl -i -w %{http_code} -k -u "XXX:XXXX" -XPUT -H "content-type:application/json" -d '{
> "value":[{"upstream":"upstream1","exchange":"upstream-exchange"},{"upstream":"upstream2","exchange":"upstream-exchange"}]
> }' https://localhost:15671/api/parameters/federation-upstream-set/%2f/upstreamset-test
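Request bodies like the ones above are easier to get right when they are generated rather than hand-quoted in the shell. Below is a minimal sketch, assuming Python is available on the machine issuing the requests. The function names are illustrative, and the payload shapes simply mirror the curl commands quoted above rather than any authoritative schema.

```python
import json


def upstream_body(uri):
    """Body for PUT /api/parameters/federation-upstream/<vhost>/<name>."""
    return json.dumps({"value": {"uri": uri}})


def upstream_set_body(upstream_names, exchange):
    """Body for PUT /api/parameters/federation-upstream-set/<vhost>/<name>."""
    members = [{"upstream": name, "exchange": exchange} for name in upstream_names]
    return json.dumps({"value": members})


def policy_body(pattern, upstream_set):
    """Body for PUT /api/policies/<vhost>/<name>."""
    return json.dumps(
        {"pattern": pattern, "definition": {"federation-upstream-set": upstream_set}}
    )


if __name__ == "__main__":
    # Print the upstream-set body so it can be piped into an HTTP client.
    print(upstream_set_body(["upstream1", "upstream2"], "upstream-exchange"))
```

The printed JSON can be piped to curl with `-d @-`, which reads the request body from standard input and sidesteps the nested-quote problem entirely.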
>
>
>
> but I have the following problem :
> - when I shut down one of the nodes, the federation status shows the
> matching upstream down as expected, but after restarting the first
> one, if I shut down the other one, the federation status shows both
> upstreams down
>
> Just to confirm: you're saying that
>
> 1. you shut down one of the two upstream nodes
> 2. that node shows up dead in the web interface
> 3. you re-start that node
> 4. that node shows up alive in the web interface
> 5. you shut down the other upstream node
> 6. both nodes show up as dead in the web interface *but*
> 7. one of the upstream nodes *is* alive despite what the web admin says
>
> Have I understood that correctly?
>
> Absolutely, you can find an extract of the downstream node log (rabbit at XXXX.log) at the end of the mail.
> That is not that easy to read but I think there might be an explanation in the logs.
>
> Case 2 :
> web admin says :
> - upstream1 : running
> - upstream2 : error (econnrefused)
> Everything is OK
>
> Case 6 :
> web admin says :
> - upstream1 : error (econnrefused)
> - upstream2 : shutdown (server_initiated_code,404,<<"NOT_FOUND ...
> upstream1 (remote-server1) has been shut down, but not upstream2 (remote-server2).
>
>
>
> - so, I tried to add a policy with ha-mode set to all on the federated queue; it
> is now possible to shut down either one node or the other,
>
> I'm not sure I understand this at all. Are you saying it was not possible to shut down one or both of the upstream nodes before? That seems different from your earlier comment.
>
> My bad! That was not explained properly. When I said "not possible to shut down", I meant shutting down the remote node and still having the proper status in the federation.
>
>
>
> but it seems that I'm losing some messages.
>
>
> When you say 'the federated queue' do you mean the queue created in the upstream exchange's broker? Why would you want to add an ha-mode policy to that? The upstream queue is internal to the federation mechanism so you should be binding to the downstream exchange only. Or are you saying that you've bound a queue to the downstream exchange and made that ha-enabled? In the latter case, that will make no difference to reliability if, for example, both upstream nodes go down before messages are delivered to and ack'ed by the downstream.
>
> Yes, "federated queue" is the queue created in the upstream exchange's broker. So, yes, it does not really make sense to add an ha policy; that was just an attempt to investigate a little further.
> And yes, I did bind an ha queue to the downstream exchange, but I agree, it has nothing to do with the subject.
>
>
> I'd be interested to hear how you've set this ha-mode policy and why, and also how you've determined that there was message loss. I suspect that you have expectations about the reliability of federation (in the face of node failures) that do not hold. If your messages sat in an exchange on an upstream node (or pair of exchanges/nodes, etc) and both nodes die before successfully transmitting the messages, then they will not arrive at the downstream exchange. The guarantees about message delivery for ha/mirror queues apply to nodes in *that* cluster only. The federation guarantees are different and orthogonal to ha/clustering.
>
> I understand, I just want to make sure that the messages will arrive at the downstream exchange, not duplicated and without loss :
> - if one upstream node dies
> - if there is network failure between downstream and upstream nodes
> - if upstream nodes fail and come back again
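With acknowledgements enabled on the links, a failure between delivery and acknowledgement can plausibly result in a message being delivered again, so "not duplicated" usually has to be enforced by the consumer on the downstream side. Below is a minimal sketch of consumer-side de-duplication, assuming publishers set a unique `message_id` on every message (an assumption about the application, not something federation provides). The `Deduplicator` class is hypothetical and keeps only a bounded window of recent IDs.

```python
from collections import OrderedDict


class Deduplicator:
    """Remember the most recent message IDs and flag repeats."""

    def __init__(self, capacity=10000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, message_id):
        """Return True the first time an ID is seen, False for a repeat."""
        if message_id in self._seen:
            # Refresh the ID so frequently repeated messages stay in the window.
            self._seen.move_to_end(message_id)
            return False
        self._seen[message_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest ID
        return True
```

A consumer would call `is_new(message_id)` before processing and simply acknowledge and skip any message for which it returns False. The window is in memory only, so duplicates that straddle a consumer restart would need a persistent store instead.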
>
> Thanks.
> Vlad
>
> Extract of logs for case 2 :
>
> =WARNING REPORT==== 28-Dec-2012::10:27:08 ===
> Connection (<0.14803.351>) closing: received hard error {'connection.close',
> 320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
> 0,0} from server
>
> =ERROR REPORT==== 28-Dec-2012::10:27:08 ===
> ** Generic server <0.14803.351> terminating
> ** Last message in was {#Ref<0.0.127.4786>,{error,closed}}
> ** When Server state == {state,amqp_network_connection,
> {state,
> {ssl_socket,#Port<0.46383>,
> {sslsocket,new_ssl,<0.14806.351>}},
> 600,<0.14808.351>,131072,
> {server_initiated_close,320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>},
> false},
> <0.14802.351>,<0.14805.351>,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server2",5671,0,
> 0,0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> 0,
> [{<<"capabilities">>,table,
> [{<<"publisher_confirms">>,bool,true},
> {<<"exchange_exchange_bindings">>,bool,true},
> {<<"basic.nack">>,bool,true},
> {<<"consumer_cancel_notify">>,bool,true}]},
> {<<"copyright">>,longstr,
> <<"Copyright (C) 2007-2012 VMware, Inc.">>},
> {<<"information">>,longstr,
> <<"Licensed under the MPL. See http://www.rabbitmq.com/">>},
> {<<"platform">>,longstr,<<"Erlang/OTP">>},
> {<<"product">>,longstr,<<"RabbitMQ">>},
> {<<"version">>,longstr,<<"3.0.1">>}],
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,
> {closing,server_initiated_close,
> {'connection.close',320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
> 0,0},
> none}}
> ** Reason for termination ==
> ** socket_closed_unexpectedly
>
> =INFO REPORT==== 28-Dec-2012::10:27:08 ===
> Federation exchange 'downstream-exchange' in vhost '/' disconnected from exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {upstream_channel_down,
> {connection_closing,
> {server_initiated_close,320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>}}}
>
> =WARNING REPORT==== 28-Dec-2012::10:27:08 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {error,econnrefused}
>
> ==> rabbit at XXXX-sasl.log <==
>
> =CRASH REPORT==== 28-Dec-2012::10:27:08 ===
> crasher:
> initial call: amqp_gen_connection:init/1
> pid: <0.14803.351>
> registered_name: []
> exception exit: socket_closed_unexpectedly
> in function gen_server:terminate/6 (gen_server.erl, line 747)
> ancestors: [<0.14802.351>,amqp_sup,<0.49.0>]
> messages: []
> links: [<0.14802.351>]
> dictionary: []
> trap_exit: true
> status: running
> heap_size: 2584
> stack_size: 24
> reductions: 1786
> neighbours:
>
> =SUPERVISOR REPORT==== 28-Dec-2012::10:27:08 ===
> Supervisor: {<0.14802.351>,amqp_connection_sup}
> Context: child_terminated
> Reason: socket_closed_unexpectedly
> Offender: [{pid,<0.14803.351>},
> {name,connection},
> {mfa,
> {amqp_gen_connection,start_link,
> [amqp_network_connection,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server2",5671,0,0,
> 0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,[]]}},
> {restart_type,intrinsic},
> {shutdown,brutal_kill},
> {child_type,worker}]
>
>
> =SUPERVISOR REPORT==== 28-Dec-2012::10:27:08 ===
> Supervisor: {<0.14802.351>,amqp_connection_sup}
> Context: shutdown
> Reason: reached_max_restart_intensity
> Offender: [{pid,<0.14803.351>},
> {name,connection},
> {mfa,
> {amqp_gen_connection,start_link,
> [amqp_network_connection,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server2",5671,0,0,
> 0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,[]]}},
> {restart_type,intrinsic},
> {shutdown,brutal_kill},
> {child_type,worker}]
>
>
> ==> rabbit at XXXX.log <==
>
> =WARNING REPORT==== 28-Dec-2012::10:27:10 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {error,econnrefused}
>
>
> Extract of logs for case 6 :
>
> =WARNING REPORT==== 28-Dec-2012::10:16:42 ===
> Connection (<0.32151.341>) closing: received hard error {'connection.close',
> 320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
> 0,0} from server
>
> =ERROR REPORT==== 28-Dec-2012::10:16:42 ===
> ** Generic server <0.32151.341> terminating
> ** Last message in was {#Ref<0.0.123.217100>,{error,closed}}
> ** When Server state == {state,amqp_network_connection,
> {state,
> {ssl_socket,#Port<0.45661>,
> {sslsocket,new_ssl,<0.32161.341>}},
> 600,<0.32170.341>,131072,
> {server_initiated_close,320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>},
> false},
> <0.32149.341>,<0.32154.341>,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server1",5671,0,
> 0,0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> 0,
> [{<<"capabilities">>,table,
> [{<<"publisher_confirms">>,bool,true},
> {<<"exchange_exchange_bindings">>,bool,true},
> {<<"basic.nack">>,bool,true},
> {<<"consumer_cancel_notify">>,bool,true}]},
> {<<"copyright">>,longstr,
> <<"Copyright (C) 2007-2012 VMware, Inc.">>},
> {<<"information">>,longstr,
> <<"Licensed under the MPL. See http://www.rabbitmq.com/">>},
> {<<"platform">>,longstr,<<"Erlang/OTP">>},
> {<<"product">>,longstr,<<"RabbitMQ">>},
> {<<"version">>,longstr,<<"3.0.1">>}],
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,
> {closing,server_initiated_close,
> {'connection.close',320,
> <<"CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'">>,
> 0,0},
> none}}
> ** Reason for termination ==
> ** socket_closed_unexpectedly
>
> =ERROR REPORT==== 28-Dec-2012::10:16:42 ===
> ** Generic server <0.32126.341> terminating
> ** Last message in was {'DOWN',#Ref<0.0.123.217187>,process,<0.32193.341>,
> shutdown}
> ** When Server state == {state,
> {upstream,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server1",undefined,0,
> 0,0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> <<"amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false">>,
> {exchange,
> {resource,<<"/">>,exchange,<<"upstream-exchange">>},
> direct,true,false,false,[],undefined,
> [{vhost,<<"/">>},
> {name,<<"federate-me">>},
> {pattern,<<"downstream-exchange">>},
> {definition,
> [{<<"federation-upstream-set">>,
> <<"upstreamset-test">>}]},
> {priority,0}]},
> 1000,1,1,none,none,false,none,<<"upstream1">>},
> <0.32151.341>,<0.32193.341>,
> <<"amq.ctag-iCWCgBnLBU7S3cWTi06V1A">>,
> <<"federation: upstream-exchange -> federation-local:downstream-exchange">>,
> <<"federation: upstream-exchange -> federation-local:downstream-exchange B">>,
> {0,nil},
> 1,
> {dict,1,16,16,8,80,48,
> {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
> {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
> [[{<<"test">>,[]}|
> {set,1,16,16,8,80,48,
> {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
> []},
> {{[],[],[],[],
> [{resource,<<"/">>,queue,
> <<"downstream-queue">>}],
> [],[],[],[],[],[],[],[],[],[],[]}}}]]}}},
> <0.32129.341>,<0.32141.341>,
> {resource,<<"/">>,exchange,<<"downstream-exchange">>},
> {0,nil}}
> ** Reason for termination ==
> ** {upstream_channel_down,shutdown}
>
> =INFO REPORT==== 28-Dec-2012::10:16:42 ===
> Federation exchange 'downstream-exchange' in vhost '/' received 'basic.cancel'
>
> =WARNING REPORT==== 28-Dec-2012::10:16:42 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {error,econnrefused}
>
> ==> rabbit at XXXX-sasl.log <==
>
> =CRASH REPORT==== 28-Dec-2012::10:16:42 ===
> crasher:
> initial call: amqp_gen_connection:init/1
> pid: <0.32151.341>
> registered_name: []
> exception exit: socket_closed_unexpectedly
> in function gen_server:terminate/6 (gen_server.erl, line 747)
> ancestors: [<0.32149.341>,amqp_sup,<0.49.0>]
> messages: [socket_closed]
> links: [<0.32149.341>]
> dictionary: []
> trap_exit: true
> status: running
> heap_size: 2584
> stack_size: 24
> reductions: 1794
> neighbours:
>
> =SUPERVISOR REPORT==== 28-Dec-2012::10:16:42 ===
> Supervisor: {<0.32149.341>,amqp_connection_sup}
> Context: child_terminated
> Reason: socket_closed_unexpectedly
> Offender: [{pid,<0.32151.341>},
> {name,connection},
> {mfa,
> {amqp_gen_connection,start_link,
> [amqp_network_connection,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server1",5671,0,0,
> 0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,[]]}},
> {restart_type,intrinsic},
> {shutdown,brutal_kill},
> {child_type,worker}]
>
>
> =SUPERVISOR REPORT==== 28-Dec-2012::10:16:42 ===
> Supervisor: {<0.32149.341>,amqp_connection_sup}
> Context: shutdown
> Reason: reached_max_restart_intensity
> Offender: [{pid,<0.32151.341>},
> {name,connection},
> {mfa,
> {amqp_gen_connection,start_link,
> [amqp_network_connection,
> {amqp_params_network,<<"XXXX">>,
> <<"XXXX">>,<<"/">>,"remote-server1",5671,0,0,
> 0,infinity,
> [{fail_if_no_peer_cert,false},
> {verify,verify_none},
> {keyfile,
> "XXXX"},
> {certfile,
> "XXXX"}],
> [#Fun<amqp_uri.7.123484526>,
> #Fun<amqp_uri.7.123484526>],
> [],[]},
> #Fun<amqp_connection_sup.0.39273983>,
> #Fun<amqp_connection_sup.2.54430129>,[]]}},
> {restart_type,intrinsic},
> {shutdown,brutal_kill},
> {child_type,worker}]
>
>
> =CRASH REPORT==== 28-Dec-2012::10:16:42 ===
> crasher:
> initial call: gen:init_it/6
> pid: <0.32126.341>
> registered_name: []
> exception exit: {upstream_channel_down,shutdown}
> in function gen_server2:terminate/3
> ancestors: [<0.32125.341>,<0.218.0>,rabbit_federation_link_sup_sup,
> rabbit_federation_sup,rabbit_sup,<0.165.0>]
> messages: [{'DOWN',#Ref<0.0.123.217038>,process,<0.32141.341>,normal}]
> links: [<0.32125.341>]
> dictionary: []
> trap_exit: true
> status: running
> heap_size: 1597
> stack_size: 24
> reductions: 2291192
> neighbours:
>
> ==> rabbit at XXXX.log <==
>
> =WARNING REPORT==== 28-Dec-2012::10:16:44 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {error,econnrefused}
>
> =WARNING REPORT==== 28-Dec-2012::10:16:46 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server2?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {{shutdown,{server_initiated_close,404,
> <<"NOT_FOUND - home node 'rabbit at remote-server1' of durable queue 'federation: upstream-exchange -> federation-local:downstream-exchange' in vhost '/' is down or inaccessible">>}},
> {gen_server,call,
> [<0.7847.351>,
> {call,{'queue.declare',0,
> <<"federation: upstream-exchange -> federation-local:downstream-exchange">>,
> false,true,false,false,false,[]},
> none,<0.7815.351>},
> infinity]}}
>
> =WARNING REPORT==== 28-Dec-2012::10:16:48 ===
> Federation exchange 'downstream-exchange' in vhost '/' did not connect to exchange 'upstream-exchange' in vhost '/' on amqps://XXXX:XXXX@remote-server1?certfile=XXXX&keyfile=XXXX&verify=verify_none&fail_if_no_peer_cert=false
> {error,econnrefused}
>