[rabbitmq-discuss] java client reconnect error: attempt to reuse consumer tag

James Bellenger james at bellenger.org
Fri Apr 11 23:38:31 BST 2014


Hello.
I've been testing the client autorecovery support in 3.3.0 and have had 
some issues getting it to work.
On my machine, recovery appears to work with fewer than about 5 
consumers. With more than that, the client gets a stream of 
TopologyRecoveryExceptions and is never able to reconnect.

I've looked at the client source and haven't seen any obvious culprit. 
Has anyone else seen this?

TEST SETUP:
client jar: amqp-client-3.3.0.jar
server: rabbitmq-server 3.1.3-1 (from ubuntu 13.10)
Triggering autorecovery via 'service rabbitmq-server restart'

CLIENT CODE (scala):
package rabbittest

import com.rabbitmq.client._

object RecoveryTest {
   // The consumer count matters. At ~5 consumers the client gets put
   // into a bad state and never reconnects.
   // With fewer than 5, the client is usually able to reconnect.
   val consumers = 10

   def main(args: Array[String]): Unit = {
     val factory = new ConnectionFactory()
     factory.setUri("amqp://guest@localhost:5672/test")
     factory.setAutomaticRecoveryEnabled(true)

     val conn = factory.newConnection()

     for (x <- 1 to consumers) {
       val chan = conn.createChannel()
       val result = chan.queueDeclare()
       val tag = chan.basicConsume(result.getQueue, new DefaultConsumer(chan))
       println(s"channel=${chan.getChannelNumber} queue=${result.getQueue} consumer=$tag")
     }
   }
}
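
A variant of the test that might make the failures easier to trace: instead of letting the server generate the tags, it passes an explicit consumer tag that encodes the channel number, so a tag named in a later exception can be mapped straight back to its channel. This is only a diagnostic sketch. The object name, the `tagFor` helper and the tag format are made up for illustration, and whether explicit tags change the recovery behaviour at all is untested here.

```scala
import com.rabbitmq.client._

object RecoveryTestWithExplicitTags {
  val consumers = 10

  // Hypothetical naming scheme (not part of the client library):
  // encode the channel number in the consumer tag.
  def tagFor(channelNumber: Int): String = s"recovery-test-ch$channelNumber"

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setUri("amqp://guest@localhost:5672/test")
    factory.setAutomaticRecoveryEnabled(true)

    val conn = factory.newConnection()

    for (_ <- 1 to consumers) {
      val chan = conn.createChannel()
      val queue = chan.queueDeclare().getQueue
      // basicConsume(queue, autoAck, consumerTag, callback);
      // autoAck = false matches the default of the two-argument overload.
      val tag = chan.basicConsume(queue, false, tagFor(chan.getChannelNumber), new DefaultConsumer(chan))
      println(s"channel=${chan.getChannelNumber} queue=$queue consumer=$tag")
    }
  }
}
```

With this variant, a "reuse consumer tag" error for `recovery-test-ch3` would point at channel 3 without having to cross-reference the startup output.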

GOOD LOGS:
channel=1 queue=amq.gen-KvCBOoSjkyhpqle8GMGB0Q consumer=amq.ctag-4WBZRhTLhp_mnQDCXs1NGg
channel=2 queue=amq.gen-bNUrlmqi2nHZiunZ9qy-UA consumer=amq.ctag-2_GR91qniV39a-qTtK9Beg
channel=3 queue=amq.gen-li-v7cR25J7kmQ-YfcqbCQ consumer=amq.ctag-llfj8Zf1yFCnIGxpU0EI8A

BAD LOGS (topology exceptions repeat on every reconnect attempt)
channel=1 queue=amq.gen-TIierQJcDNd93QzX8gYuww consumer=amq.ctag-grxPT0_DsPA7WccDmbNBrg
channel=2 queue=amq.gen-JaXYQy218NUlWGQCubjjhg consumer=amq.ctag-Toh8E8Ogvmb3UNkKcJszxg
channel=3 queue=amq.gen-VzRS5YVgjP7Lz3UfhxYBgA consumer=amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
channel=4 queue=amq.gen-JNJAT8cGvVPRePX43hSPtg consumer=amq.ctag-0nyXfLeLl9lsApz3XsT0Iw
channel=5 queue=amq.gen-Biake-GSvzyuCFe1b-4FVA consumer=amq.ctag-xtZbH08UoJnJKhi52vn_1g
channel=6 queue=amq.gen-apsJKNLSgN3Dv0VBqPLqSQ consumer=amq.ctag-OwrpVcvqzqB3aQ42PdQY2g
channel=7 queue=amq.gen-KxXs5Hkd1W2I1WPxIydpMQ consumer=amq.ctag-KPZzHfmW2f7Cni5u4gQCvA
channel=8 queue=amq.gen-ZeLhlueGGsAXW_GhuOVS5w consumer=amq.ctag-54Raw5XNJp2W1-uQUcP2aw
channel=9 queue=amq.gen-UZW1yR4QG2Sz90RM9CulQA consumer=amq.ctag-kDDpMuN3vaP-MLeH4Fk7oA
channel=10 queue=amq.gen-Ejgc9jZrg6q1UH3qN2VXuA consumer=amq.ctag-3OxICESMijMUMgZOURXNmw
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
	at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
	at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:995)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
	at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
	at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:993)
	... 9 more
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
	at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
	at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.AlreadyClosedException: connectionconnection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
	at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
	at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
	... 6 more
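
One detail in the two traces above: the second failure (consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw) carries a close reason that names the tag from the first failure (amq.ctag-LOiutBNDPk8Dl4gUWxJgdg). That suggests, though it does not prove, that the broker closed the connection on the first reused tag and the later consumers then failed against an already-closed connection. A small sketch for pulling the reply code and tag out of such messages when comparing many of them; `CloseReason` and `parse` are hypothetical helpers written for this post, not part of the client library:

```scala
object CloseReason {
  // Captures the reply-code and the quoted consumer tag from the text of a
  // connection.close reason, as it appears in the exception messages above.
  private val Pattern = """reply-code=(\d+).*?consumer tag '([^']+)'""".r

  // Returns Some((replyCode, consumerTag)), or None if the message has a different shape.
  def parse(message: String): Option[(Int, String)] =
    Pattern.findFirstMatchIn(message).map(m => (m.group(1).toInt, m.group(2)))
}

object CloseReasonDemo {
  // Message text copied from the first and second exceptions in the bad logs.
  val first = "connection error; reason: #method<connection.close>(reply-code=530, " +
    "reply-text=NOT_ALLOWED - attempt to reuse consumer tag " +
    "'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)"
  val second = "connectionconnection error; reason: #method<connection.close>(reply-code=530, " +
    "reply-text=NOT_ALLOWED - attempt to reuse consumer tag " +
    "'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)"

  def main(args: Array[String]): Unit = {
    println(CloseReason.parse(first))
    println(CloseReason.parse(second))
    // Both messages name the same tag, i.e. the same underlying close.
    println(CloseReason.parse(first) == CloseReason.parse(second))
  }
}
```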

