[rabbitmq-discuss] QueueingConsumer basicPublish safety

Emile Joubert emile at rabbitmq.com
Thu Jul 29 15:09:28 BST 2010


Hi Tsuraan,

Channel.flow has been asserted, and the basic.publish call has therefore
blocked. We are aware that our API documentation incorrectly claims that
basic.publish is safe to use in this context. It will be updated.

You need to keep consuming messages while publishers are potentially
blocked. This allows memory pressure on the broker to be relieved, so
that publishers can eventually resume. In your example, consumption and
publication happen in the same thread, so your application gets
completely stuck as soon as publication blocks.

You could also consider using separate channels for publishing and
consuming messages.
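
As a rough illustration of decoupling the two, here is a minimal sketch
that moves publishing onto its own thread, so that a blocked publish
cannot stall the consumer loop. It uses only java.util.concurrent. The
names PublisherThreadSketch, BlockingPublisher, OutboundMessage and
PublisherWorker are hypothetical and not part of the RabbitMQ client.
BlockingPublisher stands in for a call such as basicPublish made on a
channel that only the publisher thread touches.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PublisherThreadSketch {

    /** Hypothetical stand-in for a publish call that may block, e.g. basicPublish. */
    interface BlockingPublisher {
        void publish(String exchange, String routingKey, byte[] body) throws Exception;
    }

    /** Hypothetical message holder, mirroring m.exchange and m.key in the quoted code. */
    static final class OutboundMessage {
        final String exchange;
        final String key;
        final byte[] body;

        OutboundMessage(String exchange, String key, byte[] body) {
            this.exchange = exchange;
            this.key = key;
            this.body = body;
        }
    }

    /** Drains queued messages on a dedicated thread. */
    static final class PublisherWorker implements Runnable {
        // Unbounded queue: enqueue() never blocks, but the backlog can grow
        // in the application for as long as publishing stays blocked.
        private final BlockingQueue<OutboundMessage> outbox = new LinkedBlockingQueue<>();
        private final BlockingPublisher publisher;

        PublisherWorker(BlockingPublisher publisher) {
            this.publisher = publisher;
        }

        /** Safe to call from the consumer thread; returns immediately. */
        void enqueue(OutboundMessage m) {
            outbox.add(m);
        }

        /** Number of messages still waiting to be handed to the publisher. */
        int pending() {
            return outbox.size();
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    OutboundMessage m = outbox.take();
                    // Only this thread blocks if the publish call blocks.
                    publisher.publish(m.exchange, m.key, m.body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and exit
            } catch (Exception e) {
                // A real application would log this and decide whether to retry.
            }
        }
    }
}
```

The consumer thread would then call enqueue() instead of publishing
directly, and keep calling nextDelivery() and acking while the worker is
blocked. Note that the worker needs a channel of its own, since channels
must not be shared between threads.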

Regards

Emile

On 27/07/10 21:53, tsuraan wrote:
> In my program, messages coming from rabbit are handled on different
> threads, and once the message is handled it is ack'd.  New messages
> are also sometimes generated, so I need to send them as well.  I know
> that channels aren't thread-safe, so I wrote a wrapper class around
> QueueingConsumer that maintains two internal BlockingQueues, one for
> delivery tags to ack and one for messages to send.  I have public
> methods ack and publish that take dtags or message representations and
> enqueue them, and I overrode the nextDelivery(long) method to look
> like this:
> 
>   public QueueingConsumer.Delivery nextDelivery(long timeout)
>     throws ShutdownSignalException, InterruptedException
>   {
>     QueueingConsumer.Delivery delivery = super.nextDelivery(timeout);
>     this.processAcks();
>     this.processPublishes();
>     return delivery;
>   }
> 
> processAcks() iterates through the acks BlockingQueue and calls
> 
> this.getChannel().basicAck(l.longValue(), false);
> 
> while processPublishes() iterates through the publishes BlockingQueue and calls
> 
> this.getChannel().basicPublish(m.exchange, m.key, ...)
> 
> The problem I'm having is that, although all these functions are
> always being called in the thread that the consumer is running in, I'm
> hanging.  My stack trace looks like this:
> 
> Stack trace:
> java.lang.Object.wait(Native Method)
> java.lang.Object.wait(Object.java:485)
> com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:300)
> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
> com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:403)
> com.foo.rabbit.AckingQueueingConsumer.processPublishes(AckingQueueingConsumer.java:136)
> com.foo.rabbit.AckingQueueingConsumer.nextDelivery(AckingQueueingConsumer.java:40)
> [handler stuff]
> java.lang.Thread.run(Thread.java:619)
> 
> It seems to have been stuck there for quite a while.  Am I doing
> something obviously wrong, or is this something complicated?  I'm
> reasonably sure that the channel is only being used in the thread
> where the consumer is subscribed, so I'm hoping that I missed some
> caveat about doing publishes right after a nextDelivery or something.
> Any tips would be very appreciated.


