[rabbitmq-discuss] EventMachine EM::Iterator being blocked with RabbitMQ RPC

Paul Cowan dagda1 at scotalt.net
Fri Jan 6 07:30:31 GMT 2012


I am trying to set up RPC over RabbitMQ. I want one queue to listen for
requests and, when it receives a message, to reply with multiple messages
to the anonymous queue named in the request's reply_to header.
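
To make the intended flow concrete, here is a minimal in-memory sketch of
it with no broker involved. ToyBroker, its publish and drain methods, and
the "replies" queue name are made up for illustration and are not part of
the amqp gem; the sketch only shows how reply_to and correlation_id are
meant to tie one request to several replies.

```ruby
# Toy in-memory model of the request/multi-reply flow; no broker involved.
# ToyBroker and its methods are hypothetical, not part of the amqp gem.
class ToyBroker
  Message = Struct.new(:body, :reply_to, :correlation_id)

  def initialize
    # Each queue is a plain array, created the first time its name is used.
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  # Mimics publishing to the default exchange: the routing key is the queue name.
  def publish(body, routing_key:, reply_to: nil, correlation_id: nil)
    @queues[routing_key] << Message.new(body, reply_to, correlation_id)
  end

  # Removes and returns every message currently waiting in the named queue.
  def drain(queue_name)
    @queues[queue_name].slice!(0..-1)
  end
end

broker = ToyBroker.new
corr_id = "req-1"

# Client: send one request that names the queue it expects replies on.
broker.publish("search", routing_key: "one",
               reply_to: "replies", correlation_id: corr_id)

# Server: answer each request with six messages, flagging the last one.
broker.drain("one").each do |request|
  (0..5).each do |n|
    broker.publish({ "n" => n, "is_last" => n == 5 },
                   routing_key: request.reply_to,
                   correlation_id: request.correlation_id)
  end
end

# Client: keep only the replies that carry the matching correlation id.
replies = broker.drain("replies").select { |m| m.correlation_id == corr_id }
replies.each { |m| puts "n = #{m.body["n"]} is_last = #{m.body["is_last"]}" }
```

In this toy version every reply is delivered in one batch by construction;
with a real broker the aim is for each reply to reach the client as soon as
it is published.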

I have the following Thor task, which creates a queue and then uses
EM::Iterator to send a number of messages back to the queue named by the
reply_to routing key:

desc "start_consumer", "start the test consumer"
def start_consumer
  conf = {
    :host => "localhost",
    :user => "guest",
    :password => "guest",
    :vhost => "/",
    :logging => true,
    :port => 5672
  }

  # n = 1

  AMQP.start(conf) do |connection|

    channel = AMQP::Channel.new(connection)

    requests_queue = channel.queue("one")
    requests_queue.purge

    Signal.trap("INT") do
      connection.close do
        EM.stop{exit}
      end
    end

    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
      url_search = MultiJson.decode(body)

      EM::Iterator.new(0..5).each do |n, iter|
        lead = get_lead(n, (n == 5))

        puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"

        AMQP::Exchange.default.publish(
          MultiJson.encode(lead),
          :immediate => true,
          :routing_key => header.reply_to,
          :correlation_id => header.correlation_id
        )

        iter.next
      end
    end

    puts " [x] Awaiting RPC requests"
  end
end

The code below sends a message to the queue created above and also creates
an anonymous queue that listens for the messages sent by the EM::Iterator
code. The anonymous queue's name is passed as the reply_to header of the
request sent to the first queue.

def publish(urlSearch, routing_key)
  EM.run do
    corr_id = rand(10_000_000).to_s

    requests ||= Hash.new

    connection = AMQP.connect(:host => "localhost")

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|
      lead = safe_json_decode(body)

      puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"

      if lead["is_last"]
        puts "in exit"
        connection.close do
          EM.stop{exit}
        end
      end
    end

    callback_queue.append_callback(:declare) do
      AMQP::Exchange.default.publish(
        MultiJson.encode(urlSearch),
        :routing_key => routing_key,
        :reply_to => callback_queue.name,
        :correlation_id => corr_id
      )
    end

    puts "initial message sent"
  end
end

The above code works as I want, with one annoying exception: something is
preventing the EM::Iterator code from running asynchronously. I want each
message to be sent, and handled by the anonymous queue's subscriber, as
soon as its iteration runs. Instead, none of the messages are sent until
EM::Iterator has completed its last iteration, at which point they all go
out at once.

Can anyone see what I am doing wrong or suggest a different approach? I
also tried EM.defer and saw the same behaviour.

Thanks

Paul
Cheers

Paul Cowan

Cutting-Edge Solutions (Scotland)

http://thesoftwaresimpleton.com/