[rabbitmq-discuss] 15000 msg/sec
Eugene Kirpichov
ekirpichov at gmail.com
Fri Jul 15 15:27:12 BST 2011
Is this the minimal piece of code that demonstrates poor performance?
It's hard to decipher that much code and it can have many kinds of
problems inside; I think you should be able to trim it down to a
couple dozen lines demonstrating the same speed; then diagnosing will
become a lot easier.
2011/7/15 News Aanad <news.anand11 at gmail.com>:
> This is my code.
>
>
> // The contents of this file are subject to the Mozilla Public License
> // Version 1.1 (the "License"); you may not use this file except in
> // compliance with the License. You may obtain a copy of the License
> // at http://www.mozilla.org/MPL/
> //
> // Software distributed under the License is distributed on an "AS IS"
> // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
> // the License for the specific language governing rights and
> // limitations under the License.
> //
> // The Original Code is RabbitMQ.
> //
> // The Initial Developer of the Original Code is VMware, Inc.
> // Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
> //
>
>
> package com.rabbitmq.examples;
>
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.DataInputStream;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import java.util.Arrays;
> import java.util.Collections;
> import java.util.List;
> import java.util.SortedSet;
> import java.util.TreeSet;
> import java.util.UUID;
> import java.util.concurrent.Semaphore;
>
> import org.apache.commons.cli.CommandLine;
> import org.apache.commons.cli.CommandLineParser;
> import org.apache.commons.cli.GnuParser;
> import org.apache.commons.cli.HelpFormatter;
> import org.apache.commons.cli.Option;
> import org.apache.commons.cli.Options;
> import org.apache.commons.cli.ParseException;
>
> import com.rabbitmq.client.AMQP;
> import com.rabbitmq.client.Channel;
> import com.rabbitmq.client.ConfirmListener;
> import com.rabbitmq.client.Connection;
> import com.rabbitmq.client.ConnectionFactory;
> import com.rabbitmq.client.Envelope;
> import com.rabbitmq.client.MessageProperties;
> import com.rabbitmq.client.QueueingConsumer;
> import com.rabbitmq.client.QueueingConsumer.Delivery;
> import com.rabbitmq.client.ReturnListener;
> import com.rabbitmq.client.ShutdownSignalException;
>
>
> public class MulticastMain {
>
> public static void main(String[] args) {
> Options options = getOptions();
> CommandLineParser parser = new GnuParser();
> try {
> CommandLine cmd = parser.parse(options, args);
>
> if (cmd.hasOption('?')) {
> usage(options);
> System.exit(0);
> }
>
> String hostName = strArg(cmd, 'h', "localhost");
> int portNumber = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);
> String exchangeType = strArg(cmd, 't', "direct");
> String exchangeName = strArg(cmd, 'e', exchangeType);
> int samplingInterval = intArg(cmd, 'i', 1);
> int rateLimit = intArg(cmd, 'r', 0);
> int producerCount = intArg(cmd, 'x', 1);
> int consumerCount = intArg(cmd, 'y', 1);
> int producerTxSize = intArg(cmd, 'm', 0);
> int consumerTxSize = intArg(cmd, 'n', 0);
> long confirm = intArg(cmd, 'c', -1);
> boolean autoAck = cmd.hasOption('a');
> int prefetchCount = intArg(cmd, 'q', 0);
> int minMsgSize = intArg(cmd, 's', 0);
> int timeLimit = intArg(cmd, 'z', 0);
> List<?> flags = lstArg(cmd, 'f');
> int frameMax = intArg(cmd, 'M', 0);
> int heartbeat = intArg(cmd, 'b', 0);
>
> if ((producerTxSize > 0) && confirm >= 0) {
> throw new ParseException("Cannot select both
> producerTxSize"+
> " and confirm");
> }
>
> //setup
> String id = UUID.randomUUID().toString();
> Stats stats = new Stats(1000L * samplingInterval);
> ConnectionFactory factory = new ConnectionFactory();
> factory.setHost(hostName);
> factory.setPort(portNumber);
> factory.setRequestedFrameMax(frameMax);
> factory.setRequestedHeartbeat(heartbeat);
>
> Thread[] consumerThreads = new Thread[consumerCount];
> Connection[] consumerConnections = new
> Connection[consumerCount];
> for (int i = 0; i < consumerCount; i++) {
> System.out.println("starting consumer #" + i);
> Connection conn = factory.newConnection();
> consumerConnections[i] = conn;
> Channel channel = conn.createChannel();
> if (consumerTxSize > 0) channel.txSelect();
> channel.exchangeDeclare(exchangeName, exchangeType);
> String queueName =
> channel.queueDeclare("",
> flags.contains("persistent"),
> true, false, null).getQueue();
> QueueingConsumer consumer = new QueueingConsumer(channel);
> if (prefetchCount > 0) channel.basicQos(prefetchCount);
> channel.basicConsume(queueName, autoAck, consumer);
> channel.queueBind(queueName, exchangeName, id);
> Thread t =
> new Thread(new Consumer(consumer, id,
> consumerTxSize, autoAck,
> stats, timeLimit));
> consumerThreads[i] = t;
> t.start();
> }
> Thread[] producerThreads = new Thread[producerCount];
> Connection[] producerConnections = new
> Connection[producerCount];
> for (int i = 0; i < producerCount; i++) {
> System.out.println("starting producer #" + i);
> Connection conn = factory.newConnection();
> producerConnections[i] = conn;
> Channel channel = conn.createChannel();
> if (producerTxSize > 0) channel.txSelect();
> if (confirm >= 0) channel.confirmSelect();
> channel.exchangeDeclare(exchangeName, exchangeType);
> final Producer p = new Producer(channel, exchangeName, id,
> flags, producerTxSize,
> 1000L * samplingInterval,
> rateLimit, minMsgSize,
> timeLimit,
> confirm);
> channel.setReturnListener(p);
> channel.setConfirmListener(p);
> Thread t = new Thread(p);
> producerThreads[i] = t;
> t.start();
> }
>
> for (int i = 0; i < producerCount; i++) {
> producerThreads[i].join();
> producerConnections[i].close();
> }
>
> for (int i = 0; i < consumerCount; i++) {
> consumerThreads[i].join();
> consumerConnections[i].close();
> }
>
> }
> catch( ParseException exp ) {
> System.err.println("Parsing failed. Reason: " +
> exp.getMessage());
> usage(options);
> } catch (Exception e) {
> System.err.println("Main thread caught exception: " + e);
> e.printStackTrace();
> System.exit(1);
> }
> }
>
> private static void usage(Options options) {
> HelpFormatter formatter = new HelpFormatter();
> formatter.printHelp("<program>", options);
> }
>
> private static Options getOptions() {
> Options options = new Options();
> options.addOption(new Option("?", "help", false,"show usage"));
> options.addOption(new Option("h", "host", true, "broker
> host"));
> options.addOption(new Option("p", "port", true, "broker
> port"));
> options.addOption(new Option("t", "type", true, "exchange
> type"));
> options.addOption(new Option("e", "exchange", true, "exchange
> name"));
> options.addOption(new Option("i", "interval", true, "sampling
> interval"));
> options.addOption(new Option("r", "rate", true, "rate limit"));
> options.addOption(new Option("x", "producers", true, "producer
> count"));
> options.addOption(new Option("y", "consumers", true, "consumer
> count"));
> options.addOption(new Option("m", "ptxsize", true, "producer tx
> size"));
> options.addOption(new Option("n", "ctxsize", true, "consumer tx
> size"));
> options.addOption(new Option("c", "confirm", true, "max
> unconfirmed publishes"));
> options.addOption(new Option("a", "autoack", false,"auto ack"));
> options.addOption(new Option("q", "qos", true, "qos prefetch
> count"));
> options.addOption(new Option("s", "size", true, "message
> size"));
> options.addOption(new Option("z", "time", true, "time limit"));
> Option flag = new Option("f", "flag", true, "message
> flag");
> flag.setArgs(Option.UNLIMITED_VALUES);
> options.addOption(flag);
> options.addOption(new Option("M", "framemax", true, "frame max"));
> options.addOption(new Option("b", "heartbeat", true, "heartbeat
> interval"));
> return options;
> }
>
> private static String strArg(CommandLine cmd, char opt, String def) {
> return cmd.getOptionValue(opt, def);
> }
>
> private static int intArg(CommandLine cmd, char opt, int def) {
> return Integer.parseInt(cmd.getOptionValue(opt,
> Integer.toString(def)));
> }
>
> private static List<?> lstArg(CommandLine cmd, char opt) {
> String[] vals = cmd.getOptionValues('f');
> if (vals == null) {
> vals = new String[] {};
> }
> return Arrays.asList(vals);
> }
>
> public static class Producer implements Runnable, ReturnListener,
> ConfirmListener
> {
> private Channel channel;
> private String exchangeName;
> private String id;
> private boolean mandatory;
> private boolean immediate;
> private boolean persistent;
> private int txSize;
> private long interval;
> private int rateLimit;
> private long timeLimit;
>
> private byte[] message;
>
> private long startTime;
> private long lastStatsTime;
> private int msgCount;
> private int returnCount;
>
> private long confirm;
> private Semaphore confirmPool;
> private long confirmCount;
> private long nackCount;
> private volatile SortedSet<Long> unconfirmedSet =
> Collections.synchronizedSortedSet(new TreeSet<Long>());
>
> public Producer(Channel channel, String exchangeName, String id,
> List<?> flags, int txSize,
> long interval, int rateLimit, int minMsgSize, int
> timeLimit,
> long confirm)
> throws IOException {
>
> this.channel = channel;
> this.exchangeName = exchangeName;
> this.id = id;
> this.mandatory = flags.contains("mandatory");
> this.immediate = flags.contains("immediate");
> this.persistent = flags.contains("persistent");
> this.txSize = txSize;
> this.interval = interval;
> this.rateLimit = rateLimit;
> this.timeLimit = 1000L * timeLimit;
> this.message = new byte[minMsgSize];
> this.confirm = confirm;
> if (confirm > 0) {
> this.confirmPool = new Semaphore((int)confirm);
> }
> }
>
> public synchronized void handleReturn(int replyCode,
> String replyText,
> String exchange,
> String routingKey,
> AMQP.BasicProperties
> properties,
> byte[] body)
> throws IOException {
> returnCount++;
> }
>
> public void handleAck(long seqNo, boolean multiple) {
> handleAckNack(seqNo, multiple, false);
> }
>
> public void handleNack(long seqNo, boolean multiple) {
> handleAckNack(seqNo, multiple, true);
> }
>
> private void handleAckNack(long seqNo, boolean multiple,
> boolean nack) {
> int numConfirms = 0;
> if (multiple) {
> SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo +
> 1);
> numConfirms += confirmed.size();
> confirmed.clear();
> } else {
> unconfirmedSet.remove(seqNo);
> numConfirms = 1;
> }
> synchronized (this) {
> if (nack) {
> nackCount += numConfirms;
> } else {
> confirmCount += numConfirms;
> }
> }
>
> if (confirmPool != null) {
> for (int i = 0; i < numConfirms; ++i) {
> confirmPool.release();
> }
> }
>
> }
>
> public void run() {
>
> long now;
> now = startTime = lastStatsTime = System.currentTimeMillis();
> msgCount = 0;
> int totalMsgCount = 0;
>
> try {
>
> while (timeLimit == 0 || now < startTime + timeLimit) {
> if (confirmPool != null) {
> confirmPool.acquire();
> }
> delay(now);
> publish(createMessage(totalMsgCount));
> totalMsgCount++;
> msgCount++;
>
> if (txSize != 0 && totalMsgCount % txSize == 0) {
> channel.txCommit();
> }
> now = System.currentTimeMillis();
> }
>
> } catch (IOException e) {
> throw new RuntimeException(e);
> } catch (InterruptedException e) {
> throw new RuntimeException (e);
> }
>
> System.out.println("sending rate avg: " +
> (totalMsgCount * 1000L / (now - startTime)) +
> " msg/s");
>
> }
>
> private void publish(byte[] msg)
> throws IOException {
>
> unconfirmedSet.add(channel.getNextPublishSeqNo());
> channel.basicPublish(exchangeName, id,
> mandatory, immediate,
> persistent ?
> MessageProperties.MINIMAL_PERSISTENT_BASIC :
> MessageProperties.MINIMAL_BASIC,
> msg);
> }
>
> private void delay(long now)
> throws InterruptedException {
>
> long elapsed = now - lastStatsTime;
> //example: rateLimit is 5000 msg/s,
> //10 ms have elapsed, we have sent 200 messages
> //the 200 msgs we have actually sent should have taken us
> //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms
> long pause = rateLimit == 0 ?
> 0 : (msgCount * 1000L / rateLimit - elapsed);
> if (pause > 0) {
> Thread.sleep(pause);
> }
> if (elapsed > interval) {
> long sendRate, returnRate, confirmRate, nackRate;
> synchronized(this) {
> sendRate = msgCount * 1000L / elapsed;
> returnRate = returnCount * 1000L / elapsed;
> confirmRate = confirmCount * 1000L / elapsed;
> nackRate = nackCount * 1000L / elapsed;
> msgCount = 0;
> returnCount = 0;
> confirmCount = 0;
> nackCount = 0;
> }
> System.out.print("sending rate: " + sendRate + " msg/s");
> if (mandatory || immediate) {
> System.out.print(", returns: " + returnRate + " ret/s");
> }
> if (confirm >= 0) {
> System.out.print(", confirms: " + confirmRate + " c/s");
> if (nackRate > 0) {
> System.out.print(", nacks: " + nackRate + " n/s");
> }
> }
> System.out.println();
> lastStatsTime = now;
> }
> }
>
> private byte[] createMessage(int sequenceNumber)
> throws IOException {
>
> ByteArrayOutputStream acc = new ByteArrayOutputStream();
> DataOutputStream d = new DataOutputStream(acc);
> long nano = System.nanoTime();
> d.writeInt(sequenceNumber);
> d.writeLong(nano);
> d.flush();
> acc.flush();
> byte[] m = acc.toByteArray();
> if (m.length <= message.length) {
> System.arraycopy(m, 0, message, 0, m.length);
> return message;
> } else {
> return m;
> }
> }
>
> }
>
> public static class Consumer implements Runnable {
>
> private QueueingConsumer q;
> private String id;
> private int txSize;
> private boolean autoAck;
> private Stats stats;
> private long timeLimit;
>
> public Consumer(QueueingConsumer q, String id,
> int txSize, boolean autoAck,
> Stats stats, int timeLimit) {
>
> this.q = q;
> this.id = id;
> this.txSize = txSize;
> this.autoAck = autoAck;
> this.stats = stats;
> this.timeLimit = 1000L * timeLimit;
> }
>
> public void run() {
>
> long now;
> long startTime;
> startTime = now = System.currentTimeMillis();
> int totalMsgCount = 0;
>
> Channel channel = q.getChannel();
>
> try {
>
> while (timeLimit == 0 || now < startTime + timeLimit) {
> Delivery delivery;
> if (timeLimit == 0) {
> delivery = q.nextDelivery();
> } else {
> delivery = q.nextDelivery(startTime + timeLimit -
> now);
> if (delivery == null) break;
> }
> totalMsgCount++;
>
> DataInputStream d = new DataInputStream(new
> ByteArrayInputStream(delivery.getBody()));
> d.readInt();
> long msgNano = d.readLong();
> long nano = System.nanoTime();
>
> Envelope envelope = delivery.getEnvelope();
>
> if (!autoAck) {
> channel.basicAck(envelope.getDeliveryTag(), false);
> }
>
> if (txSize != 0 && totalMsgCount % txSize == 0) {
> channel.txCommit();
> }
>
> now = System.currentTimeMillis();
>
> stats.collectStats(now,
> id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
> }
>
> } catch (IOException e) {
> throw new RuntimeException(e);
> } catch (InterruptedException e) {
> throw new RuntimeException (e);
> } catch (ShutdownSignalException e) {
> throw new RuntimeException(e);
> }
>
> long elapsed = now - startTime;
> if (elapsed > 0) {
> System.out.println("recving rate avg: " +
> (totalMsgCount * 1000L / elapsed) +
> " msg/s");
> }
> }
>
> }
>
> public static class Stats {
>
> private long interval;
>
> private long lastStatsTime;
> private int msgCount;
> private int latencyCount;
> private long minLatency;
> private long maxLatency;
> private long cumulativeLatency;
>
> public Stats(long interval) {
> this.interval = interval;
> reset(System.currentTimeMillis());
> }
>
> private void reset(long t) {
> lastStatsTime = t;
> msgCount = 0;
> latencyCount = 0;
> minLatency = Long.MAX_VALUE;
> maxLatency = Long.MIN_VALUE;
> cumulativeLatency = 0L;
> }
>
> public synchronized void collectStats(long now, long latency) {
> msgCount++;
>
> if (latency > 0) {
> minLatency = Math.min(minLatency, latency);
> maxLatency = Math.max(maxLatency, latency);
> cumulativeLatency += latency;
> latencyCount++;
> }
>
> long elapsed = now - lastStatsTime;
> if (elapsed > interval) {
> System.out.println("recving rate: " +
> (1000L * msgCount / elapsed) +
> " msg/s" +
> (latencyCount > 0 ?
> ", min/avg/max latency: " +
> minLatency/1000L + "/" +
> cumulativeLatency / (1000L *
> latencyCount) + "/" +
> maxLatency/1000L + " microseconds" :
> ""));
> reset(now);
> }
>
> }
>
> }
>
> }
>
>
> On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <ekirpichov at gmail.com>
> wrote:
>>
>> I am sorry but this link doesn't have any source code - this is the
>> library.
>> I meant *your* source code, the code of your program; the code which
>> exercises the library and gets 5000 msg/s.
>>
>> P.S. Please use "Reply all", this discussion is most probably
>> interesting for many people in the community.
>>
>> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
>> > I am following this link for my java code:
>> >
>> > http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz
>> >
>> > On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <ekirpichov at gmail.com>
>> > wrote:
>> >>
>> >> Please show the source code of all participating components. It's
>> >> impossible to say anything definite without it.
>> >>
>> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
>> >> >
>> >> > Till now i am getting the result of 5000 msg/sec in java.
>> >> >
>> >> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov
>> >> > <ekirpichov at gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> I guess you should get the speed you want, and much more, if you
>> >> >> just
>> >> >> set up a high enough value for prefetch (see basic.qos) and set
>> >> >> autoack.
>> >> >> I got ~8k msg/s with much larger messages and persistence turned on
>> >> >> too.
>> >> >>
>> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
>> >> >> >
>> >> >> > Message size is 199 bytes
>> >> >> > Messaging scenario is Persistence
>> >> >> >
>> >> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov
>> >> >> > <ekirpichov at gmail.com>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Hi.
>> >> >> >>
>> >> >> >> What's the size of messages?
>> >> >> >
>> >> >> >> What's your messaging scenario in general? Persistence, prefetch,
>> >> >> >> transactions?
>> >> >> >
>> >> >> >>
>> >> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
>> >> >> >> > Hi, I wanted to achieve the goal of 15000 msg/sec in RabbitMQ.
>> >> >> >> > I
>> >> >> >> > have
>> >> >> >> > tried a lot in java as well as in ruby. And right now
>> >> >> >> > implementing
>> >> >> >> > node.js .
>> >> >> >> > I want to prefer java because after lots of experimentation the
>> >> >> >> > only
>> >> >> >> > java has given me good result but still goal is not achieved.
>> >> >> >> > So i want help to know how to reach upto 15000 msg/sec in
>> >> >> >> > RabbitMQ
>> >> >> >> > using java. If any good suggestion is there please tell me or
>> >> >> >> > if
>> >> >> >> > any
>> >> >> >> > site to prefer then please tell me.
>> >> >> >> >
>> >> >> >> > Thanks.
>> >> >> >> > _______________________________________________
>> >> >> >> > rabbitmq-discuss mailing list
>> >> >> >> > rabbitmq-discuss at lists.rabbitmq.com
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>> >> >> >> >
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> --
>> >> >> >> Eugene Kirpichov
>> >> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
>> >> >> >> Editor, http://fprog.ru/
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Eugene Kirpichov
>> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
>> >> >> Editor, http://fprog.ru/
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Eugene Kirpichov
>> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
>> >> Editor, http://fprog.ru/
>> >
>> >
>>
>>
>>
>> --
>> Eugene Kirpichov
>> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
>> Editor, http://fprog.ru/
>
>
--
Eugene Kirpichov
Principal Engineer, Mirantis Inc. http://www.mirantis.com/
Editor, http://fprog.ru/
More information about the rabbitmq-discuss
mailing list