[rabbitmq-discuss] erlang client problem (badarg in {amqp_channel, handle_regular_method, 3})
Matthew Sackman
matthew at lshift.net
Wed Mar 24 11:10:04 GMT 2010
On Wed, Mar 24, 2010 at 01:15:07AM +0300, Alexander Sviridov wrote:
> my code:
> listen_loop(Channel, Q, Ticket) ->
>     BasicConsume = #'basic.consume'{ticket = Ticket,
>                                     queue = Q,
>                                     nowait = true},
I'd recommend you ignore all the ticket stuff. Also, nowait is false by
default; unless you have a good reason to set it to true, leave it false.
>     ok = amqp_channel:call(Channel, BasicConsume),
Unfortunately, this is wrong. We have corrected this so that the error
is more meaningful (literally done on the same day as the revision
you're running - you were mere hours too soon to pick it up! Try a pull
and update -C default).
You should actually use amqp_channel:subscribe/3. This is the only case
where you can't just pass the method through with a :call or :cast and
expect it all to work. You want something like:
    #'basic.consume_ok'{} =
        amqp_channel:subscribe(Chan,
                               #'basic.consume'{ queue = Q },
                               Pid).
Where Pid is the Pid of the process that'll receive all the messages.
>
>     %% If the registration was successful, the consumer will
>     %% be notified
>
>     receive
>         #'basic.consume_ok'{consumer_tag = Tag} -> ok
>     end,
>     receive
>         {#'basic.deliver'{exchange = Exchange},
>          Content} ->
>             #content{payload_fragments_rev = [Payload]}
>                 = Content,
>             io:format("Message received: ~p~n", [Payload])
>     end,
>
>     listen_loop(Channel, Q, Ticket).
This too is wrong. Once the consumer is set up, it'll receive
*everything* the server sends it, without any further interaction from
you - i.e. you don't need to resubscribe after every message.
My standard drain-loop is:
    spawn_drain(Chan, Q, N, Notify) ->
        Pid = spawn(fun() ->
                        receive
                            #'basic.consume_ok'{ consumer_tag = Tag } ->
                                drain(Chan, Tag, N, Notify)
                        end
                    end),
        #'basic.consume_ok'{} =
            amqp_channel:subscribe(Chan,
                                   #'basic.consume'{ queue = Q, exclusive = true },
                                   Pid).

    drain(Chan, Tag, 0, Notify) ->
        CancelOk = #'basic.cancel_ok'{ consumer_tag = Tag } =
            amqp_channel:call(Chan, #'basic.cancel'{ consumer_tag = Tag }),
        ok = receive CancelOk -> ok end,
        Notify ! done;
    drain(Chan, Tag, N, Notify) ->
        receive
            {#'basic.deliver'{ consumer_tag = Tag, delivery_tag = AckTag },
             _Content} ->
                ok = amqp_channel:call(Chan, #'basic.ack'{ delivery_tag = AckTag }),
                drain(Chan, Tag, N-1, Notify)
        end.
If you call spawn_drain, it'll set up a new process that tries to
receive N messages. Once it's done that, it cancels the subscription
and then sends a 'done' message to Notify (which should be a pid).
It also acks each message as it goes.
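For the calling side, here is a rough sketch of the same hand-off with the broker taken out, so it runs on a bare Erlang node. Everything in it is made up for illustration: the module name drain_sketch, the helper spawn_counter/2 (standing in for spawn_drain/4) and the {deliver, Seq} message shape (standing in for a real basic.deliver). The only point is that the receiving process is started once, takes N messages without any further setup, and then tells Notify it is done.

```erlang
-module(drain_sketch).
-export([run/1]).

%% Hypothetical stand-in for spawn_drain/4: spawns a process that
%% consumes N messages and then sends 'done' to Notify. No broker is
%% involved here.
spawn_counter(N, Notify) ->
    spawn(fun() -> count(N, Notify) end).

%% Same shape as drain/4: one clause for "nothing left", one that
%% takes a single message and recurses with N - 1.
count(0, Notify) ->
    Notify ! done;
count(N, Notify) ->
    receive
        {deliver, _Seq} ->
            count(N - 1, Notify)
    end.

%% Start the receiver once, push N messages at it, then block until it
%% reports completion. The timeout is an assumption of this sketch; it
%% just keeps the caller from hanging forever if 'done' never arrives.
run(N) ->
    Pid = spawn_counter(N, self()),
    [Pid ! {deliver, Seq} || Seq <- lists:seq(1, N)],
    receive
        done -> ok
    after 5000 ->
        {error, timeout}
    end.
```

With the real thing, the caller would presumably pass self() as Notify to spawn_drain and wait for 'done' in the same way.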
Hope that helps.
Matthew