I run a single RabbitMQ node in production at my company to decouple message handling.<div>Fortunately, this node has not gone down in almost a month, but basic concerns such as HA and load balancing still need to be addressed so that no messages are lost when a server or application goes down. For example, when one node goes down, another node should keep working so that producers and consumers stay healthy.</div><div>So I am considering a setup to protect against message loss: RabbitMQ (mirrored queues) + HAProxy.</div><div>I tested it with two different clients. The PHP code works for both HA and load balancing.</div><div>The Java code, unfortunately, does not, LOL... After testing several times, my result is that the Java code gets load balancing but no HA at all: when I take down a random node during a test, the producer throws a session-related error.</div><div>This problem has troubled me for almost a week, and I am sincerely looking forward to your reply.
</div><div>The environment is as follows:</div><div>HAProxy: 192.168.1.61:5670</div><div>rabbit@ubuntu: 192.168.1.61:5672 (3.1.5)</div><div>rabbit@ubuntu2: 192.168.1.35:5672 (3.1.5)</div><div><div><br></div><div><b>PHP code:</b></div><div><div>&lt;?php<br><br>$n = 100000;<br>for ($i = 0; $i &lt; $n; $i++) {<br>echo '&lt;br/&gt;'.$i;<br>// Connect to RabbitMQ through HAProxy (a new connection for every message)<br>$conn_args = array('host' => '192.168.1.61', 'port' => 5670, 'login' => 'aaron', 'password' => '5555', 'vhost' => '/');<br>$conn = new AMQPConnection($conn_args);<br>$conn->connect();<br><br>// Set the exchange name and type<br>$channel = new AMQPChannel($conn);<br>$ex = new AMQPExchange($channel);<br>$ex->setName('direct_exchange_name');<br>$ex->setType(AMQP_EX_TYPE_DIRECT);<br>$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);<br>$ex->declare();<br><br>// Set the queue name and bind the queue to the exchange with the routing key<br>$q = new AMQPQueue($channel);<br>$q->setName('queue_name');<br>$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);<br>$q->declare();<br><br>$q->bind('direct_exchange_name', 'routingkey_name');<br><br>// Publish the message inside a transaction<br>$channel->startTransaction();<br>$message = json_encode(array('Hello World!','1','2','3','4','DIRECT'));<br>$ex->publish($message, 'routingkey_name');<br>$channel->commitTransaction();<br>$conn->disconnect();<br>}<br><br>?&gt;</div></div><div><br></div><div><b>Java code:</b></div><div>ConnectionFactory factory = new ConnectionFactory(); // create the connection factory<br>factory.setHost("192.168.1.61");<br>factory.setUsername("aaron");<br>factory.setPassword("5555");<br>factory.setPort(5670);<br>factory.setAutomaticRecoveryEnabled(true);<br>factory.setNetworkRecoveryInterval(10000); // set before newConnection() so that it applies to this connection<br>Connection connection = factory.newConnection(); // create the connection (reused for every message)<br>for (int i = 0; i &lt; 10000; i++) {<br>
    Channel channel = connection.createChannel(); // create the channel<br>    channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable); // declare the exchange, durable if the flag is true<br>    String message = "Hello World " + Math.random();<br>    // publish the message as persistent<br>    channel.basicPublish(EXCHANGE_NAME, "",<br>            MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());<br>    System.out.println("[x] Sent '" + message + "'");<br>    channel.close();<br>}<br>connection.close();<br><br>}</div></div>
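<div>One difference between the two snippets above may matter here: the PHP loop opens a new connection through HAProxy for every message, while the Java loop reuses a single connection for all of them. The sketch below shows, in plain Java with a simulated broker, what "open a fresh connection and try again" could look like. It is only an illustration under that assumption, not a confirmed fix. The names <code>RetryingPublisher</code>, <code>PublishAttempt</code> and <code>publishWithRetry</code> are hypothetical and are not part of the RabbitMQ Java client.</div>

```java
// Hypothetical sketch: none of these names come from the RabbitMQ Java client.
final class RetryingPublisher {

    /** Stand-in for "connect through the proxy, publish one message, disconnect". */
    interface PublishAttempt {
        void run(String message) throws Exception;
    }

    /**
     * Runs the attempt until it succeeds or maxAttempts is reached.
     * Each attempt is expected to open its own connection, so a retry
     * can be routed by the proxy to a node that is still alive.
     * Returns the number of attempts that were needed.
     */
    static int publishWithRetry(PublishAttempt attempt, String message,
                                int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int n = 1; n <= maxAttempts; n++) {
            try {
                attempt.run(message);
                return n;
            } catch (Exception e) {
                last = e; // remember the failure and try again
                if (n < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) throws Exception {
        // Simulated broker: the first two attempts fail, as if a node had just gone down.
        int[] calls = {0};
        PublishAttempt flaky = msg -> {
            calls[0]++;
            if (calls[0] <= 2) {
                throw new java.io.IOException("connection reset");
            }
        };
        int used = publishWithRetry(flaky, "Hello World", 5, 10L);
        System.out.println("delivered after " + used + " attempts"); // prints "delivered after 3 attempts"
    }
}
```

<div>In a real producer, the body of the attempt would presumably create the connection and channel, publish, and close them again, which is roughly what each iteration of the PHP loop does. Whether this removes the session error in this particular setup is untested.</div>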