[rabbitmq-discuss] erlang client problem (badarg in {amqp_channel, handle_regular_method, 3})

Matthew Sackman matthew at lshift.net
Wed Mar 24 11:10:04 GMT 2010


On Wed, Mar 24, 2010 at 01:15:07AM +0300, Alexander Sviridov wrote:
> my code:
> listen_loop(Channel, Q, Ticket) ->
>  BasicConsume = #'basic.consume'{ticket = Ticket,
>                                  queue = Q,
>                                  nowait = true},

I'd recommend you ignore all the ticket stuff. nowait is false by
default: unless you have a good reason to set it to true, leave it false.

>  ok
>      = amqp_channel:call(Channel, BasicConsume),

Unfortunately, this is wrong. We have since changed the client so that
the error is more meaningful (literally on the same day as the revision
you're running: you were mere hours too soon to pick it up! Try a pull
and update -C default).

You should actually use amqp_channel:subscribe/3. This is the only case
where you can't just pass the method through with a :call or :cast and
expect it all to work. You want something like:

#'basic.consume_ok'{} =
        amqp_channel:subscribe(Chan,
                               #'basic.consume'{ queue = Q },
                               Pid).

Where Pid is the Pid of the process that'll receive all the messages.

> 
>  %% If the registration was successful, the consumer will
>  %% be notified
> 
>  receive
>        #'basic.consume_ok'{consumer_tag = Tag} -> ok
>  end,
>  receive
>       {#'basic.deliver'{exchange  =  Exchange},
>        Content}  ->
>           #content{payload_fragments_rev  =  [Payload]}
>            =  Content,
>           io:format("Message  received:  ~p~n",  [Payload])
>  end,
> 
>  listen_loop(Channel, Q, Ticket).

This too is wrong. Once the consumer is set up, it'll receive
*everything* the server sends it, without any further interaction from
you - i.e. you don't need to resubscribe after every message.
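
As a rough, untested sketch of that shape (subscribe once, then just
loop on receive), your listener could look something like the code
below. The module name, start_listener/2 and loop/1 are made up for
illustration, and the include path is an assumption that depends on
how your copy of the client is laid out. The ack call is the same one
used in the drain loop further down.

```erlang
-module(listen_example).
-export([start_listener/2]).

%% Assumption: adjust this include to wherever your checkout of the
%% Erlang client puts amqp_client.hrl.
-include_lib("amqp_client/include/amqp_client.hrl").

%% Subscribe exactly once. The spawned process waits for the
%% basic.consume_ok and then receives every delivery for the queue.
start_listener(Channel, Q) ->
    Pid = spawn(fun() ->
                        receive
                            #'basic.consume_ok'{} -> loop(Channel)
                        end
                end),
    #'basic.consume_ok'{} =
        amqp_channel:subscribe(Channel,
                               #'basic.consume'{ queue = Q },
                               Pid),
    Pid.

%% No resubscribing here: handle the message, ack it, wait for the next.
loop(Channel) ->
    receive
        {#'basic.deliver'{ delivery_tag = AckTag }, Content} ->
            io:format("Message received: ~p~n", [Content]),
            ok = amqp_channel:call(Channel,
                                   #'basic.ack'{ delivery_tag = AckTag }),
            loop(Channel)
    end.
```

This loop never cancels the subscription, so it runs until the process
or the channel goes away.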

My standard drain-loop is:

spawn_drain(Chan, Q, N, Notify) ->
    Pid = spawn(fun() ->
                        receive
                            #'basic.consume_ok'{ consumer_tag = Tag } ->
                                drain(Chan, Tag, N, Notify)
                        end
                end),
    #'basic.consume_ok'{} =
        amqp_channel:subscribe(Chan,
                               #'basic.consume'{ queue = Q, exclusive = true },
                               Pid).

drain(Chan, Tag, 0, Notify) ->
    CancelOk = #'basic.cancel_ok'{ consumer_tag = Tag } =
        amqp_channel:call(Chan, #'basic.cancel'{ consumer_tag = Tag }),
    ok = receive CancelOk -> ok end,
    Notify ! done;
drain(Chan, Tag, N, Notify) ->
    receive
        {#'basic.deliver'{ consumer_tag = Tag, delivery_tag = AckTag },
         _Content} ->
            ok = amqp_channel:call(Chan, #'basic.ack'{ delivery_tag = AckTag }),
            drain(Chan, Tag, N-1, Notify)
    end.

If you call spawn_drain, it'll set up a new process that tries to
receive N messages, acking each one as it goes. Once it's done that, it
cancels the subscription and then sends a 'done' message to Notify
(which should be a pid).
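
For example, a caller could wait for that 'done' message roughly like
this. It's only a sketch: drain_and_wait/3 and the 30 second timeout
are made up for illustration, and it assumes spawn_drain/4 from above
lives in the same module.

```erlang
%% Hypothetical helper: start a drain of N messages from Q and block
%% until the drain process reports back, or give up after 30 seconds.
drain_and_wait(Chan, Q, N) ->
    %% Notify is self(), so 'done' arrives in this process's mailbox.
    spawn_drain(Chan, Q, N, self()),
    receive
        done -> ok
    after 30000 ->
        timeout
    end.
```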

Hope that helps.

Matthew



