[rabbitmq-discuss] Erlang Client RPC issues
Simon MacMullen
simon at rabbitmq.com
Thu Dec 5 12:03:39 GMT 2013
On 04/12/2013 06:06, Chris Jimison wrote:
> 1) I noticed in the amqp_rpc_client.erl that it is not handling
> #'basic.return'{} messages. I added a call to:
>
> amqp_channel:register_return_handler(Channel, self()),
>
> in the init function and a handle_info call:
>
> handle_info({#'basic.return'{}, Content}, State = #state{continuations = Conts, channel = _Channel}) ->
> #amqp_msg{props = #'P_basic'{correlation_id = Id}, payload = _Payload} = Content,
> From = dict:fetch(Id, Conts),
> gen_server:reply(From, basic_return),
> {noreply, State};
>
>
> and that seems to work well. I am very new to RabbitMQ and I am not
> really sure if this is going to break something very badly. Is there a
> reason amqp_rpc_client is not a return_handler?
No, that seems reasonable; it looks like an oversight.
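One thing to watch: the handler quoted above returns State unchanged, so the continuation for a returned message stays in the dict. Here is a minimal sketch of the bookkeeping a return handler could do. It assumes the client stores callers under an integer key and sends the correlation id as a 64-bit binary. The module and function names are hypothetical and are not part of amqp_client.

```erlang
-module(rpc_return_sketch).
-export([store/3, resolve_return/2]).

%% Hypothetical helper: remember which caller is waiting on a
%% given integer correlation id.
store(Id, From, Conts) ->
    dict:store(Id, From, Conts).

%% Hypothetical helper: given the correlation id taken from a
%% returned message, find the waiting caller and drop its entry.
%% Assumes the id was published as <<Id:64>>.
resolve_return(<<Id:64>>, Conts) ->
    case dict:find(Id, Conts) of
        {ok, From} ->
            %% The caller can now be answered, e.g. with
            %% gen_server:reply(From, basic_return).
            {reply, From, dict:erase(Id, Conts)};
        error ->
            %% Unknown id: nothing to reply to, leave state alone.
            {ignore, Conts}
    end.
```

In a handle_info clause for {#'basic.return'{}, Content}, the {reply, From, NewConts} result would be followed by gen_server:reply(From, basic_return) and by storing NewConts back into the state.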
> 2) When I call amqp_rpc_client:stop I am getting error messages
> (with or without my changes) about:
>
> {error,{consumer_died,normal}}
>
> It seems to me that when the Channel created in amqp_rpc_client is
> shut down, the amqp_direct_consumer is getting the 'DOWN' message and
> returning {error, {consumer_died, Info}, C};
Hmm, this looks like a bug in amqp_direct_consumer. You can work around
this by using amqp_selective_consumer instead:
diff -r 2678f7a61de4 src/amqp_rpc_client.erl
--- a/src/amqp_rpc_client.erl Fri Nov 29 13:52:43 2013 +0000
+++ b/src/amqp_rpc_client.erl Thu Dec 05 12:02:56 2013 +0000
@@ -93,7 +93,7 @@
%% Registers this RPC client instance as a consumer to handle rpc responses
setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
- amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).
%% Publishes to the broker, stores the From address against
%% the correlation id and increments the correlationid for
@@ -124,8 +124,7 @@
%% Sets up a reply queue and consumer within an existing channel
%% @private
init([Connection, RoutingKey]) ->
- {ok, Channel} = amqp_connection:open_channel(
- Connection, {amqp_direct_consumer, [self()]}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
InitialState = #state{channel = Channel,
exchange = <<>>,
routing_key = RoutingKey},
> Does this sound correct that calling stop on an amqp_rpc_client should
> be generating so many errors? Is there some cleaner way I should be
> stopping the amqp_rpc_client?
No, this is wrong. I'll file bugs to look at these issues.
Cheers, Simon