[rabbitmq-discuss] RabbitMQ 1.7.0 close the cnx when consumer don't ack deliveries during 30s
Vincent Barat
vbarat at ubikod.com
Thu Jul 15 15:35:59 BST 2010
Hi Simon,
I got my fix: here is what I do:
1- I use basicQos() to be sure that the server will not send "too
many" messages (actually, I have checked that basicQos() does not
offers a guarantee on the number of un-acknoledged messages sent,
since I actually receive much more than the parameter I give, but it
seems at least to limit the amount of messages, maybe there is a bug
here ?).
2- I use an unlimited LinkedBlockingQueue to buffer my messages and
to be sure that handleDelivery will never block
3- then, I use another thread to read this queue and write my
message to the database
It works perfectly.
Here is the code:
public abstract class BufferedQueueConsumer extends QueueConsumer
{
private static final Log LOG =
LogFactory.getLog(BufferedQueueConsumer.class);
/** Maximum time during which the messages will be buffered
before triggering a delivery */
private int pace = 1000;
/** Maximum number of messages being buffered before triggering a
delivered */
private int capacity = 1000;
/** Thread used to consume buffered messages */
private final Thread consumerThread;
/**
* The message buffer. A queue is used to store the buffered
messages and to synchronize the
* message producer and consumer. This queue must not have a
maximum capacity, otherwise it could
* block the producer in handleDelivery() and then brake the AMQP
connection. Instead, we rely on
* AMQP's QOS parameter which allow to configure the server to
send not more than a given amount
* of unacknowledged deliveries. Tests have shown that this
parameter is not a guarantee (we can
* receive more unacknowledged messages than expected) but it has
a clear effect preventing memory
* overflows.
*/
private final BlockingQueue<Delivery> deliveryQueue = new
LinkedBlockingQueue<Delivery>();
public BufferedQueueConsumer(final String queueName, String
exchange, String routingKey,
String mode, boolean durable)
{
/* Create the parent queue consumer */
super(queueName, exchange, routingKey, mode, durable);
/* Create the thread consuming delivered messages */
consumerThread = new Thread(queueName + "'s consumer")
{
@Override
public void run()
{
LOG.info("Thread " + this.getName() + " started");
/* Create a delivery list used to extract deliveries from
the delivery queue */
final List<Delivery> deliveryList = new ArrayList<Delivery>();
final List<Delivery> readOnlyDeliveryList =
Collections.unmodifiableList(deliveryList);
/* Initialize last delivery time */
long lastDeliveryTime = System.currentTimeMillis();
while (true)
{
long currentTime = System.currentTimeMillis();
/*
* If we have polled enough messages from the delivery
queue, or we have polled the
* delivery queue long enough
*/
if (deliveryList.size() == capacity
|| ((currentTime >= lastDeliveryTime + pace) &&
!deliveryList.isEmpty()))
{
/* Ask subclass to handle all buffered deliveries */
handleBufferedDeliveries(readOnlyDeliveryList);
/* Acknowledge all buffered deliveries */
ackDeliveries(readOnlyDeliveryList);
/* Forget all buffered deliveries */
deliveryList.clear();
/* Reset last delivery time */
lastDeliveryTime = System.currentTimeMillis();
}
/*
* If we need to poll more messages from the delivery
queue, or we haven't polled the
* delivery queue long enough
*/
else
{
try
{
/*
* If the delivery list is empty, we need to wait
forever for one message before being
* able to deliver something
*/
if (deliveryList.isEmpty())
deliveryList.add(deliveryQueue.take());
/*
* If the delivery list is not empty, we only need to
wait for the time remaining to
* poll
*/
else
{
Delivery delivery =
deliveryQueue.poll(lastDeliveryTime + pace - currentTime,
TimeUnit.MILLISECONDS);
if (delivery != null)
deliveryList.add(delivery);
}
}
catch (InterruptedException ie)
{
LOG.warn("Thread " + this.getName() + " for " +
queueName + " interrupted");
return;
}
}
}
}
};
consumerThread.setDaemon(true);
consumerThread.start();
}
@Override
protected void onNewChannel(Channel channel) throws IOException
{
super.onNewChannel(channel);
/*
* Configure the channel to send not more than a given amount
of unacknowledged messages. This
* avoid at the same time to buffer too many messages (which
could exhaust memory) and to block
* in handleDelivery() (which could brake the connection).
*/
channel.basicQos(capacity);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties,
byte[] body)
{
try
{
/* Store the delivery in the message buffer */
deliveryQueue.put(new Delivery(envelope, properties, body));
}
catch (InterruptedException ie)
{
/* Due to program being exited, do nothing */
}
}
/**
* Override this method to be notified of buffered deliveries.
Once this method has run, messages
* will be acknowledged.
* @param stack unmodifiable list of deliveries.
*/
public abstract void handleBufferedDeliveries(List<Delivery> stack);
}
Le 14/07/10 18:53, Simon MacMullen a écrit :
> Hi Vincent. Yes, I think you could do that, although you have to
> think about what to do when there aren't 1000 messages to deliver
> - would you just wait indefinitely?
>
> It does sound like you'd be building something like a fixed-length
> QueueingConsumer though - I don't really see what that would gain
> you.
>
> Cheers, Simon
>
> On 14/07/10 17:38, Vincent Barat wrote:
>> Concerning this issue, I've read about the basicQos() call.
>>
>> Do you think that I can use this call to force the server to send
>> me a
>> maximum of, say, 1000 messages that I could store in an array in
>> handleDelivery() (without blocking), and, once I've counted 1000
>> messages exactly, I trigger another thread to store all messages and
>> then ack all messages ?
>>
>> Le 14/07/10 11:30, Simon MacMullen a écrit :
>>> On 13/07/10 16:00, Vincent Barat wrote:
>>> <snip>
>>>> But from time to time, the database blocks during 30s to
>>>> 1minute and
>>>> thus I don't acknowledge during the same time.
>>>>
>>>> It seems that the RabbitMQ server closes the connection. Is
>>>> there a
>>>> timeout (or a setting) I can use to fix this issue ?
>>>
>>> Hi Vincent. Not acking for 30s certainly should not close the
>>> connection. When the server closes a connection due to client
>>> error it
>>> sends an error message; you should see this as an IOException in
>>> the
>>> Java client. Check the ShutdownSignalException inside the
>>> IOException
>>> to get the error code / message. Alternatively you could look in
>>> the
>>> server logs, sometimes there's more detail there.
>>>
>>> One way this could be happening is:
>>>
>>> * You have heartbeating turned on (I think some of the client
>>> version
>>> have this turned on by default). Heartbeating makes the server
>>> periodically check the client is still alive and responding.
>>>
>>> * You block the connection main loop, say by implementing your own
>>> Consumer rather than using QueueingConsumer, and doing something
>>> slow
>>> / expensive in Consumer.handleDelivery().
>>>
>>> Does this sound plausible? If not I would look for error
>>> messages as
>>> above.
>>>
>>> Cheers, Simon
>>>
>> _______________________________________________
>> rabbitmq-discuss mailing list
>> rabbitmq-discuss at lists.rabbitmq.com
>> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
More information about the rabbitmq-discuss
mailing list