[rabbitmq-discuss] Unable to connect multiple consumers to a queue bound to fanned out exchange
Mitra77
udaymitra4u at gmail.com
Thu Jul 14 01:12:40 BST 2011
Hi All,
I have a fanout exchange and I am binding different queues to this exchange.
When I send data to this exchange each queue that is bound to this exchange
is getting data as expected. But when I try to connect multiple consumers to
one of these fanned out queues, I get the following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."
Here is the properties file that has the configuration:
fanout.properties:
--------------------
java.naming.factory.initial =
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory =
amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'
# for producer
destination.fanoutQueue =
BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'
# for consumers
destination.fanoutQueue1 =
BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'
destination.fanoutQueue2 =
BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'
destination.fanoutQueue3 =
BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------
I tried to connect two consumers to message_queue1 and I get the following
error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]
I checked the configuration on AMQP Server and it says that message_queue1
is a non-exclusive queue.
-bash-4.1$ qpid-stat -q
Queues
queue dur autoDel excl msg msgIn
msgOut bytes bytesIn bytesOut cons bind
message_queue1 Y 1 22
21 15 324 309 1 2
Here is the code I am using.
Producer.java: (queueName=fanoutQueue)
----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));
//Create the initial context
Context ctx = new InitialContext(properties);
// look up destination and connection factory
Destination destination = (Destination)ctx.lookup(queueName);
ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(destination);
TextMessage message;
// Send a series of messages in a loop
int i=0;
while(true) {
message = session.createTextMessage("Hello world! "+i);
messageProducer.send(message, DeliveryMode.NON_PERSISTENT,
Message.DEFAULT_PRIORITY, 60*1000);
i++;
Thread.sleep(1000);
}
-------------------
Consumer.java: (queueName=fanoutQueue1)
-----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));
//Create the initial context
Context ctx = new InitialContext(properties);
// look up destination and connection factory
Destination destination = (Destination)ctx.lookup(queueName);
ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
public void onException(JMSException jmse)
{
System.err.println(CLASS + ": The sample received an exception
through the ExceptionListener");
System.exit(0);
}
});
System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged
session");
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(destination);
connection.start();
while(true) {
TextMessage message = (TextMessage)messageConsumer.receive(1000);
if(message != null)
System.out.println(message.getText());
}
-----------------
I am using QPID, but I figured if this is doable in RabbitMQ, then it should
be possible in QPID too.
Am I missing something here?
I really appreciate your help.
Thanks,
Uday
--
View this message in context: http://old.nabble.com/Unable-to-connect-multiple-consumers-to-a-queue-bound-to-fanned-out-exchange-tp32058027p32058027.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list