[rabbitmq-discuss] Unable to connect multiple consumers to a queue bound to fanned out exchange

Mitra77 udaymitra4u at gmail.com
Thu Jul 14 01:12:40 BST 2011


Hi All,

I have a fanout exchange and I am binding different queues to this exchange.
When I send data to this exchange each queue that is bound to this exchange
is getting data as expected. But when I try to connect multiple consumers to
one of these fanned out queues, I get the following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."

Here is the properties file that has the configuration:

fanout.properties:
--------------------
java.naming.factory.initial =
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory =
amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'

# for producer
destination.fanoutQueue =
BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'

# for consumers
destination.fanoutQueue1 =
BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue2 =
BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue3 =
BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------

I tried to connect two consumers to message_queue1 and I get the following
error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]

I checked the configuration on AMQP Server and it says that message_queue1
is a non-exclusive queue.
-bash-4.1$ qpid-stat -q
Queues
  queue                                  dur  autoDel  excl  msg   msgIn 
msgOut  bytes  bytesIn  bytesOut  cons  bind
  message_queue1                              Y                 1    22    
21      15    324      309         1     2


Here is the code I am using.

Producer.java: (queueName=fanoutQueue)
----------------
                Properties properties = new Properties();
		properties.load(this.getClass().getResourceAsStream("fanout.properties"));

		//Create the initial context
		Context ctx = new InitialContext(properties);

		// look up destination and connection factory
		Destination destination = (Destination)ctx.lookup(queueName);
		ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
		
		Connection connection = conFac.createConnection();
		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		
		MessageProducer messageProducer = session.createProducer(destination);
		TextMessage message;

		// Send a series of messages in a loop
		int i=0;
		while(true) {
			message = session.createTextMessage("Hello world! "+i);
		    messageProducer.send(message, DeliveryMode.NON_PERSISTENT,   
Message.DEFAULT_PRIORITY, 60*1000);
			i++;
			Thread.sleep(1000);
		}
-------------------

Consumer.java: (queueName=fanoutQueue1)
-----------------
                Properties properties = new Properties();
		properties.load(this.getClass().getResourceAsStream("fanout.properties"));

		//Create the initial context
		Context ctx = new InitialContext(properties);

		// look up destination and connection factory
		Destination destination = (Destination)ctx.lookup(queueName);
		ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
		
		Connection connection = conFac.createConnection();
		connection.setExceptionListener(new ExceptionListener()
		{
		    public void onException(JMSException jmse)
		    {
		        System.err.println(CLASS + ": The sample received an exception
through the ExceptionListener");
		        System.exit(0);
		    }
		});

		System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged
session");
		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		
		MessageConsumer messageConsumer = session.createConsumer(destination);
		connection.start();
		
		while(true) {
			TextMessage message = (TextMessage)messageConsumer.receive(1000);
			if(message != null)
				System.out.println(message.getText());
		}
-----------------

I am using QPID, but I figured if this is doable in RabbitMQ, then it should
be possible in QPID too.
Am I missing something here?

I really appreciate your help.

Thanks,
Uday

-- 
View this message in context: http://old.nabble.com/Unable-to-connect-multiple-consumers-to-a-queue-bound-to-fanned-out-exchange-tp32058027p32058027.html
Sent from the RabbitMQ mailing list archive at Nabble.com.



More information about the rabbitmq-discuss mailing list