[rabbitmq-discuss] shutdownCompleted is not called when queue is deleted

jchappelle jchappelle at 4redi.com
Tue Aug 6 20:48:16 BST 2013


I have implemented a Consumer that extends DefaultConsumer. When I delete my
queue using the web interface it fires the handleCancel event in my consumer
but it does not call shutdownCompleted in the class that is managing my
connections. I'm trying to make my application fault-tolerant so that it
will reconnect in this case. I throw a ConsumerCancelledException in my
handleCancel method, thinking that the DefaultExceptionHandler will close the
channel and ultimately fire the event that calls the shutdownCompleted
method, but this does not happen. My code works fine in the event that the
server goes down, because shutdownCompleted is called and I'm able to
re-establish the connection.

Is there some other way that I should be handling this? I have listed my
code below:


package com.redi.email.service;

import java.io.IOException;

import org.apache.log4j.Logger;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.*;
import com.redi.email.*;
import com.redi.email.rabbitmq.QueuedEmail;

public class EmailConsumerApplication implements Application,
ShutdownListener
{
	private static final Logger logger =
Logger.getLogger(EmailConsumerApplication.class);
	
	private final EmailConfig defaultEmailConfig;
 
	private boolean stopped;
	private final ConnectionFactory factory;
	private Connection connection = null;
	private Channel channel = null;

	private EmailConsumer consumer;
	private final String queue;
	private static final String EXCHANGE = "email";
	
	public EmailConsumerApplication(MqConfig mqConfig, EmailConfig
defaultEmailConfig)
	{
		factory = new ConnectionFactory();
	    factory.setHost(mqConfig.getHost());
	    factory.setUsername(mqConfig.getUsername());
	    factory.setPassword(mqConfig.getPassword());
	    factory.setVirtualHost(mqConfig.getVhost());
	    
	    this.queue = mqConfig.getQueue();
	    this.defaultEmailConfig = defaultEmailConfig;
	}

	public void run()
	{
		connectAndConsume();
	}

	private void connectAndConsume()
	{
		int connectionRetries = 0;
		int retrySeconds = 30;
		boolean connected = false;
		while(!stopped && !connected)
		{
			try
			{
				boolean autoAck = false;
				boolean durable = true;
				
				logger.debug("Establishing a connection");
				connection = factory.newConnection();
				connection.addShutdownListener(this);
				channel = connection.createChannel();
				channel.addShutdownListener(this);
				
				logger.debug("Setting up channel");
				channel.exchangeDeclare(EXCHANGE, "direct", true);
				channel.queueDeclare(queue, durable, false, false, null);
				channel.queueBind(queue, EXCHANGE, queue);
				channel.basicQos(1);
				
				logger.info("Waiting for messages.");
				consumer = new EmailConsumer(channel, defaultEmailConfig);
				channel.basicConsume(queue, autoAck, consumer);
				connected = true;
			}
			catch(Exception e)
			{
				close();
				logger.error("Unable to establish a connection to the RabbitMQ server.
Retry #" + connectionRetries++ + " in " + retrySeconds + " seconds...", e);
				Utils.sleep(retrySeconds);
			}
		}
	}

	private void close()
	{
		logger.debug("Closing channel and connection");
		try
		{
			Utils.close(channel);
			Utils.close(connection);
		}
		catch(Exception e)
		{
			logger.debug("Unable to abort consumer", e);
		}
	}

	public void stop()
	{
		logger.debug("Stopping Consumer");
		stopped = true;
		close();
	}

	public boolean isStopped()
	{
		return stopped;
	}

	@Override
	public void shutdownCompleted(ShutdownSignalException cause)
	{
		logger.debug("handleShutdownOrCancel");
		if(stopped)
		{
			logger.info("Consumer shutdown completed.");
		}
		else
		{
			logger.info("Connection has been shutdown.", cause);
			logger.info("Re-establishing connection");
			connectAndConsume();
		}
	}
	
	private class EmailConsumer extends DefaultConsumer
	{
		private EmailService emailService = new DefaultEmailService();
		private DeduplicateService deduplicateService = new
DefaultDeduplicateService();

		private final EmailConfig defaultEmailConfig;
	 
		public EmailConsumer(Channel channel, EmailConfig defaultEmailConfig)
		{
			super(channel);
			
			this.defaultEmailConfig = defaultEmailConfig;
		}

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException
		{
			if(body == null)
			{
				logger.warn("Ignoring null message!");
			}
			else
			{
				String rawMessage = new String(body, "UTF-8");
				logger.debug("Received '" + rawMessage + "'");     
				sendEmail(rawMessage);
				logger.debug("Done");
				getChannel().basicAck(envelope.getDeliveryTag(), false);
			}
		}

		private void sendEmail(String rawMessage)
		{
			try
			{
				QueuedEmail queuedEmail = makeQueuedEmail(rawMessage);
				if(deduplicateService.isDuplicate(queuedEmail))
				{
					logger.warn("Ignoring duplicate message id=" + queuedEmail.getMsgId());
				}
				else
				{
					Email email = queuedEmail.getEmail();
					if(email.getEmailConfig() == null)
					{
						email.setEmailConfig(defaultEmailConfig);
					}
					emailService.sendEmail(email);
				}
			}
			catch(EmailException e)
			{
				logger.error("Unable to send email", e);
			}
		}
		
		private QueuedEmail makeQueuedEmail(String rawMessage)
		{
			try
			{
				Gson gson = new Gson();
				return gson.fromJson(rawMessage, QueuedEmail.class);
			}
			catch(JsonSyntaxException e)
			{
				logger.error("Could not create QueuedEmail from json string " +
rawMessage, e);
				throw e;
			}
		}
		
		@Override
		public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig)
		{
			logger.debug("handleShutdownSignal");
		}
		
		@Override
		public void handleCancelOk(String consumerTag)
		{
			logger.debug("handleCancelOk");
		}

		@Override
		public void handleCancel(String consumerTag) throws IOException
		{
			logger.debug("handleCancel");
			throw new ConsumerCancelledException();
		}
	}
}




--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/shutdownCompleted-is-not-called-when-queue-is-deleted-tp28621.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


More information about the rabbitmq-discuss mailing list