[rabbitmq-discuss] latency

Adam Kaminiecki adamka at dgt.com.pl
Tue Nov 9 18:17:57 GMT 2010


consuming and producting:

  private void Inicjalize()
         {
                 conFactory = new ConnectionFactory { Address = server };
                 conn = conFactory.CreateConnection();

                     conn.CallbackException += ConnCallbackException;
                     conn.CallbackException += ConnCallbackException;
                     conn.ConnectionShutdown += ConnConnectionShutdown;
                     conn.AutoClose = false;

                     channel = conn.CreateModel();
         }


declaring exchange and binding

private void NewExchange(string exchName, string exchType, ClientType 
clientType,bool durable)
         {

             try
             {
                 channel.ExchangeDeclare(exchName, exchType,durable);
                 q = channel.QueueDeclare();
                 channel.QueueBind(q, exchName, exchName, false, null);


                 switch (clientType)
                 {
                     case ClientType.Listener:
                         var consumerr = new MyConsumer(channel);

                         consumerr.NewMessagee += ConsumerrNewMessagee;
                         channel.BasicConsume(q, null, consumerr);
                         channel.BasicPublish(exchName, exchName, null, 
Encoding.UTF8.GetBytes("Hello everybody I'm new exch Consumer! : " + q));
                         break;
                     case ClientType.Producer:
                         channel.BasicPublish(exchName, exchName, null, 
Encoding.UTF8.GetBytes("Hello everybody I'm new exch Producer! : " + q));
                         break;
                     case ClientType.Both:
                         var cconsumer = new MyConsumer(channel);
                         cconsumer.NewMessagee += ConsumerrNewMessagee;
                         channel.BasicConsume(q, null, cconsumer);
                         channel.BasicPublish(exchName, exchName, null, 
Encoding.UTF8.GetBytes("Hello everybody I'm new exch Consumer and 
Producer! : " + q));
                         break;
                     default:
                         break;

                 }

             }
             catch 
(RabbitMQ.Client.Exceptions.OperationInterruptedException e)
             {
                 InvokeErrorMessage(e.Message, q);
             }
             catch (Exception e)
             {
                 InvokeErrorMessage(e.Message, q);
             }
         }



internal class MyConsumer : DefaultBasicConsumer
         {
             public delegate void NewMessage(BasicDeliverEventArgs e);
             public event NewMessage NewMessagee;


             public MyConsumer(IModel ch) : base(ch) { }

             public override void HandleBasicDeliver(string consumerTag,
                                                     ulong deliveryTag,
                                                     bool redelivered,
                                                     string exchange,
                                                     string routingKey,
                                                     IBasicProperties 
properties,
                                                     byte[] body)
             {
                 Model.BasicAck(deliveryTag, false);

                 // We only use BasicDeliverEventArgs here for
                 // convenience. Often we wouldn't bother packaging up
                 // all the arguments we received: we'd simply use
                 // those we needed directly.
                 var e = new BasicDeliverEventArgs();
                 /*
                 e.ConsumerTag = consumerTag;
                 e.DeliveryTag = deliveryTag;
                 e.Redelivered = redelivered;
                 e.Exchange = exchange;
                 e.RoutingKey = routingKey;
                 e.BasicProperties = properties;
                 e.Body = body;
                 */
                 NewMessagee(e);
              }
         }

     private void ProcessSingleDelivery(BasicDeliverEventArgs e)
         {
                 NewMessageEx(e, e.Exchange);
         }




Usage:





     public partial class Form1 : Form
     {
         private RmqExchClient Pe;

         delegate void Etykieta(string msg);


         private StreamWriter logstr;
         private Timer timer;
         private int flush_count = 0;
         readonly Random rand = new Random((int)DateTime.Now.ToBinary());
         private DateTime timout;
         private ulong i = 0;
         private int z = 0;


         public Form1()
         {
             InitializeComponent();
            Closing += Form1_Closing;
         }

         void Form1_Closing(object sender, CancelEventArgs e)
         {

             Pe.Close();
             if(timer!=null)timer.Stop();
             if (logstr != null)
             {
                 logstr.Flush();
                 logstr.Close();
             }
         }

         private void ZmienTekstW(string m)
         {
             label2.Text = m;
             Log(m);
         }

         private void ZmienTekstE(string m)
         {
             label4.Text = m;
             Log(m);
         }


         void Pe_ErrorMessageE(string m, string channell)
         {
             string message = m + " from exch: " + channell;
             BeginInvoke(new Etykieta(ZmienTekstE), new object[] { 
message });
         }


         private void button1_Click(object sender, EventArgs e)
         {
             Pe.SendMessageToExch(messageBox.Text, 
comboBox3.SelectedItem.ToString());
             
//Pe.SendMessageToExch(messageBox.Text,comboBox3.SelectedItem.ToString());
         }

         private void button2_Click(object sender, EventArgs e)
         {
             bool d=false;
             RmqExchClient.ClientType c = RmqExchClient.ClientType.Both;
             string s = comboBox1.SelectedItem.ToString();
             if(comboBox4.SelectedItem.ToString()=="tak") d = true;
             if(comboBox2.SelectedItem.ToString() == "both") c = 
RmqExchClient.ClientType.Both;
             if (comboBox2.SelectedItem.ToString() == "producer") c = 
RmqExchClient.ClientType.Producer;
             if (comboBox2.SelectedItem.ToString() == "consumer") c = 
RmqExchClient.ClientType.Listener;
             Pe.DeclareNewExchaneOrJoin(textBox1.Text,s,c,d);
         }

         private void comboBox3_Click(object sender, EventArgs e)
         {
             comboBox3.Items.Clear();
             foreach (var s in Pe.Exchanges)
             {
                 if(s!=null)
                 comboBox3.Items.Add(s);
             }
         }

         private void button3_Click(object sender, EventArgs e)
         {
             Pe = new RmqExchClient(textBox2.Text);
             Pe.NewMessageEx += Pe_NewMessageEx;
             Pe.ErrorMessageE += Pe_ErrorMessageE;
         }

         void 
Pe_NewMessageEx(RabbitMQ.Client.Events.BasicDeliverEventArgs m, string 
exchange)
         {
             z++;
             BeginInvoke(new Etykieta(ZmienTekstW), new object[] { 
z.ToString() });
         }



         private void button4_Click(object sender, EventArgs e)
         {
             Pe.DeleteExch(textBox4.Text);
         }

         private void button5_Click(object sender, EventArgs e)
         {
             try
             {
                 if (!string.IsNullOrEmpty(textBox3.Text))
                 {
                    if(folderBrowserDialog1.ShowDialog()==DialogResult.OK)
                    {
                        timout = 
DateTime.Now.AddHours(Convert.ToDouble(textBox3.Text));
                        logstr = new 
StreamWriter(folderBrowserDialog1.SelectedPath+"\\log.txt",true);
                        timer=new Timer();
                        timer.Interval = 10;
                        timer.Tick += timer_Tick;
                        timer.Enabled = true;
                        timer.Start();
                    }
                 }
             }
                 catch(Exception ex)
                 {
                     Log(ex.Message);
                 }
         }

         void timer_Tick(object sender, EventArgs e)
         {
             if (DateTime.Now < timout)
             {
                 i++;
                 Pe.SendMessageToExch(i, comboBox3.SelectedItem.ToString());
                 Log("message sended: " +i );
                 label5.Text = i.ToString();
             }
         }

         private void Log(string msg)
         {
                 if (logstr != null)
                 {
                     logstr.WriteLine("SUD[{0,6}] {1}: {2}",
                         System.Diagnostics.Process.GetCurrentProcess().Id,
                         DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                         msg);
                     flush_count++;
                     flush_count %= 10;
                     if (flush_count == 0) logstr.Flush();
                 }
         }

         private void button6_Click(object sender, EventArgs e)
         {
             if (folderBrowserDialog1.ShowDialog() == DialogResult.OK)
             {
                 logstr = new 
StreamWriter(folderBrowserDialog1.SelectedPath + "\\log.txt", true);
             }
         }

     }
}


I think i found my mistake I left your PrecessSingleDelivery from 
example (below) .Now i have correct it and its working :) all client 
consume correct even first. I


before:
private void ProcessSingleDelivery(BasicDeliverEventArgs e)
         {
             DebugUtil.DumpProperties(e, Console.Out, 0);
             if (e.BasicProperties.ContentType == MapMessageReader.MimeType)
             {
                 IMapMessageReader r = new 
MapMessageReader(e.BasicProperties, e.Body);
                 DebugUtil.DumpProperties(r.Body, Console.Out, 0);
             }
             else if (e.BasicProperties.ContentType == 
StreamMessageReader.MimeType)
             {
                 IStreamMessageReader r = new 
StreamMessageReader(e.BasicProperties, e.Body);
                 while (true)
                 {
                     try
                     {
                         object v = r.ReadObject();
                         NewMessageEx(e, e.Exchange);
                     }
                     catch (EndOfStreamException)
                     {
                         break;
                     }
                 }
             }
             else
             {
                 NewMessageEx(e, e.Exchange);
             }
         }

-- 
This message has been scanned for viruses and
dangerous content by MailScanner, and is
believed to be clean.



More information about the rabbitmq-discuss mailing list