[rabbitmq-discuss] Incorrect number of messages saved to the queue after RabbitMQ broker failure
Jerry Kuch
jerryk at vmware.com
Wed Mar 9 00:24:39 GMT 2011
Hi, Venkat...
If you haven't ACKed your 10 consumed messages, either by using the auto-ACK setting in your consuming code, or by explicitly invoking basic.ack, the broker will maintain those messages, since it isn't certain that a client has taken responsibility. The intent of this behavior is to guard against the case where a consumer crashes after getting a message but before doing whatever work it had planned to do on it.
Best regards,
Jerry
On Mar 8, 2011, at 4:16 PM, venkat veludandi wrote:
> Good evening.
> After consuming few published messages, if I bring down the RabbitMQ broker (hard failure) and bring it back, the number of messages saved to queue is incorrect.
> For example, I publish 100 messages. After consuming 10 messages, I bring down the broker and bring it back. Run: rabbitmqctl list_queues messages_ready, I see the count of messages_ready as 100 instead of 90.
> Could you please let me know if there is anything wrong with the following Producer and consumer code:
>
> Thanks
> Venkat
> ===========================
> TestProducer.java
> -------------------------
> public class TestProducer {
>
>
> public static void main(String[] args) {
>
> for(int i=0; i < 100; i++) {
>
> String msg = "M" + i;
>
> try {
>
> RabbitMQProducer.INSTANCE.run(msg);
>
> } catch(Exception e) {
>
> System.out.println("exception occurred");
>
> break;
>
> }
>
> }
>
> }
>
> }
>
> =============================================================
>
> RabbitMQProducer.java:
>
> --------------------------------
>
> public class RabbitMQProducer {
>
>
> public static final RabbitMQProducer INSTANCE = new RabbitMQProducer();
>
>
> private RabbitMQProducer() {}
>
>
> public void run(String msg) {
>
> String hostName = "localhost";
>
> String port = "5672";
>
> int portNumber = Integer.valueOf(port);;
>
> Connection conn = null;
>
> try {
>
>
> ConnectionFactory cf = new ConnectionFactory();
>
> cf.setHost(hostName);
>
> cf.setPort(portNumber);
>
> cf.setUsername("guest");
>
> cf.setPassword("guest");
>
> conn = cf.newConnection();
>
> Channel ch = conn.createChannel();
>
> ch.exchangeDeclare("mdb.testq.exchange", "topic", true);
>
> ch.queueDeclare("mdb.testq.queue", true, false, false, null);
>
> ch.txSelect();
>
> ch.basicPublish("mdb.testq.exchange", "mdb.testq.queue", MessageProperties.PERSISTENT_BASIC, msg.getBytes());
>
> ch.txCommit();
>
> ch.close();
>
> conn.close();
>
>
> } catch (IOException e) {
>
> e.printStackTrace();
>
> }
>
>
> }
>
>
> }
>
> =================================================================
>
> public class RabbitConsumer {
>
> private int count = 0;
>
>
> private void runConsumer() {
>
> Connection conn = null;
>
> try {
>
>
> String hostName = "localhost";
>
> String port = "5672";
>
> int portNumber = Integer.valueOf(port);
>
> ConnectionFactory cf = new ConnectionFactory();
>
> cf.setHost(hostName);
>
> cf.setPort(portNumber);
>
> cf.setUsername("guest");
>
> cf.setPassword("guest");
>
> conn = cf.newConnection();
>
> Channel channel = conn.createChannel();
>
> channel.exchangeDeclare("mdb.testq.exchange", "topic", true);
>
> channel.queueBind("mdb.testq.queue", "mdb.testq.exchange", "#");
>
> Consumer consumer = new QueueingConsumer(channel);
>
> channel.basicQos(1);
>
> channel.basicConsume("mdb.testq.queue", false, consumer);
>
> boolean running = true;
>
> while(running) {
>
>
> QueueingConsumer.Delivery delivery;
>
> try {
>
> delivery = ((QueueingConsumer) consumer).nextDelivery();
>
> Thread.currentThread().sleep(500);
>
> count++;
>
> System.out.println("count: "+count);
>
> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
>
> } catch (Exception e) {
>
> e.printStackTrace();
>
> break;
>
> }
>
> }
>
> } catch (IOException e) {
>
> e.printStackTrace();
>
> }
>
>
> }
>
>
> public static void main(String[] args) {
>
> RabbitConsumer rc = new RabbitConsumer();
>
> rc.runConsumer();
>
> }
>
> }
>
>
> <ATT00001..txt>
More information about the rabbitmq-discuss
mailing list