[rabbitmq-discuss] Multicasting

bradford cross bradford.n.cross at gmail.com
Tue May 5 03:28:37 BST 2009


I would like to do redundant one-to-many multicasting from the java AMQP
client.  Well, actually, I am not sure if the word redundant is the proper
terminology or if I really need to do multicasting at all.

My requirement is to have two parallel (forked, or whatever) queues, one of
which the regular production process listens to, and one of which a backup
process listens to.

Rabbitmq seems to call this sort of thing one-to-many broadcasting and
multicast.  There is not a lot of documentation about it, just a blurb
here<http://www.rabbitmq.com/faq.html#scenarios>and then the code here
->
rabbitmq-java-client-1.5.4/test/src/com/rabbitmq/examples/MulticastMain.java

Looking at a snippet from the multicast code below, it is clear that each
producer and consumer runs on its own thread, on its own connection, on its
own channel, and each consumer creates its own queue.  But they all use one
exchange name, one ID, and one address (host and port.)  Is it solely these
last elements and what is happening in channel.queueBind that make this
multicast?  What exactly makes it multicasting?

Lastly, am I even headed in the right direction for what I want to do?  It
appears that I would ahve to manually write the code the puts a message into
two duplicate queues to achieve the queue for the backup.

What else is different about this multicasting and the two independent
queues in  the clojure example code below:



Clojure, two independent queues:

(defn two-independent-queues []
  (with-open [connection (let [params (doto (ConnectionParameters.)
                                        (.setVirtualHost "/")
                                        (.setUsername "guest")
                                        (.setPassword "guest"))
                               factory (ConnectionFactory. params)]
                           (.newConnection factory "localhost"))
              channel (.createChannel connection)
              channel2 (.createChannel connection)]
    (let [queue-name (random-id)
          queue-name2 (random-id)
          consumer (bootstrap-consumer channel exchangeName "direct"
queue-name id)
      consumer2 (bootstrap-consumer channel2 exchangeName "direct"
queue-name2 id)]
      (publish channel queue-name "hello")
      (publish channel2 queue-name2 "goodbye")
      (str (consume channel queue-name consumer) "  " (consume channel2
queue-name2 consumer2)))))




java, multicast:

 //setup
            String id = UUID.randomUUID().toString();
            Stats stats = new Stats(1000L * samplingInterval);
            Address[] addresses = new Address[] {
                new Address(hostName, portNumber)
            };
            ConnectionParameters params = new ConnectionParameters();
            Thread[] consumerThreads = new Thread[consumerCount];
            Connection[] consumerConnections = new
Connection[consumerCount];
            for (int i = 0; i < consumerCount; i++) {
                System.out.println("starting consumer #" + i);
                Connection conn = new
ConnectionFactory(params).newConnection(addresses, maxRedirects);
                consumerConnections[i] = conn;
                Channel channel = conn.createChannel();
                if (consumerTxSize > 0) channel.txSelect();
                channel.exchangeDeclare(exchangeName, exchangeType);
                Queue.DeclareOk res = channel.queueDeclare();
                String queueName = res.getQueue();
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(queueName, autoAck, consumer);
                channel.queueBind(queueName, exchangeName, id);
                Thread t =
                    new Thread(new Consumer(consumer, id,
                                            consumerTxSize, autoAck,
                                            stats, timeLimit));
                consumerThreads[i] = t;
                t.start();
            }
            Thread[] producerThreads = new Thread[producerCount];
            Connection[] producerConnections = new
Connection[producerCount];
            for (int i = 0; i < producerCount; i++) {
                System.out.println("starting producer #" + i);
                Connection conn = new
ConnectionFactory(params).newConnection(addresses, maxRedirects);
                producerConnections[i] = conn;
                Channel channel = conn.createChannel();
                if (producerTxSize > 0) channel.txSelect();
                channel.exchangeDeclare(exchangeName, exchangeType);
                Thread t =
                    new Thread(new Producer(channel, exchangeName, id,
                                            flags, producerTxSize,
                                            1000L * samplingInterval,
                                            rateLimit, minMsgSize,
timeLimit));
                producerThreads[i] = t;
                t.start();
            }
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20090504/0b5b027c/attachment.htm 


More information about the rabbitmq-discuss mailing list