[rabbitmq-discuss] Testing Lyra recovery with the java client best practices ?

Dave S trash.collector at seyb.com
Tue Apr 8 10:36:53 BST 2014


Hello All

I was wondering if any one has had much success using Lyra
https://github.com/jhalterman/lyra

>From my testing so far it appears to do exactly what it says it will do 
However

My problem is that when I try to use the recovered consumer the same 
shutdown exception is thrown that cause the recovery in the first place.
My simple test case is to kill the connection using the web management 
console  after sending a number of messages.

There must be something here I am not understand correctly.
What I would expect is that after the connection and consumer is recovered 
any un acked messages would be republished but no the forced disconnect 
that caused the recovery in the first place.

Is there a way to over come this or is forcefully killing the connection 
not a valid recovery test case ?

Here is test code


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.ConfigurableConnection;
import net.jodah.lyra.config.RecoveryPolicies;


public class RecoveryTest {
     
    private static RecoveryTest test;

    private String EXCHANGE_NAME = "ORDER_UPDATES";
    private String EXCHANGE_HOST = "localhost";
    private String EXCHANGE_USER = "recover";
    private String EXCHANGE_PASS = "recover";
    private String QUEUE = "TEST_UPDATES";
    private String TOPIC = "*.#";

    private final Config config = new 
Config().withRecoveryPolicy(RecoveryPolicies.recoverAlways());;
    private ConnectionOptions options;
    private Channel channel;
    private ConnectionFactory factory;
    private ConfigurableConnection RecoveryConnection;
    private QueueingConsumer consumer;
    
      public static void main(String[] args) throws Exception  {
          test = new RecoveryTest();
          test.run();
      }
    
      public RecoveryTest() throws Exception {
          
            options = new 
ConnectionOptions().withAddresses(EXCHANGE_HOST);      
            factory = options.getConnectionFactory();
            factory.setUsername(EXCHANGE_USER);
            factory.setPassword(EXCHANGE_PASS);
            RecoveryConnection = Connections.create(options, config);
            channel = RecoveryConnection.createChannel();
            channel.queueDeclare(QUEUE, true, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE_NAME, TOPIC);
            
            consumer = new QueueingConsumer(channel);
            boolean autoAck = true;
            channel.basicConsume(QUEUE, autoAck, consumer);
      }

      
      public void run() { 
       QueueingConsumer.Delivery delivery = null;   
       System.out.println("Starting Test Run");
       while(true){           
           try {              
               delivery = consumer.nextDelivery();
           } catch (Exception ex) {
                 System.out.println("Somthing went wrong ..." + 
ex.getLocalizedMessage());
           }     
           byte [] msg = delivery.getBody();
           System.out.println(String.valueOf(new String(msg)));
           try {
               System.out.println("Lets take a rest");
               Thread.sleep(8000);
           } catch (InterruptedException ex) {
              System.out.println("We have been intrupted");
           }
             
       }
     } 
      
    
}


[main] INFO net.jodah.lyra.internal.ConnectionHandler - Creating connection 
cxn-1 to [localhost]
[main] INFO net.jodah.lyra.internal.ConnectionHandler - Created connection 
cxn-1 to amqp://127.0.0.1:5672/
[main] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-1 
on cxn-1
[main] INFO net.jodah.lyra.internal.ChannelHandler - Created 
consumer-amq.ctag-vUb0yGdYSY3Cq6eJEARsvA of TEST_UPDATES via channel-1 on 
cxn-1
Starting Test Run
hello
Lets take a rest
hello
Lets take a rest
hello again 
Lets take a rest
This is a new message 
Lets take a rest
[AMQP Connection 127.0.0.1:5672] ERROR 
net.jodah.lyra.internal.ChannelHandler - Channel channel-1 on cxn-1 was 
closed unexpectedly
Somthing went wrong ...connection error; reason: 
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - 
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message 
Lets take a rest
[AMQP Connection 127.0.0.1:5672] ERROR 
net.jodah.lyra.internal.ConnectionHandler - Connection cxn-1 was closed 
unexpectedly
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler - 
Recovering connection cxn-1 to [localhost]
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler - 
Recovered connection cxn-1 to amqp://127.0.0.1:5672/
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler - 
Recovering queue binding from ORDER_UPDATES to TEST_UPDATES with *.# via 
cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovering 
channel-1 on cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovered 
channel-1 on cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovering 
consumer-amq.ctag-vUb0yGdYSY3Cq6eJEARsvA of TEST_UPDATES via channel-1 on 
cxn-1
Somthing went wrong ...connection error; reason: 
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - 
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message 
Lets take a rest
Somthing went wrong ...connection error; reason: 
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - 
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message 

loop for ever ...... 








-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140408/49395bd3/attachment.html>


More information about the rabbitmq-discuss mailing list