[rabbitmq-discuss] Incorrect number of messages saved to the queue after RabbitMQ broker failure

Jerry Kuch jerryk at vmware.com
Wed Mar 9 00:24:39 GMT 2011


Hi, Venkat...

If you haven't ACKed your 10 consumed messages, either by using the auto-ACK setting in your consuming code, or by explicitly invoking basic.ack, the broker will retain those messages, since it can't be certain that a client has taken responsibility for them.  The intent of this behavior is to guard against the case where a consumer crashes after getting a message but before doing whatever work it had planned to do on it.

Best regards,
Jerry

On Mar 8, 2011, at 4:16 PM, venkat veludandi wrote:

> Good evening.
> After consuming a few of the published messages, if I bring down the RabbitMQ broker (hard failure) and bring it back up, the number of messages saved to the queue is incorrect.
> For example, I publish 100 messages. After consuming 10 messages, I bring down the broker and bring it back up. When I run "rabbitmqctl list_queues messages_ready", I see the count of messages_ready as 100 instead of 90.
> Could you please let me know if there is anything wrong with the following producer and consumer code:
>  
> Thanks
> Venkat
> ===========================
> TestProducer.java
> -------------------------
> public class TestProducer {
> 
> 
> public static void main(String[] args) {
> 
> for(int i=0; i < 100; i++) {
> 
> String msg = "M" + i;
> 
> try {
> 
> RabbitMQProducer.INSTANCE.run(msg);
> 
> } catch(Exception e) {
> 
> System.out.println("exception occurred");
> 
> break;
> 
> }
> 
> }
> 
> }
> 
> }
> 
> =============================================================
> 
> RabbitMQProducer.java:
> 
> --------------------------------
> 
> public class RabbitMQProducer {
> 
> 
> public static final RabbitMQProducer INSTANCE = new RabbitMQProducer();
> 
> 
> private RabbitMQProducer() {}
> 
> 
> public void run(String msg) {
> 
> String hostName = "localhost";
> 
> String port = "5672";
> 
> int portNumber = Integer.valueOf(port);;
> 
> Connection conn = null;
> 
> try {
> 
> 
> ConnectionFactory cf = new ConnectionFactory();
> 
> cf.setHost(hostName);
> 
> cf.setPort(portNumber);
> 
> cf.setUsername("guest");
> 
> cf.setPassword("guest");
> 
> conn = cf.newConnection();
> 
> Channel ch = conn.createChannel();
> 
> ch.exchangeDeclare("mdb.testq.exchange", "topic", true);
> 
> ch.queueDeclare("mdb.testq.queue", true, false, false, null);
> 
> ch.txSelect();
> 
> ch.basicPublish("mdb.testq.exchange", "mdb.testq.queue", MessageProperties.PERSISTENT_BASIC, msg.getBytes());
> 
> ch.txCommit();
> 
> ch.close();
> 
> conn.close();
> 
> 
> } catch (IOException e) {
> 
> e.printStackTrace();
> 
> }
> 
> 
> }
> 
> 
> }
> 
> =================================================================
> 
> public class RabbitConsumer {
> 
> private int count = 0;
> 
> 
> private void runConsumer() {
> 
> Connection conn = null;
> 
> try {
> 
> 
> String hostName = "localhost";
> 
> String port = "5672";
> 
> int portNumber = Integer.valueOf(port);
> 
> ConnectionFactory cf = new ConnectionFactory();
> 
> cf.setHost(hostName);
> 
> cf.setPort(portNumber);
> 
> cf.setUsername("guest");
> 
> cf.setPassword("guest");
> 
> conn = cf.newConnection();
> 
> Channel channel = conn.createChannel();
> 
> channel.exchangeDeclare("mdb.testq.exchange", "topic", true);
> 
> channel.queueBind("mdb.testq.queue", "mdb.testq.exchange", "#");
> 
> Consumer consumer = new QueueingConsumer(channel);
> 
> channel.basicQos(1);
> 
> channel.basicConsume("mdb.testq.queue", false, consumer);
> 
> boolean running = true;
> 
> while(running) {
> 
> 
> QueueingConsumer.Delivery delivery;
> 
> try {
> 
> delivery = ((QueueingConsumer) consumer).nextDelivery();
> 
> Thread.currentThread().sleep(500);
> 
> count++;
> 
> System.out.println("count: "+count);
> 
> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
> 
> } catch (Exception e) {
> 
> e.printStackTrace();
> 
> break;
> 
> }
> 
> }
> 
> } catch (IOException e) {
> 
> e.printStackTrace();
> 
> }
> 
> 
> }
> 
> 
> public static void main(String[] args) {
> 
> RabbitConsumer rc = new RabbitConsumer();
> 
> rc.runConsumer();
> 
> }
> 
> }
> 
> 
> <ATT00001..txt>



More information about the rabbitmq-discuss mailing list