If you haven't ACKed your 10 consumed messages, either by using the auto-ACK setting in your consuming code or by explicitly invoking basic.ack, the broker will retain those messages, since it can't be certain that a client has taken responsibility for them. The intent of this behavior is to guard against the case where a consumer crashes after receiving a message but before doing whatever work it had planned to do on it.

> Good evening.
> After consuming a few published messages, if I bring down the RabbitMQ broker (hard failure) and bring it back up, the number of messages saved in the queue is incorrect.
> For example, I publish 100 messages. After consuming 10 messages, I bring down the broker and bring it back up. When I run "rabbitmqctl list_queues messages_ready", I see the count of messages_ready as 100 instead of 90.
> Could you please let me know if there is anything wrong with the following Producer and consumer code:
> Thanks
> Venkat
> ===========================
> TestProducer.java
> -------------------------
> public class TestProducer {
> public static void main(String[] args) {
> for(int i=0; i < 100; i++) {
> String msg = "M" + i;
> try {
> RabbitMQProducer.INSTANCE.run(msg);
> } catch(Exception e) {
> System.out.println("exception occurred");
> break;
> }
> }
> }
> }
> =============================================================
> RabbitMQProducer.java:
> --------------------------------
> /**
>  * Singleton publisher for the "mdb.testq.exchange" topic exchange on the
>  * broker at localhost:5672.
>  */
> public class RabbitMQProducer {
>     public static final RabbitMQProducer INSTANCE = new RabbitMQProducer();
>
>     private RabbitMQProducer() {}
>
>     /**
>      * Publishes one message inside an AMQP transaction, using a fresh
>      * connection and channel that are closed again before returning.
>      *
>      * I/O failures are printed to stderr and not propagated; unchecked
>      * exceptions raised by the client still reach the caller.
>      *
>      * @param msg message body; sent as its UTF-8 bytes
>      */
>     public void run(String msg) {
>         Connection conn = null;
>         try {
>             ConnectionFactory cf = new ConnectionFactory();
>             cf.setHost("localhost");
>             cf.setPort(5672);
>             cf.setUsername("guest");
>             cf.setPassword("guest");
>             conn = cf.newConnection();
>             Channel ch = conn.createChannel();
>             ch.exchangeDeclare("mdb.testq.exchange", "topic", true);
>             ch.queueDeclare("mdb.testq.queue", true, false, false, null);
>             // Bind the queue here as well (same "#" key the consumer uses).
>             // Without a binding the topic exchange has nowhere to route the
>             // message, so anything published before the consumer's first run
>             // would be discarded by the broker.
>             ch.queueBind("mdb.testq.queue", "mdb.testq.exchange", "#");
>             ch.txSelect();
>             // Explicit charset: the no-arg getBytes() depends on the platform default.
>             ch.basicPublish("mdb.testq.exchange", "mdb.testq.queue",
>                     MessageProperties.PERSISTENT_BASIC, msg.getBytes("UTF-8"));
>             ch.txCommit();
>             ch.close();
>         } catch (IOException e) {
>             e.printStackTrace();
>         } finally {
>             // Close on every path so a failed declare/publish/commit does not
>             // leak the connection.
>             if (conn != null && conn.isOpen()) {
>                 try {
>                     conn.close();
>                 } catch (IOException e) {
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
> }
> =================================================================
> /**
>  * Consumes messages from "mdb.testq.queue" on the broker at localhost:5672,
>  * acknowledging each one explicitly after a simulated 500 ms of work.
>  */
> public class RabbitConsumer {
>     /** Number of deliveries received so far. */
>     private int count = 0;
>
>     /**
>      * Connects, declares and binds the queue, then receives deliveries until
>      * an error or interrupt occurs. With basicQos(1) and manual acks, the
>      * broker sends the next message only after the previous one is acked.
>      * The connection is closed before returning.
>      */
>     private void runConsumer() {
>         Connection conn = null;
>         try {
>             ConnectionFactory cf = new ConnectionFactory();
>             cf.setHost("localhost");
>             cf.setPort(5672);
>             cf.setUsername("guest");
>             cf.setPassword("guest");
>             conn = cf.newConnection();
>             Channel channel = conn.createChannel();
>             channel.exchangeDeclare("mdb.testq.exchange", "topic", true);
>             // Declare the queue with the producer's arguments so the bind
>             // below also works when the consumer is started first.
>             channel.queueDeclare("mdb.testq.queue", true, false, false, null);
>             channel.queueBind("mdb.testq.queue", "mdb.testq.exchange", "#");
>             QueueingConsumer consumer = new QueueingConsumer(channel);
>             channel.basicQos(1);
>             // autoAck = false: each delivery must be acked explicitly below.
>             channel.basicConsume("mdb.testq.queue", false, consumer);
>             while (true) {
>                 try {
>                     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
>                     Thread.sleep(500); // simulated processing time
>                     count++;
>                     System.out.println("count: " + count);
>                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
>                 } catch (InterruptedException e) {
>                     // Restore the interrupt flag for whoever owns this thread.
>                     Thread.currentThread().interrupt();
>                     e.printStackTrace();
>                     break;
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                     break;
>                 }
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>         } finally {
>             // Release the connection once consumption stops, however it stops.
>             if (conn != null && conn.isOpen()) {
>                 try {
>                     conn.close();
>                 } catch (IOException e) {
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
>
>     public static void main(String[] args) {
>         RabbitConsumer rc = new RabbitConsumer();
>         rc.runConsumer();
>     }
> }
