[rabbitmq-discuss] Published message not queued after publish-ok received when connection quickly closed

Michael Nacos m.nacos at gmail.com
Fri Jul 8 16:16:38 BST 2011


so if the publish call returns immediately with no guarantees, what's the
point of adding a @channel.tx_commit right after it? the question becomes
how do I get a reliable publish ack to base the tx_commit/metadata.ack on

I saw this in the docs for publisher confirms:

# define a callback that will be executed when message is
acknowledgedchannel.on_ack do |basic_ack|
  puts "Received an acknowledgement: delivery_tag =
#{basic_ack.delivery_tag}, multiple = #{basic_ack.multiple}"end

which is great, but how does it fit within the consume loop? I need
execution to block until it's safe to issue the metadata.ack call. Is this
something I can achieve with eventmachine?

2011/7/8 Jakub Šťastný <stastny at 101ideas.cz>

> Basic.Publish isn't pseudo-synchronous, it's really
> asynchronous. Exchange#publish doesn't take nowait option (so no, your code
> isn't safe), it simply send the data and that's it. Then you have no idea
> whether everything went OK or not. That's why you can use either
> transactions (yes, you'd have to wrap the code in it as you've mentioned) or
> publisher confirms (which I'd personally prefer as transactions are really
> slow and fairly tricky). The documentation for publisher confirms in AMQP
> gem is here:
> http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile
>
> Jakub
>
> http://www.flickr.com/photos/jakub-stastny
> http://twitter.com/botanicus
>
>
>
> 2011/7/8 Michael Nacos <m.nacos at gmail.com>
>
>> so, what's currently the best way to publish messages consumed from one
>> queue into another with the amqp gem? This is quite important in processing
>> scenaria when no messages may be lost. In particular, is the following code
>> safe? Will the pseudo-synchronous *publish* call return only after the
>> message has been accepted in the second queue (:nowait => false)? otherwise,
>> what would be the point of wrapping the publish call with @channel.tx_select
>> / @channel.tx_commit ?
>>
>> # consumer which publishes each message to another queue
>> # ------------------------------------------------------
>>
>> @consume_from = 'first'
>> @deliver_to = 'second'
>>
>> EM.run do
>>   AMQP.connect do |connection|
>>     @channel = MQ.new(connection)
>>     # making sure the relevant queues exist
>>     @queue1 = @channel.queue(@consume_from, :passive => false, :durable =>
>> true)
>>     @queue2 = @channel.queue(@deliver_to, :passive => false, :durable =>
>> true)
>>     # setting up the consumer loop
>>     @queue1.subscribe(:ack => true) do |metadata, data|
>>       @channel.default_exchange.publish(data, \
>>         :routing_key => @deliver_to, \
>>         :persistent => true, \
>>         :nowait => false)
>>       puts '.'
>>       metadata.ack
>>     end
>>   end
>> end
>>
>>
>> 2011/7/4 Jakub Šťastný <stastny at 101ideas.cz>
>>
>>> AMQP 0.8 is available as RC, it's way more stable than 0.7, so I would
>>> recommend to just use the RC.
>>>
>>> Jakub
>>>
>>> http://www.flickr.com/photos/jakub-stastny
>>> http://twitter.com/botanicus
>>>
>>>
>>>
>>> 2011/7/4 Michael Nacos <m.nacos at gmail.com>
>>>
>>>> there seems to be some support for tx_select, tx_commit, tx_rollback in
>>>> the master branch of the amqp gem, but not in the 0.7.x-stable branch, which
>>>> is what most people are using
>>>>
>>>> 2011/5/6 David Wragg <david at rabbitmq.com>
>>>>
>>>>> Simon MacMullen <simon at rabbitmq.com> writes:
>>>>> > On 05/05/11 22:33, Elias Levy wrote:
>>>>> >> While writing some code using the Ruby AMQP gem against RabbitMQ,
>>>>> I've
>>>>> >> noticed that if I publish a message and quickly close the
>>>>> connection,
>>>>> >> even though I've received a publish-ok response from the server, the
>>>>> >> message fails to be queued by the broker.
>>>>> >
>>>>> > I'm not at all familiar with the Ruby client, but I should point out
>>>>> > that unlike many of the other AMQP methods, basic.publish does not
>>>>> > have a corresponding basic.publish-ok method; it's always
>>>>> > asynchronous. So I imagine the post-publish callback fires
>>>>> > immediately.
>>>>> >
>>>>> > In order to be able to know when the broker has taken responsibility
>>>>> > for a message you can either wrap the publish in a transaction (when
>>>>> > you see tx.commit-ok you know the server has the message) or use the
>>>>> > rather more lightweight publish confirms:
>>>>> o>
>>>>> >
>>>>> http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
>>>>>
>>>>> Another way to solve the problem is to do a synchronous AMQP method
>>>>> rather than abruptly closing the connection.  If this the sync method
>>>>> completes successfully, you can be sure that your published messages
>>>>> have reached the broker (it doesn't give you all the guarantees of
>>>>> transactions, but it is much lighter weight).
>>>>>
>>>>> An easy way to do this with all versions of the AMQP gem (even 0.6.7)
>>>>> is
>>>>> to use the AMQP#close callback.  E.g., add something like this to your
>>>>> code:
>>>>>
>>>>>  client.close { puts "Closed ok" ; EM.stop }
>>>>>
>>>>> David
>>>>>
>>>>> --
>>>>> David Wragg
>>>>> Staff Engineer, RabbitMQ
>>>>> VMware, Inc.
>>>>> _______________________________________________
>>>>> rabbitmq-discuss mailing list
>>>>> rabbitmq-discuss at lists.rabbitmq.com
>>>>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>>>>>
>>>>
>>>>
>>>
>>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110708/9d885563/attachment.htm>


More information about the rabbitmq-discuss mailing list