[rabbitmq-discuss] Erlang Client RPC issues

Simon MacMullen simon at rabbitmq.com
Thu Dec 5 12:03:39 GMT 2013


On 04/12/2013 06:06, Chris Jimison wrote:
> 1) I noticed in the amqp_rpc_client.erl that it is not handling
> #’basic.return’{} messages.  I added a call to:
>
> amqp_channel:register_return_handler(Channel, self()),
>
> in the init function and a handle_info call:
>
> handle_info({#'basic.return'{}, Content}, State = #state{continuations = Conts, channel = _Channel}) ->
>      #amqp_msg{props = #'P_basic'{correlation_id = Id}, payload = _Payload} = Content,
>      From = dict:fetch(Id, Conts),
>      gen_server:reply(From, basic_return),
>      {noreply, State};
>
>
> and that seems to work well. I am very new to RabbitMQ and I am not
> really sure if this is going to break something very badly. Is there a
> reason amqp_rpc_client is not a return_handler?

No, that seems reasonable, looks like an oversight.

> 2) When I call amqp_rpc_client:stop I am getting error messages
> (with our without my changes) about:
>
> {error,{consumer_died,normal}}
>
> It seams to me that when the Channel created in amqp_rpc_client is
> shutdown, the amqp_direct_consumer is getting the ‘DOWN’ message and
> returning {error, {consumer_died, Info}, C};

Hmm, this looks like a bug in amqp_direct_consumer. You can work around 
this by using amqp_selective_consumer instead:

diff -r 2678f7a61de4 src/amqp_rpc_client.erl
--- a/src/amqp_rpc_client.erl	Fri Nov 29 13:52:43 2013 +0000
+++ b/src/amqp_rpc_client.erl	Thu Dec 05 12:02:56 2013 +0000
@@ -93,7 +93,7 @@

  %% Registers this RPC client instance as a consumer to handle rpc 
responses
  setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
-    amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
+    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, self()).

  %% Publishes to the broker, stores the From address against
  %% the correlation id and increments the correlationid for
@@ -124,8 +124,7 @@
  %% Sets up a reply queue and consumer within an existing channel
  %% @private
  init([Connection, RoutingKey]) ->
-    {ok, Channel} = amqp_connection:open_channel(
-                        Connection, {amqp_direct_consumer, [self()]}),
+    {ok, Channel} = amqp_connection:open_channel(Connection),
      InitialState = #state{channel     = Channel,
                            exchange    = <<>>,
                            routing_key = RoutingKey},

> Does this sound correct that calling stop on an amqp_rpc_client
> should
> be generating so many errors? Is there some cleaner way I should be
> stopping the amqp_rpc_client?

No, this is wrong. I'll file bugs to look at these issues.

Cheers, Simon



More information about the rabbitmq-discuss mailing list