<div dir="ltr">I am trying to implement the active-active High availability Queue and i have some issues in the implementation. The problem is when we consume messages from the Mirrored queue if any one of the node is down, we can able to consume the messages after the node failure. But the consumed messages after the node failure are still available in the rabbitmq console even after acking all of the messages.<br>
<br>Some information about the overview of the configuration to test active-active queues<br><br>I have NODE1 in Host1 and NODE2 in Host2. Both of the nodes are in cluster. I have a Queue named TestQ in NODE1, since i have made this queue (TestQ) as Mirrored Queue through the policies it is available in Node2 which is running in Host2.<br>
<br>I am passing the array of Address(Host1, Host2) to get the connection in my consumer. the code snippet is<br><br>Address address1 = new Address(Host1, 5672);<br>Address address2 = new Address(Host2, 5672);<br><br>RabbitmqConsumer consumer = new RabbitmqConsumer(new Address[]{address1, address2}, queue);<br>
<br>While getting the connection, <br>Connection conn = connectionFactory.newConnection(addresses);<br><br>My use case is as below.<br><br>I have 30 messages in TestQ and i can see it in the RabbitMQ console of Host1,Host2 looks like below<br>
<br>Host1:<br>Name��� Node<br>TestQ��� rabbit@Node2<br><br>Host2:<br>Name��� Node<br>TestQ��� rabbit@Node2<br><br>In my consumer, I have implemented my own consumer as below.<br><br>class MyConsumer extends QueueingConsumer{<br>
<br>��� Channel channel;<br>��� public MyConsumer(Channel ch) {<br>��� ��� super(ch);<br>��� ��� this.channel = ch;<br>��� }<br>��� <br>��� @Override<br>��� public void handleCancel(String consumerTag)� {<br>��� ��� System.out.println(&quot;handleCancel :: consumertag &quot;+consumerTag+&quot; channel opened &quot;+channel.isOpen()+&quot;,channel &quot;+channel+&quot;, rabbitmq consumer ch &quot;+RabbitmqConsumer.ch);<br>
��� <br>��� }<br>}<br><br>If i run my consumer, it creates a channel with the reference of AMQChannel(amqp://guest@Host1:5672/,1) and it consumes the messasges from the TestQ. I have checked the consumer tag of the channel in both of the host and both are same.<br>
<br>After some of the messages (assume 10 messages) are consumed, i explicitly down the Node2 by giving the below command<br><br>node2$ ./rabbitmqctl stop_app<br><br>After the failure of Node2, i can see the print statements which i added in the handleCancel() method and consumer consumes all of the messages with out any error. The problem is the even though the remaining 20 messages are consumed and acked in the consumer, but if i check it in the rabbitmq console, the 20 messages are still exists in the TestQ.<br>
<br>What i have to do in this scenario to overcome this issue.<br></div>