<html xmlns:v="urn:schemas-microsoft-com:vml" xmlns:o="urn:schemas-microsoft-com:office:office" xmlns:w="urn:schemas-microsoft-com:office:word" xmlns:m="http://schemas.microsoft.com/office/2004/12/omml" xmlns="http://www.w3.org/TR/REC-html40">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=us-ascii">
<meta name="Generator" content="Microsoft Word 15 (filtered medium)">
<style><!--
/* Font Definitions */
@font-face
{font-family:"Cambria Math";
panose-1:2 4 5 3 5 4 6 3 2 4;}
@font-face
{font-family:Calibri;
panose-1:2 15 5 2 2 2 4 3 2 4;}
@font-face
{font-family:Consolas;
panose-1:2 11 6 9 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
{margin:0in;
margin-bottom:.0001pt;
font-size:11.0pt;
font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
{mso-style-priority:99;
color:#0563C1;
text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
{mso-style-priority:99;
color:#954F72;
text-decoration:underline;}
span.EmailStyle17
{mso-style-type:personal-compose;
font-family:"Calibri","sans-serif";
color:windowtext;}
.MsoChpDefault
{mso-style-type:export-only;}
@page WordSection1
{size:8.5in 11.0in;
margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
{page:WordSection1;}
--></style><!--[if gte mso 9]><xml>
<o:shapedefaults v:ext="edit" spidmax="1026" />
</xml><![endif]--><!--[if gte mso 9]><xml>
<o:shapelayout v:ext="edit">
<o:idmap v:ext="edit" data="1" />
</o:shapelayout></xml><![endif]-->
</head>
<body lang="EN-US" link="#0563C1" vlink="#954F72">
<div class="WordSection1">
<p class="MsoNormal">Hi,<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal">While stress testing rabbit mq to resolve a separate performance issue, we noticed that rabbit mq would crash when a queue with a large number of messages expired. When we held the total size of the all of the messages constant, but varied
the number of messages we found that more messages meant a higher probability of the node dying.
<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal">I’ve attached some standalone test code below that applies this load. I’ve found that the parameters need to be varied depending on the strength of the machine, but in all cases there is a point where messages can be built up and happily
served, but the expiry of the entire queue will fail.<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal">I have tested this with Rabbit MQ 3.3.0 and RabbitMQ 3.2.4. Both of these clusters were running Erlang R16B03 on Windows Server 2008 R2.<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal">It’s not clear to me why the nodes will die with this sort of load. Is this a known issue with RabbitMQ or the version of Erlang I am running on?<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal">-Sreyan<o:p></o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas">class Program<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> static void Main(string[] args)<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var host = "127.0.0.1";<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> const int loadSize = 1024*1024;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var stressor = new Stressor(1, loadSize, host, "foo", "bar", TimeSpan.FromSeconds(10));<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> stressor.ApplyLoad();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> stressor.CancellationTokenSource.Cancel();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> Console.WriteLine( "Applied a load of {0} KiB. Press [ENTER] to exit.", loadSize);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> Console.ReadLine();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> public sealed class Stressor<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private const string QueueName = "TEST";<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly int _messageSizeInKiB;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly int _totalQueueLoadInKiB;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly IConnection _connection;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> public CancellationTokenSource CancellationTokenSource { get { return _cancellationTokenSource; } }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly Thread _consumptionThread;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> public Stressor(int messageSizeInKiB, int totalQueueLoadInKiB, string hostName, string userName, string password, TimeSpan connectionTimeout)<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> if (hostName == null) throw new ArgumentNullException("hostName");<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> if (userName == null) throw new ArgumentNullException("userName");<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> if (password == null) throw new ArgumentNullException("password");<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _messageSizeInKiB = messageSizeInKiB;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _totalQueueLoadInKiB = totalQueueLoadInKiB;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _connection = new ConnectionFactory<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> HostName = hostName,<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> UserName = userName,<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> Password = password,<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> RequestedConnectionTimeout = (ushort)connectionTimeout.TotalMilliseconds<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }.CreateConnection();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _junkMessageWrapper = new Lazy<byte[]>(() =><o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var random = new Random();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var message = new byte[_messageSizeInKiB * 1024];<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> random.NextBytes(message);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> return message;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> });<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _consumptionThread = new Thread(Consume) {IsBackground = true};<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> _consumptionThread.Start();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private readonly Lazy<Byte[]> _junkMessageWrapper;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> /// <summary><o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> /// A consumer that doesn't take! We want to build up messagees then have them expire -- That's the behavior in question.<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> /// </summary><o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> private void Consume()<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> using (var channel = _connection.CreateModel())<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> channel.QueueDeclare(QueueName, false, false, false, new Dictionary<string, object> { { "x-expires", (int)TimeSpan.FromSeconds(60).TotalMilliseconds } });<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var consumer = new QueueingBasicConsumer(channel, new SharedQueue<BasicDeliverEventArgs>());<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> channel.BasicConsume(QueueName, false, consumer);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> while (!_cancellationTokenSource.IsCancellationRequested)<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> Thread.Sleep(TimeSpan.FromMilliseconds(100));<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> public void ApplyLoad()<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> using (var channel = _connection.CreateModel())<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> channel.ConfirmSelect();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var properties = channel.CreateBasicProperties();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> properties.SetPersistent(true);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> var count = _totalQueueLoadInKiB/_messageSizeInKiB + 1;<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> Console.WriteLine("Publishing {0} messages, each {1} bytes in size", count, _junkMessageWrapper.Value.Length);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> for (int i = 0; i < count; i++)<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> {<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> channel.BasicPublish(string.Empty, QueueName, properties, _junkMessageWrapper.Value);<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"><o:p> </o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> channel.WaitForConfirms();<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><span style="font-size:9.0pt;font-family:Consolas"> }<o:p></o:p></span></p>
<p class="MsoNormal"><o:p> </o:p></p>
<p class="MsoNormal"><o:p> </o:p></p>
</div>
</body>
</html>