[rabbitmq-discuss] Erlang Client RPC issues
Chris Jimison
cjimison at gmail.com
Thu Dec 5 20:13:28 GMT 2013
This worked great. Thanks Simon for your help!
-Chris
On Dec 5, 2013, at 4:03 AM, Simon MacMullen <simon at rabbitmq.com> wrote:
> On 04/12/2013 06:06, Chris Jimison wrote:
>> 1) I noticed in the amqp_rpc_client.erl that it is not handling
>> #’basic.return’{} messages. I added a call to:
>>
>> amqp_channel:register_return_handler(Channel, self()),
>>
>> in the init function and a handle_info call:
>>
>> handle_info({#'basic.return'{}, Content}, State = #state{continuations = Conts, channel = _Channel}) ->
>> #amqp_msg{props = #'P_basic'{correlation_id = Id}, payload = _Payload} = Content,
>> From = dict:fetch(Id, Conts),
>> gen_server:reply(From, basic_return),
>> {noreply, State};
>>
>>
>> and that seems to work well. I am very new to RabbitMQ and I am not
>> really sure if this is going to break something very badly. Is there a
>> reason amqp_rpc_client is not a return_handler?
>
> No, that seems reasonable, looks like an oversight.
>
>> 2) When I call amqp_rpc_client:stop I am getting error messages
>> (with our without my changes) about:
>>
>> {error,{consumer_died,normal}}
>>
>> It seams to me that when the Channel created in amqp_rpc_client is
>> shutdown, the amqp_direct_consumer is getting the ‘DOWN’ message and
>> returning {error, {consumer_died, Info}, C};
>
> Hmm, this looks like a bug in amqp_direct_consumer. You can work around this by using amqp_selective_consumer instead:
>
> diff -r 2678f7a61de4 src/amqp_rpc_client.erl
> --- a/src/amqp_rpc_client.erl Fri Nov 29 13:52:43 2013 +0000
> +++ b/src/amqp_rpc_client.erl Thu Dec 05 12:02:56 2013 +0000
> @@ -93,7 +93,7 @@
>
> %% Registers this RPC client instance as a consumer to handle rpc responses
> setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
> - amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
> + amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).
>
> %% Publishes to the broker, stores the From address against
> %% the correlation id and increments the correlationid for
> @@ -124,8 +124,7 @@
> %% Sets up a reply queue and consumer within an existing channel
> %% @private
> init([Connection, RoutingKey]) ->
> - {ok, Channel} = amqp_connection:open_channel(
> - Connection, {amqp_direct_consumer, [self()]}),
> + {ok, Channel} = amqp_connection:open_channel(Connection),
> InitialState = #state{channel = Channel,
> exchange = <<>>,
> routing_key = RoutingKey},
>
>> Does this sound correct that calling stop on an amqp_rpc_client
>> should
>> be generating so many errors? Is there some cleaner way I should be
>> stopping the amqp_rpc_client?
>
> No, this is wrong. I'll file bugs to look at these issues.
>
> Cheers, Simon
>
More information about the rabbitmq-discuss
mailing list