[rabbitmq-discuss] Channel crashes after basic.cancel_ok.
Ben Hood
0x6e6562 at gmail.com
Thu May 8 21:11:13 BST 2008
Ed,
On 8 May 2008, at 15:04, Edwin Fine wrote:
> Ben,
> Thanks for looking into this. In the "real" code, all the setup is
> actually done within a single Erlang process, and the only thing the
> consumers do is handle the basic.deliver, as well as any
> basic.consume_ok or basic.cancel_ok messages that may arise. In the
> test code, I rearranged some things specifically to be done
> concurrently, which I now see was a mistake. However, this is a bug
> in the test code, not in the production code. I will change the test
> code so that there is no concurrent work being done on a channel
> other than responding with basic.ack to basic.deliver messages.
I've begun a discussion on this topic and it looks like we will add
the intelligence to the amqp_channel module to be able to serialize
concurrent RPC requests to the channel. I understand that you have a
workaround anyway, but on reflection, it seems like a good idea to put
this capability into the client. I will let you know when it is done.
> I note that there is a special version of amqp_channel:send/3, used
> only for a basic.consume method, with the last argument being the
> pid of the process subscribing. Am I correct to assume because of
> this that the basic.consume method can indeed be sent concurrently
> by different consumers on the same channel? But the basic.cancel
> operation does not have this property (specifying a PID in the amqp
> call), as far as I can see, which means that a consumer cannot
> cancel its own subscription without potentially clashing with other
> consumers that are canceling.
amqp_channel:send/3? There isn't an exported function of that name. I
think you mean call/3.
Any which way, the reason behind sending a consume request with a Pid
to the channel process is to allow the channel to register the
consumer process against a tag. The channel process and the broker use
a consumer tag to correlate messages across the wire. The broker does
not know anything about the consuming process. The process looks like
this:
1. Subscribe:
a) User -----(Consumer Pid, Tag?)-----> Channel Process -----(Tag)-----> Broker
b) User <-----(Tag Ok)----- Channel Process <-----(Tag Ok)----- Broker
2. Receive messages:
User <-----(Data)----- Channel Process <-----(Tag, Data)----- Broker
3. Unsubscribe:
a) User -----(Tag)-----> Channel Process -----(Tag)-----> Broker
b) User <-----(Tag Ok)----- Channel Process <-----(Tag Ok)----- Broker
From a user's perspective, you *could* get away without the acks in
steps 1b and 3b. However, to be consistent with the Java client API, 3
different types of messages are sent to a consuming process:
1. ConsumeOk - to acknowledge the registration of a tag or to inform
the receiver of an auto-generated tag;
2. Delivery - normal message delivery;
3. CancelOk - to let the consuming process know that its subscription
has been cancelled.
In step 1, the channel process registers the consuming Pid against a
tag, so that subsequent messages it receives with that tag can get
routed to the correct consumer. The fact that the ack is forwarded to
the consumer is just a matter of the client-side API: it provides a
consumer with explicit lifecycle events, whether or not the consumer
actually cares about them.
When it comes to cancellation, you don't need a Pid, just a tag. This
is because the channel process maintains a map between tags and
consumers, so you only require the tag to kick off the cancellation
process. So whether you cancel yourself or some other process, all you
need is the tag.
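
The tag-to-consumer map described above can be sketched roughly as
follows. This is only an illustration of the idea, not the code in
amqp_channel: the module name, the function names and the message
shapes ({deliver, ...}, {cancel_ok, ...}) are all hypothetical, and a
plain map stands in for whatever structure the channel really uses.

```erlang
-module(tag_registry_sketch).
-export([demo/0]).

%% Hypothetical model of the channel's tag -> consumer Pid map.
%% None of these names come from the RabbitMQ Erlang client.

%% Subscribe: remember which process owns the tag.
register_consumer(Tag, Pid, Consumers) ->
    maps:put(Tag, Pid, Consumers).

%% Receive: route a delivery to whichever process registered the tag.
deliver(Tag, Data, Consumers) ->
    case maps:find(Tag, Consumers) of
        {ok, Pid} -> Pid ! {deliver, Tag, Data}, ok;
        error     -> {error, unknown_tag}
    end.

%% Unsubscribe: only the tag is needed; the Pid is looked up here.
cancel(Tag, Consumers) ->
    case maps:take(Tag, Consumers) of
        {Pid, Rest} -> Pid ! {cancel_ok, Tag}, {ok, Rest};
        error       -> {error, unknown_tag}
    end.

demo() ->
    C0 = register_consumer(<<"tag-1">>, self(), #{}),
    ok = deliver(<<"tag-1">>, <<"hello">>, C0),
    {ok, C1} = cancel(<<"tag-1">>, C0),
    %% After cancellation the tag no longer routes anywhere.
    {error, unknown_tag} = deliver(<<"tag-1">>, <<"late">>, C1),
    receive {deliver, <<"tag-1">>, <<"hello">>} -> ok end,
    receive {cancel_ok, <<"tag-1">>} -> ok end,
    done.
```

Note that cancel/2 never takes a Pid argument: any process holding the
tag can trigger it, and the owning consumer is still the one notified.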
The upshot of this is that cancellations can't clash with one another,
because each one is correlated by its tag. However, as indicated above,
there is currently a limitation with concurrent synchronous calls,
which I'm now hoping to remove. In fact, cancelling a subscription is
something that you might well want to kick off from a consuming
process, so forcing application code to serialize these calls would be
a drawback.
In essence, I think the current subscribe/unsubscribe model makes
sense; it just requires the intelligence to serialize synchronous
client requests.
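
One way such serialization could look is sketched below, under the
assumption that only one synchronous request may be outstanding on the
wire at a time. It is not the planned amqp_channel implementation: the
module, the {rpc, Method} request, the {broker_reply, ...} message and
send_to_broker/1 are all invented for illustration, and the "broker"
is just the process echoing a reply to itself.

```erlang
-module(rpc_serializer_sketch).
-behaviour(gen_server).
-export([start_link/0, call/2, demo/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% current: the caller awaiting a broker reply, or none.
%% pending: FIFO of {From, Method} that arrived in the meantime.
-record(state, {current = none, pending}).

start_link() -> gen_server:start_link(?MODULE, [], []).

call(Channel, Method) -> gen_server:call(Channel, {rpc, Method}).

init([]) -> {ok, #state{pending = queue:new()}}.

%% Nothing outstanding: send at once and remember who is waiting.
handle_call({rpc, Method}, From, #state{current = none} = S) ->
    send_to_broker(Method),
    {noreply, S#state{current = From}};
%% A request is already in flight: queue this one behind it.
handle_call({rpc, Method}, From, #state{pending = Q} = S) ->
    {noreply, S#state{pending = queue:in({From, Method}, Q)}}.

handle_cast(_Msg, S) -> {noreply, S}.

%% A reply answers the current caller, then the next request is sent.
%% (The sketch assumes replies only arrive while a call is in flight.)
handle_info({broker_reply, Reply}, #state{current = From, pending = Q} = S) ->
    gen_server:reply(From, Reply),
    case queue:out(Q) of
        {{value, {Next, Method}}, Q1} ->
            send_to_broker(Method),
            {noreply, S#state{current = Next, pending = Q1}};
        {empty, _} ->
            {noreply, S#state{current = none}}
    end.

%% Stand-in for the wire: echo a reply back to this process.
send_to_broker(Method) ->
    self() ! {broker_reply, {ok, Method}},
    ok.

%% Three processes call concurrently; each gets its own reply.
demo() ->
    {ok, Ch} = start_link(),
    Parent = self(),
    Methods = [consume, cancel, qos],
    lists:foreach(
      fun(M) -> spawn(fun() -> Parent ! {M, call(Ch, M)} end) end,
      Methods),
    [receive {M, Reply} -> Reply end || M <- Methods].
```

The point of returning {noreply, ...} and answering later with
gen_server:reply/2 is that callers block in call/2 as usual, while the
channel process stays free to receive the broker's reply.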
If you think that passing the consume_ok and cancel_ok acks to the
consumer is counterintuitive, maybe we could consider parametrizing
the client side call so that the client can choose whether the
consumer really cares about these lifecycle events. Either way,
there will only be 2 lifecycle events per consumer, so these could
*theoretically* just be left in the consumer's mailbox.
> So I will change the code back so that everything OTHER than
> basic.consume and basic.cancel is done within a single process, and
> take it from there.
>
> I think this has, however, shed some light on what might have
> happened in the production code. I am using the consumer process to
> unsubscribe itself. When there are many consumer processes, this
> could be a problem because I think basic.cancel does not allow you
> to specify a return pid and maybe this is causing upset in the
> channel.
>
> #'basic.consume_ok'{consumer_tag = ConsumerTag} =
>     amqp_channel:call(Channel, BasicConsume, self()), %%% Notice return pid
> #'basic.cancel_ok'{consumer_tag = ConsumerTag} =
>     amqp_channel:call(Channel, BasicCancel), %%% Notice - no return pid
>
> So maybe this should be added to the handling of basic.cancel? Why
> shouldn't a consumer cancel itself? In fact, why can't all client
> channel calls specify which PID is calling?
As indicated above, you don't need a Pid to cancel a subscription,
just the tag. So a consumer can cancel itself, if it wants to.
BTW, all channel calls are aware of the calling pid by default because
the channel uses the gen_server behaviour. So if the channel needs to
know which Pid invoked what call, it can. In general, it doesn't need
to.
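
As a small illustration of that point, here is a minimal gen_server
whose handle_call/3 reads the caller's Pid out of its From argument.
The module name and the who_am_i request are made up for this sketch;
only the gen_server behaviour itself is standard OTP.

```erlang
-module(caller_pid_sketch).
-behaviour(gen_server).
-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

init([]) -> {ok, nostate}.

%% From is {Pid, Tag}: Pid is the process that made the call, so the
%% server can identify its caller without an explicit Pid argument.
handle_call(who_am_i, {CallerPid, _Tag}, State) ->
    {reply, CallerPid, State}.

handle_cast(_Msg, State) -> {noreply, State}.

demo() ->
    {ok, Server} = gen_server:start_link(?MODULE, [], []),
    Self = self(),
    %% The server hands back the Pid it saw in From.
    Self = gen_server:call(Server, who_am_i),
    ok.
```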
>
>
> One other source of confusion for me is this: when sending an
> amqp_channel:basic.consume call, why is there both a synchronous
> response to the call itself and an asynchronous response on the
> channel?
>
> #'basic.consume_ok'{consumer_tag = ConsumerTag} =
> amqp_channel:call(Channel, BasicConsume, self()),
> receive
> #'basic.consume_ok'{consumer_tag = ConsumerTag} ->
> io:format("[~p] ~p got consume_ok~n", [self(),
> BasicConsume#'basic.consume'.consumer_tag])
> end.
>
This question was answered above.
HTH,
Ben