[rabbitmq-discuss] FW: Multiple consumers
Ben Hood
0x6e6562 at gmail.com
Sun Jul 29 01:26:20 BST 2007
Matthias,
> I think that may partially be due to some design choices in the
> channel/queue implementation that may need revisiting. atm the
> consume_ok is sent by the *queue* process. I think it ought to be sent
> by the channel process instead - the channel process should be
> responsible for sending back responses to *all* synchronous methods.
Fair enough, but I can't see how this affects the direct client
fundamentally or even in terms of what messages it passes and what it
expects to receive. I think that this could potentially be done
independently of the direct client.
>
> Why do the channel processes need to be named?
They don't need to be named per se. I just chose that approach as a way
to get a handle or a reference to the process that is implementing
channel N. Given the fact that you want to have a separate process per
channel, if you stored the pid of the channel handler, that would
suffice as well. I just thought this was a simple way of maintaining a
reference to the process that has done the protocol handling for a
particular channel. Otherwise you would have to devise another way to
refer to the protocol state of a channel.
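To make the two options concrete, here is a minimal sketch, not the actual
client code. The module name, the 'channel_N' naming scheme and all function
names are assumptions made up for illustration. It shows a channel process
registered under a name derived from its number, next to the alternative of
keeping the pid in a dict keyed by channel number:

```erlang
-module(channel_registry_sketch).
-export([channel_name/1, start_named/1, whereis_channel/1, stop/1,
         store_pid/3, lookup_pid/2]).

%% Hypothetical naming scheme: channel 3 -> 'channel_3'.
channel_name(N) when is_integer(N), N >= 0 ->
    list_to_atom("channel_" ++ integer_to_list(N)).

%% Option 1: spawn a placeholder channel process and register it by name.
start_named(N) ->
    Pid = spawn(fun() -> loop(N) end),
    true = register(channel_name(N), Pid),
    Pid.

%% Returns the pid registered for channel N, or undefined.
whereis_channel(N) ->
    whereis(channel_name(N)).

%% Ask the named channel process to terminate.
stop(N) ->
    channel_name(N) ! stop,
    ok.

%% Option 2: keep the pids in a dict held by whoever owns the channels.
store_pid(N, Pid, Channels) ->
    dict:store(N, Pid, Channels).

lookup_pid(N, Channels) ->
    case dict:find(N, Channels) of
        {ok, Pid} -> Pid;
        error     -> undefined
    end.

%% Placeholder for the real protocol handling loop.
loop(N) ->
    receive
        {ping, From} -> From ! {pong, N}, loop(N);
        stop         -> ok
    end.
```

One thing the sketch makes visible: registered names are local to the node,
so two connections that both use channel 1 would clash under this naming
scheme, whereas the dict variant keeps the lookup private to its owner.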
>
> > if you use the named approach, the API might look this:
> >
> > access_request(Channel, Host, Realm)
>
> I'd like to see
> access_request(Channel, Realm)
I originally started with that variant, then I thought it might be
ambiguous when you are connecting to more than one AMQP server. In the
direct case, adding the host is probably unnecessary, but I wanted to
keep a consistent interface for the network case.
>
> >> You appear to be changing the channel's writer pid with every
> >> basic_consume. That doesn't look right.
> >
> > Do you mean in the mainloop_without_framing function?
>
> Yes.
The current code is catering for the basic_consume call, so it sets
the writer pid to the actual subscriber and the ack_pid to the process
handling the protocol for a channel:
mainloop_without_framing(State) ->
    receive
        {method, MethodRecord, Content} ->
            State1 = handle_method(MethodRecord, Content, State),
            mainloop_without_framing(State1);
        {method, MethodRecord, Content, SubscriberPid} ->
            WriterPid = State#ch.writer_pid,
            State0 = State#ch{writer_pid = SubscriberPid},
            State1 = handle_method(MethodRecord, Content, State0),
            State2 = State1#ch{writer_pid = WriterPid},
            mainloop_without_framing(State2);
        Other ->
            rabbit_log:error("Unexpected ch~p content: ~p~n",
                             [State#ch.channel, Other]),
            mainloop_without_framing(State)
    end.
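The swap-and-restore step in the second clause can be tried in isolation.
The following is a minimal sketch under stated assumptions: the #ch record
is cut down to two fields, handle_method/3 is a stand-in that only sends a
message to whatever writer_pid is currently set, and the module and function
names are hypothetical. It is meant to show that the subscriber sees the
output of the call while the state that comes back still points at the
original writer:

```erlang
-module(writer_swap_sketch).
-export([with_temporary_writer/3, demo/0]).

%% Cut-down stand-in for the real channel state record.
-record(ch, {channel, writer_pid}).

%% Stand-in for handle_method/3: sends to the current writer_pid
%% and returns the state unchanged.
handle_method(Method, Content, State = #ch{writer_pid = Writer}) ->
    Writer ! {delivered, Method, Content},
    State.

%% Point writer_pid at the subscriber for one call, then restore it.
with_temporary_writer(SubscriberPid, {Method, Content}, State) ->
    WriterPid = State#ch.writer_pid,
    State0 = State#ch{writer_pid = SubscriberPid},
    State1 = handle_method(Method, Content, State0),
    State1#ch{writer_pid = WriterPid}.

demo() ->
    %% A dummy writer that accepts one message and exits.
    Writer = spawn(fun() -> receive _ -> ok end end),
    State = #ch{channel = 1, writer_pid = Writer},
    State2 = with_temporary_writer(self(), {basic_consume, none}, State),
    %% The calling process acted as the subscriber, so it gets the message.
    receive
        {delivered, basic_consume, none} -> ok
    after 1000 ->
        exit(timeout)
    end,
    %% The original writer is back in place afterwards.
    Writer = State2#ch.writer_pid,
    ok.
```

Calling writer_swap_sketch:demo() from a shell should return ok if the
swap behaves as described.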
> > So are you saying that you should pass in the consumer's pid to the
> > rabbit_amqqueue_process and have that maintain a list of consumers?
>
> rabbit_amqqueue_process already does this.
I've been looking at this, but viewed it as an optimization rather
than a fundamental change. At the moment the queue sends to the writer
that then sends to the actual consumer, which is an extra hop in the
chain. I think this could be changed without having to change the
client interaction.
>
> > I don't see how the networked API will change anything on the server
> > side, otherwise wouldn't you have to change the java client as well.
>
> The idea was/is to use the existing server code (rabbit_reader,
> rabbit_channel etc) as the basis for the client code. This will require
> some refactoring along the lines you have been pursuing so that server
> and client can share a substantial part of their code base.
I suppose you could use the handle_XXX functions in rabbit_channel
directly to achieve the same goal. In that case, you might have to
merge some of the channel0 stuff into this channel. I can see some
merit in that, though I think this would have to be discussed a bit
more before diving into any code.
BTW, I've implemented the process per channel model and now I'm
looking at parameterizing the ampq_client to cater for the network
case. Will let you know when something useful comes to life.
Ben