<br>Thanks for reply Eugene Kirpichov�, <br>One more help, Can you suggest me any reference site from where i can follow and code for my goal?<div><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:57 PM, Eugene Kirpichov <span dir="ltr"><<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">Is this the minimal piece of code that demonstrates poor performance?<br>
It's hard to decipher that much code and it can have many kinds of<br>
problems inside; I think you should be able to trim it down to a<br>
couple dozen lines demonstrating the same speed; then diagnosing will<br>
become a lot easier.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
> This is my code.<br>
><br>
><br>
> //� The contents of this file are subject to the Mozilla Public License<br>
> //� Version 1.1 (the "License"); you may not use this file except in<br>
> //� compliance with the License. You may obtain a copy of the License<br>
> //� at <a href="http://www.mozilla.org/MPL/" target="_blank">http://www.mozilla.org/MPL/</a><br>
> //<br>
> //� Software distributed under the License is distributed on an "AS IS"<br>
> //� basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
> //� the License for the specific language governing rights and<br>
> //� limitations under the License.<br>
> //<br>
> //� The Original Code is RabbitMQ.<br>
> //<br>
> //� The Initial Developer of the Original Code is VMware, Inc.<br>
> //� Copyright (c) 2007-2011 VMware, Inc.� All rights reserved.<br>
> //<br>
><br>
><br>
> package com.rabbitmq.examples;<br>
><br>
> import java.io.ByteArrayInputStream;<br>
> import java.io.ByteArrayOutputStream;<br>
> import java.io.DataInputStream;<br>
> import java.io.DataOutputStream;<br>
> import java.io.IOException;<br>
> import java.util.Arrays;<br>
> import java.util.Collections;<br>
> import java.util.List;<br>
> import java.util.SortedSet;<br>
> import java.util.TreeSet;<br>
> import java.util.UUID;<br>
> import java.util.concurrent.Semaphore;<br>
><br>
> import org.apache.commons.cli.CommandLine;<br>
> import org.apache.commons.cli.CommandLineParser;<br>
> import org.apache.commons.cli.GnuParser;<br>
> import org.apache.commons.cli.HelpFormatter;<br>
> import org.apache.commons.cli.Option;<br>
> import org.apache.commons.cli.Options;<br>
> import org.apache.commons.cli.ParseException;<br>
><br>
> import com.rabbitmq.client.AMQP;<br>
> import com.rabbitmq.client.Channel;<br>
> import com.rabbitmq.client.ConfirmListener;<br>
> import com.rabbitmq.client.Connection;<br>
> import com.rabbitmq.client.ConnectionFactory;<br>
> import com.rabbitmq.client.Envelope;<br>
> import com.rabbitmq.client.MessageProperties;<br>
> import com.rabbitmq.client.QueueingConsumer;<br>
> import com.rabbitmq.client.QueueingConsumer.Delivery;<br>
> import com.rabbitmq.client.ReturnListener;<br>
> import com.rabbitmq.client.ShutdownSignalException;<br>
><br>
><br>
> public class MulticastMain {<br>
><br>
> ��� public static void main(String[] args) {<br>
> ������� Options options = getOptions();<br>
> ������� CommandLineParser parser = new GnuParser();<br>
> ������� try {<br>
> ����������� CommandLine cmd = parser.parse(options, args);<br>
><br>
> ����������� if (cmd.hasOption('?')) {<br>
> ��������������� usage(options);<br>
> ��������������� System.exit(0);<br>
> ����������� }<br>
><br>
> ����������� String hostName����� = strArg(cmd, 'h', "localhost");<br>
> ����������� int portNumber������ = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);<br>
> ����������� String exchangeType� = strArg(cmd, 't', "direct");<br>
> ����������� String exchangeName� = strArg(cmd, 'e', exchangeType);<br>
> ����������� int samplingInterval = intArg(cmd, 'i', 1);<br>
> ����������� int rateLimit������� = intArg(cmd, 'r', 0);<br>
> ����������� int producerCount��� = intArg(cmd, 'x', 1);<br>
> ����������� int consumerCount��� = intArg(cmd, 'y', 1);<br>
> ����������� int producerTxSize�� = intArg(cmd, 'm', 0);<br>
> ����������� int consumerTxSize�� = intArg(cmd, 'n', 0);<br>
> ����������� long confirm�������� = intArg(cmd, 'c', -1);<br>
> ����������� boolean autoAck����� = cmd.hasOption('a');<br>
> ����������� int prefetchCount��� = intArg(cmd, 'q', 0);<br>
> ����������� int minMsgSize������ = intArg(cmd, 's', 0);<br>
> ����������� int timeLimit������� = intArg(cmd, 'z', 0);<br>
> ����������� List<?> flags������� = lstArg(cmd, 'f');<br>
> ����������� int frameMax�������� = intArg(cmd, 'M', 0);<br>
> ����������� int heartbeat������� = intArg(cmd, 'b', 0);<br>
><br>
> ����������� if ((producerTxSize > 0) && confirm >= 0) {<br>
> ��������������� throw new ParseException("Cannot select both<br>
> producerTxSize"+<br>
> ���������������������������������������� " and confirm");<br>
> ����������� }<br>
><br>
> ����������� //setup<br>
> ����������� String id = UUID.randomUUID().toString();<br>
> ����������� Stats stats = new Stats(1000L * samplingInterval);<br>
> ����������� ConnectionFactory factory = new ConnectionFactory();<br>
> ����������� factory.setHost(hostName);<br>
> ����������� factory.setPort(portNumber);<br>
> ����������� factory.setRequestedFrameMax(frameMax);<br>
> ����������� factory.setRequestedHeartbeat(heartbeat);<br>
><br>
> ����������� Thread[] consumerThreads = new Thread[consumerCount];<br>
> ����������� Connection[] consumerConnections = new<br>
> Connection[consumerCount];<br>
> ����������� for (int i = 0; i < consumerCount; i++) {<br>
> ��������������� System.out.println("starting consumer #" + i);<br>
> ��������������� Connection conn = factory.newConnection();<br>
> ��������������� consumerConnections[i] = conn;<br>
> ��������������� Channel channel = conn.createChannel();<br>
> ��������������� if (consumerTxSize > 0) channel.txSelect();<br>
> ��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>
> ��������������� String queueName =<br>
> ����������������������� channel.queueDeclare("",<br>
> flags.contains("persistent"),<br>
> �������������������������������������������� true, false, null).getQueue();<br>
> ��������������� QueueingConsumer consumer = new QueueingConsumer(channel);<br>
> ��������������� if (prefetchCount > 0) channel.basicQos(prefetchCount);<br>
> ��������������� channel.basicConsume(queueName, autoAck, consumer);<br>
> ��������������� channel.queueBind(queueName, exchangeName, id);<br>
> ��������������� Thread t =<br>
> ������������������� new Thread(new Consumer(consumer, id,<br>
> ������������������������������������������� consumerTxSize, autoAck,<br>
> ������������������������������������������� stats, timeLimit));<br>
> ��������������� consumerThreads[i] = t;<br>
> ��������������� t.start();<br>
> ����������� }<br>
> ����������� Thread[] producerThreads = new Thread[producerCount];<br>
> ����������� Connection[] producerConnections = new<br>
> Connection[producerCount];<br>
> ����������� for (int i = 0; i < producerCount; i++) {<br>
> ��������������� System.out.println("starting producer #" + i);<br>
> ��������������� Connection conn = factory.newConnection();<br>
> ��������������� producerConnections[i] = conn;<br>
> ��������������� Channel channel = conn.createChannel();<br>
> ��������������� if (producerTxSize > 0) channel.txSelect();<br>
> ��������������� if (confirm >= 0) channel.confirmSelect();<br>
> ��������������� channel.exchangeDeclare(exchangeName, exchangeType);<br>
> ��������������� final Producer p = new Producer(channel, exchangeName, id,<br>
> ����������������������������������������������� flags, producerTxSize,<br>
> ����������������������������������������������� 1000L * samplingInterval,<br>
> ����������������������������������������������� rateLimit, minMsgSize,<br>
> timeLimit,<br>
> ����������������������������������������������� confirm);<br>
> ��������������� channel.setReturnListener(p);<br>
> ��������������� channel.setConfirmListener(p);<br>
> ��������������� Thread t = new Thread(p);<br>
> ��������������� producerThreads[i] = t;<br>
> ��������������� t.start();<br>
> ����������� }<br>
><br>
> ����������� for (int i = 0; i < producerCount; i++) {<br>
> ��������������� producerThreads[i].join();<br>
> ��������������� producerConnections[i].close();<br>
> ����������� }<br>
><br>
> ����������� for (int i = 0; i < consumerCount; i++) {<br>
> ��������������� consumerThreads[i].join();<br>
> ��������������� consumerConnections[i].close();<br>
> ����������� }<br>
><br>
> ������� }<br>
> ������� catch( ParseException exp ) {<br>
> ����������� System.err.println("Parsing failed. Reason: " +<br>
> exp.getMessage());<br>
> ����������� usage(options);<br>
> ������� } catch (Exception e) {<br>
> ����������� System.err.println("Main thread caught exception: " + e);<br>
> ����������� e.printStackTrace();<br>
> ����������� System.exit(1);<br>
> ������� }<br>
> ��� }<br>
><br>
> ��� private static void usage(Options options) {<br>
> ������� HelpFormatter formatter = new HelpFormatter();<br>
> ������� formatter.printHelp("<program>", options);<br>
> ��� }<br>
><br>
> ��� private static Options getOptions() {<br>
> ������� Options options = new Options();<br>
> ������� options.addOption(new Option("?", "help",����� false,"show usage"));<br>
> ������� options.addOption(new Option("h", "host",����� true, "broker<br>
> host"));<br>
> ������� options.addOption(new Option("p", "port",����� true, "broker<br>
> port"));<br>
> ������� options.addOption(new Option("t", "type",����� true, "exchange<br>
> type"));<br>
> ������� options.addOption(new Option("e", "exchange",� true, "exchange<br>
> name"));<br>
> ������� options.addOption(new Option("i", "interval",� true, "sampling<br>
> interval"));<br>
> ������� options.addOption(new Option("r", "rate",����� true, "rate limit"));<br>
> ������� options.addOption(new Option("x", "producers", true, "producer<br>
> count"));<br>
> ������� options.addOption(new Option("y", "consumers", true, "consumer<br>
> count"));<br>
> ������� options.addOption(new Option("m", "ptxsize",�� true, "producer tx<br>
> size"));<br>
> ������� options.addOption(new Option("n", "ctxsize",�� true, "consumer tx<br>
> size"));<br>
> ������� options.addOption(new Option("c", "confirm",�� true, "max<br>
> unconfirmed publishes"));<br>
> ������� options.addOption(new Option("a", "autoack",�� false,"auto ack"));<br>
> ������� options.addOption(new Option("q", "qos",������ true, "qos prefetch<br>
> count"));<br>
> ������� options.addOption(new Option("s", "size",����� true, "message<br>
> size"));<br>
> ������� options.addOption(new Option("z", "time",����� true, "time limit"));<br>
> ������� Option flag =���� new Option("f", "flag",����� true, "message<br>
> flag");<br>
> ������� flag.setArgs(Option.UNLIMITED_VALUES);<br>
> ������� options.addOption(flag);<br>
> ������� options.addOption(new Option("M", "framemax",� true, "frame max"));<br>
> ������� options.addOption(new Option("b", "heartbeat", true, "heartbeat<br>
> interval"));<br>
> ������� return options;<br>
> ��� }<br>
><br>
> ��� private static String strArg(CommandLine cmd, char opt, String def) {<br>
> ������� return cmd.getOptionValue(opt, def);<br>
> ��� }<br>
><br>
> ��� private static int intArg(CommandLine cmd, char opt, int def) {<br>
> ������� return Integer.parseInt(cmd.getOptionValue(opt,<br>
> Integer.toString(def)));<br>
> ��� }<br>
><br>
> ��� private static List<?> lstArg(CommandLine cmd, char opt) {<br>
> ������� String[] vals = cmd.getOptionValues('f');<br>
> ������� if (vals == null) {<br>
> ����������� vals = new String[] {};<br>
> ������� }<br>
> ������� return Arrays.asList(vals);<br>
> ��� }<br>
><br>
> ��� public static class Producer implements Runnable, ReturnListener,<br>
> ������������������������������������������� ConfirmListener<br>
> ��� {<br>
> ������� private Channel channel;<br>
> ������� private String� exchangeName;<br>
> ������� private String� id;<br>
> ������� private boolean mandatory;<br>
> ������� private boolean immediate;<br>
> ������� private boolean persistent;<br>
> ������� private int���� txSize;<br>
> ������� private long��� interval;<br>
> ������� private int���� rateLimit;<br>
> ������� private long��� timeLimit;<br>
><br>
> ������� private byte[]� message;<br>
><br>
> ������� private long��� startTime;<br>
> ������� private long��� lastStatsTime;<br>
> ������� private int���� msgCount;<br>
> ������� private int���� returnCount;<br>
><br>
> ������� private long����� confirm;<br>
> ������� private Semaphore confirmPool;<br>
> ������� private long����� confirmCount;<br>
> ������� private long����� nackCount;<br>
> ������� private volatile SortedSet<Long> unconfirmedSet =<br>
> ����������� Collections.synchronizedSortedSet(new TreeSet<Long>());<br>
><br>
> ������� public Producer(Channel channel, String exchangeName, String id,<br>
> ����������������������� List<?> flags, int txSize,<br>
> ����������������������� long interval, int rateLimit, int minMsgSize, int<br>
> timeLimit,<br>
> ����������������������� long confirm)<br>
> ����������� throws IOException {<br>
><br>
> ����������� this.channel����� = channel;<br>
> ����������� this.exchangeName = exchangeName;<br>
> ����������� <a href="http://this.id" target="_blank">this.id</a>���������� = id;<br>
> ����������� this.mandatory��� = flags.contains("mandatory");<br>
> ����������� this.immediate��� = flags.contains("immediate");<br>
> ����������� this.persistent�� = flags.contains("persistent");<br>
> ����������� this.txSize������ = txSize;<br>
> ����������� this.interval���� = interval;<br>
> ����������� this.rateLimit��� = rateLimit;<br>
> ����������� this.timeLimit��� = 1000L * timeLimit;<br>
> ����������� this.message����� = new byte[minMsgSize];<br>
> ����������� this.confirm����� = confirm;<br>
> ����������� if (confirm > 0) {<br>
> ��������������� this.confirmPool� = new Semaphore((int)confirm);<br>
> ����������� }<br>
> ������� }<br>
><br>
> ������� public synchronized void handleReturn(int replyCode,<br>
> ��������������������������������������������� String replyText,<br>
> ��������������������������������������������� String exchange,<br>
> ��������������������������������������������� String routingKey,<br>
> ��������������������������������������������� AMQP.BasicProperties<br>
> properties,<br>
> ��������������������������������������������� byte[] body)<br>
> ����������� throws IOException {<br>
> ����������� returnCount++;<br>
> ������� }<br>
><br>
> ������� public void handleAck(long seqNo, boolean multiple) {<br>
> ����������� handleAckNack(seqNo, multiple, false);<br>
> ������� }<br>
><br>
> ������� public void handleNack(long seqNo, boolean multiple) {<br>
> ����������� handleAckNack(seqNo, multiple, true);<br>
> ������� }<br>
><br>
> ������� private void handleAckNack(long seqNo, boolean multiple,<br>
> ���������������������������������� boolean nack) {<br>
> ����������� int numConfirms = 0;<br>
> ����������� if (multiple) {<br>
> ��������������� SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo +<br>
> 1);<br>
> ��������������� numConfirms += confirmed.size();<br>
> ��������������� confirmed.clear();<br>
> ����������� } else {<br>
> ��������������� unconfirmedSet.remove(seqNo);<br>
> ��������������� numConfirms = 1;<br>
> ����������� }<br>
> ����������� synchronized (this) {<br>
> ��������������� if (nack) {<br>
> ������������������� nackCount += numConfirms;<br>
> ��������������� } else {<br>
> ������������������� confirmCount += numConfirms;<br>
> ��������������� }<br>
> ����������� }<br>
><br>
> ����������� if (confirmPool != null) {<br>
> ��������������� for (int i = 0; i < numConfirms; ++i) {<br>
> ������������������� confirmPool.release();<br>
> ��������������� }<br>
> ����������� }<br>
><br>
> ������� }<br>
><br>
> ������� public void run() {<br>
><br>
> ����������� long now;<br>
> ����������� now = startTime = lastStatsTime = System.currentTimeMillis();<br>
> ����������� msgCount = 0;<br>
> ����������� int totalMsgCount = 0;<br>
><br>
> ����������� try {<br>
><br>
> ��������������� while (timeLimit == 0 || now < startTime + timeLimit) {<br>
> ������������������� if (confirmPool != null) {<br>
> ����������������������� confirmPool.acquire();<br>
> ������������������� }<br>
> ������������������� delay(now);<br>
> ������������������� publish(createMessage(totalMsgCount));<br>
> ������������������� totalMsgCount++;<br>
> ������������������� msgCount++;<br>
><br>
> ������������������� if (txSize != 0 && totalMsgCount % txSize == 0) {<br>
> ����������������������� channel.txCommit();<br>
> ������������������� }<br>
> ������������������� now = System.currentTimeMillis();<br>
> ��������������� }<br>
><br>
> ����������� } catch (IOException e) {<br>
> ��������������� throw new RuntimeException(e);<br>
> ����������� } catch (InterruptedException e) {<br>
> ��������������� throw new RuntimeException (e);<br>
> ����������� }<br>
><br>
> ����������� System.out.println("sending rate avg: " +<br>
> ������������������������������ (totalMsgCount * 1000L / (now - startTime)) +<br>
> ������������������������������ " msg/s");<br>
><br>
> ������� }<br>
><br>
> ������� private void publish(byte[] msg)<br>
> ����������� throws IOException {<br>
><br>
> ����������� unconfirmedSet.add(channel.getNextPublishSeqNo());<br>
> ����������� channel.basicPublish(exchangeName, id,<br>
> �������������������������������� mandatory, immediate,<br>
> �������������������������������� persistent ?<br>
> MessageProperties.MINIMAL_PERSISTENT_BASIC :<br>
> MessageProperties.MINIMAL_BASIC,<br>
> �������������������������������� msg);<br>
> ������� }<br>
><br>
> ������� private void delay(long now)<br>
> ����������� throws InterruptedException {<br>
><br>
> ����������� long elapsed = now - lastStatsTime;<br>
> ����������� //example: rateLimit is 5000 msg/s,<br>
> ����������� //10 ms have elapsed, we have sent 200 messages<br>
> ����������� //the 200 msgs we have actually sent should have taken us<br>
> ����������� //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br>
> ����������� long pause = rateLimit == 0 ?<br>
> ��������������� 0 : (msgCount * 1000L / rateLimit - elapsed);<br>
> ����������� if (pause > 0) {<br>
> ��������������� Thread.sleep(pause);<br>
> ����������� }<br>
> ����������� if (elapsed > interval) {<br>
> ��������������� long sendRate, returnRate, confirmRate, nackRate;<br>
> ��������������� synchronized(this) {<br>
> ������������������� sendRate���� = msgCount���� * 1000L / elapsed;<br>
> ������������������� returnRate�� = returnCount� * 1000L / elapsed;<br>
> ������������������� confirmRate� = confirmCount * 1000L / elapsed;<br>
> ������������������� nackRate���� = nackCount��� * 1000L / elapsed;<br>
> ������������������� msgCount���� = 0;<br>
> ������������������� returnCount� = 0;<br>
> ������������������� confirmCount = 0;<br>
> ������������������� nackCount��� = 0;<br>
> ��������������� }<br>
> ��������������� System.out.print("sending rate: " + sendRate + " msg/s");<br>
> ��������������� if (mandatory || immediate) {<br>
> ������������������� System.out.print(", returns: " + returnRate + " ret/s");<br>
> ��������������� }<br>
> ��������������� if (confirm >= 0) {<br>
> ������������������� System.out.print(", confirms: " + confirmRate + " c/s");<br>
> ������������������� if (nackRate > 0) {<br>
> ����������������������� System.out.print(", nacks: " + nackRate + " n/s");<br>
> ������������������� }<br>
> ��������������� }<br>
> ��������������� System.out.println();<br>
> ��������������� lastStatsTime = now;<br>
> ����������� }<br>
> ������� }<br>
><br>
> ������� private byte[] createMessage(int sequenceNumber)<br>
> ����������� throws IOException {<br>
><br>
> ����������� ByteArrayOutputStream acc = new ByteArrayOutputStream();<br>
> ����������� DataOutputStream d = new DataOutputStream(acc);<br>
> ����������� long nano = System.nanoTime();<br>
> ����������� d.writeInt(sequenceNumber);<br>
> ����������� d.writeLong(nano);<br>
> ����������� d.flush();<br>
> ����������� acc.flush();<br>
> ����������� byte[] m = acc.toByteArray();<br>
> ����������� if (m.length <= message.length) {<br>
> ��������������� System.arraycopy(m, 0, message, 0, m.length);<br>
> ��������������� return message;<br>
> ����������� } else {<br>
> ��������������� return m;<br>
> ����������� }<br>
> ������� }<br>
><br>
> ��� }<br>
><br>
> ��� public static class Consumer implements Runnable {<br>
><br>
> ������� private QueueingConsumer q;<br>
> ������� private String���������� id;<br>
> ������� private int������������� txSize;<br>
> ������� private boolean��������� autoAck;<br>
> ������� private Stats����������� stats;<br>
> ������� private long������������ timeLimit;<br>
><br>
> ������� public Consumer(QueueingConsumer q, String id,<br>
> ����������������������� int txSize, boolean autoAck,<br>
> ����������������������� Stats stats, int timeLimit) {<br>
><br>
> ����������� this.q�������� = q;<br>
> ����������� <a href="http://this.id" target="_blank">this.id</a>������� = id;<br>
> ����������� this.txSize��� = txSize;<br>
> ����������� this.autoAck�� = autoAck;<br>
> ����������� this.stats���� = stats;<br>
> ����������� this.timeLimit = 1000L * timeLimit;<br>
> ������� }<br>
><br>
> ������� public void run() {<br>
><br>
> ����������� long now;<br>
> ����������� long startTime;<br>
> ����������� startTime = now = System.currentTimeMillis();<br>
> ����������� int totalMsgCount = 0;<br>
><br>
> ����������� Channel channel = q.getChannel();<br>
><br>
> ����������� try {<br>
><br>
> ��������������� while (timeLimit == 0 || now < startTime + timeLimit) {<br>
> ������������������� Delivery delivery;<br>
> ������������������� if (timeLimit == 0) {<br>
> ����������������������� delivery = q.nextDelivery();<br>
> ������������������� } else {<br>
> ����������������������� delivery = q.nextDelivery(startTime + timeLimit -<br>
> now);<br>
> ����������������������� if (delivery == null) break;<br>
> ������������������� }<br>
> ��� ��� ��� totalMsgCount++;<br>
><br>
> ������������������� DataInputStream d = new DataInputStream(new<br>
> ByteArrayInputStream(delivery.getBody()));<br>
> ������������������� d.readInt();<br>
> ������������������� long msgNano = d.readLong();<br>
> ������������������� long nano = System.nanoTime();<br>
><br>
> ������������������� Envelope envelope = delivery.getEnvelope();<br>
><br>
> ������������������� if (!autoAck) {<br>
> ����������������������� channel.basicAck(envelope.getDeliveryTag(), false);<br>
> ������������������� }<br>
><br>
> ������������������� if (txSize != 0 && totalMsgCount % txSize == 0) {<br>
> ����������������������� channel.txCommit();<br>
> ������������������� }<br>
><br>
> ������������������� now = System.currentTimeMillis();<br>
><br>
> ������������������� stats.collectStats(now,<br>
> id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
> ��������������� }<br>
><br>
> ����������� } catch (IOException e) {<br>
> ��������������� throw new RuntimeException(e);<br>
> ����������� } catch (InterruptedException e) {<br>
> ��������������� throw new RuntimeException (e);<br>
> ����������� } catch (ShutdownSignalException e) {<br>
> ��������������� throw new RuntimeException(e);<br>
> ����������� }<br>
><br>
> ����������� long elapsed = now - startTime;<br>
> ����������� if (elapsed > 0) {<br>
> ��������������� System.out.println("recving rate avg: " +<br>
> ���������������������������������� (totalMsgCount * 1000L / elapsed) +<br>
> ���������������������������������� " msg/s");<br>
> ����������� }<br>
> ������� }<br>
><br>
> ��� }<br>
><br>
> ��� public static class Stats {<br>
><br>
> ������� private long��� interval;<br>
><br>
> ������� private long��� lastStatsTime;<br>
> ������� private int���� msgCount;<br>
> ������� private int���� latencyCount;<br>
> ������� private long��� minLatency;<br>
> ������� private long��� maxLatency;<br>
> ������� private long��� cumulativeLatency;<br>
><br>
> ������� public Stats(long interval) {<br>
> ����������� this.interval = interval;<br>
> ����������� reset(System.currentTimeMillis());<br>
> ������� }<br>
><br>
> ������� private void reset(long t) {<br>
> ����������� lastStatsTime���� = t;<br>
> ����������� msgCount��������� = 0;<br>
> ����������� latencyCount����� = 0;<br>
> ����������� minLatency������� = Long.MAX_VALUE;<br>
> ����������� maxLatency������� = Long.MIN_VALUE;<br>
> ����������� cumulativeLatency = 0L;<br>
> ������� }<br>
><br>
> ������� public synchronized void collectStats(long now, long latency) {<br>
> ����������� msgCount++;<br>
><br>
> ����������� if (latency > 0) {<br>
> ��������������� minLatency = Math.min(minLatency, latency);<br>
> ��������������� maxLatency = Math.max(maxLatency, latency);<br>
> ��������������� cumulativeLatency += latency;<br>
> ��������������� latencyCount++;<br>
> ����������� }<br>
><br>
> ����������� long elapsed = now - lastStatsTime;<br>
> ����������� if (elapsed > interval) {<br>
> ��������������� System.out.println("recving rate: " +<br>
> ���������������������������������� (1000L * msgCount / elapsed) +<br>
> ���������������������������������� " msg/s" +<br>
> ���������������������������������� (latencyCount > 0 ?<br>
> ����������������������������������� ", min/avg/max latency: " +<br>
> ����������������������������������� minLatency/1000L + "/" +<br>
> ����������������������������������� cumulativeLatency / (1000L *<br>
> latencyCount) + "/" +<br>
> ����������������������������������� maxLatency/1000L + " microseconds" :<br>
> ����������������������������������� ""));<br>
> ��������������� reset(now);<br>
> ����������� }<br>
><br>
> ������� }<br>
><br>
> ��� }<br>
><br>
> }<br>
><br>
><br>
> On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
> wrote:<br>
>><br>
>> I am sorry but this link doesn't have any source code - this is the<br>
>> library.<br>
>> I meant *your* source code, the code of your program; the code which<br>
>> exercises the library and gets 5000 msg/s.<br>
>><br>
>> P.S. Please use "Reply all", this discussion is most probably<br>
>> interesting for many people in the community.<br>
>><br>
>> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> > I am following this link for my java code:<br>
>> ><br>
>> > <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>
>> ><br>
>> > On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> > wrote:<br>
>> >><br>
>> >> Please show the source code of all participating components. It's<br>
>> >> impossible to say anything definite without it.<br>
>> >><br>
>> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> ><br>
>> >> > Till now i am getting the result of 5000 msg/sec in java.<br>
>> >> ><br>
>> >> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov<br>
>> >> > <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> >> > wrote:<br>
>> >> >><br>
>> >> >> I guess you should get the speed you want, and much more, if you<br>
>> >> >> just<br>
>> >> >> set up a high enough value for prefetch (see basic.qos) and set<br>
>> >> >> autoack.<br>
>> >> >> I got ~8k msg/s with much larger messages and persistence turned on<br>
>> >> >> too.<br>
>> >> >><br>
>> >> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> >> ><br>
>> >> >> > Message size is 199 bytes<br>
>> >> >> > Messaging scenario is Persistence<br>
>> >> >> ><br>
>> >> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
>> >> >> > <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> >> >> > wrote:<br>
>> >> >> >><br>
>> >> >> >> Hi.<br>
>> >> >> >><br>
>> >> >> >> What's the size of messages?<br>
>> >> >> ><br>
>> >> >> >> What's your messaging scenario in general? Persistence, prefetch,<br>
>> >> >> >> transactions?<br>
>> >> >> ><br>
>> >> >> >><br>
>> >> >> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> >> >> > Hi, I wanted to achieve the goal of 15000 msg/sec in RabbitMQ.<br>
>> >> >> >> > I<br>
>> >> >> >> > have<br>
>> >> >> >> > tried a lot in java as well as in ruby. And right now<br>
>> >> >> >> > implementing<br>
>> >> >> >> > node.js .<br>
>> >> >> >> > I want to prefer java because after lots of experimentation the<br>
>> >> >> >> > only<br>
>> >> >> >> > java has given me good result but still goal is not achieved.<br>
>> >> >> >> > So i want help to know how to reach upto 15000 msg/sec in<br>
>> >> >> >> > RabbitMQ<br>
>> >> >> >> > using java. If any good suggestion �is there please tell me or<br>
>> >> >> >> > if<br>
>> >> >> >> > any<br>
>> >> >> >> > site to prefer then please tell me.<br>
>> >> >> >> ><br>
>> >> >> >> > Thanks.<br>
>> >> >> >> > _______________________________________________<br>
>> >> >> >> > rabbitmq-discuss mailing list<br>
>> >> >> >> > <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
>> >> >> >> ><br>
>> >> >> >> ><br>
>> >> >> >> > <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
>> >> >> >> ><br>
>> >> >> >><br>
>> >> >> >><br>
>> >> >> >><br>
>> >> >> >> --<br>
>> >> >> >> Eugene Kirpichov<br>
>> >> >> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> >> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> >> >> ><br>
>> >> >> ><br>
>> >> >><br>
>> >> >><br>
>> >> >><br>
>> >> >> --<br>
>> >> >> Eugene Kirpichov<br>
>> >> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> >> ><br>
>> >> ><br>
>> >><br>
>> >><br>
>> >><br>
>> >> --<br>
>> >> Eugene Kirpichov<br>
>> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> ><br>
>> ><br>
>><br>
>><br>
>><br>
>> --<br>
>> Eugene Kirpichov<br>
>> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
><br>
><br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br></div>