[rabbitmq-discuss] 15000 msg/sec
News Aanad
news.anand11 at gmail.com
Fri Jul 15 14:42:40 BST 2011
This is my code.
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is VMware, Inc.
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
//
package com.rabbitmq.examples;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownSignalException;
public class MulticastMain {
public static void main(String[] args) {
Options options = getOptions();
CommandLineParser parser = new GnuParser();
try {
CommandLine cmd = parser.parse(options, args);
if (cmd.hasOption('?')) {
usage(options);
System.exit(0);
}
String hostName = strArg(cmd, 'h', "localhost");
int portNumber = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);
String exchangeType = strArg(cmd, 't', "direct");
String exchangeName = strArg(cmd, 'e', exchangeType);
int samplingInterval = intArg(cmd, 'i', 1);
int rateLimit = intArg(cmd, 'r', 0);
int producerCount = intArg(cmd, 'x', 1);
int consumerCount = intArg(cmd, 'y', 1);
int producerTxSize = intArg(cmd, 'm', 0);
int consumerTxSize = intArg(cmd, 'n', 0);
long confirm = intArg(cmd, 'c', -1);
boolean autoAck = cmd.hasOption('a');
int prefetchCount = intArg(cmd, 'q', 0);
int minMsgSize = intArg(cmd, 's', 0);
int timeLimit = intArg(cmd, 'z', 0);
List<?> flags = lstArg(cmd, 'f');
int frameMax = intArg(cmd, 'M', 0);
int heartbeat = intArg(cmd, 'b', 0);
if ((producerTxSize > 0) && confirm >= 0) {
throw new ParseException("Cannot select both
producerTxSize"+
" and confirm");
}
//setup
String id = UUID.randomUUID().toString();
Stats stats = new Stats(1000L * samplingInterval);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setRequestedFrameMax(frameMax);
factory.setRequestedHeartbeat(heartbeat);
Thread[] consumerThreads = new Thread[consumerCount];
Connection[] consumerConnections = new
Connection[consumerCount];
for (int i = 0; i < consumerCount; i++) {
System.out.println("starting consumer #" + i);
Connection conn = factory.newConnection();
consumerConnections[i] = conn;
Channel channel = conn.createChannel();
if (consumerTxSize > 0) channel.txSelect();
channel.exchangeDeclare(exchangeName, exchangeType);
String queueName =
channel.queueDeclare("",
flags.contains("persistent"),
true, false, null).getQueue();
QueueingConsumer consumer = new QueueingConsumer(channel);
if (prefetchCount > 0) channel.basicQos(prefetchCount);
channel.basicConsume(queueName, autoAck, consumer);
channel.queueBind(queueName, exchangeName, id);
Thread t =
new Thread(new Consumer(consumer, id,
consumerTxSize, autoAck,
stats, timeLimit));
consumerThreads[i] = t;
t.start();
}
Thread[] producerThreads = new Thread[producerCount];
Connection[] producerConnections = new
Connection[producerCount];
for (int i = 0; i < producerCount; i++) {
System.out.println("starting producer #" + i);
Connection conn = factory.newConnection();
producerConnections[i] = conn;
Channel channel = conn.createChannel();
if (producerTxSize > 0) channel.txSelect();
if (confirm >= 0) channel.confirmSelect();
channel.exchangeDeclare(exchangeName, exchangeType);
final Producer p = new Producer(channel, exchangeName, id,
flags, producerTxSize,
1000L * samplingInterval,
rateLimit, minMsgSize,
timeLimit,
confirm);
channel.setReturnListener(p);
channel.setConfirmListener(p);
Thread t = new Thread(p);
producerThreads[i] = t;
t.start();
}
for (int i = 0; i < producerCount; i++) {
producerThreads[i].join();
producerConnections[i].close();
}
for (int i = 0; i < consumerCount; i++) {
consumerThreads[i].join();
consumerConnections[i].close();
}
}
catch( ParseException exp ) {
System.err.println("Parsing failed. Reason: " +
exp.getMessage());
usage(options);
} catch (Exception e) {
System.err.println("Main thread caught exception: " + e);
e.printStackTrace();
System.exit(1);
}
}
/**
 * Prints the help text for the given options, using "<program>" as the
 * command name.
 *
 * @param options the option set to describe
 */
