[rabbitmq-discuss] Persistent messages on durable queue sometimes not delivered to consumers

Tyler Power tyler at appsecute.com
Fri Oct 26 00:50:26 BST 2012


Hi there,

I'm having an issue where *sometimes* messages that appear to be 
successfully pushed to RabbitMQ are not delivered to any subscribers.

I am using RabbitMQ Server 2.4 and the .NET client 2.8.7.0.

I am using the default exchange (""), which sounds adequate for our needs 
right now, but I thought I'd point it out in case I'm not using it the way 
it was intended.

In the server I can see the SubmitDeploymentToQueue() method always 
completes successfully, so it appears messages are *always* being submitted 
to RabbitMQ, but in the Consume() method of the consumer, 
the consumer.Queue.Dequeue() method *sometimes* does not return a message, 
so the next line (the console.writeline) is never called etc. and it 
will just sit there blocking.

In this situation where messages appear to be lost, restarting the 
consumers doesn't fix it, the messages are simply never delivered and seem 
to be lost forever.

I have attached the code below, inline, I hope this is OK.


*Server:*

//
// Copyright (c) Appsecute 2011-2012 - ALL RIGHTS RESERVED.
//
using System;
using RabbitMQ.Client;
using System.Threading;
using AppsecuteKernel.Logging;
using AppsecuteServerFramework.Database;
using AppsecuteKernel.Documents.Deployments;
using AppsecuteKernel.Operations.Deployment;
using AppsecuteKernel.Operations.Applications;
using AppsecuteKernel.Operations.Notifications;
using RabbitMQ.Client.Exceptions;
namespace AppsecuteDeploymentScheduler
{
    /// <summary>
    /// Periodically inspects the database and submits deployments to the
    /// queue once they have met the appropriate criteria.
    /// </summary>
    public class DeploymentScheduler : IDisposable
    {
        /// <summary>
        /// RabbitMQ channel used to declare the deployment queue and publish to it.
        /// </summary>
        private IModel _channel;

        /// <summary>
        /// RabbitMQ connection from which <see cref="_channel"/> is created.
        /// </summary>
        private IConnection _connection;

        /// <summary>
        /// A wait handle that other threads can use to wait on the scheduler to finish.
        /// It is created unsignalled.
        /// </summary>
        public ManualResetEvent SchedulerWaitHandle = new ManualResetEvent(false);

        /// <summary>
        /// A logger for the scheduler to use.
        /// </summary>
        public PlatformLogger Logger = new PlatformLogger(typeof(DeploymentScheduler));

        /// <summary>
        /// The time in milliseconds the scheduler should sleep for between finishing work
        /// and waking up to check/process new work.
        /// </summary>
        private const int SLEEP_MILLISECONDS = 5000;

        /// <summary>
        /// A timer used to periodically wake up and poll for deployments to queue.
        /// </summary>
        private Timer _pollDeploymentsTimer;

        /// <summary>
        /// Creates a new deployment scheduler: announces start-up on the console and
        /// then opens the RabbitMQ connection.
        /// </summary>
        public DeploymentScheduler()
        {
            const string startupMessage = "Starting scheduler";
            WriteLine(startupMessage);
            OpenConnection();
        }

        /// <summary>
        /// Opens the connection to RabbitMQ, creates the channel, declares the
        /// deployment queue and queues the first run of <see cref="ProcessDeployments"/>
        /// on the thread pool.
        /// </summary>
        /// <remarks>
        /// If the connection does not report itself open, a message is written to the
        /// console and neither the channel nor the processing run is set up.
        /// </remarks>
        private void OpenConnection()
        {
            var connectionFactory = new ConnectionFactory
            {
                uri = ApplicationDeploymentOperations.GetAmqpConnectionUri()
            };

            WriteLine("Opening RabbitMQ connection...");
            _connection = connectionFactory.CreateConnection();
            if (!_connection.IsOpen)
            {
                WriteLine("Scheduler failed to start.");
                return;
            }

            WriteLine("Connection opened.");

            // Use the connection opened above. A second CreateConnection() call here
            // used to overwrite _connection, leaving the first connection open and
            // unreferenced (it could never be closed by Dispose).
            _channel = _connection.CreateModel();

            // Arguments after the queue name: durable = true, exclusive = false,
            // autoDelete = false, no extra arguments.
            _channel.QueueDeclare(
                ApplicationDeploymentOperations.DEPLOYMENT_QUEUE_NAME,
                true,
                false,
                false,
                null);

            ThreadPool.QueueUserWorkItem(ProcessDeployments);
            WriteLine("Scheduler started.");
        }

