[rabbitmq-discuss] RabbitMQ broker crashing under heavy load with mirrored queues

Venkat vveludan at gmail.com
Mon Jan 9 23:39:48 GMT 2012


Hi Steve, I have run some tests using RabbitMQ 2.7.1. Please find the
details below:

OS: CentOS release 5.5
RabbitMQ Version: 2.7.1
RabbitMQ Java Client: 2.7.1
spring-amqp Version: 1.0.0.RELEASE
Erlang version: 5.8.4 (R14B, from
http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo)
----------------------------------
Cluster Information:
Started Cluster with one RabbitMQ Node (Node A)
After test.queue is created on Node A, joined Node B to the cluster.
---------------------------------------------
Parent Queue Configuration:

@Configuration
public abstract class AbstractMDBRabbitConfiguration {

	protected abstract void configureMDBTemplate(RabbitTemplate template);

	@Bean
	public ConnectionFactory connectionFactory() {
		String rabbitMQUser = "guest";
		String rabbitMQPassword = "guest";
		String rabbitMQHost = "localhost";
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitMQHost);
		connectionFactory.setChannelCacheSize(10);
		connectionFactory.setUsername(rabbitMQUser);
		connectionFactory.setPassword(rabbitMQPassword);
		return connectionFactory;
	}

	@Bean
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate template = new DMBTemplate(connectionFactory());
		template.setMessageConverter(messageConverter());
		configureMDBTemplate(template);
		return template;
	}


	@Bean
	public MessageConverter messageConverter() {
		return new SimpleMessageConverter();
	}

	@Bean
	public AmqpAdmin amqpAdmin() {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
		return rabbitAdmin;
	}
}
-------------------------------------------------------------------------------------
Test Queue Configuration:

@Configuration
public class LoggingConfiguration extends AbstractMDBRabbitConfiguration {

	private static final String AUDIT_EXCHANGE_NAME = "test.exchange";
	private static final String AUDIT_QUEUE_NAME = "test.queue";
	private static final String AUDIT_ROUTING_KEY = AUDIT_QUEUE_NAME;

	@Override
	public void configureMDBTemplate(RabbitTemplate mdbTemplate) {
		mdbTemplate.setExchange(AUDIT_EXCHANGE_NAME);
		mdbTemplate.setRoutingKey(AUDIT_ROUTING_KEY);
	}

	@Bean
	public Queue mdbQueue() {
		Map<String, Object> args = new HashMap<String, Object>();
		String haPolicyValue = "all";
		args.put("x-ha-policy", haPolicyValue);
		Queue queue = new Queue(AUDIT_QUEUE_NAME,true,false,false,args);
		return queue;
	}

	@Bean
	public TopicExchange mdbAuditExchange() {
		TopicExchange exchange = new TopicExchange(AUDIT_EXCHANGE_NAME,true,false);
		return exchange;
	}

	@Bean
	public Binding mdbAuditBinding() {
		return BindingBuilder.bind(mdbQueue()).to(mdbAuditExchange()).with(AUDIT_ROUTING_KEY);
	}
}
------------------------------------------------------
Queue Consumer configuration:

	<context:component-scan base-package="com.test.services.logging">
	</context:component-scan>

	<bean id="messageListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="queueNames" value="test.queue"/>
	    <property name="concurrentConsumers" value="5" />
	    <property name="messageListener" ref="messageListenerAdapter" />
	</bean>

	<bean id="messageListenerAdapter"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
		<property name="delegate" ref="logHandler" />
		<property name="messageConverter" ref="messageConverter" />
	</bean>

	<bean id="loggingEventHandler"
class="com.test.services.logging.LoggingEventHandler">
	</bean>

	<bean id="logHandler"
class="com.test.services.logging.LoggingProcessor">
		<property name="loggingEventHandler" ref="loggingEventHandler"/>
	</bean>


-------------------------------------------------------
Test Scenario:

First Test: The queue consumer was stopped during the test so that the
message count could be watched using rabbitmqctl list_queues.

Executed a program that spawns 10 threads, each thread placing 2000
messages on test.queue (20000 messages in total).
While messages were being pumped into test.queue, crashed the RabbitMQ
broker on Node A (hard failure using kill -9).
Ran the above test for two iterations.
The findings are as follows:
- In the first iteration only 3 messages were lost. When the program
that was pumping in messages was stopped, 19997 messages had piled up
in the queue.
  Verified this by running rabbitmqctl list_queues
- In the second iteration only 2 messages were lost. When the program
that was pumping in messages was stopped, 19998 messages had piled up
in the queue.
  Verified this by running rabbitmqctl list_queues
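
The load generator described in the first test (10 threads, 2000
messages each) might look roughly like the sketch below. The actual
test program is not shown in this post, so everything here is an
assumption: the Publisher interface is a hypothetical stand-in for
whatever does the real send (for example RabbitTemplate.convertAndSend),
and the "Thread-N" naming simply mirrors the thread IDs that appear in
the log.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LoadGenerator {

    /** Hypothetical stand-in for the real publisher; not part of any RabbitMQ API. */
    public interface Publisher {
        void publish(String message) throws Exception;
    }

    /**
     * Starts `threads` threads, each attempting `perThread` publishes.
     * Returns the number of publish calls that completed without throwing.
     */
    public static int run(int threads, int perThread, Publisher publisher) {
        AtomicInteger published = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final String name = "Thread-" + t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    try {
                        publisher.publish(name + " message " + i);
                        published.incrementAndGet();
                    } catch (Exception e) {
                        // A failed send (for example during a broker crash) is reported here.
                        System.err.println(name + " failed on message " + i + ": " + e);
                    }
                }
            }, name);
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and return the count so far.
            Thread.currentThread().interrupt();
        }
        return published.get();
    }

    public static void main(String[] args) {
        // In-memory publisher used only to exercise the sketch without a broker.
        AtomicInteger received = new AtomicInteger();
        int ok = run(10, 2000, msg -> received.incrementAndGet());
        System.out.println(ok + " published, " + received.get() + " received");
    }
}
```

Comparing the count returned by run() with the number reported by
rabbitmqctl list_queues gives the number of messages the producer
believed it had sent but the queue does not hold.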

Second Test: The queue consumer was running during the test.
Executed the same program used in the first test to pump in messages.
While messages were being pumped in, hard-crashed the RabbitMQ broker
on Node A.
Similar findings to the first test: lost 2 messages during the first
iteration and 3 messages in the second iteration.

These results are better than those I saw with 2.7.0.

I have one question, referring to http://www.rabbitmq.com/ha.html:
"As a result of the requeuing, clients that re-consume from the queue
must be aware that they are likely to subsequently receive messages
that they have seen previously"

During the second test, when the queue consumer was up and running, I
didn't find any duplicate messages processed (though 2-3 messages were
lost).
All processed messages are written to a log. I verified that there
were no duplicate messages by running grep on the log file for each
thread ID.
Please find the results below:
[ecloud at t-3 log]$ grep Thread-0 central-log.log | wc -l
1999
[ecloud at t-3 log]$ grep Thread-1 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-2 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-3 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-4 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-5 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-6 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-7 central-log.log | wc -l
2000
[ecloud at t-3 log]$ grep Thread-8 central-log.log | wc -l
1999
[ecloud at t-3 log]$ grep Thread-9 central-log.log | wc -l
2000

Notice that two lines display 1999; this is because two messages were
lost. All other threads show 2000 messages processed.
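
The per-thread grep counts above could also be produced in one pass
with something like the sketch below. It assumes only what the grep
commands assume, namely that each log line for a processed message
contains a "Thread-N" token. The findDuplicates helper goes one step
further and needs a unique per-message ID, which is a hypothetical
addition: the log format is not shown here, and per-thread counts alone
cannot tell one lost message plus one duplicate apart from a clean
2000.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogCounter {

    // Whole-token match, so "Thread-1" is not also counted for "Thread-10".
    private static final Pattern THREAD_ID = Pattern.compile("Thread-(\\d+)\\b");

    /** Counts log lines per thread ID, like `grep Thread-N file | wc -l` for every N at once. */
    public static Map<String, Integer> countPerThread(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = THREAD_ID.matcher(line);
            if (m.find()) {
                counts.merge(m.group(), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Returns every ID that occurs more than once (IDs are assumed unique per message). */
    public static Set<String> findDuplicates(List<String> messageIds) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (String id : messageIds) {
            if (!seen.add(id)) {
                duplicates.add(id);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java LogCounter central-log.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        countPerThread(lines).forEach((thread, n) -> System.out.println(thread + ": " + n));
    }
}
```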


