Ben,<br>
Thanks for looking into this. In the &quot;real&quot; code, all the setup is actually done within a single Erlang process, and the only thing the consumers do is handle the basic.deliver, as well as any basic.consume_ok&nbsp; or basic.cancel_ok messages that may arise. In the test code, I rearranged some things specifically to be done concurrently, which I now see was a mistake. However, this is a bug in the test code, not in the production code. I will change the test code so that there is no concurrent work being done on a channel other than responding with basic.ack to basic.deliver messages.<br>
<br>I note that there is a special version of amqp_channel:send/3, used only for a basic.consume method, with the last argument being the pid of the process subscribing. Am I correct to assume because of this that the basic.consume method can indeed be sent concurrently by different consumers on the same channel? But the basic.cancel operation does not have this property (specifying a PID in the amqp call) , as far as I can see, which means that a consumer cannot cancel its own subscription without potentially clashing with other consumers that are canceling.<br>
<br>So I will change the code back so that everything OTHER than basic.consume and basic.cancel is done within a single process, and take it from there.<br><br>I think this has, however, shed some light on what might have happened in the production code. <b>I am using the consumer process to unsubscribe itself.</b> When there are many consumer processes, this could be a problem because I think basic.cancel does not allow you to specify a return pid and maybe this is causing upset in the channel.<br>
<br>&nbsp;&nbsp;&nbsp; #&#39;basic.consume_ok&#39;{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicConsume, <b>self()</b>), %%% Notice return pid<br>&nbsp;&nbsp;&nbsp; #&#39;basic.cancel_ok&#39;{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicCancel), %%% Notice - no return pid<br>
<br>So maybe this should be added to the handling of basic.cancel? Why shouldn&#39;t a consumer cancel itself? In fact, why can&#39;t all client channel calls specify which PID is calling?<br><br>One other source of confusion for me is this: when sending an amqp_channel:basic.consume call, why is there both a synchronous response to the call itself and an asynchronous response on the channel?<br>
<br>&nbsp;&nbsp;&nbsp; #&#39;basic.consume_ok&#39;{consumer_tag = ConsumerTag} = amqp_channel:call(Channel, BasicConsume, self()),<br>&nbsp;&nbsp;&nbsp; receive<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #&#39;basic.consume_ok&#39;{consumer_tag = ConsumerTag} -&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; io:format(&quot;[~p] ~p got consume_ok~n&quot;, [self(), BasicConsume#&#39;basic.consume&#39;.consumer_tag])<br>
&nbsp;&nbsp;&nbsp; end.<br><br>Thanks for your help again.<br><br>Regards,<br>Edwin Fine<br><div class="gmail_quote">On Thu, May 8, 2008 at 7:39 AM, Ben Hood &lt;<a href="mailto:0x6e6562@gmail.com">0x6e6562@gmail.com</a>&gt; wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
<div style="">Ed,<div><br></div><div>You may want to hang on a second with implementing any of the described workarounds because I&#39;ve had thought of how to solve this in the client, but I want to run it past the Rabbit dev list first.</div>
<div><br></div><div>HTH,</div><div><br></div><div>Ben<br><div><br><div>Begin forwarded message:</div><br><blockquote type="cite"><div><div style="margin: 0px;"><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal; color: rgb(0, 0, 0);" color="#000000" face="Helvetica" size="3"><b>From: </b></font><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal;" face="Helvetica" size="3">Ben Hood &lt;<a href="mailto:0x6e6562@gmail.com" target="_blank">0x6e6562@gmail.com</a>&gt;</font></div>
<div style="margin: 0px;"><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal; color: rgb(0, 0, 0);" color="#000000" face="Helvetica" size="3"><b>Date: </b></font><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal;" face="Helvetica" size="3">8 May 2008 11:07:05 BST</font></div>
<div style="margin: 0px;"><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal; color: rgb(0, 0, 0);" color="#000000" face="Helvetica" size="3"><b>To: </b></font><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal;" face="Helvetica" size="3"><a href="mailto:rabbitmq-discuss@lists.rabbitmq.com" target="_blank">rabbitmq-discuss@lists.rabbitmq.com</a></font></div>
<div style="margin: 0px;"><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal; color: rgb(0, 0, 0);" color="#000000" face="Helvetica" size="3"><b>Cc: </b></font><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal;" face="Helvetica" size="3">Edwin Fine &lt;<a href="mailto:rabbitmq-discuss_efine@usa.net" target="_blank">rabbitmq-discuss_efine@usa.net</a>&gt;</font></div>
<div style="margin: 0px;"><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal; color: rgb(0, 0, 0);" color="#000000" face="Helvetica" size="3"><b>Subject: </b></font><font style="font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; font-size: 12px; line-height: normal; font-size-adjust: none; font-stretch: normal;" face="Helvetica" size="3"><b>Re: [rabbitmq-discuss] Channel crashes after basic.cancel_ok.</b></font></div>
<div style="margin: 0px; min-height: 14px;"><br></div> </div><div><div></div><div class="Wj3C7c"><div style=""><div>Ed,</div><br><div><div>On 8 May 2008, at 06:13, Edwin Fine wrote:</div><blockquote type="cite"><br>Well, I have put most of the code I am using into a test program (two modules). This code is not for the mailing list&#39;s consumption, if that&#39;s ok with you. The problem is, I can&#39;t even make the code work for more than 1 consumer and I don&#39;t know why. I think I am too tired. In any case, the two modules included are:<br>
 <ul><li>rabbit_chan_crash.erl - The actual test program, which contains most of the Rabbit-related working code;</li></ul></blockquote>I&#39;ve run you code. The problem that that you are executing synchronous RPCs on the same channel concurrently, which doesn&#39;t work, because the protocol flow stipulates that a client performs a blocking receive for synchronous commands within a channel.</div>