        /// <summary>
        /// Writes a message to the console, prefixed with the managed id of the
        /// calling thread.
        /// </summary>
        /// <param name="message">The text to write.</param>
        private void WriteLine(string message)
        {
            var threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("{0}: {1}", threadId, message);
        }

        /// <summary>
        /// Schedules the scheduler to wake up at a later time to poll the database.
        /// Any previously created timer is disposed before the new one is created.
        /// </summary>
        private void Reschedule()
        {
            WriteLine("Rescheduling");

            var previousTimer = _pollDeploymentsTimer;
            if (previousTimer != null)
            {
                previousTimer.Dispose();
            }

            // Due in SLEEP_MILLISECONDS; the period argument of 0 means the callback
            // is not repeated, so each wake-up has to call Reschedule again.
            _pollDeploymentsTimer = new Timer(
                ProcessDeployments,
                new object(),
                SLEEP_MILLISECONDS,
                0);
        }

        /// <summary>
        /// Evaluates deployments and queues them to be deployed if appropriate.
        /// </summary>
        /// <remarks>
        /// Keeps polling for as long as each poll returns work. When a poll returns
        /// nothing, or when an exception is caught, the scheduler is rescheduled via
        /// <see cref="Reschedule"/> and this method returns.
        /// </remarks>
        /// <param name="state">Callback state supplied by the timer or thread pool; not used.</param>
        private void ProcessDeployments(object state)
        {
            // This used to call itself after every non-empty batch, so a scheduler
            // that was kept continuously busy would eventually exhaust the stack.
            // A loop performs the same sequence of steps without growing the stack.
            while (true)
            {
                Console.WriteLine();
                WriteLine("Processing deployments");
                try
                {
                    var deployments = ApplicationDeploymentDatabaseOperations.GetApplicationDeploymentsToQueue();

                    // Nothing to do: go back to sleep until the timer fires.
                    if (deployments.Count <= 0)
                    {
                        Reschedule();
                        return;
                    }

                    foreach (var deploymentData in deployments)
                    {
                        ProcessDeployment(deploymentData);
                    }
                    WriteLine("Scheduling completed");
                }
                catch (Exception e)
                {
                    WriteLine(string.Format("Caught exception processing deployments: {0}", e.Message));
                    LogException(null, e);
                    Reschedule();
                    return;
                }
            }
        }

        /// <summary>
        /// Processes a deployment, updating the status, raising notifications and
        /// submitting to the deployment queue as appropriate.
        /// </summary>
        /// <remarks>
        /// The work runs under a per-deployment lock obtained through
        /// <c>MySqlLocking.TryRun</c>. Any exception is logged and not rethrown, so
        /// one failing deployment does not stop the rest of the batch.
        /// </remarks>
        /// <param name="deploymentData">The deployment to evaluate.</param>
        private void ProcessDeployment(ApplicationDeploymentData deploymentData)
        {
            try
            {
                var lockName = string.Format("deployment-scheduler-{0}", deploymentData.Id);

                MySqlLocking.TryRun(
                    MySqlWrapper.PLATFORM_DATABASE_NAME,
                    lockName,
                    new TimeSpan(0, 0, 0, 5),
                    api =>
                        {
                            // If the deployment should now wait for manual start update the status
                            var shouldWaitForManualStart =
                                deploymentData.DeploymentStatus.Equals(
                                    ApplicationDeploymentOperations.DEPLOYMENT_STATUS_WAITING_FOR_SCHEDULED_TIME)
                                && deploymentData.IsManualStart;

                            if (shouldWaitForManualStart)
                            {
                                // Run in a transaction so that the status update and the notification happen together.
                                // NOTE(review): RaiseDeploymentWaitingOnManualStartNotification catches its own
                                // exceptions, so a failed notification will not roll this transaction back.
                                MySqlWrapper.RunInNewTransaction(
                                    MySqlWrapper.PLATFORM_DATABASE_NAME,
                                    database =>
                                        {
                                            // Update the status first, to effectively reserve this deployment
                                            ApplicationDeploymentOperations.UpdateDeploymentStatusAtomic(
                                                deploymentData.Id,
                                                deploymentData.DeploymentStatus,
                                                ApplicationDeploymentOperations.DEPLOYMENT_STATUS_WAITING_FOR_MANUAL_START);
                                            RaiseDeploymentWaitingOnManualStartNotification(deploymentData);
                                        });

                                var waitingMessage = string.Format(
                                    "Deployment with id {0} is now waiting on manual start.",
                                    deploymentData.Id);
                                WriteLine(waitingMessage);
                                Logger.Info(waitingMessage);
                            }
                            else
                            {
                                // Run in a transaction so that if submitting to queue fails, the status update will be rolled back
                                MySqlWrapper.RunInNewTransaction(
                                    MySqlWrapper.PLATFORM_DATABASE_NAME,
                                    database =>
                                        {
                                            // Update the status first, to effectively reserve this deployment
                                            ApplicationDeploymentOperations.UpdateDeploymentStatusAtomic(
                                                deploymentData.Id,
                                                deploymentData.DeploymentStatus,
                                                ApplicationDeploymentOperations.DEPLOYMENT_STATUS_QUEUED_FOR_DEPLOYMENT);

                                            // Submit the deployment to the queue
                                            SubmitDeploymentToQueue(deploymentData);
                                        });

                                var submittedMessage = string.Format(
                                    "Deployment with id {0} has been submitted to the deployment queue.",
                                    deploymentData.Id);
                                WriteLine(submittedMessage);
                                Logger.Info(submittedMessage);
                            }
                        });
            }
            catch (Exception e)
            {
                LogException(deploymentData, e);
            }
        }

