[rabbitmq-discuss] Consumer connection closes abruptly after one message

p2w tpletch001 at gmail.com
Wed Nov 16 20:53:45 GMT 2011


OK, I think I have some more concrete behavior to work off of.

First of all, some of the earlier issues were due to orphaned
connections on the server. I'm not sure how that happens or why those
idle connections were not killed off by the broker, but I'll sort that
out later.

Once those connections and channels were deleted, I was able to do the
following with 100% reproducibility: 1) publish a message to the
server, 2) start the server and consume the message, pass it to the
Resque work queue, and do some stuff there, 3) bounce back and forth
between doing things in the app GUI and publishing/consuming messages
on the RabbitMQ server.

Now here's where it gets interesting. If I then stop all activity,
the connection is closed after a couple of minutes and is not
reconnected by the recovery code in the initializer. I am unable to
tell whether the connection is severed on the broker side or the client
side. I do see a timeout setting in some of the code samples in the
docs, but I am not seeing anything on how it works or what the
recommended params are.

For reference, here is the initializer code as it sits right now:

------------------------------------------------------------------------------------------------------------------
connect_params = YAML::load_file(File.join(Rails.root, "config", "amqp.yml"))[Rails.env]

EventMachine.next_tick {
  AMQP.connect(connect_params['uri']) do |connection, open_ok|
    puts "Connected to MQ Server to Retrieve API Submissions..."
    AMQP::Channel.new(connection, :auto_recovery => true) do |channel, open_ok|
      puts "Channel ##{channel.id} is open..."
      exchange = channel.direct('inbound', :durable => true)
      channel.on_error do |ch, close|
        puts "Handling channel-level error"
        connection.close
      end
      channel.queue('dropbox', :durable => true) do |queue, declare_ok|
        queue.bind(exchange).subscribe do |metadata, payload|
          message_h = metadata.headers
          message_p = payload
          puts message_h
          puts message_p
          Resque.enqueue(ApiIntake, message_h, message_p)
        end
      end
    end
    connection.on_tcp_connection_loss do |connection, settings|
      # reconnect in 2 seconds, with enforcement
      connection.reconnect(true, 2)
    end
  end
}
----------------------------------------------------------------------------
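
The initializer above expects config/amqp.yml to be keyed by Rails
environment, with a 'uri' entry under each key. The file itself is not
shown in this thread, so the following is only a minimal sketch of a
layout that would satisfy that lookup. The hosts and credentials are
placeholders, and amqp_params_for is a hypothetical helper, not part of
Rails or the amqp gem. It raises a clear error when the environment key
is missing instead of failing later on nil:

```ruby
require 'yaml'

# Hypothetical contents of config/amqp.yml; hosts and credentials are placeholders.
AMQP_YML = <<-EOS
development:
  uri: amqp://guest:guest@localhost:5672
production:
  uri: amqp://user:secret@mq.example.com:5672
EOS

# Mirrors the lookup in the initializer, with a plain string standing in
# for Rails.env. Raises if the environment has no entry in the file.
def amqp_params_for(env, source = AMQP_YML)
  params = YAML.load(source)[env]
  raise ArgumentError, "no AMQP settings for #{env.inspect}" if params.nil?
  params
end

connect_params = amqp_params_for('development')
puts connect_params['uri']
```
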

So I guess the next question is: how is inactivity handled on the amqp
client side?
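
For context on the inactivity question: at the protocol level, AMQP
0-9-1 deals with idle connections through heartbeat frames. Each peer
sends a heartbeat when it has nothing else to send, and a peer that
sees no traffic for two heartbeat intervals may treat the connection as
dead. The sketch below is a plain-Ruby model of that bookkeeping only.
HeartbeatMonitor is a hypothetical class, not the amqp gem's
implementation, and the 30-second interval is an assumed value; whether
heartbeats are enabled in the setup above, and whether they explain the
disconnect, is not established in this thread.

```ruby
# Illustrative only: a tiny model of AMQP 0-9-1 heartbeat bookkeeping.
# HeartbeatMonitor is a hypothetical class, not part of the amqp gem.
class HeartbeatMonitor
  attr_reader :interval

  # interval: negotiated heartbeat interval in seconds (assumed value)
  def initialize(interval, now = Time.now)
    @interval      = interval
    @last_received = now
    @last_sent     = now
  end

  # Call whenever any frame arrives from the peer.
  def frame_received(now = Time.now)
    @last_received = now
  end

  # Call whenever any frame is written to the peer.
  def frame_sent(now = Time.now)
    @last_sent = now
  end

  # True when nothing has been sent for a full interval, so a heartbeat is due.
  def heartbeat_due?(now = Time.now)
    now - @last_sent >= @interval
  end

  # True when nothing has arrived for two intervals: treat the peer as gone.
  def peer_dead?(now = Time.now)
    now - @last_received >= 2 * @interval
  end
end

start   = Time.at(0)
monitor = HeartbeatMonitor.new(30, start)
puts monitor.heartbeat_due?(start + 10)
puts monitor.peer_dead?(start + 60)
```

In this model an idle but healthy connection keeps calling
frame_received as the peer's heartbeats arrive, so peer_dead? only
becomes true when traffic really stops.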


On Nov 16, 1:44 pm, Michael Klishin <michael.s.klis... at gmail.com>
wrote:
> On 16 Nov 2011, at 23:29, p2w wrote:
>
> > Cool.
>
> > Hey I just noticed something in the new code. You are doing an
> > AMQP.start inside the EventMachine.next_tick. In my original code I am
> > doing a connect because the doc indicates that Thin already starts the
> > EM reactor.
>
> > Could that be an issue?
>
> I doubt it. EventMachine is smart enough to detect that the reactor is running. In 1.0 it should
> be smart enough to also detect when forking has killed it.
>
> MK
>
> http://github.com/michaelklishin
> http://twitter.com/michaelklishin
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-disc... at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

