[rabbitmq-discuss] latency
Adam Kaminiecki
adamka at dgt.com.pl
Tue Nov 9 18:17:57 GMT 2010
consuming and producing:
/// <summary>
/// Opens the connection to the broker at <c>server</c>, attaches the
/// connection-level event handlers and creates the channel used by the
/// rest of the client.
/// </summary>
private void Inicjalize()
{
    conFactory = new ConnectionFactory { Address = server };
    conn = conFactory.CreateConnection();

    // Subscribe exactly once: a second subscription makes every callback
    // exception get reported twice.
    conn.CallbackException += ConnCallbackException;
    conn.ConnectionShutdown += ConnConnectionShutdown;
    conn.AutoClose = false;

    channel = conn.CreateModel();
}
declaring exchange and binding
/// <summary>
/// Declares the exchange <paramref name="exchName"/>, binds a freshly
/// declared queue to it (routing key = exchange name) and, depending on
/// <paramref name="clientType"/>, starts consuming from that queue and/or
/// publishes a greeting message to the exchange.
/// </summary>
/// <param name="exchName">Exchange name; also used as the routing key.</param>
/// <param name="exchType">Exchange type passed to <c>ExchangeDeclare</c>.</param>
/// <param name="clientType">Role this client plays on the exchange.</param>
/// <param name="durable">Durability flag passed to <c>ExchangeDeclare</c>.</param>
private void NewExchange(string exchName, string exchType, ClientType clientType, bool durable)
{
    try
    {
        channel.ExchangeDeclare(exchName, exchType, durable);
        q = channel.QueueDeclare();
        channel.QueueBind(q, exchName, exchName, false, null);

        string role;
        switch (clientType)
        {
            case ClientType.Listener:
                role = "Consumer";
                break;
            case ClientType.Producer:
                role = "Producer";
                break;
            case ClientType.Both:
                role = "Consumer and Producer";
                break;
            default:
                // Unknown role: the exchange and binding exist, nothing else to do.
                return;
        }

        // Listener and Both share the same consumer wiring.
        if (clientType != ClientType.Producer)
        {
            var consumer = new MyConsumer(channel);
            consumer.NewMessagee += ConsumerrNewMessagee;
            channel.BasicConsume(q, null, consumer);
        }

        // Every known role announces itself on the exchange.
        channel.BasicPublish(exchName, exchName, null,
            Encoding.UTF8.GetBytes("Hello everybody I'm new exch " + role + "! : " + q));
    }
    catch (Exception e)
    {
        // Broker interruptions and all other failures were reported the same
        // way by two identical handlers, so one handler covers both.
        InvokeErrorMessage(e.Message, q);
    }
}
/// <summary>
/// Consumer that acknowledges each delivery and forwards it to the
/// subscribers of <see cref="NewMessagee"/>.
/// </summary>
internal class MyConsumer : DefaultBasicConsumer
{
    /// <summary>Signature of handlers notified about a delivery.</summary>
    public delegate void NewMessage(BasicDeliverEventArgs e);

    /// <summary>Raised once for every delivery handled by this consumer.</summary>
    public event NewMessage NewMessagee;

    public MyConsumer(IModel ch) : base(ch) { }

    /// <summary>
    /// Acknowledges the delivery, then packages the received arguments into a
    /// <see cref="BasicDeliverEventArgs"/> and raises <see cref="NewMessagee"/>.
    /// </summary>
    public override void HandleBasicDeliver(string consumerTag,
                                            ulong deliveryTag,
                                            bool redelivered,
                                            string exchange,
                                            string routingKey,
                                            IBasicProperties properties,
                                            byte[] body)
    {
        Model.BasicAck(deliveryTag, false);

        // Copy to a local so an unsubscribe between the check and the call
        // cannot cause a NullReferenceException; with no subscriber there is
        // nothing to notify.
        NewMessage handler = NewMessagee;
        if (handler == null)
        {
            return;
        }

        // Populate the event args: subscribers in this file read e.Exchange
        // and e.BasicProperties, which would otherwise be null.
        var e = new BasicDeliverEventArgs
        {
            ConsumerTag = consumerTag,
            DeliveryTag = deliveryTag,
            Redelivered = redelivered,
            Exchange = exchange,
            RoutingKey = routingKey,
            BasicProperties = properties,
            Body = body
        };

        handler(e);
    }
}
/// <summary>
/// Forwards a delivery to the <c>NewMessageEx</c> subscribers together with
/// the exchange recorded in the event args.
/// </summary>
/// <param name="e">The delivery to forward.</param>
private void ProcessSingleDelivery(BasicDeliverEventArgs e)
{
    // Copy to a local before the null check so raising with no subscriber
    // (or a concurrent unsubscribe) does not throw.
    var handler = NewMessageEx;
    if (handler != null)
    {
        handler(e, e.Exchange);
    }
}
Usage:
/// <summary>
/// Test form for <c>RmqExchClient</c>: connects to a broker, declares or
/// joins exchanges, sends single messages or a timed burst of numbered
/// messages, counts received messages and optionally logs to a file.
/// </summary>
public partial class Form1 : Form
{
    private RmqExchClient Pe;

