[rabbitmq-discuss] Unable to share fanned out queue between two cosnumers
Uday Mitra
udaymitra4u at gmail.com
Thu Jul 14 06:01:56 BST 2011
Hi All,
I have a fanout exchange and I am binding different queues to this exchange.
When I send data to this exchange each queue that is bound to this exchange
is getting data as expected. But when I try to connect multiple consumers to
one of these fanned out queues, I get the following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."
Here is the properties file that has the configuration:
fanout.properties:
--------------------
java.naming.factory.initial =
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx
/?brokerlist='tcp://xxxxx:5672'
# for producer
destination.fanoutQueue =
BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'
# for consumers
destination.fanoutQueue1 =
BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'
destination.fanoutQueue2 =
BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'
destination.fanoutQueue3 =
BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------
I tried to connect two consumers to message_queue1 and I get the following
error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]
I checked the configuration on AMQP Server and it says that message_queue1
is a non-exclusive queue.
-bash-4.1$ qpid-stat -q
Queues
queue dur autoDel excl msg msgIn
msgOut bytes bytesIn bytesOut cons bind
message_queue1 Y 1 22
21 15 324 309 1 2
Here is the code I am using.
Producer.java: (queueName=fanoutQueue)
----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));
//Create the initial context
Context ctx = new InitialContext(properties);
// look up destination and connection factory
Destination destination = (Destination)ctx.lookup(queueName);
ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer =
session.createProducer(destination);
TextMessage message;
// Send a series of messages in a loop
int i=0;
while(true) {
message = session.createTextMessage("Hello world! "+i);
messageProducer.send(message, DeliveryMode.NON_PERSISTENT,
Message.DEFAULT_PRIORITY, 60*1000);
i++;
Thread.sleep(1000);
}
-------------------
Consumer.java: (queueName=fanoutQueue1)
-----------------
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("fanout.properties"));
//Create the initial context
Context ctx = new InitialContext(properties);
// look up destination and connection factory
Destination destination = (Destination)ctx.lookup(queueName);
ConnectionFactory conFac =
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
Connection connection = conFac.createConnection();
connection.setExceptionListener(new ExceptionListener()
{
public void onException(JMSException jmse)
{
System.err.println(CLASS + ": The sample received an
exception through the ExceptionListener");
System.exit(0);
}
});
System.out.println(CLASS + ": Creating a non-transacted,
auto-acknowledged session");
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer =
session.createConsumer(destination);
connection.start();
while(true) {
TextMessage message =
(TextMessage)messageConsumer.receive(1000);
if(message != null)
System.out.println(message.getText());
}
-----------------
I am using QPID, but I figured if this is doable in RabbitMQ, then it should
be possible in QPID too.
Am I missing something here?
I really appreciate your help.
Thanks,
Uday
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110713/50a63a4d/attachment.htm>
More information about the rabbitmq-discuss
mailing list