Matthew -<br><br>Thanks for trying but that does not really help.� I want to interrupt a free-running<br>process that has already posted the read.� To explore this, I modified RabbitMQ&#39;s<br>SimpleConsumer class (below) to post a read (via the nextDelivery method), and<br>
while it is waiting, have another thread close the channel.<br><br>Thanks,<br><br>- Jim<br><br>//�� The contents of this file are subject to the Mozilla Public License<br>//�� Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>
//�� compliance with the License. You may obtain a copy of the License at<br>//�� <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>//�� Software distributed under the License is distributed on an &quot;AS IS&quot;<br>
//�� basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the<br>//�� License for the specific language governing rights and limitations<br>//�� under the License.<br>//<br>//�� The Original Code is RabbitMQ.<br>
//<br>//�� The Initial Developers of the Original Code are LShift Ltd,<br>//�� Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>//�� Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,<br>
//�� Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd<br>//�� are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial<br>//�� Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>//�� Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift<br>
//�� Ltd. Portions created by Cohesive Financial Technologies LLC are<br>//�� Copyright (C) 2007-2009 Cohesive Financial Technologies<br>//�� LLC. Portions created by Rabbit Technologies Ltd are Copyright<br>//�� (C) 2007-2009 Rabbit Technologies Ltd.<br>
//<br>//�� All Rights Reserved.<br>//<br>//�� Contributor(s): ______________________________________.<br>//<br><br>package com.rabbitmq.examples;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>
import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>import com.rabbitmq.client.QueueingConsumer;<br><br>public class SimpleConsumerCancel implements Runnable {<br>��� <br>��� private static int count = 0;<br>
��� <br>��� private static Channel channel = null;<br>��� <br>��� private static String consumerTag = null;<br><br>��� public static void main(String[] args) {<br>������� try {<br>����������� String hostName = (args.length &gt; 0) ? args[0] : &quot;172.20.125.34&quot;;<br>
����������� int portNumber = (args.length &gt; 1) ? Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;<br>����������� String queueName = (args.length &gt; 2) ? args[2] : &quot;SimpleQueue&quot;;<br><br>����������� ConnectionFactory connFactory = new ConnectionFactory();<br>
����������� Connection conn = connFactory.newConnection(hostName, portNumber);<br><br>����������� channel = conn.createChannel();<br><br>����������� channel.queueDeclare(queueName);<br><br>����������� Thread thread = new Thread(new SimpleConsumerCancel());<br>
����������� thread.start();<br><br>����������� QueueingConsumer consumer = new QueueingConsumer(channel);<br>����������� consumerTag = channel.basicConsume(queueName, consumer);<br>����������� System.out.println(&quot;consumerTag: &quot; + consumerTag);<br>
����������� while (true) {<br>��������������� try {<br>������������������� System.out.println(&quot;Waiting for delivery.&quot;);<br>������������������� QueueingConsumer.Delivery delivery = consumer.nextDelivery();<br>������������������� System.out.println(&quot;\nGot delivery.&quot;);<br>
������������������� count++;<br>������������������� System.out.println(&quot;SimpleConsumerCancel got Message: &quot; + new String(delivery.getBody()) + &quot; : &quot; + count);<br>������������������� channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);<br>
��������������� }<br>��������������� catch (Exception ex) {<br>������������������� System.err.println(&quot;Read thread caught exception: &quot; + ex);<br><br>������������������� channel = conn.createChannel();<br>������������������� channel.queueDeclare(queueName);<br>
������������������� consumer = new QueueingConsumer(channel);<br>������������������� consumerTag = channel.basicConsume(queueName, consumer);<br>������������������� System.out.println(&quot;consumerTag: &quot; + consumerTag);<br>
��������������� }<br>����������� }<br>������� }<br>������� catch (Exception ex) {<br>����������� System.err.println(&quot;Main thread caught exception: &quot; + ex);<br>����������� //ex.printStackTrace();<br>������� }<br>
������� System.out.println(&quot;\nMain thread exiting normally.&quot;);<br>��� }<br><br>��� @Override<br>��� public void run() {<br><br>������� while (true) {<br>����������� try {<br>��������������� System.out.println(&quot;Cancel thread is sleeping...&quot;);<br>
��������������� Thread.sleep(4000);<br>��������������� System.out.println(&quot;Going to cancel channel with consumerTag: &quot; + consumerTag);<br>��������������� // channel.basicCancel(consumerTag);<br>��������������� channel.close();<br>
��������������� // channel.abort();<br>��������������� System.out.println(&quot;\nCancelled channel with consumerTag: &quot; + consumerTag);<br>����������� }<br>����������� catch (Exception ex) {<br>��������������� System.err.println(&quot;SimpleConsumerCancel ex: &quot; + ex);<br>
��������������� //ex.printStackTrace();<br>����������� }<br>������� }<br><br>��� }<br>}<br><br><br><br clear="all">Jim Irrer � � <a href="mailto:irrer@umich.edu">irrer@umich.edu</a> � � � (734) 647-4409<br>University of Michigan Hospital Radiation Oncology<br>
519 W. William St. � � � � � � Ann Arbor, MI 48103<br>
<br><br><div class="gmail_quote">On Thu, Jan 14, 2010 at 6:24 AM, Matthew Sackman <span dir="ltr">&lt;<a href="mailto:matthew@lshift.net">matthew@lshift.net</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Hi Jim,<br>
<div class="im"><br>
On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:<br>
&gt; I have several consumers reading from the same queue. �I would like<br>
&gt; to be able to interrupt their pending read to suspend and resume any<br>
&gt; one of them. � I&#39;ve played around with:<br>
&gt;<br>
&gt; Channel.basicCancel(consumerTag) : don&#39;t know how to resume<br>
&gt; Channel.abort() � �produces com.rabbitmq.client.ShutdownSignalException<br>
&gt; Channel.close() � produces com.rabbitmq.client.ShutdownSignalException<br>
&gt;<br>
&gt; The close() and abort() methods seem to act about the same. �I can resume<br>
&gt; reading from the queue by creating a new channel and a new QueueingConsumer.<br>
&gt; I could not figure out how to resume after a basicCancel.<br>
&gt;<br>
&gt; Is using close() and then re-constructing the channel and QueueingConsumer<br>
&gt; the right way to go? �Will resources be properly taken care of by garbage<br>
&gt; collection?<br>
<br>
</div>What I would do is set QoS prefetch to 1, and make sure that you&#39;re<br>
doing ack-ing manually (i.e. don&#39;t set noAck). Then, when you want to<br>
&quot;suspend&quot;, just delay acking the last message you received. That&#39;ll<br>
prevent further messages being sent to you. Once you want to resume,<br>
send the ack and then the next message will be sent down to you. Does<br>
that help?<br>
<div class="im"><br>
&gt; BTW - I use the consumerTag returned by Channel.basicConsume(queueName,<br>
&gt; consumer)<br>
&gt; for the argument to Channel.basicCancel, eg: *<br>
&gt; amq.ctag-1m/H7+SDcZpbzTMgsUyhNg==* .<br>
&gt; When called, it prints:<br>
&gt;<br>
&gt; Consumer null method handleCancelOk for channel AMQChannel(amqp://<br>
&gt; <a href="http://guest@172.20.125.34:5672/,1" target="_blank">guest@172.20.125.34:5672/,1</a>) threw an exception:<br>
&gt; java.lang.NullPointerException<br>
&gt; � � at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)<br>
&gt; � � at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)<br>
&gt; � � at<br>
&gt; com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)<br>
&gt; � � at<br>
&gt; com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)<br>
&gt; � � at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)<br>
&gt; � � at<br>
&gt; com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)<br>
<br>
</div>The Javadoc says:<br>
<br>
 � �/**<br>
 � � * Cancel a consumer. Calls the consumer&#39;s {@link Consumer#handleCancelOk}<br>
 � � * method before returning.<br>
 � � * @param consumerTag a client- or server-generated consumer tag to establish context<br>
 � � * @throws java.io.IOException if an error is encountered<br>
 � � * @see com.rabbitmq.client.AMQP.Basic.Cancel<br>
 � � * @see com.rabbitmq.client.AMQP.Basic.CancelOk<br>
 � � */<br>
 � �void basicCancel(String consumerTag) throws IOException;<br>
<br>
The DefaultConsumer does have the handleCancelOk filled in, and Consumer<br>
is an interface, so I&#39;m a little alarmed by the possibility that it<br>
can&#39;t find the handleCancelOk method. What consumer class are you using?<br>
- in short, could you send us a small code example that exhibits this<br>
behaviour?<br>
<br>
Best wishes,<br>
<br>
Matthew<br>
<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</blockquote></div><br>