    // "Etykieta" = label: delegate used to marshal a label update to the UI thread.
    delegate void Etykieta(string msg);

    private StreamWriter logstr;
    private Timer timer;
    private int flush_count = 0;
    readonly Random rand = new Random((int)DateTime.Now.ToBinary());
    private DateTime timout;   // moment at which the timed burst stops
    private ulong i = 0;       // number of burst messages sent
    private int z = 0;         // number of messages received

    public Form1()
    {
        InitializeComponent();
        Closing += Form1_Closing;
    }

    /// <summary>Stops the burst timer, closes the client and the log file.</summary>
    void Form1_Closing(object sender, CancelEventArgs e)
    {
        if (timer != null)
        {
            timer.Stop();
        }

        // The user may close the form without ever having connected.
        if (Pe != null)
        {
            Pe.Close();
        }

        CloseLog();
    }

    /// <summary>Shows <paramref name="m"/> in the received-count label and logs it.</summary>
    private void ZmienTekstW(string m)
    {
        label2.Text = m;
        Log(m);
    }

    /// <summary>Shows <paramref name="m"/> in the error label and logs it.</summary>
    private void ZmienTekstE(string m)
    {
        label4.Text = m;
        Log(m);
    }

    /// <summary>
    /// Queues <paramref name="update"/> on the UI thread. Does nothing once the
    /// window handle is gone, because BeginInvoke throws in that state.
    /// </summary>
    private void Post(Etykieta update, string text)
    {
        if (!IsHandleCreated || IsDisposed)
        {
            return;
        }

        BeginInvoke(update, new object[] { text });
    }

    /// <summary>Returns the text of the selected item, or an empty string when nothing is selected.</summary>
    private static string SelectedText(ComboBox box)
    {
        object item = box.SelectedItem;
        return item == null ? string.Empty : item.ToString();
    }

    /// <summary>
    /// Fetches the exchange selected in comboBox3. Reports an error in the
    /// error label and returns false when there is no client or no selection.
    /// </summary>
    private bool TryGetTarget(out string exchange)
    {
        exchange = SelectedText(comboBox3);
        if (Pe == null)
        {
            ZmienTekstE("Not connected.");
            return false;
        }

        if (exchange.Length == 0)
        {
            ZmienTekstE("No exchange selected.");
            return false;
        }

        return true;
    }

    /// <summary>Error callback of the client; relays the message to the error label.</summary>
    void Pe_ErrorMessageE(string m, string channell)
    {
        string message = m + " from exch: " + channell;
        Post(ZmienTekstE, message);
    }

    /// <summary>Sends the text of messageBox to the selected exchange.</summary>
    private void button1_Click(object sender, EventArgs e)
    {
        string exchange;
        if (TryGetTarget(out exchange))
        {
            Pe.SendMessageToExch(messageBox.Text, exchange);
        }
    }

    /// <summary>Declares (or joins) the exchange named in textBox1 with the chosen type, role and durability.</summary>
    private void button2_Click(object sender, EventArgs e)
    {
        if (Pe == null)
        {
            ZmienTekstE("Not connected.");
            return;
        }

        string exchangeType = SelectedText(comboBox1);
        if (exchangeType.Length == 0)
        {
            ZmienTekstE("No exchange type selected.");
            return;
        }

        // "tak" is Polish for "yes".
        bool durable = SelectedText(comboBox4) == "tak";

        // Anything other than the known role names keeps the default, Both.
        RmqExchClient.ClientType role;
        switch (SelectedText(comboBox2))
        {
            case "producer":
                role = RmqExchClient.ClientType.Producer;
                break;
            case "consumer":
                role = RmqExchClient.ClientType.Listener;
                break;
            default:
                role = RmqExchClient.ClientType.Both;
                break;
        }

        Pe.DeclareNewExchaneOrJoin(textBox1.Text, exchangeType, role, durable);
    }

    /// <summary>Refreshes the exchange list from the client.</summary>
    private void comboBox3_Click(object sender, EventArgs e)
    {
        comboBox3.Items.Clear();
        if (Pe == null)
        {
            return;
        }

        foreach (var s in Pe.Exchanges)
        {
            if (s != null)
            {
                comboBox3.Items.Add(s);
            }
        }
    }

    /// <summary>Connects to the server named in textBox2, replacing any previous client.</summary>
    private void button3_Click(object sender, EventArgs e)
    {
        // Release the previous client instead of leaking it and its handlers.
        if (Pe != null)
        {
            Pe.NewMessageEx -= Pe_NewMessageEx;
            Pe.ErrorMessageE -= Pe_ErrorMessageE;
            Pe.Close();
        }

        Pe = new RmqExchClient(textBox2.Text);
        Pe.NewMessageEx += Pe_NewMessageEx;
        Pe.ErrorMessageE += Pe_ErrorMessageE;
    }

