[rabbitmq-discuss] Making sure a message gets to every consumer
Dan Tenenbaum
dtenenba at fhcrc.org
Tue Jan 25 18:22:44 GMT 2011
On Mon, Jan 24, 2011 at 3:17 PM, Matthias Radestock
<matthias at rabbitmq.com>wrote:
> Sam Duncan wrote:
>
>> If you use a topic exchange and create a queue for each consumer, you get
>> a copy of each message in each queue filtered by your queue bindings.
>>
>
I tried this, but it still didn't seem to work.
>
> A direct exchange would probably work too, unless you need bindings with
> wildcards.
>
> If you haven't done so already, I recommend reading
> http://www.rabbitmq.com/tutorial-three-python.html
>
>
I have read this, thanks.
Here is an attempt that uses two fanout exchanges and two queues, one bound
to each exchange with '#'. You would think (well, I would think) that any
message sent to either of these fanout exchanges is going to go to every
consumer of every queue bound to the appropriate exchange.
However, what happens in practice is only one receiver at a time gets a
message.
Fire up two (or more) instances of app2.py and then run app1.py (which
currently just sends a message and exits). How can I change this so that all
the instances of app2.py receive all messages sent through the
'from_web_exchange'?
app1.py:
#!/usr/bin/env python
import pika
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
from_web_exchange =
channel.exchange_declare(exchange="from_web_exchange",type="fanout")
from_worker_exchange =
channel.exchange_declare(exchange="from_worker_exchange", type='fanout')
from_builders_queue = channel.queue_declare(queue='frombuilders',
auto_delete=True)
from_web_queue = channel.queue_declare(queue='fromweb', auto_delete=True)
channel.queue_bind(exchange='from_web_exchange', queue=from_web_queue.queue,
routing_key="#")
channel.queue_bind(exchange='from_worker_exchange',
queue=from_builders_queue.queue, routing_key="#")
channel.basic_publish(exchange='from_web_exchange',
routing_key='does_it_matter',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
app2.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.AsyncoreConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
from_web_exchange =
channel.exchange_declare(exchange="from_web_exchange",type="fanout")
from_worker_exchange =
channel.exchange_declare(exchange="from_worker_exchange", type='fanout')
from_builders_queue = channel.queue_declare(queue='frombuilders',
auto_delete=True)
from_web_queue = channel.queue_declare(queue='fromweb', auto_delete=True)
channel.queue_bind(exchange='from_web_exchange', queue=from_web_queue.queue,
routing_key="#")
channel.queue_bind(exchange='from_worker_exchange',
queue=from_builders_queue.queue, routing_key="#")
#channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
arg = "No-arg"
#print len(sys.argv)
if (len(sys.argv) == 2):
arg = sys.argv[1]
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_publish(exchange='from_worker_exchange',
routing_key="#", # key.frombuilders
body= arg + " python approves of " + body)
channel.basic_consume(callback,
queue=from_web_queue.queue,
no_ack=True)
pika.asyncore_loop()
Thanks in advance,
Dan
Regards,
>
> Matthias.
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110125/3b5ecbc6/attachment.htm>
More information about the rabbitmq-discuss
mailing list