[rabbitmq-discuss] QueueingConsumer basicPublish safety

tsuraan tsuraan at gmail.com
Tue Jul 27 21:53:04 BST 2010

In my program, messages coming from RabbitMQ are handled on different
threads, and once a message is handled it is ack'd.  New messages
are also sometimes generated, so I need to send those as well.  I know
that channels aren't thread-safe, so I wrote a wrapper class around
QueueingConsumer that maintains two internal BlockingQueues, one for
delivery tags to ack and one for messages to send.  It has public
methods ack and publish that take delivery tags or message
representations and enqueue them, and I overrode the nextDelivery(long)
method to look like this:

  public QueueingConsumer.Delivery nextDelivery(long timeout)
    throws ShutdownSignalException, InterruptedException
  {
    QueueingConsumer.Delivery delivery = super.nextDelivery(timeout);
    return delivery;
  }

processAcks() iterates through the acks BlockingQueue and calls

this.getChannel().basicAck(l.longValue(), false);

while processPublishes() iterates through the publishes BlockingQueue and calls

this.getChannel().basicPublish(m.exchange, m.key, ...)
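
To make the shape of those two helpers concrete, here is a minimal sketch of the drain pattern.  It is not my actual class: ChannelOps is a hypothetical stand-in for the two Channel calls (with a simplified basicPublish signature), and OutboundMessage and PendingWork are made-up names for the message representation and the part of the wrapper that owns the two queues.  The sketch drains with poll(), which returns null on an empty queue instead of blocking.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the two Channel operations used here;
// the signatures are simplified for illustration.
interface ChannelOps {
    void basicAck(long deliveryTag, boolean multiple);
    void basicPublish(String exchange, String routingKey, byte[] body);
}

// Hypothetical message representation (exchange, routing key, payload).
final class OutboundMessage {
    final String exchange;
    final String key;
    final byte[] body;

    OutboundMessage(String exchange, String key, byte[] body) {
        this.exchange = exchange;
        this.key = key;
        this.body = body;
    }
}

// Holds the two queues. ack() and publish() may be called from any
// worker thread; the process*() methods are meant to be called only
// from the thread that owns the channel.
final class PendingWork {
    private final BlockingQueue<Long> acks = new LinkedBlockingQueue<>();
    private final BlockingQueue<OutboundMessage> publishes = new LinkedBlockingQueue<>();

    void ack(long deliveryTag) {
        acks.add(deliveryTag);
    }

    void publish(OutboundMessage m) {
        publishes.add(m);
    }

    // Drains everything queued so far; poll() never blocks.
    int processAcks(ChannelOps channel) {
        int count = 0;
        Long l;
        while ((l = acks.poll()) != null) {
            channel.basicAck(l.longValue(), false);
            count++;
        }
        return count;
    }

    // Same pattern for outgoing messages.
    int processPublishes(ChannelOps channel) {
        int count = 0;
        OutboundMessage m;
        while ((m = publishes.poll()) != null) {
            channel.basicPublish(m.exchange, m.key, m.body);
            count++;
        }
        return count;
    }
}
```

With this shape, a worker thread only ever touches the queues, and the channel calls happen on whichever thread invokes processAcks() and processPublishes().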

The problem I'm having is that, although all of these functions are
always called in the thread that the consumer is running in, the
program hangs.  My stack trace looks like this:

Stack trace:
java.lang.Object.wait(Native Method)
[handler stuff]

It seems to have been stuck there for quite a while.  Am I doing
something obviously wrong, or is this something complicated?  I'm
reasonably sure that the channel is only being used in the thread
where the consumer is subscribed, so I'm hoping that I missed some
caveat about doing publishes right after a nextDelivery or something.
Any tips would be much appreciated.
