[rabbitmq-discuss] Fwd: More RabbitMQ Erlang client woes
Ben Hood
0x6e6562 at gmail.com
Thu May 8 22:22:18 BST 2008
Begin forwarded message:
> From: "Edwin Fine" <emofine at gmail.com>
> Date: 8 May 2008 22:12:07 BST
> To: "Ben Hood" <0x6e6562 at gmail.com>
> Subject: More RabbitMQ Erlang client woes
>
> Hi Ben,
>
> Sorry for the long email; I'm trying to get to the bottom of these
> problems. I have done a LOT more investigation into the Erlang
> network client, and I believe there are two related bugs: one in the
> client, and another that I can't yet locate (or perhaps I am using
> AMQP in a way it isn't meant to be used).
>
> Bug #1: rabbit_writer not shut down
>
> The first (and easier) bug, which I have eventually fixed, is that
> every time you open a channel, the network client creates a
> rabbit_writer but never shuts it down, so each channel that is
> created leaves behind a rabbit_writer process.
> What happens is that in the handshake, the network driver starts a
> rabbit_writer on channel 0. I believe this is used for the
> connection itself. Its pid is stored in writer_pid in the
> connection_state. This writer gets cleaned up properly when the
> connection is shut down. There is no problem with this.
> Thereafter, when a channel is opened and
> amqp_network_driver:open_channel is called, another writer is
> started (correctly, since we need one per channel, right?). There is
> a singleton reader. However, this writer is never terminated. The
> writer is registered as the channel's peer using
> amqp_channel:register_direct_peer. That is where the bug lies: the
> registered peer is never shut down, presumably because a direct peer
> should never be shut down... but this is the NETWORK peer.
>
> So what I did (and you may have a better way) was to add another
> function, amqp_channel:register_network_peer, which sets a new
> "direct" flag in channel_state (a field I had to add) to false;
> register_direct_peer sets the flag to true. When
> amqp_channel:channel_cleanup() is eventually called (I had to change
> something for this, too), it checks whether direct is false. If so,
> it stops the writer in writer_pid; otherwise it leaves it alone.
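A minimal sketch of the flag-based cleanup described above. It is not the client's code: the module name, the two-field channel_state record, the dummy writer process, and stopping the writer with exit(Pid, shutdown) (rather than through whatever shutdown call rabbit_writer itself provides) are all assumptions made to keep the example self-contained.

```erlang
%% Hypothetical sketch of flag-based writer cleanup; not amqp_channel itself.
-module(writer_cleanup_sketch).
-export([new/0, register_network_peer/2, register_direct_peer/2,
         channel_cleanup/1, start_dummy_writer/0]).

%% Stand-in for the client's channel_state (assumption): only the two
%% fields that matter here.
-record(channel_state, {writer_pid = none, direct = true}).

new() ->
    #channel_state{}.

%% Network channel: the writer was started for this channel, so the
%% channel owns it and must stop it during cleanup.
register_network_peer(State = #channel_state{}, WriterPid) when is_pid(WriterPid) ->
    State#channel_state{writer_pid = WriterPid, direct = false}.

%% Direct channel: the peer is owned elsewhere and must be left running.
register_direct_peer(State = #channel_state{}, PeerPid) when is_pid(PeerPid) ->
    State#channel_state{writer_pid = PeerPid, direct = true}.

%% Stop the writer only when direct is false, and wait for it to exit so
%% that cleanup is complete when this function returns.
channel_cleanup(State = #channel_state{direct = false, writer_pid = Pid})
  when is_pid(Pid) ->
    Ref = erlang:monitor(process, Pid),
    exit(Pid, shutdown),
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> ok
    after 5000 ->
        erlang:demonitor(Ref, [flush]),
        erlang:error({writer_did_not_stop, Pid})
    end,
    State#channel_state{writer_pid = none};
channel_cleanup(State = #channel_state{}) ->
    State.

%% Dummy process standing in for a rabbit_writer (assumption): it just
%% waits until it is stopped.
start_dummy_writer() ->
    spawn(fun() -> receive stop -> ok end end).
```

With a network peer registered, channel_cleanup/1 returns only after the writer has exited; with a direct peer registered, the process is left running.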
>
> I also had to add a call to channel_cleanup in the channel.close_ok
> handler, because the cleanup was never being called:
>
> %% In amqp_channel.erl:
> handle_method(ChannelCloseOk = #'channel.close_ok'{}, State) ->
>     {noreply, NewState} = rpc_bottom_half(ChannelCloseOk, State),
>     %% Clean up the state returned by rpc_bottom_half, not the old State
>     NewState2 = channel_cleanup(NewState),
>     {stop, normal, NewState2};
>
> Now this may be major hackery, and you may find a better way. Or,
> maybe there has been a code change to the client and I have old
> code. I haven't looked in the past few weeks.
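One possible alternative, sketched under assumptions: gen_server calls terminate/2 whenever a callback returns {stop, Reason, State}, so putting the writer shutdown there would cover every such stop path instead of only the channel.close_ok clause. The module below is a hypothetical toy channel, not amqp_channel; the channel_close_ok message is an invented stand-in for the real #'channel.close_ok'{} method.

```erlang
%% Hypothetical toy channel showing cleanup in terminate/2.
-module(close_ok_cleanup_sketch).
-behaviour(gen_server).
-export([start/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Only tracks the writer this (network) channel owns.
-record(state, {writer_pid}).

start(WriterPid) ->
    gen_server:start(?MODULE, WriterPid, []).

init(WriterPid) ->
    {ok, #state{writer_pid = WriterPid}}.

handle_call(_Request, _From, State) ->
    {reply, ignored, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Stand-in for handling channel.close_ok: just ask gen_server to stop.
handle_info(channel_close_ok, State) ->
    {stop, normal, State};
handle_info(_Other, State) ->
    {noreply, State}.

%% gen_server calls terminate/2 after any callback returns {stop, ...},
%% so the writer is stopped on every such exit path. Assumes a network
%% channel; a direct channel would leave its peer alone.
terminate(_Reason, #state{writer_pid = Pid}) when is_pid(Pid) ->
    exit(Pid, shutdown),
    ok;
terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```

A real version would still consult the direct flag before stopping the peer. Also note that terminate/2 does not run if the process is killed by an exit signal it is not trapping.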
>
>
> Maybe-Bug #2: I believe that multiple concurrent consumers cannot
> self-register
>
> If all consumers are registered by a single process rather than by
> each consumer process itself (which is what I used to do, and it
> worked), all consume_ok methods are returned correctly. However, if
> you start more than one consumer in quick succession and they try to
> self-register, things get mixed up. Take a look at the output below.
> The key is this debug print:
>
>
> However, basic.consume was called with the consumer's pid, and my
> understanding is that passing the consumer's pid ensures that the
> response gets back to the right consumer. Here is my code, where you
> can see the self():
>
> #'basic.consume_ok'{consumer_tag = NewConsumerTag} =
> amqp_channel:call(Channel, BasicConsume, self()),
>
> And here is the corresponding output, showing that the responses get
> crossed:
>
> [<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel
> <0.117.0>
> [<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel
> <0.117.0>
> [<0.122.0>] subscribing: Response from amqp_channel:call
> consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">>
> Expected Tag: <<"EMF_TEST_Q.2">>
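The dict in the crash report further down holds [<<"EMF_TEST_Q.1">>|<0.122.0>], i.e. the consume_ok for Q.1 was paired with the second consumer. That suggests (unconfirmed) that the channel matches each consume_ok to whichever caller happens to be pending when it arrives. Since these consumers choose their own tags, one hypothetical way to route replies is to key pending callers by the requested tag. The module and function names below are invented for illustration, and a plain map stands in for the channel's bookkeeping.

```erlang
%% Hypothetical sketch: route consume_ok by consumer tag, not arrival order.
-module(consume_tag_sketch).
-export([new/0, request/3, consume_ok/2]).

%% Pending subscribers, keyed by the consumer tag each one asked for.
new() ->
    #{}.

%% Record which process asked for Tag before basic.consume is sent.
request(Pending, Tag, Pid) when is_binary(Tag), is_pid(Pid) ->
    case maps:is_key(Tag, Pending) of
        true -> erlang:error({duplicate_consumer_tag, Tag});
        false -> maps:put(Tag, Pid, Pending)
    end.

%% On consume_ok, hand the reply to the process that requested that tag
%% and drop it from the pending set.
consume_ok(Pending, Tag) ->
    case maps:take(Tag, Pending) of
        {Pid, Rest} -> {ok, Pid, Rest};
        error -> {error, {unknown_consumer_tag, Tag}}
    end.
```

This only works when the client supplies the tag. If basic.consume is sent with an empty consumer_tag, the server generates one, and the channel would instead have to match replies to callers in the order the requests were sent.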
>
> -----------------------------------------------------------------------------
> This is the complete output trace of the test code. I've attached my
> version of the client (with my debug prints and my mods), and the
> test source code.
> -----------------------------------------------------------------------------
>
> (xhg@ender)8> rabbit_chan_crash:go(rabbit@ender,2,1).
> start_writer(Sock = #Port<0.141>, Channel = 0)
> WriterPid = <0.115.0>
> Setting up channel on realm <<"/data">> for connection
> {<0.112.0>,network}
> start_writer(Sock = #Port<0.141>, Channel = 1)
> WriterPid = <0.118.0>
> amqp_network_driver:open_channel:: Nothing ever shuts this writer
> down: <0.118.0> *** I believe I have fixed this bug
> [<0.117.0>] Calling rpc_bottom_half for {'channel.open_ok'}
> [<0.117.0>] Calling rpc_bottom_half for {'access.request_ok',101}
> Access granted for channel <0.117.0> on realm <<"/data">> for
> connection {<0.112.0>,
> network
> }
> Declaring exchange <<"emf_test">> using ticket 101
> [<0.117.0>] Calling rpc_bottom_half for {'exchange.declare_ok'}
> Declared exchange <<"emf_test">> using ticket 101
> Declaring queue <<"EMF_TEST_Q.1">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',
> <<"EMF_TEST_Q.1">>,0,0}
> Declared queue <<"EMF_TEST_Q.1">>, msgc = 0, cons_c = 0
> Declaring queue <<"EMF_TEST_Q.2">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.declare_ok',
> <<"EMF_TEST_Q.2">>,0,0}
> Declared queue <<"EMF_TEST_Q.2">>, msgc = 0, cons_c = 0
> Binding queue <<"EMF_TEST_Q.1">>, ticket 101, exchange
> <<"emf_test">>, routing key <<"EMF_TEST_Q.1">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}
> Bound queue <<"EMF_TEST_Q.1">> to exchange <<"emf_test">>
> Binding queue <<"EMF_TEST_Q.2">>, ticket 101, exchange
> <<"emf_test">>, routing key <<"EMF_TEST_Q.2">>
> [<0.117.0>] Calling rpc_bottom_half for {'queue.bind_ok'}
> Bound queue <<"EMF_TEST_Q.2">> to exchange <<"emf_test">>
> Environment setup complete, rmq_state =
> {rmq_state,"guest","guest",
> {<0.112.0>,network},
> <0.117.0>,101,<<"emf_test">>,<<"/data">>,<<"/emf_test">>,
> rabbit@ender,2,
> [<<"EMF_TEST_Q.1">>,<<"EMF_TEST_Q.2">>],
> undefined,undefined}
> Consumers started: [<0.121.0>,<0.122.0>]
> [<0.121.0>] Started consumer tag <<"EMF_TEST_Q.1">> for existing
> channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.1">>
> [<0.122.0>] Started consumer tag <<"EMF_TEST_Q.2">> for existing
> channel/ticket/queue <0.117.0>/101/<<"EMF_TEST_Q.2">>
> Producer started: <0.123.0>
> [<0.121.0>] Subscribing consumer <<"EMF_TEST_Q.1">> to channel
> <0.117.0>
> [<0.122.0>] Subscribing consumer <<"EMF_TEST_Q.2">> to channel
> <0.117.0>
> <0.120.0>
> [<0.122.0>] subscribing: Response from amqp_channel:call
> consumer_tag = <<"EMF_TEST_Q.1">> from channel <0.117.0>
> [<0.122.0>] received consume_ok: Actual tag: <<"EMF_TEST_Q.1">>
> Expected Tag: <<"EMF_TEST_Q.2">>
> Consumer tag <<"EMF_TEST_Q.2">> pid <0.122.0> unsubscribing
> Unsubscribing consumer <<"EMF_TEST_Q.2">> from channel <0.117.0>
> (xhg@ender)9>
> =ERROR REPORT==== 8-May-2008::16:53:43 ===
> ** Generic server <0.117.0> terminating
> ** Last message in was {method,{'basic.consume_ok',<<"EMF_TEST_Q.2">>},none}
> ** When Server state == {channel_state,
> 1,<0.112.0>,<0.114.0>,<0.118.0>,false,
> #Fun<amqp_network_driver.do.2>,
> #Fun<amqp_network_driver.do.3>,<<>>,<<>>,false,
> undefined,
> {dict,1,16,16,8,80,48,
> {[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],[],
> []},
> {{[],[],[],[],[],[],[],[],
> [[<<"EMF_TEST_Q.1">>|<0.122.0>]],
> [],[],[],[],[],[],[]}}}}
> ** Reason for termination ==
> ** {badarg,[{amqp_channel,handle_method,2},
> {gen_server,handle_msg,5},
> {proc_lib,init_p,5}]}
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: erlang-client-src-mod-by-edwin-fine.tar.gz
Type: application/x-gzip
Size: 12850 bytes
Desc: not available
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080508/de442ed9/attachment.bin
-------------- next part --------------
A non-text attachment was scrubbed...
Name: test-src-edwin-fine.tar.gz
Type: application/x-gzip
Size: 5651 bytes
Desc: not available
Url : http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080508/de442ed9/attachment-0001.bin