Matthew -<br><br>Thanks for trying but that does not really help. I want to interrupt a free-running<br>process that has already posted the read. To explore this, I modified RabbitMQ's<br>SimpleConsumer class (below) to post a read (via the nextDelivery method), and<br>
while it is waiting, have another thread close the channel.<br><br>Thanks,<br><br>- Jim<br><br>// The contents of this file are subject to the Mozilla Public License<br>// Version 1.1 (the "License"); you may not use this file except in<br>
// compliance with the License. You may obtain a copy of the License at<br>// <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>// Software distributed under the License is distributed on an "AS IS"<br>
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the<br>// License for the specific language governing rights and limitations<br>// under the License.<br>//<br>// The Original Code is RabbitMQ.<br>
//<br>// The Initial Developers of the Original Code are LShift Ltd,<br>// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,<br>
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd<br>// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial<br>// Technologies LLC, and Rabbit Technologies Ltd.<br>//<br>// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift<br>
// Ltd. Portions created by Cohesive Financial Technologies LLC are<br>// Copyright (C) 2007-2009 Cohesive Financial Technologies<br>// LLC. Portions created by Rabbit Technologies Ltd are Copyright<br>// (C) 2007-2009 Rabbit Technologies Ltd.<br>
//<br>// All Rights Reserved.<br>//<br>// Contributor(s): ______________________________________.<br>//<br><br>package com.rabbitmq.examples;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>
import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>import com.rabbitmq.client.QueueingConsumer;<br><br>public class SimpleConsumerCancel implements Runnable {<br> <br> private static int count = 0;<br>
<br> private static Channel channel = null;<br> <br> private static String consumerTag = null;<br><br> public static void main(String[] args) {<br> try {<br> String hostName = (args.length > 0) ? args[0] : "172.20.125.34";<br>
int portNumber = (args.length > 1) ? Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;<br> String queueName = (args.length > 2) ? args[2] : "SimpleQueue";<br><br> ConnectionFactory connFactory = new ConnectionFactory();<br>
Connection conn = connFactory.newConnection(hostName, portNumber);<br><br> channel = conn.createChannel();<br><br> channel.queueDeclare(queueName);<br><br> Thread thread = new Thread(new SimpleConsumerCancel());<br>
thread.start();<br><br> QueueingConsumer consumer = new QueueingConsumer(channel);<br> consumerTag = channel.basicConsume(queueName, consumer);<br> System.out.println("consumerTag: " + consumerTag);<br>
while (true) {<br> try {<br> System.out.println("Waiting for delivery.");<br> QueueingConsumer.Delivery delivery = consumer.nextDelivery();<br> System.out.println("\nGot delivery.");<br>
count++;<br> System.out.println("SimpleConsumerCancel got Message: " + new String(delivery.getBody()) + " : " + count);<br> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);<br>
}<br> catch (Exception ex) {<br> System.err.println("Read thread caught exception: " + ex);<br><br> channel = conn.createChannel();<br> channel.queueDeclare(queueName);<br>
consumer = new QueueingConsumer(channel);<br> consumerTag = channel.basicConsume(queueName, consumer);<br> System.out.println("consumerTag: " + consumerTag);<br>
}<br> }<br> }<br> catch (Exception ex) {<br> System.err.println("Main thread caught exception: " + ex);<br> //ex.printStackTrace();<br> }<br>
System.out.println("\nMain thread exiting normally.");<br> }<br><br> @Override<br> public void run() {<br><br> while (true) {<br> try {<br> System.out.println("Cancel thread is sleeping...");<br>
Thread.sleep(4000);<br> System.out.println("Going to cancel channel with consumerTag: " + consumerTag);<br> // channel.basicCancel(consumerTag);<br> channel.close();<br>
// channel.abort();<br> System.out.println("\nCancelled channel with consumerTag: " + consumerTag);<br> }<br> catch (Exception ex) {<br> System.err.println("SimpleConsumerCancel ex: " + ex);<br>
//ex.printStackTrace();<br> }<br> }<br><br> }<br>}<br><br><br><br clear="all">Jim Irrer <a href="mailto:irrer@umich.edu">irrer@umich.edu</a> (734) 647-4409<br>University of Michigan Hospital Radiation Oncology<br>
519 W. William St. Ann Arbor, MI 48103<br>
<br><br><div class="gmail_quote">On Thu, Jan 14, 2010 at 6:24 AM, Matthew Sackman <span dir="ltr"><<a href="mailto:matthew@lshift.net">matthew@lshift.net</a>></span> wrote:<br><blockquote class="gmail_quote" style="border-left: 1px solid rgb(204, 204, 204); margin: 0pt 0pt 0pt 0.8ex; padding-left: 1ex;">
Hi Jim,<br>
<div class="im"><br>
On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:<br>
> I have several consumers reading from the same queue. I would like<br>
> to be able to interrupt their pending read to suspend and resume any<br>
> one of them. I've played around with:<br>
><br>
> Channel.basicCancel(consumerTag) : don't know how to resume<br>
> Channel.abort() produces com.rabbitmq.client.ShutdownSignalException<br>
> Channel.close() produces com.rabbitmq.client.ShutdownSignalException<br>
><br>
> The close() and abort() methods seem to act about the same. I can resume<br>
> reading from the queue by creating a new channel and a new QueueingConsumer.<br>
> I could not figure out how to resume after a basicCancel.<br>
><br>
> Is using close() and then re-constructing the channel and QueueingConsumer<br>
> the right way to go? Will resources be properly taken care of by garbage<br>
> collection?<br>
<br>
</div>What I would do is set QoS prefetch to 1, and make sure that you're<br>
doing ack-ing manually (i.e. don't set noAck). Then, when you want to<br>
"suspend", just delay acking the last message you received. That'll<br>
prevent further messages being sent to you. Once you want to resume,<br>
send the ack and then the next message will be sent down to you. Does<br>
that help?<br>
<div class="im"><br>
> BTW - I use the consumerTag returned by Channel.basicConsume(queueName,<br>
> consumer)<br>
> for the argument to Channel.basicCancel, eg: *<br>
> amq.ctag-1m/H7+SDcZpbzTMgsUyhNg==* .<br>
> When called, it prints:<br>
><br>
> Consumer null method handleCancelOk for channel AMQChannel(amqp://<br>
> <a href="http://guest@172.20.125.34:5672/,1" target="_blank">guest@172.20.125.34:5672/,1</a>) threw an exception:<br>
> java.lang.NullPointerException<br>
> at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)<br>
> at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)<br>
> at<br>
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)<br>
> at<br>
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)<br>
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)<br>
> at<br>
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)<br>
<br>
</div>The Javadoc says:<br>
<br>
/**<br>
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}<br>
* method before returning.<br>
* @param consumerTag a client- or server-generated consumer tag to establish context<br>
* @throws java.io.IOException if an error is encountered<br>
* @see com.rabbitmq.client.AMQP.Basic.Cancel<br>
* @see com.rabbitmq.client.AMQP.Basic.CancelOk<br>
*/<br>
void basicCancel(String consumerTag) throws IOException;<br>
<br>
The DefaultConsumer does have the handleCancelOk filled in, and Consumer<br>
is an interface, so I'm a little alarmed by the possibility that it<br>
can't find the handleCancelOk method. What consumer class are you using?<br>
- in short, could you send us a small code example that exhibits this<br>
behaviour?<br>
<br>
Best wishes,<br>
<br>
Matthew<br>
<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</blockquote></div><br>