<br>Thanks for reply Eugene Kirpichov�, <br>One more help, Can you suggest me any reference site from where i can follow and code for my goal?<div><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:57 PM, Eugene Kirpichov <span dir="ltr">&lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;</span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">Is this the minimal piece of code that demonstrates poor performance?<br>
It&#39;s hard to decipher that much code and it can have many kinds of<br>
problems inside; I think you should be able to trim it down to a<br>
couple dozen lines demonstrating the same speed; then diagnosing will<br>
become a lot easier.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt; This is my code.<br>
&gt;<br>
&gt;<br>
&gt; //� The contents of this file are subject to the Mozilla Public License<br>
&gt; //� Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>
&gt; //� compliance with the License. You may obtain a copy of the License<br>
&gt; //� at <a href="http://www.mozilla.org/MPL/" target="_blank">http://www.mozilla.org/MPL/</a><br>
&gt; //<br>
&gt; //� Software distributed under the License is distributed on an &quot;AS IS&quot;<br>
&gt; //� basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
&gt; //� the License for the specific language governing rights and<br>
&gt; //� limitations under the License.<br>
&gt; //<br>
&gt; //� The Original Code is RabbitMQ.<br>
&gt; //<br>
&gt; //� The Initial Developer of the Original Code is VMware, Inc.<br>
&gt; //� Copyright (c) 2007-2011 VMware, Inc.� All rights reserved.<br>
&gt; //<br>
&gt;<br>
&gt;<br>
&gt; package com.rabbitmq.examples;<br>
&gt;<br>
&gt; import java.io.ByteArrayInputStream;<br>
&gt; import java.io.ByteArrayOutputStream;<br>
&gt; import java.io.DataInputStream;<br>
&gt; import java.io.DataOutputStream;<br>
&gt; import java.io.IOException;<br>
&gt; import java.util.Arrays;<br>
&gt; import java.util.Collections;<br>
&gt; import java.util.List;<br>
&gt; import java.util.SortedSet;<br>
&gt; import java.util.TreeSet;<br>
&gt; import java.util.UUID;<br>
&gt; import java.util.concurrent.Semaphore;<br>
&gt;<br>
&gt; import org.apache.commons.cli.CommandLine;<br>
&gt; import org.apache.commons.cli.CommandLineParser;<br>
&gt; import org.apache.commons.cli.GnuParser;<br>
&gt; import org.apache.commons.cli.HelpFormatter;<br>
&gt; import org.apache.commons.cli.Option;<br>
&gt; import org.apache.commons.cli.Options;<br>
&gt; import org.apache.commons.cli.ParseException;<br>
&gt;<br>
&gt; import com.rabbitmq.client.AMQP;<br>
&gt; import com.rabbitmq.client.Channel;<br>
&gt; import com.rabbitmq.client.ConfirmListener;<br>
&gt; import com.rabbitmq.client.Connection;<br>
&gt; import com.rabbitmq.client.ConnectionFactory;<br>
&gt; import com.rabbitmq.client.Envelope;<br>
&gt; import com.rabbitmq.client.MessageProperties;<br>
&gt; import com.rabbitmq.client.QueueingConsumer;<br>
&gt; import com.rabbitmq.client.QueueingConsumer.Delivery;<br>
&gt; import com.rabbitmq.client.ReturnListener;<br>
&gt; import com.rabbitmq.client.ShutdownSignalException;<br>
&gt;<br>
&gt;<br>
&gt; public class MulticastMain {<br>
&gt;<br>
&gt; ��� public static void main(String[] args) {<br>
&gt; ������� Options options = getOptions();<br>
&gt; ������� CommandLineParser parser = new GnuParser();<br>
&gt; ������� try {<br>
&gt; ����������� CommandLine cmd = parser.parse(options, args);<br>
&gt;<br>
&gt; ����������� if (cmd.hasOption(&#39;?&#39;)) {<br>
&gt; ��������������� usage(options);<br>
&gt; ��������������� System.exit(0);<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� String hostName����� = strArg(cmd, &#39;h&#39;, &quot;localhost&quot;);<br>
&gt; ����������� int portNumber������ = intArg(cmd, &#39;p&#39;, AMQP.PROTOCOL.PORT);<br>
&gt; ����������� String exchangeType� = strArg(cmd, &#39;t&#39;, &quot;direct&quot;);<br>
&gt; ����������� String exchangeName� = strArg(cmd, &#39;e&#39;, exchangeType);<br>
&gt; ����������� int samplingInterval = intArg(cmd, &#39;i&#39;, 1);<br>
&gt; ����������� int rateLimit������� = intArg(cmd, &#39;r&#39;, 0);<br>
&gt; ����������� int producerCount��� = intArg(cmd, &#39;x&#39;, 1);<br>
&gt; ����������� int consumerCount��� = intArg(cmd, &#39;y&#39;, 1);<br>
&gt; ����������� int producerTxSize�� = intArg(cmd, &#39;m&#39;, 0);<br>
&gt; ����������� int consumerTxSize�� = intArg(cmd, &#39;n&#39;, 0);<br>
&gt; ����������� long confirm�������� = intArg(cmd, &#39;c&#39;, -1);<br>
&gt; ����������� boolean autoAck����� = cmd.hasOption(&#39;a&#39;);<br>
&gt; ����������� int prefetchCount��� = intArg(cmd, &#39;q&#39;, 0);<br>
&gt; ����������� int minMsgSize������ = intArg(cmd, &#39;s&#39;, 0);<br>
&gt; ����������� int timeLimit������� = intArg(cmd, &#39;z&#39;, 0);<br>
&gt; ����������� List&lt;?&gt; flags������� = lstArg(cmd, &#39;f&#39;);<br>
&gt; ����������� int frameMax�������� = intArg(cmd, &#39;M&#39;, 0);<br>
&gt; ����������� int heartbeat������� = intArg(cmd, &#39;b&#39;, 0);<br>
&gt;<br>
&gt; ����������� if ((producerTxSize &gt; 0) &amp;&amp; confirm &gt;= 0) {<br>
&gt; ��������������� throw new ParseException(&quot;Cannot select both<br>
&gt; producerTxSize&quot;+<br>
&gt; ���������������������������������������� &quot; and confirm&quot;);<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� //setup<br>
&gt; ����������� String id = UUID.randomUUID().toString();<br>
&gt; ����������� Stats stats = new Stats(1000L * samplingInterval);<br>
&gt; ����������� ConnectionFactory factory = new ConnectionFactory();<br>
&gt; ����������� factory.setHost(hostName);<br>
&gt; ����������� factory.setPort(portNumber);<br>
&gt; ����������� factory.setRequestedFrameMax(frameMax);<br>
&gt; ����������� factory.setRequestedHeartbeat(heartbeat);<br>
&gt;<br>
&gt; ����������� Thread[] consumerThreads = new Thread[consumerCount];<br>
&gt; ����������� Connection[] consumerConnections = new<br>
&gt; Connection[consumerCount];<br>
&gt; ����������� for (int i = 0; i &lt; consumerCount; i++) {<br>
&gt; ��������������� System.out.println(&quot;starting consumer #&quot; + i);<br>
&gt; ��������������� Connection conn = factory.newConnection();<br>
&gt; ��������������� consumerConnections[i] = conn;<br>
&gt; ��������������� Channel channel = conn.createChannel();<br>
&gt; ��������������� if (consumerTxSize &gt; 0) channel.txSelect();<br>
&gt; ��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>
&gt; ��������������� String queueName =<br>
&gt; ����������������������� channel.queueDeclare(&quot;&quot;,<br>
&gt; flags.contains(&quot;persistent&quot;),<br>
&gt; �������������������������������������������� true, false, null).getQueue();<br>
&gt; ��������������� QueueingConsumer consumer = new QueueingConsumer(channel);<br>
&gt; ��������������� if (prefetchCount &gt; 0) channel.basicQos(prefetchCount);<br>
&gt; ��������������� channel.basicConsume(queueName, autoAck, consumer);<br>
&gt; ��������������� channel.queueBind(queueName, exchangeName, id);<br>
&gt; ��������������� Thread t =<br>
&gt; ������������������� new Thread(new Consumer(consumer, id,<br>
&gt; ������������������������������������������� consumerTxSize, autoAck,<br>
&gt; ������������������������������������������� stats, timeLimit));<br>
&gt; ��������������� consumerThreads[i] = t;<br>
&gt; ��������������� t.start();<br>
&gt; ����������� }<br>
&gt; ����������� Thread[] producerThreads = new Thread[producerCount];<br>
&gt; ����������� Connection[] producerConnections = new<br>
&gt; Connection[producerCount];<br>
&gt; ����������� for (int i = 0; i &lt; producerCount; i++) {<br>
&gt; ��������������� System.out.println(&quot;starting producer #&quot; + i);<br>
&gt; ��������������� Connection conn = factory.newConnection();<br>
&gt; ��������������� producerConnections[i] = conn;<br>
&gt; ��������������� Channel channel = conn.createChannel();<br>
&gt; ��������������� if (producerTxSize &gt; 0) channel.txSelect();<br>
&gt; ��������������� if (confirm &gt;= 0) channel.confirmSelect();<br>
&gt; ��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>
&gt; ��������������� final Producer p = new Producer(channel, exchangeName, id,<br>
&gt; ����������������������������������������������� flags, producerTxSize,<br>
&gt; ����������������������������������������������� 1000L * samplingInterval,<br>
&gt; ����������������������������������������������� rateLimit, minMsgSize,<br>
&gt; timeLimit,<br>
&gt; ����������������������������������������������� confirm);<br>
&gt; ��������������� channel.setReturnListener(p);<br>
&gt; ��������������� channel.setConfirmListener(p);<br>
&gt; ��������������� Thread t = new Thread(p);<br>
&gt; ��������������� producerThreads[i] = t;<br>
&gt; ��������������� t.start();<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� for (int i = 0; i &lt; producerCount; i++) {<br>
&gt; ��������������� producerThreads[i].join();<br>
&gt; ��������������� producerConnections[i].close();<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� for (int i = 0; i &lt; consumerCount; i++) {<br>
&gt; ��������������� consumerThreads[i].join();<br>
&gt; ��������������� consumerConnections[i].close();<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ������� }<br>
&gt; ������� catch( ParseException exp ) {<br>
&gt; ����������� System.err.println(&quot;Parsing failed. Reason: &quot; +<br>
&gt; exp.getMessage());<br>
&gt; ����������� usage(options);<br>
&gt; ������� } catch (Exception e) {<br>
&gt; ����������� System.err.println(&quot;Main thread caught exception: &quot; + e);<br>
&gt; ����������� e.printStackTrace();<br>
&gt; ����������� System.exit(1);<br>
&gt; ������� }<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� private static void usage(Options options) {<br>
&gt; ������� HelpFormatter formatter = new HelpFormatter();<br>
&gt; ������� formatter.printHelp(&quot;&lt;program&gt;&quot;, options);<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� private static Options getOptions() {<br>
&gt; ������� Options options = new Options();<br>
&gt; ������� options.addOption(new Option(&quot;?&quot;, &quot;help&quot;,����� false,&quot;show usage&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;h&quot;, &quot;host&quot;,����� true, &quot;broker<br>
&gt; host&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;p&quot;, &quot;port&quot;,����� true, &quot;broker<br>
&gt; port&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;t&quot;, &quot;type&quot;,����� true, &quot;exchange<br>
&gt; type&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;e&quot;, &quot;exchange&quot;,� true, &quot;exchange<br>
&gt; name&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;i&quot;, &quot;interval&quot;,� true, &quot;sampling<br>
&gt; interval&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;r&quot;, &quot;rate&quot;,����� true, &quot;rate limit&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;x&quot;, &quot;producers&quot;, true, &quot;producer<br>
&gt; count&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;y&quot;, &quot;consumers&quot;, true, &quot;consumer<br>
&gt; count&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;m&quot;, &quot;ptxsize&quot;,�� true, &quot;producer tx<br>
&gt; size&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;n&quot;, &quot;ctxsize&quot;,�� true, &quot;consumer tx<br>
&gt; size&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;c&quot;, &quot;confirm&quot;,�� true, &quot;max<br>
&gt; unconfirmed publishes&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;a&quot;, &quot;autoack&quot;,�� false,&quot;auto ack&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;q&quot;, &quot;qos&quot;,������ true, &quot;qos prefetch<br>
&gt; count&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;s&quot;, &quot;size&quot;,����� true, &quot;message<br>
&gt; size&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;z&quot;, &quot;time&quot;,����� true, &quot;time limit&quot;));<br>
&gt; ������� Option flag =���� new Option(&quot;f&quot;, &quot;flag&quot;,����� true, &quot;message<br>
&gt; flag&quot;);<br>
&gt; ������� flag.setArgs(Option.UNLIMITED_VALUES);<br>
&gt; ������� options.addOption(flag);<br>
&gt; ������� options.addOption(new Option(&quot;M&quot;, &quot;framemax&quot;,� true, &quot;frame max&quot;));<br>
&gt; ������� options.addOption(new Option(&quot;b&quot;, &quot;heartbeat&quot;, true, &quot;heartbeat<br>
&gt; interval&quot;));<br>
&gt; ������� return options;<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� private static String strArg(CommandLine cmd, char opt, String def) {<br>
&gt; ������� return cmd.getOptionValue(opt, def);<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� private static int intArg(CommandLine cmd, char opt, int def) {<br>
&gt; ������� return Integer.parseInt(cmd.getOptionValue(opt,<br>
&gt; Integer.toString(def)));<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� private static List&lt;?&gt; lstArg(CommandLine cmd, char opt) {<br>
&gt; ������� String[] vals = cmd.getOptionValues(&#39;f&#39;);<br>
&gt; ������� if (vals == null) {<br>
&gt; ����������� vals = new String[] {};<br>
&gt; ������� }<br>
&gt; ������� return Arrays.asList(vals);<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� public static class Producer implements Runnable, ReturnListener,<br>
&gt; ������������������������������������������� ConfirmListener<br>
&gt; ��� {<br>
&gt; ������� private Channel channel;<br>
&gt; ������� private String� exchangeName;<br>
&gt; ������� private String� id;<br>
&gt; ������� private boolean mandatory;<br>
&gt; ������� private boolean immediate;<br>
&gt; ������� private boolean persistent;<br>
&gt; ������� private int���� txSize;<br>
&gt; ������� private long��� interval;<br>
&gt; ������� private int���� rateLimit;<br>
&gt; ������� private long��� timeLimit;<br>
&gt;<br>
&gt; ������� private byte[]� message;<br>
&gt;<br>
&gt; ������� private long��� startTime;<br>
&gt; ������� private long��� lastStatsTime;<br>
&gt; ������� private int���� msgCount;<br>
&gt; ������� private int���� returnCount;<br>
&gt;<br>
&gt; ������� private long����� confirm;<br>
&gt; ������� private Semaphore confirmPool;<br>
&gt; ������� private long����� confirmCount;<br>
&gt; ������� private long����� nackCount;<br>
&gt; ������� private volatile SortedSet&lt;Long&gt; unconfirmedSet =<br>
&gt; ����������� Collections.synchronizedSortedSet(new TreeSet&lt;Long&gt;());<br>
&gt;<br>
&gt; ������� public Producer(Channel channel, String exchangeName, String id,<br>
&gt; ����������������������� List&lt;?&gt; flags, int txSize,<br>
&gt; ����������������������� long interval, int rateLimit, int minMsgSize, int<br>
&gt; timeLimit,<br>
&gt; ����������������������� long confirm)<br>
&gt; ����������� throws IOException {<br>
&gt;<br>
&gt; ����������� this.channel����� = channel;<br>
&gt; ����������� this.exchangeName = exchangeName;<br>
&gt; ����������� <a href="http://this.id" target="_blank">this.id</a>���������� = id;<br>
&gt; ����������� this.mandatory��� = flags.contains(&quot;mandatory&quot;);<br>
&gt; ����������� this.immediate��� = flags.contains(&quot;immediate&quot;);<br>
&gt; ����������� this.persistent�� = flags.contains(&quot;persistent&quot;);<br>
&gt; ����������� this.txSize������ = txSize;<br>
&gt; ����������� this.interval���� = interval;<br>
&gt; ����������� this.rateLimit��� = rateLimit;<br>
&gt; ����������� this.timeLimit��� = 1000L * timeLimit;<br>
&gt; ����������� this.message����� = new byte[minMsgSize];<br>
&gt; ����������� this.confirm����� = confirm;<br>
&gt; ����������� if (confirm &gt; 0) {<br>
&gt; ��������������� this.confirmPool� = new Semaphore((int)confirm);<br>
&gt; ����������� }<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public synchronized void handleReturn(int replyCode,<br>
&gt; ��������������������������������������������� String replyText,<br>
&gt; ��������������������������������������������� String exchange,<br>
&gt; ��������������������������������������������� String routingKey,<br>
&gt; ��������������������������������������������� AMQP.BasicProperties<br>
&gt; properties,<br>
&gt; ��������������������������������������������� byte[] body)<br>
&gt; ����������� throws IOException {<br>
&gt; ����������� returnCount++;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public void handleAck(long seqNo, boolean multiple) {<br>
&gt; ����������� handleAckNack(seqNo, multiple, false);<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public void handleNack(long seqNo, boolean multiple) {<br>
&gt; ����������� handleAckNack(seqNo, multiple, true);<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� private void handleAckNack(long seqNo, boolean multiple,<br>
&gt; ���������������������������������� boolean nack) {<br>
&gt; ����������� int numConfirms = 0;<br>
&gt; ����������� if (multiple) {<br>
&gt; ��������������� SortedSet&lt;Long&gt; confirmed = unconfirmedSet.headSet(seqNo +<br>
&gt; 1);<br>
&gt; ��������������� numConfirms += confirmed.size();<br>
&gt; ��������������� confirmed.clear();<br>
&gt; ����������� } else {<br>
&gt; ��������������� unconfirmedSet.remove(seqNo);<br>
&gt; ��������������� numConfirms = 1;<br>
&gt; ����������� }<br>
&gt; ����������� synchronized (this) {<br>
&gt; ��������������� if (nack) {<br>
&gt; ������������������� nackCount += numConfirms;<br>
&gt; ��������������� } else {<br>
&gt; ������������������� confirmCount += numConfirms;<br>
&gt; ��������������� }<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� if (confirmPool != null) {<br>
&gt; ��������������� for (int i = 0; i &lt; numConfirms; ++i) {<br>
&gt; ������������������� confirmPool.release();<br>
&gt; ��������������� }<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public void run() {<br>
&gt;<br>
&gt; ����������� long now;<br>
&gt; ����������� now = startTime = lastStatsTime = System.currentTimeMillis();<br>
&gt; ����������� msgCount = 0;<br>
&gt; ����������� int totalMsgCount = 0;<br>
&gt;<br>
&gt; ����������� try {<br>
&gt;<br>
&gt; ��������������� while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>
&gt; ������������������� if (confirmPool != null) {<br>
&gt; ����������������������� confirmPool.acquire();<br>
&gt; ������������������� }<br>
&gt; ������������������� delay(now);<br>
&gt; ������������������� publish(createMessage(totalMsgCount));<br>
&gt; ������������������� totalMsgCount++;<br>
&gt; ������������������� msgCount++;<br>
&gt;<br>
&gt; ������������������� if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>
&gt; ����������������������� channel.txCommit();<br>
&gt; ������������������� }<br>
&gt; ������������������� now = System.currentTimeMillis();<br>
&gt; ��������������� }<br>
&gt;<br>
&gt; ����������� } catch (IOException e) {<br>
&gt; ��������������� throw new RuntimeException(e);<br>
&gt; ����������� } catch (InterruptedException e) {<br>
&gt; ��������������� throw new RuntimeException (e);<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� System.out.println(&quot;sending rate avg: &quot; +<br>
&gt; ������������������������������ (totalMsgCount * 1000L / (now - startTime)) +<br>
&gt; ������������������������������ &quot; msg/s&quot;);<br>
&gt;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� private void publish(byte[] msg)<br>
&gt; ����������� throws IOException {<br>
&gt;<br>
&gt; ����������� unconfirmedSet.add(channel.getNextPublishSeqNo());<br>
&gt; ����������� channel.basicPublish(exchangeName, id,<br>
&gt; �������������������������������� mandatory, immediate,<br>
&gt; �������������������������������� persistent ?<br>
&gt; MessageProperties.MINIMAL_PERSISTENT_BASIC :<br>
&gt; MessageProperties.MINIMAL_BASIC,<br>
&gt; �������������������������������� msg);<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� private void delay(long now)<br>
&gt; ����������� throws InterruptedException {<br>
&gt;<br>
&gt; ����������� long elapsed = now - lastStatsTime;<br>
&gt; ����������� //example: rateLimit is 5000 msg/s,<br>
&gt; ����������� //10 ms have elapsed, we have sent 200 messages<br>
&gt; ����������� //the 200 msgs we have actually sent should have taken us<br>
&gt; ����������� //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>
&gt; ����������� long pause = rateLimit == 0 ?<br>
&gt; ��������������� 0 : (msgCount * 1000L / rateLimit - elapsed);<br>
&gt; ����������� if (pause &gt; 0) {<br>
&gt; ��������������� Thread.sleep(pause);<br>
&gt; ����������� }<br>
&gt; ����������� if (elapsed &gt; interval) {<br>
&gt; ��������������� long sendRate, returnRate, confirmRate, nackRate;<br>
&gt; ��������������� synchronized(this) {<br>
&gt; ������������������� sendRate���� = msgCount���� * 1000L / elapsed;<br>
&gt; ������������������� returnRate�� = returnCount� * 1000L / elapsed;<br>
&gt; ������������������� confirmRate� = confirmCount * 1000L / elapsed;<br>
&gt; ������������������� nackRate���� = nackCount��� * 1000L / elapsed;<br>
&gt; ������������������� msgCount���� = 0;<br>
&gt; ������������������� returnCount� = 0;<br>
&gt; ������������������� confirmCount = 0;<br>
&gt; ������������������� nackCount��� = 0;<br>
&gt; ��������������� }<br>
&gt; ��������������� System.out.print(&quot;sending rate: &quot; + sendRate + &quot; msg/s&quot;);<br>
&gt; ��������������� if (mandatory || immediate) {<br>
&gt; ������������������� System.out.print(&quot;, returns: &quot; + returnRate + &quot; ret/s&quot;);<br>
&gt; ��������������� }<br>
&gt; ��������������� if (confirm &gt;= 0) {<br>
&gt; ������������������� System.out.print(&quot;, confirms: &quot; + confirmRate + &quot; c/s&quot;);<br>
&gt; ������������������� if (nackRate &gt; 0) {<br>
&gt; ����������������������� System.out.print(&quot;, nacks: &quot; + nackRate + &quot; n/s&quot;);<br>
&gt; ������������������� }<br>
&gt; ��������������� }<br>
&gt; ��������������� System.out.println();<br>
&gt; ��������������� lastStatsTime = now;<br>
&gt; ����������� }<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� private byte[] createMessage(int sequenceNumber)<br>
&gt; ����������� throws IOException {<br>
&gt;<br>
&gt; ����������� ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>
&gt; ����������� DataOutputStream d = new DataOutputStream(acc);<br>
&gt; ����������� long nano = System.nanoTime();<br>
&gt; ����������� d.writeInt(sequenceNumber);<br>
&gt; ����������� d.writeLong(nano);<br>
&gt; ����������� d.flush();<br>
&gt; ����������� acc.flush();<br>
&gt; ����������� byte[] m = acc.toByteArray();<br>
&gt; ����������� if (m.length &lt;= message.length) {<br>
&gt; ��������������� System.arraycopy(m, 0, message, 0, m.length);<br>
&gt; ��������������� return message;<br>
&gt; ����������� } else {<br>
&gt; ��������������� return m;<br>
&gt; ����������� }<br>
&gt; ������� }<br>
&gt;<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� public static class Consumer implements Runnable {<br>
&gt;<br>
&gt; ������� private QueueingConsumer q;<br>
&gt; ������� private String���������� id;<br>
&gt; ������� private int������������� txSize;<br>
&gt; ������� private boolean��������� autoAck;<br>
&gt; ������� private Stats����������� stats;<br>
&gt; ������� private long������������ timeLimit;<br>
&gt;<br>
&gt; ������� public Consumer(QueueingConsumer q, String id,<br>
&gt; ����������������������� int txSize, boolean autoAck,<br>
&gt; ����������������������� Stats stats, int timeLimit) {<br>
&gt;<br>
&gt; ����������� this.q�������� = q;<br>
&gt; ����������� <a href="http://this.id" target="_blank">this.id</a>������� = id;<br>
&gt; ����������� this.txSize��� = txSize;<br>
&gt; ����������� this.autoAck�� = autoAck;<br>
&gt; ����������� this.stats���� = stats;<br>
&gt; ����������� this.timeLimit = 1000L * timeLimit;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public void run() {<br>
&gt;<br>
&gt; ����������� long now;<br>
&gt; ����������� long startTime;<br>
&gt; ����������� startTime = now = System.currentTimeMillis();<br>
&gt; ����������� int totalMsgCount = 0;<br>
&gt;<br>
&gt; ����������� Channel channel = q.getChannel();<br>
&gt;<br>
&gt; ����������� try {<br>
&gt;<br>
&gt; ��������������� while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>
&gt; ������������������� Delivery delivery;<br>
&gt; ������������������� if (timeLimit == 0) {<br>
&gt; ����������������������� delivery = q.nextDelivery();<br>
&gt; ������������������� } else {<br>
&gt; ����������������������� delivery = q.nextDelivery(startTime + timeLimit -<br>
&gt; now);<br>
&gt; ����������������������� if (delivery == null) break;<br>
&gt; ������������������� }<br>
&gt; ��� ��� ��� totalMsgCount++;<br>
&gt;<br>
&gt; ������������������� DataInputStream d = new DataInputStream(new<br>
&gt; ByteArrayInputStream(delivery.getBody()));<br>
&gt; ������������������� d.readInt();<br>
&gt; ������������������� long msgNano = d.readLong();<br>
&gt; ������������������� long nano = System.nanoTime();<br>
&gt;<br>
&gt; ������������������� Envelope envelope = delivery.getEnvelope();<br>
&gt;<br>
&gt; ������������������� if (!autoAck) {<br>
&gt; ����������������������� channel.basicAck(envelope.getDeliveryTag(), false);<br>
&gt; ������������������� }<br>
&gt;<br>
&gt; ������������������� if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>
&gt; ����������������������� channel.txCommit();<br>
&gt; ������������������� }<br>
&gt;<br>
&gt; ������������������� now = System.currentTimeMillis();<br>
&gt;<br>
&gt; ������������������� stats.collectStats(now,<br>
&gt; id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
&gt; ��������������� }<br>
&gt;<br>
&gt; ����������� } catch (IOException e) {<br>
&gt; ��������������� throw new RuntimeException(e);<br>
&gt; ����������� } catch (InterruptedException e) {<br>
&gt; ��������������� throw new RuntimeException (e);<br>
&gt; ����������� } catch (ShutdownSignalException e) {<br>
&gt; ��������������� throw new RuntimeException(e);<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� long elapsed = now - startTime;<br>
&gt; ����������� if (elapsed &gt; 0) {<br>
&gt; ��������������� System.out.println(&quot;recving rate avg: &quot; +<br>
&gt; ���������������������������������� (totalMsgCount * 1000L / elapsed) +<br>
&gt; ���������������������������������� &quot; msg/s&quot;);<br>
&gt; ����������� }<br>
&gt; ������� }<br>
&gt;<br>
&gt; ��� }<br>
&gt;<br>
&gt; ��� public static class Stats {<br>
&gt;<br>
&gt; ������� private long��� interval;<br>
&gt;<br>
&gt; ������� private long��� lastStatsTime;<br>
&gt; ������� private int���� msgCount;<br>
&gt; ������� private int���� latencyCount;<br>
&gt; ������� private long��� minLatency;<br>
&gt; ������� private long��� maxLatency;<br>
&gt; ������� private long��� cumulativeLatency;<br>
&gt;<br>
&gt; ������� public Stats(long interval) {<br>
&gt; ����������� this.interval = interval;<br>
&gt; ����������� reset(System.currentTimeMillis());<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� private void reset(long t) {<br>
&gt; ����������� lastStatsTime���� = t;<br>
&gt; ����������� msgCount��������� = 0;<br>
&gt; ����������� latencyCount����� = 0;<br>
&gt; ����������� minLatency������� = Long.MAX_VALUE;<br>
&gt; ����������� maxLatency������� = Long.MIN_VALUE;<br>
&gt; ����������� cumulativeLatency = 0L;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ������� public synchronized void collectStats(long now, long latency) {<br>
&gt; ����������� msgCount++;<br>
&gt;<br>
&gt; ����������� if (latency &gt; 0) {<br>
&gt; ��������������� minLatency = Math.min(minLatency, latency);<br>
&gt; ��������������� maxLatency = Math.max(maxLatency, latency);<br>
&gt; ��������������� cumulativeLatency += latency;<br>
&gt; ��������������� latencyCount++;<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ����������� long elapsed = now - lastStatsTime;<br>
&gt; ����������� if (elapsed &gt; interval) {<br>
&gt; ��������������� System.out.println(&quot;recving rate: &quot; +<br>
&gt; ���������������������������������� (1000L * msgCount / elapsed) +<br>
&gt; ���������������������������������� &quot; msg/s&quot; +<br>
&gt; ���������������������������������� (latencyCount &gt; 0 ?<br>
&gt; ����������������������������������� &quot;, min/avg/max latency: &quot; +<br>
&gt; ����������������������������������� minLatency/1000L + &quot;/&quot; +<br>
&gt; ����������������������������������� cumulativeLatency / (1000L *<br>
&gt; latencyCount) + &quot;/&quot; +<br>
&gt; ����������������������������������� maxLatency/1000L + &quot; microseconds&quot; :<br>
&gt; ����������������������������������� &quot;&quot;));<br>
&gt; ��������������� reset(now);<br>
&gt; ����������� }<br>
&gt;<br>
&gt; ������� }<br>
&gt;<br>
&gt; ��� }<br>
&gt;<br>
&gt; }<br>
&gt;<br>
&gt;<br>
&gt; On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt; wrote:<br>
&gt;&gt;<br>
&gt;&gt; I am sorry but this link doesn&#39;t have any source code - this is the<br>
&gt;&gt; library.<br>
&gt;&gt; I meant *your* source code, the code of your program; the code which<br>
&gt;&gt; exercises the library and gets 5000 msg/s.<br>
&gt;&gt;<br>
&gt;&gt; P.S. Please use &quot;Reply all&quot;, this discussion is most probably<br>
&gt;&gt; interesting for many people in the community.<br>
&gt;&gt;<br>
&gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt; I am following this link for my java code:<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>

&gt;&gt; &gt;<br>
&gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; Please show the source code of all participating components. It&#39;s<br>
&gt;&gt; &gt;&gt; impossible to say anything definite without it.<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; Till now i am getting the result of 5000 msg/sec in java.<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; I guess you should get the speed you want, and much more, if you<br>
&gt;&gt; &gt;&gt; &gt;&gt; just<br>
&gt;&gt; &gt;&gt; &gt;&gt; set up a high enough value for prefetch (see basic.qos) and set<br>
&gt;&gt; &gt;&gt; &gt;&gt; autoack.<br>
&gt;&gt; &gt;&gt; &gt;&gt; I got ~8k msg/s with much larger messages and persistence turned on<br>
&gt;&gt; &gt;&gt; &gt;&gt; too.<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Message size is 199 bytes<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Messaging scenario is Persistence<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Hi.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; What&#39;s the size of messages?<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; What&#39;s your messaging scenario in general? Persistence, prefetch,<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; transactions?<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; Hi, I wanted to achieve the goal of 15000 msg/sec in RabbitMQ.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; I<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; have<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; tried a lot in java as well as in ruby. And right now<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; implementing<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; node.js .<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; I want to prefer java because after lots of experimentation the<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; only<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; java has given me good result but still goal is not achieved.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; So i want help to know how to reach upto 15000 msg/sec in<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; RabbitMQ<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; using java. If any good suggestion �is there please tell me or<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; if<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; any<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; site to prefer then please tell me.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; Thanks.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; _______________________________________________<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; rabbitmq-discuss mailing list<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; --<br>
&gt;&gt; Eugene Kirpichov<br>
&gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;<br>
&gt;<br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br></div>