[rabbitmq-discuss] using a topic exchange to route to multiple queues

Christian Legnitto clegnitto at mozilla.com
Mon Dec 6 19:50:34 GMT 2010


I think Daniel was asking whether it is possible to create automatic bindings based on queue name. From my reading, it sounds like he wants all messages that match "company.#" to be routed to any queue whose name starts with "company." (feel free to correct me, Daniel).

I don't think a mechanism like this (automatic binding rules) exists and, as Jerry says, you likely need to specify the bindings manually for each queue. I would be interested in learning about this functionality if it does exist, though.
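
To make the "bind each queue manually" idea concrete, here is a minimal sketch in Java. It assumes the application already knows its own queue names (the broker is not queried), and it uses a hypothetical Binder interface as a stand-in for the Java client's channel.queueBind(queue, exchange, routingKey) so that the example runs without a broker. The class name, the helper name bindByPrefix and the "audit_log" queue are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PrefixBindingSketch {

    /** Hypothetical stand-in for channel.queueBind(queue, exchange, routingKey). */
    interface Binder {
        void bind(String queue, String exchange, String routingKey);
    }

    /**
     * Issues one explicit binding per queue whose NAME starts with the prefix.
     * The queue name only selects which queues to bind; the broker routes on
     * the binding pattern, never on the queue name itself.
     * Returns the number of bindings issued.
     */
    static int bindByPrefix(List<String> queueNames, String prefix,
                            String exchange, String pattern, Binder binder) {
        int bound = 0;
        for (String queue : queueNames) {
            if (queue.startsWith(prefix)) {
                binder.bind(queue, exchange, pattern);
                bound++;
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        // Assumed, application-known queue names.
        List<String> known = Arrays.asList(
                "company.my_queue", "company.another_queue", "audit_log");
        List<String> issued = new ArrayList<>();
        int n = bindByPrefix(known, "company.", "my_exchange", "company.#",
                (q, x, key) -> issued.add(x + " -> " + q + " [" + key + "]"));
        System.out.println(n + " bindings issued: " + issued);
    }
}
```

With a real connection, the lambda would presumably just forward to channel.queueBind; everything else stays the same.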

Christian


On Dec 6, 2010, at 10:33 AM, Jerry Kuch wrote:

> 
> On Dec 5, 2010, at 8:08 PM, Daniel Oppenheimer wrote:
> 
> Hi, Daniel...
> 
> I am writing a Java RabbitMQ client, and I am trying to write code that will publish messages to topic exchanges such that they get delivered to multiple queues.  Following is the code that I am using:
> 
> Channel channel = conn.createChannel();
> channel.exchangeDeclare("my_exchange", "topic", true);
> channel.queueDeclare("company.my_queue", true, false, false, null);
> channel.queueBind("company.my_queue", "my_exchange", "company.#");
> 
> This binding would result in your queue receiving messages, for example, with routing keys "company.MSFT" and "company.VMW" and also "company".
> 
> channel.basicPublish("my_exchange", "company.#", null, messageBodyBytes);
> 
> I would like this code to publish to all queues under company.  That is, I would like it to publish to company.my_queue and company.another_queue.  My understanding is that the wild card in the routing key should route any message published to my_exchange to any queue prefixed with company.  Is this correct?  If not, what is the correct way to do what I am trying to accomplish?  Should there be a wild card in the routing key in both the queueBind and basicPublish method calls?
> 
> For topic exchanges, messages published to that exchange will be delivered to all queues whose binding pattern matches the routing key of the message.
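
The matching rule described above can be sketched as a small, self-contained Java function. This is an illustration of the usual topic-exchange semantics ("*" stands for exactly one dot-separated word, "#" for zero or more words), not the broker's actual implementation; the class and method names are made up, and edge cases such as empty routing keys are not meant to mirror the broker exactly.

```java
final class TopicPatternSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Both sides are sequences of words separated by dots.
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern needs a word, key has none
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("company.#", "company.MSFT")); // true
        System.out.println(matches("company.#", "company"));      // true
        System.out.println(matches("company.#", "other.MSFT"));   // false
    }
}
```

Note that the function compares a pattern with a routing key only. Queue names never enter into it, which is why a queue called "company.another_queue" receives nothing until it has a binding of its own.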
> 
> You only need the wild card in the routing key for the queueBind.  Any subsequent basicPublish whose routing key matches the pattern that you used to bind your queues to the exchange will then be delivered to those queues.  Doing:
> 
> channel.basicPublish("my_exchange", "company", null, messageBodyBytes);
> 
> would suffice to hit all of the queues that you've attached to the exchange using your binding of the form:
> 
> channel.queueBind("company.my_queue", "my_exchange", "company.#");
> 
> Here's a transcript of an interactive session done with the Erlang client (it should be fairly easy to pattern match what's going on against the Java equivalent even if Erlang's new to you), along with example invocations of rabbitmqctl (indented with the effects of interest highlighted) to see the effects of what we've done.
> 
> Eshell V5.8  (abort with ^G)
> 
> 1> rr("include/amqp_client.hrl"). %% read headers for record types
> ['P_access', BLAHBLAHBLAH deleted]
> 
> 2> {ok, Conn} = amqp_connection:start(network). %% start connection
> {ok,<0.48.0>}
> 3> {ok, Chan} = amqp_connection:open_channel(Conn). %% open channel
> {ok,<0.58.0>}
> 
> 4> ED = #'exchange.declare'{exchange= <<"my_exchange">>,type= <<"topic">>}. %% declare topic exchange
> #'exchange.declare'{ticket = 0,exchange = <<"my_exchange">>,
>                    type = <<"topic">>,passive = false,durable = false,
>                    auto_delete = false,internal = false,nowait = false,
>                    arguments = []}
> 5> amqp_channel:call(Chan,ED).
> #'exchange.declare_ok'{}
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_exchanges
> Listing exchanges ...
> amq.direct	direct
> amq.topic	topic
> my_exchange	topic
> amq.rabbitmq.log	topic
> amq.fanout	fanout
> amq.headers	headers
> 	direct
> amq.match	headers
> ...done.
> 
> 
> 6> QD1 = #'queue.declare'{queue = <<"queue_1">>}. %% One queue...
> #'queue.declare'{ticket = 0,queue = <<"queue_1">>,
>                 passive = false,durable = false,exclusive = false,
>                 auto_delete = false,nowait = false,arguments = []}
> 7> QD2 = #'queue.declare'{queue = <<"queue_2">>}. %% Another queue...
> #'queue.declare'{ticket = 0,queue = <<"queue_2">>,
>                 passive = false,durable = false,exclusive = false,
>                 auto_delete = false,nowait = false,arguments = []}
> 8> amqp_channel:call(Chan, QD1). %% Create first queue...
> #'queue.declare_ok'{queue = <<"queue_1">>,message_count = 0,
>                    consumer_count = 0}
> 9> amqp_channel:call(Chan, QD2). %% Create second queue...
> #'queue.declare_ok'{queue = <<"queue_2">>,message_count = 0,
>                    consumer_count = 0}
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
> Listing queues ...
> queue_2 0
> queue_1 0
> ...done.
> 
> 
> 10> B1 = #'queue.bind'{queue= <<"queue_1">>, exchange = <<"my_exchange">>, routing_key = <<"company.#">>}. %% Bind first queue to exchange...
> #'queue.bind'{ticket = 0,queue = <<"queue_1">>,
>              exchange = <<"my_exchange">>,routing_key = <<"company.#">>,
>              nowait = false,arguments = []}
> 11> B2 = #'queue.bind'{queue= <<"queue_2">>, exchange = <<"my_exchange">>, routing_key = <<"company.#">>}. %% Bind second queue to exchange...
> #'queue.bind'{ticket = 0,queue = <<"queue_2">>,
>              exchange = <<"my_exchange">>,routing_key = <<"company.#">>,
>              nowait = false,arguments = []}
> 12> amqp_channel:call(Chan,B1). amqp_channel:call(Chan,B2).
> #'queue.bind_ok'{}
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_bindings
> Listing bindings ...
> exchange queue_1 queue queue_1 []
> exchange queue_2 queue queue_2 []
> my_exchange exchange queue_1 queue company.# []
> my_exchange exchange queue_2 queue company.# []
> ...done.
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
> Listing queues ...
> queue_2 0  <--  NOTE NO MESSAGES IN THESE QUEUES!
> queue_1 0  <--/
> ...done.
> 
> 
> 14> Payload = <<"foobar">>. %% Bogus message payload
> <<"foobar">>
> 15> Publish = #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"company.MSFT">>}.
> #'basic.publish'{ticket = 0,exchange = <<"my_exchange">>,
>                 routing_key = <<"company.MSFT">>,mandatory = false,
>                 immediate = false}
> 16> amqp_channel:cast(Chan, Publish, #amqp_msg{payload=Payload}). %% Publish to the exchange, should go to both bound queues...
> ok
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
> Listing queues ...
> queue_2 1  <-- OUR MESSAGE ARRIVED...
> queue_1 1  <-- ...IN BOTH QUEUES.
> ...done.
> 
> 
> 17> amqp_channel:cast(Chan, Publish, #amqp_msg{payload=Payload}). %% Publish to exchange again, should also go to both bound queues...
> ok
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
> Listing queues ...
> queue_2 2  <-- SECOND PUBLISHED MESSAGE ARRIVED...
> queue_1 2  <-- ...ALSO IN BOTH QUEUES.
> ...done.
> 
> 22> WPZ = #'basic.publish'{exchange = <<"my_exchange">>, routing_key = <<"company">>}.
> #'basic.publish'{ticket = 0,exchange = <<"my_exchange">>,
>                 routing_key = <<"company">>,mandatory = false,
>                 immediate = false}
> 23> amqp_channel:cast(Chan, WPZ, #amqp_msg{payload = <<"Payload">>}).
> 
> [jerryk at strongmad .../co/rabbitmq-umbrella/rabbitmq-server]$ ./scripts/rabbitmqctl list_queues
> Listing queues ...
> queue_2 3  <-- THIRD PUBLISHED MESSAGE (ROUTING KEY "company") ARRIVED...
> queue_1 3  <-- ...ALSO IN BOTH QUEUES.
> ...done.
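
For readers who prefer Java to Erlang, the session above can be replayed against a tiny in-memory stand-in for the exchange. This is only a sketch for following the message counts, not client code: the class TopicFanoutSketch and its methods are made up, each queue holds at most one binding, only patterns of the form "literal.words.#" are supported, and queue_3 is an extra, unbound queue added for contrast.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopicFanoutSketch {
    private final Map<String, String> bindings = new LinkedHashMap<>();      // queue -> pattern
    private final Map<String, List<String>> queues = new LinkedHashMap<>();  // queue -> messages

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayList<>());
    }

    /** Binds a queue with a pattern of the restricted form "some.prefix.#". */
    void bind(String queue, String pattern) {
        String prefix = pattern.endsWith(".#")
                ? pattern.substring(0, pattern.length() - 2) : null;
        if (prefix == null || prefix.contains("*") || prefix.contains("#")) {
            throw new IllegalArgumentException("sketch supports only 'prefix.#' patterns");
        }
        declareQueue(queue);
        bindings.put(queue, pattern);
    }

    /** Copies the message into every queue whose binding matches the key. */
    void publish(String routingKey, String payload) {
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            String pattern = b.getValue();
            String prefix = pattern.substring(0, pattern.length() - 2);
            // "prefix.#" matches the bare prefix and anything below it.
            if (routingKey.equals(prefix) || routingKey.startsWith(prefix + ".")) {
                queues.get(b.getKey()).add(payload);
            }
        }
    }

    int depth(String queue) {
        List<String> q = queues.get(queue);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        TopicFanoutSketch exchange = new TopicFanoutSketch();
        exchange.bind("queue_1", "company.#");
        exchange.bind("queue_2", "company.#");
        exchange.declareQueue("queue_3");            // declared but never bound
        exchange.publish("company.MSFT", "foobar");
        exchange.publish("company.MSFT", "foobar");
        exchange.publish("company", "Payload");
        // Prints: queue_1=3 queue_2=3 queue_3=0
        System.out.println("queue_1=" + exchange.depth("queue_1")
                + " queue_2=" + exchange.depth("queue_2")
                + " queue_3=" + exchange.depth("queue_3"));
    }
}
```

The unbound queue stays empty however it is named, which matches the point that routing follows bindings rather than queue names.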
> 
> 