[rabbitmq-discuss] |Spam| rabbitmq service dies when a large number of messages expire.

Sreyan Sarkar Sreyan.Sarkar at gmo.com
Thu May 15 16:00:50 BST 2014


Hi,

While stress testing RabbitMQ to resolve a separate performance issue, we noticed that RabbitMQ would crash when a queue holding a large number of messages expired. When we held the total size of all of the messages constant but varied the number of messages, we found that more messages meant a higher probability of the node dying.

I've attached some standalone test code below that applies this load. I've found that the parameters need to be varied depending on the strength of the machine, but in all cases there is a point where messages can be built up and happily served, but the expiry of the entire queue will fail.

I have tested this with RabbitMQ 3.3.0 and RabbitMQ 3.2.4. Both of these clusters were running Erlang R16B03 on Windows Server 2008 R2.

It's not clear to me why the nodes will die with this sort of load. Is this a known issue with RabbitMQ or the version of Erlang I am running on?

-Sreyan

class Program
    {
        /// <summary>
        /// Entry point of the reproduction: builds a <c>Stressor</c> against a local broker,
        /// publishes the configured load, cancels the stressor's token and then waits for the
        /// operator to press ENTER.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Total load handed to the stressor, expressed in KiB (1024 * 1024 KiB).
            const int loadSize = 1024*1024;
            const string brokerHost = "127.0.0.1";
            var connectTimeout = TimeSpan.FromSeconds(10);

            // Message size of 1 KiB, so the number of messages is driven by loadSize.
            var stressor = new Stressor(1, loadSize, brokerHost, "foo", "bar", connectTimeout);
            stressor.ApplyLoad();

            // Signal the stressor's background consumer loop to stop once the load is in place.
            stressor.CancellationTokenSource.Cancel();

            Console.WriteLine( "Applied a load of {0} KiB. Press [ENTER] to exit.", loadSize);
            Console.ReadLine();

        }
    }

    /// <summary>
    /// Builds up a backlog of junk messages on a single expiring queue so that the
    /// broker's behavior when that queue expires can be observed.
    /// </summary>
    /// <remarks>
    /// Constructing an instance opens a broker connection and starts a background thread
    /// that registers a consumer on the queue. Call <see cref="ApplyLoad"/> to publish the
    /// messages, then cancel <see cref="CancellationTokenSource"/> to stop the consumer.
    /// </remarks>
    public sealed class Stressor
    {
        private const string QueueName = "TEST";
        private const int BytesPerKiB = 1024;

        // Value passed as the queue's "x-expires" argument (in milliseconds).
        private static readonly TimeSpan QueueExpiry = TimeSpan.FromSeconds(60);

        private readonly int _messageSizeInKiB;
        private readonly int _totalQueueLoadInKiB;
        private readonly IConnection _connection;

        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

        /// <summary>
        /// Source whose cancellation ends the background consumer loop and thereby closes
        /// the consumer's channel.
        /// </summary>
        public CancellationTokenSource CancellationTokenSource { get { return _cancellationTokenSource; } }

        private readonly Thread _consumptionThread;

        // Payload shared by every published message; generated on first use.
        private readonly Lazy<Byte[]> _junkMessageWrapper;

        /// <summary>
        /// Connects to the broker and starts the background consumer thread.
        /// </summary>
        /// <param name="messageSizeInKiB">Size of each published message in KiB; must be positive.</param>
        /// <param name="totalQueueLoadInKiB">Total payload to publish, in KiB.</param>
        /// <param name="hostName">Broker host name or address.</param>
        /// <param name="userName">Broker user name.</param>
        /// <param name="password">Broker password.</param>
        /// <param name="connectionTimeout">Connection timeout handed to the connection factory.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="hostName"/>, <paramref name="userName"/> or <paramref name="password"/> is null.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="messageSizeInKiB"/> is zero or negative.
        /// </exception>
        /// <exception cref="OverflowException">
        /// <paramref name="connectionTimeout"/> does not fit in an <see cref="int"/> number of milliseconds.
        /// </exception>
        public Stressor(int messageSizeInKiB, int totalQueueLoadInKiB, string hostName, string userName, string password, TimeSpan connectionTimeout)
        {
            if (hostName == null) throw new ArgumentNullException("hostName");
            if (userName == null) throw new ArgumentNullException("userName");
            if (password == null) throw new ArgumentNullException("password");

            // Fail fast: a non-positive size would otherwise surface later as a
            // DivideByZeroException in ApplyLoad or as a failed array allocation.
            if (messageSizeInKiB <= 0) throw new ArgumentOutOfRangeException("messageSizeInKiB");

            _messageSizeInKiB = messageSizeInKiB;
            _totalQueueLoadInKiB = totalQueueLoadInKiB;
            _connection = new ConnectionFactory
            {
                HostName = hostName,
                UserName = userName,
                Password = password,
                // Checked int conversion: the previous (ushort) cast silently mangled any
                // timeout above 65,535 ms. NOTE(review): assumes the client's
                // RequestedConnectionTimeout is an int in milliseconds (3.x API) - confirm.
                RequestedConnectionTimeout = checked((int)connectionTimeout.TotalMilliseconds)
            }.CreateConnection();

            _junkMessageWrapper = new Lazy<byte[]>(() =>
            {
                var random = new Random();
                // checked: an overflowing size must not wrap around into a small or negative length.
                var message = new byte[checked(_messageSizeInKiB * BytesPerKiB)];
                random.NextBytes(message);
                return message;
            });

            _consumptionThread = new Thread(Consume) {IsBackground = true};
            _consumptionThread.Start();
        }

        /// <summary>
        /// Declares the test queue with its "x-expires" argument. Both the consumer and the
        /// publisher call this with identical arguments, so whichever runs first creates the
        /// queue and the other declaration matches it.
        /// </summary>
        /// <param name="channel">Channel on which to issue the declaration.</param>
        private static void DeclareQueue(IModel channel)
        {
            channel.QueueDeclare(QueueName, false, false, false, new Dictionary<string, object> { { "x-expires", (int)QueueExpiry.TotalMilliseconds } });
        }

        /// <summary>
        /// A consumer that doesn't take! We want to build up messages then have them expire -- That's the behavior in question.
        /// </summary>
        /// <remarks>
        /// Runs on the background thread started by the constructor. It registers a consumer
        /// whose deliveries are never read or acknowledged by this class, then blocks until
        /// <see cref="CancellationTokenSource"/> is cancelled, at which point the channel is disposed.
        /// </remarks>
        private void Consume()
        {
            using (var channel = _connection.CreateModel())
            {
                DeclareQueue(channel);
                var consumer = new QueueingBasicConsumer(channel, new SharedQueue<BasicDeliverEventArgs>());
                channel.BasicConsume(QueueName, false, consumer);

                // Block until cancellation instead of polling every 100 ms.
                _cancellationTokenSource.Token.WaitHandle.WaitOne();
            }
        }

        /// <summary>
        /// Publishes <c>totalQueueLoadInKiB / messageSizeInKiB + 1</c> copies of the junk
        /// payload to the test queue through the default exchange and waits for the
        /// broker's publisher confirms.
        /// </summary>
        public void ApplyLoad()
        {
            using (var channel = _connection.CreateModel())
            {
                // The consumer thread may not have declared the queue yet; without this,
                // messages published through the default exchange to a queue that does not
                // exist would be unroutable. Declaring here removes that race.
                DeclareQueue(channel);

                channel.ConfirmSelect();
                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);

                var payload = _junkMessageWrapper.Value;
                var count = _totalQueueLoadInKiB/_messageSizeInKiB + 1;

                Console.WriteLine("Publishing {0} messages, each {1} bytes in size", count, payload.Length);

                for (int i = 0; i < count; i++)
                {
                    channel.BasicPublish(string.Empty, QueueName, properties, payload);
                }

                channel.WaitForConfirms();
            }
        }
    }


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140515/ad0e4169/attachment.html>


More information about the rabbitmq-discuss mailing list