[rabbitmq-discuss] suspending and resuming a QueueingConsumer

Jim Irrer irrer at umich.edu
Thu Jan 14 16:19:41 GMT 2010


Matthew -

Thanks for trying but that does not really help.  I want to interrupt a
free-running
process that has already posted the read.  To explore this, I modified
RabbitMQ's
SimpleConsumer class (below) to post a read (via the nextDelivery method),
and
while it is waiting, have another thread close the channel.

Thanks,

- Jim

//   The contents of this file are subject to the Mozilla Public License
//   Version 1.1 (the "License"); you may not use this file except in
//   compliance with the License. You may obtain a copy of the License at
//   http://www.mozilla.org/MPL/
//
//   Software distributed under the License is distributed on an "AS IS"
//   basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
//   License for the specific language governing rights and limitations
//   under the License.
//
//   The Original Code is RabbitMQ.
//
//   The Initial Developers of the Original Code are LShift Ltd,
//   Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
//
//   Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
//   Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
//   are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
//   Technologies LLC, and Rabbit Technologies Ltd.
//
//   Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
//   Ltd. Portions created by Cohesive Financial Technologies LLC are
//   Copyright (C) 2007-2009 Cohesive Financial Technologies
//   LLC. Portions created by Rabbit Technologies Ltd are Copyright
//   (C) 2007-2009 Rabbit Technologies Ltd.
//
//   All Rights Reserved.
//
//   Contributor(s): ______________________________________.
//

package com.rabbitmq.examples;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class SimpleConsumerCancel implements Runnable {

    private static int count = 0;

    private static Channel channel = null;

    private static String consumerTag = null;

    public static void main(String[] args) {
        try {
            String hostName = (args.length > 0) ? args[0] : "172.20.125.34";
            int portNumber = (args.length > 1) ? Integer.parseInt(args[1]) :
AMQP.PROTOCOL.PORT;
            String queueName = (args.length > 2) ? args[2] : "SimpleQueue";

            ConnectionFactory connFactory = new ConnectionFactory();
            Connection conn = connFactory.newConnection(hostName,
portNumber);

            channel = conn.createChannel();

            channel.queueDeclare(queueName);

            Thread thread = new Thread(new SimpleConsumerCancel());
            thread.start();

            QueueingConsumer consumer = new QueueingConsumer(channel);
            consumerTag = channel.basicConsume(queueName, consumer);
            System.out.println("consumerTag: " + consumerTag);
            while (true) {
                try {
                    System.out.println("Waiting for delivery.");
                    QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
                    System.out.println("\nGot delivery.");
                    count++;
                    System.out.println("SimpleConsumerCancel got Message: "
+ new String(delivery.getBody()) + " : " + count);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
                catch (Exception ex) {
                    System.err.println("Read thread caught exception: " +
ex);

                    channel = conn.createChannel();
                    channel.queueDeclare(queueName);
                    consumer = new QueueingConsumer(channel);
                    consumerTag = channel.basicConsume(queueName, consumer);
                    System.out.println("consumerTag: " + consumerTag);
                }
            }
        }
        catch (Exception ex) {
            System.err.println("Main thread caught exception: " + ex);
            //ex.printStackTrace();
        }
        System.out.println("\nMain thread exiting normally.");
    }

    @Override
    public void run() {

        while (true) {
            try {
                System.out.println("Cancel thread is sleeping...");
                Thread.sleep(4000);
                System.out.println("Going to cancel channel with
consumerTag: " + consumerTag);
                // channel.basicCancel(consumerTag);
                channel.close();
                // channel.abort();
                System.out.println("\nCancelled channel with consumerTag: "
+ consumerTag);
            }
            catch (Exception ex) {
                System.err.println("SimpleConsumerCancel ex: " + ex);
                //ex.printStackTrace();
            }
        }

    }
}



Jim Irrer     irrer at umich.edu       (734) 647-4409
University of Michigan Hospital Radiation Oncology
519 W. William St.             Ann Arbor, MI 48103


On Thu, Jan 14, 2010 at 6:24 AM, Matthew Sackman <matthew at lshift.net> wrote:

> Hi Jim,
>
> On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:
> > I have several consumers reading from the same queue.  I would like
> > to be able to interrupt their pending read to suspend and resume any
> > one of them.   I've played around with:
> >
> > Channel.basicCancel(consumerTag) : don't know how to resume
> > Channel.abort()    produces com.rabbitmq.client.ShutdownSignalException
> > Channel.close()   produces com.rabbitmq.client.ShutdownSignalException
> >
> > The close() and abort() methods seem to act about the same.  I can resume
> > reading from the queue by creating a new channel and a new
> QueueingConsumer.
> > I could not figure out how to resume after a basicCancel.
> >
> > Is using close() and then re-constructing the channel and
> QueueingConsumer
> > the right way to go?  Will resources be properly taken care of by garbage
> > collection?
>
> What I would do is set QoS prefetch to 1, and make sure that you're
> doing ack-ing manually (i.e. don't set noAck). Then, when you want to
> "suspend", just delay acking the last message you received. That'll
> prevent further messages being sent to you. Once you want to resume,
> send the ack and then the next message will be sent down to you. Does
> that help?
>
> > BTW - I use the consumerTag returned by Channel.basicConsume(queueName,
> > consumer)
> > for the argument to Channel.basicCancel, eg: *
> > amq.ctag-1m/H7+SDcZpbzTMgsUyhNg==* .
> > When called, it prints:
> >
> > Consumer null method handleCancelOk for channel AMQChannel(amqp://
> > guest at 172.20.125.34:5672/,1) threw an exception:
> > java.lang.NullPointerException
> >     at
> com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
> >     at
> com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
> >     at
> >
> com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
> >     at
> >
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
> >     at
> com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
> >     at
> >
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)
>
> The Javadoc says:
>
>    /**
>     * Cancel a consumer. Calls the consumer's {@link
> Consumer#handleCancelOk}
>     * method before returning.
>     * @param consumerTag a client- or server-generated consumer tag to
> establish context
>     * @throws java.io.IOException if an error is encountered
>     * @see com.rabbitmq.client.AMQP.Basic.Cancel
>     * @see com.rabbitmq.client.AMQP.Basic.CancelOk
>     */
>    void basicCancel(String consumerTag) throws IOException;
>
> The DefaultConsumer does have the handleCancelOk filled in, and Consumer
> is an interface, so I'm a little alarmed by the possibility that it
> can't find the handleCancelOk method. What consumer class are you using?
> - in short, could you send us a small code example that exhibits this
> behaviour?
>
> Best wishes,
>
> Matthew
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20100114/65aa8bcb/attachment.htm 


More information about the rabbitmq-discuss mailing list