This is my code.<br><br><br>//  The contents of this file are subject to the Mozilla Public License<br>//  Version 1.1 (the &quot;License&quot;); you may not use this file except in<br>//  compliance with the License. You may obtain a copy of the License<br>
//  at <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>//  Software distributed under the License is distributed on an &quot;AS IS&quot;<br>//  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
//  the License for the specific language governing rights and<br>//  limitations under the License.<br>//<br>//  The Original Code is RabbitMQ.<br>//<br>//  The Initial Developer of the Original Code is VMware, Inc.<br>//  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.<br>
//<br><br><br>package com.rabbitmq.examples;<br><br>import java.io.ByteArrayInputStream;<br>import java.io.ByteArrayOutputStream;<br>import java.io.DataInputStream;<br>import java.io.DataOutputStream;<br>import java.io.IOException;<br>
import java.util.Arrays;<br>import java.util.Collections;<br>import java.util.List;<br>import java.util.SortedSet;<br>import java.util.TreeSet;<br>import java.util.UUID;<br>import java.util.concurrent.Semaphore;<br><br>import org.apache.commons.cli.CommandLine;<br>
import org.apache.commons.cli.CommandLineParser;<br>import org.apache.commons.cli.GnuParser;<br>import org.apache.commons.cli.HelpFormatter;<br>import org.apache.commons.cli.Option;<br>import org.apache.commons.cli.Options;<br>
import org.apache.commons.cli.ParseException;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>import com.rabbitmq.client.ConfirmListener;<br>import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>
import com.rabbitmq.client.Envelope;<br>import com.rabbitmq.client.MessageProperties;<br>import com.rabbitmq.client.QueueingConsumer;<br>import com.rabbitmq.client.QueueingConsumer.Delivery;<br>import com.rabbitmq.client.ReturnListener;<br>
import com.rabbitmq.client.ShutdownSignalException;<br><br><br>public class MulticastMain {<br><br>    public static void main(String[] args) {<br>        Options options = getOptions();<br>        CommandLineParser parser = new GnuParser();<br>
        try {<br>            CommandLine cmd = parser.parse(options, args);<br><br>            if (cmd.hasOption(&#39;?&#39;)) {<br>                usage(options);<br>                System.exit(0);<br>            }<br><br>
            String hostName      = strArg(cmd, &#39;h&#39;, &quot;localhost&quot;);<br>            int portNumber       = intArg(cmd, &#39;p&#39;, AMQP.PROTOCOL.PORT);<br>            String exchangeType  = strArg(cmd, &#39;t&#39;, &quot;direct&quot;);<br>
            String exchangeName  = strArg(cmd, &#39;e&#39;, exchangeType);<br>            int samplingInterval = intArg(cmd, &#39;i&#39;, 1);<br>            int rateLimit        = intArg(cmd, &#39;r&#39;, 0);<br>            int producerCount    = intArg(cmd, &#39;x&#39;, 1);<br>
            int consumerCount    = intArg(cmd, &#39;y&#39;, 1);<br>            int producerTxSize   = intArg(cmd, &#39;m&#39;, 0);<br>            int consumerTxSize   = intArg(cmd, &#39;n&#39;, 0);<br>            long confirm         = intArg(cmd, &#39;c&#39;, -1);<br>
            boolean autoAck      = cmd.hasOption(&#39;a&#39;);<br>            int prefetchCount    = intArg(cmd, &#39;q&#39;, 0);<br>            int minMsgSize       = intArg(cmd, &#39;s&#39;, 0);<br>            int timeLimit        = intArg(cmd, &#39;z&#39;, 0);<br>
            List&lt;?&gt; flags        = lstArg(cmd, &#39;f&#39;);<br>            int frameMax         = intArg(cmd, &#39;M&#39;, 0);<br>            int heartbeat        = intArg(cmd, &#39;b&#39;, 0);<br><br>            if ((producerTxSize &gt; 0) &amp;&amp; confirm &gt;= 0) {<br>
                throw new ParseException(&quot;Cannot select both producerTxSize&quot;+<br>                                         &quot; and confirm&quot;);<br>            }<br><br>            //setup<br>            String id = UUID.randomUUID().toString();<br>
            Stats stats = new Stats(1000L * samplingInterval);<br>            ConnectionFactory factory = new ConnectionFactory();<br>            factory.setHost(hostName);<br>            factory.setPort(portNumber);<br>            factory.setRequestedFrameMax(frameMax);<br>
            factory.setRequestedHeartbeat(heartbeat);<br><br>            Thread[] consumerThreads = new Thread[consumerCount];<br>            Connection[] consumerConnections = new Connection[consumerCount];<br>            for (int i = 0; i &lt; consumerCount; i++) {<br>
                System.out.println(&quot;starting consumer #&quot; + i);<br>                Connection conn = factory.newConnection();<br>                consumerConnections[i] = conn;<br>                Channel channel = conn.createChannel();<br>
                if (consumerTxSize &gt; 0) channel.txSelect();<br>                channel.exchangeDeclare(exchangeName, exchangeType);<br>                String queueName =<br>                        channel.queueDeclare(&quot;&quot;, flags.contains(&quot;persistent&quot;),<br>
                                             true, false, null).getQueue();<br>                QueueingConsumer consumer = new QueueingConsumer(channel);<br>                if (prefetchCount &gt; 0) channel.basicQos(prefetchCount);<br>
                channel.basicConsume(queueName, autoAck, consumer);<br>                channel.queueBind(queueName, exchangeName, id);<br>                Thread t =<br>                    new Thread(new Consumer(consumer, id,<br>
                                            consumerTxSize, autoAck,<br>                                            stats, timeLimit));<br>                consumerThreads[i] = t;<br>                t.start();<br>            }<br>
            Thread[] producerThreads = new Thread[producerCount];<br>            Connection[] producerConnections = new Connection[producerCount];<br>            for (int i = 0; i &lt; producerCount; i++) {<br>                System.out.println(&quot;starting producer #&quot; + i);<br>
                Connection conn = factory.newConnection();<br>                producerConnections[i] = conn;<br>                Channel channel = conn.createChannel();<br>                if (producerTxSize &gt; 0) channel.txSelect();<br>
                if (confirm &gt;= 0) channel.confirmSelect();<br>                channel.exchangeDeclare(exchangeName, exchangeType);<br>                final Producer p = new Producer(channel, exchangeName, id,<br>                                                flags, producerTxSize,<br>
                                                1000L * samplingInterval,<br>                                                rateLimit, minMsgSize, timeLimit,<br>                                                confirm);<br>
                channel.setReturnListener(p);<br>                channel.setConfirmListener(p);<br>                Thread t = new Thread(p);<br>                producerThreads[i] = t;<br>                t.start();<br>            }<br>
<br>            for (int i = 0; i &lt; producerCount; i++) {<br>                producerThreads[i].join();<br>                producerConnections[i].close();<br>            }<br><br>            for (int i = 0; i &lt; consumerCount; i++) {<br>
                consumerThreads[i].join();<br>                consumerConnections[i].close();<br>            }<br><br>        }<br>        catch( ParseException exp ) {<br>            System.err.println(&quot;Parsing failed. Reason: &quot; + exp.getMessage());<br>
            usage(options);<br>        } catch (Exception e) {<br>            System.err.println(&quot;Main thread caught exception: &quot; + e);<br>            e.printStackTrace();<br>            System.exit(1);<br>        }<br>
    }<br><br>    private static void usage(Options options) {<br>        HelpFormatter formatter = new HelpFormatter();<br>        formatter.printHelp(&quot;&lt;program&gt;&quot;, options);<br>    }<br><br>    private static Options getOptions() {<br>
        Options options = new Options();<br>        options.addOption(new Option(&quot;?&quot;, &quot;help&quot;,      false,&quot;show usage&quot;));<br>        options.addOption(new Option(&quot;h&quot;, &quot;host&quot;,      true, &quot;broker host&quot;));<br>
        options.addOption(new Option(&quot;p&quot;, &quot;port&quot;,      true, &quot;broker port&quot;));<br>        options.addOption(new Option(&quot;t&quot;, &quot;type&quot;,      true, &quot;exchange type&quot;));<br>
        options.addOption(new Option(&quot;e&quot;, &quot;exchange&quot;,  true, &quot;exchange name&quot;));<br>        options.addOption(new Option(&quot;i&quot;, &quot;interval&quot;,  true, &quot;sampling interval&quot;));<br>
        options.addOption(new Option(&quot;r&quot;, &quot;rate&quot;,      true, &quot;rate limit&quot;));<br>        options.addOption(new Option(&quot;x&quot;, &quot;producers&quot;, true, &quot;producer count&quot;));<br>
        options.addOption(new Option(&quot;y&quot;, &quot;consumers&quot;, true, &quot;consumer count&quot;));<br>        options.addOption(new Option(&quot;m&quot;, &quot;ptxsize&quot;,   true, &quot;producer tx size&quot;));<br>
        options.addOption(new Option(&quot;n&quot;, &quot;ctxsize&quot;,   true, &quot;consumer tx size&quot;));<br>        options.addOption(new Option(&quot;c&quot;, &quot;confirm&quot;,   true, &quot;max unconfirmed publishes&quot;));<br>
        options.addOption(new Option(&quot;a&quot;, &quot;autoack&quot;,   false,&quot;auto ack&quot;));<br>        options.addOption(new Option(&quot;q&quot;, &quot;qos&quot;,       true, &quot;qos prefetch count&quot;));<br>
        options.addOption(new Option(&quot;s&quot;, &quot;size&quot;,      true, &quot;message size&quot;));<br>        options.addOption(new Option(&quot;z&quot;, &quot;time&quot;,      true, &quot;time limit&quot;));<br>
        Option flag =     new Option(&quot;f&quot;, &quot;flag&quot;,      true, &quot;message flag&quot;);<br>        flag.setArgs(Option.UNLIMITED_VALUES);<br>        options.addOption(flag);<br>        options.addOption(new Option(&quot;M&quot;, &quot;framemax&quot;,  true, &quot;frame max&quot;));<br>
        options.addOption(new Option(&quot;b&quot;, &quot;heartbeat&quot;, true, &quot;heartbeat interval&quot;));<br>        return options;<br>    }<br><br>    private static String strArg(CommandLine cmd, char opt, String def) {<br>
        return cmd.getOptionValue(opt, def);<br>    }<br><br>    private static int intArg(CommandLine cmd, char opt, int def) {<br>        return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));<br>    }<br>
<br>    // Returns every value supplied for the option `opt` as a list; yields an<br>    // empty list when getOptionValues reports null for that option.<br>    private static List&lt;?&gt; lstArg(CommandLine cmd, char opt) {<br>        // Query the option that was actually requested (this used to be hard-coded<br>        // to &#39;f&#39;, silently ignoring the `opt` parameter).<br>        String[] vals = cmd.getOptionValues(opt);<br>        if (vals == null) {<br>            vals = new String[] {};<br>        }<br>        return Arrays.asList(vals);<br>
    }<br><br>    public static class Producer implements Runnable, ReturnListener,<br>                                            ConfirmListener<br>    {<br>        private Channel channel;<br>        private String  exchangeName;<br>
        private String  id;<br>        private boolean mandatory;<br>        private boolean immediate;<br>        private boolean persistent;<br>        private int     txSize;<br>        private long    interval;<br>        private int     rateLimit;<br>
        private long    timeLimit;<br><br>        private byte[]  message;<br><br>        private long    startTime;<br>        private long    lastStatsTime;<br>        private int     msgCount;<br>        private int     returnCount;<br>
<br>        private long      confirm;<br>        private Semaphore confirmPool;<br>        private long      confirmCount;<br>        private long      nackCount;<br>        private volatile SortedSet&lt;Long&gt; unconfirmedSet =<br>
            Collections.synchronizedSortedSet(new TreeSet&lt;Long&gt;());<br><br>        public Producer(Channel channel, String exchangeName, String id,<br>                        List&lt;?&gt; flags, int txSize,<br>                        long interval, int rateLimit, int minMsgSize, int timeLimit,<br>
                        long confirm)<br>            throws IOException {<br><br>            this.channel      = channel;<br>            this.exchangeName = exchangeName;<br>            this.id           = id;<br>
            this.mandatory    = flags.contains(&quot;mandatory&quot;);<br>            this.immediate    = flags.contains(&quot;immediate&quot;);<br>            this.persistent   = flags.contains(&quot;persistent&quot;);<br>
            this.txSize       = txSize;<br>            this.interval     = interval;<br>            this.rateLimit    = rateLimit;<br>            this.timeLimit    = 1000L * timeLimit;<br>            this.message      = new byte[minMsgSize];<br>
            this.confirm      = confirm;<br>            if (confirm &gt; 0) {<br>                this.confirmPool  = new Semaphore((int)confirm);<br>            }<br>        }<br><br>        public synchronized void handleReturn(int replyCode,<br>
                                              String replyText,<br>                                              String exchange,<br>                                              String routingKey,<br>                                              AMQP.BasicProperties properties,<br>
                                              byte[] body)<br>            throws IOException {<br>            returnCount++;<br>        }<br><br>        public void handleAck(long seqNo, boolean multiple) {<br>            handleAckNack(seqNo, multiple, false);<br>
        }<br><br>        public void handleNack(long seqNo, boolean multiple) {<br>            handleAckNack(seqNo, multiple, true);<br>        }<br><br>        private void handleAckNack(long seqNo, boolean multiple,<br>
                                   boolean nack) {<br>            int numConfirms = 0;<br>            if (multiple) {<br>                SortedSet&lt;Long&gt; confirmed = unconfirmedSet.headSet(seqNo + 1);<br>                numConfirms += confirmed.size();<br>
                confirmed.clear();<br>            } else {<br>                unconfirmedSet.remove(seqNo);<br>                numConfirms = 1;<br>            }<br>            synchronized (this) {<br>                if (nack) {<br>
                    nackCount += numConfirms;<br>                } else {<br>                    confirmCount += numConfirms;<br>                }<br>            }<br><br>            if (confirmPool != null) {<br>                for (int i = 0; i &lt; numConfirms; ++i) {<br>
                    confirmPool.release();<br>                }<br>            }<br><br>        }<br><br>        public void run() {<br><br>            long now;<br>            now = startTime = lastStatsTime = System.currentTimeMillis();<br>
            msgCount = 0;<br>            int totalMsgCount = 0;<br><br>            try {<br><br>                while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>                    if (confirmPool != null) {<br>
                        confirmPool.acquire();<br>                    }<br>                    delay(now);<br>                    publish(createMessage(totalMsgCount));<br>                    totalMsgCount++;<br>                    msgCount++;<br>
<br>                    if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>                        channel.txCommit();<br>                    }<br>                    now = System.currentTimeMillis();<br>                }<br>
<br>            } catch (IOException e) {<br>                throw new RuntimeException(e);<br>            } catch (InterruptedException e) {<br>                throw new RuntimeException (e);<br>            }<br><br>            System.out.println(&quot;sending rate avg: &quot; +<br>
                               (totalMsgCount * 1000L / (now - startTime)) +<br>                               &quot; msg/s&quot;);<br><br>        }<br><br>        private void publish(byte[] msg)<br>            throws IOException {<br>
<br>            unconfirmedSet.add(channel.getNextPublishSeqNo());<br>            channel.basicPublish(exchangeName, id,<br>                                 mandatory, immediate,<br>                                 persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,<br>
                                 msg);<br>        }<br><br>        private void delay(long now)<br>            throws InterruptedException {<br><br>            long elapsed = now - lastStatsTime;<br>            //example: rateLimit is 5000 msg/s,<br>
            //10 ms have elapsed, we have sent 200 messages<br>            //the 200 msgs we have actually sent should have taken us<br>            //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>            long pause = rateLimit == 0 ?<br>
                0 : (msgCount * 1000L / rateLimit - elapsed);<br>            if (pause &gt; 0) {<br>                Thread.sleep(pause);<br>            }<br>            if (elapsed &gt; interval) {<br>                long sendRate, returnRate, confirmRate, nackRate;<br>
                synchronized(this) {<br>                    sendRate     = msgCount     * 1000L / elapsed;<br>                    returnRate   = returnCount  * 1000L / elapsed;<br>                    confirmRate  = confirmCount * 1000L / elapsed;<br>
                    nackRate     = nackCount    * 1000L / elapsed;<br>                    msgCount     = 0;<br>                    returnCount  = 0;<br>                    confirmCount = 0;<br>                    nackCount    = 0;<br>
                }<br>                System.out.print(&quot;sending rate: &quot; + sendRate + &quot; msg/s&quot;);<br>                if (mandatory || immediate) {<br>                    System.out.print(&quot;, returns: &quot; + returnRate + &quot; ret/s&quot;);<br>
                }<br>                if (confirm &gt;= 0) {<br>                    System.out.print(&quot;, confirms: &quot; + confirmRate + &quot; c/s&quot;);<br>                    if (nackRate &gt; 0) {<br>                        System.out.print(&quot;, nacks: &quot; + nackRate + &quot; n/s&quot;);<br>
                    }<br>                }<br>                System.out.println();<br>                lastStatsTime = now;<br>            }<br>        }<br><br>        private byte[] createMessage(int sequenceNumber)<br>
            throws IOException {<br><br>            ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>            DataOutputStream d = new DataOutputStream(acc);<br>            long nano = System.nanoTime();<br>
            d.writeInt(sequenceNumber);<br>            d.writeLong(nano);<br>            d.flush();<br>            acc.flush();<br>            byte[] m = acc.toByteArray();<br>            if (m.length &lt;= message.length) {<br>
                System.arraycopy(m, 0, message, 0, m.length);<br>                return message;<br>            } else {<br>                return m;<br>            }<br>        }<br><br>    }<br><br>    public static class Consumer implements Runnable {<br>
<br>        private QueueingConsumer q;<br>        private String           id;<br>        private int              txSize;<br>        private boolean          autoAck;<br>        private Stats            stats;<br>        private long             timeLimit;<br>
<br>        public Consumer(QueueingConsumer q, String id,<br>                        int txSize, boolean autoAck,<br>                        Stats stats, int timeLimit) {<br><br>            this.q         = q;<br>            this.id        = id;<br>
            this.txSize    = txSize;<br>            this.autoAck   = autoAck;<br>            this.stats     = stats;<br>            this.timeLimit = 1000L * timeLimit;<br>        }<br><br>        public void run() {<br><br>
            long now;<br>            long startTime;<br>            startTime = now = System.currentTimeMillis();<br>            int totalMsgCount = 0;<br><br>            Channel channel = q.getChannel();<br><br>            try {<br>
<br>                while (timeLimit == 0 || now &lt; startTime + timeLimit) {<br>                    Delivery delivery;<br>                    if (timeLimit == 0) {<br>                        delivery = q.nextDelivery();<br>
                    } else {<br>                        delivery = q.nextDelivery(startTime + timeLimit - now);<br>                        if (delivery == null) break;<br>                    }<br>            totalMsgCount++;<br>
<br>                    DataInputStream d = new DataInputStream(new ByteArrayInputStream(delivery.getBody()));<br>                    d.readInt();<br>                    long msgNano = d.readLong();<br>                    long nano = System.nanoTime();<br>
<br>                    Envelope envelope = delivery.getEnvelope();<br><br>                    if (!autoAck) {<br>                        channel.basicAck(envelope.getDeliveryTag(), false);<br>                    }<br><br>
                    if (txSize != 0 &amp;&amp; totalMsgCount % txSize == 0) {<br>                        channel.txCommit();<br>                    }<br><br>                    now = System.currentTimeMillis();<br><br>                    stats.collectStats(now, id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
                }<br><br>            } catch (IOException e) {<br>                throw new RuntimeException(e);<br>            } catch (InterruptedException e) {<br>                throw new RuntimeException (e);<br>            } catch (ShutdownSignalException e) {<br>
                throw new RuntimeException(e);<br>            }<br><br>            long elapsed = now - startTime;<br>            if (elapsed &gt; 0) {<br>                System.out.println(&quot;recving rate avg: &quot; +<br>
                                   (totalMsgCount * 1000L / elapsed) +<br>                                   &quot; msg/s&quot;);<br>            }<br>        }<br><br>    }<br><br>    public static class Stats {<br><br>        private long    interval;<br>
<br>        private long    lastStatsTime;<br>        private int     msgCount;<br>        private int     latencyCount;<br>        private long    minLatency;<br>        private long    maxLatency;<br>        private long    cumulativeLatency;<br>
<br>        public Stats(long interval) {<br>            this.interval = interval;<br>            reset(System.currentTimeMillis());<br>        }<br><br>        private void reset(long t) {<br>            lastStatsTime     = t;<br>
            msgCount          = 0;<br>            latencyCount      = 0;<br>            minLatency        = Long.MAX_VALUE;<br>            maxLatency        = Long.MIN_VALUE;<br>            cumulativeLatency = 0L;<br>        }<br>
<br>        public synchronized void collectStats(long now, long latency) {<br>            msgCount++;<br><br>            if (latency &gt; 0) {<br>                minLatency = Math.min(minLatency, latency);<br>                maxLatency = Math.max(maxLatency, latency);<br>
                cumulativeLatency += latency;<br>                latencyCount++;<br>            }<br><br>            long elapsed = now - lastStatsTime;<br>            if (elapsed &gt; interval) {<br>                System.out.println(&quot;recving rate: &quot; +<br>
                                   (1000L * msgCount / elapsed) +<br>                                   &quot; msg/s&quot; +<br>                                   (latencyCount &gt; 0 ?<br>                                    &quot;, min/avg/max latency: &quot; +<br>
                                    minLatency/1000L + &quot;/&quot; +<br>                                    cumulativeLatency / (1000L * latencyCount) + &quot;/&quot; +<br>                                    maxLatency/1000L + &quot; microseconds&quot; :<br>
                                    &quot;&quot;));<br>                reset(now);<br>            }<br><br>        }<br><br>    }<br><br>}<br><br><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <span dir="ltr">&lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;</span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">I am sorry but this link doesn&#39;t have any source code - this is the library.<br>
I meant *your* source code, the code of your program; the code which<br>
exercises the library and gets 5000 msg/s.<br>
<br>
P.S. Please use &quot;Reply all&quot;, this discussion is most probably<br>
interesting for many people in the community.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt; I am following this link for my java code:<br>
&gt; <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>

&gt;<br>
&gt; On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt; wrote:<br>
&gt;&gt;<br>
&gt;&gt; Please show the source code of all participating components. It&#39;s<br>
&gt;&gt; impossible to say anything definite without it.<br>
&gt;&gt;<br>
&gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; So far I am getting 5000 msg/sec in Java.<br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; I guess you should get the speed you want, and much more, if you just<br>
&gt;&gt; &gt;&gt; set up a high enough value for prefetch (see basic.qos) and set<br>
&gt;&gt; &gt;&gt; autoack.<br>
&gt;&gt; &gt;&gt; I got ~8k msg/s with much larger messages and persistence turned on<br>
&gt;&gt; &gt;&gt; too.<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; Message size is 199 bytes<br>
&gt;&gt; &gt;&gt; &gt; Messaging scenario is Persistence<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt; On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt; &lt;<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>&gt;<br>
&gt;&gt; &gt;&gt; &gt; wrote:<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; Hi.<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; What&#39;s the size of messages?<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; What&#39;s your messaging scenario in general? Persistence, prefetch,<br>
&gt;&gt; &gt;&gt; &gt;&gt; transactions?<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; 2011/7/15 News Aanad &lt;<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>&gt;:<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Hi, I want to achieve a rate of 15000 msg/sec in RabbitMQ. I have<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; tried a lot in Java as well as in Ruby, and I am now implementing<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; it in node.js.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; I would prefer Java because, after a lot of experimentation, only<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Java has given me good results, but the goal is still not achieved.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; So I would like help to reach up to 15000 msg/sec in RabbitMQ<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; using Java. If you have any good suggestions, or can recommend a<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; site to consult, please tell me.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; Thanks.<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; _______________________________________________<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; rabbitmq-discuss mailing list<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt; <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt; &gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt;<br>
&gt;&gt; &gt;&gt; --<br>
&gt;&gt; &gt;&gt; Eugene Kirpichov<br>
&gt;&gt; &gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; &gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;&gt; &gt;<br>
&gt;&gt; &gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; --<br>
&gt;&gt; Eugene Kirpichov<br>
&gt;&gt; Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
&gt;&gt; Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
&gt;<br>
&gt;<br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br>