        /// <summary>
        /// Submits a deployment to the RabbitMQ message queue so that it will be
        /// processed by a background worker.
        /// </summary>
        /// <remarks>
        /// The message body is the deployment id converted with
        /// <see cref="BitConverter.GetBytes(long)"/> (assumes the id is a 64-bit
        /// integer, matching the worker's <c>BitConverter.ToInt64</c> - TODO confirm).
        /// It is published as persistent to the default exchange with the queue name
        /// as routing key. If publishing throws, the connection is torn down and
        /// reopened, and the original exception is rethrown to the caller.
        /// </remarks>
        /// <param name="deploymentData">The deployment to queue.</param>
        private void SubmitDeploymentToQueue(ApplicationDeploymentData deploymentData)
        {
            try
            {
                var basicProperties = _channel.CreateBasicProperties();
                basicProperties.SetPersistent(true);

                // NOTE(review): a BasicPublish call returning without an exception does
                // not by itself show that the broker stored the message; publisher
                // confirms would be needed for that - confirm against the client docs.
                _channel.BasicPublish(
                    string.Empty,
                    ApplicationDeploymentOperations.DEPLOYMENT_QUEUE_NAME,
                    basicProperties,
                    BitConverter.GetBytes(deploymentData.Id));
            }
            catch (Exception)
            {
                Console.WriteLine("Detected broken connection to RabbitMQ, restablishing...");
                try
                {
                    // NOTE(review): OpenConnection also queues another ProcessDeployments
                    // run on the thread pool while the current run is still executing,
                    // so two runs may end up sharing _channel - verify this is intended.
                    Dispose();
                    OpenConnection();
                }
                catch (Exception reconnectFailure)
                {
                    // A failed reconnect must not replace the publish failure that the
                    // caller needs to see, so it is logged here instead of propagating.
                    LogException(deploymentData, reconnectFailure);
                }
                throw;
            }
        }

        /// <summary>
        /// Raises a notification for a deployment that is waiting on manual start.
        /// </summary>
        /// <remarks>
        /// Best effort: a failure is logged and not rethrown.
        /// </remarks>
        /// <param name="applicationDeploymentData">The deployment that is waiting on manual start.</param>
        private void RaiseDeploymentWaitingOnManualStartNotification(ApplicationDeploymentData applicationDeploymentData)
        {
            try
            {
                var applicationData = ApplicationOperations.GetApplication(applicationDeploymentData.ApplicationId);
                var description = string.Format(
                    "Deployment to {0} scheduled for {1} is now waiting to be manually started.",
                    applicationData.DisplayName,
                    applicationDeploymentData.ScheduledStartDate);

                NotificationOperations.RaiseNotification(
                    applicationDeploymentData.SubmitterLegalEntityId,
                    NotificationCode.ApplicationDeploymentWaitingForManualStart,
                    NotificationSeverity.Warning,
                    "Deployment waiting on manual start",
                    description,
                    applicationDeploymentData.Id,
                    true);
            }
            catch (Exception e)
            {
                // Still swallowed so the caller is unaffected, but no longer silent.
                // LogException handles its own failures and does not throw.
                LogException(applicationDeploymentData, e);
            }
        }

