[rabbitmq-discuss] using autorecovery in case of broken connection

Aiman Ashraf kurtrips at gmail.com
Sat May 17 22:45:00 BST 2014


I am trying to use the autorecovery feature in case connection breaks.
I have 2 problems:

1. The autorecovery code seemingly completely ignores the 
networkRecoveryInterval. In my log file, for the 1 minute that the 
connection is broken, the file grows to 1.5 GB. The following error is 
constantly repeated.

> Caught an exception during connection recovery!
>
> java.net.SocketException: Network is unreachable
>
> at java.net.PlainSocketImpl.socketConnect(Native Method)
>
> at 
>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
>
> at 
>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
>
> at 
>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
>
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
>
> at java.net.Socket.connect(Socket.java:579)
>
> at 
>> com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
>
> at 
>> com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:388)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:360)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
>
> at 
>> com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
>
> at 
>> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
>
> at java.lang.Thread.run(Thread.java:722)
>
>
>
 2. Finally when I do switch my router on, the network recovery does not 
work. I get this in my log file.

> Caught an exception when recovering topology Caught an exception while 
>> recovering queue 8923yrbk
>
> com.rabbitmq.client.TopologyRecoveryException: Caught an exception while 
>> recovering queue 8923yrbk
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:459)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:424)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:48)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:345)
>
> at 
>> com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
>
> at 
>> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
>
> at java.lang.Thread.run(Thread.java:722)
>
> Caused by: com.rabbitmq.client.AlreadyClosedException: connection is 
>> already closed due to connection error; cause: 
>> com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with 
>> heartbeat = 45 seconds
>
> at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
>
> at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
>
> at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:209)
>
> at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
>
> at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:230)
>
> at 
>> com.rabbitmq.client.impl.recovery.RecordedQueue.recover(RecordedQueue.java:36)
>
> at 
>> com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:448)
>
> ... 7 more
>
>
>
Here is my rather simple consumer side code. Note that I write all the code 
in a new Thread because I don't want my constructor to block.

        private ConnectionFactory factory = null;

private Connection connection = null;

private Channel channel = null;

 private PaymentInfoFromGlobalServerConsumer() {

    new Thread(new Runnable() {

     public void run() {

    factory = new ConnectionFactory();

    try {

factory.setUri(amqpServerUrl);

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(30000); // In case of broken connection, 
> try again every 30 seconds (hope this is correct understanding)

factory.setRequestedHeartbeat(45); //Keep sending the heartbeat every 45 
> seconds to prevent any routers from considering the connection stale.

} catch (KeyManagementException | NoSuchAlgorithmException | 
> URISyntaxException e) {

//Will never happen if configured properly

logger.error(e);

return;

}

    

     try {

    connection = factory.newConnection();

    channel = connection.createChannel();

//Create a durable queue (if not already present)

channel.queueDeclare(merchantId, true, false, false, null);

    

     QueueingConsumer consumer = new QueueingConsumer(channel);

    channel.basicConsume(merchantId, false, consumer);

    

while (true) {

    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    String billId = new String(delivery.getBody());

    //TODO - Redeliveries are possible as per design

    System.out.println(" [x] Received '" + billId + "'");

    System.out.println(" [x] Done" );

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    } 

     } catch (IOException | ConsumerCancelledException | 
> InterruptedException e) {

     e.printStackTrace();

     logger.error(e);

} catch (ShutdownSignalException e) {

System.out.println(e.isInitiatedByApplication() + " " + e.isHardError());

} finally {

close();

}

} 

}).start();

}

 public void close() {

try {

if (channel != null) channel.close();

} catch (IOException | AlreadyClosedException e) {

//Cannot do anything now

}

try {

if (connection != null) connection.close();

} catch (IOException | AlreadyClosedException e) {

//Cannot do anything now

}

}

 


I am a newbie to amqp, so any help is appreciated. Thanks 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140517/372291a0/attachment.html>


More information about the rabbitmq-discuss mailing list