[rabbitmq-discuss] Channel crashes after basic.cancel_ok.

Edwin Fine rabbitmq-discuss_efine at usa.net
Thu May 8 15:04:52 BST 2008


Ben,
Thanks for looking into this. In the "real" code, all the setup is actually
done within a single Erlang process, and the only thing the consumers do is
handle the basic.deliver, as well as any basic.consume_ok  or
basic.cancel_ok messages that may arise. In the test code, I rearranged some
things specifically to be done concurrently, which I now see was a mistake.
However, this is a bug in the test code, not in the production code. I will
change the test code so that there is no concurrent work being done on a
channel other than responding with basic.ack to basic.deliver messages.
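
To make the consumer side concrete, here is a rough sketch of such a loop. It is an illustration only: consumer_sketch is a hypothetical module name, the records are local stand-ins for the ones in the client's header files, and the {Deliver, Content} message shape is an assumption.

```erlang
-module(consumer_sketch).
-export([start/1, loop/2]).

%% Local stand-ins for the real AMQP records (assumption); only the
%% field used in this sketch is declared.
-record('basic.consume_ok', {consumer_tag}).
-record('basic.deliver',    {consumer_tag}).
-record('basic.cancel_ok',  {consumer_tag}).

%% Spawn a consumer that reports back to Parent when it is cancelled.
start(Parent) ->
    spawn(fun() -> loop(Parent, 0) end).

%% The consumer makes no synchronous channel calls of its own; it only
%% reacts to messages that arrive in its mailbox.
loop(Parent, Count) ->
    receive
        #'basic.consume_ok'{consumer_tag = Tag} ->
            io:format("[~p] subscribed as ~p~n", [self(), Tag]),
            loop(Parent, Count);
        {#'basic.deliver'{}, _Content} ->
            %% The real code would send a basic.ack here.
            loop(Parent, Count + 1);
        #'basic.cancel_ok'{consumer_tag = Tag} ->
            %% Subscription is gone: report the delivery count and stop.
            Parent ! {done, Tag, Count}
    end.
```

The loop never blocks waiting for a reply from the channel, so it cannot collide with a synchronous call made elsewhere.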

I note that there is a special version of amqp_channel:call/3, used only for
a basic.consume method, with the last argument being the pid of the process
subscribing. Am I correct to assume because of this that the basic.consume
method can indeed be sent concurrently by different consumers on the same
channel? The basic.cancel operation, however, does not take a PID in the
amqp call, as far as I can see, which means that a consumer cannot cancel
its own subscription without potentially clashing with other consumers that
are canceling.

So I will change the code back so that everything OTHER than basic.consume
and basic.cancel is done within a single process, and take it from there.

I think this has, however, shed some light on what might have happened in
the production code. *I am using the consumer process to unsubscribe
itself.* When there are many consumer processes, this could be a problem
because I think basic.cancel does not allow you to specify a return pid, and
maybe this is what upsets the channel.

    #'basic.consume_ok'{consumer_tag = ConsumerTag} =
        amqp_channel:call(Channel, BasicConsume, self()), %%% Notice return pid
    #'basic.cancel_ok'{consumer_tag = ConsumerTag} =
        amqp_channel:call(Channel, BasicCancel), %%% Notice - no return pid

So maybe this should be added to the handling of basic.cancel? Why shouldn't
a consumer cancel itself? In fact, why can't all client channel calls
specify which PID is calling?
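
In the meantime, one way I could avoid the clash is to route every synchronous call through one process that owns the channel. This is a minimal sketch, not anything in the client: chan_serializer is a hypothetical module name, and CallFun is a caller-supplied fun assumed to wrap something like amqp_channel:call(Channel, Method).

```erlang
-module(chan_serializer).
-behaviour(gen_server).

-export([start_link/1, call/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% CallFun :: fun((Method) -> Reply). It performs one synchronous channel
%% call, e.g. fun(M) -> amqp_channel:call(Channel, M) end (assumption).
start_link(CallFun) when is_function(CallFun, 1) ->
    gen_server:start_link(?MODULE, CallFun, []).

%% Any process may call this; requests are handled strictly one at a time.
call(Server, Method) ->
    gen_server:call(Server, {rpc, Method}, infinity).

stop(Server) ->
    gen_server:call(Server, stop).

init(CallFun) ->
    {ok, CallFun}.

%% A gen_server processes one message at a time, so only one synchronous
%% RPC is ever outstanding on the wrapped channel.
handle_call({rpc, Method}, _From, CallFun) ->
    {reply, CallFun(Method), CallFun};
handle_call(stop, _From, CallFun) ->
    {stop, normal, ok, CallFun}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

A consumer would then cancel with chan_serializer:call(Server, BasicCancel) instead of calling the channel directly, at the cost of every synchronous call queueing behind the others.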

One other source of confusion for me is this: when sending a basic.consume
with amqp_channel:call/3, why is there both a synchronous response to the
call itself and an asynchronous response on the channel?

    #'basic.consume_ok'{consumer_tag = ConsumerTag} =
        amqp_channel:call(Channel, BasicConsume, self()),
    receive
        #'basic.consume_ok'{consumer_tag = ConsumerTag} ->
            io:format("[~p] ~p got consume_ok~n",
                      [self(), BasicConsume#'basic.consume'.consumer_tag])
    end.

Thanks for your help again.

Regards,
Edwin Fine
On Thu, May 8, 2008 at 7:39 AM, Ben Hood <0x6e6562 at gmail.com> wrote:

> Ed,
> You may want to hang on a second with implementing any of the described
> workarounds because I've had a thought about how to solve this in the
> client, but I want to run it past the Rabbit dev list first.
>
> HTH,
>
> Ben
>
> Begin forwarded message:
>
> *From: *Ben Hood <0x6e6562 at gmail.com>
> *Date: *8 May 2008 11:07:05 BST
> *To: *rabbitmq-discuss at lists.rabbitmq.com
> *Cc: *Edwin Fine <rabbitmq-discuss_efine at usa.net>
> *Subject: **Re: [rabbitmq-discuss] Channel crashes after basic.cancel_ok.*
>
> Ed,
>
> On 8 May 2008, at 06:13, Edwin Fine wrote:
>
>
> Well, I have put most of the code I am using into a test program (two
> modules). This code is not for the mailing list's consumption, if that's ok
> with you. The problem is, I can't even make the code work for more than 1
> consumer and I don't know why. I think I am too tired. In any case, the two
> modules included are:
>
>    - rabbit_chan_crash.erl - The actual test program, which contains most
>    of the Rabbit-related working code;
>
> I've run your code. The problem is that you are executing synchronous RPCs
> on the same channel concurrently, which doesn't work, because the protocol
> flow stipulates that a client performs a blocking receive for synchronous
> commands within a channel.
>
> By doing this though, you have uncovered a bug in the synchronous RPC
> handling code in the client that doesn't protect against this properly. So
> I've fixed this bug and will send it down to the repository.
>
> This will not solve your problem, it will just make a clearer indication of
> what went wrong. When designing the client, we deliberated whether the
> client channel module should contain the intelligence to queue up pending
> RPC requests in order to accept concurrent requests from user code and
> essentially serialize their dispatch to the server. Initially, we decided
> against this approach because of the complexity it introduces. In general,
> queue declaration and binding is an operation that you don't need to perform
> concurrently per channel. While I think that this view is still valid, your
> use case may provide some food for thought. Thoughts anyone?
>
> To solve your problem, you can do one of a few things:
>
> 1. Refactor your code so that synchronous RPCs are not executed in parallel
> (e.g. QueueDeclare, QueueBind, etc) within one channel process;
> 2. Manage by exception - catch an illegal_pending_rpc error (which is the
> bug fix I just put in) and resend the request;
> 3. Use more channel processes - this *may* be a bit heavyweight for the
> TCP client; it is better suited to the direct client.
>
> If you want to go down route 2, let me know because we're currently in the
> process of moving the source code from monotone to mercurial.
>
> BTW, I see that you've made a local modification to the amqp_connection
> module to export the start/4 function which allows the caller to specify a
> virtual host. This is probably a good idea and we may incorporate this into
> the API.
>
>
>    - rbmq_admin.erl - An interface to the admin part of Rabbit that I
>    wrote because I wanted to be able to create exchanges, users, etc from
>    within Erlang instead of the command line. Feel free to use this if you
>    think it's of any use, or improve it - feedback welcome. Actually, at some
>    point I want to write a full Web admin interface for Rabbit. Real Soon Now.
>
> Glad you've mentioned this topic. I've done some initial work on remote
> management of Rabbit. If and when you actually want to get going, it
> will probably be a good idea to start a separate discussion thread on this
> one. There are many design issues that would need to be discussed, most
> importantly dependencies that may arise.
>
>
> To compile you will need all the usual RabbitMQ paths, and the Erlang
> client of course. You will also need .erlang.cookie or setcookie with your
> Rabbit's cookie. I am pretty sure there are no external dependencies other
> than Rabbit and Erlang.
>
> To run is simple (but please note that the code will try to create a user
> named emf_test, and an exchange and a bunch of queues). Feel free to hack
> into shape if you have the time and interest. Tomorrow I will try to run
> Rabbit and my code co-located.
>
> >rabbit_chan_crash:go(rabbit@mynode, NumConsumers, NumMsgsPerSec).
> >rabbit_chan_crash:stop(). % If you can even read the screen to type this in :)
>
> When I use rabbit_chan_crash:go(rabbit@mynode, 1, 10) it's all good.
> When I use rabbit_chan_crash:go(rabbit@mynode, 2, 10) it's all bad. I
> don't know what I am doing wrong, but my other code works (each consumer is
> in its own gen_event server, which is probably why. I think I am getting
> messages in the wrong message queues due to my lack of Erlang experience.
> And I am tired, too).
>
> Anyway I have included the "bad" output.
>
> If you have any luck please let me know. Feedback welcome, too.
>
> Regards,
> Ed
>
> (xhg@ender)3> rabbit_chan_crash:go(rabbit@ender,2,10).
> Setting up channel on realm <<"/data">> for connection {<0.60.0>,network}
> Access granted for channel <0.65.0> on realm <<"/data">> for connection
> {<0.60.0>,network}
> Declaring exchange <<"emf_test">> using ticket 101
> Declared exchange <<"emf_test">> using ticket 101
> Declaring queue <<"EMF_TEST_Q.1">>
> Declared queue <<"EMF_TEST_Q.1">>, msgc = 0, cons_c = 0
> Declaring queue <<"EMF_TEST_Q.2">>
> Declared queue <<"EMF_TEST_Q.2">>, msgc = 0, cons_c = 0
> Environment setup complete, rmq_state =
> {rmq_state,"guest","guest",
>            {<0.60.0>,network},
>            <0.65.0>,101,<<"emf_test">>,<<"/data">>,<<"/emf_test">>,
>            rabbit@ender,2}
> Consumers started: [<0.69.0>,<0.70.0>]
> [<0.69.0>] Started consumer tag <<"EMF_TEST_Q.1">> for existing
> channel/ticket/queue <0.65.0>/101/<<"EMF_TEST_Q.1">>
> [<0.70.0>] Started consumer tag <<"EMF_TEST_Q.2">> for existing
> channel/ticket/queue <0.65.0>/101/<<"EMF_TEST_Q.2">>
> <0.68.0>
> Producer started: <0.71.0>
> Binding queue <<"EMF_TEST_Q.1">>, ticket 101, exchange <<"emf_test">>,
> routing key <<"EMF_TEST_Q.1">>
> Binding queue <<"EMF_TEST_Q.2">>, ticket 101, exchange <<"emf_test">>,
> routing key <<"EMF_TEST_Q.2">>
> Bound queue <<"EMF_TEST_Q.2">> to exchange <<"emf_test">>
> Subscribing consumer <<"EMF_TEST_Q.2">> to channel <0.65.0>
> (xhg@ender)4>
> =ERROR REPORT==== 8-May-2008::00:57:23 ===
> ** Generic server <0.65.0> terminating
> ** Last message in was
> {method,{'basic.consume_ok',<<"EMF_TEST_Q.2">>},none}
> ** When Server state == {channel_state,1,<0.60.0>,<0.62.0>,<0.66.0>,
>                             #Fun<amqp_network_driver.do.2>,
>                             #Fun<amqp_network_driver.do.3>,<<>>,<0.70.0>,
>                             false,undefined,
>                             {dict,0,16,16,8,80,48,
>                                 {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
>                                  []},
>                                 {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
>                                   [],[]}}}}
> ** Reason for termination ==
> ** {function_clause,[{gen_server,reply,
>                                  [<<>>,
>                                   {'basic.consume_ok',<<"EMF_TEST_Q.2">>}]},
>                      {amqp_channel,rpc_bottom_half,2},
>                      {amqp_channel,handle_method,2},
>                      {gen_server,handle_msg,5},
>                      {proc_lib,init_p,5}]}
>
> =ERROR REPORT==== 8-May-2008::00:57:23 ===
> Error in process <0.70.0> on node 'xhg@ender' with exit value:
> {{badmatch,{'queue.bind_ok'}},[{rabbit_chan_crash,subscribe,2},{rabbit_chan_crash,consumer,5}]}
> ---------
>
>
>
> HTH,
>
> Ben
>
> PS I'm attaching your code to the list for reference.
>
>
>
>
>