<html><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; ">Ed,<div><br></div><div>Thanks for taking the time to look into this so deeply.</div><div><br></div><div>You have indeed found a bug in the network client: it does not shut down the per-channel writer processes. See below.</div><div><br><div><div>On 8 May 2008, at 22:12, Edwin Fine wrote:</div><blockquote type="cite"><br>Sorry for the long email. Trying to get to the bottom of the problems. I have done a LOT more investigation into the Erlang network client, and I believe there are two related bugs, one in the client, and the other I don't know where, or perhaps I am abusing how AMQP is supposed to work.<br> <br><b>Bug #1: rabbit_writer not shut down<br></b><br>The first (and easier) bug, which I have fixed eventually, is that every time you start a channel, the network client creates a rabbit_writer, but never shuts it down. Therefore, each channel that is created leaves behind a rabbit_writer process.<br> What happens is that in the handshake, the network driver starts a rabbit_writer on channel 0. I believe this is used for the connection itself. Its pid is stored in writer_pid in the connection_state. This writer gets cleaned up properly when the connection is shut down. There is no problem with this.<br> Thereafter, when a channel is opened and amqp_network_driver:open_channel is called, another writer is started (correctly - need one per connection, right?). There is a singleton reader. Anyway, this writer is never terminated. The writer is registered as a peer to the channel, using amqp_channel:register_direct_peer. This causes a bug, because the registered peer is never shut down, probably because the direct peer never should be shut down... but this is the NETWORK peer.<br> <br>So what I did (and you may have a better way) is to add another function, amqp_channel:register_network_peer.
This sets a "direct" flag in the channel_state (which I had to add) to false. Calling register_direct_peer sets the flag to true. When amqp_channel:channel_cleanup() is eventually called (and I had to do something for this, too), it checks to see if direct is false. If so, it stops the writer in writer_pid, otherwise it leaves it alone.<br> <br></blockquote><div><br></div><div>I can see what the intention is. The main thing is that you have understood how all of the processes hang together, which is a little tricky in the network case because the client shares a common codebase with the server to manage socket IO and AMQP channels. This is why it may not be immediately apparent why certain things have been done the way they are.</div><div><br></div><div>To achieve this, I think the cleanup should be contained within the network driver, either as a callback that allows the network driver to stop the channel writer process, or by linking the channel process with the writer process so that when the channel exits, the writer process receives a signal as well. I think I prefer the latter, but this would mean changing the writer module, which is a core module of the server, and we'd need to look at the implications of this.</div><div><br></div><div>So watch this space.</div><br><blockquote type="cite">I also had to add a call to channel_cleanup in the close_ok, because the cleanup was never getting called.:<br><br>amqp_channel:handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) -><br> {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),<br> <b> NewState2 = channel_cleanup(State),</b><br> {stop, normal, NewState2};<br><br></blockquote><div><br></div><div>That's a good point.
I think the cleanup code should go into the gen_server terminate callback to keep it in one place.</div><br><blockquote type="cite"><b>Maybe-Bug #2: I believe that multiple concurrent consumers cannot self-register<br></b><br>If all consumers are registered by a single process (which is what I used to do, and it worked), and not by the consumer process itself, all consume_ok methods are returned correctly. However, if you start more than one consumer right after each other, and they try to self-register, things get mixed up. Take a look at this output below. The key is this debug print:<br> <br><b></b><br>However, the basic.consume method was called with the consumer's PID, and my understanding is that providing the pid of the consumer will ensure that the response gets back to the right one. Here is my code, where you can see the self():<br> <br><span style="font-family: courier new,monospace;"></span><b style="font-family: courier new,monospace;"> #'basic.consume_ok'{consumer_tag = NewConsumerTag} = amqp_channel:call(Channel, BasicConsume, self()),</b><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"></span><br>And the corresponding output, showing that the responses are mixed together.<br><br><span style="font-family: courier new,monospace;">[<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"> <b><span style="font-family: courier new,monospace;">[<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel <0.117.0></span><br style="font-family: courier new,monospace;"></b> <span style="font-family: courier new,monospace;"></span><span style="font-family: courier new,monospace;">[<0.122.0>] subscribing: Response from amqp_channel:call consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0></span><br style="font-family: courier new,monospace;"> <b><span style="font-family: courier new,monospace;">[<0.122.0>] received consume_ok: Actual tag:
<<"EMF_TEST_Q.1">> Expected Tag: <<"EMF_TEST_Q.2">></span></b><br style="font-family: courier new,monospace;"> <span style="font-family: courier new,monospace;"></span><br></blockquote><div><br></div><div>Yes, this is a bug. On inspection of the handle_call({basic_consume, Method, Consumer}, From, State) function in amqp_channel, there is a race condition between concurrently subscribing consumers. I will probably address this using the same strategy to serialize all synchronous RPC requests. If the client supplies a unique tag to correlate on, this could be done concurrently by maintaining a map of pending subscriptions rather than just the last pending subscription.</div><div><br></div><div>I'll fix this and let you know.</div><div><br></div><div>In your patch, you also added an extra method to handle a spurious consume_ok message;</div><div><br></div><div><div>%% Edwin Fine bugfix: This is actually being called wrong from somewhere,</div><div>%% but this will fix it.</div><div>handle_method({'basic.consume_ok', ConsumerTag}, State) -></div><div> io:format("[~p] Got bad handle_method call!~n", [self()]),</div><div> handle_method(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State);</div></div><div><br></div><div>This method is preceded in the code by the following function:</div><div><br></div><div><div>handle_method(BasicConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag},</div><div> State = #channel_state{pending_consumer = Consumer}) -></div><div> Consumer ! 
BasicConsumeOk,</div><div> NewState = register_consumer(ConsumerTag, Consumer, State),</div><div> {noreply, NewState2} = rpc_bottom_half(BasicConsumeOk, NewState),</div><div> {noreply, NewState2#channel_state{pending_consumer = <<>>} };</div><div><br></div><div>So I am not quite sure why that didn't match first, because #'basic.consume_ok'{consumer_tag = ConsumerTag} should match against {'basic.consume_ok', ConsumerTag} and #channel_state{pending_consumer = Consumer} should match even if the pending_consumer was not defined.</div><div><br></div><div>This will require some more analysis.</div><div><br></div><div>HTH,</div><div><br></div><div>Ben</div></div></div></div></body></html>