[rabbitmq-discuss] Published message not queued after publish-ok received when connection quickly closed

Michael Nacos m.nacos at gmail.com
Fri Jul 8 17:01:16 BST 2011

I forgot to mention my example code hangs if I introduce @channel.tx_select
and @channel.tx_commit around the publish call. I'm using amqp 0.8.0rc13 +
rabbit 2.4.1 & 2.5.1 -- perhaps this related to how transactions work and
the fact I'm re-using the same channel -- should I even be doing this?

I've got plenty of experience with transactions and atomicity, but in a
database context, not amqp/rabbit. I think there is considerable complexity
(made worse by my lack of familiarity with eventmachine and the amqp
protocol): in this 'rebroadcaster' scenario is it better:

   - to use the same connection for 'consume' and 'publish' or different
   - to use the same channel or different ones? (do amqp transactions
   operate on a per-channel basis?)

Jakub, I understand eventmachine is event-driven, this is how the subscribe
call works. But within its asynchronous consume loop, I need to defer the
queue1 ack until messages have been published in queue2. Sorry, this is
probably lack of familiarity with eventmachine. Do you mean I could have two
handlers within the same EM.run block, a 'subscribe'  one and a 'on_ack' one
handling the publish confirms within which the queue1 ack is done? How do I
know which queue1 message I'm getting the on_ack notification for?


On 8 July 2011 16:32, Stastny Jakub <stastny at 101ideas.cz> wrote:

