[rabbitmq-discuss] Fwd: More RabbitMQ Erlang client woes

Ben Hood 0x6e6562 at gmail.com
Thu May 8 22:22:18 BST 2008



Begin forwarded message:

> From: "Edwin Fine" <emofine at gmail.com>
> Date: 8 May 2008 22:12:07 BST
> To: "Ben Hood" <0x6e6562 at gmail.com>
> Subject: More RabbitMQ Erlang client woes
>
> Hi Ben,
>
> Sorry for the long email. Trying to get to the bottom of the  
> problems. I have done a LOT more investigation into the Erlang  
> network client, and I believe there are two related bugs, one in the  
> client, and the other I don't know where, or perhaps I am abusing  
> how AMQP is supposed to work.
>
> Bug #1: rabbit_writer not shut down
>
> The first (and easier) bug, which I eventually fixed, is that  
> every time you start a channel, the network client creates a  
> rabbit_writer, but never shuts it down. Therefore, each channel that  
> is created leaves behind a rabbit_writer process.
> What happens is that in the handshake, the network driver starts a  
> rabbit_writer on channel 0. I believe this is used for the  
> connection itself. Its pid is stored in writer_pid in the  
> connection_state. This writer gets cleaned up properly when the  
> connection is shut down. There is no problem with this.
> Thereafter, when a channel is opened and  
> amqp_network_driver:open_channel is called, another writer is  
> started (correctly - need one per channel, right?). There is a  
> singleton reader. Anyway, this writer is never terminated. The  
> writer is registered as a peer to the channel, using  
> amqp_channel:register_direct_peer. This causes a bug, because the  
> registered peer is never shut down, probably because the direct peer  
> never should be shut down... but this is the NETWORK peer.
>
> So what I did (and you may have a better way) is to add another  
> function, amqp_channel:register_network_peer. This sets a "direct"  
> flag in the channel_state (which I had to add) to false. Calling  
> register_direct_peer sets the flag to true. When  
> amqp_channel:channel_cleanup() is eventually called (and I had to do  
> something for this, too), it checks to see if direct is false. If  
> so, it stops the writer in writer_pid, otherwise it leaves it alone.
>
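
A minimal sketch of the flag-based cleanup described above, assuming a trimmed-down channel_state record and a stand-in writer process that stops when it receives a plain shutdown message. The module name, the record layout and the shutdown message are illustrative assumptions; the real channel_state has more fields, and how the real rabbit_writer is stopped is not shown in this email.

```erlang
-module(peer_cleanup_sketch).
-export([register_direct_peer/2, register_network_peer/2,
         channel_cleanup/1, demo/0]).

%% Hypothetical, trimmed-down stand-in for the client's channel_state.
-record(channel_state, {writer_pid, direct = true}).

%% A direct peer is owned elsewhere, so cleanup must leave it alone.
register_direct_peer(State, Pid) ->
    State#channel_state{writer_pid = Pid, direct = true}.

%% A network peer is a writer started for this channel only.
register_network_peer(State, Pid) ->
    State#channel_state{writer_pid = Pid, direct = false}.

%% Stop the writer only when it was registered as a network peer.
%% Sending 'shutdown' is an assumption standing in for the real stop call.
channel_cleanup(State = #channel_state{direct = false, writer_pid = Pid})
  when is_pid(Pid) ->
    Pid ! shutdown,
    State#channel_state{writer_pid = undefined};
channel_cleanup(State) ->
    State.

demo() ->
    %% Network peer: the stand-in writer must exit after cleanup.
    Writer = spawn(fun() -> receive shutdown -> ok end end),
    Ref = erlang:monitor(process, Writer),
    S1 = channel_cleanup(register_network_peer(#channel_state{}, Writer)),
    receive
        {'DOWN', Ref, process, Writer, normal} -> ok
    after 1000 ->
        exit(writer_not_stopped)
    end,
    undefined = S1#channel_state.writer_pid,
    %% Direct peer: cleanup must not touch it.
    Peer = spawn(fun() -> receive shutdown -> ok end end),
    S2 = channel_cleanup(register_direct_peer(#channel_state{}, Peer)),
    Peer = S2#channel_state.writer_pid,
    true = is_process_alive(Peer),
    exit(Peer, kill),
    ok.
```

Calling peer_cleanup_sketch:demo() exercises both cases: the network-registered writer is stopped by channel_cleanup/1, while the direct peer is still alive afterwards.
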
> I also had to add a call to channel_cleanup in the close_ok handler,  
> because the cleanup was never getting called:
>
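> %% In amqp_channel:
> handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
>     {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
>     %% Clean up using the state returned by rpc_bottom_half, so the
>     %% channel's writer is stopped before the channel process exits.
>     NewState2 = channel_cleanup(NewState),
>     {stop, normal, NewState2};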
>
> Now this may be major hackery, and you may find a better way. Or,  
> maybe there has been a code change to the client and I have old  
> code. I haven't looked in the past few weeks.
>
>
> Maybe-Bug #2: I believe that multiple concurrent consumers cannot  
> self-register
>
> If all consumers are registered by a single process (which is what I  
> used to do, and it worked), and not by the consumer process itself,  
> all consume_ok methods are returned correctly. However, if you start  
> more than one consumer right after each other, and they try to  
> self-register, things get mixed up. Take a look at the output below.  
> The key is this debug print:
>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">>  
> Expected Tag: <<"EMF_TEST_Q.2">>
>
> However, the basic.consume method was called with the consumer's  
> PID, and my understanding is that providing the pid of the consumer  
> will ensure that the response gets back to the right one. Here is my  
> code, where you can see the self():
>
>     #'basic.consume_ok'{consumer_tag = NewConsumerTag} =  
> amqp_channel:call(Channel, BasicConsume, self()),
>
> And the corresponding output, showing that the responses are mixed  
> together.
>
> [<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel  
> <0.117.0>
> [<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel  
> <0.117.0>
> [<0.122.0>] subscribing: Response from amqp_channel:call  
> consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">>  
> Expected Tag: <<"EMF_TEST_Q.2">>
>
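
A hedged sketch of one way a channel process could keep synchronous replies matched to concurrent callers. It assumes the broker answers requests on a channel in the order they were sent, so the channel can keep a FIFO queue of waiting callers and hand each reply to the oldest one. The module name, the fake broker (a message the server sends to itself) and the {rpc, Method} call shape are illustrative only. This is not the client's actual code, and the trace does not show where the client goes wrong.

```erlang
-module(rpc_fifo_sketch).
-behaviour(gen_server).
-export([start_link/0, call/2, demo/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Blocks the calling process until its own reply comes back.
call(Channel, Method) ->
    gen_server:call(Channel, {rpc, Method}).

%% State is a FIFO queue of callers still waiting for a reply.
init([]) ->
    {ok, queue:new()}.

%% Do not reply yet: remember who asked, in request order.
handle_call({rpc, {'basic.consume', Tag}}, From, Pending) ->
    %% Fake broker: the reply arrives later as an ordinary message.
    self() ! {broker_reply, {'basic.consume_ok', Tag}},
    {noreply, queue:in(From, Pending)}.

handle_cast(_Msg, Pending) ->
    {noreply, Pending}.

%% Replies arrive in request order, so the oldest waiting caller owns
%% this one.
handle_info({broker_reply, Reply}, Pending0) ->
    {{value, From}, Pending1} = queue:out(Pending0),
    gen_server:reply(From, Reply),
    {noreply, Pending1}.

demo() ->
    {ok, Channel} = start_link(),
    Parent = self(),
    Tags = [<<"EMF_TEST_Q.1">>, <<"EMF_TEST_Q.2">>],
    %% Each consumer registers itself, concurrently with the others.
    Pids = [spawn_link(fun() ->
                               Reply = call(Channel, {'basic.consume', Tag}),
                               Parent ! {self(), Tag, Reply}
                       end) || Tag <- Tags],
    Results = [receive
                   {Pid, Tag, Reply} -> Reply =:= {'basic.consume_ok', Tag}
               end || Pid <- Pids],
    ok = gen_server:stop(Channel),
    true = lists:all(fun(Matched) -> Matched end, Results),
    ok.
```

In rpc_fifo_sketch:demo(), two processes subscribe at the same time and each checks that the consume_ok it receives carries its own tag, which is the property that fails in the trace above.
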
> -----------------------------------------------------------------------------
> This is the complete output trace of the test code. I've attached my  
> version of the client (with my debug prints and my mods), and the  
> test source code.
> -----------------------------------------------------------------------------
>
> (xhg at ender)8> rabbit_chan_crash:go(rabbit at ender,2,1).
> start_writer(Sock = #Port<0.141>, Channel = 0)
>   WriterPid = <0.115.0>
> Setting up channel on realm <<"/data">> for connection  
> {<0.112.0>,network}
> start_writer(Sock = #Port<0.141>, Channel = 1)
>   WriterPid = <0.118.0>
> amqp_network_driver:open_channel:: Nothing ever shuts this writer  
> down: <0.118.0> *** I believe I have fixed this bug
> [<0.117.0>] Calling rpc_bottom_half for {'channel.open_ok'}
> [<0.117.0>] Calling rpc_bottom_half for {'access.request_ok',101}
> Access granted for channel <0.117.0> on realm <<"/data">> for  
> connection {<0.112.0>,network}
> Declaring exchange <<"emf_test">> using ticket 101
> [<0.117.0>] Calling rpc_bottom_half for {'exchange.declare_ok'}
> Declared exchange <<"emf_test">> using ticket 101
> Declaring queue <<"EMF_TEST_Q.1">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',
>                                          <<"EMF_TEST_Q.1">>,0,0}
> Declared queue <<"EMF_TEST_Q.1">>, msgc = 0, cons_c = 0
> Declaring queue <<"EMF_TEST_Q.2">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',
>                                          <<"EMF_TEST_Q.2">>,0,0}
> Declared queue <<"EMF_TEST_Q.2">>, msgc = 0, cons_c = 0
> Binding queue <<"EMF_TEST_Q.1">>, ticket 101, exchange  
> <<"emf_test">>, routing key <<"EMF_TEST_Q.1">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}
> Bound queue <<"EMF_TEST_Q.1">> to exchange <<"emf_test">>
> Binding queue <<"EMF_TEST_Q.2">>, ticket 101, exchange  
> <<"emf_test">>, routing key <<"EMF_TEST_Q.2">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}
> Bound queue <<"EMF_TEST_Q.2">> to exchange <<"emf_test">>
> Environment setup complete, rmq_state =
> {rmq_state,"guest","guest",
>            {<0.112.0>,network},
>            <0.117.0>,101,<<"emf_test">>,<<"/data">>,<<"/emf_test">>,
>            rabbit at ender,2,
>            [<<"EMF_TEST_Q.1">>,<<"EMF_TEST_Q.2">>],
>            undefined,undefined}
> Consumers started: [<0.121.0>,<0.122.0>]
> [<0.121.0>] Started consumer tag <<"EMF_TEST_Q.1">> for existing  
> channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.1">>
> [<0.122.0>] Started consumer tag <<"EMF_TEST_Q.2">> for existing  
> channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.2">>
> Producer started: <0.123.0>
> [<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel  
> <0.117.0>
> [<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel  
> <0.117.0>
> <0.120.0>
> [<0.122.0>] subscribing: Response from amqp_channel:call  
> consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">>  
> Expected Tag: <<"EMF_TEST_Q.2">>
> Consumer tag <<"EMF_TEST_Q.2">> pid <0.122.0> unsubscribing
> Unsubscribing consumer <<"EMF_TEST_Q.2">> from channel <0.117.0>
> (xhg at ender)9>
> =ERROR REPORT==== 8-May-2008::16:53:43 ===
> ** Generic server <0.117.0> terminating
> ** Last message in was {method,{'basic.consume_ok',<<"EMF_TEST_Q.2">>},none}
> ** When Server state == {channel_state,1,<0.112.0>,<0.114.0>,<0.118.0>,false,
>                             #Fun<amqp_network_driver.do.2>,
>                             #Fun<amqp_network_driver.do.3>,<<>>,<<>>,false,
>                             undefined,
>                             {dict,1,16,16,8,80,48,
>                                 {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
>                                  []},
>                                 {{[],[],[],[],[],[],[],[],
>                                   [[<<"EMF_TEST_Q.1">>|<0.122.0>]],
>                                   [],[],[],[],[],[],[]}}}}
> ** Reason for termination ==
> ** {badarg,[{amqp_channel,handle_method,2},
>             {gen_server,handle_msg,5},
>             {proc_lib,init_p,5}]}
>

-------------- next part --------------
A non-text attachment was scrubbed...
Name: erlang-client-src-mod-by-edwin-fine.tar.gz
Type: application/x-gzip
Size: 12850 bytes
Desc: not available
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080508/de442ed9/attachment.bin 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: test-src-edwin-fine.tar.gz
Type: application/x-gzip
Size: 5651 bytes
Desc: not available
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080508/de442ed9/attachment-0001.bin 