[rabbitmq-discuss] publishing messages causes messages to stop being collected

Dan Tenenbaum dtenenba at fhcrc.org
Wed Feb 2 21:37:54 GMT 2011


Hi,

I had developed a solution using rabbitmq and it was working fine. Then I
started another instance of one of my apps on another machine and both
instances stopped receiving messages. Here is more detail.

I have a setup with two fanout exchanges, call them e1 and e2.

I have two apps, one (call it a1) written in node-amqp and the other (a2) in
python/pika.

The workflow is:
1) a1 publishes a message to e1

2) a2 is listening to e1, receives the message, and starts sending a series of
messages to e2.

3) a1 is listening to e2 and prints out the messages it receives.
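
To make the expected behaviour concrete, here is a toy in-memory sketch of the
three steps with two instances of a1. It does not talk to a real broker; the
FanoutBroker class, the run_workflow function and the "worker_queue" name are
hypothetical and exist only for illustration. It assumes nothing beyond the
standard fanout rule: the routing key is ignored and every bound queue gets its
own copy of each message.

```python
from collections import defaultdict


class FanoutBroker:
    """Toy in-memory stand-in for the broker; models fanout exchanges only."""

    def __init__(self):
        self.bindings = defaultdict(list)  # exchange name -> bound queue names
        self.queues = defaultdict(list)    # queue name -> pending message bodies

    def bind(self, exchange, queue):
        self.bindings[exchange].append(queue)
        self.queues[queue]  # touching the key creates the (empty) queue

    def publish(self, exchange, routing_key, body):
        # A fanout exchange ignores the routing key and copies the message
        # to every queue bound to it.
        for queue in self.bindings[exchange]:
            self.queues[queue].append(body)


def run_workflow():
    broker = FanoutBroker()
    # a2 (the worker) listens on e1 through its own queue (hypothetical name).
    broker.bind("from_web_exchange", "worker_queue")
    # Two instances of a1, one queue per host, both bound to e2.
    broker.bind("from_worker_exchange", "host1_queue")
    broker.bind("from_worker_exchange", "host2_queue")

    # Step 1: one a1 instance publishes to e1.
    broker.publish("from_web_exchange", "#", "hello, world")

    # Step 2: a2 drains its queue and publishes to e2 for each message.
    while broker.queues["worker_queue"]:
        broker.queues["worker_queue"].pop(0)
        broker.publish("from_worker_exchange", "key.frombuilders",
                       "hello world from python")

    # Step 3: each a1 instance should now have its own copy waiting.
    return {name: list(msgs) for name, msgs in broker.queues.items()}


if __name__ == "__main__":
    print(run_workflow())
```

In this model both host1_queue and host2_queue end up holding a copy of the
worker's message, which is what I expected each instance of a1 to receive.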

All 3 steps were working. Then I added another instance of a1 on another
machine, and step 3 stopped working: neither instance of a1 receives any more
messages. rabbitmqctl tells me they are piling up in the queue, and a2's logs
confirm that step 2 is still happening.

Specifically, the action that caused step 3 to fail was not simply adding
another instance of a1 but having that instance do step 1.

Here are what I think are the relevant bits of code.

In a1:
var connection = amqp.createConnection({ host: 'broker_host' });
connection.addListener('ready', function(){
  // this is what I'm calling e1:
  var from_web_exchange = connection.exchange('from_web_exchange',
    {type: 'fanout', autoDelete: false});
  // and e2:
  var from_worker_exchange = connection.exchange('from_worker_exchange',
    {type: 'fanout', autoDelete: false});
  // hostname contains the name of the machine where this app is running.
  // there will only be one instance of this app per machine
  var queueName = hostname + "_queue";
  var fromBuildersQueue = connection.queue(queueName, {exclusive: true});
  fromBuildersQueue.bind('from_worker_exchange', '#');

  fromBuildersQueue.subscribe({ack: true}, function(message){
    sys.puts("got message: " + message.data.toString());
  });
  ...

  // later:
  var msg = "hello, world";
  from_web_exchange.publish("#", msg);


In a2:

# e1:
from_web_exchange = channel.exchange_declare(exchange="from_web_exchange",
                                             type="fanout")
# e2:
from_worker_exchange = channel.exchange_declare(exchange="from_worker_exchange",
                                                type="fanout")

from_web_queue = channel.queue_declare(exclusive=True)
from_web_queue_name = from_web_queue.queue

channel.queue_bind(exchange='from_web_exchange', queue=from_web_queue_name)

def callback(ch, method, properties, body):
    channel.basic_publish(exchange='from_worker_exchange',
                          routing_key="key.frombuilders",
                          body="hello world from python")

channel.basic_consume(callback,
                      queue=from_web_queue.queue,
                      no_ack=True)

pika.asyncore_loop()

---
rabbitmqctl list_queues reports:

Listing queues ...
host1_queue 7
amq.gen-AGnTMUOdr+DdImg0jfqahA== 0
host2_queue 0
...done.


I hope someone can tell me what is going on here. a1 is a web app; the first
instance of it is on my local machine, and the second instance represents me
trying to deploy this app in a production environment.

Thanks!
Dan