I would like to do redundant one-to-many multicasting from the Java AMQP client. Actually, I am not sure whether "redundant" is the proper term, or whether I really need multicasting at all.<br><br>My requirement is to have two parallel (forked) queues: the regular production process listens to one, and a backup process listens to the other.<br>
<br>RabbitMQ seems to call this sort of thing one-to-many broadcasting and multicast.
There is not a lot of documentation about it, just a blurb <a title="here" href="http://www.rabbitmq.com/faq.html#scenarios" id="xayb">here</a> and the example code in rabbitmq-java-client-1.5.4/test/src/com/rabbitmq/examples/MulticastMain.java<br>
<br>Looking at the snippet from the multicast code below, it is clear that each producer and consumer runs on its own thread, with its own connection and its own channel, and that each consumer creates its own queue. But they all use one exchange name, one ID, and one address (host and port). Is it solely these shared elements, together with what happens in channel.queueBind, that makes this multicast? What exactly makes it multicasting?<br>
<br>Lastly, am I even headed in the right direction for what I want to do? It appears that I would have to manually write the code that puts each message into two duplicate queues in order to get the queue for the backup.<br><br>
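What else is different between this multicasting and the two independent queues in the Clojure example code below?<br><br>Clojure, two independent queues:<br><blockquote>(defn two-independent-queues []<br>
&nbsp;&nbsp;(with-open [connection (let [params (doto (ConnectionParameters.)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(.setVirtualHost "/")<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(.setUsername "guest")<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(.setPassword "guest"))<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;factory (ConnectionFactory. params)]<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(.newConnection factory "localhost"))<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;channel (.createChannel connection)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;channel2 (.createChannel connection)]<br>
&nbsp;&nbsp;&nbsp;&nbsp;(let [queue-name (random-id)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;queue-name2 (random-id)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;consumer (bootstrap-consumer channel exchangeName "direct" queue-name id)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;consumer2 (bootstrap-consumer channel2 exchangeName "direct" queue-name2 id)]<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(publish channel queue-name "hello")<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(publish channel2 queue-name2 "goodbye")<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(str (consume channel queue-name consumer) "&nbsp; " (consume channel2 queue-name2 consumer2)))))<br>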
</blockquote><br><br>Java, multicast:<br><br><blockquote>// setup<br>
String id = UUID.randomUUID().toString();<br>
Stats stats = new Stats(1000L * samplingInterval);<br>
Address[] addresses = new Address[] {<br>
&nbsp;&nbsp;&nbsp;&nbsp;new Address(hostName, portNumber)<br>
};<br>
ConnectionParameters params = new ConnectionParameters();<br>
Thread[] consumerThreads = new Thread[consumerCount];<br>
Connection[] consumerConnections = new Connection[consumerCount];<br>
for (int i = 0; i &lt; consumerCount; i++) {<br>
&nbsp;&nbsp;&nbsp;&nbsp;System.out.println("starting consumer #" + i);<br>
&nbsp;&nbsp;&nbsp;&nbsp;Connection conn = new ConnectionFactory(params).newConnection(addresses, maxRedirects);<br>
&nbsp;&nbsp;&nbsp;&nbsp;consumerConnections[i] = conn;<br>
&nbsp;&nbsp;&nbsp;&nbsp;Channel channel = conn.createChannel();<br>
&nbsp;&nbsp;&nbsp;&nbsp;if (consumerTxSize &gt; 0) channel.txSelect();<br>
&nbsp;&nbsp;&nbsp;&nbsp;channel.exchangeDeclare(exchangeName, exchangeType);<br>
&nbsp;&nbsp;&nbsp;&nbsp;Queue.DeclareOk res = channel.queueDeclare();<br>
&nbsp;&nbsp;&nbsp;&nbsp;String queueName = res.getQueue();<br>
&nbsp;&nbsp;&nbsp;&nbsp;QueueingConsumer consumer = new QueueingConsumer(channel);<br>
&nbsp;&nbsp;&nbsp;&nbsp;channel.basicConsume(queueName, autoAck, consumer);<br>
&nbsp;&nbsp;&nbsp;&nbsp;channel.queueBind(queueName, exchangeName, id);<br>
&nbsp;&nbsp;&nbsp;&nbsp;Thread t =<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;new Thread(new Consumer(consumer, id,<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;consumerTxSize, autoAck,<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;stats, timeLimit));<br>
&nbsp;&nbsp;&nbsp;&nbsp;consumerThreads[i] = t;<br>
&nbsp;&nbsp;&nbsp;&nbsp;t.start();<br>
}<br>
Thread[] producerThreads = new Thread[producerCount];<br>
Connection[] producerConnections = new Connection[producerCount];<br>
for (int i = 0; i &lt; producerCount; i++) {<br>
&nbsp;&nbsp;&nbsp;&nbsp;System.out.println("starting producer #" + i);<br>
&nbsp;&nbsp;&nbsp;&nbsp;Connection conn = new ConnectionFactory(params).newConnection(addresses, maxRedirects);<br>
&nbsp;&nbsp;&nbsp;&nbsp;producerConnections[i] = conn;<br>
&nbsp;&nbsp;&nbsp;&nbsp;Channel channel = conn.createChannel();<br>
&nbsp;&nbsp;&nbsp;&nbsp;if (producerTxSize &gt; 0) channel.txSelect();<br>
&nbsp;&nbsp;&nbsp;&nbsp;channel.exchangeDeclare(exchangeName, exchangeType);<br>
&nbsp;&nbsp;&nbsp;&nbsp;Thread t =<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;new Thread(new Producer(channel, exchangeName, id,<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;flags, producerTxSize,<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;1000L * samplingInterval,<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;rateLimit, minMsgSize, timeLimit));<br>
&nbsp;&nbsp;&nbsp;&nbsp;producerThreads[i] = t;<br>
&nbsp;&nbsp;&nbsp;&nbsp;t.start();<br>
}<br></blockquote><br>
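To make the question concrete, here is a minimal in-memory sketch of how I understand a direct exchange to behave: every queue bound with a matching routing key gets its own copy of a published message. It does not talk to a broker, and ToyDirectExchange, bind, publish, and poll are hypothetical names of my own, not part of the RabbitMQ Java client. If this model is right, binding a production queue and a backup queue with the same key would be enough, and I would not need to duplicate messages by hand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange. Hypothetical names throughout;
// none of this is RabbitMQ client API, and no broker is involved.
class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Rough analogue of queueDeclare followed by queueBind(queueName, exchangeName, routingKey).
    void bind(String queueName, String routingKey) {
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>());
        List<String> bound = bindings.computeIfAbsent(routingKey, k -> new ArrayList<>());
        if (!bound.contains(queueName)) {
            bound.add(queueName);
        }
    }

    // Rough analogue of a publish: one copy goes to each queue bound with this key.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String message) {
        List<String> bound = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queueName : bound) {
            queues.get(queueName).add(message);
        }
        return bound.size();
    }

    // Takes the next message from a queue, or returns null if the queue is empty or unknown.
    String poll(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        // Both queues use the same routing key, like the shared id in the multicast code.
        exchange.bind("production", "orders");
        exchange.bind("backup", "orders");

        int copies = exchange.publish("orders", "hello");
        System.out.println("copies delivered: " + copies);            // prints "copies delivered: 2"
        System.out.println("production got: " + exchange.poll("production"));
        System.out.println("backup got: " + exchange.poll("backup"));
    }
}
```

In this model the producer publishes once, and the fan-out comes entirely from two bindings sharing one routing key, which is what the shared id passed to channel.queueBind in the multicast code looks like to me. As far as I can tell, the Clojure example differs in that each publish targets a different queue name, so each message lands in only one queue.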