<html><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; ">Ed,<div><br><div><div>On 8 May 2008, at 15:04, Edwin Fine wrote:</div><br class="Apple-interchange-newline"><blockquote type="cite">Ben,<br> Thanks for looking into this. In the "real" code, all the setup is actually done within a single Erlang process, and the only thing the consumers do is handle the basic.deliver, as well as any basic.consume_ok&nbsp; or basic.cancel_ok messages that may arise. In the test code, I rearranged some things specifically to be done concurrently, which I now see was a mistake. However, this is a bug in the test code, not in the production code. I will change the test code so that there is no concurrent work being done on a channel other than responding with basic.ack to basic.deliver messages.<br></blockquote><div><br></div><div>I've begun a discussion on this topic and it looks like we will add the intelligence to the amqp_channel module to be able to serialize concurrent RPC requests to the channel. I understand that you have a workaround anyway, but on reflection, it seems like a good idea to put this capability into the client. will let you know when it is done.</div><br><blockquote type="cite">I note that there is a special version of amqp_channel:send/3, used only for a basic.consume method, with the last argument being the pid of the process subscribing. Am I correct to assume because of this that the basic.consume method can indeed be sent concurrently by different consumers on the same channel? But the basic.cancel operation does not have this property (specifying a PID in the amqp call) , as far as I can see, which means that a consumer cannot cancel its own subscription without potentially clashing with other consumers that are canceling.<br></blockquote><div><br></div><div>amqp_channel:send/3? There isn't an exported function of that name. I think you mean call/3.</div><div><br></div><div>Any which way, the reason behind sending a consume request with a Pid to the channel process is to allow the channel to register the consumer process against a tag. The channel process and the broker use a consumer tag to correlate messages&nbsp;across&nbsp;the wire. The broker does not know anything about the consuming process. The process looks like this:</div><div><br></div><div>1. Subscribe:</div><div><span class="Apple-tab-span" style="white-space:pre">                                </span><br></div><div>a) User -----(Consumer Pid, Tag?)-----> Channel Process ----(Tag)-----> Broker</div><div>b) User &lt;-----(Tag Ok)----- Channel Process &lt;----(Tag Ok)----- Broker</div><div><br></div><div>2. Receive messages:</div><div><br></div><div>User &lt;-----(Data)----- Channel Process &lt;----(Tag, Data)----- Broker</div><div><br></div><div>3. Unsubscribe:</div><div><br></div><div>a) User -----(Tag)-----> Channel Process ----(Tag)-----> Broker</div><div>b) User &lt;----(Tag Ok)-----> Channel Process &lt;---(Tag Ok)----- Broker</div><div><br></div><div>From an user perspective, you *could* get away without the acks in steps 1b and 3b. However, to be consistent with the Java client API, 3 different types of messages are sent to a consuming process:</div><div><br></div><div>1. ConsumeOk - to acknowledge the registration of a tag or to inform the receiver of an auto-generated tag;</div><div>2. Delivery - normal message delivery;</div><div>3. CancelOk - to let the consuming process know that it's subscription has been cancelled.</div><div><br></div><div>In step 1, the channel process registers the consuming Pid against a tag, so that subsequent messages it receives with that tag can get routed to the correct consumer. The fact that the ack is resent to the consumer is just a matter of the client side API - it provides a consumer with explicit lifecycle events, whether or not they actually care about them.</div><div><br></div><div>When it comes to cancellation, you don't need a Pid, just a tag. This is because the channel process maintains a map between tags and consumers, so you only require the tag to kick off the cancellation process. So whether you cancel yourself or some other process, all you need is the tag.</div><div><br></div><div>The upshot of this is that you can't clash on cancelling, because you can correlate with the tag. However as indicated above, there is a current limitation with concurrent synchronous calls, which I'm hoping to remove now. In fact, being able to cancel a subscription is something that you might want to kick off from a consuming process, which means that forcing application code to serialize this may be a drawback.</div><div><br></div><div>In essence, I think the current subscribe/unsubscribe model makes sense, it just requires the intelligence to serialize synchronous client requests.</div><div><br></div><div>If you think that passing the consume_ok and cancel_ok acks to the consumer is counterintuitive, maybe we could consider&nbsp;parametrizing the client side call so that the client can choose whether the consumer really cares about these lifecycle events. Which ever way, there will only be 2 lifecycle events per consumer, so these could *thereotically* just be left in the consumer's mailbox.&nbsp;</div><div><br></div><div><br></div><blockquote type="cite">So I will change the code back so that everything OTHER than basic.consume and basic.cancel is done within a single process, and take it from there.<br><br>I think this has, however, shed some light on what might have happened in the production code. <b>I am using the consumer process to unsubscribe itself.</b> When there are many consumer processes, this could be a problem because I think basic.cancel does not allow you to specify a return pid and maybe this is causing upset in the channel.<br> <br>&nbsp;&nbsp;&nbsp; #'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicConsume, <b>self()</b>), %%% Notice return pid<br>&nbsp;&nbsp;&nbsp; #'basic.cancel_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicCancel), %%% Notice - no return pid<br> <br>So maybe this should be added to the handling of basic.cancel? Why shouldn't a consumer cancel itself? In fact, why can't all client channel calls specify which PID is calling?</blockquote><div><br></div><div>As indicated above, you don't need a Pid to cancel a subscription, just the tag. So a consumer can cancel itself, if it wants to.</div><div><br></div><div>BTW, all channel calls are aware of the calling pid by default because the channel uses the gen_server behaviour. So if the channel needs to know which Pid invoked what call, it can. In general, it doesn't need to.</div><br><blockquote type="cite"><br><br>One other source of confusion for me is this: when sending an amqp_channel:basic.consume call, why is there both a synchronous response to the call itself and an asynchronous response on the channel?<br> <br>&nbsp;&nbsp;&nbsp; #'basic.consume_ok'{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicConsume, self()),<br>&nbsp;&nbsp;&nbsp; receive<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #'basic.consume_ok'{consumer_tag = ConsumerTag} -><br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; io:format("[~p] ~p got consume_ok~n", [self(), BasicConsume#'basic.consume'.consumer_tag])<br> &nbsp;&nbsp;&nbsp; end.<br><br></blockquote><div><br></div><div>This question was answered above.</div></div><br><div>HTH,</div><div><br></div><div>Ben</div></div></body></html>