    /// <summary>Message callback of the client; counts the message and shows the new total.</summary>
    void Pe_NewMessageEx(RabbitMQ.Client.Events.BasicDeliverEventArgs m, string exchange)
    {
        // The callback is not marshalled to the UI thread before this point,
        // so increment atomically and display exactly the value produced here.
        int count = System.Threading.Interlocked.Increment(ref z);
        Post(ZmienTekstW, count.ToString());
    }

    /// <summary>Deletes the exchange named in textBox4.</summary>
    private void button4_Click(object sender, EventArgs e)
    {
        if (Pe == null)
        {
            ZmienTekstE("Not connected.");
            return;
        }

        Pe.DeleteExch(textBox4.Text);
    }

    /// <summary>
    /// Starts a timed burst: textBox3 holds the duration in hours, the chosen
    /// folder receives log.txt, and a message is sent on every timer tick.
    /// </summary>
    private void button5_Click(object sender, EventArgs e)
    {
        try
        {
            if (string.IsNullOrEmpty(textBox3.Text))
            {
                return;
            }

            if (folderBrowserDialog1.ShowDialog() != DialogResult.OK)
            {
                return;
            }

            timout = DateTime.Now.AddHours(Convert.ToDouble(textBox3.Text));
            OpenLog(folderBrowserDialog1.SelectedPath);

            // Reuse one timer: creating a new one per click leaves the old
            // ones ticking and multiplies the send rate.
            if (timer == null)
            {
                timer = new Timer();
                timer.Interval = 10;
                timer.Tick += timer_Tick;
            }

            timer.Start();
        }
        catch (Exception ex)
        {
            // Show the failure as well as logging it; the log may not be open.
            ZmienTekstE(ex.Message);
        }
    }

    /// <summary>Sends the next numbered message, or stops the timer once the deadline has passed.</summary>
    void timer_Tick(object sender, EventArgs e)
    {
        if (DateTime.Now >= timout)
        {
            timer.Stop();
            return;
        }

        string exchange;
        if (!TryGetTarget(out exchange))
        {
            return;
        }

        i++;
        Pe.SendMessageToExch(i, exchange);
        Log("message sended: " + i);
        label5.Text = i.ToString();
    }

    /// <summary>Appends a timestamped line to the log, flushing every tenth line. No-op when no log is open.</summary>
    private void Log(string msg)
    {
        if (logstr != null)
        {
            logstr.WriteLine("SUD[{0,6}] {1}: {2}",
                             System.Diagnostics.Process.GetCurrentProcess().Id,
                             DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                             msg);
            flush_count++;
            flush_count %= 10;
            if (flush_count == 0)
            {
                logstr.Flush();
            }
        }
    }

    /// <summary>Opens log.txt (append mode) in <paramref name="folder"/>, closing any log that was open before.</summary>
    private void OpenLog(string folder)
    {
        // Close first: the previous writer may hold the very same file.
        CloseLog();
        logstr = new StreamWriter(folder + "\\log.txt", true);
    }

    /// <summary>Flushes and closes the log, and clears the field so later Log calls are no-ops.</summary>
    private void CloseLog()
    {
        if (logstr == null)
        {
            return;
        }

        logstr.Flush();
        logstr.Close();
        logstr = null;
    }

    /// <summary>Lets the user pick a folder and starts logging to log.txt inside it.</summary>
    private void button6_Click(object sender, EventArgs e)
    {
        if (folderBrowserDialog1.ShowDialog() == DialogResult.OK)
        {
            OpenLog(folderBrowserDialog1.SelectedPath);
        }
    }
}
}
I think I found my mistake: I had left your ProcessSingleDelivery from the
example (below) in place. Now I have corrected it and it's working :) All clients
consume correctly, even the first one.
before:
// Earlier version of the delivery handler, kept as quoted in the message.
// It dumps the delivery to the console and then branches on content type:
//  - map messages are only dumped; NewMessageEx is not raised for them;
//  - stream messages raise NewMessageEx once per object read from the stream;
//  - everything else raises NewMessageEx exactly once.
private void ProcessSingleDelivery(BasicDeliverEventArgs e)
{
    DebugUtil.DumpProperties(e, Console.Out, 0);

    string contentType = e.BasicProperties.ContentType;

    if (contentType == MapMessageReader.MimeType)
    {
        IMapMessageReader mapReader = new MapMessageReader(e.BasicProperties, e.Body);
        DebugUtil.DumpProperties(mapReader.Body, Console.Out, 0);
        return;
    }

    if (contentType != StreamMessageReader.MimeType)
    {
        NewMessageEx(e, e.Exchange);
        return;
    }

    IStreamMessageReader streamReader = new StreamMessageReader(e.BasicProperties, e.Body);
    try
    {
        // Runs until the reader signals the end of the stream by throwing.
        while (true)
        {
            streamReader.ReadObject();
            NewMessageEx(e, e.Exchange);
        }
    }
    catch (EndOfStreamException)
    {
        // End of the stream reached: stop reading.
    }
}
--
This message has been scanned for viruses and
dangerous content by MailScanner, and is
believed to be clean.
More information about the rabbitmq-discuss
mailing list