[rabbitmq-discuss] Shuting down and reopening the consumer's side
Bamboula
aloroi18 at gmail.com
Wed Feb 15 14:35:00 GMT 2012
Hello all,
I'm working on a little project using RabbitMQ with .net C#.
I want to demonstrate that if I send a message to a queue and there is no
consumer, the message stays stocked in the queue.
In order to start with an easy example, I decided to demonstrate this with
an easy example as "Hello world" (in the RabbitMQ tutorials).
1)I build a Producer form, with a TextBox and a button to send the message
and also to open a consumer form:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;
namespace HelloWorldRabbitMQ2
{
public partial class ProducerForm : Form
{
public int count = 0;
public string HOST_NAME = "localhost";
public string QUEUE_NAME = "helloWorld";
private Producer producer;
public ProducerForm()
{
InitializeComponent();
//create the producer
producer = new Producer(HOST_NAME, QUEUE_NAME);
}
//Send the message on click
private void button1_Click(object sender, EventArgs e)
{
count++;
if (count<2)
{
ConsumerForm cf = new ConsumerForm();
cf.Show();
}
producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(textBox1.Text));
}
}
}
2)Here is the Producer class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
namespace HelloWorldRabbitMQ
{
class Producer:IDisposable
{
protected IModel Model;
protected IConnection Connection;
protected string QueueName;
public Producer(string hostName, string queueName)
{
QueueName = queueName;
var connectionFactory = new ConnectionFactory();
connectionFactory.HostName = hostName;
Connection = connectionFactory.CreateConnection();
Model = Connection.CreateModel();
Model.QueueDeclare(QueueName, false, false, false, null);
}
public void SendMessage(byte[] message)
{
IBasicProperties basicProperties =
Model.CreateBasicProperties();
Model.BasicPublish("", QueueName, basicProperties , message);
}
public void Dispose()
{
if (Connection != null)
Connection.Close();
if (Model != null)
Model.Abort();
}
}
}
3)The consumer form has only a RichTextBox:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;
namespace HelloWorldRabbitMQ2
{
public partial class ConsumerForm : Form
{
public string HOST_NAME = "localhost";
public string QUEUE_NAME = "helloWorld";
public bool ConsumerUp;
public bool firstLoad;
private Consumer consumer;
public ConsumerForm()
{
InitializeComponent();
//create the consumer
consumer = new Consumer(HOST_NAME, QUEUE_NAME);
consumer.onMessageReceived += handleMessage;
//start consuming
consumer.StartConsuming();
}
//delegate to post to UI thread
private delegate void showMessageDelegate(string message);
//Callback for message receive
public void handleMessage(byte[] message)
{
showMessageDelegate s = new
showMessageDelegate(richTextBox1.AppendText);
this.Invoke(s, System.Text.Encoding.UTF8.GetString(message) +
Environment.NewLine);
}
}
}
4)This is the Consumer class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
namespace HelloWorldRabbitMQ
{
class Consumer
{
protected IModel Model;
protected IConnection Connection;
protected string QueueName;
protected bool isConsuming;
// used to pass messages back to UI for processing
public delegate void onReceiveMessage(byte[] message);
public event onReceiveMessage onMessageReceived;
public Consumer(string hostName, string queueName)
{
QueueName = queueName;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.HostName = hostName;
Connection = connectionFactory.CreateConnection();
Model = Connection.CreateModel();
Model.QueueDeclare(QueueName, false, false, false, null);
}
//internal delegate to run the queue consumer on a seperate thread
private delegate void ConsumeDelegate();
public void StartConsuming()
{
isConsuming = true;
ConsumeDelegate c = new ConsumeDelegate(Consume);
c.BeginInvoke(null, null);
}
public void Consume()
{
QueueingBasicConsumer consumer = new
QueueingBasicConsumer(Model);
String consumerTag = Model.BasicConsume(QueueName, false,
consumer);
while (isConsuming)
{
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
// ... process the message
onMessageReceived(body);
Model.BasicAck(e.DeliveryTag, false);
}
catch //(OperationInterruptedException ex)
{
// The consumer was removed, either through
// channel or connection closure, or through the
// action of IModel.BasicCancel().
break;
}
}
}
public void Dispose()
{
isConsuming = false;
if (Connection != null)
Connection.Close();
if (Model != null)
Model.Abort();
}
}
}
Using this program, I can send messages to a single consumer and seeing the
received messages on the consumer side.
Now, if I close my consumer form and go on sending messages, messages will
be stocked into the queue.
If I close all the program and restarts sending a new message, I will
receive on my consumer side all the non-consumed messages plus the currently
sent message. That's what I want to demonstrate but without closing all the
program.
My purpous is to show that if I close my connection to the queue (on the
consumer's side)and aterwars, I open a new consumer, I will receive all the
messages that were sent to the queue during the time there was no consumer.
Does anyone have an idea what I should append to my code?
--
View this message in context: http://old.nabble.com/Shuting-down-and-reopening-the-consumer%27s-side-tp33329194p33329194.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list