[rabbitmq-discuss] erlang client API: transactions
Edwin Fine
rabbitmq-discuss_efine at usa.net
Tue Sep 23 05:49:25 BST 2008
I believe this is coming from the Erlang client, in
amqp_channel:rpc_bottom_half/2. I can't tell you why your program is doing
this, only where the error message originates. It seems to be happening in
response to receiving a tx.select_ok, AFAICS.
If you look at the code for rpc_bottom_half (reproduced at the end of this
email for convenience), the line
{{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue)
will create the error message you are seeing if RequestQueue is empty. This
can clearly be seen in the shell:
1> EmptyQ = queue:new().
{[],[]}
2> {{value, {From,_}}, NewRequestQueue} = queue:out(EmptyQ).
** exception error: no match of right hand side value *{empty,{[],[]}}*
I don't know if this is a bug or one of those "can't happen" things that
just didn't give a nice error message. I suspect the latter, because the
code just cannot continue without the value of "From" to send a reply to.
Why should the function have been called with a reply if the reply queue is
empty? Something's wrong somewhere.
In addition, maybe I am too tired to see straight, but this looks really
suspect:
catch case queue:head(NewRequestQueue) of
empty ->
ok;
{NewFrom,Method} ->
Do2(Writer,Method)
end,
This is because queue:head/1 exits when it is empty and does not return
'empty', so the catch will return an EXIT:
1> catch queue:head(queue:new()).
{'EXIT',{empty,[{queue,head,[{[],[]}]},
{erl_eval,do_apply,5},
{erl_eval,expr,5},
{shell,exprs,6},
{shell,eval_exprs,6},
{shell,eval_loop,3}]}}
I think this should be
catch case queue:head(NewRequestQueue) of
{'EXIT', {empty,_}} ->
ok;
{NewFrom,Method} ->
Do2(Writer,Method)
end,
Maybe an empty queue is one of those things that should never happen, but
like I said, I am really tired...
*Code reproduced from amqp_channel.erl*
rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
rpc_requests = RequestQueue,
do2 = Do2}) ->
*{{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue),*
gen_server:reply(From, Reply),
catch case queue:head(NewRequestQueue) of
empty ->
ok;
{NewFrom,Method} ->
Do2(Writer,Method)
end,
NewState = State#channel_state{rpc_requests = NewRequestQueue},
{noreply, NewState}.
On Mon, Sep 22, 2008 at 10:24 PM, Valentino Volonghi <dialtone at gmail.com>wrote:
> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> so I'm trying to add support for transactions to my simple forwarding
> application, I started by
> implementing the 'commit every 500 messages' in the following way:
>
> handle_info(#'tx.select_ok'{}, State) ->
> {noreply, State};
>
> handle_info(#'tx.commit_ok'{}, State) ->
> {noreply, State};
>
> handle_info({#'basic.deliver'{routing_key = RoutingKey,
> delivery_tag = DeliveryTag},
> #content{payload_fragments_rev = [Payload]}},
> State = #shovel_state{local = LocalBroker,
> remote = RemoteBroker,
> exchange = Exchange,
> sent = Sent,
> ack = Acks}) ->
> deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
> RemoteBroker, Exchange, Sent, Acks).
>
> deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
> RemoteBroker, Exchange, 0, []) ->
> amqp:tx_select(RemoteBroker),
> amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
> NewState = State#shovel_state{sent=1, ack=[DeliveryTag]},
> {noreply, NewState};
>
> deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
> RemoteBroker, Exchange, 499, Acks) ->
> amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
> amqp:tx_commit(RemoteBroker),
> lists:foreach(fun(X) -> amqp:ack(LocalBroker, X) end,
> [DeliveryTag|Acks]),
> NewState = State#shovel_state{sent=0, ack=[]},
> {noreply, NewState};
>
> deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
> RemoteBroker, Exchange, Sent, Acks) ->
> amqp:tx_select(RemoteBroker),
> amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
> NewState = State#shovel_state{sent=Sent+1, ack=[DeliveryTag|Acks]},
> {noreply, NewState}.
>
> and I implemented ack and tx_commit/tx_select like this:
>
> ack({_Connection, Channel, _Ticket}, DeliveryTag) ->
> BasicAck = #'basic.ack'{delivery_tag = DeliveryTag, multiple =
> false},
> ok = amqp_channel:cast(Channel, BasicAck).
>
> tx_select({_Connection, Channel, _Ticket}) ->
> ok = amqp_channel:cast(Channel, #'tx.select'{}).
>
> tx_commit({_Connection, Channel, _Ticket}) ->
> ok = amqp_channel:cast(Channel, #'tx.commit'{}).
>
> Unfortunately when I try to run this code, during the forwarding I get
> the following exception:
>
> =ERROR REPORT==== 22-Sep-2008::19:23:11 ===
> ** Generic server <0.150.0> terminating
> ** Last message in was {method,{'tx.select_ok'},none}
> ** When Server state == {channel_state,1,<0.146.0>,<0.148.0>,<0.151.0>,
> #Fun<amqp_network_driver.do.2>,
> #Fun<amqp_network_driver.do.3>,
> #Fun<amqp_network_driver.close_channel.1>,
> {[],[]},
> {[],[]},
> {dict,0,16,16,8,80,48,
> {[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],[],
> []},
> {{[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],
> [],[]}}},
> false,undefined,
> {dict,0,16,16,8,80,48,
> {[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],[],
> []},
> {{[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],
> [],[]}}}}
> ** Reason for termination ==
> ** {{badmatch,{empty,{[],[]}}},
> [{amqp_channel,rpc_bottom_half,2},
> {gen_server,handle_msg,5},
> {proc_lib,init_p,5}]}
>
> Which clearly means that I badly matched somewhere... my best guess is
> the tx_select/tx_commit
> functions but then how should I fix them? (sorry for this newb
> question).
>
> - --
> Valentino Volonghi aka Dialtone
> Now running MacOS X 10.5
> Home Page: http://www.twisted.it
> http://www.adroll.com
>
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.9 (Darwin)
>
> iEYEARECAAYFAkjYU3sACgkQ9Llz28widGUvqACZAaF5i6y0RCeq6S515PPERjsy
> 09kAoLHg1ohSFpGpDd2SGCe6I2s6Vqw6
> =lGsw
> -----END PGP SIGNATURE-----
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080923/a38ac88e/attachment.htm
More information about the rabbitmq-discuss
mailing list