[rabbitmq-discuss] Messaging system for the university

Simon MacMullen simon at rabbitmq.com
Tue Oct 19 10:10:09 BST 2010


On 19/10/10 06:19, Victor Lyapunov wrote:
> Hi,
> We are building a messaging system for our university.
> Currently we have a rabbitmq cluster setup running on 2
> nodes(CentOS-5.5x86 and CentOS-5.5x86_64). There are message consumers
> connected to both nodes. The problem is that consumers receive
> messages in a round-robin fashion (e.g. Message#1 is received by
> consumer #1(not by the second consumer) and message#2 is received by
> the second consumer(not by the first consumer)). We want all of our
> consumers to receive all of the published messages.

Hi Victor.

The problem is that you're only creating one queue ("po_box") and 
binding two consumers to it. Each message can only be retrieved from a 
given queue once. Round robin is the intended behaviour in this setup.

The solution is for each consumer to create its own queue. To help with 
this:

If you do not supply a queue name the server will generate one and 
return it to you (so you don't have to worry about keeping names 
unique). Also, if you set exclusive=true on the queue declare, the queue 
will be "owned" by its connection and will be automatically destroyed 
when the client disconnects.

Alexandre's response about fanout vs direct is a bit of a red herring, 
that only controls how messages get routed to queues.

Cheers, Simon

>
> Any thoughts on this?
>
>
>
> [root at bv ~]# rabbitmqctl status
> Status of node rabbit at bv ...
> [{running_applications,[{rabbit,"RabbitMQ","2.0.0"},
>                          {os_mon,"CPO  CXC 138 46","2.1.8"},
>                          {sasl,"SASL  CXC 138 11","2.1.5.4"},
>                          {mnesia,"MNESIA  CXC 138 12","4.4.7"},
>                          {stdlib,"ERTS  CXC 138 10","1.15.5"},
>                          {kernel,"ERTS  CXC 138 10","2.12.5"}]},
>   {nodes,[{disc,[rabbit at bv]},{ram,[rabbit at dcbackup]}]},
>   {running_nodes,[rabbit at dcbackup,rabbit at bv]}]
> ...done.
>
> [root at dcbackup ~]# rabbitmqctl status
> Status of node rabbit at dcbackup ...
> [{running_applications,[{rabbit,"RabbitMQ","2.0.0"},
>                          {mnesia,"MNESIA  CXC 138 12","4.4.7"},
>                          {os_mon,"CPO  CXC 138 46","2.1.8"},
>                          {sasl,"SASL  CXC 138 11","2.1.5.4"},
>                          {stdlib,"ERTS  CXC 138 10","1.15.5"},
>                          {kernel,"ERTS  CXC 138 10","2.12.5"}]},
>   {nodes,[{disc,[rabbit at bv]},{ram,[rabbit at dcbackup]}]},
>   {running_nodes,[rabbit at bv,rabbit at dcbackup]}]
> ...done.
>
>
>
> #---- amqp_consumer.py on server `dcbackup' ------
> from amqplib import client_0_8 as amqp
>
> conn = amqp.Connection(host="dcbackup:5672", userid="guest",
> password="guest", virtual_host="/", insist=False)
> chan = conn.channel()
>
> chan.queue_declare(queue="po_box", durable=True, exclusive=False,
> auto_delete=False)
> chan.exchange_declare(exchange="sorting_room", type="direct",
> durable=True, auto_delete=False,)
>
> chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")
>
> def recv_callback(msg):
>      print 'Received: ' + msg.body + ' from channel #' +
> str(msg.channel.channel_id)
>
> chan.basic_consume(queue='po_box', no_ack=True,
> callback=recv_callback, consumer_tag="testtag")
> while True:
>      chan.wait()
> chan.basic_cancel("testtag")
> chan.close()
> conn.close()
>
>
> #---- amqp_consumer.py on server `bv' ------
> from amqplib import client_0_8 as amqp
>
> conn = amqp.Connection(host="bv:5672", userid="guest",
> password="guest", virtual_host="/", insist=False)
> chan = conn.channel()
>
> chan.queue_declare(queue="po_box", durable=True, exclusive=False,
> auto_delete=False)
> chan.exchange_declare(exchange="sorting_room", type="direct",
> durable=True, auto_delete=False,)
>
> chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")
>
> def recv_callback(msg):
>      print 'Received: ' + msg.body + ' from channel #' +
> str(msg.channel.channel_id)
>
> chan.basic_consume(queue='po_box', no_ack=True,
> callback=recv_callback, consumer_tag="testtag")
> while True:
>      chan.wait()
> chan.basic_cancel("testtag")
> chan.close()
> conn.close()
>
>
> #---- amqp_consumer.py on server `bv' ------
> from amqplib import client_0_8 as amqp
> import sys
>
> conn = amqp.Connection(host="bv:5672", userid="guest",
> password="guest", virtual_host="/", insist=False)
> chan = conn.channel()
>
> msg = amqp.Message(sys.argv[1])
> msg.properties["delivery_mode"] = 1
> chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
>
> chan.close()
> conn.close()
>
>
> [root at bv ~]# uname -a
> Linux bv.localdomain 2.6.18-194.el5xen #1 SMP Fri Apr 2 15:34:40 EDT
> 2010 x86_64 x86_64 x86_64 GNU/Linux
>
>
> [root at dcbackup ~]# uname -a
> Linux dcbackup.localdomain 2.6.18-194.el5xen #1 SMP Fri Apr 2 16:16:54
> EDT 2010 i686 i686 i386 GNU/Linux
>
> Our python version is:
> Python 2.6.5 (r265:79063, Apr  9 2010, 11:16:46)
> [GCC 4.1.2 20080704 (Red Hat 4.1.2-48)] on linux2
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss


-- 
Simon MacMullen
Staff Engineer, RabbitMQ
SpringSource, a division of VMware



More information about the rabbitmq-discuss mailing list