[rabbitmq-discuss] Making sure a message gets to every consumer

Dan Tenenbaum dtenenba at fhcrc.org
Tue Jan 25 18:22:44 GMT 2011


On Mon, Jan 24, 2011 at 3:17 PM, Matthias Radestock
<matthias at rabbitmq.com> wrote:

> Sam Duncan wrote:
>
>> If you use a topic exchange and create a queue for each consumer, you get
>> a copy of each message in each queue filtered by your queue bindings.
>>
>
I tried this, but it still didn't seem to work.


>
> A direct exchange would probably work too, unless you need bindings with
> wildcards.
>
> If you haven't done so already, I recommend reading
> http://www.rabbitmq.com/tutorial-three-python.html
>
>
I have read this, thanks.

Here is an attempt that uses two fanout exchanges and two queues, one bound
to each exchange with '#'. You would think (well, I would think) that any
message sent to either of these fanout exchanges is going to go to every
consumer of every queue bound to the appropriate exchange.

However, what happens in practice is only one receiver at a time gets a
message.

Fire up two (or more) instances of app2.py and then run app1.py (which
currently just sends a message and exits). How can I change this so that all
the instances of app2.py receive all messages sent through the
'from_web_exchange'?
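
One possible explanation for the behaviour described above, sketched without a broker: a fanout exchange copies each message to every bound *queue*, while a queue hands each message to only one of its consumers (round-robin by default). If every instance consumes from the same named queue, only one instance sees each message. The toy model below is an assumption-laden illustration, not RabbitMQ code; the classes `Queue` and `FanoutExchange` and the function `run` are hypothetical names invented for this sketch.

```python
class Queue(object):
    """Toy queue: each message goes to exactly one consumer, round-robin."""

    def __init__(self, name):
        self.name = name
        self.consumers = []
        self._next = 0

    def add_consumer(self, callback):
        self.consumers.append(callback)

    def deliver(self, body):
        # A real broker would buffer the message; this toy model drops it.
        if not self.consumers:
            return
        callback = self.consumers[self._next % len(self.consumers)]
        self._next += 1
        callback(body)


class FanoutExchange(object):
    """Toy fanout exchange: copies each message to every bound queue."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, body):
        for queue in self.queues:
            queue.deliver(body)


def run(shared_queue, n_consumers=2, n_messages=4):
    """Return the list of messages each consumer received."""
    exchange = FanoutExchange()
    received = [[] for _ in range(n_consumers)]
    if shared_queue:
        # All consumers attach to one queue, as with a fixed queue name.
        queue = Queue("fromweb")
        exchange.bind(queue)
        queues = [queue] * n_consumers
    else:
        # Each consumer gets its own queue bound to the exchange.
        queues = [Queue("private-%d" % i) for i in range(n_consumers)]
        for queue in queues:
            exchange.bind(queue)
    for queue, inbox in zip(queues, received):
        queue.add_consumer(inbox.append)
    for n in range(n_messages):
        exchange.publish("msg %d" % n)
    return received
```

With `run(True)` the four messages are split between the two consumers; with `run(False)` each consumer receives all four. Under this model, the number of copies of a message equals the number of bound queues, not the number of consumers.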

app1.py:
#!/usr/bin/env python
import pika

connection = pika.AsyncoreConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()


from_web_exchange = channel.exchange_declare(exchange="from_web_exchange",
                                             type="fanout")
from_worker_exchange = channel.exchange_declare(
    exchange="from_worker_exchange", type='fanout')

from_builders_queue = channel.queue_declare(queue='frombuilders',
                                            auto_delete=True)
from_web_queue = channel.queue_declare(queue='fromweb', auto_delete=True)
channel.queue_bind(exchange='from_web_exchange', queue=from_web_queue.queue,
                   routing_key="#")
channel.queue_bind(exchange='from_worker_exchange',
                   queue=from_builders_queue.queue, routing_key="#")

channel.basic_publish(exchange='from_web_exchange',
                      routing_key='does_it_matter',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

app2.py:
#!/usr/bin/env python
import pika
import sys

connection = pika.AsyncoreConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()



from_web_exchange = channel.exchange_declare(exchange="from_web_exchange",
                                             type="fanout")
from_worker_exchange = channel.exchange_declare(
    exchange="from_worker_exchange", type='fanout')

from_builders_queue = channel.queue_declare(queue='frombuilders',
                                            auto_delete=True)
from_web_queue = channel.queue_declare(queue='fromweb', auto_delete=True)
channel.queue_bind(exchange='from_web_exchange', queue=from_web_queue.queue,
                   routing_key="#")
channel.queue_bind(exchange='from_worker_exchange',
                   queue=from_builders_queue.queue, routing_key="#")


#channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

arg = "No-arg"
#print len(sys.argv)
if (len(sys.argv) == 2):
    arg = sys.argv[1]


def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    channel.basic_publish(exchange='from_worker_exchange',
                          routing_key="#", # key.frombuilders
                          body= arg + " python approves of " + body)


channel.basic_consume(callback,
                      queue=from_web_queue.queue,
                      no_ack=True)

pika.asyncore_loop()
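
A change that might make every instance of app2.py receive every message is to give each instance its own broker-named, exclusive queue instead of the shared 'fromweb' queue. The sketch below assumes a recent pika 1.x release (`BlockingConnection`, `exchange_type=`, `on_message_callback=`, `auto_ack=`), whose API differs from the `AsyncoreConnection` calls used in the listings above; the helper name `subscribe_with_private_queue` is hypothetical.

```python
def subscribe_with_private_queue(channel, exchange, callback):
    """Bind a fresh, server-named queue to `exchange` and consume from it."""
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    # queue="" asks the broker to generate a unique queue name;
    # exclusive=True ties the queue to this connection, so it is
    # removed when the connection closes.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    # Fanout exchanges ignore the routing key, so none is given.
    channel.queue_bind(exchange=exchange, queue=queue_name)
    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    return queue_name


def main():
    # Imported here so the helper above does not require pika to be loaded.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))

    subscribe_with_private_queue(channel, "from_web_exchange", callback)
    channel.start_consuming()
```

Calling `main()` in two or more processes should leave one private queue per process bound to 'from_web_exchange', so a single publish from app1.py would be copied to each of them. The sender would not need to declare or bind any queue for this to work.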


Thanks in advance,
Dan

> Regards,
>
> Matthias.