[rabbitmq-discuss] 15000 msg/sec
News Aanad
news.anand11 at gmail.com
Fri Jul 15 18:08:58 BST 2011
Thanks for reply Eugene Kirpichov ,
One more help, Can you suggest me any reference site from where i can follow
and code for my goal?
On Fri, Jul 15, 2011 at 7:57 PM, Eugene Kirpichov <ekirpichov at gmail.com>wrote:
> Is this the minimal piece of code that demonstrates poor performance?
> It's hard to decipher that much code and it can have many kinds of
> problems inside; I think you should be able to trim it down to a
> couple dozen lines demonstrating the same speed; then diagnosing will
> become a lot easier.
>
> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> > This is my code.
> >
> >
> > // The contents of this file are subject to the Mozilla Public License
> > // Version 1.1 (the "License"); you may not use this file except in
> > // compliance with the License. You may obtain a copy of the License
> > // at http://www.mozilla.org/MPL/
> > //
> > // Software distributed under the License is distributed on an "AS IS"
> > // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
> > // the License for the specific language governing rights and
> > // limitations under the License.
> > //
> > // The Original Code is RabbitMQ.
> > //
> > // The Initial Developer of the Original Code is VMware, Inc.
> > // Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
> > //
> >
> >
> > package com.rabbitmq.examples;
> >
> > import java.io.ByteArrayInputStream;
> > import java.io.ByteArrayOutputStream;
> > import java.io.DataInputStream;
> > import java.io.DataOutputStream;
> > import java.io.IOException;
> > import java.util.Arrays;
> > import java.util.Collections;
> > import java.util.List;
> > import java.util.SortedSet;
> > import java.util.TreeSet;
> > import java.util.UUID;
> > import java.util.concurrent.Semaphore;
> >
> > import org.apache.commons.cli.CommandLine;
> > import org.apache.commons.cli.CommandLineParser;
> > import org.apache.commons.cli.GnuParser;
> > import org.apache.commons.cli.HelpFormatter;
> > import org.apache.commons.cli.Option;
> > import org.apache.commons.cli.Options;
> > import org.apache.commons.cli.ParseException;
> >
> > import com.rabbitmq.client.AMQP;
> > import com.rabbitmq.client.Channel;
> > import com.rabbitmq.client.ConfirmListener;
> > import com.rabbitmq.client.Connection;
> > import com.rabbitmq.client.ConnectionFactory;
> > import com.rabbitmq.client.Envelope;
> > import com.rabbitmq.client.MessageProperties;
> > import com.rabbitmq.client.QueueingConsumer;
> > import com.rabbitmq.client.QueueingConsumer.Delivery;
> > import com.rabbitmq.client.ReturnListener;
> > import com.rabbitmq.client.ShutdownSignalException;
> >
> >
> > public class MulticastMain {
> >
> > public static void main(String[] args) {
> > Options options = getOptions();
> > CommandLineParser parser = new GnuParser();
> > try {
> > CommandLine cmd = parser.parse(options, args);
> >
> > if (cmd.hasOption('?')) {
> > usage(options);
> > System.exit(0);
> > }
> >
> > String hostName = strArg(cmd, 'h', "localhost");
> > int portNumber = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);
> > String exchangeType = strArg(cmd, 't', "direct");
> > String exchangeName = strArg(cmd, 'e', exchangeType);
> > int samplingInterval = intArg(cmd, 'i', 1);
> > int rateLimit = intArg(cmd, 'r', 0);
> > int producerCount = intArg(cmd, 'x', 1);
> > int consumerCount = intArg(cmd, 'y', 1);
> > int producerTxSize = intArg(cmd, 'm', 0);
> > int consumerTxSize = intArg(cmd, 'n', 0);
> > long confirm = intArg(cmd, 'c', -1);
> > boolean autoAck = cmd.hasOption('a');
> > int prefetchCount = intArg(cmd, 'q', 0);
> > int minMsgSize = intArg(cmd, 's', 0);
> > int timeLimit = intArg(cmd, 'z', 0);
> > List<?> flags = lstArg(cmd, 'f');
> > int frameMax = intArg(cmd, 'M', 0);
> > int heartbeat = intArg(cmd, 'b', 0);
> >
> > if ((producerTxSize > 0) && confirm >= 0) {
> > throw new ParseException("Cannot select both
> > producerTxSize"+
> > " and confirm");
> > }
> >
> > //setup
> > String id = UUID.randomUUID().toString();
> > Stats stats = new Stats(1000L * samplingInterval);
> > ConnectionFactory factory = new ConnectionFactory();
> > factory.setHost(hostName);
> > factory.setPort(portNumber);
> > factory.setRequestedFrameMax(frameMax);
> > factory.setRequestedHeartbeat(heartbeat);
> >
> > Thread[] consumerThreads = new Thread[consumerCount];
> > Connection[] consumerConnections = new
> > Connection[consumerCount];
> > for (int i = 0; i < consumerCount; i++) {
> > System.out.println("starting consumer #" + i);
> > Connection conn = factory.newConnection();
> > consumerConnections[i] = conn;
> > Channel channel = conn.createChannel();
> > if (consumerTxSize > 0) channel.txSelect();
> > channel.exchangeDeclare(exchangeName, exchangeType);
> > String queueName =
> > channel.queueDeclare("",
> > flags.contains("persistent"),
> > true, false,
> null).getQueue();
> > QueueingConsumer consumer = new
> QueueingConsumer(channel);
> > if (prefetchCount > 0) channel.basicQos(prefetchCount);
> > channel.basicConsume(queueName, autoAck, consumer);
> > channel.queueBind(queueName, exchangeName, id);
> > Thread t =
> > new Thread(new Consumer(consumer, id,
> > consumerTxSize, autoAck,
> > stats, timeLimit));
> > consumerThreads[i] = t;
> > t.start();
> > }
> > Thread[] producerThreads = new Thread[producerCount];
> > Connection[] producerConnections = new
> > Connection[producerCount];
> > for (int i = 0; i < producerCount; i++) {
> > System.out.println("starting producer #" + i);
> > Connection conn = factory.newConnection();
> > producerConnections[i] = conn;
> > Channel channel = conn.createChannel();
> > if (producerTxSize > 0) channel.txSelect();
> > if (confirm >= 0) channel.confirmSelect();
> > channel.exchangeDeclare(exchangeName, exchangeType);
> > final Producer p = new Producer(channel, exchangeName,
> id,
> > flags, producerTxSize,
> > 1000L * samplingInterval,
> > rateLimit, minMsgSize,
> > timeLimit,
> > confirm);
> > channel.setReturnListener(p);
> > channel.setConfirmListener(p);
> > Thread t = new Thread(p);
> > producerThreads[i] = t;
> > t.start();
> > }
> >
> > for (int i = 0; i < producerCount; i++) {
> > producerThreads[i].join();
> > producerConnections[i].close();
> > }
> >
> > for (int i = 0; i < consumerCount; i++) {
> > consumerThreads[i].join();
> > consumerConnections[i].close();
> > }
> >
> > }
> > catch( ParseException exp ) {
> > System.err.println("Parsing failed. Reason: " +
> > exp.getMessage());
> > usage(options);
> > } catch (Exception e) {
> > System.err.println("Main thread caught exception: " + e);
> > e.printStackTrace();
> > System.exit(1);
> > }
> > }
> >
> > private static void usage(Options options) {
> > HelpFormatter formatter = new HelpFormatter();
> > formatter.printHelp("<program>", options);
> > }
> >
> > private static Options getOptions() {
> > Options options = new Options();
> > options.addOption(new Option("?", "help", false,"show
> usage"));
> > options.addOption(new Option("h", "host", true, "broker
> > host"));
> > options.addOption(new Option("p", "port", true, "broker
> > port"));
> > options.addOption(new Option("t", "type", true, "exchange
> > type"));
> > options.addOption(new Option("e", "exchange", true, "exchange
> > name"));
> > options.addOption(new Option("i", "interval", true, "sampling
> > interval"));
> > options.addOption(new Option("r", "rate", true, "rate
> limit"));
> > options.addOption(new Option("x", "producers", true, "producer
> > count"));
> > options.addOption(new Option("y", "consumers", true, "consumer
> > count"));
> > options.addOption(new Option("m", "ptxsize", true, "producer tx
> > size"));
> > options.addOption(new Option("n", "ctxsize", true, "consumer tx
> > size"));
> > options.addOption(new Option("c", "confirm", true, "max
> > unconfirmed publishes"));
> > options.addOption(new Option("a", "autoack", false,"auto
> ack"));
> > options.addOption(new Option("q", "qos", true, "qos
> prefetch
> > count"));
> > options.addOption(new Option("s", "size", true, "message
> > size"));
> > options.addOption(new Option("z", "time", true, "time
> limit"));
> > Option flag = new Option("f", "flag", true, "message
> > flag");
> > flag.setArgs(Option.UNLIMITED_VALUES);
> > options.addOption(flag);
> > options.addOption(new Option("M", "framemax", true, "frame
> max"));
> > options.addOption(new Option("b", "heartbeat", true, "heartbeat
> > interval"));
> > return options;
> > }
> >
> > private static String strArg(CommandLine cmd, char opt, String def) {
> > return cmd.getOptionValue(opt, def);
> > }
> >
> > private static int intArg(CommandLine cmd, char opt, int def) {
> > return Integer.parseInt(cmd.getOptionValue(opt,
> > Integer.toString(def)));
> > }
> >
> > private static List<?> lstArg(CommandLine cmd, char opt) {
> > String[] vals = cmd.getOptionValues('f');
> > if (vals == null) {
> > vals = new String[] {};
> > }
> > return Arrays.asList(vals);
> > }
> >
> > public static class Producer implements Runnable, ReturnListener,
> > ConfirmListener
> > {
> > private Channel channel;
> > private String exchangeName;
> > private String id;
> > private boolean mandatory;
> > private boolean immediate;
> > private boolean persistent;
> > private int txSize;
> > private long interval;
> > private int rateLimit;
> > private long timeLimit;
> >
> > private byte[] message;
> >
> > private long startTime;
> > private long lastStatsTime;
> > private int msgCount;
> > private int returnCount;
> >
> > private long confirm;
> > private Semaphore confirmPool;
> > private long confirmCount;
> > private long nackCount;
> > private volatile SortedSet<Long> unconfirmedSet =
> > Collections.synchronizedSortedSet(new TreeSet<Long>());
> >
> > public Producer(Channel channel, String exchangeName, String id,
> > List<?> flags, int txSize,
> > long interval, int rateLimit, int minMsgSize, int
> > timeLimit,
> > long confirm)
> > throws IOException {
> >
> > this.channel = channel;
> > this.exchangeName = exchangeName;
> > this.id = id;
> > this.mandatory = flags.contains("mandatory");
> > this.immediate = flags.contains("immediate");
> > this.persistent = flags.contains("persistent");
> > this.txSize = txSize;
> > this.interval = interval;
> > this.rateLimit = rateLimit;
> > this.timeLimit = 1000L * timeLimit;
> > this.message = new byte[minMsgSize];
> > this.confirm = confirm;
> > if (confirm > 0) {
> > this.confirmPool = new Semaphore((int)confirm);
> > }
> > }
> >
> > public synchronized void handleReturn(int replyCode,
> > String replyText,
> > String exchange,
> > String routingKey,
> > AMQP.BasicProperties
> > properties,
> > byte[] body)
> > throws IOException {
> > returnCount++;
> > }
> >
> > public void handleAck(long seqNo, boolean multiple) {
> > handleAckNack(seqNo, multiple, false);
> > }
> >
> > public void handleNack(long seqNo, boolean multiple) {
> > handleAckNack(seqNo, multiple, true);
> > }
> >
> > private void handleAckNack(long seqNo, boolean multiple,
> > boolean nack) {
> > int numConfirms = 0;
> > if (multiple) {
> > SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo
> +
> > 1);
> > numConfirms += confirmed.size();
> > confirmed.clear();
> > } else {
> > unconfirmedSet.remove(seqNo);
> > numConfirms = 1;
> > }
> > synchronized (this) {
> > if (nack) {
> > nackCount += numConfirms;
> > } else {
> > confirmCount += numConfirms;
> > }
> > }
> >
> > if (confirmPool != null) {
> > for (int i = 0; i < numConfirms; ++i) {
> > confirmPool.release();
> > }
> > }
> >
> > }
> >
> > public void run() {
> >
> > long now;
> > now = startTime = lastStatsTime = System.currentTimeMillis();
> > msgCount = 0;
> > int totalMsgCount = 0;
> >
> > try {
> >
> > while (timeLimit == 0 || now < startTime + timeLimit) {
> > if (confirmPool != null) {
> > confirmPool.acquire();
> > }
> > delay(now);
> > publish(createMessage(totalMsgCount));
> > totalMsgCount++;
> > msgCount++;
> >
> > if (txSize != 0 && totalMsgCount % txSize == 0) {
> > channel.txCommit();
> > }
> > now = System.currentTimeMillis();
> > }
> >
> > } catch (IOException e) {
> > throw new RuntimeException(e);
> > } catch (InterruptedException e) {
> > throw new RuntimeException (e);
> > }
> >
> > System.out.println("sending rate avg: " +
> > (totalMsgCount * 1000L / (now -
> startTime)) +
> > " msg/s");
> >
> > }
> >
> > private void publish(byte[] msg)
> > throws IOException {
> >
> > unconfirmedSet.add(channel.getNextPublishSeqNo());
> > channel.basicPublish(exchangeName, id,
> > mandatory, immediate,
> > persistent ?
> > MessageProperties.MINIMAL_PERSISTENT_BASIC :
> > MessageProperties.MINIMAL_BASIC,
> > msg);
> > }
> >
> > private void delay(long now)
> > throws InterruptedException {
> >
> > long elapsed = now - lastStatsTime;
> > //example: rateLimit is 5000 msg/s,
> > //10 ms have elapsed, we have sent 200 messages
> > //the 200 msgs we have actually sent should have taken us
> > //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms
> > long pause = rateLimit == 0 ?
> > 0 : (msgCount * 1000L / rateLimit - elapsed);
> > if (pause > 0) {
> > Thread.sleep(pause);
> > }
> > if (elapsed > interval) {
> > long sendRate, returnRate, confirmRate, nackRate;
> > synchronized(this) {
> > sendRate = msgCount * 1000L / elapsed;
> > returnRate = returnCount * 1000L / elapsed;
> > confirmRate = confirmCount * 1000L / elapsed;
> > nackRate = nackCount * 1000L / elapsed;
> > msgCount = 0;
> > returnCount = 0;
> > confirmCount = 0;
> > nackCount = 0;
> > }
> > System.out.print("sending rate: " + sendRate + " msg/s");
> > if (mandatory || immediate) {
> > System.out.print(", returns: " + returnRate + "
> ret/s");
> > }
> > if (confirm >= 0) {
> > System.out.print(", confirms: " + confirmRate + "
> c/s");
> > if (nackRate > 0) {
> > System.out.print(", nacks: " + nackRate + "
> n/s");
> > }
> > }
> > System.out.println();
> > lastStatsTime = now;
> > }
> > }
> >
> > private byte[] createMessage(int sequenceNumber)
> > throws IOException {
> >
> > ByteArrayOutputStream acc = new ByteArrayOutputStream();
> > DataOutputStream d = new DataOutputStream(acc);
> > long nano = System.nanoTime();
> > d.writeInt(sequenceNumber);
> > d.writeLong(nano);
> > d.flush();
> > acc.flush();
> > byte[] m = acc.toByteArray();
> > if (m.length <= message.length) {
> > System.arraycopy(m, 0, message, 0, m.length);
> > return message;
> > } else {
> > return m;
> > }
> > }
> >
> > }
> >
> > public static class Consumer implements Runnable {
> >
> > private QueueingConsumer q;
> > private String id;
> > private int txSize;
> > private boolean autoAck;
> > private Stats stats;
> > private long timeLimit;
> >
> > public Consumer(QueueingConsumer q, String id,
> > int txSize, boolean autoAck,
> > Stats stats, int timeLimit) {
> >
> > this.q = q;
> > this.id = id;
> > this.txSize = txSize;
> > this.autoAck = autoAck;
> > this.stats = stats;
> > this.timeLimit = 1000L * timeLimit;
> > }
> >
> > public void run() {
> >
> > long now;
> > long startTime;
> > startTime = now = System.currentTimeMillis();
> > int totalMsgCount = 0;
> >
> > Channel channel = q.getChannel();
> >
> > try {
> >
> > while (timeLimit == 0 || now < startTime + timeLimit) {
> > Delivery delivery;
> > if (timeLimit == 0) {
> > delivery = q.nextDelivery();
> > } else {
> > delivery = q.nextDelivery(startTime + timeLimit -
> > now);
> > if (delivery == null) break;
> > }
> > totalMsgCount++;
> >
> > DataInputStream d = new DataInputStream(new
> > ByteArrayInputStream(delivery.getBody()));
> > d.readInt();
> > long msgNano = d.readLong();
> > long nano = System.nanoTime();
> >
> > Envelope envelope = delivery.getEnvelope();
> >
> > if (!autoAck) {
> > channel.basicAck(envelope.getDeliveryTag(),
> false);
> > }
> >
> > if (txSize != 0 && totalMsgCount % txSize == 0) {
> > channel.txCommit();
> > }
> >
> > now = System.currentTimeMillis();
> >
> > stats.collectStats(now,
> > id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);
> > }
> >
> > } catch (IOException e) {
> > throw new RuntimeException(e);
> > } catch (InterruptedException e) {
> > throw new RuntimeException (e);
> > } catch (ShutdownSignalException e) {
> > throw new RuntimeException(e);
> > }
> >
> > long elapsed = now - startTime;
> > if (elapsed > 0) {
> > System.out.println("recving rate avg: " +
> > (totalMsgCount * 1000L / elapsed) +
> > " msg/s");
> > }
> > }
> >
> > }
> >
> > public static class Stats {
> >
> > private long interval;
> >
> > private long lastStatsTime;
> > private int msgCount;
> > private int latencyCount;
> > private long minLatency;
> > private long maxLatency;
> > private long cumulativeLatency;
> >
> > public Stats(long interval) {
> > this.interval = interval;
> > reset(System.currentTimeMillis());
> > }
> >
> > private void reset(long t) {
> > lastStatsTime = t;
> > msgCount = 0;
> > latencyCount = 0;
> > minLatency = Long.MAX_VALUE;
> > maxLatency = Long.MIN_VALUE;
> > cumulativeLatency = 0L;
> > }
> >
> > public synchronized void collectStats(long now, long latency) {
> > msgCount++;
> >
> > if (latency > 0) {
> > minLatency = Math.min(minLatency, latency);
> > maxLatency = Math.max(maxLatency, latency);
> > cumulativeLatency += latency;
> > latencyCount++;
> > }
> >
> > long elapsed = now - lastStatsTime;
> > if (elapsed > interval) {
> > System.out.println("recving rate: " +
> > (1000L * msgCount / elapsed) +
> > " msg/s" +
> > (latencyCount > 0 ?
> > ", min/avg/max latency: " +
> > minLatency/1000L + "/" +
> > cumulativeLatency / (1000L *
> > latencyCount) + "/" +
> > maxLatency/1000L + " microseconds" :
> > ""));
> > reset(now);
> > }
> >
> > }
> >
> > }
> >
> > }
> >
> >
> > On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <ekirpichov at gmail.com>
> > wrote:
> >>
> >> I am sorry but this link doesn't have any source code - this is the
> >> library.
> >> I meant *your* source code, the code of your program; the code which
> >> exercises the library and gets 5000 msg/s.
> >>
> >> P.S. Please use "Reply all", this discussion is most probably
> >> interesting for many people in the community.
> >>
> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> > I am following this link for my java code:
> >> >
> >> >
> http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz
> >> >
> >> > On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <
> ekirpichov at gmail.com>
> >> > wrote:
> >> >>
> >> >> Please show the source code of all participating components. It's
> >> >> impossible to say anything definite without it.
> >> >>
> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >> >
> >> >> > Till now i am getting the result of 5000 msg/sec in java.
> >> >> >
> >> >> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov
> >> >> > <ekirpichov at gmail.com>
> >> >> > wrote:
> >> >> >>
> >> >> >> I guess you should get the speed you want, and much more, if you
> >> >> >> just
> >> >> >> set up a high enough value for prefetch (see basic.qos) and set
> >> >> >> autoack.
> >> >> >> I got ~8k msg/s with much larger messages and persistence turned
> on
> >> >> >> too.
> >> >> >>
> >> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >> >> >
> >> >> >> > Message size is 199 bytes
> >> >> >> > Messaging scenario is Persistence
> >> >> >> >
> >> >> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov
> >> >> >> > <ekirpichov at gmail.com>
> >> >> >> > wrote:
> >> >> >> >>
> >> >> >> >> Hi.
> >> >> >> >>
> >> >> >> >> What's the size of messages?
> >> >> >> >
> >> >> >> >> What's your messaging scenario in general? Persistence,
> prefetch,
> >> >> >> >> transactions?
> >> >> >> >
> >> >> >> >>
> >> >> >> >> 2011/7/15 News Aanad <news.anand11 at gmail.com>:
> >> >> >> >> > Hi, I wanted to achieve the goal of 15000 msg/sec in
> RabbitMQ.
> >> >> >> >> > I
> >> >> >> >> > have
> >> >> >> >> > tried a lot in java as well as in ruby. And right now
> >> >> >> >> > implementing
> >> >> >> >> > node.js .
> >> >> >> >> > I want to prefer java because after lots of experimentation
> the
> >> >> >> >> > only
> >> >> >> >> > java has given me good result but still goal is not achieved.
> >> >> >> >> > So i want help to know how to reach upto 15000 msg/sec in
> >> >> >> >> > RabbitMQ
> >> >> >> >> > using java. If any good suggestion is there please tell me
> or
> >> >> >> >> > if
> >> >> >> >> > any
> >> >> >> >> > site to prefer then please tell me.
> >> >> >> >> >
> >> >> >> >> > Thanks.
> >> >> >> >> > _______________________________________________
> >> >> >> >> > rabbitmq-discuss mailing list
> >> >> >> >> > rabbitmq-discuss at lists.rabbitmq.com
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> >
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
> >> >> >> >> >
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> --
> >> >> >> >> Eugene Kirpichov
> >> >> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> >> >> >> Editor, http://fprog.ru/
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >> Eugene Kirpichov
> >> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> >> >> Editor, http://fprog.ru/
> >> >> >
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Eugene Kirpichov
> >> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> >> Editor, http://fprog.ru/
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Eugene Kirpichov
> >> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> >> Editor, http://fprog.ru/
> >
> >
>
>
>
> --
> Eugene Kirpichov
> Principal Engineer, Mirantis Inc. http://www.mirantis.com/
> Editor, http://fprog.ru/
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110715/cdc1ef93/attachment.htm>
More information about the rabbitmq-discuss
mailing list