[rabbitmq-discuss] erlang client API: transactions

Edwin Fine rabbitmq-discuss_efine at usa.net
Tue Sep 23 05:49:25 BST 2008


I believe this is coming from the Erlang client, in
amqp_channel:rpc_bottom_half/2. I can't tell you why your program is doing
this, only where the error message originates. It seems to be happening in
response to receiving a tx.select_ok, AFAICS.

If you look at the code for rpc_bottom_half (reproduced at the end of this
email for convenience), the line

{{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue)

will create the error message you are seeing if RequestQueue is empty. This
can clearly be seen in the shell:

1> EmptyQ = queue:new().
{[],[]}
2> {{value, {From,_}}, NewRequestQueue} = queue:out(EmptyQ).
** exception error: no match of right hand side value *{empty,{[],[]}}*

I don't know if this is a bug or one of those "can't happen" things that
just didn't give a nice error message. I suspect the latter, because the
code just cannot continue without the value of "From" to send a reply to.
Why should the function have been called with a reply if the reply queue is
empty? Something's wrong somewhere.

In addition, maybe I am too tired to see straight, but this looks really
suspect:

    catch case queue:head(NewRequestQueue) of
        empty ->
            ok;
        {NewFrom,Method} ->
            Do2(Writer,Method)
    end,

This is because queue:head/1 exits when it is empty and does not return
'empty', so the catch will return an EXIT:

1> catch queue:head(queue:new()).
{'EXIT',{empty,[{queue,head,[{[],[]}]},
                {erl_eval,do_apply,5},
                {erl_eval,expr,5},
                {shell,exprs,6},
                {shell,eval_exprs,6},
                {shell,eval_loop,3}]}}

I think this should be

    catch case queue:head(NewRequestQueue) of
        {'EXIT', {empty,_}} ->
            ok;
        {NewFrom,Method} ->
            Do2(Writer,Method)
    end,

Maybe an empty queue is one of those things that should never happen, but
like I said, I am really tired...


*Code reproduced from amqp_channel.erl*

rpc_bottom_half(Reply, State = #channel_state{writer_pid = Writer,
                                              rpc_requests = RequestQueue,
                                              do2 = Do2}) ->
    *{{value, {From,_}}, NewRequestQueue} = queue:out(RequestQueue),*
    gen_server:reply(From, Reply),
    catch case queue:head(NewRequestQueue) of
        empty ->
            ok;
        {NewFrom,Method} ->
            Do2(Writer,Method)
    end,
    NewState = State#channel_state{rpc_requests = NewRequestQueue},
    {noreply, NewState}.


On Mon, Sep 22, 2008 at 10:24 PM, Valentino Volonghi <dialtone at gmail.com>wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> so I'm trying to add support for transactions to my simple forwarding
> application, I started by
> implementing the 'commit every 500 messages' in the following way:
>
> handle_info(#'tx.select_ok'{}, State) ->
>     {noreply, State};
>
> handle_info(#'tx.commit_ok'{}, State) ->
>     {noreply, State};
>
> handle_info({#'basic.deliver'{routing_key = RoutingKey,
>                               delivery_tag = DeliveryTag},
>              #content{payload_fragments_rev = [Payload]}},
>             State = #shovel_state{local = LocalBroker,
>                                   remote = RemoteBroker,
>                                   exchange = Exchange,
>                                   sent = Sent,
>                                   ack = Acks}) ->
>     deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
> RemoteBroker, Exchange, Sent, Acks).
>
> deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
> RemoteBroker, Exchange, 0, []) ->
>     amqp:tx_select(RemoteBroker),
>     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
>     NewState = State#shovel_state{sent=1, ack=[DeliveryTag]},
>     {noreply, NewState};
>
> deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
> RemoteBroker, Exchange, 499, Acks) ->
>     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
>     amqp:tx_commit(RemoteBroker),
>     lists:foreach(fun(X) -> amqp:ack(LocalBroker, X) end,
> [DeliveryTag|Acks]),
>     NewState = State#shovel_state{sent=0, ack=[]},
>     {noreply, NewState};
>
> deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
> RemoteBroker, Exchange, Sent, Acks) ->
>     amqp:tx_select(RemoteBroker),
>     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
>     NewState = State#shovel_state{sent=Sent+1, ack=[DeliveryTag|Acks]},
>     {noreply, NewState}.
>
> and I implemented ack and tx_commit/tx_select like this:
>
> ack({_Connection, Channel, _Ticket}, DeliveryTag) ->
>     BasicAck = #'basic.ack'{delivery_tag = DeliveryTag, multiple =
> false},
>     ok = amqp_channel:cast(Channel, BasicAck).
>
> tx_select({_Connection, Channel, _Ticket}) ->
>     ok = amqp_channel:cast(Channel, #'tx.select'{}).
>
> tx_commit({_Connection, Channel, _Ticket}) ->
>     ok = amqp_channel:cast(Channel, #'tx.commit'{}).
>
> Unfortunately when I try to run this code, during the forwarding I get
> the following exception:
>
> =ERROR REPORT==== 22-Sep-2008::19:23:11 ===
> ** Generic server <0.150.0> terminating
> ** Last message in was {method,{'tx.select_ok'},none}
> ** When Server state == {channel_state,1,<0.146.0>,<0.148.0>,<0.151.0>,
>                             #Fun<amqp_network_driver.do.2>,
>                             #Fun<amqp_network_driver.do.3>,
>                             #Fun<amqp_network_driver.close_channel.1>,
>                             {[],[]},
>                             {[],[]},
>                             {dict,0,16,16,8,80,48,
>                                 {[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],[],
>                                  []},
>                                 {{[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],
>                                   [],[]}}},
>                             false,undefined,
>                             {dict,0,16,16,8,80,48,
>                                 {[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],[],
>                                  []},
>                                 {{[],[],[],[],[],[],[],[],[],[],[],[],
> [],[],
>                                   [],[]}}}}
> ** Reason for termination ==
> ** {{badmatch,{empty,{[],[]}}},
>     [{amqp_channel,rpc_bottom_half,2},
>      {gen_server,handle_msg,5},
>      {proc_lib,init_p,5}]}
>
> Which clearly means that I badly matched somewhere... my best guess is
> the tx_select/tx_commit
> functions but then how should I fix them? (sorry for this newb
> question).
>
> - --
> Valentino Volonghi aka Dialtone
> Now running MacOS X 10.5
> Home Page: http://www.twisted.it
> http://www.adroll.com
>
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.9 (Darwin)
>
> iEYEARECAAYFAkjYU3sACgkQ9Llz28widGUvqACZAaF5i6y0RCeq6S515PPERjsy
> 09kAoLHg1ohSFpGpDd2SGCe6I2s6Vqw6
> =lGsw
> -----END PGP SIGNATURE-----
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20080923/a38ac88e/attachment.htm 


More information about the rabbitmq-discuss mailing list