[rabbitmq-discuss] Handling Undelivered messages in rabbitmq

Prashant Yadav PRYD at mach.com
Thu Jan 6 09:15:10 GMT 2011


Hi,

 

For one of our requirements we need to keep track of queue depth and
successfully processed messages. The idea is to publish messages and get
a list of successful and failed messages. To simulate the requirement I
did the following:

 

1)      Publish the messages with the mandatory and immediate flags supplied
(in the call below, mandatory is true and immediate is false):
channel.basicPublish 'exchange', 'rKey', true, false, props, "Hello World".bytes

2)      The consumer consumes the messages (I have put the numbers 1..10
as an id value in the header of each message), ACKs the even-numbered
ones and does not ACK the odd-numbered ones.

3)      I have set a ReturnListener on the publisher (via
setReturnListener) to capture undelivered messages.
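
To make the expected outcome of these steps concrete, here is a minimal, broker-free sketch in Groovy. It assumes only what is described above (ids 1..10, even ids ACKed, odd ids left un-ACKed); the variable names are illustrative and no RabbitMQ call is involved, so it models the bookkeeping only, not the broker's behaviour.

```groovy
// Broker-free sketch: models only the ACK bookkeeping from step 2.
// The names ids, acked and unacked are hypothetical, chosen for illustration.
def ids = (1..10).toList()                 // the "id" header values 1..10
def acked = ids.findAll { it % 2 == 0 }    // the consumer ACKs even ids
def unacked = ids - acked                  // odd ids are delivered but never ACKed

println "acked:   ${acked}"
println "unacked: ${unacked}"
```

Under these assumptions, five messages (the odd ids) should remain unacknowledged on the queue.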

 

While I am able to get the number of unacked messages via rabbitmqctl
list_queues messages_unacknowledged, somehow my handleBasicReturn method
does not get called. Am I missing something?

 

Code snippets:

 

Producer:

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from  ${exchange} with routingKey as ${routingKey} with properties"
    }
});

 

println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"

// i is the closure parameter, so no separate counter or i++ is needed
(1..10).each { i ->
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers

    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null, null, null, null, null, null, null)
    // arguments: exchange, routingKey, mandatory, immediate, props, body
    channel.basicPublish 'exchange', 'rKey', true, false, props, "Hello World".bytes
}

channel.close()

 

Consumer:

while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    def id = headers?.get("id")
    println "Received message:"
    println " ${id.toString()}"

    // ACK only even-numbered messages; odd ones stay unacknowledged
    if (id != null && id % 2 == 0) {
        channel.basicAck delivery.envelope.deliveryTag, false
    }
}

 

Regds

Pras