> El 8 Jul 2011, a las 16:16, Michael Nacos escribió:
> so if the publish call returns immediately with no guarantees, what's the
> point of adding a @channel.tx_commit right after it?
> Transactions are mainly atomicity, so if you publish more messages, then
> running it in a transactions will result in saving all or nothing (and
> getting an error back). Using it for one item will result in getting the
> error back, so you'll know if it went wrong (I think, I don't have much
> experience with them).
> The docs:
> "The Tx class allows publish and ack operations to be batched into atomic
> units of work. The intention is that all publish and ack requests issued
> within a transaction will complete successfully or none of them will.
> Servers SHOULD implement atomic transactions at least where all publish or
> ack requests affect a single queue. Transactions that cover multiple queues
> may be non­atomic, given that queues can be created and destroyed
> asynchronously, and such events do not form part of any transaction."
> the question becomes how do I get a reliable publish ack to base the
> tx_commit/metadata.ack on
> I saw this in the docs for publisher confirms:
> # define a callback that will be executed when message is acknowledgedchannel.on_ack do |basic_ack|
>   puts "Received an acknowledgement: delivery_tag = #{basic_ack.delivery_tag}, multiple = #{basic_ack.multiple}"end
> which is great, but how does it fit within the consume loop? I need
> execution to block until it's safe to issue the metadata.ack call. Is this
> something I can achieve with eventmachine?
> I'm not sure if I understand ... this is a callback, it doesn't block as
> well as #subscribe, it's a callback called when a message is delivered to
> the client. EventMachine is an implementation of reactor pattern (it means
> it's async). So it works based on events (when I get this, do that ... ).
> BTW just to make sure, do you know that this on_ack is for ack which the
> *broker* sends? So you don't call metadata.ack for this (you ack messages
> when they're actually processed on the consumer).
> Cheers,
> Jakub
> 2011/7/8 Jakub Šťastný <stastny at 101ideas.cz>
>> Basic.Publish isn't pseudo-synchronous, it's really
>> asynchronous. Exchange#publish doesn't take nowait option (so no, your code
>> isn't safe), it simply send the data and that's it. Then you have no idea
>> whether everything went OK or not. That's why you can use either
>> transactions (yes, you'd have to wrap the code in it as you've mentioned) or
>> publisher confirms (which I'd personally prefer as transactions are really
>> slow and fairly tricky). The documentation for publisher confirms in AMQP
>> gem is here:
>> http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile
>> Jakub
>> http://www.flickr.com/photos/jakub-stastny
>> http://twitter.com/botanicus
>> 2011/7/8 Michael Nacos <m.nacos at gmail.com>
>>> so, what's currently the best way to publish messages consumed from one
>>> queue into another with the amqp gem? This is quite important in processing
>>> scenaria when no messages may be lost. In particular, is the following code
>>> safe? Will the pseudo-synchronous *publish* call return only after the
>>> message has been accepted in the second queue (:nowait => false)? otherwise,
>>> what would be the point of wrapping the publish call with @channel.tx_select
>>> / @channel.tx_commit ?
>>> # consumer which publishes each message to another queue
>>> # ------------------------------------------------------
>>> @consume_from = 'first'
>>> @deliver_to = 'second'
>>> EM.run do
>>>   AMQP.connect do |connection|
>>>     @channel = MQ.new(connection)
>>>     # making sure the relevant queues exist
>>>     @queue1 = @channel.queue(@consume_from, :passive => false, :durable
>>> => true)
>>>     @queue2 = @channel.queue(@deliver_to, :passive => false, :durable =>
>>> true)
>>>     # setting up the consumer loop
>>>     @queue1.subscribe(:ack => true) do |metadata, data|
>>>       @channel.default_exchange.publish(data, \
>>>         :routing_key => @deliver_to, \
>>>         :persistent => true, \
>>>         :nowait => false)
>>>       puts '.'
>>>       metadata.ack
>>>     end
>>>   end
>>> end
>>> 2011/7/4 Jakub Šťastný <stastny at 101ideas.cz>
>>>> AMQP 0.8 is available as RC, it's way more stable than 0.7, so I would
>>>> recommend to just use the RC.
>>>> Jakub
>>>> http://www.flickr.com/photos/jakub-stastny
>>>> http://twitter.com/botanicus
>>>> 2011/7/4 Michael Nacos <m.nacos at gmail.com>
>>>>> there seems to be some support for tx_select, tx_commit, tx_rollback in
>>>>> the master branch of the amqp gem, but not in the 0.7.x-stable branch, which
>>>>> is what most people are using
>>>>> 2011/5/6 David Wragg <david at rabbitmq.com>
>>>>>> Simon MacMullen <simon at rabbitmq.com> writes:
>>>>>> > On 05/05/11 22:33, Elias Levy wrote:
>>>>>> >> While writing some code using the Ruby AMQP gem against RabbitMQ,
>>>>>> I've
>>>>>> >> noticed that if I publish a message and quickly close the
>>>>>> connection,
>>>>>> >> even though I've received a publish-ok response from the server,
>>>>>> the
>>>>>> >> message fails to be queued by the broker.
>>>>>> >
>>>>>> > I'm not at all familiar with the Ruby client, but I should point out
>>>>>> > that unlike many of the other AMQP methods, basic.publish does not
>>>>>> > have a corresponding basic.publish-ok method; it's always
>>>>>> > asynchronous. So I imagine the post-publish callback fires
>>>>>> > immediately.
>>>>>> >
>>>>>> > In order to be able to know when the broker has taken responsibility
>>>>>> > for a message you can either wrap the publish in a transaction (when
>>>>>> > you see tx.commit-ok you know the server has the message) or use the
>>>>>> > rather more lightweight publish confirms:
>>>>>> o>
>>>>>> >
>>>>>> http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
>>>>>> Another way to solve the problem is to do a synchronous AMQP method
>>>>>> rather than abruptly closing the connection.  If this the sync method
>>>>>> completes successfully, you can be sure that your published messages
>>>>>> have reached the broker (it doesn't give you all the guarantees of
>>>>>> transactions, but it is much lighter weight).
>>>>>> An easy way to do this with all versions of the AMQP gem (even 0.6.7)
>>>>>> is
>>>>>> to use the AMQP#close callback.  E.g., add something like this to your
>>>>>> code:
>>>>>>  client.close { puts "Closed ok" ; EM.stop }
>>>>>> David
>>>>>> --
>>>>>> David Wragg
>>>>>> Staff Engineer, RabbitMQ
>>>>>> VMware, Inc.
>>>>>> _______________________________________________
>>>>>> rabbitmq-discuss mailing list
>>>>>> rabbitmq-discuss at lists.rabbitmq.com
>>>>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110708/e4a67a31/attachment.htm>

More information about the rabbitmq-discuss mailing list