[rabbitmq-discuss] rabbitmq service dies when a large number of messages expire.
Simon MacMullen
simon at rabbitmq.com
Thu May 15 16:27:14 BST 2014
No, this should not happen.
It would be expected that dead lettering huge numbers of messages at
once could cause excessive memory use as dead-lettering isn't throttled.
That's why we don't dead-letter on queue deletion. But you don't appear
to be doing anything with DLX.
Assuming there aren't any policies in place to make individual messages
expire then the "x-expires" argument will really just do the equivalent
of a timed queue.delete - and that really should not cause additional
resources to be used.
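
As a minimal sketch of that distinction, assuming the standard RabbitMQ
queue argument names ("x-expires", "x-message-ttl" and
"x-dead-letter-exchange"), the three cases differ only in the arguments
passed when the queue is declared. The helper class QueueArguments and its
method names are hypothetical and exist only for illustration; each
returned dictionary would be passed as the last parameter of QueueDeclare.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of the RabbitMQ client.
public static class QueueArguments
{
    // Queue-level expiry: the broker deletes the whole queue once it has
    // been unused for the given period (the timed queue.delete case).
    public static Dictionary<string, object> QueueExpiry(TimeSpan unusedFor)
    {
        return new Dictionary<string, object>
        {
            { "x-expires", (int)unusedFor.TotalMilliseconds }
        };
    }

    // Per-message expiry: each message is discarded individually once it
    // has been in the queue for longer than the TTL.
    public static Dictionary<string, object> MessageTtl(TimeSpan ttl)
    {
        return new Dictionary<string, object>
        {
            { "x-message-ttl", (int)ttl.TotalMilliseconds }
        };
    }

    // Per-message expiry with a dead-letter exchange: expired messages are
    // republished to the named exchange instead of being discarded.
    public static Dictionary<string, object> MessageTtlWithDlx(
        TimeSpan ttl, string deadLetterExchange)
    {
        var arguments = MessageTtl(ttl);
        arguments["x-dead-letter-exchange"] = deadLetterExchange;
        return arguments;
    }
}
```

Only the first form matches the test code quoted below; the other two are
the kind of setup that a policy could also introduce.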
So is there anything else going on? Any policies set? Does memory use
spike before the service dies? Anything interesting in the logs?
Cheers, Simon
On 15/05/14 16:00, Sreyan Sarkar wrote:
> Hi,
>
> While stress testing RabbitMQ to resolve a separate performance issue,
> we noticed that RabbitMQ would crash when a queue with a large number
> of messages expired. When we held the total size of all of the
> messages constant but varied the number of messages, we found that more
> messages meant a higher probability of the node dying.
>
> I’ve attached some standalone test code below that applies this load.
> I’ve found that the parameters need to be varied depending on the
> strength of the machine, but in all cases there is a point where
> messages can be built up and happily served, but the expiry of the
> entire queue will fail.
>
> I have tested this with RabbitMQ 3.3.0 and RabbitMQ 3.2.4. Both of
> these clusters were running Erlang R16B03 on Windows Server 2008 R2.
>
> It’s not clear to me why the nodes will die with this sort of load. Is
> this a known issue with RabbitMQ or the version of Erlang I am running on?
>
> -Sreyan
>
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using RabbitMQ.Client;
> using RabbitMQ.Client.Events;
> using RabbitMQ.Util;
>
> class Program
> {
>     static void Main(string[] args)
>     {
>         var host = "127.0.0.1";
>         const int loadSize = 1024*1024;
>
>         var stressor = new Stressor(1, loadSize, host, "foo", "bar",
>             TimeSpan.FromSeconds(10));
>         stressor.ApplyLoad();
>         stressor.CancellationTokenSource.Cancel();
>
>         Console.WriteLine(
>             "Applied a load of {0} KiB. Press [ENTER] to exit.", loadSize);
>         Console.ReadLine();
>     }
> }
>
> public sealed class Stressor
> {
>     private const string QueueName = "TEST";
>     private readonly int _messageSizeInKiB;
>     private readonly int _totalQueueLoadInKiB;
>     private readonly IConnection _connection;
>     private readonly CancellationTokenSource _cancellationTokenSource =
>         new CancellationTokenSource();
>
>     public CancellationTokenSource CancellationTokenSource
>     {
>         get { return _cancellationTokenSource; }
>     }
>
>     private readonly Thread _consumptionThread;
>
>     public Stressor(int messageSizeInKiB, int totalQueueLoadInKiB,
>         string hostName, string userName, string password,
>         TimeSpan connectionTimeout)
>     {
>         if (hostName == null) throw new ArgumentNullException("hostName");
>         if (userName == null) throw new ArgumentNullException("userName");
>         if (password == null) throw new ArgumentNullException("password");
>
>         _messageSizeInKiB = messageSizeInKiB;
>         _totalQueueLoadInKiB = totalQueueLoadInKiB;
>
>         _connection = new ConnectionFactory
>         {
>             HostName = hostName,
>             UserName = userName,
>             Password = password,
>             RequestedConnectionTimeout =
>                 (ushort)connectionTimeout.TotalMilliseconds
>         }.CreateConnection();
>
>         // Build the random payload once, on first use, and reuse it
>         // for every published message.
>         _junkMessageWrapper = new Lazy<byte[]>(() =>
>         {
>             var random = new Random();
>             var message = new byte[_messageSizeInKiB * 1024];
>             random.NextBytes(message);
>             return message;
>         });
>
>         _consumptionThread = new Thread(Consume) {IsBackground = true};
>         _consumptionThread.Start();
>     }
>
>     private readonly Lazy<Byte[]> _junkMessageWrapper;
>
>     /// <summary>
>     /// A consumer that doesn't take! We want to build up messages
>     /// then have them expire -- That's the behavior in question.
>     /// </summary>
>     private void Consume()
>     {
>         using (var channel = _connection.CreateModel())
>         {
>             // "x-expires" is in milliseconds: the queue is deleted after
>             // it has been unused for 60 seconds.
>             channel.QueueDeclare(QueueName, false, false, false,
>                 new Dictionary<string, object>
>                 {
>                     { "x-expires",
>                       (int)TimeSpan.FromSeconds(60).TotalMilliseconds }
>                 });
>
>             var consumer = new QueueingBasicConsumer(channel,
>                 new SharedQueue<BasicDeliverEventArgs>());
>             channel.BasicConsume(QueueName, false, consumer);
>
>             // Keep the consumer registered, without ever dequeuing,
>             // until cancellation is requested.
>             while (!_cancellationTokenSource.IsCancellationRequested)
>             {
>                 Thread.Sleep(TimeSpan.FromMilliseconds(100));
>             }
>         }
>     }
>
>     public void ApplyLoad()
>     {
>         using (var channel = _connection.CreateModel())
>         {
>             channel.ConfirmSelect();
>
>             var properties = channel.CreateBasicProperties();
>             properties.SetPersistent(true);
>
>             var count = _totalQueueLoadInKiB/_messageSizeInKiB + 1;
>             Console.WriteLine(
>                 "Publishing {0} messages, each {1} bytes in size",
>                 count, _junkMessageWrapper.Value.Length);
>
>             // Publish via the default exchange, routed by queue name.
>             for (int i = 0; i < count; i++)
>             {
>                 channel.BasicPublish(string.Empty, QueueName,
>                     properties, _junkMessageWrapper.Value);
>             }
>
>             channel.WaitForConfirms();
>         }
>     }
> }
--
Simon MacMullen
RabbitMQ, Pivotal