[rabbitmq-discuss] Filtering

Tim Watson tim at rabbitmq.com
Mon Apr 15 10:46:25 BST 2013

Hi Lee,

Some (hopefully useful) comments inline...

On 15 Apr 2013, at 07:30, Lee Sylvester wrote:

> So, here's my conundrum.  In my mesh, I need to distribute messages from a node on a single port and receive messages on a single port.

An AMQP client can open multiple connections (on discrete ports) and can open multiple channels per connection (multiplexing). A channel can be used to publish or subscribe, or both, though it's probably best to do one or the other per channel IMHO. Setting up a port for receiving and another for producing/publishing messages to/from a broker is very simple.

>  Each node will connect to one another, like lego. So, both incoming and outgoing ports will have many connections.

Are you planning on having a broker act as an intermediary between all the nodes in your mesh? This is certainly possible, though you should be aware that you're introducing a potential bottleneck (from a performance standpoint) and a single point of (communications) failure should the broker's node become unavailable (through network disruption, for example).

Another option would be to run the broker embedded in your nodes, providing a communications fabric alongside regular distributed Erlang message passing. It is possible to run the Erlang client alongside a broker in the same node - this is how many RabbitMQ plugins work - and the Erlang client can send/receive messages to a broker using a 'direct mode' connection, that uses direct (internal, Erlang) message passing instead of a network connection. Running a broker embedded still allows you to listen on one or more specific ports, and you can connect to another (RabbitMQ broker embedded within another) node using the Erlang client as usual.

Yet another option that opens up if you're embedding RabbitMQ in your nodes is using the Shovel or Federation plugins to completely automate the communications fabric between them, leaving you (the developer) responsible only for handling the client portion of the code within each node - an explanation will follow.

>  For incoming, I should be able to handle my messages using OTP's handle_info func on the gen_server pattern.

This is certainly possible. The Erlang client allows you to register the calling process as a consumer, after which point AMQP messages will arrive in the process' mailbox as regular messages, thus...

init(_) ->
    %% inside gen_server's Module:init/1
                           #'basic.consume'{queue    = Queue,
                                            no_local = false,
                                            no_ack   = NoAck},
    {ok, State}.
handle_info(#'basic.consume_ok'{}, State) ->
    %% we're good to go....
    {noreply, State};
handle_info({Delivery = #'basic.deliver'{ redelivered = Redelivered },
                      #amqp_msg{payload = Payload}}, State) ->
    State2 = do_something(Payload, Redelivered, State),
    {noreply, State2};
.... etc

Another option is to use the amqp_gen_consumer behaviour, which allows for a set of callbacks quite similar to a gen_server, viz...

%% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
%% handle_consume(Consume, Sender, State) -> ok_error()
%% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
%% handle_cancel(Cancel, State) -> ok_error()
%% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
%% handle_deliver(Deliver, Message, State) -> ok_error()
%% handle_info(Info, State) -> ok_error()
%% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
%% terminate(Reason, State) -> any()

As an OTP guy, I guess that'll look fairly obvious and familiar to you - check the amqp_gen_consumer documentation for details.

>  It should also expect all incoming messages to be meant for it specifically; thus I'd like to be able to filter outgoing messages on the publisher side, rather than incoming messages on the subscriber side, as I wish to reduce overall network traffic.

That's fine, and is really *entirely* up to you. If you wanted to filter on the consumer's side, there's a amqp_selective_consumer module in the Erlang client to help. Filtering outbound messages is surely an application specific issue, and the code to publish is very simple:

    amqp_channel:call(Channel, Method,
                      #amqp_msg{props = #'P_basic'{delivery_mode = 2}, %% persistent
                                payload = MyPayload}),

I think outbound filtering is up to your application isn't it? If you consider using federation or shovel to replicate messages across multiple (embedded) broker nodes, then you can automate the filtering by managing the topology which is being replicated such that only specific messages are forwarded via specific queues (or exchanges) - see the federation and/or shovel documentation for details.

> My question is; can RabbitMQ do that for me?  That I can find, ZMQ will allow for everything except for the filtering of messages from the publisher.

Yes, although you'll need to be a bit more specific about this filtering requirement for me to give you more concrete advice. In general, RabbitMQ offers extremely flexible routing which ought to meet all of your requirements.


More information about the rabbitmq-discuss mailing list