This is my code.<br><br><br>// The contents of this file are subject to the Mozilla Public License<br>// Version 1.1 (the "License"); you may not use this file except in<br>// compliance with the License. You may obtain a copy of the License<br>
// at <a href="http://www.mozilla.org/MPL/">http://www.mozilla.org/MPL/</a><br>//<br>// Software distributed under the License is distributed on an "AS IS"<br>// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See<br>
// the License for the specific language governing rights and<br>// limitations under the License.<br>//<br>// The Original Code is RabbitMQ.<br>//<br>// The Initial Developer of the Original Code is VMware, Inc.<br>// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.<br>
//<br><br><br>package com.rabbitmq.examples;<br><br>import java.io.ByteArrayInputStream;<br>import java.io.ByteArrayOutputStream;<br>import java.io.DataInputStream;<br>import java.io.DataOutputStream;<br>import java.io.IOException;<br>
import java.util.Arrays;<br>import java.util.Collections;<br>import java.util.List;<br>import java.util.SortedSet;<br>import java.util.TreeSet;<br>import java.util.UUID;<br>import java.util.concurrent.Semaphore;<br><br>import org.apache.commons.cli.CommandLine;<br>
import org.apache.commons.cli.CommandLineParser;<br>import org.apache.commons.cli.GnuParser;<br>import org.apache.commons.cli.HelpFormatter;<br>import org.apache.commons.cli.Option;<br>import org.apache.commons.cli.Options;<br>
import org.apache.commons.cli.ParseException;<br><br>import com.rabbitmq.client.AMQP;<br>import com.rabbitmq.client.Channel;<br>import com.rabbitmq.client.ConfirmListener;<br>import com.rabbitmq.client.Connection;<br>import com.rabbitmq.client.ConnectionFactory;<br>
import com.rabbitmq.client.Envelope;<br>import com.rabbitmq.client.MessageProperties;<br>import com.rabbitmq.client.QueueingConsumer;<br>import com.rabbitmq.client.QueueingConsumer.Delivery;<br>import com.rabbitmq.client.ReturnListener;<br>
import com.rabbitmq.client.ShutdownSignalException;<br><br><br>public class MulticastMain {<br><br> public static void main(String[] args) {<br> Options options = getOptions();<br> CommandLineParser parser = new GnuParser();<br>
try {<br> CommandLine cmd = parser.parse(options, args);<br><br> if (cmd.hasOption('?')) {<br> usage(options);<br> System.exit(0);<br> }<br><br>
String hostName = strArg(cmd, 'h', "localhost");<br> int portNumber = intArg(cmd, 'p', AMQP.PROTOCOL.PORT);<br> String exchangeType = strArg(cmd, 't', "direct");<br>
String exchangeName = strArg(cmd, 'e', exchangeType);<br> int samplingInterval = intArg(cmd, 'i', 1);<br> int rateLimit = intArg(cmd, 'r', 0);<br> int producerCount = intArg(cmd, 'x', 1);<br>
int consumerCount = intArg(cmd, 'y', 1);<br> int producerTxSize = intArg(cmd, 'm', 0);<br> int consumerTxSize = intArg(cmd, 'n', 0);<br> long confirm = intArg(cmd, 'c', -1);<br>
boolean autoAck = cmd.hasOption('a');<br> int prefetchCount = intArg(cmd, 'q', 0);<br> int minMsgSize = intArg(cmd, 's', 0);<br> int timeLimit = intArg(cmd, 'z', 0);<br>
List<?> flags = lstArg(cmd, 'f');<br> int frameMax = intArg(cmd, 'M', 0);<br> int heartbeat = intArg(cmd, 'b', 0);<br><br> if ((producerTxSize > 0) && confirm >= 0) {<br>
throw new ParseException("Cannot select both producerTxSize"+<br> " and confirm");<br> }<br><br> //setup<br> String id = UUID.randomUUID().toString();<br>
Stats stats = new Stats(1000L * samplingInterval);<br> ConnectionFactory factory = new ConnectionFactory();<br> factory.setHost(hostName);<br> factory.setPort(portNumber);<br> factory.setRequestedFrameMax(frameMax);<br>
factory.setRequestedHeartbeat(heartbeat);<br><br> Thread[] consumerThreads = new Thread[consumerCount];<br> Connection[] consumerConnections = new Connection[consumerCount];<br> for (int i = 0; i < consumerCount; i++) {<br>
System.out.println("starting consumer #" + i);<br> Connection conn = factory.newConnection();<br> consumerConnections[i] = conn;<br> Channel channel = conn.createChannel();<br>
if (consumerTxSize > 0) channel.txSelect();<br> channel.exchangeDeclare(exchangeName, exchangeType);<br> String queueName =<br> channel.queueDeclare("", flags.contains("persistent"),<br>
true, false, null).getQueue();<br> QueueingConsumer consumer = new QueueingConsumer(channel);<br> if (prefetchCount > 0) channel.basicQos(prefetchCount);<br>
channel.basicConsume(queueName, autoAck, consumer);<br> channel.queueBind(queueName, exchangeName, id);<br> Thread t =<br> new Thread(new Consumer(consumer, id,<br>
consumerTxSize, autoAck,<br> stats, timeLimit));<br> consumerThreads[i] = t;<br> t.start();<br> }<br>
Thread[] producerThreads = new Thread[producerCount];<br> Connection[] producerConnections = new Connection[producerCount];<br> for (int i = 0; i < producerCount; i++) {<br> System.out.println("starting producer #" + i);<br>
Connection conn = factory.newConnection();<br> producerConnections[i] = conn;<br> Channel channel = conn.createChannel();<br> if (producerTxSize > 0) channel.txSelect();<br>
if (confirm >= 0) channel.confirmSelect();<br> channel.exchangeDeclare(exchangeName, exchangeType);<br> final Producer p = new Producer(channel, exchangeName, id,<br> flags, producerTxSize,<br>
1000L * samplingInterval,<br> rateLimit, minMsgSize, timeLimit,<br> confirm);<br>
channel.setReturnListener(p);<br> channel.setConfirmListener(p);<br> Thread t = new Thread(p);<br> producerThreads[i] = t;<br> t.start();<br> }<br>
<br> for (int i = 0; i < producerCount; i++) {<br> producerThreads[i].join();<br> producerConnections[i].close();<br> }<br><br> for (int i = 0; i < consumerCount; i++) {<br>
consumerThreads[i].join();<br> consumerConnections[i].close();<br> }<br><br> }<br> catch( ParseException exp ) {<br> System.err.println("Parsing failed. Reason: " + exp.getMessage());<br>
usage(options);<br> } catch (Exception e) {<br> System.err.println("Main thread caught exception: " + e);<br> e.printStackTrace();<br> System.exit(1);<br> }<br>
}<br><br> private static void usage(Options options) {<br> HelpFormatter formatter = new HelpFormatter();<br> formatter.printHelp("<program>", options);<br> }<br><br> private static Options getOptions() {<br>
Options options = new Options();<br> options.addOption(new Option("?", "help", false,"show usage"));<br> options.addOption(new Option("h", "host", true, "broker host"));<br>
options.addOption(new Option("p", "port", true, "broker port"));<br> options.addOption(new Option("t", "type", true, "exchange type"));<br>
options.addOption(new Option("e", "exchange", true, "exchange name"));<br> options.addOption(new Option("i", "interval", true, "sampling interval"));<br>
options.addOption(new Option("r", "rate", true, "rate limit"));<br> options.addOption(new Option("x", "producers", true, "producer count"));<br>
options.addOption(new Option("y", "consumers", true, "consumer count"));<br> options.addOption(new Option("m", "ptxsize", true, "producer tx size"));<br>
options.addOption(new Option("n", "ctxsize", true, "consumer tx size"));<br> options.addOption(new Option("c", "confirm", true, "max unconfirmed publishes"));<br>
options.addOption(new Option("a", "autoack", false,"auto ack"));<br> options.addOption(new Option("q", "qos", true, "qos prefetch count"));<br>
options.addOption(new Option("s", "size", true, "message size"));<br> options.addOption(new Option("z", "time", true, "time limit"));<br>
Option flag = new Option("f", "flag", true, "message flag");<br> flag.setArgs(Option.UNLIMITED_VALUES);<br> options.addOption(flag);<br> options.addOption(new Option("M", "framemax", true, "frame max"));<br>
options.addOption(new Option("b", "heartbeat", true, "heartbeat interval"));<br> return options;<br> }<br><br> private static String strArg(CommandLine cmd, char opt, String def) {<br>
return cmd.getOptionValue(opt, def);<br> }<br><br> private static int intArg(CommandLine cmd, char opt, int def) {<br> return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));<br> }<br>
<br> private static List<?> lstArg(CommandLine cmd, char opt) {<br> String[] vals = cmd.getOptionValues('f');<br> if (vals == null) {<br> vals = new String[] {};<br> }<br> return Arrays.asList(vals);<br>
}<br><br> public static class Producer implements Runnable, ReturnListener,<br> ConfirmListener<br> {<br> private Channel channel;<br> private String exchangeName;<br>
private String id;<br> private boolean mandatory;<br> private boolean immediate;<br> private boolean persistent;<br> private int txSize;<br> private long interval;<br> private int rateLimit;<br>
private long timeLimit;<br><br> private byte[] message;<br><br> private long startTime;<br> private long lastStatsTime;<br> private int msgCount;<br> private int returnCount;<br>
<br> private long confirm;<br> private Semaphore confirmPool;<br> private long confirmCount;<br> private long nackCount;<br> private volatile SortedSet<Long> unconfirmedSet =<br>
Collections.synchronizedSortedSet(new TreeSet<Long>());<br><br> public Producer(Channel channel, String exchangeName, String id,<br> List<?> flags, int txSize,<br> long interval, int rateLimit, int minMsgSize, int timeLimit,<br>
long confirm)<br> throws IOException {<br><br> this.channel = channel;<br> this.exchangeName = exchangeName;<br> <a href="http://this.id">this.id</a> = id;<br>
this.mandatory = flags.contains("mandatory");<br> this.immediate = flags.contains("immediate");<br> this.persistent = flags.contains("persistent");<br>
this.txSize = txSize;<br> this.interval = interval;<br> this.rateLimit = rateLimit;<br> this.timeLimit = 1000L * timeLimit;<br> this.message = new byte[minMsgSize];<br>
this.confirm = confirm;<br> if (confirm > 0) {<br> this.confirmPool = new Semaphore((int)confirm);<br> }<br> }<br><br> public synchronized void handleReturn(int replyCode,<br>
String replyText,<br> String exchange,<br> String routingKey,<br> AMQP.BasicProperties properties,<br>
byte[] body)<br> throws IOException {<br> returnCount++;<br> }<br><br> public void handleAck(long seqNo, boolean multiple) {<br> handleAckNack(seqNo, multiple, false);<br>
}<br><br> public void handleNack(long seqNo, boolean multiple) {<br> handleAckNack(seqNo, multiple, true);<br> }<br><br> private void handleAckNack(long seqNo, boolean multiple,<br>
boolean nack) {<br> int numConfirms = 0;<br> if (multiple) {<br> SortedSet<Long> confirmed = unconfirmedSet.headSet(seqNo + 1);<br> numConfirms += confirmed.size();<br>
confirmed.clear();<br> } else {<br> unconfirmedSet.remove(seqNo);<br> numConfirms = 1;<br> }<br> synchronized (this) {<br> if (nack) {<br>
nackCount += numConfirms;<br> } else {<br> confirmCount += numConfirms;<br> }<br> }<br><br> if (confirmPool != null) {<br> for (int i = 0; i < numConfirms; ++i) {<br>
confirmPool.release();<br> }<br> }<br><br> }<br><br> public void run() {<br><br> long now;<br> now = startTime = lastStatsTime = System.currentTimeMillis();<br>
msgCount = 0;<br> int totalMsgCount = 0;<br><br> try {<br><br> while (timeLimit == 0 || now < startTime + timeLimit) {<br> if (confirmPool != null) {<br>
confirmPool.acquire();<br> }<br> delay(now);<br> publish(createMessage(totalMsgCount));<br> totalMsgCount++;<br> msgCount++;<br>
<br> if (txSize != 0 && totalMsgCount % txSize == 0) {<br> channel.txCommit();<br> }<br> now = System.currentTimeMillis();<br> }<br>
<br> } catch (IOException e) {<br> throw new RuntimeException(e);<br> } catch (InterruptedException e) {<br> throw new RuntimeException (e);<br> }<br><br> System.out.println("sending rate avg: " +<br>
(totalMsgCount * 1000L / (now - startTime)) +<br> " msg/s");<br><br> }<br><br> private void publish(byte[] msg)<br> throws IOException {<br>
<br> unconfirmedSet.add(channel.getNextPublishSeqNo());<br> channel.basicPublish(exchangeName, id,<br> mandatory, immediate,<br> persistent ? MessageProperties.MINIMAL_PERSISTENT_BASIC : MessageProperties.MINIMAL_BASIC,<br>
msg);<br> }<br><br> private void delay(long now)<br> throws InterruptedException {<br><br> long elapsed = now - lastStatsTime;<br> //example: rateLimit is 5000 msg/s,<br>
//10 ms have elapsed, we have sent 200 messages<br> //the 200 msgs we have actually sent should have taken us<br> //200 * 1000 / 5000 = 40 ms. So we pause for 40ms - 10ms<br> long pause = rateLimit == 0 ?<br>
0 : (msgCount * 1000L / rateLimit - elapsed);<br> if (pause > 0) {<br> Thread.sleep(pause);<br> }<br> if (elapsed > interval) {<br> long sendRate, returnRate, confirmRate, nackRate;<br>
synchronized(this) {<br> sendRate = msgCount * 1000L / elapsed;<br> returnRate = returnCount * 1000L / elapsed;<br> confirmRate = confirmCount * 1000L / elapsed;<br>
nackRate = nackCount * 1000L / elapsed;<br> msgCount = 0;<br> returnCount = 0;<br> confirmCount = 0;<br> nackCount = 0;<br>
}<br> System.out.print("sending rate: " + sendRate + " msg/s");<br> if (mandatory || immediate) {<br> System.out.print(", returns: " + returnRate + " ret/s");<br>
}<br> if (confirm >= 0) {<br> System.out.print(", confirms: " + confirmRate + " c/s");<br> if (nackRate > 0) {<br> System.out.print(", nacks: " + nackRate + " n/s");<br>
}<br> }<br> System.out.println();<br> lastStatsTime = now;<br> }<br> }<br><br> private byte[] createMessage(int sequenceNumber)<br>
throws IOException {<br><br> ByteArrayOutputStream acc = new ByteArrayOutputStream();<br> DataOutputStream d = new DataOutputStream(acc);<br> long nano = System.nanoTime();<br>
d.writeInt(sequenceNumber);<br> d.writeLong(nano);<br> d.flush();<br> acc.flush();<br> byte[] m = acc.toByteArray();<br> if (m.length <= message.length) {<br>
System.arraycopy(m, 0, message, 0, m.length);<br> return message;<br> } else {<br> return m;<br> }<br> }<br><br> }<br><br> public static class Consumer implements Runnable {<br>
<br> private QueueingConsumer q;<br> private String id;<br> private int txSize;<br> private boolean autoAck;<br> private Stats stats;<br> private long timeLimit;<br>
<br> public Consumer(QueueingConsumer q, String id,<br> int txSize, boolean autoAck,<br> Stats stats, int timeLimit) {<br><br> this.q = q;<br> <a href="http://this.id">this.id</a> = id;<br>
this.txSize = txSize;<br> this.autoAck = autoAck;<br> this.stats = stats;<br> this.timeLimit = 1000L * timeLimit;<br> }<br><br> public void run() {<br><br>
long now;<br> long startTime;<br> startTime = now = System.currentTimeMillis();<br> int totalMsgCount = 0;<br><br> Channel channel = q.getChannel();<br><br> try {<br>
<br> while (timeLimit == 0 || now < startTime + timeLimit) {<br> Delivery delivery;<br> if (timeLimit == 0) {<br> delivery = q.nextDelivery();<br>
} else {<br> delivery = q.nextDelivery(startTime + timeLimit - now);<br> if (delivery == null) break;<br> }<br> totalMsgCount++;<br>
<br> DataInputStream d = new DataInputStream(new ByteArrayInputStream(delivery.getBody()));<br> d.readInt();<br> long msgNano = d.readLong();<br> long nano = System.nanoTime();<br>
<br> Envelope envelope = delivery.getEnvelope();<br><br> if (!autoAck) {<br> channel.basicAck(envelope.getDeliveryTag(), false);<br> }<br><br>
if (txSize != 0 && totalMsgCount % txSize == 0) {<br> channel.txCommit();<br> }<br><br> now = System.currentTimeMillis();<br><br> stats.collectStats(now, id.equals(envelope.getRoutingKey()) ? (nano - msgNano) : 0L);<br>
}<br><br> } catch (IOException e) {<br> throw new RuntimeException(e);<br> } catch (InterruptedException e) {<br> throw new RuntimeException (e);<br> } catch (ShutdownSignalException e) {<br>
throw new RuntimeException(e);<br> }<br><br> long elapsed = now - startTime;<br> if (elapsed > 0) {<br> System.out.println("recving rate avg: " +<br>
(totalMsgCount * 1000L / elapsed) +<br> " msg/s");<br> }<br> }<br><br> }<br><br> public static class Stats {<br><br> private long interval;<br>
<br> private long lastStatsTime;<br> private int msgCount;<br> private int latencyCount;<br> private long minLatency;<br> private long maxLatency;<br> private long cumulativeLatency;<br>
<br> public Stats(long interval) {<br> this.interval = interval;<br> reset(System.currentTimeMillis());<br> }<br><br> private void reset(long t) {<br> lastStatsTime = t;<br>
msgCount = 0;<br> latencyCount = 0;<br> minLatency = Long.MAX_VALUE;<br> maxLatency = Long.MIN_VALUE;<br> cumulativeLatency = 0L;<br> }<br>
<br> public synchronized void collectStats(long now, long latency) {<br> msgCount++;<br><br> if (latency > 0) {<br> minLatency = Math.min(minLatency, latency);<br> maxLatency = Math.max(maxLatency, latency);<br>
cumulativeLatency += latency;<br> latencyCount++;<br> }<br><br> long elapsed = now - lastStatsTime;<br> if (elapsed > interval) {<br> System.out.println("recving rate: " +<br>
(1000L * msgCount / elapsed) +<br> " msg/s" +<br> (latencyCount > 0 ?<br> ", min/avg/max latency: " +<br>
minLatency/1000L + "/" +<br> cumulativeLatency / (1000L * latencyCount) + "/" +<br> maxLatency/1000L + " microseconds" :<br>
""));<br> reset(now);<br> }<br><br> }<br><br> }<br><br>}<br><br><br><div class="gmail_quote">On Fri, Jul 15, 2011 at 7:06 PM, Eugene Kirpichov <span dir="ltr"><<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">I am sorry but this link doesn't have any source code - this is the library.<br>
I meant *your* source code, the code of your program; the code which<br>
exercises the library and gets 5000 msg/s.<br>
<br>
P.S. Please use "Reply all", this discussion is most probably<br>
interesting for many people in the community.<br>
<div><div></div><div class="h5"><br>
2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
> I am following this link for my java code:<br>
> <a href="http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz" target="_blank">http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.5.1/rabbitmq-java-client-bin-2.5.1.tar.gz</a><br>
><br>
> On Fri, Jul 15, 2011 at 6:56 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
> wrote:<br>
>><br>
>> Please show the source code of all participating components. It's<br>
>> impossible to say anything definite without it.<br>
>><br>
>> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> ><br>
>> > So far I am getting 5000 msg/sec in Java.<br>
>> ><br>
>> > On Fri, Jul 15, 2011 at 6:53 PM, Eugene Kirpichov <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> > wrote:<br>
>> >><br>
>> >> I guess you should get the speed you want, and much more, if you just<br>
>> >> set up a high enough value for prefetch (see basic.qos) and set<br>
>> >> autoack.<br>
>> >> I got ~8k msg/s with much larger messages and persistence turned on<br>
>> >> too.<br>
>> >><br>
>> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> ><br>
>> >> > Message size is 199 bytes<br>
>> >> > Messaging scenario is Persistence<br>
>> >> ><br>
>> >> > On Fri, Jul 15, 2011 at 6:47 PM, Eugene Kirpichov<br>
>> >> > <<a href="mailto:ekirpichov@gmail.com">ekirpichov@gmail.com</a>><br>
>> >> > wrote:<br>
>> >> >><br>
>> >> >> Hi.<br>
>> >> >><br>
>> >> >> What's the size of messages?<br>
>> >> ><br>
>> >> >> What's your messaging scenario in general? Persistence, prefetch,<br>
>> >> >> transactions?<br>
>> >> ><br>
>> >> >><br>
>> >> >> 2011/7/15 News Aanad <<a href="mailto:news.anand11@gmail.com">news.anand11@gmail.com</a>>:<br>
>> >> >> > Hi, I want to achieve a rate of 15000 msg/sec in RabbitMQ. I<br>
>> >> >> > have<br>
>> >> >> > tried a lot in Java as well as in Ruby, and am right now trying<br>
>> >> >> > node.js.<br>
>> >> >> > I would prefer Java because, after lots of experimentation,<br>
>> >> >> > only<br>
>> >> >> > Java has given me good results, but the goal is still not achieved.<br>
>> >> >> > So I would like help in reaching 15000 msg/sec in RabbitMQ<br>
>> >> >> > using Java. If you have any good suggestions, or can recommend<br>
>> >> >> > any<br>
>> >> >> > site to consult, please tell me.<br>
>> >> >> ><br>
>> >> >> > Thanks.<br>
>> >> >> > _______________________________________________<br>
>> >> >> > rabbitmq-discuss mailing list<br>
>> >> >> > <a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
>> >> >> ><br>
>> >> >> > <a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
>> >> >> ><br>
>> >> >><br>
>> >> >><br>
>> >> >><br>
>> >> >> --<br>
>> >> >> Eugene Kirpichov<br>
>> >> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> >> ><br>
>> >> ><br>
>> >><br>
>> >><br>
>> >><br>
>> >> --<br>
>> >> Eugene Kirpichov<br>
>> >> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> >> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
>> ><br>
>> ><br>
>><br>
>><br>
>><br>
>> --<br>
>> Eugene Kirpichov<br>
>> Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
>> Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
><br>
><br>
<br>
<br>
<br>
</div></div>--<br>
<div><div></div><div class="h5">Eugene Kirpichov<br>
Principal Engineer, Mirantis Inc. <a href="http://www.mirantis.com/" target="_blank">http://www.mirantis.com/</a><br>
Editor, <a href="http://fprog.ru/" target="_blank">http://fprog.ru/</a><br>
</div></div></blockquote></div><br>