<div><br></div><div>By doing this though, you have uncovered a bug in the synchronous RPC handling code in the client that doesn&#39;t protect against this properly. So I&#39;ve fixed this bug and will send it down to the repository.&nbsp;</div>
<div><br></div><div>This will not solve your problem, it will just make a clearer indication of what went wrong. When designing the client, we deliberated whether the client channel module should contain the intelligence to queue up pending RPC requests in order to accept concurrent requests from user code and essentially serialize their dispatch to the server.&nbsp;Initially, we decided against this approach because of the complexity it introduces. In general, queue declaration and binding is an operation that you don&#39;t need to perform concurrently per channel. While I think that this view is still valid, your use case may provide some food for thought. Thoughts anyone?</div>
<div><br></div><div>To solve your problem, you can do one of a few things:</div><div><br></div><div>1. Refactor your code so that synchronous RPCs are not executed in parallel (e.g. QueueDeclare, QueueBind, etc) within one channel process;</div>
<div>2. Manage by exception - catch an&nbsp;illegal_pending_rpc error (which is the bug fix I just put in) and resend the request;</div><div>3. Use more channel processes - this *may* be a bit heavy weight for the TCP client, this is better suited to the direct client.</div>
<div><br></div><div>If you want to go down route 2, let me know because we&#39;re currently in the process of moving the source code from monotone to mercurial.</div><div><br></div><div>BTW, I see that you&#39;ve made a local modification to the amqp_connection module to export the start/4 function which allows the caller to specify a virtual host. This is probably a good idea and we may incorporate this into the API.</div>
<div><br></div><div><blockquote type="cite"><ul><li><span>rbmq_admin.erl - An interface to the admin part of Rabbit that I wrote because I wanted to be able to create exchanges, users, etc from within Erlang instead of the command line. Feel free to use this if you think it&#39;s of any use, or improve it - feedback welcome. Actually, at some point I want to write a full Web admin interface for Rabbit. Real Soon Now.</span></li>
</ul></blockquote><div>Glad you&#39;ve mentioned this topic. I&#39;ve done some initial work of doing some remote management of Rabbit, if and when you actually want to get going, it will probably be a good idea to start a&nbsp;separate discussion thread on this one. There are many design issues that would need to be discussed, most importantly dependencies that may arise.&nbsp;</div>
<div><br></div><br><blockquote type="cite">To compile you will need all the usual RabbitMQ paths, and the Erlang client of course. You will also need .erlang.cookie or setcookie with your Rabbit&#39;s cookie. I am pretty sure there are no external dependencies other than Rabbit and Erlang.<br>
 <br>To run is simple (but please note that the code will try to create a user named emf_test, and an exchange and a bunch of queues). Feel free to hack into shape if you have the time and interest. Tomorrow I will try to run Rabbit and my code co-located.<br>
 <br>&gt;rabbit_chan_crash:go(rabbit@mynode, NumConsumers, NumMsgsPerSec).<br>&gt;rabbit_chan_crash:stop(). % If you can even read the screen to type this in :)<br><br>When I use rabbit_chan_crash:go(rabbit@mynode, 1, 10) it&#39;s all good.<br>
 When I use rabbit_chan_crash:go(rabbit@mynode, 2, 10) it&#39;s all bad. I don&#39;t know what I am doing wrong, but my other code works (each consumer is in its own gen_event server, which is probably why. I think I am getting messages in the wrong message queues due to my lack of Erlang experience. And I am tired, too).<br>
 <br>Anyway I have included the &quot;bad&quot; output.<br><br>If you have any luck please let me know. Feedback welcome, too.<br><br> Regards,<br> Ed<br> <br>(xhg@ender)3&gt; rabbit_chan_crash:go(rabbit@ender,2,10).<br>Setting up channel on realm &lt;&lt;&quot;/data&quot;&gt;&gt; for connection {&lt;0.60.0&gt;,network}<br>