        /// <summary>
        /// Logs an exception, including inner exceptions, as a single error entry.
        /// </summary>
        /// <param name="deploymentData">The deployment being processed, or null when there is none.</param>
        /// <param name="exception">The exception to log.</param>
        /// <param name="inner">True when <paramref name="exception"/> is itself an inner exception.</param>
        /// <param name="innerMessage">Text already built for the outer exceptions; used only when <paramref name="inner"/> is true.</param>
        internal void LogException(ApplicationDeploymentData deploymentData, Exception exception, bool inner = false, string innerMessage = "")
        {
            const string innerFormat =
                "\r\nThe inner exception was:\r\n\t{0}" +
                "\r\n\r\nThe inner stacktrace was:\r\n{1}\r\n";

            try
            {
                string errorMessage;
                if (inner)
                {
                    errorMessage = innerMessage + string.Format(
                        innerFormat,
                        exception.Message,
                        exception.StackTrace);
                }
                else if (deploymentData != null)
                {
                    errorMessage = string.Format(
                        "Error caught executing deployment with id {0} for application {1}:" +
                        "\r\n\r\nThe exception was:\r\n\t{2}" +
                        "\r\n\r\nThe stacktrace was:\r\n{3}\r\n",
                        deploymentData.Id,
                        deploymentData.ApplicationId,
                        exception.Message,
                        exception.StackTrace);
                }
                else
                {
                    errorMessage = string.Format(
                        "Error caught" +
                        "\r\n\r\nThe exception was:\r\n\t{0}" +
                        "\r\n\r\nThe stacktrace was:\r\n{1}\r\n",
                        exception.Message,
                        exception.StackTrace);
                }

                // Append every nested exception in order, outermost first.
                for (var cause = exception.InnerException; cause != null; cause = cause.InnerException)
                {
                    errorMessage += string.Format(innerFormat, cause.Message, cause.StackTrace);
                }

                Logger.Error(errorMessage);
            }
            catch (Exception)
            {
                /* eat it - logging must never throw into the caller */
            }
        }

        /// <summary>
        /// Disposes of the scheduler and frees up resources: aborts the channel and
        /// closes the connection, ignoring a connection that is already closed.
        /// </summary>
        public void Dispose()
        {
            if (_channel != null)
            {
                _channel.Abort();
            }

            if (_connection == null)
            {
                return;
            }

            try
            {
                _connection.Close();
            }
            catch (AlreadyClosedException)
            {
                /* Eat it */
            }
        }
    }
}


*Consumer:*

