[rabbitmq-discuss] erlang client API: transactions

Valentino Volonghi dialtone at gmail.com
Tue Sep 23 03:24:59 BST 2008


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

so I'm trying to add support for transactions to my simple forwarding  
application, I started by
implementing the 'commit every 500 messages' in the following way:

handle_info(#'tx.select_ok'{}, State) ->
     {noreply, State};

handle_info(#'tx.commit_ok'{}, State) ->
     {noreply, State};

handle_info({#'basic.deliver'{routing_key = RoutingKey,
                               delivery_tag = DeliveryTag},
              #content{payload_fragments_rev = [Payload]}},
             State = #shovel_state{local = LocalBroker,
                                   remote = RemoteBroker,
                                   exchange = Exchange,
                                   sent = Sent,
                                   ack = Acks}) ->
     deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,  
RemoteBroker, Exchange, Sent, Acks).

deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,  
RemoteBroker, Exchange, 0, []) ->
     amqp:tx_select(RemoteBroker),
     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
     NewState = State#shovel_state{sent=1, ack=[DeliveryTag]},
     {noreply, NewState};

deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,  
RemoteBroker, Exchange, 499, Acks) ->
     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
     amqp:tx_commit(RemoteBroker),
     lists:foreach(fun(X) -> amqp:ack(LocalBroker, X) end,  
[DeliveryTag|Acks]),
     NewState = State#shovel_state{sent=0, ack=[]},
     {noreply, NewState};

deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,  
RemoteBroker, Exchange, Sent, Acks) ->
     amqp:tx_select(RemoteBroker),
     amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
     NewState = State#shovel_state{sent=Sent+1, ack=[DeliveryTag|Acks]},
     {noreply, NewState}.

and I implemented ack and tx_commit/tx_select like this:

ack({_Connection, Channel, _Ticket}, DeliveryTag) ->
     BasicAck = #'basic.ack'{delivery_tag = DeliveryTag, multiple =  
false},
     ok = amqp_channel:cast(Channel, BasicAck).

tx_select({_Connection, Channel, _Ticket}) ->
     ok = amqp_channel:cast(Channel, #'tx.select'{}).

tx_commit({_Connection, Channel, _Ticket}) ->
     ok = amqp_channel:cast(Channel, #'tx.commit'{}).

Unfortunately when I try to run this code, during the forwarding I get  
the following exception:

=ERROR REPORT==== 22-Sep-2008::19:23:11 ===
** Generic server <0.150.0> terminating
** Last message in was {method,{'tx.select_ok'},none}
** When Server state == {channel_state,1,<0.146.0>,<0.148.0>,<0.151.0>,
                             #Fun<amqp_network_driver.do.2>,
                             #Fun<amqp_network_driver.do.3>,
                             #Fun<amqp_network_driver.close_channel.1>,
                             {[],[]},
                             {[],[]},
                             {dict,0,16,16,8,80,48,
                                 {[],[],[],[],[],[],[],[],[],[],[],[], 
[],[],[],
                                  []},
                                 {{[],[],[],[],[],[],[],[],[],[],[],[], 
[],[],
                                   [],[]}}},
                             false,undefined,
                             {dict,0,16,16,8,80,48,
                                 {[],[],[],[],[],[],[],[],[],[],[],[], 
[],[],[],
                                  []},
                                 {{[],[],[],[],[],[],[],[],[],[],[],[], 
[],[],
                                   [],[]}}}}
** Reason for termination ==
** {{badmatch,{empty,{[],[]}}},
     [{amqp_channel,rpc_bottom_half,2},
      {gen_server,handle_msg,5},
      {proc_lib,init_p,5}]}

Which clearly means that I badly matched somewhere... my best guess is  
the tx_select/tx_commit
functions but then how should I fix them? (sorry for this newb  
question).

- --
Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it
http://www.adroll.com

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Darwin)

iEYEARECAAYFAkjYU3sACgkQ9Llz28widGUvqACZAaF5i6y0RCeq6S515PPERjsy
09kAoLHg1ohSFpGpDd2SGCe6I2s6Vqw6
=lGsw
-----END PGP SIGNATURE-----




More information about the rabbitmq-discuss mailing list