Access granted for channel &lt;0.65.0&gt; on realm &lt;&lt;&quot;/data&quot;&gt;&gt; for connection {&lt;0.60.0&gt;,<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; network}<br>Declaring exchange &lt;&lt;&quot;emf_test&quot;&gt;&gt; using ticket 101<br>
Declared exchange &lt;&lt;&quot;emf_test&quot;&gt;&gt; using ticket 101<br> Declaring queue &lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt;<br>Declared queue &lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt;, msgc = 0, cons_c = 0<br>Declaring queue &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;<br>
Declared queue &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;, msgc = 0, cons_c = 0<br> Environment setup complete, rmq_state =<br>{rmq_state,&quot;guest&quot;,&quot;guest&quot;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {&lt;0.60.0&gt;,network},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &lt;0.65.0&gt;,101,&lt;&lt;&quot;emf_test&quot;&gt;&gt;,&lt;&lt;&quot;/data&quot;&gt;&gt;,&lt;&lt;&quot;/emf_test&quot;&gt;&gt;,<br>
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; rabbit@ender,2}<br>Consumers started: [&lt;0.69.0&gt;,&lt;0.70.0&gt;]<br>[&lt;0.69.0&gt;] Started consumer tag &lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt; for existing channel/ticket/queue &lt;0.65.0&gt;/101/&lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt;<br>
 [&lt;0.70.0&gt;] Started consumer tag &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt; for existing channel/ticket/queue &lt;0.65.0&gt;/101/&lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;<br>&lt;0.68.0&gt;<br>Producer started: &lt;0.71.0&gt;<br>
 Binding queue &lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt;, ticket 101, exchange &lt;&lt;&quot;emf_test&quot;&gt;&gt;, routing key &lt;&lt;&quot;EMF_TEST_Q.1&quot;&gt;&gt;<br>Binding queue &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;, ticket 101, exchange &lt;&lt;&quot;emf_test&quot;&gt;&gt;, routing key &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;<br>
 Bound queue &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt; to exchange &lt;&lt;&quot;emf_test&quot;&gt;&gt;<br>Subscribing consumer &lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt; to channel &lt;0.65.0&gt;<br>(xhg@ender)4&gt; <br>=ERROR REPORT==== 8-May-2008::00:57:23 ===<br>
 ** Generic server &lt;0.65.0&gt; terminating <br>** Last message in was {method,{&#39;basic.consume_ok&#39;,&lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;},none}<br>** When Server state == {channel_state,1,&lt;0.60.0&gt;,&lt;0.62.0&gt;,&lt;0.66.0&gt;,<br>
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #Fun&lt;amqp_network_driver.do.2&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; #Fun&lt;amqp_network_driver.do.3&gt;,&lt;&lt;&gt;&gt;,&lt;0.70.0&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; false,undefined,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {dict,0,16,16,8,80,48,<br>
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; []},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; [],[]}}}}<br>
 ** Reason for termination == <br>** {function_clause,[{gen_server,reply,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; [&lt;&lt;&gt;&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {&#39;basic.consume_ok&#39;,&lt;&lt;&quot;EMF_TEST_Q.2&quot;&gt;&gt;}]},<br>
 &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {amqp_channel,rpc_bottom_half,2},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {amqp_channel,handle_method,2},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {gen_server,handle_msg,5},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; {proc_lib,init_p,5}]}<br><br>=ERROR REPORT==== 8-May-2008::00:57:23 ===<br>
 Error in process &lt;0.70.0&gt; on node &#39;xhg@ender&#39; with exit value: {{badmatch,{&#39;queue.bind_ok&#39;}},[{rabbit_chan_crash,subscribe,2},{rabbit_chan_crash,consumer,5}]}<br>---------</blockquote><br></div><div>
<br></div><div>HTH,</div><div><br></div><div>Ben</div><div><br></div><div>PS I&#39;m attaching your code to the list for reference.</div><div><br></div><div><div></div></div></div></div></div></blockquote></div></div></div>
<br><div style=""><div><div><blockquote type="cite"><div style=""></div><div style=""><div><div></div></div></div></blockquote></div><br></div></div><br></blockquote></div><br>