Matthew -<br><br>Thanks for trying but that does not really help.  I want to interrupt a free-running<br>process that has already posted the read.  To explore this, I modified RabbitMQ&#39;s<br>SimpleConsumer class (below) to post a read (via the nextDelivery method), and<br>
while it is waiting, have another thread close the channel.<br><br>Thanks,<br><br>- Jim<br><br>//   The contents of this file are subject to the Mozilla Public License<br>//   Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>
//   compliance with the License. You may obtain a copy of the License at<br>//   <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>//   Software distributed under the License is distributed on an &quot;AS IS&quot;<br>
//   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the<br>//   License for the specific language governing rights and limitations<br>//   under the License.<br>//<br>//   The Original Code is RabbitMQ.<br>
//<br>//   The Initial Developers of the Original Code are LShift Ltd,<br>//   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>//   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,<br>
//   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd<br>//   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial<br>//   Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>//   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift<br>
//   Ltd. Portions created by Cohesive Financial Technologies LLC are<br>//   Copyright (C) 2007-2009 Cohesive Financial Technologies<br>//   LLC. Portions created by Rabbit Technologies Ltd are Copyright<br>//   (C) 2007-2009 Rabbit Technologies Ltd.<br>
//<br>//   All Rights Reserved.<br>//<br>//   Contributor(s): ______________________________________.<br>//<br><br>package com.rabbitmq.examples;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>
import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>import com.rabbitmq.client.QueueingConsumer;<br><br>public class SimpleConsumerCancel implements Runnable {<br>    <br>    private static int count = 0;<br>
    <br>    private static Channel channel = null;<br>    <br>    private static String consumerTag = null;<br><br>    public static void main(String[] args) {<br>        try {<br>            String hostName = (args.length &gt; 0) ? args[0] : &quot;172.20.125.34&quot;;<br>
            int portNumber = (args.length &gt; 1) ? Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;<br>            String queueName = (args.length &gt; 2) ? args[2] : &quot;SimpleQueue&quot;;<br><br>            ConnectionFactory connFactory = new ConnectionFactory();<br>
            Connection conn = connFactory.newConnection(hostName, portNumber);<br><br>            channel = conn.createChannel();<br><br>            channel.queueDeclare(queueName);<br><br>            Thread thread = new Thread(new SimpleConsumerCancel());<br>
            thread.start();<br><br>            QueueingConsumer consumer = new QueueingConsumer(channel);<br>            consumerTag = channel.basicConsume(queueName, consumer);<br>            System.out.println(&quot;consumerTag: &quot; + consumerTag);<br>
            while (true) {<br>                try {<br>                    System.out.println(&quot;Waiting for delivery.&quot;);<br>                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();<br>                    System.out.println(&quot;\nGot delivery.&quot;);<br>
                    count++;<br>                    System.out.println(&quot;SimpleConsumerCancel got Message: &quot; + new String(delivery.getBody()) + &quot; : &quot; + count);<br>                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);<br>
                }<br>                catch (Exception ex) {<br>                    System.err.println(&quot;Read thread caught exception: &quot; + ex);<br><br>                    channel = conn.createChannel();<br>                    channel.queueDeclare(queueName);<br>
                    consumer = new QueueingConsumer(channel);<br>                    consumerTag = channel.basicConsume(queueName, consumer);<br>                    System.out.println(&quot;consumerTag: &quot; + consumerTag);<br>
                }<br>            }<br>        }<br>        catch (Exception ex) {<br>            System.err.println(&quot;Main thread caught exception: &quot; + ex);<br>            //ex.printStackTrace();<br>        }<br>
        System.out.println(&quot;\nMain thread exiting normally.&quot;);<br>    }<br><br>    @Override<br>    public void run() {<br><br>        while (true) {<br>            try {<br>                System.out.println(&quot;Cancel thread is sleeping...&quot;);<br>
                Thread.sleep(4000);<br>                System.out.println(&quot;Going to cancel channel with consumerTag: &quot; + consumerTag);<br>                // channel.basicCancel(consumerTag);<br>                channel.close();<br>
                // channel.abort();<br>                System.out.println(&quot;\nCancelled channel with consumerTag: &quot; + consumerTag);<br>            }<br>            catch (Exception ex) {<br>                System.err.println(&quot;SimpleConsumerCancel ex: &quot; + ex);<br>
                //ex.printStackTrace();<br>            }<br>        }<br><br>    }<br>}<br><br><br><br clear="all">Jim Irrer     <a href="mailto:irrer@umich.edu">irrer@umich.edu</a>       (734) 647-4409<br>University of Michigan Hospital Radiation Oncology<br>
519 W. William St.             Ann Arbor, MI 48103<br>
<br><br><div class="gmail_quote">On Thu, Jan 14, 2010 at 6:24 AM, Matthew Sackman <span dir="ltr">&lt;<a href="mailto:matthew@lshift.net">matthew@lshift.net</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Hi Jim,<br>
<div class="im"><br>
On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:<br>
&gt; I have several consumers reading from the same queue.  I would like<br>
&gt; to be able to interrupt their pending read to suspend and resume any<br>
&gt; one of them.   I&#39;ve played around with:<br>
&gt;<br>
&gt; Channel.basicCancel(consumerTag) : don&#39;t know how to resume<br>
&gt; Channel.abort()    produces com.rabbitmq.client.ShutdownSignalException<br>
&gt; Channel.close()   produces com.rabbitmq.client.ShutdownSignalException<br>
&gt;<br>
&gt; The close() and abort() methods seem to act about the same.  I can resume<br>
&gt; reading from the queue by creating a new channel and a new QueueingConsumer.<br>
&gt; I could not figure out how to resume after a basicCancel.<br>
&gt;<br>
&gt; Is using close() and then re-constructing the channel and QueueingConsumer<br>
&gt; the right way to go?  Will resources be properly taken care of by garbage<br>
&gt; collection?<br>
<br>
</div>What I would do is set QoS prefetch to 1, and make sure that you&#39;re<br>
doing ack-ing manually (i.e. don&#39;t set noAck). Then, when you want to<br>
&quot;suspend&quot;, just delay acking the last message you received. That&#39;ll<br>
prevent further messages being sent to you. Once you want to resume,<br>
send the ack and then the next message will be sent down to you. Does<br>
that help?<br>
<div class="im"><br>
&gt; BTW - I use the consumerTag returned by Channel.basicConsume(queueName,<br>
&gt; consumer)<br>
&gt; for the argument to Channel.basicCancel, eg: *<br>
&gt; amq.ctag-1m/H7+SDcZpbzTMgsUyhNg==* .<br>
&gt; When called, it prints:<br>
&gt;<br>
&gt; Consumer null method handleCancelOk for channel AMQChannel(amqp://<br>
&gt; <a href="http://guest@172.20.125.34:5672/,1" target="_blank">guest@172.20.125.34:5672/,1</a>) threw an exception:<br>
&gt; java.lang.NullPointerException<br>
&gt;     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)<br>
&gt;     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)<br>
&gt;     at<br>
&gt; com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)<br>
&gt;     at<br>
&gt; com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)<br>
&gt;     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)<br>
&gt;     at<br>
&gt; com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)<br>
<br>
</div>The Javadoc says:<br>
<br>
    /**<br>
     * Cancel a consumer. Calls the consumer&#39;s {@link Consumer#handleCancelOk}<br>
     * method before returning.<br>
     * @param consumerTag a client- or server-generated consumer tag to establish context<br>
     * @throws java.io.IOException if an error is encountered<br>
     * @see com.rabbitmq.client.AMQP.Basic.Cancel<br>
     * @see com.rabbitmq.client.AMQP.Basic.CancelOk<br>
     */<br>
    void basicCancel(String consumerTag) throws IOException;<br>
<br>
The DefaultConsumer does have the handleCancelOk filled in, and Consumer<br>
is an interface, so I&#39;m a little alarmed by the possibility that it<br>
can&#39;t find the handleCancelOk method. What consumer class are you using?<br>
- in short, could you send us a small code example that exhibits this<br>
behaviour?<br>
<br>
Best wishes,<br>
<br>
Matthew<br>
<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</blockquote></div><br>