<br>Thanks for the reply, Eugene Kirpichov.<br>One more request: can you suggest a reference site that I can follow to write the code for my goal?<div><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:57 PM, Eugene Kirpichov <span dir="ltr">&lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;</span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">Is this the minimal piece of code that demonstrates poor performance?<br>
It&#39;s hard to decipher that much code and it can have many kinds of<br>
problems inside; I think you should be able to trim it down to a<br>
couple dozen lines demonstrating the same speed; then diagnosing will<br>
become a lot easier.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt; This is my code.<br>
&gt;<br>
&gt;<br>
&gt; //  The contents of this file are subject to the Mozilla Public License<br>
&gt; //  Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>
&gt; //  compliance with the License. You may obtain a copy of the License<br>
&gt; //  at <a href="http://www.mozilla.org/MPL/" target="_blank">http://www.mozilla.org/MPL/</a><br>
&gt; //<br>
&gt; //  Software distributed under the License is distributed on an &quot;AS IS&quot;<br>
&gt; //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
&gt; //  the License for the specific language governing rights and<br>
&gt; //  limitations under the License.<br>
&gt; //<br>
&gt; //  The Original Code is RabbitMQ.<br>
&gt; //<br>
&gt; //  The Initial Developer of the Original Code is VMware, Inc.<br>
&gt; //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.<br>
&gt; //<br>
&gt;<br>
&gt;<br>
&gt; package com.rabbitmq.examples;<br>
&gt;<br>
&gt; import java.io.ByteArrayInputStream;<br>
&gt; import java.io.ByteArrayOutputStream;<br>
&gt; import java.io.DataInputStream;<br>
&gt; import java.io.DataOutputStream;<br>
&gt; import java.io.IOException;<br>
&gt; import java.util.Arrays;<br>
&gt; import java.util.Collections;<br>
&gt; import java.util.List;<br>
&gt; import java.util.SortedSet;<br>
&gt; import java.util.TreeSet;<br>
&gt; import java.util.UUID;<br>
&gt; import java.util.concurrent.Semaphore;<br>
&gt;<br>
&gt; import org.apache.commons.cli.CommandLine;<br>
&gt; import org.apache.commons.cli.CommandLineParser;<br>
&gt; import org.apache.commons.cli.GnuParser;<br>
&gt; import org.apache.commons.cli.HelpFormatter;<br>
&gt; import org.apache.commons.cli.Option;<br>
&gt; import org.apache.commons.cli.Options;<br>
&gt; import org.apache.commons.cli.ParseException;<br>
&gt;<br>
&gt; import com.rabbitmq.client.AMQP;<br>
&gt; import com.rabbitmq.client.Channel;<br>
&gt; import com.rabbitmq.client.ConfirmListener;<br>
&gt; import com.rabbitmq.client.Connection;<br>
&gt; import com.rabbitmq.client.ConnectionFactory;<br>
&gt; import com.rabbitmq.client.Envelope;<br>
&gt; import com.rabbitmq.client.MessageProperties;<br>
&gt; import com.rabbitmq.client.QueueingConsumer;<br>
&gt; import com.rabbitmq.client.QueueingConsumer.Delivery;<br>
&gt; import com.rabbitmq.client.ReturnListener;<br>
&gt; import com.rabbitmq.client.ShutdownSignalException;<br>
&gt;<br>
&gt;<br>
&gt; public class MulticastMain {<br>
&gt;<br>
&gt;     public static void main(String[] args) {<br>
&gt;         Options options = getOptions();<br>
&gt;         CommandLineParser parser = new GnuParser();<br>
&gt;         try {<br>
&gt;             CommandLine cmd = parser.parse(options, args);<br>
&gt;<br>
&gt;             if (cmd.hasOption(&#39;?&#39;)) {<br>
&gt;                 usage(options);<br>
&gt;                 System.exit(0);<br>
&gt;             }<br>
&gt;<br>
&gt;             String hostName      = strArg(cmd, &#39;h&#39;, &quot;localhost&quot;);<br>
&gt;             int portNumber       = intArg(cmd, &#39;p&#39;, AMQP.PROTOCOL.PORT);<br>
&gt;             String exchangeType  = strArg(cmd, &#39;t&#39;, &quot;direct&quot;);<br>
&gt;             String exchangeName  = strArg(cmd, &#39;e&#39;, exchangeType);<br>
&gt;             int samplingInterval = intArg(cmd, &#39;i&#39;, 1);<br>
&gt;             int rateLimit        = intArg(cmd, &#39;r&#39;, 0);<br>
&gt;             int producerCount    = intArg(cmd, &#39;x&#39;, 1);<br>
&gt;             int consumerCount    = intArg(cmd, &#39;y&#39;, 1);<br>
&gt;             int producerTxSize   = intArg(cmd, &#39;m&#39;, 0);<br>
&gt;             int consumerTxSize   = intArg(cmd, &#39;n&#39;, 0);<br>
&gt;             long confirm         = intArg(cmd, &#39;c&#39;, -1);<br>
&gt;             boolean autoAck      = cmd.hasOption(&#39;a&#39;);<br>
&gt;             int prefetchCount    = intArg(cmd, &#39;q&#39;, 0);<br>
&gt;             int minMsgSize       = intArg(cmd, &#39;s&#39;, 0);<br>
&gt;             int timeLimit        = intArg(cmd, &#39;z&#39;, 0);<br>
&gt;             List&lt;?&gt; flags        = lstArg(cmd, &#39;f&#39;);<br>
&gt;             int frameMax         = intArg(cmd, &#39;M&#39;, 0);<br>
&gt;             int heartbeat        = intArg(cmd, &#39;b&#39;, 0);<br>
&gt;<br>
&gt;             if ((producerTxSize &gt; 0) &amp;&amp; confirm &gt;= 0) {<br>
&gt;                 throw new ParseException(&quot;Cannot select both<br>
&gt; producerTxSize&quot;+<br>
&gt;                                          &quot; and confirm&quot;);<br>
&gt;             }<br>
&gt;<br>
&gt;             //setup<br>
&gt;             String id = UUID.randomUUID().toString();<br>
&gt;             Stats stats = new Stats(1000L * samplingInterval);<br>
&gt;             ConnectionFactory factory = new ConnectionFactory();<br>
&gt;             factory.setHost(hostName);<br>
&gt;             factory.setPort(portNumber);<br>
&gt;             factory.setRequestedFrameMax(frameMax);<br>
&gt;             factory.setRequestedHeartbeat(heartbeat);<br>
&gt;<br>
&gt;             Thread[] consumerThreads = new Thread[consumerCount];<br>
&gt;             Connection[] consumerConnections = new<br>
&gt; Connection[consumerCount];<br>
&gt;             for (int i = 0; i &lt; consumerCount; i++) {<br>
&gt;                 System.out.println(&quot;starting consumer #&quot; + i);<br>
&gt;                 Connection conn = factory.newConnection();<br>
&gt;                 consumerConnections[i] = conn;<br>
&gt;                 Channel channel = conn.createChannel();<br>
&gt;                 if (consumerTxSize &gt; 0) channel.txSelect();<br>
&gt;                 channel.exchangeDeclare(exchangeName, exchangeType);<br>
&gt;                 String queueName =<br>
&gt;                         channel.queueDeclare(&quot;&quot;,<br>
&gt; flags.contains(&quot;persistent&quot;),<br>
&gt;                                              true, false, null).getQueue();<br>
&gt;                 QueueingConsumer consumer = new QueueingConsumer(channel);<br>
&gt;                 if (prefetchCount &gt; 0) channel.basicQos(prefetchCount);<br>
&gt;                 channel.basicConsume(queueName, autoAck, consumer);<br>
&gt;                 channel.queueBind(queueName, exchangeName, id);<br>
&gt;                 Thread t =<br>
&gt;                     new Thread(new Consumer(consumer, id,<br>
&gt;                                             consumerTxSize, autoAck,<br>
&gt;                                             stats, timeLimit));<br>
&gt;                 consumerThreads[i] = t;<br>
&gt;                 t.start();<br>
&gt;             }<br>
&gt;             Thread[] producerThreads = new Thread[producerCount];<br>
&gt;             Connection[] producerConnections = new<br>
&gt; Connection[producerCount];<br>
&gt;             for (int i = 0; i &lt; producerCount; i++) {<br>
&gt;                 System.out.println(&quot;starting producer #&quot; + i);<br>
&gt;                 Connection conn = factory.newConnection();<br>
&gt;                 producerConnections[i] = conn;<br>
&gt;                 Channel channel = conn.createChannel();<br>
&gt;                 if (producerTxSize &gt; 0) channel.txSelect();<br>
&gt;                 if (confirm &gt;= 0) channel.confirmSelect();<br>
&gt;                 channel.exchangeDeclare(exchangeName, exchangeType);<br>
&gt;                 final Producer p = new Producer(channel, exchangeName, id,<br>
&gt;                                                 flags, producerTxSize,<br>
&gt;                                                 1000L * samplingInterval,<br>
&gt;                                                 rateLimit, minMsgSize,<br>
&gt; timeLimit,<br>
&gt;                                                 confirm);<br>
&gt;                 channel.setReturnListener(p);<br>
&gt;                 channel.setConfirmListener(p);<br>
&gt;                 Thread t = new Thread(p);<br>
&gt;                 producerThreads[i] = t;<br>
&gt;                 t.start();<br>
&gt;             }<br>
&gt;<br>
&gt;             for (int i = 0; i &lt; producerCount; i++) {<br>
&gt;                 producerThreads[i].join();<br>
&gt;                 producerConnections[i].close();<br>
&gt;             }<br>
&gt;<br>
&gt;             for (int i = 0; i &lt; consumerCount; i++) {<br>
&gt;                 consumerThreads[i].join();<br>
&gt;                 consumerConnections[i].close();<br>
&gt;             }<br>
&gt;<br>
&gt;         }<br>
&gt;         catch( ParseException exp ) {<br>
&gt;             System.err.println(&quot;Parsing failed. Reason: &quot; +<br>
&gt; exp.getMessage());<br>
&gt;             usage(options);<br>
&gt;         } catch (Exception e) {<br>
&gt;             System.err.println(&quot;Main thread caught exception: &quot; + e);<br>
&gt;             e.printStackTrace();<br>
&gt;             System.exit(1);<br>
&gt;         }<br>
&gt;     }<br>
&gt;<br>
&gt;     private static void usage(Options options) {<br>
&gt;         HelpFormatter formatter = new HelpFormatter();<br>
&gt;         formatter.printHelp(&quot;&lt;program&gt;&quot;, options);<br>
&gt;     }<br>
&gt;<br>
&gt;     private static Options getOptions() {<br>
&gt;         Options options = new Options();<br>
&gt;         options.addOption(new Option(&quot;?&quot;, &quot;help&quot;,      false,&quot;show usage&quot;));<br>
&gt;         options.addOption(new Option(&quot;h&quot;, &quot;host&quot;,      true, &quot;broker<br>
&gt; host&quot;));<br>
&gt;         options.addOption(new Option(&quot;p&quot;, &quot;port&quot;,      true, &quot;broker<br>
&gt; port&quot;));<br>
&gt;         options.addOption(new Option(&quot;t&quot;, &quot;type&quot;,      true, &quot;exchange<br>
&gt; type&quot;));<br>
&gt;         options.addOption(new Option(&quot;e&quot;, &quot;exchange&quot;,  true, &quot;exchange<br>
&gt; name&quot;));<br>
&gt;         options.addOption(new Option(&quot;i&quot;, &quot;interval&quot;,  true, &quot;sampling<br>
&gt; interval&quot;));<br>
&gt;         options.addOption(new Option(&quot;r&quot;, &quot;rate&quot;,      true, &quot;rate limit&quot;));<br>
&gt;         options.addOption(new Option(&quot;x&quot;, &quot;producers&quot;, true, &quot;producer<br>
&gt; count&quot;));<br>
&gt;         options.addOption(new Option(&quot;y&quot;, &quot;consumers&quot;, true, &quot;consumer<br>
&gt; count&quot;));<br>
&gt;         options.addOption(new Option(&quot;m&quot;, &quot;ptxsize&quot;,   true, &quot;producer tx<br>
&gt; size&quot;));<br>
&gt;         options.addOption(new Option(&quot;n&quot;, &quot;ctxsize&quot;,   true, &quot;consumer tx<br>
&gt; size&quot;));<br>
&gt;         options.addOption(new Option(&quot;c&quot;, &quot;confirm&quot;,   true, &quot;max<br>
&gt; unconfirmed publishes&quot;));<br>
&gt;         options.addOption(new Option(&quot;a&quot;, &quot;autoack&quot;,   false,&quot;auto ack&quot;));<br>
&gt;         options.addOption(new Option(&quot;q&quot;, &quot;qos&quot;,       true, &quot;qos prefetch<br>
&gt; count&quot;));<br>
&gt;         options.addOption(new Option(&quot;s&quot;, &quot;size&quot;,      true, &quot;message<br>
&gt; size&quot;));<br>
&gt;         options.addOption(new Option(&quot;z&quot;, &quot;time&quot;,      true, &quot;time limit&quot;));<br>
&gt;         Option flag =     new Option(&quot;f&quot;, &quot;flag&quot;,      true, &quot;message<br>
&gt; flag&quot;);<br>
&gt;         flag.setArgs(Option.UNLIMITED_VALUES);<br>
&gt;         options.addOption(flag);<br>
&gt;         options.addOption(new Option(&quot;M&quot;, &quot;framemax&quot;,  true, &quot;frame max&quot;));<br>
&gt;         options.addOption(new Option(&quot;b&quot;, &quot;heartbeat&quot;, true, &quot;heartbeat<br>
&gt; interval&quot;));<br>
&gt;         return options;<br>
&gt;     }<br>
&gt;<br>
&gt;     private static String strArg(CommandLine cmd, char opt, String def) {<br>
&gt;         return cmd.getOptionValue(opt, def);<br>
&gt;     }<br>
&gt;<br>
&gt;     private static int intArg(CommandLine cmd, char opt, int def) {<br>
&gt;         return Integer.parseInt(cmd.getOptionValue(opt,<br>
&gt; Integer.toString(def)));<br>
&gt;     }<br>
&gt;<br>
&gt;     private static List&lt;?&gt; lstArg(CommandLine cmd, char opt) {<br>
&gt;         String[] vals = cmd.getOptionValues(&#39;f&#39;);<br>
&gt;         if (vals == null) {<br>
&gt;             vals = new String[] {};<br>
&gt;         }<br>
&gt;         return Arrays.asList(vals);<br>
&gt;     }<br>
&gt;<br>
&gt;     public static class Producer implements Runnable, ReturnListener,<br>
&gt;                                             ConfirmListener<br>
&gt;     {<br>
&gt;         private Channel channel;<br>
&gt;         private String  exchangeName;<br>
&gt;         private String  id;<br>
&gt;         private boolean mandatory;<br>
&gt;         private boolean immediate;<br>
&gt;         private boolean persistent;<br>
&gt;         private int     txSize;<br>
&gt;         private long    interval;<br>
&gt;         private int     rateLimit;<br>
&gt;         private long    timeLimit;<br>
&gt;<br>
&gt;         private byte[]  message;<br>
&gt;<br>
&gt;         private long    startTime;<br>
&gt;         private long    lastStatsTime;<br>
&gt;         private int     msgCount;<br>
&gt;         private int     returnCount;<br>
&gt;<br>
&gt;         private long      confirm;<br>
&gt;         private Semaphore confirmPool;<br>
&gt;         private long      confirmCount;<br>
&gt;         private long      nackCount;<br>
&gt;         private volatile SortedSet&lt;Long&gt; unconfirmedSet =<br>
&gt;             Collections.synchronizedSortedSet(new TreeSet&lt;Long&gt;());<br>
&gt;<br>
&gt;         public Producer(Channel channel, String exchangeName, String id,<br>
&gt;                         List&lt;?&gt; flags, int txSize,<br>
&gt;                         long interval, int rateLimit, int minMsgSize, int<br>
&gt; timeLimit,<br>
&gt;                         long confirm)<br>
&gt;             throws IOException {<br>
&gt;<br>
&gt;             this.channel      = channel;<br>
&gt;             this.exchangeName = exchangeName;<br>
&gt;             this.id           = id;<br>
&gt;             this.mandatory    = flags.contains(&quot;mandatory&quot;);<br>
&gt;             this.immediate    = flags.contains(&quot;immediate&quot;);<br>
&gt;             this.persistent   = flags.contains(&quot;persistent&quot;);<br>
&gt;             this.txSize       = txSize;<br>
&gt;             this.interval     = interval;<br>
&gt;             this.rateLimit    = rateLimit;<br>
&gt;             this.timeLimit    = 1000L * timeLimit;<br>
&gt;             this.message      = new byte[minMsgSize];<br>
&gt;             this.confirm      = confirm;<br>
&gt;             if (confirm &gt; 0) {<br>
&gt;                 this.confirmPool  = new Semaphore((int)confirm);<br>
&gt;             }<br>
&gt;         }<br>
&gt;<br>
&gt;         public synchronized void handleReturn(int replyCode,<br>
&gt;                                               String replyText,<br>
&gt;                                               String exchange,<br>
&gt;                                               String routingKey,<br>
&gt;                                               AMQP.BasicProperties<br>
&gt; properties,<br>
&gt;                                               byte[] body)<br>
&gt;             throws IOException {<br>
&gt;             returnCount++;<br>
&gt;         }<br>
&gt;<br>
&gt;         public void handleAck(long seqNo, boolean multiple) {<br>
&gt;             handleAckNack(seqNo, multiple, false);<br>
&gt;         }<br>
&gt;<br>
&gt;         public void handleNack(long seqNo, boolean multiple) {<br>
&gt;             handleAckNack(seqNo, multiple, true);<br>
&gt;         }<br>
&gt;<br>
&gt;         private void handleAckNack(long seqNo, boolean multiple,<br>
&gt;                                    boolean nack) {<br>
&gt;             int numConfirms = 0;<br>
&gt;             if (multiple) {<br>
&gt;                 SortedSet&lt;Long&gt; confirmed = unconfirmedSet.headSet(seqNo +<br>
&gt; 1);<br>
&gt;                 numConfirms += confirmed.size();<br>
&gt;                 confirmed.clear();<br>
&gt;             } else {<br>
&gt;                 unconfirmedSet.remove(seqNo);<br>
&gt;                 numConfirms = 1;<br>
&gt;             }<br>
&gt;             synchronized (this) {<br>
&gt;                 if (nack) {<br>
&gt;                     nackCount += numConfirms;<br>
&gt;                 } else {<br>
&gt;                     confirmCount += numConfirms;<br>
&gt;                 }<br>
&gt;             }<br>
&gt;<br>
&gt;             if (confirmPool != null) {<br>
&gt;                 for (int i = 0; i &lt; numConfirms; ++i) {<br>
&gt;                     confirmPool.release();<br>
&gt;                 }<br>
&gt;             }<br>
&gt;<br>
&gt;         }<br>
&gt;<br>
&gt;         public void run() {<br>
&gt;<br>
&gt;             long now;<br>
&gt;             now = startTime = lastStatsTime = System.currentTimeMillis();<br>
&gt;             msgCount = 0;<br>
&gt;             int totalMsgCount = 0;<br>
&gt;<br>
&gt;             try {<br>
&gt;<br>
&gt;                 while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>
&gt;                     if (confirmPool != null) {<br>
&gt;                         confirmPool.acquire();<br>
&gt;                     }<br>
&gt;                     delay(now);<br>
&gt;                     publish(createMessage(totalMsgCount));<br>
&gt;                     totalMsgCount++;<br>
&gt;                     msgCount++;<br>
&gt;<br>
&gt;                     if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>
&gt;                         channel.txCommit();<br>
&gt;                     }<br>
&gt;                     now = System.currentTimeMillis();<br>
&gt;                 }<br>
&gt;<br>
&gt;             } catch (IOException e) {<br>
&gt;                 throw new RuntimeException(e);<br>
&gt;             } catch (InterruptedException e) {<br>
&gt;                 throw new RuntimeException (e);<br>
&gt;             }<br>
&gt;<br>
&gt;             System.out.println(&quot;sending rate avg: &quot; +<br>
&gt;                                (totalMsgCount * 1000L / (now - startTime)) +<br>
&gt;                                &quot; msg/s&quot;);<br>
&gt;<br>
&gt;         }<br>
&gt;<br>
&gt;         private void publish(byte[] msg)<br>
&gt;             throws IOException {<br>
&gt;<br>
&gt;             unconfirmedSet.add(channel.getNextPublishSeqNo());<br>
&gt;             channel.basicPublish(exchangeName, id,<br>
&gt;                                  mandatory, immediate,<br>
&gt;                                  persistent ?<br>
&gt; MessageProperties.MINIMAL_PERSISTENT_BASIC :<br>
&gt; MessageProperties.MINIMAL_BASIC,<br>
&gt;                                  msg);<br>
&gt;         }<br>
&gt;<br>
&gt;         private void delay(long now)<br>
&gt;             throws InterruptedException {<br>
&gt;<br>
&gt;             long elapsed = now - lastStatsTime;<br>
&gt;             //example: rateLimit is 5000 msg/s,<br>
&gt;             //10 ms have elapsed, we have sent 200 messages<br>
&gt;             //the 200 msgs we have actually sent should have taken us<br>
&gt;             //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>
&gt;             long pause = rateLimit == 0 ?<br>
&gt;                 0 : (msgCount * 1000L / rateLimit - elapsed);<br>
&gt;             if (pause &gt; 0) {<br>
&gt;                 Thread.sleep(pause);<br>
&gt;             }<br>
&gt;             if (elapsed &gt; interval) {<br>
&gt;                 long sendRate, returnRate, confirmRate, nackRate;<br>
&gt;                 synchronized(this) {<br>
&gt;                     sendRate     = msgCount     * 1000L / elapsed;<br>
&gt;                     returnRate   = returnCount  * 1000L / elapsed;<br>
&gt;                     confirmRate  = confirmCount * 1000L / elapsed;<br>
&gt;                     nackRate     = nackCount    * 1000L / elapsed;<br>
&gt;                     msgCount     = 0;<br>
&gt;                     returnCount  = 0;<br>
&gt;                     confirmCount = 0;<br>
&gt;                     nackCount    = 0;<br>
&gt;                 }<br>
&gt;                 System.out.print(&quot;sending rate: &quot; + sendRate + &quot; msg/s&quot;);<br>
&gt;                 if (mandatory || immediate) {<br>
&gt;                     System.out.print(&quot;, returns: &quot; + returnRate + &quot; ret/s&quot;);<br>
&gt;                 }<br>
&gt;                 if (confirm &gt;= 0) {<br>
&gt;                     System.out.print(&quot;, confirms: &quot; + confirmRate + &quot; c/s&quot;);<br>
&gt;                     if (nackRate &gt; 0) {<br>
&gt;                         System.out.print(&quot;, nacks: &quot; + nackRate + &quot; n/s&quot;);<br>
&gt;                     }<br>
&gt;                 }<br>
&gt;                 System.out.println();<br>
&gt;                 lastStatsTime = now;<br>
&gt;             }<br>
&gt;         }<br>
&gt;<br>
&gt;         private byte[] createMessage(int sequenceNumber)<br>
&gt;             throws IOException {<br>
&gt;<br>
&gt;             ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>
&gt;             DataOutputStream d = new DataOutputStream(acc);<br>
&gt;             long nano = System.nanoTime();<br>
&gt;             d.writeInt(sequenceNumber);<br>
&gt;             d.writeLong(nano);<br>
&gt;             d.flush();<br>
&gt;             acc.flush();<br>
&gt;             byte[] m = acc.toByteArray();<br>
&gt;             if (m.length &lt;= message.length) {<br>
&gt;                 System.arraycopy(m, 0, message, 0, m.length);<br>
&gt;                 return message;<br>
&gt;             } else {<br>
&gt;                 return m;<br>
&gt;             }<br>
&gt;         }<br>
&gt;<br>
&gt;     }<br>
&gt;<br>
&gt;     public static class Consumer implements Runnable {<br>
&gt;<br>
&gt;         private QueueingConsumer q;<br>
&gt;         private String           id;<br>
&gt;         private int              txSize;<br>
&gt;         private boolean          autoAck;<br>
&gt;         private Stats            stats;<br>
&gt;         private long             timeLimit;<br>
&gt;<br>
&gt;         public Consumer(QueueingConsumer q, String id,<br>
&gt;                         int txSize, boolean autoAck,<br>
&gt;                         Stats stats, int timeLimit) {<br>
&gt;<br>
&gt;             this.q         = q;<br>
&gt;             this.id        = id;<br>
&gt;             this.txSize    = txSize;<br>
&gt;             this.autoAck   = autoAck;<br>
&gt;             this.stats     = stats;<br>
&gt;             this.timeLimit = 1000L * timeLimit;<br>
&gt;         }<br>
&gt;<br>
&gt;         public void run() {<br>
&gt;<br>
&gt;             long now;<br>
&gt;             long startTime;<br>
&gt;             startTime = now = System.currentTimeMillis();<br>
&gt;             int totalMsgCount = 0;<br>
&gt;<br>
&gt;             Channel channel = q.getChannel();<br>
&gt;<br>
&gt;             try {<br>
&gt;<br>
&gt;                 while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>
&gt;                     Delivery delivery;<br>
&gt;                     if (timeLimit == 0) {<br>
&gt;                         delivery = q.nextDelivery();<br>
&gt;                     } else {<br>
&gt;                         delivery = q.nextDelivery(startTime + timeLimit -<br>
&gt; now);<br>
&gt;                         if (delivery == null) break;<br>
&gt;                     }<br>
&gt;             totalMsgCount++;<br>
&gt;<br>
&gt;                     DataInputStream d = new DataInputStream(new<br>
&gt; ByteArrayInputStream(delivery.getBody()));<br>
&gt;                     d.readInt();<br>
&gt;                     long msgNano = d.readLong();<br>
&gt;                     long nano = System.nanoTime();<br>
&gt;<br>
&gt;                     Envelope envelope = delivery.getEnvelope();<br>
&gt;<br>
&gt;                     if (!autoAck) {<br>
&gt;                         channel.basicAck(envelope.getDeliveryTag(), false);<br>
&gt;                     }<br>
&gt;<br>
&gt;                     if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>
&gt;                         channel.txCommit();<br>
&gt;                     }<br>
&gt;<br>
&gt;                     now = System.currentTimeMillis();<br>
&gt;<br>
&gt;                     stats.collectStats(now,<br>
&gt; id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
&gt;                 }<br>
&gt;<br>
&gt;             } catch (IOException e) {<br>
&gt;                 throw new RuntimeException(e);<br>
&gt;             } catch (InterruptedException e) {<br>
&gt;                 throw new RuntimeException (e);<br>
&gt;             } catch (ShutdownSignalException e) {<br>
&gt;                 throw new RuntimeException(e);<br>
&gt;             }<br>
&gt;<br>
&gt;             long elapsed = now - startTime;<br>
&gt;             if (elapsed &gt; 0) {<br>
&gt;                 System.out.println(&quot;recving rate avg: &quot; +<br>
&gt;                                    (totalMsgCount * 1000L / elapsed) +<br>
&gt;                                    &quot; msg/s&quot;);<br>
&gt;             }<br>
&gt;         }<br>
&gt;<br>
&gt;     }<br>
&gt;<br>
&gt;     public static class Stats {<br>
&gt;<br>
&gt;         private long    interval;<br>
&gt;<br>
&gt;         private long    lastStatsTime;<br>
&gt;         private int     msgCount;<br>
&gt;         private int     latencyCount;<br>
&gt;         private long    minLatency;<br>
&gt;         private long    maxLatency;<br>
&gt;         private long    cumulativeLatency;<br>
&gt;<br>
&gt;         public Stats(long interval) {<br>
&gt;             this.interval = interval;<br>
&gt;             reset(System.currentTimeMillis());<br>
&gt;         }<br>
&gt;<br>
&gt;         private void reset(long t) {<br>
&gt;             lastStatsTime     = t;<br>
&gt;             msgCount          = 0;<br>
&gt;             latencyCount      = 0;<br>
&gt;             minLatency        = Long.MAX_VALUE;<br>
&gt;             maxLatency        = Long.MIN_VALUE;<br>
&gt;             cumulativeLatency = 0L;<br>
&gt;         }<br>
&gt;<br>
&gt;         public synchronized void collectStats(long now, long latency) {<br>
&gt;             msgCount++;<br>
&gt;<br>
&gt;             if (latency &gt; 0) {<br>
&gt;                 minLatency = Math.min(minLatency, latency);<br>
&gt;                 maxLatency = Math.max(maxLatency, latency);<br>
&gt;                 cumulativeLatency += latency;<br>
&gt;                 latencyCount++;<br>
&gt;             }<br>
&gt;<br>
&gt;             long elapsed = now - lastStatsTime;<br>
&gt;             if (elapsed &gt; interval) {<br>
&gt;                 System.out.println(&quot;recving rate: &quot; +<br>
&gt;                                    (1000L * msgCount / elapsed) +<br>
&gt;                                    &quot; msg/s&quot; +<br>
&gt;                                    (latencyCount &gt; 0 ?<br>
&gt;                                     &quot;, min/avg/max latency: &quot; +<br>
&gt;                                     minLatency/1000L + &quot;/&quot; +<br>
&gt;                                     cumulativeLatency / (1000L *<br>
&gt; latencyCount) + &quot;/&quot; +<br>
&gt;                                     maxLatency/1000L + &quot; microseconds&quot; :<br>
&gt;                                     &quot;&quot;));<br>
&gt;                 reset(now);<br>
&gt;             }<br>
&gt;<br>
&gt;         }<br>
&gt;<br>
&gt;     }<br>
&gt;<br>
&gt; }<br>
&gt;<br>
&gt;<br>
&gt; On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt; wrote:<br>
&gt;&gt;<br>
&gt;&gt; I am sorry but this link doesn&#39;t have any source code - this is the<br>
&gt;&gt; library.<br>
&gt;&gt; I meant *your* source code, the code of your program; the code which<br>
&gt;&gt; exercises the library and gets 5000 msg/s.<br>
&gt;&gt;<br>
&gt;&gt; P.S. Please use &quot;Reply all&quot;, this discussion is most probably<br>
&gt;&gt; interesting for many people in the community.<br>
&gt;&gt;<br>
&gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt; I am following this link for my java code:<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>

&gt;&gt; &gt;<br>
&gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; Please show the source code of all participating components. It&#39;s<br>
&gt;&gt; &gt;&gt; impossible to say anything definite without it.<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; So far I am getting a result of 5000 msg/sec in Java.<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; I guess you should get the speed you want, and much more, if you<br>
&gt;&gt; &gt;&gt; &gt;&gt; just<br>
&gt;&gt; &gt;&gt; &gt;&gt; set up a high enough value for prefetch (see basic.qos) and set<br>
&gt;&gt; &gt;&gt; &gt;&gt; autoack.<br>
&gt;&gt; &gt;&gt; &gt;&gt; I got ~8k msg/s with much larger messages and persistence turned on<br>
&gt;&gt; &gt;&gt; &gt;&gt; too.<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Message size is 199 bytes<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Messaging scenario is Persistence<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Hi.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; What&#39;s the size of messages?<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; What&#39;s your messaging scenario in general? Persistence, prefetch,<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; transactions?<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; Hi, I want to achieve the goal of 15000 msg/sec in RabbitMQ.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; I have tried a lot in Java as well as in Ruby, and right now I am<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; implementing it in node.js.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; I prefer Java because, after lots of experimentation, only Java<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; has given me good results, but the goal is still not achieved.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; So I would like help on how to reach up to 15000 msg/sec in<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; RabbitMQ using Java. If you have any good suggestions, or know of<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; any site I should refer to, please tell me.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; Thanks.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; _______________________________________________<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; rabbitmq-discuss mailing list<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; --<br>
&gt;&gt; Eugene Kirpichov<br>
&gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;<br>
&gt;<br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br></div>