[rabbitmq-discuss] FW: Retry Message for 3 times with 10 seconds interval - in Consumer

Gary Russell grussell at gopivotal.com
Thu May 29 15:48:03 BST 2014


You need to provide much more information (logs, all configuration).

Please move this question to Stack Overflow (http://stackoverflow.com/)
using the 'spring-amqp' tag since your concerns are all about Spring AMQP
and not RabbitMQ itself.


On Wed, May 28, 2014 at 8:03 PM, Srinath Sridharan -X (srinatsr - ZENSAR
TECHNOLOGIES INC at Cisco) <srinatsr at cisco.com> wrote:

>  Hi Gary,
>
>
>
> My listener getting disconnected if any messages published to queue (which
> it is listening).
>
>
>
> Following is the listerner configuration.
>
>
>
> <bean
>
>               class=
> *"org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"*
> >
>
>              <property name=*"connectionFactory"* ref=
> *"connectionFactory"* />
>
>              <property name=*"queueNames"*>
>
>                     <array>
>
>                            <value>validateRequestQueue</value>
>
>                     </array>
>
>              </property>
>
>              <property name=*"messageListener"*>
>
>                     <bean
>
>                            class=
> *"org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"*
> >
>
>                            <property name=*"delegate"* ref=
> *"validationListener"* />
>
>                     </bean>
>
>              </property>
>
>              <property name=*"adviceChain"*
>
>                     ref=*"statefulRetryOperationsInterceptorFactoryBean"*
> ></property>
>
>              <property name=*"acknowledgeMode"* value=*"NONE"* />
>
>        </bean>
>
>
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* Gary Russell [mailto:grussell at gopivotal.com]
> *Sent:* Wednesday, May 21, 2014 2:34 PM
> *To:* Srinath Sridharan -X (srinatsr - ZENSAR TECHNOLOGIES INC at Cisco)
> *Subject:* Re: FW: [rabbitmq-discuss] Retry Message for 3 times with 10
> seconds interval - in Consumer
>
>
>
>     <bean id=*"statefulRetryOperationsInterceptorFactoryBean"*
>
>         class=
> *"org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean"*
> >
>
>         <property name=*"retryOperations"* ref=*"retryTemplate"* />
>
>         <property name=*"messageRecoverer">*
>
>             <bean class=
> *"org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer">*
>
>         </property*>    *
>
>     </bean>
>
>
>
> On Wed, May 21, 2014 at 4:46 PM, Srinath Sridharan -X (srinatsr - ZENSAR
> TECHNOLOGIES INC at Cisco) <srinatsr at cisco.com> wrote:
>
>
>
> I totally agree Gary.
>
>
>
> But how to configure in xml ?  Here is my xml and designed as you
> described.
>
>
>
>    <bean id=*"simpleRetryPolicy"* class=
> *"org.springframework.retry.policy.SimpleRetryPolicy"*>
>
>         <property name=*"maxAttempts"* value=*"3"* />
>
>     </bean>
>
>     <!-- <bean id="retryTemplate"
> class="org.springframework.retry.support.RetryTemplate">
>
>         <property name="retryPolicy" *ref*="simpleRetryPolicy" />
>
>     </bean> -->
>
>     <bean id=*"statefulRetryOperationsInterceptorFactoryBean"*
>
>         class=
> *"org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean"*
> >
>
>         <property name=*"retryOperations"* ref=*"retryTemplate"* />
>
>         <property name=*"messageRecoverer"*
>
>     </bean>
>
>
>
>
>
>        <bean id=*"retryTemplate"* class=
> *"org.springframework.retry.support.RetryTemplate"*>
>
>               <property name=*"retryPolicy"* ref=*"simpleRetryPolicy"* />
>
>              <property name=*"backOffPolicy"*>
>
>                     <bean class=
> *"org.springframework.retry.backoff.ExponentialBackOffPolicy"*>
>
>                            <property name=*"initialInterval"* value=
> *"10000"* />
>
>              <!--         <property name="multiplier" value="10.0" />
>
>                            <property name="maxInterval" value="10000" />
> -->
>
>                     </bean>
>
>              </property>
>
>        </bean>
>
>        <rabbit:template id=*"rabbitTemplate"*
>
>              connection-factory=*"connectionFactory"* />
>
>        <bean
>
>               class=
> *"org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"*
> >
>
>              <property name=*"connectionFactory"* ref=
> *"connectionFactory"* />
>
>              <property name=*"queueNames"*>
>
>                     <array>
>
>                            <value>validateRequestQueue</value>
>
>                     </array>
>
>              </property>
>
>              <property name=*"messageListener"*>
>
>                     <bean
>
>                            class=
> *"org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"*
> >
>
>                            <property name=*"delegate"* ref=
> *"retryConsumer"* />
>
>                     </bean>
>
>              </property>
>
>              <property name=*"adviceChain"* ref=
> *"statefulRetryOperationsInterceptorFactoryBean"*></property>
>
>              <!-- <property name="acknowledgeMode" value="NONE" /> -->
>
>        </bean>
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* rabbitmq-discuss [mailto:
> rabbitmq-discuss-bounces at lists.rabbitmq.com] *On Behalf Of *Gary Russell
> *Sent:* Wednesday, May 21, 2014 1:14 PM
>
>
> *To:* Discussions about RabbitMQ
> *Subject:* Re: [rabbitmq-discuss] Retry Message for 3 times with 10
> seconds interval - in Consumer
>
>
>
> You did not configure a MessageRecoverer as I suggested; the default
> recoverer logs a WARN message and acks the message.
>
>
>
> Add a RejectAndDontRequeueRecoverer and the rejected message will go to
> the DLE.
>
>
>
> On Wed, May 21, 2014 at 2:14 PM, Srinath Sridharan -X (srinatsr - ZENSAR
> TECHNOLOGIES INC at Cisco) <srinatsr at cisco.com> wrote:
>
> After 3 times, I am getting the following exception.  But I don’t want
> this exception, directly the message should go to dead letter exchange.
>
>
>
> I  have configured dlx as follows.
>
>
>
> rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}'
>
>
>
>
>
> 2014-05-21 11:10:37:870 -0700, [WARN]
> StatefulRetryOperationsInterceptorFactoryBean  recover - 89 Message dropped
> on recovery: (Body:'[B at 3ed60d48(byte[163])'MessageProperties [headers={},
> timestamp=null, messageId=1, userId=null, appId=null, clusterId=null,
> type=null, correlationId=null, replyTo=null,
> contentType=application/octet-stream, contentEncoding=null,
> contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0,
> redelivered=true, receivedExchange=,
> receivedRoutingKey=validateRequestQueue, deliveryTag=8, messageCount=0])
>
> *org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException*:
> Listener threw exception
>
>        at
> org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(
> *AbstractMessageListenerContainer.java:758*)
>
>        at
> org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(
> *AbstractMessageListenerContainer.java:653*)
>
>        at
> org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(
> *AbstractMessageListenerContainer.java:576*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(
> *SimpleMessageListenerContainer.java:75*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(
> *SimpleMessageListenerContainer.java:154*)
>
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(*Native Method*)
>
>        at sun.reflect.NativeMethodAccessorImpl.invoke(
> *NativeMethodAccessorImpl.java:57*)
>
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> *DelegatingMethodAccessorImpl.java:43*)
>
>        at java.lang.reflect.Method.invoke(*Method.java:606*)
>
>        at
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(
> *AopUtils.java:317*)
>
>        at
> org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(
> *ReflectiveMethodInvocation.java:183*)
>
>        at
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(
> *ReflectiveMethodInvocation.java:150*)
>
>        at
> org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(
> *StatefulRetryOperationsInterceptor.java:162*)
>
>        at org.springframework.retry.support.RetryTemplate.doExecute(
> *RetryTemplate.java:263*)
>
>        at org.springframework.retry.support.RetryTemplate.execute(
> *RetryTemplate.java:193*)
>
>        at
> org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(
> *StatefulRetryOperationsInterceptor.java:137*)
>
>        at
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(
> *ReflectiveMethodInvocation.java:172*)
>
>        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(
> *JdkDynamicAopProxy.java:204*)
>
>        at com.sun.proxy.$Proxy62.invokeListener(Unknown Source)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(
> *SimpleMessageListenerContainer.java:1113*)
>
>        at
> org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(
> *AbstractMessageListenerContainer.java:559*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(
> *SimpleMessageListenerContainer.java:904*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(
> *SimpleMessageListenerContainer.java:888*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(
> *SimpleMessageListenerContainer.java:75*)
>
>        at
> org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(
> *SimpleMessageListenerContainer.java:989*)
>
>        at java.lang.Thread.run(*Thread.java:724*)
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* rabbitmq-discuss [mailto:
> rabbitmq-discuss-bounces at lists.rabbitmq.com] *On Behalf Of *Srinath
> Sridharan -X (srinatsr - ZENSAR TECHNOLOGIES INC at Cisco)
> *Sent:* Wednesday, May 21, 2014 11:00 AM
>
>
> *To:* Discussions about RabbitMQ
> *Subject:* Re: [rabbitmq-discuss] Retry Message for 3 times with 10
> seconds interval - in Consumer
>
>
>
> It works perfect thank you so much J
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* rabbitmq-discuss [
> mailto:rabbitmq-discuss-bounces at lists.rabbitmq.com
> <rabbitmq-discuss-bounces at lists.rabbitmq.com>] *On Behalf Of *Gary Russell
> *Sent:* Tuesday, May 20, 2014 12:48 PM
> *To:* Discussions about RabbitMQ
> *Subject:* Re: [rabbitmq-discuss] Retry Message for 3 times with 10
> seconds interval - in Consumer
>
>
>
> To configure the retry advice in XML, you need to wire up
> a StatefulRetryOperationsInterceptorFactoryBean.
>
>
>
> It needs a RetryTemplate (RetryOperations) bean which needs a RetryPolicy
> and BackOffPolicy beans.
>
>
>
> Then wire the factory bean into the advice chain.
>
>
>
> To set the message id, it depends on how you are sending messages; you can
> set it directly, or if you are using message conversion, set
> the createMessageIds to true on the outbound template's
> SimpleMessageConverter.
>
>
>
> However, when using long backoffs like that, there's not much benefit in
> using stateful retry.
>
>
>
> On Tue, May 20, 2014 at 3:29 PM, Srinath Sridharan -X (srinatsr - ZENSAR
> TECHNOLOGIES INC at Cisco) <srinatsr at cisco.com> wrote:
>
> Thanks  do you have xml version of the following?
>
>
>
> I need to set message id before publishing the message?  How to do this?
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* rabbitmq-discuss [mailto:
> rabbitmq-discuss-bounces at lists.rabbitmq.com] *On Behalf Of *Gary Russell
> *Sent:* Tuesday, May 20, 2014 12:24 PM
> *To:* Discussions about RabbitMQ
> *Subject:* Re: [rabbitmq-discuss] Retry Message for 3 times with 10
> seconds interval - in Consumer
>
>
>
> @Bean
>
> public MethodInterceptor retryAdvice() {
>
>      FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
>
>      backOffPolicy.setBackOffPeriod(10000);;
>
>      return RetryInterceptorBuilder.stateful()
>
>           .backOffPolicy(backOffPolicy)
>
>           .maxAttempts(3)
>
>           .recoverer(new RejectAndDontRequeueRecoverer())
>
>           .build();
>
> }
>
>
>
> Add it to the adviceChain property of the SimpleMessageListenerContainer.
>
>
>
> Note: stateful retry requires the message to have a message id; use a
> stateless advice otherwise.
>
>
>
> After the retries are exhausted, the message will be rejected (and dropped
> or sent to a Dead Letter Exchange of so configured).
>
>
>
>
>
> On Tue, May 20, 2014 at 3:09 PM, Srinath Sridharan -X (srinatsr - ZENSAR
> TECHNOLOGIES INC at Cisco) <srinatsr at cisco.com> wrote:
>
>
>
> Hello,
>
>
>
> I am consuming the messages using the listener as follows,  I need to
> retry processing three times if any exceptions occurred.
>
>
>
> <bean
>
>         class=
> *"org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"*
> >
>
>         <property name=*"connectionFactory"* ref=*"connectionFactory"* />
>
>         <property name=*"queueNames"*>
>
>             <array>
>
>                 <value>validateRequestQueue</value>
>
>             </array>
>
>         </property>
>
>         <property name=*"messageListener"*>
>
>             <bean
>
>                 class=
> *"org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"*
> >
>
>                 <property name=*"delegate"* ref=*"retryConsumer"* />
>
>             </bean>
>
>         </property>
>
>         <!-- <property name="acknowledgeMode" value="NONE" /> -->
>
>     </bean>
>
>
>
>
>
> *public* *class* RetryConsumer *implements* ChannelAwareMessageListener {
>
>
>
>        /**
>
>        * Callback for processing a received Rabbit message.
>
>        * *@param* message the received AMQP message
>
>        * *@param* channel the underlying Rabbit Channel
>
>        * *@throws* Exception
>
>         */
>
>        @Override
>
>        *public* *void* onMessage(Message message, Channel channel)
> *throws* Exception {
>
>
>
>              System.*out*.println("Received Message :: "+*new*
> String(message.getBody()));
>
>              *if*(*true*){
>
>              *throw* *new* RuntimeException("Error");
>
>              }
>
>
>
>        }
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
> *From:* Srinath Sridharan -X (srinatsr - ZENSAR TECHNOLOGIES INC at
> Cisco)
> *Sent:* Monday, May 19, 2014 10:47 AM
> *To:* rabbitmq-discuss at lists.rabbitmq.com
> *Subject:* Retry Message for 3 times with 10 seconds interval - in
> Consumer
>
>
>
>
>
> Retry unacknowledged RabbitMQ message in 10 second interval
>
> And Retry 3 times using Spring framework in java .
>
>
>
> Please need help on this
>
>
>
> Regards …*?*
>
> Srinath
>
>
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
>
>
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
>
>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20140529/9a9b6a12/attachment.html>


More information about the rabbitmq-discuss mailing list