I forgot to mention that my example code hangs if I introduce @channel.tx_select and @channel.tx_commit around the publish call. I'm using amqp 0.8.0rc13 + rabbit 2.4.1 & 2.5.1 -- perhaps this is related to how transactions work and the fact I'm re-using the same channel -- should I even be doing this?<div>
<br></div><div>I've got plenty of experience with transactions and atomicity, but in a database context, not amqp/rabbit. I think there is considerable complexity (made worse by my lack of familiarity with eventmachine and the amqp protocol): in this 'rebroadcaster' scenario is it better:</div>
<div><ul><li>to use the same connection for 'consume' and 'publish' or different ones?</li><li>to use the same channel or different ones? (do amqp transactions operate on a per-channel basis?)</li></ul></div>
<div>Jakub, I understand eventmachine is event-driven, this is how the subscribe call works. But within its asynchronous consume loop, I need to defer the queue1 ack until messages have been published in queue2. Sorry, this is probably my lack of familiarity with eventmachine. Do you mean I could have two handlers within the same EM.run block, a 'subscribe' one and an 'on_ack' one handling the publish confirms, within which the queue1 ack is done? How do I know which queue1 message I'm getting the on_ack notification for?</div>
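<div>On that last question, here is a minimal sketch of one possible bookkeeping approach. It relies on how RabbitMQ publisher confirms are numbered: once a channel is in confirm mode, the first publish is number 1, the next is 2, and so on, and the broker's ack carries that number as its delivery_tag (with multiple = true meaning "everything up to and including this number"). The ConfirmTracker class and its method names are hypothetical, not part of the amqp gem, and the code is plain Ruby that runs without a broker.</div>

```ruby
# Hypothetical helper (not part of the amqp gem): remembers which consumed
# message each outgoing publish belongs to, so a broker confirm can be
# mapped back to the queue1 message that is now safe to ack.
class ConfirmTracker
  def initialize
    @next_seq = 1   # confirm numbering starts at 1 on a channel in confirm mode
    @pending  = {}  # publish sequence number => handle of the consumed message
  end

  # Call immediately before publishing the copy of a consumed message.
  # Returns the sequence number the broker will report in its confirm.
  def register(inbound)
    seq = @next_seq
    @pending[seq] = inbound
    @next_seq += 1
    seq
  end

  # Call from the confirm callback with the broker's delivery_tag and
  # multiple flag. Returns the consumed messages that can now be acked,
  # in publish order, and forgets them.
  def confirm(publish_seq, multiple)
    seqs = multiple ? @pending.keys.select { |s| s <= publish_seq } : [publish_seq]
    seqs.sort.map { |s| @pending.delete(s) }.compact
  end

  # Number of publishes still waiting for a broker confirm.
  def pending_count
    @pending.size
  end
end

tracker = ConfirmTracker.new
tracker.register(101)        # first publish on the channel  -> sequence 1
tracker.register(102)        # second publish                -> sequence 2
tracker.register(103)        # third publish                 -> sequence 3

p tracker.confirm(2, true)   # prints [101, 102]
p tracker.confirm(3, false)  # prints [103]
p tracker.pending_count      # prints 0
```

<div>In the rebroadcaster, register would be called inside the subscribe block just before the publish, and confirm inside channel.on_ack with basic_ack.delivery_tag and basic_ack.multiple. The value passed to register could be the metadata object itself, so that metadata.ack can be called on each entry confirm returns. This assumes all publishes for the rebroadcast go through one channel in confirm mode, since the numbering is per channel.</div>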
<div><br></div><div>Michael</div><div><br></div><div><div class="gmail_quote">On 8 July 2011 16:32, Stastny Jakub <span dir="ltr"><<a href="mailto:stastny@101ideas.cz">stastny@101ideas.cz</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">
<div style="word-wrap:break-word"><div><div>On 8 Jul 2011, at 16:16, Michael Nacos wrote:</div><div class="im"><br><blockquote type="cite">so if the publish call returns immediately with no guarantees, what's the point of adding a @channel.tx_commit right after it?</blockquote>
<div><br></div></div><div>Transactions are mainly about atomicity, so if you publish several messages, running them in a transaction will result in saving all or nothing (and getting an error back). Using one for a single message will result in getting the error back, so you'll know if it went wrong (I think, I don't have much experience with them).</div>
<div><br></div><div>The docs:</div><div><br></div><div>"<span style="font-size:11px">The Tx class allows publish and ack operations to be batched into atomic units of work. The intention is that all publish and ack requests issued within a transaction will complete successfully or none of them will. Servers SHOULD implement atomic transactions at least where all publish or ack requests affect a single queue. Transactions that cover multiple queues may be nonatomic, given that queues can be created and destroyed asynchronously, and such events do not form part of any transaction."</span></div>
<div class="im"><br><blockquote type="cite">the question becomes how do I get a reliable publish ack to base the tx_commit/metadata.ack on<br>
<br><div>I saw this in the docs for publisher confirms:</div><div><br><div><span style="font-family:'Lucida Sans', 'Lucida Grande', Verdana, Arial, sans-serif;font-size:13px"><pre style="color:rgb(0, 0, 0);font-family:monospace;display:block;padding-top:5px;padding-right:12px;padding-bottom:5px;padding-left:12px;margin-top:4px;border-top-width:1px;border-right-width:1px;border-bottom-width:1px;border-left-width:1px;border-top-style:solid;border-right-style:solid;border-bottom-style:solid;border-left-style:solid;border-top-color:rgb(238, 238, 255);border-right-color:rgb(238, 238, 255);border-bottom-color:rgb(238, 238, 255);border-left-color:rgb(238, 238, 255);background-color:rgb(245, 245, 255);background-repeat:initial initial">
<span style="color:rgb(0, 102, 255)"># define a callback that will be executed when message is acknowledged
</span><span>channel</span><span>.</span><span>on_ack</span> <span style="color:rgb(0, 0, 255)">do</span> <span>|</span><span>basic_ack</span><span>|</span>
<span>puts</span> <span style="color:rgb(3, 106, 7)"><span>"</span><span style="color:rgb(3, 106, 7)">Received an acknowledgement: delivery_tag = </span><span>#{</span><span>basic_ack</span><span>.</span><span>delivery_tag</span><span>}</span><span style="color:rgb(3, 106, 7)">, multiple = </span><span>#{</span><span>basic_ack</span><span>.</span><span>multiple</span><span>}</span><span>"</span></span>
<span style="color:rgb(0, 0, 255)">end</span></pre></span></div><div><div class="gmail_quote">which is great, but how does it fit within the consume loop? I need execution to block until it's safe to issue the metadata.ack call. Is this something I can achieve with eventmachine?</div>
</div></div></blockquote><div><br></div></div><div>I'm not sure if I understand ... this is a callback; it doesn't block, and neither does #subscribe, which is a callback called when a message is delivered to the client. EventMachine is an implementation of the reactor pattern (it means it's async). So it works based on events (when I get this, do that ... ). BTW just to make sure, do you know that this on_ack is for the ack which the *broker* sends? So you don't call metadata.ack for this (you ack messages when they're actually processed on the consumer).</div>
<div><br></div><div>Cheers,</div><div><br></div><font color="#888888"><div>Jakub</div></font><div><div></div><div class="h5"><br><blockquote type="cite"><div><div>
<div class="gmail_quote">2011/7/8 Jakub Šťastný <span dir="ltr"><<a href="mailto:stastny@101ideas.cz" target="_blank">stastny@101ideas.cz</a>></span><br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
Basic.Publish isn't pseudo-synchronous, it's really asynchronous. Exchange#publish doesn't take a nowait option (so no, your code isn't safe), it simply sends the data and that's it. Then you have no idea whether everything went OK or not. That's why you can use either transactions (yes, you'd have to wrap the code in one as you've mentioned) or publisher confirms (which I'd personally prefer as transactions are really slow and fairly tricky). The documentation for publisher confirms in the AMQP gem is here: <a href="http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile" target="_blank">http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile</a><div>
<div>
<br clear="all">Jakub<div><div><br></div><div><a href="http://www.flickr.com/photos/jakub-stastny/" target="_blank">http://www.flickr.com/photos/jakub-stastny</a></div></div><div><a href="http://twitter.com/botanicus" target="_blank">http://twitter.com/botanicus</a></div>
<br>
<br><br></div><div><div></div><div><div class="gmail_quote">2011/7/8 Michael Nacos <span dir="ltr"><<a href="mailto:m.nacos@gmail.com" target="_blank">m.nacos@gmail.com</a>></span><br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div>so, what's currently the best way to publish messages consumed from one queue into another with the amqp gem? This is quite important in processing scenarios where no messages may be lost. In particular, is the following code safe? Will the pseudo-synchronous <b>publish</b> call return only after the message has been accepted in the second queue (:nowait => false)? Otherwise, what would be the point of wrapping the publish call with @channel.tx_select / @channel.tx_commit ?</div>
<div><br></div><blockquote style="margin:0 0 0 40px;border:none;padding:0px"><div><font face="'courier new', monospace"># consumer which publishes each message to another queue</font></div>
<div><font face="'courier new', monospace"># ------------------------------------------------------</font></div><div><font face="'courier new', monospace"><br>
</font></div><div><font face="'courier new', monospace">@consume_from = 'first'</font></div><div><font face="'courier new', monospace">@deliver_to = 'second'</font></div>
<div><font face="'courier new', monospace"><br></font></div><div><font face="'courier new', monospace">EM.run do</font></div><div><font face="'courier new', monospace"> AMQP.connect do |connection|</font></div>
<div><font face="'courier new', monospace"> @channel = MQ.new(connection)</font></div><div><font face="'courier new', monospace"> # making sure the relevant queues exist</font></div>
<div><font face="'courier new', monospace"> @queue1 = @channel.queue(@consume_from, :passive => false, :durable => true)</font></div><div><font face="'courier new', monospace"> @queue2 = @channel.queue(@deliver_to, :passive => false, :durable => true)</font></div>
<div><span style="font-family:'courier new', monospace"> # setting up the consumer loop</span></div><div><font face="'courier new', monospace"> @queue1.subscribe(:ack => true) do |metadata, data|</font></div>
<div><font face="'courier new', monospace"> @channel.default_exchange.publish(data, \</font></div><div><font face="'courier new', monospace"> :routing_key => @deliver_to, \</font></div>
<div><font face="'courier new', monospace"> :persistent => true, \</font></div><div><font face="'courier new', monospace"> :nowait => false)</font></div>
<div><span style="font-family:'courier new', monospace"> puts '.'</span></div><div><font face="'courier new', monospace"> metadata.ack</font></div>
<div><font face="'courier new', monospace"> end</font></div><div><font face="'courier new', monospace"> end</font></div><div><font face="'courier new', monospace">end</font></div>
</blockquote><div><div></div><div><div><br></div><div class="gmail_quote">2011/7/4 Jakub Šťastný <span dir="ltr"><<a href="mailto:stastny@101ideas.cz" target="_blank">stastny@101ideas.cz</a>></span><br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
AMQP 0.8 is available as RC, it's way more stable than 0.7, so I would recommend just using the RC.<div><br></div><div>Jakub<div><div><br></div><div><a href="http://www.flickr.com/photos/jakub-stastny/" target="_blank">http://www.flickr.com/photos/jakub-stastny</a></div>
</div><div><a href="http://twitter.com/botanicus" target="_blank">http://twitter.com/botanicus</a></div><div><div></div><div><br>
<br><br><div class="gmail_quote">2011/7/4 Michael Nacos <span dir="ltr"><<a href="mailto:m.nacos@gmail.com" target="_blank">m.nacos@gmail.com</a>></span><br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
there seems to be some support for tx_select, tx_commit, tx_rollback in the master branch of the amqp gem, but not in the 0.7.x-stable branch, which is what most people are using<br><br><div class="gmail_quote"><div><div>
</div><div>2011/5/6 David Wragg <span dir="ltr"><<a href="mailto:david@rabbitmq.com" target="_blank">david@rabbitmq.com</a>></span><br>
</div></div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div><div></div><div><div>Simon MacMullen <<a href="mailto:simon@rabbitmq.com" target="_blank">simon@rabbitmq.com</a>> writes:<br>
> On 05/05/11 22:33, Elias Levy wrote:<br>
>> While writing some code using the Ruby AMQP gem against RabbitMQ, I've<br>
>> noticed that if I publish a message and quickly close the connection,<br>
>> even though I've received a publish-ok response from the server, the<br>
>> message fails to be queued by the broker.<br>
><br>
> I'm not at all familiar with the Ruby client, but I should point out<br>
> that unlike many of the other AMQP methods, basic.publish does not<br>
> have a corresponding basic.publish-ok method; it's always<br>
> asynchronous. So I imagine the post-publish callback fires<br>
> immediately.<br>
><br>
> In order to be able to know when the broker has taken responsibility<br>
> for a message you can either wrap the publish in a transaction (when<br>
> you see tx.commit-ok you know the server has the message) or use the<br>
> rather more lightweight publish confirms:<br>
</div>><br>
<div>> <a href="http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/" target="_blank">http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/</a><br>
<br>
</div>Another way to solve the problem is to do a synchronous AMQP method<br>
rather than abruptly closing the connection. If the sync method<br>
completes successfully, you can be sure that your published messages<br>
have reached the broker (it doesn't give you all the guarantees of<br>
transactions, but it is much lighter weight).<br>
<br>
An easy way to do this with all versions of the AMQP gem (even 0.6.7) is<br>
to use the AMQP#close callback. E.g., add something like this to your<br>
code:<br>
<br>
client.close { puts "Closed ok" ; EM.stop }<br>
<br>
David<br>
<font color="#888888"><br>
--<br>
David Wragg<br>
Staff Engineer, RabbitMQ<br>
VMware, Inc.<br>
</font></div></div><div><div></div><div>_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com" target="_blank">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</div></div></blockquote></div><br>
</blockquote></div><br></div></div></div>
</blockquote></div><br>
</div></div></blockquote></div><br></div></div></div>
</blockquote></div><br></div></div>
</blockquote></div></div></div><br></div></blockquote></div><br></div>