This is my code.<br><br><br>//� The contents of this file are subject to the Mozilla Public License<br>//� Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>//� compliance with the License. You may obtain a copy of the License<br>
//� at <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>//� Software distributed under the License is distributed on an &quot;AS IS&quot;<br>//� basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
//� the License for the specific language governing rights and<br>//� limitations under the License.<br>//<br>//� The Original Code is RabbitMQ.<br>//<br>//� The Initial Developer of the Original Code is VMware, Inc.<br>//� Copyright (c) 2007-2011 VMware, Inc.� All rights reserved.<br>
//<br><br><br>package com.rabbitmq.examples;<br><br>import java.io.ByteArrayInputStream;<br>import java.io.ByteArrayOutputStream;<br>import java.io.DataInputStream;<br>import java.io.DataOutputStream;<br>import java.io.IOException;<br>
import java.util.Arrays;<br>import java.util.Collections;<br>import java.util.List;<br>import java.util.SortedSet;<br>import java.util.TreeSet;<br>import java.util.UUID;<br>import java.util.concurrent.Semaphore;<br><br>import org.apache.commons.cli.CommandLine;<br>
import org.apache.commons.cli.CommandLineParser;<br>import org.apache.commons.cli.GnuParser;<br>import org.apache.commons.cli.HelpFormatter;<br>import org.apache.commons.cli.Option;<br>import org.apache.commons.cli.Options;<br>
import org.apache.commons.cli.ParseException;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>import com.rabbitmq.client.ConfirmListener;<br>import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>
import com.rabbitmq.client.Envelope;<br>import com.rabbitmq.client.MessageProperties;<br>import com.rabbitmq.client.QueueingConsumer;<br>import com.rabbitmq.client.QueueingConsumer.Delivery;<br>import com.rabbitmq.client.ReturnListener;<br>
import com.rabbitmq.client.ShutdownSignalException;<br><br><br>public class MulticastMain {<br><br>��� public static void main(String[] args) {<br>������� Options options = getOptions();<br>������� CommandLineParser parser = new GnuParser();<br>
������� try {<br>����������� CommandLine cmd = parser.parse(options, args);<br><br>����������� if (cmd.hasOption(&#39;?&#39;)) {<br>��������������� usage(options);<br>��������������� System.exit(0);<br>����������� }<br><br>
����������� String hostName����� = strArg(cmd, &#39;h&#39;, &quot;localhost&quot;);<br>����������� int portNumber������ = intArg(cmd, &#39;p&#39;, AMQP.PROTOCOL.PORT);<br>����������� String exchangeType� = strArg(cmd, &#39;t&#39;, &quot;direct&quot;);<br>
����������� String exchangeName� = strArg(cmd, &#39;e&#39;, exchangeType);<br>����������� int samplingInterval = intArg(cmd, &#39;i&#39;, 1);<br>����������� int rateLimit������� = intArg(cmd, &#39;r&#39;, 0);<br>����������� int producerCount��� = intArg(cmd, &#39;x&#39;, 1);<br>
����������� int consumerCount��� = intArg(cmd, &#39;y&#39;, 1);<br>����������� int producerTxSize�� = intArg(cmd, &#39;m&#39;, 0);<br>����������� int consumerTxSize�� = intArg(cmd, &#39;n&#39;, 0);<br>����������� long confirm�������� = intArg(cmd, &#39;c&#39;, -1);<br>
����������� boolean autoAck����� = cmd.hasOption(&#39;a&#39;);<br>����������� int prefetchCount��� = intArg(cmd, &#39;q&#39;, 0);<br>����������� int minMsgSize������ = intArg(cmd, &#39;s&#39;, 0);<br>����������� int timeLimit������� = intArg(cmd, &#39;z&#39;, 0);<br>
����������� List&lt;?&gt; flags������� = lstArg(cmd, &#39;f&#39;);<br>����������� int frameMax�������� = intArg(cmd, &#39;M&#39;, 0);<br>����������� int heartbeat������� = intArg(cmd, &#39;b&#39;, 0);<br><br>����������� if ((producerTxSize &gt; 0) &amp;&amp; confirm &gt;= 0) {<br>
��������������� throw new ParseException(&quot;Cannot select both producerTxSize&quot;+<br>���������������������������������������� &quot; and confirm&quot;);<br>����������� }<br><br>����������� //setup<br>����������� String id = UUID.randomUUID().toString();<br>
����������� Stats stats = new Stats(1000L * samplingInterval);<br>����������� ConnectionFactory factory = new ConnectionFactory();<br>����������� factory.setHost(hostName);<br>����������� factory.setPort(portNumber);<br>����������� factory.setRequestedFrameMax(frameMax);<br>
����������� factory.setRequestedHeartbeat(heartbeat);<br><br>����������� Thread[] consumerThreads = new Thread[consumerCount];<br>����������� Connection[] consumerConnections = new Connection[consumerCount];<br>����������� for (int i = 0; i &lt; consumerCount; i++) {<br>
��������������� System.out.println(&quot;starting consumer #&quot; + i);<br>��������������� Connection conn = factory.newConnection();<br>��������������� consumerConnections[i] = conn;<br>��������������� Channel channel = conn.createChannel();<br>
��������������� if (consumerTxSize &gt; 0) channel.txSelect();<br>��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>��������������� String queueName =<br>����������������������� channel.queueDeclare(&quot;&quot;, flags.contains(&quot;persistent&quot;),<br>
�������������������������������������������� true, false, null).getQueue();<br>��������������� QueueingConsumer consumer = new QueueingConsumer(channel);<br>��������������� if (prefetchCount &gt; 0) channel.basicQos(prefetchCount);<br>
��������������� channel.basicConsume(queueName, autoAck, consumer);<br>��������������� channel.queueBind(queueName, exchangeName, id);<br>��������������� Thread t =<br>������������������� new Thread(new Consumer(consumer, id,<br>
������������������������������������������� consumerTxSize, autoAck,<br>������������������������������������������� stats, timeLimit));<br>��������������� consumerThreads[i] = t;<br>��������������� t.start();<br>����������� }<br>
����������� Thread[] producerThreads = new Thread[producerCount];<br>����������� Connection[] producerConnections = new Connection[producerCount];<br>����������� for (int i = 0; i &lt; producerCount; i++) {<br>��������������� System.out.println(&quot;starting producer #&quot; + i);<br>
��������������� Connection conn = factory.newConnection();<br>��������������� producerConnections[i] = conn;<br>��������������� Channel channel = conn.createChannel();<br>��������������� if (producerTxSize &gt; 0) channel.txSelect();<br>
��������������� if (confirm &gt;= 0) channel.confirmSelect();<br>��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>��������������� final Producer p = new Producer(channel, exchangeName, id,<br>����������������������������������������������� flags, producerTxSize,<br>
����������������������������������������������� 1000L * samplingInterval,<br>����������������������������������������������� rateLimit, minMsgSize, timeLimit,<br>����������������������������������������������� confirm);<br>
��������������� channel.setReturnListener(p);<br>��������������� channel.setConfirmListener(p);<br>��������������� Thread t = new Thread(p);<br>��������������� producerThreads[i] = t;<br>��������������� t.start();<br>����������� }<br>
<br>����������� for (int i = 0; i &lt; producerCount; i++) {<br>��������������� producerThreads[i].join();<br>��������������� producerConnections[i].close();<br>����������� }<br><br>����������� for (int i = 0; i &lt; consumerCount; i++) {<br>
��������������� consumerThreads[i].join();<br>��������������� consumerConnections[i].close();<br>����������� }<br><br>������� }<br>������� catch( ParseException exp ) {<br>����������� System.err.println(&quot;Parsing failed. Reason: &quot; + exp.getMessage());<br>
����������� usage(options);<br>������� } catch (Exception e) {<br>����������� System.err.println(&quot;Main thread caught exception: &quot; + e);<br>����������� e.printStackTrace();<br>����������� System.exit(1);<br>������� }<br>
��� }<br><br>��� private static void usage(Options options) {<br>������� HelpFormatter formatter = new HelpFormatter();<br>������� formatter.printHelp(&quot;&lt;program&gt;&quot;, options);<br>��� }<br><br>��� private static Options getOptions() {<br>
������� Options options = new Options();<br>������� options.addOption(new Option(&quot;?&quot;, &quot;help&quot;,����� false,&quot;show usage&quot;));<br>������� options.addOption(new Option(&quot;h&quot;, &quot;host&quot;,����� true, &quot;broker host&quot;));<br>
������� options.addOption(new Option(&quot;p&quot;, &quot;port&quot;,����� true, &quot;broker port&quot;));<br>������� options.addOption(new Option(&quot;t&quot;, &quot;type&quot;,����� true, &quot;exchange type&quot;));<br>
������� options.addOption(new Option(&quot;e&quot;, &quot;exchange&quot;,� true, &quot;exchange name&quot;));<br>������� options.addOption(new Option(&quot;i&quot;, &quot;interval&quot;,� true, &quot;sampling interval&quot;));<br>
������� options.addOption(new Option(&quot;r&quot;, &quot;rate&quot;,����� true, &quot;rate limit&quot;));<br>������� options.addOption(new Option(&quot;x&quot;, &quot;producers&quot;, true, &quot;producer count&quot;));<br>
������� options.addOption(new Option(&quot;y&quot;, &quot;consumers&quot;, true, &quot;consumer count&quot;));<br>������� options.addOption(new Option(&quot;m&quot;, &quot;ptxsize&quot;,�� true, &quot;producer tx size&quot;));<br>
������� options.addOption(new Option(&quot;n&quot;, &quot;ctxsize&quot;,�� true, &quot;consumer tx size&quot;));<br>������� options.addOption(new Option(&quot;c&quot;, &quot;confirm&quot;,�� true, &quot;max unconfirmed publishes&quot;));<br>
������� options.addOption(new Option(&quot;a&quot;, &quot;autoack&quot;,�� false,&quot;auto ack&quot;));<br>������� options.addOption(new Option(&quot;q&quot;, &quot;qos&quot;,������ true, &quot;qos prefetch count&quot;));<br>
������� options.addOption(new Option(&quot;s&quot;, &quot;size&quot;,����� true, &quot;message size&quot;));<br>������� options.addOption(new Option(&quot;z&quot;, &quot;time&quot;,����� true, &quot;time limit&quot;));<br>
������� Option flag =���� new Option(&quot;f&quot;, &quot;flag&quot;,����� true, &quot;message flag&quot;);<br>������� flag.setArgs(Option.UNLIMITED_VALUES);<br>������� options.addOption(flag);<br>������� options.addOption(new Option(&quot;M&quot;, &quot;framemax&quot;,� true, &quot;frame max&quot;));<br>
������� options.addOption(new Option(&quot;b&quot;, &quot;heartbeat&quot;, true, &quot;heartbeat interval&quot;));<br>������� return options;<br>��� }<br><br>��� private static String strArg(CommandLine cmd, char opt, String def) {<br>
������� return cmd.getOptionValue(opt, def);<br>��� }<br><br>��� private static int intArg(CommandLine cmd, char opt, int def) {<br>������� return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));<br>��� }<br>
<br>��� private static List&lt;?&gt; lstArg(CommandLine cmd, char opt) {<br>������� String[] vals = cmd.getOptionValues(&#39;f&#39;);<br>������� if (vals == null) {<br>����������� vals = new String[] {};<br>������� }<br>������� return Arrays.asList(vals);<br>
��� }<br><br>��� public static class Producer implements Runnable, ReturnListener,<br>������������������������������������������� ConfirmListener<br>��� {<br>������� private Channel channel;<br>������� private String� exchangeName;<br>
������� private String� id;<br>������� private boolean mandatory;<br>������� private boolean immediate;<br>������� private boolean persistent;<br>������� private int���� txSize;<br>������� private long��� interval;<br>������� private int���� rateLimit;<br>
������� private long��� timeLimit;<br><br>������� private byte[]� message;<br><br>������� private long��� startTime;<br>������� private long��� lastStatsTime;<br>������� private int���� msgCount;<br>������� private int���� returnCount;<br>
<br>������� private long����� confirm;<br>������� private Semaphore confirmPool;<br>������� private long����� confirmCount;<br>������� private long����� nackCount;<br>������� private volatile SortedSet&lt;Long&gt; unconfirmedSet =<br>
����������� Collections.synchronizedSortedSet(new TreeSet&lt;Long&gt;());<br><br>������� public Producer(Channel channel, String exchangeName, String id,<br>����������������������� List&lt;?&gt; flags, int txSize,<br>����������������������� long interval, int rateLimit, int minMsgSize, int timeLimit,<br>
����������������������� long confirm)<br>����������� throws IOException {<br><br>����������� this.channel����� = channel;<br>����������� this.exchangeName = exchangeName;<br>����������� <a href="http://this.id">this.id</a>���������� = id;<br>
����������� this.mandatory��� = flags.contains(&quot;mandatory&quot;);<br>����������� this.immediate��� = flags.contains(&quot;immediate&quot;);<br>����������� this.persistent�� = flags.contains(&quot;persistent&quot;);<br>
����������� this.txSize������ = txSize;<br>����������� this.interval���� = interval;<br>����������� this.rateLimit��� = rateLimit;<br>����������� this.timeLimit��� = 1000L * timeLimit;<br>����������� this.message����� = new byte[minMsgSize];<br>
����������� this.confirm����� = confirm;<br>����������� if (confirm &gt; 0) {<br>��������������� this.confirmPool� = new Semaphore((int)confirm);<br>����������� }<br>������� }<br><br>������� public synchronized void handleReturn(int replyCode,<br>
��������������������������������������������� String replyText,<br>��������������������������������������������� String exchange,<br>��������������������������������������������� String routingKey,<br>��������������������������������������������� AMQP.BasicProperties properties,<br>
��������������������������������������������� byte[] body)<br>����������� throws IOException {<br>����������� returnCount++;<br>������� }<br><br>������� public void handleAck(long seqNo, boolean multiple) {<br>����������� handleAckNack(seqNo, multiple, false);<br>
������� }<br><br>������� public void handleNack(long seqNo, boolean multiple) {<br>����������� handleAckNack(seqNo, multiple, true);<br>������� }<br><br>������� private void handleAckNack(long seqNo, boolean multiple,<br>
���������������������������������� boolean nack) {<br>����������� int numConfirms = 0;<br>����������� if (multiple) {<br>��������������� SortedSet&lt;Long&gt; confirmed = unconfirmedSet.headSet(seqNo + 1);<br>��������������� numConfirms += confirmed.size();<br>
��������������� confirmed.clear();<br>����������� } else {<br>��������������� unconfirmedSet.remove(seqNo);<br>��������������� numConfirms = 1;<br>����������� }<br>����������� synchronized (this) {<br>��������������� if (nack) {<br>
������������������� nackCount += numConfirms;<br>��������������� } else {<br>������������������� confirmCount += numConfirms;<br>��������������� }<br>����������� }<br><br>����������� if (confirmPool != null) {<br>��������������� for (int i = 0; i &lt; numConfirms; ++i) {<br>
������������������� confirmPool.release();<br>��������������� }<br>����������� }<br><br>������� }<br><br>������� public void run() {<br><br>����������� long now;<br>����������� now = startTime = lastStatsTime = System.currentTimeMillis();<br>
����������� msgCount = 0;<br>����������� int totalMsgCount = 0;<br><br>����������� try {<br><br>��������������� while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>������������������� if (confirmPool != null) {<br>
����������������������� confirmPool.acquire();<br>������������������� }<br>������������������� delay(now);<br>������������������� publish(createMessage(totalMsgCount));<br>������������������� totalMsgCount++;<br>������������������� msgCount++;<br>
<br>������������������� if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>����������������������� channel.txCommit();<br>������������������� }<br>������������������� now = System.currentTimeMillis();<br>��������������� }<br>
<br>����������� } catch (IOException e) {<br>��������������� throw new RuntimeException(e);<br>����������� } catch (InterruptedException e) {<br>��������������� throw new RuntimeException (e);<br>����������� }<br><br>����������� System.out.println(&quot;sending rate avg: &quot; +<br>
������������������������������ (totalMsgCount * 1000L / (now - startTime)) +<br>������������������������������ &quot; msg/s&quot;);<br><br>������� }<br><br>������� private void publish(byte[] msg)<br>����������� throws IOException {<br>
<br>����������� unconfirmedSet.add(channel.getNextPublishSeqNo());<br>����������� channel.basicPublish(exchangeName, id,<br>�������������������������������� mandatory, immediate,<br>�������������������������������� persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,<br>
�������������������������������� msg);<br>������� }<br><br>������� private void delay(long now)<br>����������� throws InterruptedException {<br><br>����������� long elapsed = now - lastStatsTime;<br>����������� //example: rateLimit is 5000 msg/s,<br>
����������� //10 ms have elapsed, we have sent 200 messages<br>����������� //the 200 msgs we have actually sent should have taken us<br>����������� //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>����������� long pause = rateLimit == 0 ?<br>
��������������� 0 : (msgCount * 1000L / rateLimit - elapsed);<br>����������� if (pause &gt; 0) {<br>��������������� Thread.sleep(pause);<br>����������� }<br>����������� if (elapsed &gt; interval) {<br>��������������� long sendRate, returnRate, confirmRate, nackRate;<br>
��������������� synchronized(this) {<br>������������������� sendRate���� = msgCount���� * 1000L / elapsed;<br>������������������� returnRate�� = returnCount� * 1000L / elapsed;<br>������������������� confirmRate� = confirmCount * 1000L / elapsed;<br>
������������������� nackRate���� = nackCount��� * 1000L / elapsed;<br>������������������� msgCount���� = 0;<br>������������������� returnCount� = 0;<br>������������������� confirmCount = 0;<br>������������������� nackCount��� = 0;<br>
��������������� }<br>��������������� System.out.print(&quot;sending rate: &quot; + sendRate + &quot; msg/s&quot;);<br>��������������� if (mandatory || immediate) {<br>������������������� System.out.print(&quot;, returns: &quot; + returnRate + &quot; ret/s&quot;);<br>
��������������� }<br>��������������� if (confirm &gt;= 0) {<br>������������������� System.out.print(&quot;, confirms: &quot; + confirmRate + &quot; c/s&quot;);<br>������������������� if (nackRate &gt; 0) {<br>����������������������� System.out.print(&quot;, nacks: &quot; + nackRate + &quot; n/s&quot;);<br>
������������������� }<br>��������������� }<br>��������������� System.out.println();<br>��������������� lastStatsTime = now;<br>����������� }<br>������� }<br><br>������� private byte[] createMessage(int sequenceNumber)<br>
����������� throws IOException {<br><br>����������� ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>����������� DataOutputStream d = new DataOutputStream(acc);<br>����������� long nano = System.nanoTime();<br>
����������� d.writeInt(sequenceNumber);<br>����������� d.writeLong(nano);<br>����������� d.flush();<br>����������� acc.flush();<br>����������� byte[] m = acc.toByteArray();<br>����������� if (m.length &lt;= message.length) {<br>
��������������� System.arraycopy(m, 0, message, 0, m.length);<br>��������������� return message;<br>����������� } else {<br>��������������� return m;<br>����������� }<br>������� }<br><br>��� }<br><br>��� public static class Consumer implements Runnable {<br>
<br>������� private QueueingConsumer q;<br>������� private String���������� id;<br>������� private int������������� txSize;<br>������� private boolean��������� autoAck;<br>������� private Stats����������� stats;<br>������� private long������������ timeLimit;<br>
<br>������� public Consumer(QueueingConsumer q, String id,<br>����������������������� int txSize, boolean autoAck,<br>����������������������� Stats stats, int timeLimit) {<br><br>����������� this.q�������� = q;<br>����������� <a href="http://this.id">this.id</a>������� = id;<br>
����������� this.txSize��� = txSize;<br>����������� this.autoAck�� = autoAck;<br>����������� this.stats���� = stats;<br>����������� this.timeLimit = 1000L * timeLimit;<br>������� }<br><br>������� public void run() {<br><br>
����������� long now;<br>����������� long startTime;<br>����������� startTime = now = System.currentTimeMillis();<br>����������� int totalMsgCount = 0;<br><br>����������� Channel channel = q.getChannel();<br><br>����������� try {<br>
<br>��������������� while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>������������������� Delivery delivery;<br>������������������� if (timeLimit == 0) {<br>����������������������� delivery = q.nextDelivery();<br>
������������������� } else {<br>����������������������� delivery = q.nextDelivery(startTime + timeLimit - now);<br>����������������������� if (delivery == null) break;<br>������������������� }<br>��� ��� ��� totalMsgCount++;<br>
<br>������������������� DataInputStream d = new DataInputStream(new ByteArrayInputStream(delivery.getBody()));<br>������������������� d.readInt();<br>������������������� long msgNano = d.readLong();<br>������������������� long nano = System.nanoTime();<br>
<br>������������������� Envelope envelope = delivery.getEnvelope();<br><br>������������������� if (!autoAck) {<br>����������������������� channel.basicAck(envelope.getDeliveryTag(), false);<br>������������������� }<br><br>
������������������� if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>����������������������� channel.txCommit();<br>������������������� }<br><br>������������������� now = System.currentTimeMillis();<br><br>������������������� stats.collectStats(now, id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
��������������� }<br><br>����������� } catch (IOException e) {<br>��������������� throw new RuntimeException(e);<br>����������� } catch (InterruptedException e) {<br>��������������� throw new RuntimeException (e);<br>����������� } catch (ShutdownSignalException e) {<br>
��������������� throw new RuntimeException(e);<br>����������� }<br><br>����������� long elapsed = now - startTime;<br>����������� if (elapsed &gt; 0) {<br>��������������� System.out.println(&quot;recving rate avg: &quot; +<br>
���������������������������������� (totalMsgCount * 1000L / elapsed) +<br>���������������������������������� &quot; msg/s&quot;);<br>����������� }<br>������� }<br><br>��� }<br><br>��� public static class Stats {<br><br>������� private long��� interval;<br>
<br>������� private long��� lastStatsTime;<br>������� private int���� msgCount;<br>������� private int���� latencyCount;<br>������� private long��� minLatency;<br>������� private long��� maxLatency;<br>������� private long��� cumulativeLatency;<br>
<br>������� public Stats(long interval) {<br>����������� this.interval = interval;<br>����������� reset(System.currentTimeMillis());<br>������� }<br><br>������� private void reset(long t) {<br>����������� lastStatsTime���� = t;<br>
����������� msgCount��������� = 0;<br>����������� latencyCount����� = 0;<br>����������� minLatency������� = Long.MAX_VALUE;<br>����������� maxLatency������� = Long.MIN_VALUE;<br>����������� cumulativeLatency = 0L;<br>������� }<br>
<br>������� public synchronized void collectStats(long now, long latency) {<br>����������� msgCount++;<br><br>����������� if (latency &gt; 0) {<br>��������������� minLatency = Math.min(minLatency, latency);<br>��������������� maxLatency = Math.max(maxLatency, latency);<br>
��������������� cumulativeLatency += latency;<br>��������������� latencyCount++;<br>����������� }<br><br>����������� long elapsed = now - lastStatsTime;<br>����������� if (elapsed &gt; interval) {<br>��������������� System.out.println(&quot;recving rate: &quot; +<br>
���������������������������������� (1000L * msgCount / elapsed) +<br>���������������������������������� &quot; msg/s&quot; +<br>���������������������������������� (latencyCount &gt; 0 ?<br>����������������������������������� &quot;, min/avg/max latency: &quot; +<br>
����������������������������������� minLatency/1000L + &quot;/&quot; +<br>����������������������������������� cumulativeLatency / (1000L * latencyCount) + &quot;/&quot; +<br>����������������������������������� maxLatency/1000L + &quot; microseconds&quot; :<br>
����������������������������������� &quot;&quot;));<br>��������������� reset(now);<br>����������� }<br><br>������� }<br><br>��� }<br><br>}<br><br><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <span dir="ltr">&lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;</span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">I am sorry but this link doesn&#39;t have any source code - this is the library.<br>
I meant *your* source code, the code of your program; the code which<br>
exercises the library and gets 5000 msg/s.<br>
<br>
P.S. Please use &quot;Reply all&quot;, this discussion is most probably<br>
interesting for many people in the community.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt; I am following this link for my java code:<br>
&gt; <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>

&gt;<br>
&gt; On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt; wrote:<br>
&gt;&gt;<br>
&gt;&gt; Please show the source code of all participating components. It&#39;s<br>
&gt;&gt; impossible to say anything definite without it.<br>
&gt;&gt;<br>
&gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; Till now i am getting the result of 5000 msg/sec in java.<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; I guess you should get the speed you want, and much more, if you just<br>
&gt;&gt; &gt;&gt; set up a high enough value for prefetch (see basic.qos) and set<br>
&gt;&gt; &gt;&gt; autoack.<br>
&gt;&gt; &gt;&gt; I got ~8k msg/s with much larger messages and persistence turned on<br>
&gt;&gt; &gt;&gt; too.<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; Message size is 199 bytes<br>
&gt;&gt; &gt;&gt; &gt; Messaging scenario is Persistence<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; Hi.<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; What&#39;s the size of messages?<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; What&#39;s your messaging scenario in general? Persistence, prefetch,<br>
&gt;&gt; &gt;&gt; &gt;&gt; transactions?<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Hi, I wanted to achieve the goal of 15000 msg/sec in RabbitMQ. I<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; have<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; tried a lot in java as well as in ruby. And right now implementing<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; node.js .<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; I want to prefer java because after lots of experimentation the<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; only<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; java has given me good result but still goal is not achieved.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; So i want help to know how to reach upto 15000 msg/sec in RabbitMQ<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; using java. If any good suggestion �is there please tell me or if<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; any<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; site to prefer then please tell me.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Thanks.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; _______________________________________________<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; rabbitmq-discuss mailing list<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; --<br>
&gt;&gt; Eugene Kirpichov<br>
&gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;<br>
&gt;<br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br>