<br>Thanks for reply Eugene Kirpichov , <br>One more help, Can you suggest me any reference site from where i can follow and code for my goal?<div><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:57 PM, Eugene Kirpichov <span dir="ltr"><<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">Is this the minimal piece of code that demonstrates poor performance?<br>
It's hard to decipher that much code and it can have many kinds of<br>
problems inside; I think you should be able to trim it down to a<br>
couple dozen lines demonstrating the same speed; then diagnosing will<br>
become a lot easier.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
> This is my code.<br>
><br>
><br>
> // The contents of this file are subject to the Mozilla Public License<br>
> // Version 1.1 (the "License"); you may not use this file except in<br>
> // compliance with the License. You may obtain a copy of the License<br>
> // at <a href="http://www.mozilla.org/MPL/" target="_blank">http://www.mozilla.org/MPL/</a><br>
> //<br>
> // Software distributed under the License is distributed on an "AS IS"<br>
> // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
> // the License for the specific language governing rights and<br>
> // limitations under the License.<br>
> //<br>
> // The Original Code is RabbitMQ.<br>
> //<br>
> // The Initial Developer of the Original Code is VMware, Inc.<br>
> // Copyright (c) 2007-2011 VMware, Inc. All rights reserved.<br>
> //<br>
><br>
><br>
> package com.rabbitmq.examples;<br>
><br>
> import java.io.ByteArrayInputStream;<br>
> import java.io.ByteArrayOutputStream;<br>
> import java.io.DataInputStream;<br>
> import java.io.DataOutputStream;<br>
> import java.io.IOException;<br>
> import java.util.Arrays;<br>
> import java.util.Collections;<br>
> import java.util.List;<br>
> import java.util.SortedSet;<br>
> import java.util.TreeSet;<br>
> import java.util.UUID;<br>
> import java.util.concurrent.Semaphore;<br>
><br>
> import org.apache.commons.cli.CommandLine;<br>
> import org.apache.commons.cli.CommandLineParser;<br>
> import org.apache.commons.cli.GnuParser;<br>
> import org.apache.commons.cli.HelpFormatter;<br>
> import org.apache.commons.cli.Option;<br>
> import org.apache.commons.cli.Options;<br>
> import org.apache.commons.cli.ParseException;<br>
><br>
> import com.rabbitmq.client.AMQP;<br>
> import com.rabbitmq.client.Channel;<br>
> import com.rabbitmq.client.ConfirmListener;<br>
> import com.rabbitmq.client.Connection;<br>
> import com.rabbitmq.client.ConnectionFactory;<br>
> import com.rabbitmq.client.Envelope;<br>
> import com.rabbitmq.client.MessageProperties;<br>
> import com.rabbitmq.client.QueueingConsumer;<br>
> import com.rabbitmq.client.QueueingConsumer.Delivery;<br>
> import com.rabbitmq.client.ReturnListener;<br>
> import com.rabbitmq.client.ShutdownSignalException;<br>
><br>
><br>
> public class MulticastMain {<br>
><br>
> public static void main(String[] args) {<br>
> Options options = getOptions();<br>
> CommandLineParser parser = new GnuParser();<br>
> try {<br>
> CommandLine cmd = parser.parse(options, args);<br>
><br>
> if (cmd.hasOption('?')) {<br>
> usage(options);<br>
> System.exit(0);<br>
> }<br>
><br>
> String hostName = strArg(cmd, 'h', "localhost");<br>
> int portNumber = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);<br>
> String exchangeType = strArg(cmd, 't', "direct");<br>
> String exchangeName = strArg(cmd, 'e', exchangeType);<br>
> int samplingInterval = intArg(cmd, 'i', 1);<br>
> int rateLimit = intArg(cmd, 'r', 0);<br>
> int producerCount = intArg(cmd, 'x', 1);<br>
> int consumerCount = intArg(cmd, 'y', 1);<br>
> int producerTxSize = intArg(cmd, 'm', 0);<br>
> int consumerTxSize = intArg(cmd, 'n', 0);<br>
> long confirm = intArg(cmd, 'c', -1);<br>
> boolean autoAck = cmd.hasOption('a');<br>
> int prefetchCount = intArg(cmd, 'q', 0);<br>
> int minMsgSize = intArg(cmd, 's', 0);<br>
> int timeLimit = intArg(cmd, 'z', 0);<br>
> List<?> flags = lstArg(cmd, 'f');<br>
> int frameMax = intArg(cmd, 'M', 0);<br>
> int heartbeat = intArg(cmd, 'b', 0);<br>
><br>
> if ((producerTxSize > 0) && confirm >= 0) {<br>
> throw new ParseException("Cannot select both<br>
> producerTxSize"+<br>
> " and confirm");<br>
> }<br>
><br>
> //setup<br>
> String id = UUID.randomUUID().toString();<br>
> Stats stats = new Stats(1000L * samplingInterval);<br>
> ConnectionFactory factory = new ConnectionFactory();<br>
> factory.setHost(hostName);<br>
> factory.setPort(portNumber);<br>
> factory.setRequestedFrameMax(frameMax);<br>
> factory.setRequestedHeartbeat(heartbeat);<br>
><br>
> Thread[] consumerThreads = new Thread[consumerCount];<br>
> Connection[] consumerConnections = new<br>
> Connection[consumerCount];<br>
> for (int i = 0; i < consumerCount; i++) {<br>
> System.out.println("starting consumer #" + i);<br>
> Connection conn = factory.newConnection();<br>
> consumerConnections[i] = conn;<br>
> Channel channel = conn.createChannel();<br>
> if (consumerTxSize > 0) channel.txSelect();<br>
> channel.exchangeDeclare(exchangeName, exchangeType);<br>
> String queueName =<br>
> channel.queueDeclare("",<br>
> flags.contains("persistent"),<br>
> true, false, null).getQueue();<br>
> QueueingConsumer consumer = new QueueingConsumer(channel);<br>
> if (prefetchCount > 0) channel.basicQos(prefetchCount);<br>
> channel.basicConsume(queueName, autoAck, consumer);<br>
> channel.queueBind(queueName, exchangeName, id);<br>
> Thread t =<br>
> new Thread(new Consumer(consumer, id,<br>
> consumerTxSize, autoAck,<br>
> stats, timeLimit));<br>
> consumerThreads[i] = t;<br>
> t.start();<br>
> }<br>
> Thread[] producerThreads = new Thread[producerCount];<br>
> Connection[] producerConnections = new<br>
> Connection[producerCount];<br>
> for (int i = 0; i < producerCount; i++) {<br>
> System.out.println("starting producer #" + i);<br>
> Connection conn = factory.newConnection();<br>
> producerConnections[i] = conn;<br>
> Channel channel = conn.createChannel();<br>
> if (producerTxSize > 0) channel.txSelect();<br>
> if (confirm >= 0) channel.confirmSelect();<br>
> channel.exchangeDeclare(exchangeName, exchangeType);<br>
> final Producer p = new Producer(channel, exchangeName, id,<br>
> flags, producerTxSize,<br>
> 1000L * samplingInterval,<br>
> rateLimit, minMsgSize,<br>
> timeLimit,<br>
> confirm);<br>
> channel.setReturnListener(p);<br>
> channel.setConfirmListener(p);<br>
> Thread t = new Thread(p);<br>
> producerThreads[i] = t;<br>
> t.start();<br>
> }<br>
><br>
> for (int i = 0; i < producerCount; i++) {<br>
> producerThreads[i].join();<br>
> producerConnections[i].close();<br>
> }<br>
><br>
> for (int i = 0; i < consumerCount; i++) {<br>
> consumerThreads[i].join();<br>
> consumerConnections[i].close();<br>
> }<br>
><br>
> }<br>
> catch( ParseException exp ) {<br>
> System.err.println("Parsing failed. Reason: " +<br>
> exp.getMessage());<br>
> usage(options);<br>
> } catch (Exception e) {<br>
> System.err.println("Main thread caught exception: " + e);<br>
> e.printStackTrace();<br>
> System.exit(1);<br>
> }<br>
> }<br>
><br>
> private static void usage(Options options) {<br>
> HelpFormatter formatter = new HelpFormatter();<br>
> formatter.printHelp("<program>", options);<br>
> }<br>
><br>
> private static Options getOptions() {<br>
> Options options = new Options();<br>
> options.addOption(new Option("?", "help", false,"show usage"));<br>
> options.addOption(new Option("h", "host", true, "broker<br>
> host"));<br>
> options.addOption(new Option("p", "port", true, "broker<br>
> port"));<br>
> options.addOption(new Option("t", "type", true, "exchange<br>
> type"));<br>
> options.addOption(new Option("e", "exchange", true, "exchange<br>
> name"));<br>
> options.addOption(new Option("i", "interval", true, "sampling<br>
> interval"));<br>
> options.addOption(new Option("r", "rate", true, "rate limit"));<br>
> options.addOption(new Option("x", "producers", true, "producer<br>
> count"));<br>
> options.addOption(new Option("y", "consumers", true, "consumer<br>
> count"));<br>
> options.addOption(new Option("m", "ptxsize", true, "producer tx<br>
> size"));<br>
> options.addOption(new Option("n", "ctxsize", true, "consumer tx<br>
> size"));<br>
> options.addOption(new Option("c", "confirm", true, "max<br>
> unconfirmed publishes"));<br>
> options.addOption(new Option("a", "autoack", false,"auto ack"));<br>
> options.addOption(new Option("q", "qos", true, "qos prefetch<br>
> count"));<br>
> options.addOption(new Option("s", "size", true, "message<br>
> size"));<br>
> options.addOption(new Option("z", "time", true, "time limit"));<br>
> Option flag = new Option("f", "flag", true, "message<br>
> flag");<br>
> flag.setArgs(Option.UNLIMITED_VALUES);<br>
> options.addOption(flag);<br>
> options.addOption(new Option("M", "framemax", true, "frame max"));<br>
> options.addOption(new Option("b", "heartbeat", true, "heartbeat<br>
> interval"));<br>
> return options;<br>
> }<br>
><br>
> private static String strArg(CommandLine cmd, char opt, String def) {<br>
> return cmd.getOptionValue(opt, def);<br>
> }<br>
><br>
> private static int intArg(CommandLine cmd, char opt, int def) {<br>
> return Integer.parseInt(cmd.getOptionValue(opt,<br>
> Integer.toString(def)));<br>
> }<br>
><br>
> private static List<?> lstArg(CommandLine cmd, char opt) {<br>
> String[] vals = cmd.getOptionValues('f');<br>
> if (vals == null) {<br>
> vals = new String[] {};<br>
> }<br>
> return Arrays.asList(vals);<br>
> }<br>
><br>
> public static class Producer implements Runnable, ReturnListener,<br>
> ConfirmListener<br>
> {<br>
> private Channel channel;<br>
> private String exchangeName;<br>
> private String id;<br>
> private boolean mandatory;<br>
> private boolean immediate;<br>
> private boolean persistent;<br>
> private int txSize;<br>
> private long interval;<br>
> private int rateLimit;<br>
> private long timeLimit;<br>
><br>
> private byte[] message;<br>
><br>
> private long startTime;<br>
> private long lastStatsTime;<br>
> private int msgCount;<br>
> private int returnCount;<br>
><br>
> private long confirm;<br>
> private Semaphore confirmPool;<br>
> private long confirmCount;<br>
> private long nackCount;<br>
> private volatile SortedSet<Long> unconfirmedSet =<br>
> Collections.synchronizedSortedSet(new TreeSet<Long>());<br>
><br>
> public Producer(Channel channel, String exchangeName, String id,<br>
> List<?> flags, int txSize,<br>
> long interval, int rateLimit, int minMsgSize, int<br>
> timeLimit,<br>
> long confirm)<br>
> throws IOException {<br>
><br>
> this.channel = channel;<br>
> this.exchangeName = exchangeName;<br>
> <a href="http://this.id" target="_blank">this.id</a> = id;<br>
> this.mandatory = flags.contains("mandatory");<br>
> this.immediate = flags.contains("immediate");<br>
> this.persistent = flags.contains("persistent");<br>
> this.txSize = txSize;<br>
> this.interval = interval;<br>
> this.rateLimit = rateLimit;<br>
> this.timeLimit = 1000L * timeLimit;<br>
> this.message = new byte[minMsgSize];<br>
> this.confirm = confirm;<br>
> if (confirm > 0) {<br>
> this.confirmPool = new Semaphore((int)confirm);<br>
> }<br>
> }<br>
><br>
> public synchronized void handleReturn(int replyCode,<br>
> String replyText,<br>
> String exchange,<br>
> String routingKey,<br>
> AMQP.BasicProperties<br>
> properties,<br>
> byte[] body)<br>
> throws IOException {<br>
> returnCount++;<br>
> }<br>
><br>
> public void handleAck(long seqNo, boolean multiple) {<br>
> handleAckNack(seqNo, multiple, false);<br>
> }<br>
><br>
> public void handleNack(long seqNo, boolean multiple) {<br>
> handleAckNack(seqNo, multiple, true);<br>
> }<br>
><br>
> private void handleAckNack(long seqNo, boolean multiple,<br>
> boolean nack) {<br>
> int numConfirms = 0;<br>
> if (multiple) {<br>
> SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo +<br>
> 1);<br>
> numConfirms += confirmed.size();<br>
> confirmed.clear();<br>
> } else {<br>
> unconfirmedSet.remove(seqNo);<br>
> numConfirms = 1;<br>
> }<br>
> synchronized (this) {<br>
> if (nack) {<br>
> nackCount += numConfirms;<br>
> } else {<br>
> confirmCount += numConfirms;<br>
> }<br>
> }<br>
><br>
> if (confirmPool != null) {<br>
> for (int i = 0; i < numConfirms; ++i) {<br>
> confirmPool.release();<br>
> }<br>
> }<br>
><br>
> }<br>
><br>
> public void run() {<br>
><br>
> long now;<br>
> now = startTime = lastStatsTime = System.currentTimeMillis();<br>
> msgCount = 0;<br>
> int totalMsgCount = 0;<br>
><br>
> try {<br>
><br>
> while (timeLimit == 0 || now < startTime + timeLimit) {<br>
> if (confirmPool != null) {<br>
> confirmPool.acquire();<br>
> }<br>
> delay(now);<br>
> publish(createMessage(totalMsgCount));<br>
> totalMsgCount++;<br>
> msgCount++;<br>
><br>
> if (txSize != 0 && totalMsgCount % txSize == 0) {<br>
> channel.txCommit();<br>
> }<br>
> now = System.currentTimeMillis();<br>
> }<br>
><br>
> } catch (IOException e) {<br>
> throw new RuntimeException(e);<br>
> } catch (InterruptedException e) {<br>
> throw new RuntimeException (e);<br>
> }<br>
><br>
> System.out.println("sending rate avg: " +<br>
> (totalMsgCount * 1000L / (now - startTime)) +<br>
> " msg/s");<br>
><br>
> }<br>
><br>
> private void publish(byte[] msg)<br>
> throws IOException {<br>
><br>
> unconfirmedSet.add(channel.getNextPublishSeqNo());<br>
> channel.basicPublish(exchangeName, id,<br>
> mandatory, immediate,<br>
> persistent ?<br>
> MessageProperties.MINIMAL_PERSISTENT_BASIC :<br>
> MessageProperties.MINIMAL_BASIC,<br>
> msg);<br>
> }<br>
><br>
> private void delay(long now)<br>
> throws InterruptedException {<br>
><br>
> long elapsed = now - lastStatsTime;<br>
> //example: rateLimit is 5000 msg/s,<br>
> //10 ms have elapsed, we have sent 200 messages<br>
> //the 200 msgs we have actually sent should have taken us<br>
> //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>
> long pause = rateLimit == 0 ?<br>
> 0 : (msgCount * 1000L / rateLimit - elapsed);<br>
> if (pause > 0) {<br>
> Thread.sleep(pause);<br>
> }<br>
> if (elapsed > interval) {<br>
> long sendRate, returnRate, confirmRate, nackRate;<br>
> synchronized(this) {<br>
> sendRate = msgCount * 1000L / elapsed;<br>
> returnRate = returnCount * 1000L / elapsed;<br>
> confirmRate = confirmCount * 1000L / elapsed;<br>
> nackRate = nackCount * 1000L / elapsed;<br>
> msgCount = 0;<br>
> returnCount = 0;<br>
> confirmCount = 0;<br>
> nackCount = 0;<br>
> }<br>
> System.out.print("sending rate: " + sendRate + " msg/s");<br>
> if (mandatory || immediate) {<br>
> System.out.print(", returns: " + returnRate + " ret/s");<br>
> }<br>
> if (confirm >= 0) {<br>
> System.out.print(", confirms: " + confirmRate + " c/s");<br>
> if (nackRate > 0) {<br>
> System.out.print(", nacks: " + nackRate + " n/s");<br>
> }<br>
> }<br>
> System.out.println();<br>
> lastStatsTime = now;<br>
> }<br>
> }<br>
><br>
> private byte[] createMessage(int sequenceNumber)<br>
> throws IOException {<br>
><br>
> ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>
> DataOutputStream d = new DataOutputStream(acc);<br>
> long nano = System.nanoTime();<br>
> d.writeInt(sequenceNumber);<br>
> d.writeLong(nano);<br>
> d.flush();<br>
> acc.flush();<br>
> byte[] m = acc.toByteArray();<br>
> if (m.length <= message.length) {<br>
> System.arraycopy(m, 0, message, 0, m.length);<br>
> return message;<br>
> } else {<br>
> return m;<br>
> }<br>
> }<br>
><br>
> }<br>
><br>
> public static class Consumer implements Runnable {<br>
><br>
> private QueueingConsumer q;<br>
> private String id;<br>
> private int txSize;<br>
> private boolean autoAck;<br>
> private Stats stats;<br>
> private long timeLimit;<br>
><br>
> public Consumer(QueueingConsumer q, String id,<br>
> int txSize, boolean autoAck,<br>
> Stats stats, int timeLimit) {<br>
><br>
> this.q = q;<br>
> <a href="http://this.id" target="_blank">this.id</a> = id;<br>
> this.txSize = txSize;<br>
> this.autoAck = autoAck;<br>
> this.stats = stats;<br>
> this.timeLimit = 1000L * timeLimit;<br>
> }<br>
><br>
> public void run() {<br>
><br>
> long now;<br>
> long startTime;<br>
> startTime = now = System.currentTimeMillis();<br>
> int totalMsgCount = 0;<br>
><br>
> Channel channel = q.getChannel();<br>
><br>
> try {<br>
><br>
> while (timeLimit == 0 || now < startTime + timeLimit) {<br>
> Delivery delivery;<br>
> if (timeLimit == 0) {<br>
> delivery = q.nextDelivery();<br>
> } else {<br>
> delivery = q.nextDelivery(startTime + timeLimit -<br>
> now);<br>
> if (delivery == null) break;<br>
> }<br>
> totalMsgCount++;<br>
><br>
> DataInputStream d = new DataInputStream(new<br>
> ByteArrayInputStream(delivery.getBody()));<br>
> d.readInt();<br>
> long msgNano = d.readLong();<br>
> long nano = System.nanoTime();<br>
><br>
> Envelope envelope = delivery.getEnvelope();<br>
><br>
> if (!autoAck) {<br>
> channel.basicAck(envelope.getDeliveryTag(), false);<br>
> }<br>
><br>
> if (txSize != 0 && totalMsgCount % txSize == 0) {<br>
> channel.txCommit();<br>
> }<br>
><br>
> now = System.currentTimeMillis();<br>
><br>
> stats.collectStats(now,<br>
> id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
> }<br>
><br>
> } catch (IOException e) {<br>
> throw new RuntimeException(e);<br>
> } catch (InterruptedException e) {<br>
> throw new RuntimeException (e);<br>
> } catch (ShutdownSignalException e) {<br>
> throw new RuntimeException(e);<br>
> }<br>
><br>
> long elapsed = now - startTime;<br>
> if (elapsed > 0) {<br>
> System.out.println("recving rate avg: " +<br>
> (totalMsgCount * 1000L / elapsed) +<br>
> " msg/s");<br>
> }<br>
> }<br>
><br>
> }<br>
><br>
> public static class Stats {<br>
><br>
> private long interval;<br>
><br>
> private long lastStatsTime;<br>
> private int msgCount;<br>
> private int latencyCount;<br>
> private long minLatency;<br>
> private long maxLatency;<br>
> private long cumulativeLatency;<br>
><br>
> public Stats(long interval) {<br>
> this.interval = interval;<br>
> reset(System.currentTimeMillis());<br>
> }<br>
><br>
> private void reset(long t) {<br>
> lastStatsTime = t;<br>
> msgCount = 0;<br>
> latencyCount = 0;<br>
> minLatency = Long.MAX_VALUE;<br>
> maxLatency = Long.MIN_VALUE;<br>
> cumulativeLatency = 0L;<br>
> }<br>
><br>
> public synchronized void collectStats(long now, long latency) {<br>
> msgCount++;<br>
><br>
> if (latency > 0) {<br>
> minLatency = Math.min(minLatency, latency);<br>
> maxLatency = Math.max(maxLatency, latency);<br>
> cumulativeLatency += latency;<br>
> latencyCount++;<br>
> }<br>
><br>
> long elapsed = now - lastStatsTime;<br>
> if (elapsed > interval) {<br>
> System.out.println("recving rate: " +<br>
> (1000L * msgCount / elapsed) +<br>
> " msg/s" +<br>
> (latencyCount > 0 ?<br>
> ", min/avg/max latency: " +<br>
> minLatency/1000L + "/" +<br>
> cumulativeLatency / (1000L *<br>
> latencyCount) + "/" +<br>
> maxLatency/1000L + " microseconds" :<br>
> ""));<br>
> reset(now);<br>
> }<br>
><br>
> }<br>
><br>
> }<br>
><br>
> }<br>
><br>
><br>
> On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
> wrote:<br>
>><br>
>> I am sorry but this link doesn't have any source code - this is the<br>
>> library.<br>
>> I meant *your* source code, the code of your program; the code which<br>
>> exercises the library and gets 5000 msg/s.<br>
>><br>
>> P.S. Please use "Reply all", this discussion is most probably<br>
>> interesting for many people in the community.<br>
>><br>
>> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> > I am following this link for my java code:<br>
>> ><br>
>> > <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>
>> ><br>
>> > On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> > wrote:<br>
>> >><br>
>> >> Please show the source code of all participating components. It's<br>
>> >> impossible to say anything definite without it.<br>
>> >><br>
>> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> ><br>
>> >> > Till now i am getting the result of 5000 msg/sec in java.<br>
>> >> ><br>
>> >> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov<br>
>> >> > <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> >> > wrote:<br>
>> >> >><br>
>> >> >> I guess you should get the speed you want, and much more, if you<br>
>> >> >> just<br>
>> >> >> set up a high enough value for prefetch (see basic.qos) and set<br>
>> >> >> autoack.<br>
>> >> >> I got ~8k msg/s with much larger messages and persistence turned on<br>
>> >> >> too.<br>
>> >> >><br>
>> >> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> >> ><br>
>> >> >> > Message size is 199 bytes<br>
>> >> >> > Messaging scenario is Persistence<br>
>> >> >> ><br>
>> >> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
>> >> >> > <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> >> >> > wrote:<br>
>> >> >> >><br>
>> >> >> >> Hi.<br>
>> >> >> >><br>
>> >> >> >> What's the size of messages?<br>
>> >> >> ><br>
>> >> >> >> What's your messaging scenario in general? Persistence, prefetch,<br>
>> >> >> >> transactions?<br>
>> >> >> ><br>
>> >> >> >><br>
>> >> >> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> >> >> > Hi, I wanted to achieve the goal of 15000 msg/sec in RabbitMQ.<br>
>> >> >> >> > I<br>
>> >> >> >> > have<br>
>> >> >> >> > tried a lot in java as well as in ruby. And right now<br>
>> >> >> >> > implementing<br>
>> >> >> >> > node.js .<br>
>> >> >> >> > I want to prefer java because after lots of experimentation the<br>
>> >> >> >> > only<br>
>> >> >> >> > java has given me good result but still goal is not achieved.<br>
>> >> >> >> > So i want help to know how to reach upto 15000 msg/sec in<br>
>> >> >> >> > RabbitMQ<br>
>> >> >> >> > using java. If any good suggestion is there please tell me or<br>
>> >> >> >> > if<br>
>> >> >> >> > any<br>
>> >> >> >> > site to prefer then please tell me.<br>
>> >> >> >> ><br>
>> >> >> >> > Thanks.<br>
>> >> >> >> > _______________________________________________<br>
>> >> >> >> > rabbitmq-discuss mailing list<br>
>> >> >> >> > <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
>> >> >> >> ><br>
>> >> >> >> ><br>
>> >> >> >> > <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
>> >> >> >> ><br>
>> >> >> >><br>
>> >> >> >><br>
>> >> >> >><br>
>> >> >> >> --<br>
>> >> >> >> Eugene Kirpichov<br>
>> >> >> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> >> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> >> >> ><br>
>> >> >> ><br>
>> >> >><br>
>> >> >><br>
>> >> >><br>
>> >> >> --<br>
>> >> >> Eugene Kirpichov<br>
>> >> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> >> ><br>
>> >> ><br>
>> >><br>
>> >><br>
>> >><br>
>> >> --<br>
>> >> Eugene Kirpichov<br>
>> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> ><br>
>> ><br>
>><br>
>><br>
>><br>
>> --<br>
>> Eugene Kirpichov<br>
>> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
><br>
><br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br></div>