[rabbitmq-discuss] suspending and resuming a QueueingConsumer
Jim Irrer
irrer at umich.edu
Thu Jan 14 16:19:41 GMT 2010
Matthew -
Thanks for trying but that does not really help. I want to interrupt a
free-running
process that has already posted the read. To explore this, I modified
RabbitMQ's
SimpleConsumer class (below) to post a read (via the nextDelivery method),
and
while it is waiting, have another thread close the channel.
Thanks,
- Jim
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License at
// http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
// License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developers of the Original Code are LShift Ltd,
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
//
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
// Technologies LLC, and Rabbit Technologies Ltd.
//
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
// Ltd. Portions created by Cohesive Financial Technologies LLC are
// Copyright (C) 2007-2009 Cohesive Financial Technologies
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
// (C) 2007-2009 Rabbit Technologies Ltd.
//
// All Rights Reserved.
//
// Contributor(s): ______________________________________.
//
package com.rabbitmq.examples;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class SimpleConsumerCancel implements Runnable {
private static int count = 0;
private static Channel channel = null;
private static String consumerTag = null;
public static void main(String[] args) {
try {
String hostName = (args.length > 0) ? args[0] : "172.20.125.34";
int portNumber = (args.length > 1) ? Integer.parseInt(args[1]) :
AMQP.PROTOCOL.PORT;
String queueName = (args.length > 2) ? args[2] : "SimpleQueue";
ConnectionFactory connFactory = new ConnectionFactory();
Connection conn = connFactory.newConnection(hostName,
portNumber);
channel = conn.createChannel();
channel.queueDeclare(queueName);
Thread thread = new Thread(new SimpleConsumerCancel());
thread.start();
QueueingConsumer consumer = new QueueingConsumer(channel);
consumerTag = channel.basicConsume(queueName, consumer);
System.out.println("consumerTag: " + consumerTag);
while (true) {
try {
System.out.println("Waiting for delivery.");
QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
System.out.println("\nGot delivery.");
count++;
System.out.println("SimpleConsumerCancel got Message: "
+ new String(delivery.getBody()) + " : " + count);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
catch (Exception ex) {
System.err.println("Read thread caught exception: " +
ex);
channel = conn.createChannel();
channel.queueDeclare(queueName);
consumer = new QueueingConsumer(channel);
consumerTag = channel.basicConsume(queueName, consumer);
System.out.println("consumerTag: " + consumerTag);
}
}
}
catch (Exception ex) {
System.err.println("Main thread caught exception: " + ex);
//ex.printStackTrace();
}
System.out.println("\nMain thread exiting normally.");
}
@Override
public void run() {
while (true) {
try {
System.out.println("Cancel thread is sleeping...");
Thread.sleep(4000);
System.out.println("Going to cancel channel with
consumerTag: " + consumerTag);
// channel.basicCancel(consumerTag);
channel.close();
// channel.abort();
System.out.println("\nCancelled channel with consumerTag: "
+ consumerTag);
}
catch (Exception ex) {
System.err.println("SimpleConsumerCancel ex: " + ex);
//ex.printStackTrace();
}
}
}
}
Jim Irrer irrer at umich.edu (734) 647-4409
University of Michigan Hospital Radiation Oncology
519 W. William St. Ann Arbor, MI 48103
On Thu, Jan 14, 2010 at 6:24 AM, Matthew Sackman <matthew at lshift.net> wrote:
> Hi Jim,
>
> On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:
> > I have several consumers reading from the same queue. I would like
> > to be able to interrupt their pending read to suspend and resume any
> > one of them. I've played around with:
> >
> > Channel.basicCancel(consumerTag) : don't know how to resume
> > Channel.abort() produces com.rabbitmq.client.ShutdownSignalException
> > Channel.close() produces com.rabbitmq.client.ShutdownSignalException
> >
> > The close() and abort() methods seem to act about the same. I can resume
> > reading from the queue by creating a new channel and a new
> QueueingConsumer.
> > I could not figure out how to resume after a basicCancel.
> >
> > Is using close() and then re-constructing the channel and
> QueueingConsumer
> > the right way to go? Will resources be properly taken care of by garbage
> > collection?
>
> What I would do is set QoS prefetch to 1, and make sure that you're
> doing ack-ing manually (i.e. don't set noAck). Then, when you want to
> "suspend", just delay acking the last message you received. That'll
> prevent further messages being sent to you. Once you want to resume,
> send the ack and then the next message will be sent down to you. Does
> that help?
>
> > BTW - I use the consumerTag returned by Channel.basicConsume(queueName,
> > consumer)
> > for the argument to Channel.basicCancel, eg: *
> > amq.ctag-1m/H7+SDcZpbzTMgsUyhNg==* .
> > When called, it prints:
> >
> > Consumer null method handleCancelOk for channel AMQChannel(amqp://
> > guest at 172.20.125.34:5672/,1) threw an exception:
> > java.lang.NullPointerException
> > at
> com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
> > at
> com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
> > at
> >
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
> > at
> >
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
> > at
> com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
> > at
> >
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)
>
> The Javadoc says:
>
> /**
> * Cancel a consumer. Calls the consumer's {@link
> Consumer#handleCancelOk}
> * method before returning.
> * @param consumerTag a client- or server-generated consumer tag to
> establish context
> * @throws java.io.IOException if an error is encountered
> * @see com.rabbitmq.client.AMQP.Basic.Cancel
> * @see com.rabbitmq.client.AMQP.Basic.CancelOk
> */
> void basicCancel(String consumerTag) throws IOException;
>
> The DefaultConsumer does have the handleCancelOk filled in, and Consumer
> is an interface, so I'm a little alarmed by the possibility that it
> can't find the handleCancelOk method. What consumer class are you using?
> - in short, could you send us a small code example that exhibits this
> behaviour?
>
> Best wishes,
>
> Matthew
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20100114/65aa8bcb/attachment.htm
More information about the rabbitmq-discuss
mailing list