[rabbitmq-discuss] Channel crashes after basic.cancel_ok.

Edwin Fine rabbitmq-discuss_efine at usa.net
Mon May 12 20:56:59 BST 2008


Ben,

I was monitoring my running system and, unless I am mistaken, the recursive
calls in amqp_network_connection:reader_loop/4 are not tail calls, which is
making the reader eat stack and heap like there's no tomorrow. The cause is
the call to gen_tcp:close(Sock) after the receive at the end of the function:
every recursive call has to return to it, so each iteration leaves a stack
frame behind. I suggest you move it to the last line of start_reader as shown:

start_reader(Sock, FramingPid) ->
    process_flag(trap_exit, true),
    put({channel, 0},{chpid, FramingPid}),
    {ok, Ref} = prim_inet:async_recv(Sock, 7, -1),
    reader_loop(Sock, undefined, undefined, undefined),
    gen_tcp:close(Sock). %% Moved here from the end of reader_loop/4

reader_loop(Sock, Type, Channel, Length) ->
    receive
        {inet_async, Sock, _, {ok, <<Payload:Length/binary,?FRAME_END>>}} ->
            case handle_frame(Type, Channel, Payload) of
                closed_ok ->
                    ok;
                _ ->
                    {ok, Ref} = prim_inet:async_recv(Sock, 7, -1),
                    reader_loop(Sock, undefined, undefined, undefined)
            end;
        {inet_async, Sock, _, {ok, <<_Type:8,_Channel:16,PayloadSize:32>>}} ->
            {ok, Ref} = prim_inet:async_recv(Sock, PayloadSize + 1, -1),
            reader_loop(Sock, _Type, _Channel, PayloadSize);
        {inet_async, Sock, Ref, {error, Reason}} ->
            io:format("Have a look into this one: ~p~n",[Reason]);
        {heartbeat, Heartbeat} ->
            rabbit_heartbeat:start_heartbeat(Sock, Heartbeat),
            reader_loop(Sock, Type, Channel, Length);
        {ChannelPid, ChannelNumber} ->
            start_framing_channel(ChannelPid, ChannelNumber),
            reader_loop(Sock, Type, Channel, Length);
        timeout ->
            %% ~p needs an argument; self() is assumed to be the intended one
            io:format("Reader (~p) received timeout from heartbeat, exiting~n",
                      [self()]);
        {'EXIT', Pid, Reason} ->
            [H|T] = get_keys({chpid,Pid}),
            erase(H),
            reader_loop(Sock, Type, Channel, Length);
        Other ->
            io:format("Other ~p~n",[Other])
    end,
    gen_tcp:close(Sock). %% Non-tail recursive call: this is the line to move
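
To show the effect in isolation, here is a minimal sketch. It is not the
client code: the module tail_demo and all of its functions are made up for
illustration, and cleanup/0 merely stands in for gen_tcp:close(Sock). It runs
the same receive loop twice, once with the cleanup after the receive and once
with the cleanup moved into the caller, and reports the stack size of each
process after the same number of messages.

```erlang
-module(tail_demo).
-export([run/0]).

%% Hypothetical stand-in for gen_tcp:close(Sock).
cleanup() -> ok.

%% Non-tail version: cleanup() follows the receive, so every recursive
%% call has to come back here and its stack frame stays alive.
leaky_loop() ->
    receive
        tick           -> leaky_loop();
        {report, From} -> From ! {stack, stack_words()}, leaky_loop();
        stop           -> ok
    end,
    cleanup().

%% Tail version: the recursive call is the last thing each clause does,
%% and the cleanup happens once, in the caller.
tail_loop() ->
    receive
        tick           -> tail_loop();
        {report, From} -> From ! {stack, stack_words()}, tail_loop();
        stop           -> ok
    end.

start_tail() ->
    tail_loop(),
    cleanup().

%% Current stack size of the calling process, in words.
stack_words() ->
    {stack_size, Words} = process_info(self(), stack_size),
    Words.

%% Send N ticks to a fresh process running Fun, then ask for its stack size.
measure(Fun, N) ->
    Pid = spawn(Fun),
    lists:foreach(fun(_) -> Pid ! tick end, lists:seq(1, N)),
    Pid ! {report, self()},
    Words = receive {stack, W} -> W end,
    Pid ! stop,
    Words.

run() ->
    N = 10000,
    Leaky = measure(fun leaky_loop/0, N),
    Tail = measure(fun start_tail/0, N),
    io:format("non-tail: ~p words, tail: ~p words~n", [Leaky, Tail]),
    {Leaky, Tail}.
```

Calling tail_demo:run() from the shell should show the first figure growing
with the number of messages while the second stays at a handful of words,
which matches what I am seeing in the reader process.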


On Mon, May 12, 2008 at 2:07 PM, Ben Hood <0x6e6562 at gmail.com> wrote:

> Ed,
> On 10 May 2008, at 01:02, Edwin Fine wrote:
>
> Thanks, Ben, I will take a look and give you some feedback.
>
> In the meantime, I have done the following:
>
>    - Changed my consumer code (I use the term "consumer" loosely as
>    "anything that eats the output of a producer") to use basic.get instead of
>    basic.consume. Actually, it's set up so that I can select basic.get or
>    basic.consume behavior at run-time. I didn't want to throw away working
>    basic.consume code :)
>    - Changed the process that creates consumers so that it now creates
>    one channel per consumer. Previously, there was one channel only for all
>    consumers. One-channel-per-consumer was the only way I could get the code to
>    work with the network client; in the one-channel scenario I was getting back
>    responses to messages destined for different consumers. I assume that with
>    your changes I will be able to again use one channel for all consumers.
>    - I tested with 50 queues (each with its own consumer and channel)
>    and it seemed reasonably performant, even with the get. I need to try a
>    full-blast test soon.
>
>
> I have now committed fix 2 of 3 to the mtn repo which addresses the issue
> of not being able to subscribe concurrently. So the 2 issues you mention
> here should be addressed.
>
> The outstanding issue is to close the writer down properly in the network
> case.
>
> HTH,
>
> Ben
>