private static void usage(Options options) {
    new HelpFormatter().printHelp("<program>", options);
}
private static Options getOptions() {
Options options = new Options();
options.addOption(new Option("?", "help", false,"show usage"));
options.addOption(new Option("h", "host", true, "broker
host"));
options.addOption(new Option("p", "port", true, "broker
port"));
options.addOption(new Option("t", "type", true, "exchange
type"));
options.addOption(new Option("e", "exchange", true, "exchange
name"));
options.addOption(new Option("i", "interval", true, "sampling
interval"));
options.addOption(new Option("r", "rate", true, "rate limit"));
options.addOption(new Option("x", "producers", true, "producer
count"));
options.addOption(new Option("y", "consumers", true, "consumer
count"));
options.addOption(new Option("m", "ptxsize", true, "producer tx
size"));
options.addOption(new Option("n", "ctxsize", true, "consumer tx
size"));
options.addOption(new Option("c", "confirm", true, "max
unconfirmed publishes"));
options.addOption(new Option("a", "autoack", false,"auto ack"));
options.addOption(new Option("q", "qos", true, "qos prefetch
count"));
options.addOption(new Option("s", "size", true, "message
size"));
options.addOption(new Option("z", "time", true, "time limit"));
Option flag = new Option("f", "flag", true, "message
flag");
flag.setArgs(Option.UNLIMITED_VALUES);
options.addOption(flag);
options.addOption(new Option("M", "framemax", true, "frame max"));
options.addOption(new Option("b", "heartbeat", true, "heartbeat
interval"));
return options;
}
/**
 * Looks up the value of a string-valued option.
 *
 * @param cmd the parsed command line
 * @param opt the single-character option name
 * @param def the default handed to {@code getOptionValue}
 * @return whatever {@code cmd.getOptionValue(opt, def)} yields
 */
private static String strArg(CommandLine cmd, char opt, String def) {
    String value = cmd.getOptionValue(opt, def);
    return value;
}
/**
 * Looks up the value of an integer-valued option.
 *
 * The default is converted to its decimal string form so it can be passed
 * through {@code getOptionValue}; the resulting string is then parsed.
 *
 * @param cmd the parsed command line
 * @param opt the single-character option name
 * @param def the default value
 * @return the parsed integer
 * @throws NumberFormatException if the option's text is not a valid int
 */
private static int intArg(CommandLine cmd, char opt, int def) {
    String fallback = Integer.toString(def);
    String raw = cmd.getOptionValue(opt, fallback);
    return Integer.parseInt(raw);
}
/**
 * Returns all values supplied for a multi-valued option.
 *
 * @param cmd the parsed command line
 * @param opt the single-character option name to look up
 * @return a fixed-size list of the option's values; empty when
 *         {@code getOptionValues} returns {@code null}
 */
private static List<?> lstArg(CommandLine cmd, char opt) {
    // Query the option that was asked for; this used to be hard-coded to
    // 'f', which silently ignored the opt parameter.
    String[] vals = cmd.getOptionValues(opt);
    if (vals == null) {
        vals = new String[] {};
    }
    return Arrays.asList(vals);
}
/**
 * Publishes messages to an exchange in a loop and keeps per-interval counters
 * of messages sent, returned, confirmed and nacked.
 *
 * The same object serves as the channel's return listener and confirm
 * listener, so it is both the publishing loop ({@link #run()}) and the
 * receiver of the corresponding callbacks.
 *
 * NOTE(review): the counters shared with the callbacks are guarded by
 * {@code synchronized}, which suggests the callbacks arrive on a thread other
 * than the one executing {@code run()} -- confirm against the client library.
 */
public static class Producer implements Runnable, ReturnListener,
ConfirmListener
{
// Publishing target: channel, exchange and routing key.
private Channel channel;
private String exchangeName;
private String id;
// Message flags, derived from the "mandatory"/"immediate"/"persistent"
// entries of the flags list passed to the constructor.
private boolean mandatory;
private boolean immediate;
private boolean persistent;
// Commit the channel transaction every txSize messages; 0 disables commits.
private int txSize;
// Minimum elapsed time (ms) between two statistics printouts.
private long interval;
// Target rate in msg/s; 0 disables rate limiting.
private int rateLimit;
// Run duration in ms (constructor argument times 1000); 0 means no limit.
private long timeLimit;
// Reusable buffer of minMsgSize bytes, see createMessage().
private byte[] message;
private long startTime;
private long lastStatsTime;
// Counters since the last statistics printout.
private int msgCount;
private int returnCount;
// Raw "confirm" argument; values >= 0 make delay() print confirm rates.
private long confirm;
// Bounds the number of in-flight publishes; null unless confirm > 0.
private Semaphore confirmPool;
private long confirmCount;
private long nackCount;
// Sequence numbers recorded at publish time and removed again when an
// ack/nack for them arrives.
private volatile SortedSet<Long> unconfirmedSet =
Collections.synchronizedSortedSet(new TreeSet<Long>());
/**
 * Stores the configuration and allocates the message buffer.
 *
 * @param channel      channel used for publishing and commits
 * @param exchangeName exchange to publish to
 * @param id           routing key used for every message
 * @param flags        list checked for "mandatory", "immediate", "persistent"
 * @param txSize       messages per transaction commit, 0 for none
 * @param interval     statistics interval in milliseconds
 * @param rateLimit    target msg/s, 0 for unlimited
 * @param minMsgSize   size of the reusable message buffer in bytes
 * @param timeLimit    run duration; multiplied by 1000 before being stored
 * @param confirm      if > 0, the number of semaphore permits limiting
 *                     outstanding publishes
 */
public Producer(Channel channel, String exchangeName, String id,
List<?> flags, int txSize,
long interval, int rateLimit, int minMsgSize, int
timeLimit,
long confirm)
throws IOException {
this.channel = channel;
this.exchangeName = exchangeName;
this.id = id;
this.mandatory = flags.contains("mandatory");
this.immediate = flags.contains("immediate");
this.persistent = flags.contains("persistent");
this.txSize = txSize;
this.interval = interval;
this.rateLimit = rateLimit;
this.timeLimit = 1000L * timeLimit;
this.message = new byte[minMsgSize];
this.confirm = confirm;
// confirm == 0 leaves confirmPool null, i.e. no cap on in-flight publishes.
if (confirm > 0) {
this.confirmPool = new Semaphore((int)confirm);
}
}
/**
 * Return-listener callback: counts the returned message. All arguments are
 * ignored.
 */
public synchronized void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
returnCount++;
}
/** Confirm-listener callback for a positive acknowledgement. */
public void handleAck(long seqNo, boolean multiple) {
handleAckNack(seqNo, multiple, false);
}
/** Confirm-listener callback for a negative acknowledgement. */
public void handleNack(long seqNo, boolean multiple) {
handleAckNack(seqNo, multiple, true);
}
/**
 * Shared ack/nack handling: drops the affected sequence numbers from
 * unconfirmedSet, adds their number to the ack or nack counter and hands
 * back one semaphore permit per confirmed message.
 *
 * @param seqNo    sequence number carried by the callback
 * @param multiple if true, every recorded sequence number up to and
 *                 including seqNo is treated as confirmed
 * @param nack     true to count as nacks, false to count as acks
 */
private void handleAckNack(long seqNo, boolean multiple,
boolean nack) {
int numConfirms = 0;
if (multiple) {
// headSet is exclusive of its bound, hence seqNo + 1 to include seqNo.
SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo +
1);
numConfirms += confirmed.size();
// Clearing the head-set view removes those entries from unconfirmedSet.
confirmed.clear();
} else {
unconfirmedSet.remove(seqNo);
// Counted as one confirm whether or not seqNo was present in the set.
numConfirms = 1;
}
synchronized (this) {
if (nack) {
nackCount += numConfirms;
} else {
confirmCount += numConfirms;
}
}
if (confirmPool != null) {
for (int i = 0; i < numConfirms; ++i) {
confirmPool.release();
}
}
}
/**
 * Publishing loop. Runs until timeLimit milliseconds have passed, or
 * without a time-based exit when timeLimit is 0. Each iteration takes a
 * semaphore permit (if a pool exists), applies rate limiting / statistics
 * via delay(), publishes one message and commits every txSize messages.
 * On normal exit the overall average send rate is printed. IOException and
 * InterruptedException are rethrown wrapped in RuntimeException.
 */
public void run() {
long now;
now = startTime = lastStatsTime = System.currentTimeMillis();
msgCount = 0;
int totalMsgCount = 0;
try {
while (timeLimit == 0 || now < startTime + timeLimit) {
if (confirmPool != null) {
// Blocks while no permit is free, i.e. until handleAckNack releases one.
confirmPool.acquire();
}
delay(now);
publish(createMessage(totalMsgCount));
totalMsgCount++;
msgCount++;
if (txSize != 0 && totalMsgCount % txSize == 0) {
channel.txCommit();
}
now = System.currentTimeMillis();
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException (e);
}
System.out.println("sending rate avg: " +
(totalMsgCount * 1000L / (now - startTime)) +
" msg/s");
}
/**
 * Records the channel's next publish sequence number in unconfirmedSet and
 * then publishes msg to exchangeName with routing key id, using the
 * persistent or non-persistent minimal properties depending on the flag.
 */
private void publish(byte[] msg)
throws IOException {
// Recorded before the publish call is made.
unconfirmedSet.add(channel.getNextPublishSeqNo());
channel.basicPublish(exchangeName, id,
mandatory, immediate,
persistent ?
MessageProperties.MINIMAL_PERSISTENT_BASIC :
MessageProperties.MINIMAL_BASIC,
msg);
}
/**
 * Applies the rate limit and, once more than interval ms have elapsed since
 * the last printout, prints and resets the per-interval counters.
 *
 * @param now timestamp (ms) taken by the caller
 */
private void delay(long now)
throws InterruptedException {
long elapsed = now - lastStatsTime;
//example: rateLimit is 5000 msg/s,
//10 ms have elapsed, we have sent 200 messages
//the 200 msgs we have actually sent should have taken us
//200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms
long pause = rateLimit == 0 ?
0 : (msgCount * 1000L / rateLimit - elapsed);
if (pause > 0) {
Thread.sleep(pause);
}
// elapsed was computed before the sleep above and is not refreshed.
if (elapsed > interval) {
long sendRate, returnRate, confirmRate, nackRate;
// Snapshot and reset under the same monitor the callbacks use to update
// returnCount, confirmCount and nackCount.
synchronized(this) {
sendRate = msgCount * 1000L / elapsed;
returnRate = returnCount * 1000L / elapsed;
confirmRate = confirmCount * 1000L / elapsed;
nackRate = nackCount * 1000L / elapsed;
msgCount = 0;
returnCount = 0;
confirmCount = 0;
nackCount = 0;
}
System.out.print("sending rate: " + sendRate + " msg/s");
if (mandatory || immediate) {
System.out.print(", returns: " + returnRate + " ret/s");
}
if (confirm >= 0) {
System.out.print(", confirms: " + confirmRate + " c/s");
if (nackRate > 0) {
System.out.print(", nacks: " + nackRate + " n/s");
}
}
System.out.println();
lastStatsTime = now;
}
}
/**
 * Builds the payload for one message: a 4-byte sequence number followed by
 * the 8-byte System.nanoTime() value taken here.
 *
 * @param sequenceNumber value written as the leading int
 * @return the shared message buffer with the header copied to its start
 *         when the buffer is at least as long as the header; otherwise a
 *         new array holding just the header
 */
private byte[] createMessage(int sequenceNumber)
throws IOException {
ByteArrayOutputStream acc = new ByteArrayOutputStream();
DataOutputStream d = new DataOutputStream(acc);
long nano = System.nanoTime();
d.writeInt(sequenceNumber);
d.writeLong(nano);
d.flush();
acc.flush();
byte[] m = acc.toByteArray();
if (m.length <= message.length) {
// The same buffer instance is reused and returned for every message.
System.arraycopy(m, 0, message, 0, m.length);
return message;
} else {
return m;
}
}
}
/**
 * Drains deliveries from a {@link QueueingConsumer}, acknowledging and
 * committing as configured, and feeds a timestamp and latency for each
 * message into the shared {@link Stats}.
 */
public static class Consumer implements Runnable {

    private QueueingConsumer q;
    private String           id;
    private int              txSize;
    private boolean          autoAck;
    private Stats            stats;
    // Stored in milliseconds; 0 means "no time limit".
    private long             timeLimit;

    /**
     * @param q         source of deliveries; its channel is used for acks
     *                  and commits
     * @param id        routing key for which latency is measured
     * @param txSize    commit every txSize messages, 0 for never
     * @param autoAck   when true no explicit ack is sent
     * @param stats     collector that receives every sample
     * @param timeLimit run duration; multiplied by 1000 before being stored
     */
    public Consumer(QueueingConsumer q, String id,
                    int txSize, boolean autoAck,
                    Stats stats, int timeLimit) {
        this.q         = q;
        this.id        = id;
        this.txSize    = txSize;
        this.autoAck   = autoAck;
        this.stats     = stats;
        this.timeLimit = 1000L * timeLimit;
    }

    /**
     * Receive loop. With a time limit, waits for each delivery only for the
     * time remaining and stops when none arrives; without one, blocks on
     * every delivery. On normal exit prints the average receive rate when
     * the elapsed time is positive. IOException, InterruptedException and
     * ShutdownSignalException are rethrown wrapped in RuntimeException.
     */
    public void run() {
        final long begin = System.currentTimeMillis();
        long now = begin;
        int received = 0;
        Channel channel = q.getChannel();

        try {
            while (true) {
                boolean limited = timeLimit != 0;
                if (limited && now >= begin + timeLimit) {
                    break;
                }

                Delivery delivery;
                if (limited) {
                    delivery = q.nextDelivery(begin + timeLimit - now);
                    if (delivery == null) {
                        break;
                    }
                } else {
                    delivery = q.nextDelivery();
                }
                received++;

                // Payload starts with an int, then the sender's nano timestamp.
                ByteArrayInputStream raw =
                    new ByteArrayInputStream(delivery.getBody());
                DataInputStream in = new DataInputStream(raw);
                in.readInt();
                long sentNano = in.readLong();
                long recvNano = System.nanoTime();

                Envelope envelope = delivery.getEnvelope();

                if (!autoAck) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }

                if (txSize != 0 && received % txSize == 0) {
                    channel.txCommit();
                }

                now = System.currentTimeMillis();

                // Latency is only reported for messages carrying our own key.
                long latency;
                if (id.equals(envelope.getRoutingKey())) {
                    latency = recvNano - sentNano;
                } else {
                    latency = 0L;
                }
                stats.collectStats(now, latency);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException (e);
        } catch (ShutdownSignalException e) {
            throw new RuntimeException(e);
        }

        long elapsed = now - begin;
        if (elapsed > 0) {
            long average = received * 1000L / elapsed;
            System.out.println("recving rate avg: " + average + " msg/s");
        }
    }
}
/**
 * Accumulates receive counts and latency figures and prints a summary line
 * each time more than {@code interval} milliseconds separate a sample's
 * timestamp from the start of the current window.
 */
public static class Stats {

    private long interval;
    private long lastStatsTime;
    private int  msgCount;
    private int  latencyCount;
    private long minLatency;
    private long maxLatency;
    private long cumulativeLatency;

    /**
     * @param interval window length in milliseconds; the first window
     *                 starts at the current system time
     */
    public Stats(long interval) {
        this.interval = interval;
        reset(System.currentTimeMillis());
    }

    /** Starts a new window at time t and clears all accumulators. */
    private void reset(long t) {
        cumulativeLatency = 0L;
        maxLatency        = Long.MIN_VALUE;
        minLatency        = Long.MAX_VALUE;
        latencyCount      = 0;
        msgCount          = 0;
        lastStatsTime     = t;
    }

    /**
     * Records one received message.
     *
     * @param now     timestamp of the sample in milliseconds
     * @param latency latency of the message; values that are not positive
     *                count towards the message rate only. The printout
     *                divides by 1000 and labels the result microseconds.
     */
    public synchronized void collectStats(long now, long latency) {
        msgCount++;

        if (latency > 0) {
            if (latency < minLatency) {
                minLatency = latency;
            }
            if (latency > maxLatency) {
                maxLatency = latency;
            }
            cumulativeLatency += latency;
            latencyCount++;
        }

        long elapsed = now - lastStatsTime;
        if (elapsed <= interval) {
            return;
        }

        StringBuilder line = new StringBuilder("recving rate: ");
        line.append(1000L * msgCount / elapsed).append(" msg/s");
        if (latencyCount > 0) {
            line.append(", min/avg/max latency: ")
                .append(minLatency / 1000L).append("/")
                .append(cumulativeLatency / (1000L * latencyCount)).append("/")
                .append(maxLatency / 1000L).append(" microseconds");
        }
        System.out.println(line.toString());

        reset(now);
    }
}
}
On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <ekirpichov at gmail.com>wrote:
> I am sorry but this link doesn't have any source code - this is the
> library.
> I meant *your* source code, the code of your program; the code which
> exercises the library and gets 5000 msg/s.
>
> P.S. Please use "Reply all", this discussion is most probably
> interesting for many people in the community.
>
> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> > I am following this link for my java code:
> >
> http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz
> >
> > On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <ekirpichov at gmail.com>
> > wrote:
> >>
> >> Please show the source code of all participating components. It's
> >> impossible to say anything definite without it.
> >>
> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >
> >> > So far I am getting a result of 5000 msg/sec in Java.
> >> >
> >> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov <
> ekirpichov at gmail.com>
> >> > wrote:
> >> >>
> >> >> I guess you should get the speed you want, and much more, if you just
> >> >> set up a high enough value for prefetch (see basic.qos) and set
> >> >> autoack.
> >> >> I got ~8k msg/s with much larger messages and persistence turned on
> >> >> too.
> >> >>
> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >> >
> >> >> > Message size is 199 bytes
> >> >> > Messaging scenario is Persistence
> >> >> >
> >> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov
> >> >> > <ekirpichov at gmail.com>
> >> >> > wrote:
> >> >> >>
> >> >> >> Hi.
> >> >> >>
> >> >> >> What's the size of messages?
> >> >> >
> >> >> >> What's your messaging scenario in general? Persistence, prefetch,
> >> >> >> transactions?
> >> >> >
> >> >> >>
> >> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >> >> > Hi, I want to achieve 15000 msg/sec in RabbitMQ. I have tried
> >> >> >> > a lot in Java as well as in Ruby, and I am now implementing it
> >> >> >> > in node.js.
> >> >> >> > I would prefer Java because, after a lot of experimentation,
> >> >> >> > only Java has given me good results, but the goal is still not
> >> >> >> > achieved.
> >> >> >> > So I would like help with how to reach 15000 msg/sec in
> >> >> >> > RabbitMQ using Java. If you have any good suggestions, or know
> >> >> >> > of a site I should consult, please tell me.
> >> >> >> >
> >> >> >> > Thanks.
> >> >> >> > _______________________________________________
> >> >> >> > rabbitmq-discuss mailing list
> >> >> >> > rabbitmq-discuss at lists.rabbitmq.com
> >> >> >> >
> >> >> >> >
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >> Eugene Kirpichov
> >> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> >> >> Editor, http://fprog.ru/
> >> >> >
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Eugene Kirpichov
> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> >> Editor, http://fprog.ru/
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Eugene Kirpichov
> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> Editor, http://fprog.ru/
> >
> >
>
>
>
> --
> Eugene Kirpichov
> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> Editor, http://fprog.ru/
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110715/e18e2d3d/attachment.htm>
More information about the rabbitmq-discuss
mailing list