<html><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><br><div><br><div>Begin forwarded message:</div><br class="Apple-interchange-newline"><blockquote type="cite"><div><div style="margin-top: 0px; margin-right: 0px; margin-bottom: 0px; margin-left: 0px; "><font face="Helvetica" size="3" color="#000000" style="font: 12.0px Helvetica; color: #000000"><b>From: </b></font><font face="Helvetica" size="3" style="font: 12.0px Helvetica">"Edwin Fine" <<a href="mailto:emofine@gmail.com">emofine@gmail.com</a>></font></div><div style="margin-top: 0px; margin-right: 0px; margin-bottom: 0px; margin-left: 0px; "><font face="Helvetica" size="3" color="#000000" style="font: 12.0px Helvetica; color: #000000"><b>Date: </b></font><font face="Helvetica" size="3" style="font: 12.0px Helvetica">8 May 2008 22:12:07 BST</font></div><div style="margin-top: 0px; margin-right: 0px; margin-bottom: 0px; margin-left: 0px; "><font face="Helvetica" size="3" color="#000000" style="font: 12.0px Helvetica; color: #000000"><b>To: </b></font><font face="Helvetica" size="3" style="font: 12.0px Helvetica">"Ben Hood" <<a href="mailto:0x6e6562@gmail.com">0x6e6562@gmail.com</a>></font></div><div style="margin-top: 0px; margin-right: 0px; margin-bottom: 0px; margin-left: 0px; "><font face="Helvetica" size="3" color="#000000" style="font: 12.0px Helvetica; color: #000000"><b>Subject: </b></font><font face="Helvetica" size="3" style="font: 12.0px Helvetica"><b>More RabbitMQ Erlang client woes</b></font></div><div style="margin-top: 0px; margin-right: 0px; margin-bottom: 0px; margin-left: 0px; min-height: 14px; "><br></div> </div>Hi Ben,<br><br>Sorry for the long email. Trying to get to the bottom of the problems. 
I have done a LOT more investigation into the Erlang network client, and I believe there are two related bugs: one in the client, and another whose location I have not pinned down (or perhaps I am misusing AMQP).<br> <br><b>Bug #1: rabbit_writer not shut down<br></b><br>The first (and easier) bug, which I eventually fixed, is that every time you open a channel, the network client creates a rabbit_writer but never shuts it down. Each channel that is created therefore leaves behind a rabbit_writer process.<br> During the handshake, the network driver starts a rabbit_writer on channel 0. I believe this one is used for the connection itself. Its pid is stored in writer_pid in the connection_state, and it gets cleaned up properly when the connection is shut down. There is no problem with this.<br> Thereafter, when a channel is opened and amqp_network_driver:open_channel is called, another writer is started (correctly - need one per channel, right?). There is a singleton reader. Anyway, this second writer is never terminated. It is registered as a peer to the channel using amqp_channel:register_direct_peer. That is where the bug comes from: the registered peer is never shut down, probably because a direct peer never should be shut down... but this is the NETWORK peer.<br> <br>So what I did (and you may have a better way) was to add another function, amqp_channel:register_network_peer. It sets a "direct" flag in the channel_state (a field I had to add) to false; calling register_direct_peer sets the flag to true. When amqp_channel:channel_cleanup() is eventually called (and I had to do something for this, too), it checks whether direct is false. 
If so, it stops the writer in writer_pid; otherwise it leaves it alone.<br> <br>I also had to add a call to channel_cleanup in the close_ok handler, because the cleanup was never getting called:<br><br>amqp_channel:handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) -><br> {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),<br> <b> NewState2 = channel_cleanup(State),</b><br> {stop, normal, NewState2};<br><br>Now this may be major hackery, and you may find a better way. Or maybe the client code has changed since and mine is out of date; I haven't looked in the past few weeks.<br> <br><br><b>Maybe-Bug #2: I believe that multiple concurrent consumers cannot self-register<br></b><br>If all consumers are registered by a single process (which is what I used to do, and it worked), and not by the consumer processes themselves, all consume_ok methods are returned correctly. However, if you start several consumers in quick succession and each tries to self-register, the responses get mixed up. Take a look at the output below. The key is this debug print:<br> <br><b></b><br>However, the basic.consume method was called with the consumer's pid, and my understanding is that providing the pid of the consumer ensures that the response gets back to the right consumer. 
Here is my code, where you can see the self():<br> <br><span style="font-family: courier new,monospace;"></span><b style="font-family: courier new,monospace;"> #'basic.consume_ok'{consumer_tag = NewConsumerTag} = amqp_channel:call(Channel, BasicConsume, self()),</b><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"></span><br>And the corresponding output, showing that the responses are mixed together.<br><br><span style="font-family: courier new,monospace;">[<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"> <b><span style="font-family: courier new,monospace;">[<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"></b> <span style="font-family: courier new,monospace;"></span><span style="font-family: courier new,monospace;">[<0.122.0>] subscribing: Response from amqp_channel:call consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0></span><br style="font-family: courier new,monospace;"> <b><span style="font-family: courier new,monospace;">[<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">> Expected Tag: <<"EMF_TEST_Q.2">></span></b><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"></span><br>-----------------------------------------------------------------------------<br>This is the complete output trace of the test code. 
I've attached my version of the client (with my debug prints and my mods), and the test source code.<br> -----------------------------------------------------------------------------<br><br><span style="font-family: courier new,monospace;">(xhg@ender)8> rabbit_chan_crash:go(rabbit@ender,2,1).</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">start_writer(Sock = #Port<0.141>, Channel = 0)</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> WriterPid = <0.115.0></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Setting up channel on realm <<"/data">> for connection {<0.112.0>,network}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">start_writer(Sock = #Port<0.141>, Channel = 1)</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> WriterPid = <0.118.0></span><br style="font-family: courier new,monospace;"><b><span style="font-family: courier new,monospace;">amqp_network_driver:open_channel:: Nothing ever shuts this writer down: <0.118.0> *** I believe I have fixed this bug</span></b><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'channel.open_ok'}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'access.request_ok',101}</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Access granted for channel <0.117.0> on realm <<"/data">> for connection {<0.112.0>,</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> network}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier 
new,monospace;">Declaring exchange <<"emf_test">> using ticket 101</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'exchange.declare_ok'}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Declared exchange <<"emf_test">> using ticket 101</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Declaring queue <<"EMF_TEST_Q.1">></span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> <<"EMF_TEST_Q.1">>,0,0}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Declared queue <<"EMF_TEST_Q.1">>, msgc = 0, cons_c = 0</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Declaring queue <<"EMF_TEST_Q.2">></span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> <<"EMF_TEST_Q.2">>,0,0}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Declared queue <<"EMF_TEST_Q.2">>, msgc = 0, cons_c = 0</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Binding queue <<"EMF_TEST_Q.1">>, ticket 101, exchange <<"emf_test">>, routing key <<"EMF_TEST_Q.1">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}</span><br style="font-family: courier 
new,monospace;"><span style="font-family: courier new,monospace;">Bound queue <<"EMF_TEST_Q.1">> to exchange <<"emf_test">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Binding queue <<"EMF_TEST_Q.2">>, ticket 101, exchange <<"emf_test">>, routing key <<"EMF_TEST_Q.2">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Bound queue <<"EMF_TEST_Q.2">> to exchange <<"emf_test">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Environment setup complete, rmq_state =</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">{rmq_state,"guest","guest",</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> {<0.112.0>,network},</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> <0.117.0>,101,<<"emf_test">>,<<"/data">>,<<"/emf_test">>,</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> rabbit@ender,2,</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> [<<"EMF_TEST_Q.1">>,<<"EMF_TEST_Q.2">>],</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> undefined,undefined}</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Consumers started: [<0.121.0>,<0.122.0>]</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.121.0>] Started consumer tag <<"EMF_TEST_Q.1">> for existing channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.1">></span><br 
style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.122.0>] Started consumer tag <<"EMF_TEST_Q.2">> for existing channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.2">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Producer started: <0.123.0></span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">[<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"><0.120.0></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.122.0>] subscribing: Response from amqp_channel:call consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">[<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">> Expected Tag: <<"EMF_TEST_Q.2">></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">Consumer tag <<"EMF_TEST_Q.2">> pid <0.122.0> unsubscribing</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">Unsubscribing consumer <<"EMF_TEST_Q.2">> from channel <0.117.0></span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">(xhg@ender)9> </span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">=ERROR REPORT==== 8-May-2008::16:53:43 ===</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">** Generic server <0.117.0> terminating </span><br 
style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">** Last message in was {method,{'basic.consume_ok',<<"EMF_TEST_Q.2">>},none}</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">** When Server state == {channel_state,1,<0.112.0>,<0.114.0>,<0.118.0>,false,</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> #Fun<amqp_network_driver.do.2>,</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> #Fun<amqp_network_driver.do.3>,<<>>,<<>>,false,</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> undefined,</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> {dict,1,16,16,8,80,48,</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> []},</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> {{[],[],[],[],[],[],[],[],</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> [[<<"EMF_TEST_Q.1">>|<0.122.0>]],</span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> [],[],[],[],[],[],[]}}}}</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;">** Reason for termination == </span><br style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;">** {badarg,[{amqp_channel,handle_method,2},</span><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"> {gen_server,handle_msg,5},</span><br 
style="font-family: courier new,monospace;"><span style="font-family: courier new,monospace;"> {proc_lib,init_p,5}]}</span><br style="font-family: courier new,monospace;"> <br> </blockquote></div></body></html>