[rabbitmq-discuss] More RabbitMQ Erlang client woes

Ben Hood 0x6e6562 at gmail.com
Fri May 9 10:56:51 BST 2008


Ed,

Thanks for taking the time to look into this so deeply.

You have indeed found a bug in the network client: it does not shut the
per-channel writer processes down. See below.

On 8 May 2008, at 22:12, Edwin Fine wrote:
>
> Sorry for the long email. Trying to get to the bottom of the  
> problems. I have done a LOT more investigation into the Erlang  
> network client, and I believe there are two related bugs, one in the  
> client, and the other I don't know where, or perhaps I am abusing  
> how AMQP is supposed to work.
>
> Bug #1: rabbit_writer not shut down
>
> The first (and easier) bug, which I have fixed eventually, is that  
> every time you start a channel, the network client creates a  
> rabbit_writer, but never shuts it down. Therefore, each channel that  
> is created leaves behind a rabbit_writer process.
> What happens is that in the handshake, the network driver starts a  
> rabbit_writer on channel 0. I believe this is used for the  
> connection itself. Its pid is stored in writer_pid in the  
> connection_state. This writer gets cleaned up properly when the  
> connection is shut down. There is no problem with this.
> Thereafter, when a channel is opened and  
> amqp_network_driver:open_channel is called, another writer is  
> started (correctly - need one per connection, right?). There is a  
> singleton reader. Anyway, this writer is never terminated. The  
> writer is registered as a peer to the channel, using  
> amqp_channel:register_direct_peer. This causes a bug, because the  
> registered peer is never shut down, probably because the direct peer  
> never should be shut down... but this is the NETWORK peer.
>
> So what I did (and you may have a better way) is to add another  
> function, amqp_channel:register_network_peer. This sets a "direct"  
> flag in the channel_state (which I had to add) to false. Calling  
> register_direct_peer sets the flag to true. When  
> amqp_channel:channel_cleanup() is eventually called (and I had to do  
> something for this, too), it checks to see if direct is false. If  
> so, it stops the writer in writer_pid, otherwise it leaves it alone.
>

I can see what the intention is. The main thing is that you have
understood how all of the processes hang together, which is a little
tricky in the network case because the client shares a common codebase
with the server for managing socket IO and AMQP channels. This is why it
may not be immediately apparent why certain things have been done the
way they are.

To achieve this, I think the cleanup should be contained within the
network driver: either as a callback that allows the network driver to
stop the channel's writer process, or by linking the channel process to
the writer process so that when the channel exits, the writer process
receives an exit signal as well. I think I prefer the latter, but this
would mean changing the writer module, which is a core module of the
server, so we'd need to look at the implications of that.
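
To make the linking idea concrete, here is a toy sketch (the "channel"
and "writer" below are bare stand-in processes, not the real
amqp_channel / rabbit_writer). Note that the writer has to trap exits,
otherwise a normal channel exit would simply be ignored - which is
exactly the sort of change to the writer module I have in mind:

-module(writer_link_sketch).
-export([demo/0]).

demo() ->
    Channel = spawn(fun() ->
                        %% link the writer to the channel when it is started
                        Writer = spawn_link(fun writer_init/0),
                        io:format("channel ~p linked to writer ~p~n",
                                  [self(), Writer]),
                        receive stop -> ok end   %% channel exits normally
                    end),
    Channel ! stop,
    ok.

writer_init() ->
    %% trap exits so a *normal* channel exit also reaches the writer
    process_flag(trap_exit, true),
    writer_loop().

writer_loop() ->
    receive
        {'EXIT', _Channel, Reason} ->
            io:format("writer ~p shutting down: ~p~n", [self(), Reason]);
        Frame ->
            io:format("writer would send ~p~n", [Frame]),
            writer_loop()
    end.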

So watch this space.

> I also had to add a call to channel_cleanup in the close_ok handler,
> because the cleanup was never getting called:
>
> %% in amqp_channel:
> handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
>     {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
>     NewState2 = channel_cleanup(NewState),
>     {stop, normal, NewState2};
>

That's a good point. I think the cleanup code should go into the
gen_server terminate callback to keep it in one place.
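
For illustration, a rough sketch of what I mean, reusing the
rpc_bottom_half and channel_cleanup functions from your patch (the clause
shapes are approximate, not the final code):

handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
    %% no explicit cleanup here; just stop and let terminate/2 do it
    {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
    {stop, normal, NewState};

terminate(_Reason, State) ->
    %% the single place where per-channel resources get released
    channel_cleanup(State),
    ok.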

> Maybe-Bug #2: I believe that multiple concurrent consumers cannot  
> self-register
>
> If all consumers are registered by a single process (which is what I  
> used to do, and it worked), and not by the consumer process itself,  
> all consume_ok methods are returned correctly. However, if you start  
> more than one consumer right after another, and they try to  
> self-register, things get mixed up. Take a look at the output below;  
> the key is the last debug print, which shows the mismatched tags.
>
> However, the basic.consume method was called with the consumer's  
> PID, and my understanding is that providing the pid of the consumer  
> will ensure that the response gets back to the right one. Here is my  
> code, where you can see the self():
>
>     #'basic.consume_ok'{consumer_tag = NewConsumerTag} =
>         amqp_channel:call(Channel, BasicConsume, self()),
>
> And the corresponding output, showing that the responses are mixed  
> together.
>
> [<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel <0.117.0>
> [<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel <0.117.0>
> [<0.122.0>] subscribing: Response from amqp_channel:call consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">> Expected Tag: <<"EMF_TEST_Q.2">>
>

Yes, this is a bug. On inspection of the handle_call({basic_consume,
Method, Consumer}, From, State) clause in amqp_channel, there is a race
condition between concurrently subscribing consumers. I will probably
address this using the same strategy that is used to serialize all
synchronous RPC requests. Alternatively, if the client supplies a unique
tag to correlate on, this could be done concurrently by maintaining a map
of pending subscriptions rather than just the last pending subscription.
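
As a rough sketch of that second option (pending_consumers is a
hypothetical dict-valued field replacing pending_consumer, and this
assumes the client always supplies the consumer tag up front):

handle_method(ConsumeOk = #'basic.consume_ok'{consumer_tag = Tag},
              State = #channel_state{pending_consumers = Pending}) ->
    %% look up the consumer that asked for this particular tag, rather
    %% than relying on a single "last pending" consumer
    Consumer = dict:fetch(Tag, Pending),
    Consumer ! ConsumeOk,
    NewState = register_consumer(Tag, Consumer, State),
    {noreply, NewState2} = rpc_bottom_half(ConsumeOk, NewState),
    {noreply, NewState2#channel_state{
                pending_consumers = dict:erase(Tag, Pending)}};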

I'll fix this and let you know.

In your patch, you also added an extra handle_method clause to handle a
spurious consume_ok message:

%% Edwin Fine bugfix: This is actually being called wrong from somewhere,
%% but this will fix it.
handle_method({'basic.consume_ok', ConsumerTag}, State) ->
    io:format("[~p] Got bad handle_method call!~n", [self()]),
    handle_method(#'basic.consume_ok'{consumer_tag = ConsumerTag}, State);

This clause is preceded in the code by the following one:

handle_method(BasicConsumeOk = #'basic.consume_ok'{consumer_tag = ConsumerTag},
              State = #channel_state{pending_consumer = Consumer}) ->
    Consumer ! BasicConsumeOk,
    NewState = register_consumer(ConsumerTag, Consumer, State),
    {noreply, NewState2} = rpc_bottom_half(BasicConsumeOk, NewState),
    {noreply, NewState2#channel_state{pending_consumer = <<>>}};

So I'm not quite sure why that didn't match first, because
#'basic.consume_ok'{consumer_tag = ConsumerTag} should match against
{'basic.consume_ok', ConsumerTag}, and #channel_state{pending_consumer
= Consumer} should match even if pending_consumer was not defined.
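
Just to spell the equivalence out (with a stand-in record definition,
since the real one is generated): a record with a single field is the
same term as a 2-tuple tagged with the record name, so the extra clause
ought to be unreachable.

-record('basic.consume_ok', {consumer_tag}).  %% stand-in for the generated record

same_term() ->
    %% evaluates to true: both patterns describe the same tuple
    #'basic.consume_ok'{consumer_tag = <<"T">>} =:= {'basic.consume_ok', <<"T">>}.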

This will require some more analysis.

HTH,

Ben