[rabbitmq-discuss] using a topic exchange to route to multiple queues

Jerry Kuch jerryk at vmware.com
Mon Dec 6 18:33:48 GMT 2010


On Dec 5, 2010, at 8:08 PM, Daniel Oppenheimer wrote:

Hi, Daniel...

I am writing a Java RabbitMQ client, and I am trying to write code that publishes messages to a topic exchange so that they are delivered to multiple queues.  This is the code I am using:

Channel channel = conn.createChannel();
channel.exchangeDeclare(my_exchange, "topic", true);
channel.queueDeclare(company.my_queue, true, false, false, null);
channel.queueBind(company.my_queue, my_exchange, "company.#");

This binding would result in your queue receiving messages, for example, with routing keys "company.MSFT" and "company.VMW" and also "company".

channel.basicPublish(my_exchange, "company.#", null, messageBodyBytes);

I would like this code to publish to all queues under company.  That is, I would like it to publish to company.my_queue and company.another_queue.  My understanding is that the wild card in the routing key should route any message published to my_exchange to any queue prefixed with company.  Is this correct?  If not, what is the correct way to do what I am trying to accomplish?  Should there be a wild card in the routing key in both the queueBind and basicPublish method calls?

For topic exchanges, messages published to that exchange will be delivered to all queues whose binding pattern matches the routing key of the message.

You only need the wildcard in the routing key for the queueBind.  Any subsequent basicPublish whose routing key matches the pattern you used to bind your queues to the exchange will then be delivered to those queues.  Doing:

channel.basicPublish(my_exchange, "company", null, messageBodyBytes);

would suffice to hit all of the queues that you've attached to the exchange using your binding of the form:

channel.queueBind(company.my_queue, my_exchange, "company.#");
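
To make the matching rule concrete, here is a minimal sketch in plain Java of how a topic binding pattern can be compared against a routing key. It is an illustration only, not the broker's actual implementation: the class name TopicMatch and the method matches are made up for this example. It assumes the usual topic rules, where words are separated by ".", "*" stands for exactly one word, and "#" stands for zero or more words.

```java
public class TopicMatch {
    /** Hypothetical helper: true if routingKey matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        // Split both sides into dot-separated words.
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible count.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still needs a word but the key has none left.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"company", "company.MSFT", "company.VMW", "stock.MSFT"};
        for (String key : keys) {
            System.out.println(key + " -> " + matches("company.#", key));
        }
    }
}
```

Run as is, this reports true for "company", "company.MSFT" and "company.VMW", and false for "stock.MSFT", which is why the wildcard belongs in the binding and a plain routing key is enough on the publish side.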

Here's a transcript of an interactive session done with the Erlang client (it should be fairly easy to pattern match what's going on against the Java equivalent even if Erlang's new to you), interleaved with example invocations of rabbitmqctl (with the effects of interest highlighted) that show the results of each step.

Eshell V5.8  (abort with ^G)

1> rr("include/amqp_client.hrl"). %% read headers for record types
['P_access', BLAHBLAHBLAH deleted]

2> {ok, Conn} = amqp_connection:start(network). %% start connection
{ok,<0.48.0>}
3> {ok, Chan} = amqp_connection:open_channel(Conn). %% open channel
{ok,<0.58.0>}

4> ED = #'exchange.declare'{exchange= <<"my_exchange">>,type= <<"topic">>}. %% declare topic exchange
#'exchange.declare'{ticket = 0,exchange = <<"my_exchange">>,
                    type = <<"topic">>,passive = false,durable = false,
                    auto_delete = false,internal = false,nowait = false,
                    arguments = []}
5> amqp_channel:call(Chan,ED).
#'exchange.declare_ok'{}

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct          direct
amq.topic           topic
my_exchange         topic
amq.rabbitmq.log    topic
amq.fanout          fanout
amq.headers         headers
                    direct
amq.match           headers
...done.


6> QD1 = #'queue.declare'{queue = <<"queue_1">>}. %% One queue...
#'queue.declare'{ticket = 0,queue = <<"queue_1">>,
                 passive = false,durable = false,exclusive = false,
                 auto_delete = false,nowait = false,arguments = []}
7> QD2 = #'queue.declare'{queue = <<"queue_2">>}. %% Another queue...
#'queue.declare'{ticket = 0,queue = <<"queue_2">>,
                 passive = false,durable = false,exclusive = false,
                 auto_delete = false,nowait = false,arguments = []}
8> amqp_channel:call(Chan, QD1). %% Create first queue...
#'queue.declare_ok'{queue = <<"queue_1">>,message_count = 0,
                    consumer_count = 0}
9> amqp_channel:call(Chan, QD2). %% Create second queue...
#'queue.declare_ok'{queue = <<"queue_2">>,message_count = 0,
                    consumer_count = 0}

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
Listing queues ...
queue_2 0
queue_1 0
...done.


10> B1 = #'queue.bind'{queue= <<"queue_1">>, exchange = <<"my_exchange">>, routing_key = <<"company.#">>}. %% Bind first queue to exchange...
#'queue.bind'{ticket = 0,queue = <<"queue_1">>,
              exchange = <<"my_exchange">>,routing_key = <<"company.#">>,
              nowait = false,arguments = []}
11> B2 = #'queue.bind'{queue= <<"queue_2">>, exchange = <<"my_exchange">>, routing_key = <<"company.#">>}. %% Bind second queue to exchange...
#'queue.bind'{ticket = 0,queue = <<"queue_2">>,
              exchange = <<"my_exchange">>,routing_key = <<"company.#">>,
              nowait = false,arguments = []}
12> amqp_channel:call(Chan,B1).
#'queue.bind_ok'{}
13> amqp_channel:call(Chan,B2).
#'queue.bind_ok'{}

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_bindings
Listing bindings ...
exchange queue_1 queue queue_1 []
exchange queue_2 queue queue_2 []
my_exchange exchange queue_1 queue company.# []
my_exchange exchange queue_2 queue company.# []
...done.

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
Listing queues ...
queue_2 0  <--  NOTE NO MESSAGES IN THESE QUEUES!
queue_1 0  <--/
...done.


14> Payload = <<"foobar">>. %% Bogus message payload
<<"foobar">>
15> Publish = #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"company.MSFT">>}.
#'basic.publish'{ticket = 0,exchange = <<"my_exchange">>,
                 routing_key = <<"company.MSFT">>,mandatory = false,
                 immediate = false}
16> amqp_channel:cast(Chan, Publish, #amqp_msg{payload=Payload}). %% Publish to the exchange, should go to both bound queues...
ok

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
Listing queues ...
queue_2 1  <-- OUR MESSAGE ARRIVED...
queue_1 1  <-- ...IN BOTH QUEUES.
...done.


17> amqp_channel:cast(Chan, Publish, #amqp_msg{payload=Payload}). %% Publish to exchange again, should also go to both bound queues...
ok

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
Listing queues ...
queue_2 2  <-- SECOND PUBLISHED MESSAGE ARRIVED...
queue_1 2  <-- ...ALSO IN BOTH QUEUES.
...done.

22> WPZ = #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"company">>}.
#'basic.publish'{ticket = 0,exchange = <<"my_exchange">>,
                 routing_key = <<"company">>,mandatory = false,
                 immediate = false}
23> amqp_channel:cast(Chan, WPZ, #amqp_msg{payload = <<"Payload">>}).

[jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
Listing queues ...
queue_2 3  <-- THIRD MESSAGE (ROUTING KEY "company") ARRIVED...
queue_1 3  <-- ...ALSO IN BOTH QUEUES.
...done.
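
If it helps to see the same sequence without a broker, the following is a rough in-memory model in plain Java that replays the session above. Everything in it is hypothetical and simplified for illustration: the class TopicExchangeSketch and its methods are not part of any RabbitMQ client, each queue holds only one binding, and the matcher only understands literal patterns and patterns ending in ".#".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicExchangeSketch {
    // Queue name -> binding pattern (simplification: one binding per queue).
    private final Map<String, String> bindings = new LinkedHashMap<>();
    // Queue name -> messages delivered so far.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    public void bind(String queue, String pattern) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.put(queue, pattern);
    }

    // Simplified stand-in for topic matching: handles only a trailing ".#"
    // (zero or more extra words) and exact literal patterns.
    public static boolean matches(String pattern, String routingKey) {
        if (pattern.endsWith(".#")) {
            String prefix = pattern.substring(0, pattern.length() - 2);
            return routingKey.equals(prefix) || routingKey.startsWith(prefix + ".");
        }
        return pattern.equals(routingKey);
    }

    // Deliver a copy of the message to every queue whose binding matches.
    public void publish(String routingKey, String body) {
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            if (matches(b.getValue(), routingKey)) {
                queues.get(b.getKey()).add(body);
            }
        }
    }

    public int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        TopicExchangeSketch exchange = new TopicExchangeSketch();
        exchange.bind("queue_1", "company.#");
        exchange.bind("queue_2", "company.#");

        exchange.publish("company.MSFT", "foobar");  // first publish
        exchange.publish("company.MSFT", "foobar");  // second publish
        exchange.publish("company", "Payload");      // bare "company" key

        System.out.println("queue_1 " + exchange.depth("queue_1"));
        System.out.println("queue_2 " + exchange.depth("queue_2"));
    }
}
```

Both queues end up with three messages, matching the final list_queues output: each publish is copied to every queue whose binding matches, and the bare "company" key matches "company.#" because "#" can stand for zero words.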
