[rabbitmq-discuss] suspending and resuming a QueueingConsumer

Matthew Sackman matthew at lshift.net
Thu Jan 14 11:24:59 GMT 2010


Hi Jim,

On Wed, Jan 13, 2010 at 02:06:57PM -0500, Jim Irrer wrote:
> I have several consumers reading from the same queue.  I would like
> to be able to interrupt their pending read to suspend and resume any
> one of them.   I've played around with:
> 
> Channel.basicCancel(consumerTag) : don't know how to resume
> Channel.abort()    produces com.rabbitmq.client.ShutdownSignalException
> Channel.close()   produces com.rabbitmq.client.ShutdownSignalException
> 
> The close() and abort() methods seem to act about the same.  I can resume
> reading from the queue by creating a new channel and a new QueueingConsumer.
> I could not figure out how to resume after a basicCancel.
> 
> Is using close() and then re-constructing the channel and QueueingConsumer
> the right way to go?  Will resources be properly taken care of by garbage
> collection?

What I would do is set QoS prefetch to 1, and make sure that you're
doing ack-ing manually (i.e. don't set noAck). Then, when you want to
"suspend", just delay acking the last message you received. That'll
prevent further messages being sent to you. Once you want to resume,
send the ack and then the next message will be sent down to you. Does
that help?
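
To make the mechanism concrete, here is a rough sketch of why holding back
an ack pauses delivery. It is a toy in-memory model, not the RabbitMQ
client: ToyBroker, publish, ack and delivered are all hypothetical names
invented for illustration. With the real Java client the corresponding
calls would be Channel.basicQos(1), Channel.basicConsume with noAck set to
false, and Channel.basicAck with the delivery tag of the message you held.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a queue feeding one consumer under a prefetch limit.
// NOT the RabbitMQ client API; every name here is hypothetical.
class ToyBroker {
    private final int prefetch;                                // max unacked deliveries
    private final Deque<String> ready = new ArrayDeque<>();    // messages still queued
    private final List<String> delivered = new ArrayList<>();  // messages pushed to the consumer
    private int unacked = 0;                                   // deliveries awaiting an ack

    ToyBroker(int prefetch) {
        this.prefetch = prefetch;
    }

    // Enqueue a message and deliver it if a prefetch slot is free.
    void publish(String msg) {
        ready.addLast(msg);
        pump();
    }

    // The consumer acks one outstanding delivery, freeing a prefetch slot.
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to ack");
        }
        unacked--;
        pump();
    }

    // Deliver queued messages while the consumer is below its prefetch limit.
    private void pump() {
        while (unacked < prefetch && !ready.isEmpty()) {
            delivered.add(ready.removeFirst());
            unacked++;
        }
    }

    // Snapshot of everything delivered so far, in order.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}

class PrefetchSketch {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker(1);  // prefetch of 1
        broker.publish("m1");
        broker.publish("m2");
        broker.publish("m3");

        // "Suspended": m1 is unacked, so nothing else is delivered.
        System.out.println(broker.delivered());  // prints [m1]

        // "Resume": acking m1 frees the slot and m2 is delivered.
        broker.ack();
        System.out.println(broker.delivered());  // prints [m1, m2]
    }
}
```

One thing to keep in mind with several consumers on the same queue: the
message you are holding unacked stays assigned to the suspended consumer,
so the other consumers will not see it until it is acked or the channel
is closed.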

> BTW - I use the consumerTag returned by Channel.basicConsume(queueName,
> consumer) for the argument to Channel.basicCancel, e.g.
> amq.ctag-1m/H7+SDcZpbzTMgsUyhNg== .
> When called, it prints:
> 
> Consumer null method handleCancelOk for channel AMQChannel(amqp://guest@172.20.125.34:5672/,1) threw an exception:
> java.lang.NullPointerException
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:728)
>     at com.rabbitmq.client.impl.ChannelN$2.transformReply(ChannelN.java:721)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.handleCommand(AMQChannel.java:327)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:110)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:456)

The Javadoc says:

    /**
     * Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
     * method before returning.
     * @param consumerTag a client- or server-generated consumer tag to establish context
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Cancel
     * @see com.rabbitmq.client.AMQP.Basic.CancelOk
     */
    void basicCancel(String consumerTag) throws IOException;

The DefaultConsumer does have the handleCancelOk filled in, and Consumer
is an interface, so I'm a little alarmed by the possibility that it
can't find the handleCancelOk method. What consumer class are you using?
In short, could you send us a small code example that exhibits this
behaviour?

Best wishes,

Matthew