//
// Copyright (c) Appsecute 2011-2012 - ALL RIGHTS RESERVED.
//
using System;
using System.IO;
using RabbitMQ.Client;
using AppsecuteKernel.Logging;
using AppsecuteServerFramework.Exceptions;
using AppsecuteKernel.Documents.Deployments;
using AppsecuteKernel.Operations.Deployment;
using AppsecuteKernel.Operations.Applications;
using AppsecuteKernel.Operations.Notifications;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace AppsecuteDeploymentWorker
{
    /// <summary>
    /// A deployment worker that receives deployment jobs from RabbitMQ.
    /// </summary>
    public class DeploymentWorker : IDisposable
    {
        /// <summary>
        /// RabbitMQ channel used to consume and acknowledge deployment jobs.
        /// </summary>
        private IModel _channel;

        /// <summary>
        /// RabbitMQ connection from which <see cref="_channel"/> is created.
        /// </summary>
        private IConnection _connection;

        /// <summary>
        /// A logger for the deployment worker to use.
        /// </summary>
        public PlatformLogger Logger = new PlatformLogger(typeof(DeploymentWorker));

        /// <summary>
        /// Creates a new deployment worker, opens the RabbitMQ connection and starts
        /// consuming. Because consumption is synchronous, the constructor does not
        /// return until <see cref="Consume"/> does.
        /// </summary>
        public DeploymentWorker()
        {
            const string startupMessage = "Starting worker";
            Console.WriteLine(startupMessage);
            OpenConnection();
            Consume();
        }

        /// <summary>
        /// Opens the connection to RabbitMQ and sets up the channel: a prefetch
        /// count of 1 and a declaration of the deployment queue.
        /// </summary>
        private void OpenConnection()
        {
            var connectionFactory = new ConnectionFactory
            {
                uri = ApplicationDeploymentOperations.GetAmqpConnectionUri(),
                RequestedHeartbeat = 10
            };

            Console.WriteLine("Opening RabbitMQ connection...");
            _connection = connectionFactory.CreateConnection();
            if (!_connection.IsOpen)
            {
                Console.WriteLine("Deployment worker failed to start.");
                return;
            }

            Console.WriteLine("Connection opened.");
            _channel = _connection.CreateModel();

            // Prefetch size 0, prefetch count 1, not global.
            _channel.BasicQos(0, 1, false);

            // Arguments after the queue name: durable = true, exclusive = false,
            // autoDelete = false, no extra arguments.
            _channel.QueueDeclare(
                ApplicationDeploymentOperations.DEPLOYMENT_QUEUE_NAME,
                true,
                false,
                false,
                null);

            Console.WriteLine("Deployment worker started.");
        }

        /// <summary>
        /// Starts the synchronous consumption of queued deployments.
        /// This method will only return if a critical exception occurs.
        /// </summary>
        /// <remarks>
        /// Each delivery is acknowledged after the deployment ran without throwing.
        /// If the deployment throws, the delivery is negatively acknowledged with
        /// requeue = false, so the broker is told not to put the message back on the
        /// queue. When the delivery queue reports end-of-stream the connection is
        /// reopened and a fresh consumer is registered.
        /// </remarks>
        private void Consume()
        {
            // The outer loop replaces the previous recursive call to Consume(). With
            // the recursion, once the nested call returned, the outer loop kept
            // reading from a consumer bound to the aborted channel, and every
            // reconnect added a stack frame.
            while (true)
            {
                var consumer = new QueueingBasicConsumer(_channel);

                // Second argument is noAck = false: deliveries must be acked explicitly.
                _channel.BasicConsume(
                    ApplicationDeploymentOperations.DEPLOYMENT_QUEUE_NAME,
                    false,
                    consumer);

                try
                {
                    while (true)
                    {
                        Console.WriteLine("Waiting for next job...");
                        var messageEvent = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine("Processing new job...");
                        try
                        {
                            var applicationDeploymentId = BitConverter.ToInt64(messageEvent.Body, 0);
                            var applicationDeployment =
                                ApplicationDeploymentDatabaseOperations.GetApplicationDeployment(applicationDeploymentId);
                            DoApplicationDeployment(applicationDeployment);
                            _channel.BasicAck(messageEvent.DeliveryTag, false);
                        }
                        catch (Exception e)
                        {
                            // Log before nacking: if the nack itself throws (for example
                            // because the channel has closed) the cause is still recorded.
                            LogException(null, e);

                            // NOTE(review): requeue = false means a job that fails here is
                            // not redelivered - confirm that dropping it is intended.
                            _channel.BasicNack(messageEvent.DeliveryTag, false, false);
                        }
                    }
                }
                catch (EndOfStreamException e)
                {
                    Console.WriteLine("Detected broken connection to RabbitMQ, restablishing...");
                    LogException(null, e);
                    Dispose();
                    OpenConnection();
                    // Fall through to the top of the outer loop to register a new consumer.
                }
                catch (Exception e)
                {
                    LogException(null, e);
                    return;
                }
            }
        }

        /// <summary>
        /// Executes an application deployment synchronously: records the start time,
        /// checks the deployment window, then downloads, stages, pushes and cleans up
        /// the package, updating the deployment status along the way.
        /// </summary>
        /// <remarks>
        /// On failure the deployment is marked as failed, the end date is recorded, a
        /// failure notification is raised and the exception is rethrown.
        /// </remarks>
        /// <param name="applicationDeploymentData">The application deployment to execute.</param>
        private void DoApplicationDeployment(ApplicationDeploymentData applicationDeploymentData)
        {
            try
            {
                Console.WriteLine("Running deployment with id {0}...", applicationDeploymentData.Id);
                Logger.Info(string.Format("Running deployment with id {0}...", applicationDeploymentData.Id));

                ApplicationDeploymentOperations.SetActualDeploymentStartDate(
                    applicationDeploymentData.Id,
                    DateTime.UtcNow);
                ConfirmDeploymentShouldStart(applicationDeploymentData);

                // Move from whatever status the deployment currently has to "deploying".
                ApplicationDeploymentOperations.UpdateDeploymentStatusAtomic(
                    applicationDeploymentData.Id,
                    applicationDeploymentData.DeploymentStatus,
                    ApplicationDeploymentOperations.DEPLOYMENT_STATUS_DEPLOYING);

                var packagePath = DownloadApplicationDeploymentPackage(applicationDeploymentData);
                var stagingDirectory = StageApplicationDeploymentPackage(applicationDeploymentData, packagePath);
                PushApplicationDeploymentPackage(applicationDeploymentData, stagingDirectory);
                CleanUpApplicationDeploymentPackage(packagePath, stagingDirectory);

                ApplicationDeploymentOperations.UpdateDeploymentStatusAtomic(
                    applicationDeploymentData.Id,
                    ApplicationDeploymentOperations.DEPLOYMENT_STATUS_DEPLOYING,
                    ApplicationDeploymentOperations.DEPLOYMENT_STATUS_DEPLOYMENT_SUCCESSFUL);
                ApplicationDeploymentOperations.SetActualDeploymentEndDate(
                    applicationDeploymentData.Id,
                    DateTime.UtcNow);

                RaiseDeploymentSuceededNotification(applicationDeploymentData);
                Console.WriteLine("Deployment succeeded.");
                Logger.Info("Deployment succeeded.");
            }
            catch (Exception e)
            {
                LogException(applicationDeploymentData, e);
                ApplicationDeploymentOperations.MarkDeploymentAsFailed(applicationDeploymentData.Id, e.Message);
                ApplicationDeploymentOperations.SetActualDeploymentEndDate(
                    applicationDeploymentData.Id,
                    DateTime.UtcNow);
                RaiseDeploymentFailedNotification(applicationDeploymentData, e.Message);
                Console.WriteLine("Deployment failed.");
                throw;
            }
        }

        /// <summary>
        /// Confirms that a deployment should start based on the current time and
        /// available deployment window.
        /// </summary>
        /// <remarks>
        /// Throws when the current UTC time is more than five minutes past the
        /// deployment's scheduled end date.
        /// </remarks>
        /// <param name="deploymentData">The deployment to check.</param>
        private void ConfirmDeploymentShouldStart(ApplicationDeploymentData deploymentData)
        {
            var latestAcceptableNow = DateTime.UtcNow - new TimeSpan(0, 5, 0);
            if (latestAcceptableNow > deploymentData.ScheduledEndDate)
            {
                // The message has no placeholders, so it is passed as-is rather than
                // through string.Format.
                throw new AppSnapException(
                    "Unable to start as deployment window has passed or it was determined that deployment wouldn't complete within the window.");
            }
        }

        /// <summary>
        /// Raises a notification for a failed deployment.
        /// </summary>
        /// <remarks>
        /// Best effort: a failure to raise the notification is logged and not rethrown.
        /// </remarks>
        /// <param name="applicationDeploymentData">The deployment that failed.</param>
        /// <param name="failureReason">A human readable reason of why the deploy failed.</param>
        private void RaiseDeploymentFailedNotification(ApplicationDeploymentData applicationDeploymentData, string failureReason)
        {
            try
            {
                var applicationData = ApplicationOperations.GetApplication(applicationDeploymentData.ApplicationId);
                var description = string.Format(
                    "Deployment to {0} scheduled for {1} failed with error \"{2}\".",
                    applicationData.DisplayName,
                    applicationDeploymentData.ScheduledStartDate,
                    failureReason);

                NotificationOperations.RaiseNotification(
                    applicationData.OwnerLegalEntityId,
                    NotificationCode.ApplicationDeploymentFailed,
                    NotificationSeverity.Critical,
                    "Deploy failed",
                    description,
                    applicationDeploymentData.Id,
                    true);
            }
            catch (Exception e)
            {
                // Still swallowed so the caller is unaffected, but no longer silent.
                // LogException handles its own failures and does not throw.
                LogException(applicationDeploymentData, e);
            }
        }

        /// <summary>
        /// Raises a notification for a successful deployment.
        /// </summary>
        /// <remarks>
        /// Best effort: a failure to raise the notification is logged and not rethrown.
        /// </remarks>
        /// <param name="applicationDeploymentData">The deployment that succeeded.</param>
        private void RaiseDeploymentSuceededNotification(ApplicationDeploymentData applicationDeploymentData)
        {
            try
            {
                var applicationData = ApplicationOperations.GetApplication(applicationDeploymentData.ApplicationId);
                var description = string.Format(
                    "Deployment to {0} scheduled for {1} completed successfully.",
                    applicationData.DisplayName,
                    applicationDeploymentData.ScheduledStartDate);

                // NOTE(review): this success notification is raised with the
                // ApplicationDeploymentFailed code. That looks like a copy/paste slip;
                // replace it with the success code once the NotificationCode members
                // have been checked.
                NotificationOperations.RaiseNotification(
                    applicationData.OwnerLegalEntityId,
                    NotificationCode.ApplicationDeploymentFailed,
                    NotificationSeverity.Info,
                    "Deploy succeeded",
                    description,
                    applicationDeploymentData.Id,
                    true);
            }
            catch (Exception e)
            {
                // Still swallowed so the caller is unaffected, but no longer silent.
                // LogException handles its own failures and does not throw.
                LogException(applicationDeploymentData, e);
            }
        }

        /// <summary>
        /// Downloads and verifies an application deployment package to a local directory.
        /// </summary>
        /// <remarks>
        /// The hash check is skipped when the deployment carries no expected hash.
        /// If hashing or the comparison fails, the downloaded file is cleaned up and
        /// the error is wrapped in an <c>AppSnapException</c>. An exception thrown by
        /// the download call itself is not wrapped.
        /// </remarks>
        /// <param name="applicationDeploymentData">The deployment whose package should be downloaded.</param>
        /// <returns>The full path to the local deployment package once downloaded.</returns>
        private string DownloadApplicationDeploymentPackage(ApplicationDeploymentData applicationDeploymentData)
        {
            Console.WriteLine("Downloading...");
            var downloadedPackagePath = DeploymentPackageDownloadClient.DownloadDeploymentPackage(
                new Uri(applicationDeploymentData.DeploymentPackageDownloadUrl));
            try
            {
                var computedHash = DeploymentFileUtils.ComputeDeploymentPackageHash(downloadedPackagePath);

                var hashMismatch =
                    applicationDeploymentData.DeploymentPackageHash != null
                    && !computedHash.Equals(applicationDeploymentData.DeploymentPackageHash);
                if (hashMismatch)
                {
                    throw new AppSnapInvalidDataException(
                        "Integrity check of deployment package failed. MD5 hashes do not match. Deployment package possibly corrupt or incomplete.");
                }

                return downloadedPackagePath;
            }
            catch (Exception e)
            {
                CleanUpApplicationDeploymentPackage(downloadedPackagePath, null);
                throw new AppSnapException("Failed to download deployment package. Fatal error.", e);
            }
        }

        /// <summary>
        /// Extracts a local deployment package to a temp directory.
        /// </summary>
        /// <remarks>
        /// If extraction fails, the downloaded package is cleaned up and the error is
        /// wrapped in an <c>AppSnapException</c>.
        /// </remarks>
        /// <param name="applicationDeploymentData">The deployment being staged; not used by this method.</param>
        /// <param name="downloadedPackagePath">The full path to the downloaded deployment package.</param>
        /// <returns>The full path to the directory the deployment package was extracted to.</returns>
        private string StageApplicationDeploymentPackage(ApplicationDeploymentData applicationDeploymentData, string downloadedPackagePath)
        {
            const string progressMessage = "Staging...";
            Console.WriteLine(progressMessage);
            Logger.Info(progressMessage);
            try
            {
                return DeploymentFileUtils.ExtractDeploymentPackage(downloadedPackagePath);
            }
            catch (Exception e)
            {
                CleanUpApplicationDeploymentPackage(downloadedPackagePath, null);
                throw new AppSnapException("Failed to stage deployment. Fatal error.", e);
            }
        }

        /// <summary>
        /// Pushes an application deployment package to the cloud.
        /// If the push fails, the staging directory is cleaned up and the exception
        /// is rethrown.
        /// </summary>
        /// <param name="applicationDeploymentData">The deployment being pushed.</param>
        /// <param name="stagingPath">The directory holding the extracted package.</param>
        private void PushApplicationDeploymentPackage(ApplicationDeploymentData applicationDeploymentData, string stagingPath)
        {
            const string progressMessage = "Pushing...";
            Console.WriteLine(progressMessage);
            Logger.Info(progressMessage);
            try
            {
                var targetApplication = ApplicationOperations.GetApplication(applicationDeploymentData.ApplicationId);
                DeploymentPusher.UploadApplication(
                    targetApplication.CloudLoginId,
                    targetApplication,
                    stagingPath);
            }
            catch (Exception)
            {
                CleanUpApplicationDeploymentPackage(null, stagingPath);
                throw;
            }
        }

        /// <summary>
        /// Cleans up all temp files from an application deployment.
        /// </summary>
        /// <remarks>
        /// Either path may be null, empty or whitespace, in which case that part is
        /// skipped. Failures are logged and not rethrown.
        /// </remarks>
        /// <param name="downloadPackagePath">The full path to the downloaded deployment package.</param>
        /// <param name="stagingPath">The full path to the staging directory that contains the extracted deployment package.</param>
        private void CleanUpApplicationDeploymentPackage(string downloadPackagePath, string stagingPath)
        {
            const string progressMessage = "Cleaning up...";
            Console.WriteLine(progressMessage);
            Logger.Info(progressMessage);
            try
            {
                if (!string.IsNullOrWhiteSpace(downloadPackagePath))
                {
                    DeploymentFileUtils.DeleteFile(downloadPackagePath);
                }
                if (!string.IsNullOrWhiteSpace(stagingPath))
                {
                    DeploymentFileUtils.DeleteDirectory(stagingPath);
                }
            }
            catch (Exception e)
            {
                Logger.Error(string.Format(
                    "Failed to clean up deployment at {0} and {1}. Error was: {2}",
                    downloadPackagePath,
                    stagingPath,
                    e.Message));
            }
        }

        /// <summary>
        /// Logs an exception, including inner exceptions, as a single error entry.
        /// </summary>
        /// <param name="deploymentData">The deployment being processed, or null when there is none.</param>
        /// <param name="exception">The exception to log.</param>
        /// <param name="inner">True when <paramref name="exception"/> is itself an inner exception.</param>
        /// <param name="innerMessage">Text already built for the outer exceptions; used only when <paramref name="inner"/> is true.</param>
        internal void LogException(ApplicationDeploymentData deploymentData, Exception exception, bool inner = false, string innerMessage = "")
        {
            const string innerFormat =
                "\r\nThe inner exception was:\r\n\t{0}" +
                "\r\n\r\nThe inner stacktrace was:\r\n{1}\r\n";

            try
            {
                string errorMessage;
                if (inner)
                {
                    errorMessage = innerMessage + string.Format(
                        innerFormat,
                        exception.Message,
                        exception.StackTrace);
                }
                else if (deploymentData != null)
                {
                    errorMessage = string.Format(
                        "Error caught executing deployment with id {0} for application {1}:" +
                        "\r\n\r\nThe exception was:\r\n\t{2}" +
                        "\r\n\r\nThe stacktrace was:\r\n{3}\r\n",
                        deploymentData.Id,
                        deploymentData.ApplicationId,
                        exception.Message,
                        exception.StackTrace);
                }
                else
                {
                    errorMessage = string.Format(
                        "Error caught" +
                        "\r\n\r\nThe exception was:\r\n\t{0}" +
                        "\r\n\r\nThe stacktrace was:\r\n{1}\r\n",
                        exception.Message,
                        exception.StackTrace);
                }

                // Append every nested exception in order, outermost first.
                for (var cause = exception.InnerException; cause != null; cause = cause.InnerException)
                {
                    errorMessage += string.Format(innerFormat, cause.Message, cause.StackTrace);
                }

                Logger.Error(errorMessage);
            }
            catch (Exception)
            {
                /* eat it - logging must never throw into the caller */
            }
        }

        /// <summary>
        /// Disposes of the deployment worker and frees up resources: aborts the
        /// channel and closes the connection, ignoring a connection that is already
        /// closed.
        /// </summary>
        public void Dispose()
        {
            if (_channel != null)
            {
                _channel.Abort();
            }

            if (_connection == null)
            {
                return;
            }

            try
            {
                _connection.Close();
            }
            catch (AlreadyClosedException)
            {
                /* Eat it */
            }
        }
    }
}

 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20121025/bcd2a73d/attachment.htm>


More information about the rabbitmq-discuss mailing list