[rabbitmq-discuss] Memory Management Concerns / Questions
Matthias Radestock
matthias at rabbitmq.com
Thu Dec 22 20:21:36 GMT 2011
Andy,
On 22/12/11 19:53, AndyB wrote:
> In my test case, I have intentionally coded the consumer to never
> catch up. During the process, I saw the publishers get blocked
> after the alert, messages were streamed to disk enough to get below
> the watermark, and the publishers were unblocked. Of course they hit
> the watermark soon after and the same process happened again. I'd
> say this happened maybe 4 or 5 times and then they just remained in
> a blocked state. I let the test continue running for almost 10
> minutes and they never became unblocked and the watermark alert never
> seemed to clear on the server. So I guess that means that it
> stopped streaming the messages to disk or something?
It's possible that you ran into another limit...
Each message has a small memory footprint, even when it has been paged
to disk. So there is an upper bound to how many messages rabbit can hold
on to. When that bound is reached producers will remain blocked until
some messages have been consumed.
There is way around that - changing the message store index module to
one that is operating entirely on disk. See
https://github.com/rabbitmq/rabbitmq-toke. However, I don't know of any
production rabbits that have actually run into this limitation.
> Either way, I'm going to have to come up with a way to implement
> something in my code to try to avoid tying up a thread for an unknown
> amount of time. Any ideas?
You could perform all the invocations of the AMQP client's publish
methods from a single, separate thread. It would sit in a loop, pulling
messages off a bounded buffer / queue (e.g. an ArrayBlockingQueue if
this was Java; there are presumably similar data structures in C#, and
worst case you could roll your own) and invoking the publish methods in
the AMQP client.
The "real" publishing threads simply deposit messages into the buffer /
queue using an operation with a timeout, e.g. BlockingQueue.offer(E o,
long timeout, TimeUnit unit).
Matthias.
More information about the rabbitmq-discuss
mailing list