[rabbitmq-discuss] Testing Lyra recovery with the java client best practices ?
Dave S
trash.collector at seyb.com
Tue Apr 8 10:36:53 BST 2014
Hello All
I was wondering if any one has had much success using Lyra
https://github.com/jhalterman/lyra
>From my testing so far it appears to do exactly what it says it will do
However
My problem is that when I try to use the recovered consumer the same
shutdown exception is thrown that cause the recovery in the first place.
My simple test case is to kill the connection using the web management
console after sending a number of messages.
There must be something here I am not understand correctly.
What I would expect is that after the connection and consumer is recovered
any un acked messages would be republished but no the forced disconnect
that caused the recovery in the first place.
Is there a way to over come this or is forcefully killing the connection
not a valid recovery test case ?
Here is test code
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.ConfigurableConnection;
import net.jodah.lyra.config.RecoveryPolicies;
public class RecoveryTest {
private static RecoveryTest test;
private String EXCHANGE_NAME = "ORDER_UPDATES";
private String EXCHANGE_HOST = "localhost";
private String EXCHANGE_USER = "recover";
private String EXCHANGE_PASS = "recover";
private String QUEUE = "TEST_UPDATES";
private String TOPIC = "*.#";
private final Config config = new
Config().withRecoveryPolicy(RecoveryPolicies.recoverAlways());;
private ConnectionOptions options;
private Channel channel;
private ConnectionFactory factory;
private ConfigurableConnection RecoveryConnection;
private QueueingConsumer consumer;
public static void main(String[] args) throws Exception {
test = new RecoveryTest();
test.run();
}
public RecoveryTest() throws Exception {
options = new
ConnectionOptions().withAddresses(EXCHANGE_HOST);
factory = options.getConnectionFactory();
factory.setUsername(EXCHANGE_USER);
factory.setPassword(EXCHANGE_PASS);
RecoveryConnection = Connections.create(options, config);
channel = RecoveryConnection.createChannel();
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANGE_NAME, TOPIC);
consumer = new QueueingConsumer(channel);
boolean autoAck = true;
channel.basicConsume(QUEUE, autoAck, consumer);
}
public void run() {
QueueingConsumer.Delivery delivery = null;
System.out.println("Starting Test Run");
while(true){
try {
delivery = consumer.nextDelivery();
} catch (Exception ex) {
System.out.println("Somthing went wrong ..." +
ex.getLocalizedMessage());
}
byte [] msg = delivery.getBody();
System.out.println(String.valueOf(new String(msg)));
try {
System.out.println("Lets take a rest");
Thread.sleep(8000);
} catch (InterruptedException ex) {
System.out.println("We have been intrupted");
}
}
}
}
[main] INFO net.jodah.lyra.internal.ConnectionHandler - Creating connection
cxn-1 to [localhost]
[main] INFO net.jodah.lyra.internal.ConnectionHandler - Created connection
cxn-1 to amqp://127.0.0.1:5672/
[main] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-1
on cxn-1
[main] INFO net.jodah.lyra.internal.ChannelHandler - Created
consumer-amq.ctag-vUb0yGdYSY3Cq6eJEARsvA of TEST_UPDATES via channel-1 on
cxn-1
Starting Test Run
hello
Lets take a rest
hello
Lets take a rest
hello again
Lets take a rest
This is a new message
Lets take a rest
[AMQP Connection 127.0.0.1:5672] ERROR
net.jodah.lyra.internal.ChannelHandler - Channel channel-1 on cxn-1 was
closed unexpectedly
Somthing went wrong ...connection error; reason:
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED -
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message
Lets take a rest
[AMQP Connection 127.0.0.1:5672] ERROR
net.jodah.lyra.internal.ConnectionHandler - Connection cxn-1 was closed
unexpectedly
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler -
Recovering connection cxn-1 to [localhost]
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler -
Recovered connection cxn-1 to amqp://127.0.0.1:5672/
[lyra-recovery-1] INFO net.jodah.lyra.internal.ConnectionHandler -
Recovering queue binding from ORDER_UPDATES to TEST_UPDATES with *.# via
cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovering
channel-1 on cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovered
channel-1 on cxn-1
[lyra-recovery-1] INFO net.jodah.lyra.internal.ChannelHandler - Recovering
consumer-amq.ctag-vUb0yGdYSY3Cq6eJEARsvA of TEST_UPDATES via channel-1 on
cxn-1
Somthing went wrong ...connection error; reason:
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED -
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message
Lets take a rest
Somthing went wrong ...connection error; reason:
{#method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED -
Closed via management plugin, class-id=0, method-id=0), null, ""}
This is a new message
loop for ever ......
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140408/49395bd3/attachment.html>
More information about the rabbitmq-discuss
mailing list