[rabbitmq-discuss] erlang client API: transactions
Valentino Volonghi
dialtone at gmail.com
Tue Sep 23 03:24:59 BST 2008
I'm trying to add transaction support to my simple forwarding
application. I started by implementing 'commit every 500 messages'
in the following way:
handle_info(#'tx.select_ok'{}, State) ->
    {noreply, State};
handle_info(#'tx.commit_ok'{}, State) ->
    {noreply, State};
handle_info({#'basic.deliver'{routing_key = RoutingKey,
                              delivery_tag = DeliveryTag},
             #content{payload_fragments_rev = [Payload]}},
            State = #shovel_state{local = LocalBroker,
                                  remote = RemoteBroker,
                                  exchange = Exchange,
                                  sent = Sent,
                                  ack = Acks}) ->
    deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
            RemoteBroker, Exchange, Sent, Acks).

deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
        RemoteBroker, Exchange, 0, []) ->
    amqp:tx_select(RemoteBroker),
    amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
    NewState = State#shovel_state{sent=1, ack=[DeliveryTag]},
    {noreply, NewState};
deliver(State, RoutingKey, DeliveryTag, Payload, LocalBroker,
        RemoteBroker, Exchange, 499, Acks) ->
    amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
    amqp:tx_commit(RemoteBroker),
    lists:foreach(fun(X) -> amqp:ack(LocalBroker, X) end,
                  [DeliveryTag|Acks]),
    NewState = State#shovel_state{sent=0, ack=[]},
    {noreply, NewState};
deliver(State, RoutingKey, DeliveryTag, Payload, _LocalBroker,
        RemoteBroker, Exchange, Sent, Acks) ->
    amqp:tx_select(RemoteBroker),
    amqp:send_message(RemoteBroker, RoutingKey, Payload, Exchange),
    NewState = State#shovel_state{sent=Sent+1, ack=[DeliveryTag|Acks]},
    {noreply, NewState}.
I implemented ack, tx_select and tx_commit like this:
ack({_Connection, Channel, _Ticket}, DeliveryTag) ->
    BasicAck = #'basic.ack'{delivery_tag = DeliveryTag, multiple = false},
    ok = amqp_channel:cast(Channel, BasicAck).

tx_select({_Connection, Channel, _Ticket}) ->
    ok = amqp_channel:cast(Channel, #'tx.select'{}).

tx_commit({_Connection, Channel, _Ticket}) ->
    ok = amqp_channel:cast(Channel, #'tx.commit'{}).
Unfortunately, when I run this code I get the following exception
during forwarding:
=ERROR REPORT==== 22-Sep-2008::19:23:11 ===
** Generic server <0.150.0> terminating
** Last message in was {method,{'tx.select_ok'},none}
** When Server state == {channel_state,1,<0.146.0>,<0.148.0>,<0.151.0>,
                         #Fun<amqp_network_driver.do.2>,
                         #Fun<amqp_network_driver.do.3>,
                         #Fun<amqp_network_driver.close_channel.1>,
                         {[],[]},
                         {[],[]},
                         {dict,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],
                           [],[],[],
                           []},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],
                            [],[],
                            [],[]}}},
                         false,undefined,
                         {dict,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],
                           [],[],[],
                           []},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],
                            [],[],
                            [],[]}}}}
** Reason for termination ==
** {{badmatch,{empty,{[],[]}}},
    [{amqp_channel,rpc_bottom_half,2},
     {gen_server,handle_msg,5},
     {proc_lib,init_p,5}]}
This clearly means that a pattern match failed somewhere. My best
guess is that the tx_select/tx_commit functions are at fault, but how
should I fix them? (Sorry for the newbie question.)
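One possible reading of the trace, offered as a sketch rather than a
confirmed diagnosis: the term {empty,{[],[]}} has the same shape as
what queue:out/1 returns for an empty queue. If rpc_bottom_half keeps
a queue of callers waiting for a synchronous reply (an assumption
about the client's internals), then a tx.select_ok arriving after a
cast would find nobody waiting. The module below is hypothetical
(rpc_sketch and reply_to_oldest/1 are illustrative names, not part of
the client) and only reproduces that failure shape with the standard
queue module:

```erlang
-module(rpc_sketch).
-export([demo/0, reply_to_oldest/1]).

%% Hypothetical stand-in for a channel's queue of pending synchronous
%% requests. Returns the oldest waiting caller, or an error tuple when
%% nobody is waiting for a reply.
reply_to_oldest(Pending) ->
    case queue:out(Pending) of
        {{value, Caller}, Rest} -> {ok, Caller, Rest};
        {empty, _}              -> {error, no_pending_request}
    end.

demo() ->
    Empty = queue:new(),
    %% A strict match on an empty queue fails with badmatch on an
    %% {empty, ...} term, the same shape as in the error report.
    Crash = (catch begin {{value, _}, _} = queue:out(Empty), ok end),
    {'EXIT', {{badmatch, {empty, _}}, _}} = Crash,
    %% Once a request has been registered, the reply has a recipient.
    {ok, caller_a, _} = reply_to_oldest(queue:in(caller_a, Empty)),
    %% The defensive version reports the empty case instead of crashing.
    {error, no_pending_request} = reply_to_oldest(Empty),
    ok.
```

If that reading is right, sending tx.select and tx.commit through a
synchronous call instead of amqp_channel:cast might avoid the crash,
since the channel would then have a waiting caller for the *_ok
reply. Whether this client version offers such a call for these
methods is an assumption to check against its source.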
--
Valentino Volonghi aka Dialtone
Now running MacOS X 10.5
Home Page: http://www.twisted.it
http://www.adroll.com