<div dir="ltr">Hello.<div>I've been testing the client autorecovery support in 3.3.0 and have had some issues getting it to work.</div><div>On my machine, recovery appears to work at fewer than about 5 consumers. Anything over that, and the client gets a stream of TopologyRecoveryExceptions and is never able to reconnect.</div><div><br></div><div>I've looked at the client source and haven't seen any obvious culprit. Has anyone else seen this?</div><div><br></div><div><b>Test Setup:</b></div><div>client jar: amqp-client-3.3.0.jar</div><div>server: rabbitmq-server 3.1.3-1 (from ubuntu 13.10)</div><div>Triggering autorecovery via 'service rabbitmq-server restart'</div><div><br></div><div><b>Client code (scala):</b></div><div><div><font face="courier new, monospace" size="1">package rabbittest</font></div><div><font face="courier new, monospace" size="1"><br></font></div><div><font face="courier new, monospace" size="1">import com.rabbitmq.client._</font></div><div><font face="courier new, monospace" size="1"><br></font></div><div><font face="courier new, monospace" size="1">object RecoveryTest {</font></div><div><font face="courier new, monospace" size="1">  // the consumer count matters. At ~5 consumers the client gets put into a bad state and never reconnects</font></div><div><font face="courier new, monospace" size="1">  // Fewer than 5, the client is usually able to reconnect</font></div><div><font face="courier new, monospace" size="1">  val consumers = 10</font></div><div><font face="courier new, monospace" size="1"><br></font></div><div><font face="courier new, monospace" size="1">  def main(args: Array[String]) {</font></div><div><font face="courier new, monospace" size="1">    val factory = new ConnectionFactory()</font></div><div><font face="courier new, monospace" size="1">    factory.setUri("amqp://guest@localhost:5672/test")</font></div><div><font face="courier new, monospace" size="1">    factory.setAutomaticRecoveryEnabled(true)</font></div><div><font face="courier new, monospace" size="1"><br></font></div><div><font face="courier new, monospace" size="1">    val conn = factory.newConnection()</font></div><div><font face="courier new, monospace" size="1"><br></font></div><div><font face="courier new, monospace" size="1">    for (x <- 1 to consumers) {</font></div><div><font face="courier new, monospace" size="1">      val chan = conn.createChannel()</font></div><div><font face="courier new, monospace" size="1">      val result = chan.queueDeclare()</font></div><div><font face="courier new, monospace" size="1">      val tag = chan.basicConsume(result.getQueue, new DefaultConsumer(chan))</font></div><div><font face="courier new, monospace" size="1">      println(s"channel=${chan.getChannelNumber} queue=${result.getQueue} consumer=$tag")</font></div><div><font face="courier new, monospace" size="1">    }</font></div><div><font face="courier new, monospace" size="1">  }</font></div><div><font face="courier new, monospace" size="1">}</font></div></div><div><br></div><div><b>Good Logs</b></div><div><div><font size="1" face="courier new, monospace">channel=1 queue=amq.gen-KvCBOoSjkyhpqle8GMGB0Q consumer=amq.ctag-4WBZRhTLhp_mnQDCXs1NGg</font></div><div><font size="1" face="courier new, monospace">channel=2 queue=amq.gen-bNUrlmqi2nHZiunZ9qy-UA consumer=amq.ctag-2_GR91qniV39a-qTtK9Beg</font></div><div><font size="1" face="courier new, monospace">channel=3 queue=amq.gen-li-v7cR25J7kmQ-YfcqbCQ consumer=amq.ctag-llfj8Zf1yFCnIGxpU0EI8A</font></div></div><div><br></div><div><b>Bad Logs (topology exceptions repeat on every reconnect attempt)</b></div><div><div><font face="courier new, monospace" size="1">channel=1 queue=amq.gen-TIierQJcDNd93QzX8gYuww consumer=amq.ctag-grxPT0_DsPA7WccDmbNBrg</font></div><div><font face="courier new, monospace" size="1">channel=2 queue=amq.gen-JaXYQy218NUlWGQCubjjhg consumer=amq.ctag-Toh8E8Ogvmb3UNkKcJszxg</font></div><div><font face="courier new, monospace" size="1">channel=3 queue=amq.gen-VzRS5YVgjP7Lz3UfhxYBgA consumer=amq.ctag-LOiutBNDPk8Dl4gUWxJgdg</font></div><div><font face="courier new, monospace" size="1">channel=4 queue=amq.gen-JNJAT8cGvVPRePX43hSPtg consumer=amq.ctag-0nyXfLeLl9lsApz3XsT0Iw</font></div><div><font face="courier new, monospace" size="1">channel=5 queue=amq.gen-Biake-GSvzyuCFe1b-4FVA consumer=amq.ctag-xtZbH08UoJnJKhi52vn_1g</font></div><div><font face="courier new, monospace" size="1">channel=6 queue=amq.gen-apsJKNLSgN3Dv0VBqPLqSQ consumer=amq.ctag-OwrpVcvqzqB3aQ42PdQY2g</font></div><div><font face="courier new, monospace" size="1">channel=7 queue=amq.gen-KxXs5Hkd1W2I1WPxIydpMQ consumer=amq.ctag-KPZzHfmW2f7Cni5u4gQCvA</font></div><div><font face="courier new, monospace" size="1">channel=8 queue=amq.gen-ZeLhlueGGsAXW_GhuOVS5w consumer=amq.ctag-54Raw5XNJp2W1-uQUcP2aw</font></div><div><font face="courier new, monospace" size="1">channel=9 queue=amq.gen-UZW1yR4QG2Sz90RM9CulQA consumer=amq.ctag-kDDpMuN3vaP-MLeH4Fk7oA</font></div><div><font face="courier new, monospace" size="1">channel=10 queue=amq.gen-Ejgc9jZrg6q1UH3qN2VXuA consumer=amq.ctag-3OxICESMijMUMgZOURXNmw</font></div><div><font face="courier new, monospace" size="1">Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg</font></div><div><font face="courier new, monospace" size="1">com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">       </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre"> </span>at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">   </span>at java.lang.Thread.run(Thread.java:744)</font></div><div><font face="courier new, monospace" size="1">Caused by: java.io.IOException</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre"> </span>at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre"> </span>at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:995)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">        </span>... 6 more</font></div><div><font face="courier new, monospace" size="1">Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">      </span>at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">      </span>at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">       </span>at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:993)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>... 9 more</font></div><div><font face="courier new, monospace" size="1">Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw</font></div><div><font face="courier new, monospace" size="1">com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">   </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">        </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">       </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre"> </span>at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">   </span>at java.lang.Thread.run(Thread.java:744)</font></div><div><font face="courier new, monospace" size="1">Caused by: com.rabbitmq.client.AlreadyClosedException: connectionconnection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">       </span>at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre"> </span>at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">     </span>at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">  </span>at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)</font></div><div><font face="courier new, monospace" size="1"><span class="Apple-tab-span" style="white-space:pre">        </span>... 6 more</font></div></div></div>