Hi there,<div><br></div><div>I'm having an issue where *sometimes* messages that appear to be successfully pushed to RabbitMQ are not delivered to any subscribers.</div><div><br></div><div>I am using RabbitMQ Server 2.4 and the .NET client 2.8.7.0.</div><div><br></div><div>I am using the default exchange (""), which seems adequate for our needs right now, but I thought I'd point it out in case I'm not using it the way it was intended.</div><div><br></div><div>In the server I can see that the SubmitDeploymentToQueue() method always completes successfully, so it appears messages are <i>always</i> being submitted to RabbitMQ. However, in the Consume() method of the consumer, the consumer.Queue.Dequeue() call <i>sometimes</i> does not return a message, so the next line (the Console.WriteLine) is never reached and the consumer just sits there blocking.</div><div><br></div><div>When messages appear to be lost like this, restarting the consumers doesn't fix it; the messages are simply never delivered and seem to be lost forever.</div><div><br></div><div>I have included the code inline below; I hope this is OK.</div><div><br></div><div><br></div><div><b><font size="4">Server:</font></b></div><div>
<blockquote>//<br>// Copyright (c) Appsecute 2011-2012 - ALL RIGHTS RESERVED.<br>//<br><span class="s1">using</span> System;<br><span class="s1">using</span> RabbitMQ.Client;<br><span class="s1">using</span> System.Threading;<br><span class="s1">using</span> AppsecuteKernel.Logging;<br><span class="s1">using</span> AppsecuteServerFramework.Database;<br><span class="s1">using</span> AppsecuteKernel.Documents.Deployments;<br><span class="s1">using</span> AppsecuteKernel.Operations.Deployment;<br><span class="s1">using</span> AppsecuteKernel.Operations.Applications;<br><span class="s1">using</span> AppsecuteKernel.Operations.Notifications;<br><span class="s1">using</span> RabbitMQ.Client.Exceptions;<br><span class="s1">namespace</span> AppsecuteDeploymentScheduler<br>{<br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Periodically inspects the database and submits deployments to the queue once they have met the appropriate criteria.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s1">public</span><span class="s2"> </span><span class="s1">class</span><span class="s2"> </span>DeploymentScheduler<span class="s2"> : </span>IDisposable<br> {<br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Rabbit MQ properties.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s5">IModel</span> _channel;<br> <span class="s1">private</span> <span class="s5">IConnection</span> _connection;<br> <br> <br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> A wait handle that other threads can use to wait on the scheduler to finish<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">public</span> <span 
class="s5">ManualResetEvent</span> SchedulerWaitHandle = <span class="s1">new</span> <span class="s5">ManualResetEvent</span>(<span class="s1">false</span>);<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> A logger for the scheduler to use.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s1">public</span><span class="s2"> </span>PlatformLogger<span class="s2"> Logger = </span><span class="s1">new</span><span class="s2"> </span>PlatformLogger<span class="s2">(</span><span class="s1">typeof</span><span class="s2">(</span>DeploymentScheduler<span class="s2">));</span><br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> The time in milliseconds the scheduler should sleep for between finishing work<br><span class="s2"> </span><span class="s4">///</span> and waking up to check/process new work.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">const</span> <span class="s1">int</span> SLEEP_MILLISECONDS = 5000;<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> A timer used to periodically wake up and poll for deployments to queue.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s5">Timer</span> _pollDeploymentsTimer;<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Creates a new deployment scheduler.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">public</span> DeploymentScheduler()<br> {<br> WriteLine(<span class="s6">"Starting scheduler"</span>);<br> OpenConnection();<br> }<br><br><span class="s2"> </span>///<span class="s3"> 
</span><summary><br><span class="s2"> </span><span class="s4">///</span> Opens the connection to RabbitMQ.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">void</span> OpenConnection()<br> {<br> <span class="s1">var</span> connectionFactory = <span class="s1">new</span> <span class="s5">ConnectionFactory</span> { uri = <span class="s5">ApplicationDeploymentOperations</span>.GetAmqpConnectionUri() };<br><span class="s2"> WriteLine(</span>"Opening RabbitMQ connection..."<span class="s2">);</span><br> _connection = connectionFactory.CreateConnection();<br> <span class="s1">if</span> (_connection.IsOpen)<br> {<br> WriteLine(<span class="s6">"Connection opened."</span>);<br> _channel = _connection.CreateModel();<br> _channel.QueueDeclare(<span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_QUEUE_NAME, <span class="s1">true</span>, <span class="s1">false</span>, <span class="s1">false</span>, <span class="s1">null</span>);<br> <span class="s5">ThreadPool</span>.QueueUserWorkItem(ProcessDeployments);<br> WriteLine(<span class="s6">"Scheduler started."</span>);<br> }<br> <span class="s1">else<br></span> {<br> WriteLine(<span class="s6">"Scheduler failed to start."</span>);<br> }<br> }<br><br> <span class="s1">private</span> <span class="s1">void</span> WriteLine(<span class="s1">string</span> message)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"</span><span class="s7">{0}</span><span class="s6">: </span><span class="s7">{1}</span><span class="s6">"</span>, <span class="s5">Thread</span>.CurrentThread.ManagedThreadId, message);<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Schedules the scheduler to wake up at a later time to poll the database.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span 
class="s1">private</span> <span class="s1">void</span> Reschedule()<br> {<br> WriteLine(<span class="s6">"Rescheduling"</span>);<br> <span class="s1">if</span> (_pollDeploymentsTimer != <span class="s1">null</span>)<br> {<br> _pollDeploymentsTimer.Dispose();<br> }<br> _pollDeploymentsTimer = <span class="s1">new</span> <span class="s5">Timer</span>(ProcessDeployments, <span class="s1">new</span> <span class="s1">object</span>(), SLEEP_MILLISECONDS, 0);<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Evaluates deployments and queues them to be deployed if appropriate.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">void</span> ProcessDeployments(<span class="s1">object</span> state)<br> {<br> <span class="s5">Console</span>.WriteLine();<br> WriteLine(<span class="s6">"Processing deployments"</span>);<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> deployments = <span class="s5">ApplicationDeploymentDatabaseOperations</span>.GetApplicationDeploymentsToQueue();<br><span class="s2"> </span>// If there is any work to do then do it, otherwise reschedule this worker<br> <span class="s1">if</span> (deployments.Count > 0)<br> {<br> <span class="s1">foreach</span> (<span class="s1">var</span> deploymentData <span class="s1">in</span> deployments)<br> {<br> ProcessDeployment(deploymentData);<br> }<br> WriteLine(<span class="s6">"Scheduling completed"</span>);<br><span class="s2"> </span><span class="s3">// </span><b>TODO: If these workers are kept busy (i.e They're continuously working instead of sleeping/waking) there is a chance this recursive calling could exceed the stack size<br></b> ProcessDeployments(<span class="s1">null</span>);<br> }<br> <span class="s1">else<br></span> {<br> Reschedule();<br> }<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> 
WriteLine(<span class="s1">string</span>.Format(<span class="s6">"Caught exception processing deployments: </span><span class="s7">{0}</span><span class="s6">"</span>, e.Message));<br> LogException(<span class="s1">null</span>, e);<br> Reschedule();<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Processes a deployment, updating the status, raising notifications and submitting to the deployment queue<br><span class="s2"> </span><span class="s4">///</span> as appropriate.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="deploymentData"></param><br> <span class="s1">private</span> <span class="s1">void</span> ProcessDeployment(<span class="s5">ApplicationDeploymentData</span> deploymentData)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s5">MySqlLocking</span>.TryRun(<br> <span class="s5">MySqlWrapper</span>.PLATFORM_DATABASE_NAME,<br> <span class="s1">string</span>.Format(<span class="s6">"deployment-scheduler-</span><span class="s7">{0}</span><span class="s6">"</span>, deploymentData.Id),<br> <span class="s1">new</span> <span class="s5">TimeSpan</span>(0, 0, 0, 5),<br> api =><br> {<br><span class="s2"> </span>// If the deployment should now wait for manual start, update the status<br> <span class="s1">if</span> (deploymentData.DeploymentStatus.Equals(<span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_STATUS_WAITING_FOR_SCHEDULED_TIME) &&<br> deploymentData.IsManualStart)<br> {<br><span class="s2"> </span>// Run in a transaction so that if raising notifications fails, the status update will be rolled back<br> <span class="s5">MySqlWrapper</span>.RunInNewTransaction(<br> <span class="s5">MySqlWrapper</span>.PLATFORM_DATABASE_NAME,<br> database =><br> {<br><span class="s2"> </span>// Update the status first, to effectively reserve this deployment<br> <span 
class="s5">ApplicationDeploymentOperations</span>.UpdateDeploymentStatusAtomic(<br> deploymentData.Id,<br> deploymentData.DeploymentStatus,<br> <span class="s5">ApplicationDeploymentOperations</span>.<br> DEPLOYMENT_STATUS_WAITING_FOR_MANUAL_START);<br> RaiseDeploymentWaitingOnManualStartNotification(deploymentData);<br> });<br> WriteLine(<span class="s1">string</span>.Format(<span class="s6">"Deployment with id </span><span class="s7">{0}</span><span class="s6"> is now waiting on manual start."</span>, deploymentData.Id));<br> Logger.Info(<span class="s1">string</span>.Format(<span class="s6">"Deployment with id </span><span class="s7">{0}</span><span class="s6"> is now waiting on manual start."</span>, deploymentData.Id));<br> }<br> <span class="s1">else<br></span> {<br><span class="s2"> </span>// Run in a transaction so that if submitting to queue fails, the status update will be rolled back<br> <span class="s5">MySqlWrapper</span>.RunInNewTransaction(<br> <span class="s5">MySqlWrapper</span>.PLATFORM_DATABASE_NAME,<br> database =><br> {<br><span class="s2"> </span>// Update the status first, to effectively reserve this deployment<br> <span class="s5">ApplicationDeploymentOperations</span>.UpdateDeploymentStatusAtomic(<br> deploymentData.Id,<br> deploymentData.DeploymentStatus,<br> <span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_STATUS_QUEUED_FOR_DEPLOYMENT);<br> <span class="s3">// Submit the deployment to the queue<br></span> SubmitDeploymentToQueue(deploymentData);<br> }<br> );<br> <br> WriteLine(<span class="s1">string</span>.Format(<span class="s6">"Deployment with id </span><span class="s7">{0}</span><span class="s6"> has been submitted to the deployment queue."</span>, deploymentData.Id));<br> Logger.Info(<span class="s1">string</span>.Format(<span class="s6">"Deployment with id </span><span class="s7">{0}</span><span class="s6"> has been submitted to the deployment queue."</span>, deploymentData.Id));<br> }<br> });<br> }<br> <span 
class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> LogException(deploymentData, e);<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Submits a deployment to the RabbitMQ message queue so that it will be processed by a background worker.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="deploymentData"><span class="s3">The deployment to queue.</span></param><br> <span class="s1">private</span> <span class="s1">void</span> SubmitDeploymentToQueue(<span class="s5">ApplicationDeploymentData</span> deploymentData)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> basicProperties = _channel.CreateBasicProperties();<br> basicProperties.SetPersistent(<span class="s1">true</span>);<br> _channel.BasicPublish(<span class="s1">string</span>.Empty, <span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_QUEUE_NAME, basicProperties, <span class="s5">BitConverter</span>.GetBytes(deploymentData.Id));<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<br><span class="s2"> </span><span class="s5">Console</span><span class="s2">.WriteLine(</span>"Detected broken connection to RabbitMQ, re-establishing..."<span class="s2">);</span><br> Dispose();<br> OpenConnection();<br> <span class="s1">throw</span>;<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Raises a notification for a deployment that is waiting on manual start.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="applicationDeploymentData"><span class="s3">The deployment that is waiting on manual start.</span></param><br> <span class="s1">private</span> <span class="s1">void</span> 
RaiseDeploymentWaitingOnManualStartNotification(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> applicationData = <span class="s5">ApplicationOperations</span>.GetApplication(applicationDeploymentData.ApplicationId);<br> <span class="s5">NotificationOperations</span>.RaiseNotification(<br> applicationDeploymentData.SubmitterLegalEntityId,<br> <span class="s5">NotificationCode</span>.ApplicationDeploymentWaitingForManualStart,<br> <span class="s5">NotificationSeverity</span>.Warning,<br><span class="s2"> </span>"Deployment waiting on manual start"<span class="s2">,<br></span><span class="s2"> </span><span class="s1">string</span><span class="s2">.Format(</span>"Deployment to <span class="s7">{0}</span> scheduled for <span class="s7">{1}</span> is now waiting to be manually started."<span class="s2">,<br></span> applicationData.DisplayName,<br> applicationDeploymentData.ScheduledStartDate),<br> applicationDeploymentData.Id,<br> <span class="s1">true</span>);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<span class="s3">/* eat it*/</span>}<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Logs an exception, including inner exceptions.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">internal</span> <span class="s1">void</span> LogException(<span class="s5">ApplicationDeploymentData</span> deploymentData, <span class="s5">Exception</span> exception, <span class="s1">bool</span> inner = <span class="s1">false</span>, <span class="s1">string</span> innerMessage = <span class="s6">""</span>)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">string</span> errorMessage;<br> <span class="s1">if</span> (!inner)<br> {<br> <span class="s1">if</span> (deploymentData != <span class="s1">null</span>)<br> 
{<br> errorMessage = <span class="s1">string</span>.Format(<br><span class="s2"> </span>"Error caught executing deployment with id <span class="s7">{0}</span> for application <span class="s7">{1}</span>:"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe exception was:\r\n\t<span class="s7">{2}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe stacktrace was:\r\n<span class="s7">{3}</span>\r\n"<span class="s2">,<br></span> deploymentData.Id,<br> deploymentData.ApplicationId,<br> exception.Message,<br> exception.StackTrace);<br> }<br> <span class="s1">else<br></span> {<br> errorMessage = <span class="s1">string</span>.Format(<br> <span class="s6">"Error caught"</span> +<br><span class="s2"> </span>"\r\n\r\nThe exception was:\r\n\t<span class="s7">{0}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe stacktrace was:\r\n<span class="s7">{1}</span>\r\n"<span class="s2">,<br></span> exception.Message,<br> exception.StackTrace);<br> }<br> }<br> <span class="s1">else<br></span> {<br> errorMessage = innerMessage + <span class="s1">string</span>.Format(<br><span class="s2"> </span>"\r\nThe inner exception was:\r\n\t<span class="s7">{0}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe inner stacktrace was:\r\n<span class="s7">{1}</span>\r\n"<span class="s2">,<br></span> exception.Message,<br> exception.StackTrace);<br> }<br> <span class="s1">if</span> (exception.InnerException != <span class="s1">null</span>)<br> {<br> LogException(deploymentData, exception.InnerException, <span class="s1">true</span>, errorMessage);<br> }<br> <span class="s1">else<br></span> {<br> Logger.Error(errorMessage);<br> }<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<span class="s3">/* eat it */</span>}<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Disposes of the scheduler and frees up 
resources.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">public</span> <span class="s1">void</span> Dispose()<br> {<br> <span class="s1">if</span> (_channel != <span class="s1">null</span>)<br> {<br> _channel.Abort();<br> }<br> <span class="s1">if</span> (_connection != <span class="s1">null</span>)<br> {<br> <span class="s1">try<br></span> {<br> _connection.Close();<br> }<br><span class="s2"> </span><span class="s1">catch</span><span class="s2"> (</span>AlreadyClosedException<span class="s2">)<br></span> {<br> <span class="s3">/* Eat it */<br></span> }<br> }<br> }<br> }<br>}</blockquote>
</div><div><br></div><div><b><font size="4">Consumer:</font></b></div><div><br></div><div>
<blockquote>//<br>// Copyright (c) Appsecute 2011-2012 - ALL RIGHTS RESERVED.<br>//<br><span class="s1">using</span> System;<br><span class="s1">using</span> System.IO;<br><span class="s1">using</span> RabbitMQ.Client;<br><span class="s1">using</span> AppsecuteKernel.Logging;<br><span class="s1">using</span> AppsecuteServerFramework.Exceptions;<br><span class="s1">using</span> AppsecuteKernel.Documents.Deployments;<br><span class="s1">using</span> AppsecuteKernel.Operations.Deployment;<br><span class="s1">using</span> AppsecuteKernel.Operations.Applications;<br><span class="s1">using</span> AppsecuteKernel.Operations.Notifications;<br><span class="s1">using</span> RabbitMQ.Client.Events;<br><span class="s1">using</span> RabbitMQ.Client.Exceptions;<br><span class="s1">namespace</span> AppsecuteDeploymentWorker<br>{<br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> A deployment worker that receives deployment jobs from RabbitMQ.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s1">public</span><span class="s2"> </span><span class="s1">class</span><span class="s2"> </span>DeploymentWorker<span class="s2"> : </span>IDisposable<br> {<br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Rabbit MQ properties<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s5">IModel</span> _channel;<br> <span class="s1">private</span> <span class="s5">IConnection</span> _connection;<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> A logger for the deployment worker to use.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s1">public</span><span class="s2"> </span>PlatformLogger<span 
class="s2"> Logger = </span><span class="s1">new</span><span class="s2"> </span>PlatformLogger<span class="s2">(</span><span class="s1">typeof</span><span class="s2">(</span>DeploymentWorker<span class="s2">));</span><br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Creates a new deployment worker.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">public</span> DeploymentWorker()<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Starting worker"</span>);<br> OpenConnection();<br> Consume();<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Opens the connection to RabbitMQ and sets up the channel.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">void</span> OpenConnection()<br> {<br> <span class="s1">var</span> connectionFactory = <span class="s1">new</span> <span class="s5">ConnectionFactory<br></span> {<br> uri = <span class="s5">ApplicationDeploymentOperations</span>.GetAmqpConnectionUri(),<br> RequestedHeartbeat = 10<br> };<br><span class="s2"> </span><span class="s5">Console</span><span class="s2">.WriteLine(</span>"Opening RabbitMQ connection..."<span class="s2">);</span><br> _connection = connectionFactory.CreateConnection();<br> <span class="s1">if</span> (_connection.IsOpen)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Connection opened."</span>);<br> _channel = _connection.CreateModel();<br> _channel.BasicQos(0, 1, <span class="s1">false</span>);<br> _channel.QueueDeclare(<span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_QUEUE_NAME, <span class="s1">true</span>, <span class="s1">false</span>, <span class="s1">false</span>, <span class="s1">null</span>);<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Deployment 
worker started."</span>);<br> }<br> <span class="s1">else<br></span> {<br><span class="s2"> </span><span class="s5">Console</span><span class="s2">.WriteLine(</span>"Deployment worker failed to start."<span class="s2">);<br></span> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Starts the synchronous consumption of queued deployments.<br><span class="s2"> </span><span class="s4">///</span> This method will only return if a critical exception occurs.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">void</span> Consume()<br> {<br> <span class="s1">var</span> consumer = <span class="s1">new</span> <span class="s5">QueueingBasicConsumer</span>(_channel);<br> _channel.BasicConsume(<span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_QUEUE_NAME, <span class="s1">false</span>, consumer);<br> <span class="s1">while</span> (<span class="s1">true</span>)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Waiting for next job..."</span>);<br> <span class="s1">var</span> messageEvent = (<span class="s5">BasicDeliverEventArgs</span>)consumer.Queue.Dequeue();<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Processing new job..."</span>);<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> applicationDeploymentId = <span class="s5">BitConverter</span>.ToInt64(messageEvent.Body, 0);<br> <span class="s1">var</span> applicationDeployment = <span class="s5">ApplicationDeploymentDatabaseOperations</span>.GetApplicationDeployment(applicationDeploymentId);<br> DoApplicationDeployment(applicationDeployment);<br> _channel.BasicAck(messageEvent.DeliveryTag, <span class="s1">false</span>);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> _channel.BasicNack(messageEvent.DeliveryTag, <span 
class="s1">false</span>, <span class="s1">false</span>);<br> LogException(<span class="s1">null</span>, e);<br> }<br> }<br> <span class="s1">catch</span> (<span class="s5">EndOfStreamException</span> e)<br> {<br><span class="s2"> </span><span class="s5">Console</span><span class="s2">.WriteLine(</span>"Detected broken connection to RabbitMQ, re-establishing..."<span class="s2">);</span><br> LogException(<span class="s1">null</span>, e);<br> Dispose();<br> OpenConnection();<br> Consume();<br> <span class="s3">// The nested call owns the new connection; return so this loop does not resume on the stale consumer<br></span> <span class="s1">return</span>;<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> LogException(<span class="s1">null</span>, e);<br> <span class="s1">break</span>;<br> }<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Executes an application deployment synchronously.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="applicationDeploymentData"><span class="s3">The application deployment to execute.</span></param><br> <span class="s1">private</span> <span class="s1">void</span> DoApplicationDeployment(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Running deployment with id </span><span class="s7">{0}</span><span class="s6">..."</span>, applicationDeploymentData.Id);<br> Logger.Info(<span class="s1">string</span>.Format(<span class="s6">"Running deployment with id </span><span class="s7">{0}</span><span class="s6">..."</span>, applicationDeploymentData.Id));<br> <span class="s5">ApplicationDeploymentOperations</span>.SetActualDeploymentStartDate(applicationDeploymentData.Id, <span class="s5">DateTime</span>.UtcNow);<br> ConfirmDeploymentShouldStart(applicationDeploymentData);<br> <span class="s5">ApplicationDeploymentOperations</span>.UpdateDeploymentStatusAtomic(<br> 
applicationDeploymentData.Id, <br> applicationDeploymentData.DeploymentStatus,<br> <span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_STATUS_DEPLOYING);<br> <span class="s1">var</span> packagePath = DownloadApplicationDeploymentPackage(applicationDeploymentData);<br> <span class="s1">var</span> stagingDirectory = StageApplicationDeploymentPackage(applicationDeploymentData, packagePath);<br> PushApplicationDeploymentPackage(applicationDeploymentData, stagingDirectory);<br> CleanUpApplicationDeploymentPackage(packagePath, stagingDirectory);<br> <span class="s5">ApplicationDeploymentOperations</span>.UpdateDeploymentStatusAtomic(<br> applicationDeploymentData.Id,<br> <span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_STATUS_DEPLOYING,<br> <span class="s5">ApplicationDeploymentOperations</span>.DEPLOYMENT_STATUS_DEPLOYMENT_SUCCESSFUL);<br> <span class="s5">ApplicationDeploymentOperations</span>.SetActualDeploymentEndDate(applicationDeploymentData.Id, <span class="s5">DateTime</span>.UtcNow);<br> RaiseDeploymentSuceededNotification(applicationDeploymentData);<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Deployment succeeded."</span>);<br> Logger.Info(<span class="s6">"Deployment succeeded."</span>);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> LogException(applicationDeploymentData, e);<br> <span class="s5">ApplicationDeploymentOperations</span>.MarkDeploymentAsFailed(applicationDeploymentData.Id, e.Message);<br> <span class="s5">ApplicationDeploymentOperations</span>.SetActualDeploymentEndDate(applicationDeploymentData.Id, <span class="s5">DateTime</span>.UtcNow);<br> RaiseDeploymentFailedNotification(applicationDeploymentData, e.Message);<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Deployment failed."</span>);<br> <span class="s1">throw</span>;<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> 
</span><span class="s4">///</span> Confirms that a deployment should start based on the current time and available deployment window.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="deploymentData"><span class="s3">The deployment to check.</span></param><br> <span class="s1">private</span> <span class="s1">void</span> ConfirmDeploymentShouldStart(<span class="s5">ApplicationDeploymentData</span> deploymentData)<br> {<br> <span class="s1">if</span>((<span class="s5">DateTime</span>.UtcNow - <span class="s1">new</span> <span class="s5">TimeSpan</span>(0, 5, 0)) > deploymentData.ScheduledEndDate)<br> {<br> <span class="s1">throw</span> <span class="s1">new</span> <span class="s5">AppSnapException</span>(<br> <span class="s1">string</span>.Format(<br><span class="s2"> </span>"Unable to start as deployment window has passed or it was determined that deployment wouldn't complete within the window."<span class="s2">));<br></span> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Raises a notification for a failed deployment.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="applicationDeploymentData"><span class="s3">The deployment that failed.</span></param><br><span class="s2"> </span><span class="s4">///</span> <span class="s4"><param name="failureReason"></span>A human readable reason of why the deploy failed.<span class="s4"></param><br></span> <span class="s1">private</span> <span class="s1">void</span> RaiseDeploymentFailedNotification(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData, <span class="s1">string</span> failureReason)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> applicationData = <span 
class="s5">ApplicationOperations</span>.GetApplication(applicationDeploymentData.ApplicationId);<br> <span class="s5">NotificationOperations</span>.RaiseNotification(<br> applicationData.OwnerLegalEntityId,<br> <span class="s5">NotificationCode</span>.ApplicationDeploymentFailed,<br> <span class="s5">NotificationSeverity</span>.Critical,<br> <span class="s6">"Deploy failed"</span>,<br><span class="s2"> </span><span class="s1">string</span><span class="s2">.Format(</span>"Deployment to <span class="s7">{0}</span> scheduled for <span class="s7">{1}</span> failed with error \"<span class="s7">{2}</span>\"."<span class="s2">,<br></span> applicationData.DisplayName,<br> applicationDeploymentData.ScheduledStartDate,<br> failureReason),<br> applicationDeploymentData.Id,<br> <span class="s1">true</span>);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<span class="s3">/* eat it */</span>}<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Raises a notification for a successful deployment.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="applicationDeploymentData"><span class="s3">The deployment that succeeded.</span></param><br> <span class="s1">private</span> <span class="s1">void</span> RaiseDeploymentSuceededNotification(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> applicationData = <span class="s5">ApplicationOperations</span>.GetApplication(applicationDeploymentData.ApplicationId);<br> <span class="s5">NotificationOperations</span>.RaiseNotification(<br> applicationData.OwnerLegalEntityId,<br> <span class="s5">NotificationCode</span>.ApplicationDeploymentFailed,<br> <span class="s5">NotificationSeverity</span>.Info,<br> <span class="s6">"Deploy 
succeeded"</span>,<br><span class="s2"> </span><span class="s1">string</span><span class="s2">.Format(</span>"Deployment to <span class="s7">{0}</span> scheduled for <span class="s7">{1}</span> completed successfully."<span class="s2">,<br></span> applicationData.DisplayName,<br> applicationDeploymentData.ScheduledStartDate),<br> applicationDeploymentData.Id,<br> <span class="s1">true</span>);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<span class="s3">/* eat it */</span>}<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Downloads and verifies an application deployment package to a local directory.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span>///<span class="s3"> </span><param name="applicationDeploymentData"></param><br><span class="s2"> </span><span class="s4">///</span> <span class="s4"><returns></span>The full path to the local deployment package once downloaded.<span class="s4"></returns><br></span> <span class="s1">private</span> <span class="s1">string</span> DownloadApplicationDeploymentPackage(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Downloading..."</span>);<br> <span class="s1">var</span> downloadedPackagePath = <span class="s5">DeploymentPackageDownloadClient</span>.DownloadDeploymentPackage(<span class="s1">new</span> <span class="s5">Uri</span>(applicationDeploymentData.DeploymentPackageDownloadUrl));<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> computedHash = <span class="s5">DeploymentFileUtils</span>.ComputeDeploymentPackageHash(downloadedPackagePath);<br> <span class="s1">if</span>(applicationDeploymentData.DeploymentPackageHash == <span class="s1">null</span> || computedHash.Equals(applicationDeploymentData.DeploymentPackageHash))<br> {<br> <span 
class="s1">return</span> downloadedPackagePath;<br> }<br> <span class="s1">else<br></span> {<br><span class="s2"> </span><span class="s1">throw</span><span class="s2"> </span><span class="s1">new</span><span class="s2"> </span>AppSnapInvalidDataException<span class="s2">(<br></span><span class="s2"> </span>"Integrity check of deployment package failed. MD5 hashes do not match. Deployment package possibly corrupt or incomplete."<span class="s2">);<br></span> }<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br> CleanUpApplicationDeploymentPackage(downloadedPackagePath, <span class="s1">null</span>);<br><span class="s2"> </span><span class="s1">throw</span><span class="s2"> </span><span class="s1">new</span><span class="s2"> </span><span class="s5">AppSnapException</span><span class="s2">(</span>"Failed to download deployment package. Fatal error."<span class="s2">, e);<br></span> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Extracts a local deployment package to a temp directory.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s4">///</span> <span class="s4"><returns></span>The full path to the directory the deployment package was extracted to.<span class="s4"></returns><br></span> <span class="s1">private</span> <span class="s1">string</span> StageApplicationDeploymentPackage(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData, <span class="s1">string</span> downloadedPackagePath)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Staging..."</span>);<br> Logger.Info(<span class="s6">"Staging..."</span>);<br> <span class="s1">try<br></span> {<br> <span class="s1">return</span> <span class="s5">DeploymentFileUtils</span>.ExtractDeploymentPackage(downloadedPackagePath);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> 
e)<br> {<br> CleanUpApplicationDeploymentPackage(downloadedPackagePath, <span class="s1">null</span>);<br><span class="s2"> </span><span class="s1">throw</span><span class="s2"> </span><span class="s1">new</span><span class="s2"> </span><span class="s5">AppSnapException</span><span class="s2">(</span>"Failed to stage deployment. Fatal error."<span class="s2">, e);<br></span> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Pushes an application deployment package to the cloud.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">private</span> <span class="s1">void</span> PushApplicationDeploymentPackage(<span class="s5">ApplicationDeploymentData</span> applicationDeploymentData, <span class="s1">string</span> stagingPath)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Pushing..."</span>);<br> Logger.Info(<span class="s6">"Pushing..."</span>);<br> <span class="s1">try<br></span> {<br> <span class="s1">var</span> applicationData = <span class="s5">ApplicationOperations</span>.GetApplication(applicationDeploymentData.ApplicationId);<br> <span class="s5">DeploymentPusher</span>.UploadApplication(applicationData.CloudLoginId, applicationData, stagingPath);<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<br> CleanUpApplicationDeploymentPackage(<span class="s1">null</span>, stagingPath);<br> <span class="s1">throw</span>;<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Cleans up all temp files from an application deployment.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br><span class="s2"> </span><span class="s4">///</span> <span class="s4"><param name="downloadPackagePath"></span>The full path to the downloaded deployment package.<span class="s4"></param><br></span><span class="s2"> 
</span><span class="s4">///</span> <span class="s4"><param name="stagingPath"></span>The full path to the staging directory that contains the extracted deployment package.<span class="s4"></param><br></span> <span class="s1">private</span> <span class="s1">void</span> CleanUpApplicationDeploymentPackage(<span class="s1">string</span> downloadPackagePath, <span class="s1">string</span> stagingPath)<br> {<br> <span class="s5">Console</span>.WriteLine(<span class="s6">"Cleaning up..."</span>);<br> Logger.Info(<span class="s6">"Cleaning up..."</span>);<br> <span class="s1">try<br></span> {<br> <span class="s1">if</span> (!<span class="s1">string</span>.IsNullOrWhiteSpace(downloadPackagePath))<br> {<br> <span class="s5">DeploymentFileUtils</span>.DeleteFile(downloadPackagePath);<br> }<br> <span class="s1">if</span> (!<span class="s1">string</span>.IsNullOrWhiteSpace(stagingPath))<br> {<br> <span class="s5">DeploymentFileUtils</span>.DeleteDirectory(stagingPath);<br> }<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span> e)<br> {<br><span class="s2"> Logger.Error(</span><span class="s1">string</span><span class="s2">.Format(</span>"Failed to clean up deployment at <span class="s7">{0}</span> and <span class="s7">{1}</span>. 
Error was: <span class="s7">{2}</span>"<span class="s2">,<br></span> downloadPackagePath,<br> stagingPath,<br> e.Message));<br> }<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Logs an exception, including inner exceptions.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">internal</span> <span class="s1">void</span> LogException(<span class="s5">ApplicationDeploymentData</span> deploymentData, <span class="s5">Exception</span> exception, <span class="s1">bool</span> inner = <span class="s1">false</span>, <span class="s1">string</span> innerMessage = <span class="s6">""</span>)<br> {<br> <span class="s1">try<br></span> {<br> <span class="s1">string</span> errorMessage;<br> <span class="s1">if</span> (!inner)<br> {<br> <span class="s1">if</span> (deploymentData != <span class="s1">null</span>)<br> {<br> errorMessage = <span class="s1">string</span>.Format(<br><span class="s2"> </span>"Error caught executing deployment with id <span class="s7">{0}</span> for application <span class="s7">{1}</span>:"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe exception was:\r\n\t<span class="s7">{2}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe stacktrace was:\r\n<span class="s7">{3}</span>\r\n"<span class="s2">,<br></span> deploymentData.Id,<br> deploymentData.ApplicationId,<br> exception.Message,<br> exception.StackTrace);<br> }<br> <span class="s1">else<br></span> {<br> errorMessage = <span class="s1">string</span>.Format(<br> <span class="s6">"Error caught"</span> +<br><span class="s2"> </span>"\r\n\r\nThe exception was:\r\n\t<span class="s7">{0}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe stacktrace was:\r\n<span class="s7">{1}</span>\r\n"<span class="s2">,<br></span> exception.Message,<br> exception.StackTrace);<br> }<br> }<br> <span class="s1">else<br></span> {<br> 
errorMessage = innerMessage + <span class="s1">string</span>.Format(<br><span class="s2"> </span>"\r\nThe inner exception was:\r\n\t<span class="s7">{0}</span>"<span class="s2"> +<br></span><span class="s2"> </span>"\r\n\r\nThe inner stacktrace was:\r\n<span class="s7">{1}</span>\r\n"<span class="s2">,<br></span> exception.Message,<br> exception.StackTrace);<br> }<br> <span class="s1">if</span> (exception.InnerException != <span class="s1">null</span>)<br> {<br> LogException(deploymentData, exception.InnerException, <span class="s1">true</span>, errorMessage);<br> }<br> <span class="s1">else<br></span> {<br> Logger.Error(errorMessage);<br> }<br> }<br> <span class="s1">catch</span> (<span class="s5">Exception</span>)<br> {<span class="s3">/* eat it */</span>}<br> }<br><br><span class="s2"> </span>///<span class="s3"> </span><summary><br><span class="s2"> </span><span class="s4">///</span> Disposes of the deployment worker and frees up resources.<br><span class="s2"> </span>///<span class="s3"> </span></summary><br> <span class="s1">public</span> <span class="s1">void</span> Dispose()<br> {<br> <span class="s1">if</span> (_channel != <span class="s1">null</span>)<br> {<br> _channel.Abort();<br> }<br> <span class="s1">if</span> (_connection != <span class="s1">null</span>)<br> {<br> <span class="s1">try<br></span> {<br> _connection.Close();<br> }<br><span class="s2"> </span><span class="s1">catch</span><span class="s2"> (</span>AlreadyClosedException<span class="s2">)<br></span> {<br> <span class="s3">/* Eat it */<br></span> }<br> }<br> }<br> }<br>}</blockquote>
</div>