[rabbitmq-discuss] Shuting down and reopening the consumer's side

Bamboula aloroi18 at gmail.com
Wed Feb 15 14:35:00 GMT 2012


Hello all,

I'm working on a little project using RabbitMQ with .net C#.
I want to demonstrate that if I send a message to a queue and there is no
consumer, the message stays stocked in the queue.

In order to start with an easy example, I decided to demonstrate this with
an easy example as "Hello world" (in the RabbitMQ tutorials).

1)I build a Producer form, with a TextBox and a button to send the message
and also to open a consumer form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;

namespace HelloWorldRabbitMQ2
{
    public partial class ProducerForm : Form
    {
        public int count = 0;
        public string HOST_NAME = "localhost";
        public string QUEUE_NAME = "helloWorld";

        private Producer producer;

        public ProducerForm()
        {
            InitializeComponent();
            //create the producer
            producer = new Producer(HOST_NAME, QUEUE_NAME);

        }

        //Send the message on click
        private void button1_Click(object sender, EventArgs e)
        {
            count++;
            if (count<2)
            {
                ConsumerForm cf = new ConsumerForm();
                cf.Show();
            }

           
producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(textBox1.Text));
        }

    }
}

2)Here is the Producer class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace HelloWorldRabbitMQ
{
    class Producer:IDisposable
    {
        protected IModel Model;  

        protected IConnection Connection;  

         protected string QueueName;  

    

         public Producer(string hostName, string queueName)  

         {  

             QueueName = queueName;  

             var connectionFactory = new ConnectionFactory();  

             connectionFactory.HostName = hostName;  

             Connection = connectionFactory.CreateConnection();  

             Model = Connection.CreateModel();  

             Model.QueueDeclare(QueueName, false, false, false, null);  

         }  

    

         public void SendMessage(byte[] message)  

         {  

             IBasicProperties basicProperties =
Model.CreateBasicProperties();  

             Model.BasicPublish("", QueueName, basicProperties , message);  

         }  

         public void Dispose()  

         {  

             if (Connection != null)  

                 Connection.Close();  

             if (Model != null)  

                 Model.Abort();  

         }  

    }
}
3)The consumer form has only a RichTextBox:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;

namespace HelloWorldRabbitMQ2
{
    public partial class ConsumerForm : Form
    {
        public string HOST_NAME = "localhost";
        public string QUEUE_NAME = "helloWorld";
        public bool ConsumerUp;
        public bool firstLoad;
        private Consumer consumer;

        public ConsumerForm()
        {
            InitializeComponent();
            //create the consumer
            consumer = new Consumer(HOST_NAME, QUEUE_NAME);
            consumer.onMessageReceived += handleMessage;

            //start consuming
            consumer.StartConsuming();
      

        }
        //delegate to post to UI thread
        private delegate void showMessageDelegate(string message);

        //Callback for message receive
        public void handleMessage(byte[] message)
        {
            showMessageDelegate s = new
showMessageDelegate(richTextBox1.AppendText);
            
            this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) +
Environment.NewLine);
            
            
        }

    }
}
4)This is the Consumer class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace HelloWorldRabbitMQ
{
    class Consumer
    {
        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;

        protected bool isConsuming;

        // used to pass messages back to UI for processing
        public delegate void onReceiveMessage(byte[] message);
        public event onReceiveMessage onMessageReceived;

        public Consumer(string hostName, string queueName)
        {
            QueueName = queueName;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = hostName;
            Connection = connectionFactory.CreateConnection();
            Model = Connection.CreateModel();
            Model.QueueDeclare(QueueName, false, false, false, null);
        }

        //internal delegate to run the queue consumer on a seperate thread
        private delegate void ConsumeDelegate();

        public void StartConsuming()
        {
            isConsuming = true;
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            c.BeginInvoke(null, null);
        }

        public void Consume()
        {
            QueueingBasicConsumer consumer = new
QueueingBasicConsumer(Model);
            String consumerTag = Model.BasicConsume(QueueName, false,
consumer);
            while (isConsuming)
            {
                try
                {
                    RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    byte[] body = e.Body;
                  
                    // ... process the message
                    onMessageReceived(body);
                    Model.BasicAck(e.DeliveryTag, false);

                }
                catch //(OperationInterruptedException ex)
                {
                    
                    // The consumer was removed, either through
                    // channel or connection closure, or through the
                    // action of IModel.BasicCancel().
                    break;
                }
            }
        }

        public void Dispose()
        {
            isConsuming = false;
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }

    }
}
Using this program, I can send messages to a single consumer and seeing the
received messages on the consumer side.
Now, if I close my consumer form and go on sending messages, messages will
be stocked into the queue.
If I close all the program and restarts sending a new message, I will
receive on my consumer side all the non-consumed messages plus the currently
sent message. That's what I want to demonstrate but without closing all the
program.

My purpous is to show that if I close my connection to the queue (on the
consumer's side)and aterwars, I open a new consumer, I will receive all the
messages that were sent to the queue during the time there was no consumer.

Does anyone have an idea what I should append to my code?

-- 
View this message in context: http://old.nabble.com/Shuting-down-and-reopening-the-consumer%27s-side-tp33329194p33329194.html
Sent from the RabbitMQ mailing list archive at Nabble.com.



More information about the rabbitmq-discuss mailing list