namespace Example { public class MessageConsumer : DefaultBasicConsumer where T : class { private static readonly ILog Log = LogManager.GetLogger(typeof(MessageConsumer<>)); public const byte DeliveryModeNonPersistent = 1; public const byte DeliveryModePersistent = 2; private readonly object channelLock = new object(); private readonly string serverAddress; private readonly string exchange; private readonly string exchangeType; private readonly bool durable; private readonly string routingKey; private Func processMessageFunction; private IConnection connection; private bool running; private bool disposed; public string Queue { get; private set; } public MessageConsumer(string serverAddress, string exchange, string exchangeType, string queue, bool durable, string routingKey) { this.serverAddress = serverAddress; this.exchange = exchange; this.exchangeType = exchangeType; this.durable = durable; this.routingKey = routingKey; this.Queue = queue; } ~MessageConsumer() { this.Dispose(false); } public void Start(Func processMessage) { lock (this.channelLock) { if (this.running) { return; } this.running = true; this.processMessageFunction = processMessage; this.connection = new ConnectionFactory().CreateConnection(this.serverAddress); this.Model = this.connection.CreateModel(); this.Model.ExchangeDeclare(this.exchange, exchangeType, this.durable); this.Model.QueueDeclare(this.Queue, durable); this.Model.QueueBind(this.Queue, this.exchange, this.routingKey, false, null); this.Model.BasicQos(0, 1, false); this.Model.BasicConsume(this.Queue, null, this); } if (Log.IsDebugEnabled) { Log.DebugFormat("Started single message consumer (server={0}, queue={1}, exchange={2}, exchangeType={3}, durable={4}, routingKey={5})", this.serverAddress, this.Queue, this.exchange, this.exchangeType, this.durable, this.routingKey); } } public void Stop() { lock (this.channelLock) { if (!this.running) { return; } this.Model.BasicCancel(this.ConsumerTag); this.Model.Close(); this.connection.Close(); this.Model = null; this.connection = null; this.running = false; } if (Log.IsDebugEnabled) { Log.DebugFormat("Stopped single message consumer (server={0}, queue={1}, exchange={2}, exchangeType={3}, durable={4}, routingKey={5})", this.serverAddress, this.Queue, this.exchange, this.exchangeType, this.durable, this.routingKey); } } public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) { if (!properties.Type.Equals(typeof(T).Name)) { throw new Exception("Invalid message type: " + properties.Type); } var messageXml = Encoding.UTF8.GetString(body); var item = new MessageItem { Exchange = exchange, RoutingKey = routingKey, Message = DataContractDeserializer.Instance.Parse(messageXml), DeliveryTag = deliveryTag, }; if (Log.IsDebugEnabled) { Log.DebugFormat("Recieved message {0} (queue={1}, exchange={2}, routingKey={3}, deliveryTag={4})", item.Message, this.Queue, item.Exchange, item.RoutingKey, item.DeliveryTag); } ThreadPool.QueueUserWorkItem(this.ProcessMessage, item); } private void ProcessMessage(object state) { var item = (MessageItem) state; try { // Process the message var success = this.processMessageFunction(item.Message); lock (this.channelLock) { if (!this.running) { return; } if (!success) { this.Publish(item.Message); } // Acknowledge the message triggering next message this.Model.BasicAck(item.DeliveryTag, false); } if (Log.IsDebugEnabled) { Log.DebugFormat("Acknowledged message {0} ({1})", item.Message, success ? "processing successful" : "processing failure: message requeued"); } } catch (Exception ex) { Log.ErrorFormat("Failed to process {0} message {1}: {0} {1} {2}", this.Queue, item.Message, ex.GetType().Name, ex.Message, ex.StackTrace); this.Stop(); } } private void Publish(T message) { lock (this.channelLock) { if (!this.running) { throw new Exception("Cannot publish a message as the channel has been closed"); } var properties = this.Model.CreateBasicProperties(); properties.Type = typeof(T).Name; properties.DeliveryMode = DeliveryModePersistent; var messageXml = DataContractSerializer.Instance.Parse(message); this.Model.BasicPublish(this.exchange, this.routingKey, properties, Encoding.UTF8.GetBytes(messageXml)); } } protected virtual void Dispose(bool disposing) { if (this.disposed) { return; } if (disposing) { this.Stop(); } this.disposed = true; } } public class MessageItem where T : class { public string Exchange { get; set; } public string RoutingKey { get; set; } public T Message { get; set; } public ulong DeliveryTag { get; set; } } }