Hi All,<br><br>I have a fanout exchange with several queues bound to it. When I send data to the exchange, every bound queue receives it, as expected. But when I try to attach more than one consumer to one of these queues, I get the following message:<br>
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."<br><br>Here is the properties file that has the configuration:<br><br>fanout.properties:<br>--------------------<br>java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory<br>
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'<br><br># for producer<br>destination.fanoutQueue = BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'<br>
<br># for consumers<br>destination.fanoutQueue1 = BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'<br><br>destination.fanoutQueue2 = BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'<br>
<br>destination.fanoutQueue3 = BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'<br>-------------------------<br><br>When I connect two consumers to message_queue1, I get the following error:<br>
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to :org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4, commandCode=7, fieldIndex=0, description=resource-locked: Queue message_queue1 has an exclusive consumer. No more consumers allowed. (qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]<br>
<br>I checked the configuration on the AMQP server, and it reports message_queue1 as a non-exclusive queue (the excl column is empty):<br>-bash-4.1$ qpid-stat -q<br>Queues<br>
  queue           dur  autoDel  excl  msg  msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind<br>
  message_queue1       Y              1    22     21      15     324      309       1     2<br><br><br>Here is the code I am using.<br><br>Producer.java: (queueName=fanoutQueue)<br>----------------<br>
Properties properties = new Properties();<br> properties.load(this.getClass().getResourceAsStream("fanout.properties"));<br><br> //Create the initial context<br> Context ctx = new InitialContext(properties);<br>
<br> // look up destination and connection factory<br> Destination destination = (Destination)ctx.lookup(queueName);<br> ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");<br>
<br> Connection connection = conFac.createConnection();<br> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br> <br> MessageProducer messageProducer = session.createProducer(destination);<br>
TextMessage message;<br><br> // Send a series of messages in a loop<br> int i=0;<br> while(true) {<br> message = session.createTextMessage("Hello world! "+i);<br> messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, 60*1000);<br>
i++;<br> Thread.sleep(1000);<br> }<br>-------------------<br><br>Consumer.java: (queueName=fanoutQueue1)<br>-----------------<br> Properties properties = new Properties();<br> properties.load(this.getClass().getResourceAsStream("fanout.properties"));<br>
<br> //Create the initial context<br> Context ctx = new InitialContext(properties);<br><br> // look up destination and connection factory<br> Destination destination = (Destination)ctx.lookup(queueName);<br>
ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");<br> <br> Connection connection = conFac.createConnection();<br> connection.setExceptionListener(new ExceptionListener()<br>
{<br> public void onException(JMSException jmse)<br> {<br> System.err.println(CLASS + ": The sample received an exception through the ExceptionListener: " + jmse);<br> System.exit(1);<br>
}<br> });<br><br> System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");<br> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br>
<br> MessageConsumer messageConsumer = session.createConsumer(destination);<br> connection.start();<br> <br> while(true) {<br> TextMessage message = (TextMessage)messageConsumer.receive(1000);<br>
if(message != null)<br> System.out.println(message.getText());<br> }<br>-----------------<br><br>I am using Qpid, but I figured that if this is doable in RabbitMQ, it should be possible in Qpid too.<br>
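<br>To be clear about the behaviour I am after, here is a small in-memory sketch. It is plain Java with no Qpid or JMS calls, and the class FanoutSketch and its methods bind, publish and receive are made up purely for illustration. The idea is that every bound queue gets its own copy of each message, and consumers attached to the same queue share that queue's messages.<br><br>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model only; none of these names come from the Qpid or JMS APIs. */
public class FanoutSketch {
    // One in-memory queue per bound queue name, standing in for the broker's queues.
    private final Map<String, BlockingQueue<String>> queues = new LinkedHashMap<>();

    /** Bind a queue to the single, implicit fanout exchange. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new LinkedBlockingQueue<>());
    }

    /** Fanout: every bound queue receives its own copy of the message. */
    public void publish(String message) {
        for (BlockingQueue<String> q : queues.values()) {
            q.add(message);
        }
    }

    /** A consumer takes the next message from one queue, or null if it is empty or unknown. */
    public String receive(String queueName) {
        BlockingQueue<String> q = queues.get(queueName);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("message_queue1");
        exchange.bind("message_queue2");

        for (int i = 0; i < 4; i++) {
            exchange.publish("Hello world! " + i);
        }

        // Two consumers on message_queue1 take turns, so each message reaches only one of them.
        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            consumerA.add(exchange.receive("message_queue1"));
            consumerB.add(exchange.receive("message_queue1"));
        }
        System.out.println("A: " + consumerA);
        System.out.println("B: " + consumerB);

        // message_queue2 was not consumed from, so it still holds its own full copy.
        int remaining = 0;
        while (exchange.receive("message_queue2") != null) {
            remaining++;
        }
        System.out.println("message_queue2 still holds " + remaining + " messages");
    }
}
```

Run as written, consumer A should end up with messages 0 and 2 and consumer B with messages 1 and 3 from message_queue1, while message_queue2 keeps its own copy of all four. That sharing of one queue between two consumers is what the broker refuses in my setup.<br><br>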
Am I missing something here?<br><br>I really appreciate your help.<br><br>Thanks,<br>Uday<br><br>