[rabbitmq-discuss] Incorrect number of messages saved to the queue after RabbitMQ broker failure
venkat veludandi
venkatveludandi at yahoo.com
Wed Mar 9 00:16:48 GMT 2011
Good evening.
After consuming a few of the published messages, if I bring down the RabbitMQ broker (hard failure) and then bring it back up, the number of messages left in the queue is incorrect.
For example, I publish 100 messages. After consuming 10 of them, I bring down the broker and bring it back up. When I then run `rabbitmqctl list_queues messages_ready`, the messages_ready count is 100 instead of the expected 90.
Could you please let me know if there is anything wrong with the following Producer and consumer code:
Thanks
Venkat
===========================
TestProducer.java
-------------------------
/**
 * Driver that publishes the messages "M0" through "M99", one at a time,
 * through the shared {@code RabbitMQProducer.INSTANCE}.
 */
public class TestProducer {

    /**
     * Sends up to 100 messages and stops at the first exception that
     * escapes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int sent = 0;
        while (sent < 100) {
            try {
                RabbitMQProducer.INSTANCE.run("M" + sent);
            } catch (Exception failure) {
                // Give up on the remaining messages once a publish attempt throws.
                System.out.println("exception occurred");
                return;
            }
            sent++;
        }
    }
}
=============================================================
RabbitMQProducer.java:
--------------------------------
/**
 * Singleton publisher that sends one persistent message per call to the
 * durable topic exchange "mdb.testq.exchange", using the queue name
 * "mdb.testq.queue" as the routing key.
 *
 * Every call opens its own connection and channel, publishes inside a
 * channel transaction, and closes the connection again.
 */
public class RabbitMQProducer {
    public static final RabbitMQProducer INSTANCE = new RabbitMQProducer();

    private static final String HOST_NAME = "localhost";
    private static final int PORT_NUMBER = 5672;
    private static final String EXCHANGE_NAME = "mdb.testq.exchange";
    private static final String QUEUE_NAME = "mdb.testq.queue";

    private RabbitMQProducer() {}

    /**
     * Publishes {@code msg} as a persistent message and commits the
     * channel transaction before returning.
     *
     * An {@link IOException} raised while connecting, declaring or
     * publishing is printed to stderr and not rethrown, so the caller is
     * not informed of the failure.
     *
     * NOTE(review): this method declares the queue but never binds it to
     * the exchange; delivery presumably relies on a binding created
     * elsewhere (the consumer binds with "#") - confirm.
     *
     * @param msg text payload; encoded as UTF-8 bytes
     */
    public void run(String msg) {
        Connection conn = null;
        try {
            ConnectionFactory cf = new ConnectionFactory();
            cf.setHost(HOST_NAME);
            cf.setPort(PORT_NUMBER);
            cf.setUsername("guest");
            cf.setPassword("guest");
            conn = cf.newConnection();
            Channel ch = conn.createChannel();
            ch.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            ch.queueDeclare(QUEUE_NAME, true, false, false, null);
            ch.txSelect();
            // Name the charset explicitly so the payload bytes do not depend
            // on the platform default encoding.
            ch.basicPublish(EXCHANGE_NAME, QUEUE_NAME,
                    MessageProperties.PERSISTENT_BASIC, msg.getBytes("UTF-8"));
            ch.txCommit();
            ch.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the connection on the failure path too; previously an
            // exception after newConnection() left it open.
            closeConnection(conn);
        }
    }

    /** Closes {@code conn} if it was opened and is still open, reporting any I/O failure. */
    private static void closeConnection(Connection conn) {
        if (conn == null || !conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
=================================================================
/**
 * Consumer that reads messages from "mdb.testq.queue" one at a time with
 * manual acknowledgements, printing a running count of messages received.
 */
public class RabbitConsumer {
    private static final String HOST_NAME = "localhost";
    private static final int PORT_NUMBER = 5672;
    private static final String EXCHANGE_NAME = "mdb.testq.exchange";
    private static final String QUEUE_NAME = "mdb.testq.queue";

    /** Number of deliveries received so far by this instance. */
    private int count = 0;

    /**
     * Connects to the broker, binds the queue to the topic exchange with
     * the catch-all key "#", and then loops: take one delivery, sleep
     * 500 ms, print the count, acknowledge the delivery.
     *
     * The loop ends on the first exception; the connection is closed
     * before the method returns.
     */
    private void runConsumer() {
        Connection conn = null;
        try {
            ConnectionFactory cf = new ConnectionFactory();
            cf.setHost(HOST_NAME);
            cf.setPort(PORT_NUMBER);
            cf.setUsername("guest");
            cf.setPassword("guest");
            conn = cf.newConnection();
            Channel channel = conn.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            // Declare the queue with the same arguments the producer uses so
            // the bind below does not depend on the producer having run first.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Prefetch of 1: at most one unacknowledged delivery at a time.
            channel.basicQos(1);
            // autoAck=false: each delivery is acknowledged explicitly below.
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while (true) {
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    Thread.sleep(500);
                    count++;
                    System.out.println("count: " + count);
                    // NOTE(review): this ack is sent outside any transaction;
                    // presumably an ack the broker has not yet recorded when it
                    // is killed is lost and the message is redelivered after
                    // restart - confirm against the broker documentation.
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag so callers can observe it.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Previously the connection was never closed once the loop ended.
            closeConnection(conn);
        }
    }

    /** Closes {@code conn} if it was opened and is still open, reporting any I/O failure. */
    private static void closeConnection(Connection conn) {
        if (conn == null || !conn.isOpen()) {
            return;
        }
        try {
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts a single consumer and runs it until its loop terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        RabbitConsumer rc = new RabbitConsumer();
        rc.runConsumer();
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110308/a194e5d8/attachment-0001.htm>
More information about the rabbitmq-discuss
mailing list