Hi All,<br><br>I have a fanout exchange and I am binding different queues to this exchange. When I send data to this exchange each queue that is bound to this exchange is getting data as expected. But when I try to connect multiple consumers to one of these fanned out queues, I get the following message:<br>
&quot;Queue message_queue1 has an exclusive consumer. No more consumers allowed.&quot;<br><br>Here is the properties file that has the configuration:<br><br>fanout.properties:<br>--------------------<br>java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory<br>
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx/?brokerlist=&#39;tcp://xxxxx:5672&#39;<br><br># for producer<br>destination.fanoutQueue = BURL:fanout://testcollector.fanout//message_queue?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>
<br># for consumers<br>destination.fanoutQueue1 = BURL:fanout://testcollector.fanout//message_queue1?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br><br>destination.fanoutQueue2 = BURL:fanout://testcollector.fanout//message_queue2?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>
<br>destination.fanoutQueue3 = BURL:fanout://testcollector.fanout//message_queue3?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>-------------------------<br><br>I tried to connect two consumers to message_queue1 and I get the following error:<br>
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to :org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4, commandCode=7, fieldIndex=0, description=resource-locked: Queue message_queue1 has an exclusive consumer. No more consumers allowed. (qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]<br>
<br>I checked the configuration on AMQP Server and it says that message_queue1 is a non-exclusive queue.<br>-bash-4.1$ qpid-stat -q<br>Queues<br>� queue��������������������������������� dur� autoDel� excl� msg�� msgIn� msgOut� bytes� bytesIn� bytesOut� cons� bind<br>
� message_queue1����������������������������� Y���������������� 1��� 22���� 21����� 15��� 324����� 309�������� 1���� 2<br><br><br>Here is the code I am using.<br><br>Producer.java: (queueName=fanoutQueue)<br>----------------<br>
��������������� Properties properties = new Properties();<br>��� ��� properties.load(this.getClass().getResourceAsStream(&quot;fanout.properties&quot;));<br><br>��� ��� //Create the initial context<br>��� ��� Context ctx = new InitialContext(properties);<br>
<br>��� ��� // look up destination and connection factory<br>��� ��� Destination destination = (Destination)ctx.lookup(queueName);<br>��� ��� ConnectionFactory conFac = (ConnectionFactory)ctx.lookup(&quot;qpidConnectionfactory&quot;);<br>
��� ��� <br>��� ��� Connection connection = conFac.createConnection();<br>��� ��� Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br>��� ��� <br>��� ��� MessageProducer messageProducer = session.createProducer(destination);<br>
��� ��� TextMessage message;<br><br>��� ��� // Send a series of messages in a loop<br>��� ��� int i=0;<br>��� ��� while(true) {<br>��� ��� ��� message = session.createTextMessage(&quot;Hello world! &quot;+i);<br>��� ��� ��� messageProducer.send(message, DeliveryMode.NON_PERSISTENT,��� Message.DEFAULT_PRIORITY, 60*1000);<br>
��� ��� ��� i++;<br>��� ��� ��� Thread.sleep(1000);<br>��� ��� }<br>-------------------<br><br>Consumer.java: (queueName=fanoutQueue1)<br>-----------------<br>��������������� Properties properties = new Properties();<br>��� ��� properties.load(this.getClass().getResourceAsStream(&quot;fanout.properties&quot;));<br>
<br>��� ��� //Create the initial context<br>��� ��� Context ctx = new InitialContext(properties);<br><br>��� ��� // look up destination and connection factory<br>��� ��� Destination destination = (Destination)ctx.lookup(queueName);<br>
��� ��� ConnectionFactory conFac = (ConnectionFactory)ctx.lookup(&quot;qpidConnectionfactory&quot;);<br>��� ��� <br>��� ��� Connection connection = conFac.createConnection();<br>��� ��� connection.setExceptionListener(new ExceptionListener()<br>
��� ��� {<br>��� ��� ��� public void onException(JMSException jmse)<br>��� ��� ��� {<br>��� ��� ������� System.err.println(CLASS + &quot;: The sample received an exception through the ExceptionListener&quot;);<br>��� ��� ������� System.exit(0);<br>
��� ��� ��� }<br>��� ��� });<br><br>��� ��� System.out.println(CLASS + &quot;: Creating a non-transacted, auto-acknowledged session&quot;);<br>��� ��� Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br>
��� ��� <br>��� ��� MessageConsumer messageConsumer = session.createConsumer(destination);<br>��� ��� connection.start();<br>��� ��� <br>��� ��� while(true) {<br>��� ��� ��� TextMessage message = (TextMessage)messageConsumer.receive(1000);<br>
��� ��� ��� if(message != null)<br>��� ��� ��� ��� System.out.println(message.getText());<br>��� ��� }<br>-----------------<br><br>I am using QPID, but I figured if this is doable in RabbitMQ, then it should be possible in QPID too.<br>
Am I missing something here?<br><br>I really appreciate your help.<br><br>Thanks,<br>Uday<br><br>