[rabbitmq-discuss] FW: Multiple consumers

Ben Hood 0x6e6562 at gmail.com
Sat Jul 28 14:54:04 BST 2007


Matthias,

> Apologies for the delay in responding to you - I have been very busy
> this week.

No worries. As it turns out, I sent a follow-up mail to this list from
my work account on Monday and have just received a notification that it
was returned as undeliverable. I am going to incorporate it into this
conversation to keep things in context.

The content of Monday's email is between the snip marks:

--snip--
I've just discovered an issue with the patch I sent:

The basic.consume_ok ack message gets sent to the subscriber and not to
the client reader.

To get around this, I extended the basic_consume/7 function in the
amqqueue module to take an acknowledgement Pid as a further argument
(hence making the function basic_consume/8).

In the corresponding function in amqqueue_process, I set the replies to
be sent to this acknowledgement process.

Then, in the channel, I extended the ch record with an acknowledgement_pid
field which is set according to the context of the mainloop, so the main
loop decides which Pid an ack is sent to.

Now the basic.consume_ok goes back to the client control process and
only the deliver method goes to the subscriber.
--snip--
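
To make the split concrete, here is a minimal sketch of the idea rather
than the patch itself. The module name, the message shapes and the
queue_loop/1 function are all made up for illustration; the only point
is that the consume_ok reply goes to the acknowledgement Pid while
deliveries go to the subscriber.

```erlang
-module(ack_pid_sketch).
-export([demo/0]).

%% Hypothetical stand-in for the queue process (not the real
%% rabbit_amqqueue_process). The consume_ok reply is sent to AckPid,
%% while deliveries are sent to the subscriber registered with the tag.
queue_loop(Consumers) ->
    receive
        {basic_consume, ConsumerTag, SubscriberPid, AckPid} ->
            AckPid ! {'basic.consume_ok', ConsumerTag},
            queue_loop([{ConsumerTag, SubscriberPid} | Consumers]);
        {publish, Payload} ->
            [Pid ! {'basic.deliver', Tag, Payload} || {Tag, Pid} <- Consumers],
            queue_loop(Consumers);
        stop ->
            ok
    end.

%% The calling process plays the role of the client control process.
demo() ->
    Control = self(),
    Queue = spawn(fun() -> queue_loop([]) end),
    Subscriber = spawn(fun() ->
        receive
            {'basic.deliver', Tag, Payload} ->
                Control ! {subscriber_got, Tag, Payload}
        end
    end),
    Queue ! {basic_consume, <<"tag1">>, Subscriber, Control},
    %% The ack arrives at the control process, not at the subscriber.
    receive {'basic.consume_ok', <<"tag1">>} -> ok end,
    Queue ! {publish, <<"hello">>},
    Result = receive {subscriber_got, T, P} -> {T, P} end,
    Queue ! stop,
    Result.
```

Calling ack_pid_sketch:demo() blocks until the subscriber has reported
the delivery back to the control process.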

> Looks generally ok, with one caveat: Client channels ought to be
> processes. That way you don't funnel everything through a single
> process.
>
> In the direct API the client channel processes ought to be the same as
> the server channel processes, i.e. the user's client code would interact
> directly with the server channel processes.

Fair point, and I've already begun to refactor the direct client this
week to take this into consideration. What I started to do is to
create a named amqp_client server process for each channel, so that
the protocol flow within one channel is not held up by the protocol
flow of another channel sharing the same amqp_client process. So every
time a channel is opened by the user, a gen_server process is started
with the name chN at host. An alternative to this approach would be for
the client to save the Pid of the amqp_client process and expose an API
such as

access_request(AmqpClientPid, Realm)

In contrast, if you use the named approach, the API might look like this:

access_request(Channel, Host, Realm)

In the Direct API, the host would probably always be the localhost,
but I went initially for a combination of Channel and Host to keep a
consistent API in the networked case. But I don't think this has been
completely thought through yet, and each amqp_client process in the
networked case will have to use the same socket connection.
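
As a rough sketch of how the two API shapes could sit side by side
(this is not the code I have at the moment): the module name, the
channel_name/1 helper and the placeholder reply are invented, and I am
assuming here that Host is an Erlang node name, so that a registered
process can be addressed as {Name, Node}.

```erlang
-module(channel_api_sketch).
-behaviour(gen_server).
-export([open_channel/1, access_request/2, access_request/3]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical naming scheme: channel 1 is registered as ch1, and so on.
%% Creating atoms dynamically is fine for a sketch but would need a
%% bound on the channel number in real code.
channel_name(N) ->
    list_to_atom("ch" ++ integer_to_list(N)).

%% One gen_server per channel, registered locally under its name.
open_channel(N) ->
    gen_server:start_link({local, channel_name(N)}, ?MODULE, N, []).

%% Pid-based variant: the caller keeps the Pid of the channel process.
access_request(AmqpClientPid, Realm) ->
    gen_server:call(AmqpClientPid, {access_request, Realm}).

%% Name-based variant: the process is found via {RegisteredName, Node}.
access_request(Channel, Host, Realm) ->
    gen_server:call({channel_name(Channel), Host}, {access_request, Realm}).

init(N) ->
    {ok, N}.

%% Placeholder reply; the real client would run the protocol exchange here.
handle_call({access_request, Realm}, _From, N) ->
    {reply, {access_request_ok, N, Realm}, N}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

With this in place, access_request(Pid, Realm) and
access_request(1, node(), Realm) both reach the same channel process,
so the choice is mostly about what the user has to hold on to.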

> You appear to be changing the channel's writer pid with every
> basic_consume. That doesn't look right.

Do you mean in the mainloop_without_framing function?

{method, MethodRecord, Content, WriterPid} ->
    State0 = State#ch{writer_pid = WriterPid},
    State1 = handle_method(MethodRecord, Content, State0),
    mainloop_without_framing(State1);

If so, yes, I think that is unnecessary since I introduced the concept
of an acknowledgement pid. I will have a look at removing that.

If not, then I don't completely understand.

>
> I think the direct consumer case should actually look like this:
>
>   consumer <-- queue
>
> (which is simpler than what I proposed before)
>
> Queue processes maintain a list of consumer processes. In the direct API
> that's the only mapping you need. In the network API the client channel
> processes maintain a mapping from consumer tags to consumer processes.

So are you saying that you should pass in the consumer's pid to the
rabbit_amqqueue_process and have that maintain a list of consumers?

I don't think I understand this fully. Can you elaborate?
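
For what it's worth, this is how I currently read the two mappings. It
is only a sketch of my understanding: the module, the function names
and the {deliver, Payload} message are invented for illustration and
are not taken from the server code.

```erlang
-module(consumer_map_sketch).
-export([demo/0]).

%% Direct API as I read it: the queue holds a plain list of consumer
%% pids and sends deliveries straight to them.
deliver_direct(ConsumerPids, Payload) ->
    [Pid ! {deliver, Payload} || Pid <- ConsumerPids],
    ok.

%% Network API as I read it: the client channel process looks up the
%% consumer process by the consumer tag carried with the delivery.
deliver_by_tag(TagToPid, ConsumerTag, Payload) ->
    case maps:find(ConsumerTag, TagToPid) of
        {ok, Pid} ->
            Pid ! {deliver, Payload},
            ok;
        error ->
            {error, unknown_consumer_tag}
    end.

%% The calling process acts as the only consumer in both cases.
demo() ->
    Self = self(),
    ok = deliver_direct([Self], <<"direct">>),
    ok = deliver_by_tag(#{<<"tag1">> => Self}, <<"tag1">>, <<"network">>),
    First = receive {deliver, A} -> A end,
    Second = receive {deliver, B} -> B end,
    Unknown = deliver_by_tag(#{}, <<"nope">>, <<"x">>),
    {First, Second, Unknown}.
```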

>
> The question is what should the writer_pid be set to in the direct API?
>
> One possibility is to set it to the caller's pid when doing rpc. That
> way rpc can be implemented like this:
>
> rpc(ChannelPid, MethodRecord, Content) ->
>   ChannelPid ! {method, MethodRecord, Content, self()},
>   receive
>      {send_command, MethodRecord} -> MethodRecord
>      ...
>   end.
>
> The tricky bit is how to deal with asynchronous non-consumer events,
> i.e. cases where a channel/transaction/queue process sends messages to
> a writer process as part of something other than an rpc or message
> delivery. Perhaps the writer_pid should be set to the channel pid?  That
> would require some pretty careful coding to avoid deadlock.
>
>
> I reckon it may be easier to tackle the network API first. It is the
> more general case and the refactoring it forces will inform our thinking
> on how to best implement the direct API.

You might be right on this, but I don't see how the networked API will
change anything on the server side. Otherwise, wouldn't you have to
change the Java client as well?
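
Coming back to the rpc idea, here is a small runnable variant of your
sketch so we have something concrete to poke at. The channel_loop/0
process is a made-up stand-in for a server channel, the {ok, Method}
reply is a placeholder, and the 5 second timeout is my own addition.

```erlang
-module(rpc_sketch).
-export([rpc/3, demo/0]).

%% The caller passes its own pid as the writer pid and then waits for
%% the channel to send the reply back to it.
rpc(ChannelPid, MethodRecord, Content) ->
    ChannelPid ! {method, MethodRecord, Content, self()},
    receive
        {send_command, Reply} -> Reply
    after 5000 ->
        {error, timeout}
    end.

%% Hypothetical stand-in for a server channel process: it answers every
%% method by sending a send_command message to the supplied writer pid.
channel_loop() ->
    receive
        {method, MethodRecord, _Content, WriterPid} ->
            WriterPid ! {send_command, {ok, MethodRecord}},
            channel_loop();
        stop ->
            ok
    end.

demo() ->
    Channel = spawn(fun channel_loop/0),
    Reply = rpc(Channel, 'queue.declare', none),
    Channel ! stop,
    Reply.
```

Note that the receive in rpc/3 takes any send_command message as the
reply, which is exactly where the asynchronous non-consumer events you
mention would get in the way.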



