[rabbitmq-discuss] Erlang Client RPC issues

Chris Jimison cjimison at gmail.com
Thu Dec 5 20:13:28 GMT 2013


This worked great.  Thanks Simon for your help!

-Chris

On Dec 5, 2013, at 4:03 AM, Simon MacMullen <simon at rabbitmq.com> wrote:

> On 04/12/2013 06:06, Chris Jimison wrote:
>> 1) I noticed that amqp_rpc_client.erl is not handling
>> #'basic.return'{} messages.  I added a call to:
>> 
>> amqp_channel:register_return_handler(Channel, self()),
>> 
>> in the init function and a handle_info call:
>> 
>> handle_info({#'basic.return'{}, Content}, State = #state{continuations = Conts}) ->
>>     #amqp_msg{props = #'P_basic'{correlation_id = Id}} = Content,
>>     %% Reply to the caller waiting on this correlation id
>>     From = dict:fetch(Id, Conts),
>>     gen_server:reply(From, basic_return),
>>     %% Drop the continuation so it does not accumulate in the state
>>     {noreply, State#state{continuations = dict:erase(Id, Conts)}};
>> 
>> 
>> and that seems to work well. I am very new to RabbitMQ and I am not
>> really sure if this is going to break something very badly. Is there a
>> reason amqp_rpc_client is not a return_handler?
> 
> No, that seems reasonable, looks like an oversight.
> 
>> 2) When I call amqp_rpc_client:stop I am getting error messages
>> (with or without my changes) about:
>> 
>> {error,{consumer_died,normal}}
>> 
>> It seems to me that when the Channel created in amqp_rpc_client is
>> shut down, the amqp_direct_consumer is getting the 'DOWN' message and
>> returning {error, {consumer_died, Info}, C};
> 
> Hmm, this looks like a bug in amqp_direct_consumer. You can work around this by using amqp_selective_consumer instead:
> 
> diff -r 2678f7a61de4 src/amqp_rpc_client.erl
> --- a/src/amqp_rpc_client.erl	Fri Nov 29 13:52:43 2013 +0000
> +++ b/src/amqp_rpc_client.erl	Thu Dec 05 12:02:56 2013 +0000
> @@ -93,7 +93,7 @@
> 
> %% Registers this RPC client instance as a consumer to handle rpc responses
> setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
> -    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
> +    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).
> 
> %% Publishes to the broker, stores the From address against
> %% the correlation id and increments the correlationid for
> @@ -124,8 +124,7 @@
> %% Sets up a reply queue and consumer within an existing channel
> %% @private
> init([Connection, RoutingKey]) ->
> -    {ok, Channel} = amqp_connection:open_channel(
> -                        Connection, {amqp_direct_consumer, [self()]}),
> +    {ok, Channel} = amqp_connection:open_channel(Connection),
>     InitialState = #state{channel     = Channel,
>                           exchange    = <<>>,
>                           routing_key = RoutingKey},
> 
>> Is it expected that calling stop on an amqp_rpc_client generates so
>> many errors? Is there some cleaner way I should be stopping the
>> amqp_rpc_client?
> 
> No, this is wrong. I'll file bugs to look at these issues.
> 
> Cheers, Simon
> 
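A minimal sketch of the continuation bookkeeping discussed in (1), assuming the RPC client keeps a dict that maps each correlation id to the waiting caller. It is not the amqp_rpc_client source: the module name continuations_sketch and the helpers store/3 and take/3 are hypothetical, and no AMQP calls are made. It only illustrates looking up a caller on a returned message, removing the entry, and tolerating an unknown id instead of crashing as dict:fetch would.

```erlang
%% Hypothetical, self-contained illustration; save as continuations_sketch.erl.
-module(continuations_sketch).
-export([demo/0]).

%% Remember the caller under its correlation id, as an RPC client
%% would do when it publishes a request.
store(Id, From, Conts) ->
    dict:store(Id, From, Conts).

%% Look up the caller for a returned or delivered message and remove it.
%% Returns {ok, From, NewConts}, or error if the id is unknown.
take(Id, Conts) ->
    case dict:find(Id, Conts) of
        {ok, From} -> {ok, From, dict:erase(Id, Conts)};
        error      -> error
    end.

%% Walk through one request: store, take once, then take again.
demo() ->
    C0 = dict:new(),
    C1 = store(<<"1">>, caller_a, C0),
    {ok, caller_a, C2} = take(<<"1">>, C1),
    %% The entry is gone after the first take, so a duplicate is ignored.
    error = take(<<"1">>, C2),
    ok.
```

In a gen_server, the {ok, From, NewConts} branch would call gen_server:reply(From, basic_return) and keep NewConts in the state; the error branch would leave the state unchanged.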


