I have two RabbitMQ nodes: one on a US server and one on a UK server. Both are connected to the same VPN.<br><br>I'm consuming messages from the local node in both the US and the UK with:<br><br>AMQP.start('amqp://guest:guest@localhost:25672') do |connection, open_ok|<br>
channel = AMQP::Channel.new(connection)<br> exchange = AMQP::Exchange.new(channel, "x-federation", "xanview", :durable => true,<br> :arguments => {"upstream-set" => "my-upstreams", "type" => "topic", "durable" => "true"})<br>
channel.queue("testqueue_uk", :durable => true) do |queue| # testqueue_us in the US<br> queue.bind(exchange, :routing_key => '#')<br> i = 0 # count of messages received so far<br> queue.subscribe do |metadata, payload|<br> i += 1<br>
puts "#{i}: Received message: #{payload.inspect}."<br> end<br> end<br>end<br><br>I'm producing messages to the local node in both the US and the UK with:<br><br>EventMachine.run do<br> AMQP.connect('amqp://guest:guest@localhost:25672') do |connection, open_ok|<br>
channel = AMQP::Channel.new(connection)<br> exchange = AMQP::Exchange.new(channel, "x-federation", "xanview", :durable => true, :arguments => {"upstream-set" => "my-upstreams", "type" => "topic", "durable" => "true"})<br>
$i = 0 # count of messages published so far<br> EventMachine.add_periodic_timer(0.1) do<br> $i += 1<br> puts "#{$i}: publishing msg"<br> exchange.publish("#{$i} this is a test message from the US", :routing_key => "some.topic", :persistent => true)<br>
end<br> end<br>end<br><br>This all seems to work reliably until I kill the connection. When I shut down the VPN, I see messages accumulating in queues called "federation: xanview -> node2" (on the UK server) and "federation: xanview -> node1" (on the US server), as expected.<br>
<br>However, when I restart the VPN, the UK server has received 43737 messages while the US server has received only 43708. Where did the other 29 messages go?<br><br>Both node configs look like this; the only difference is that the US node connects to the federation upstream on 10.8.0.22 and the UK node connects on 10.8.0.33:<br>
<br>[<br> {rabbit, [<br> {tcp_listeners, [25672]}<br> ]},<br> {rabbitmq_mochiweb, [<br> {listeners, [{mgmt, [{port, 55555}]}]},<br> {default_listener, [{port, 55550}]},<br> {contexts, [{rabbit_mgmt, mgmt}]}<br>
]},<br><br>{rabbitmq_federation,<br> [ {exchanges, [[{exchange, "xanview"},<br> {virtual_host, "/"},<br> {type, "topic"},<br> {durable, true},<br>
{auto_delete, false},<br> {internal, false},<br> {upstream_set, "my-upstreams"}]<br> ]},<br> {upstream_sets, [{"my-upstreams", [[{connection, "upstream-server"}]]}]},<br>
{connections, [{"upstream-server", [{host, "10.8.0.22"},<br> {protocol, "amqp"},<br> {port, 25672},<br>
{virtual_host, "/"},<br> {username, "guest"},<br> {password, "guest"},<br>
{mechanism, default},<br> {prefetch_count, 1000},<br> {reconnect_delay, 5},<br> {ha_policy, "all"}<br>
]}<br> ]},<br> {local_username, "guest"},<br> {local_nodename, "node2"}<br> ]<br> }<br><br>].<br><br>Any ideas why messages are lost? What could be causing it?<br>
<br>Roman<br>
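One way to narrow down which 29 messages went missing is to compare the counters embedded in the payloads, since the publisher above prefixes every message with `$i`. The following is only a minimal sketch, not part of the setup above: the helper names `leading_sequence` and `missing_sequences` are hypothetical, and it assumes the payloads from one origin (for example, only the "from the US" messages) have been collected into an array on the consuming side.

```ruby
# Hypothetical helper: extract the publisher's counter from the start of a
# payload such as "42 this is a test message from the US".
def leading_sequence(payload)
  match = payload.match(/\A(\d+)\b/)
  match && match[1].to_i
end

# Hypothetical helper: return the counters that never arrived, assuming the
# payloads all come from a single publisher whose counter increases by 1.
def missing_sequences(payloads)
  seen = payloads.map { |payload| leading_sequence(payload) }.compact
  return [] if seen.empty?
  # Every counter between the smallest and largest one seen should be present.
  (seen.min..seen.max).to_a - seen
end

received = [
  "1 this is a test message from the US",
  "2 this is a test message from the US",
  "5 this is a test message from the US",
]
puts "missing: #{missing_sequences(received).inspect}"
```

If the missing counters cluster around the moment the VPN went down or came back, that would point at the disconnect or reconnect window rather than at steady-state delivery. Note that this only detects gaps between the first and last counter seen, so messages lost at the very end of a run would need a comparison against the publisher's final count.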