Hi All,<br><br>I have a fanout exchange and I am binding different queues to this exchange. When I send data to this exchange each queue that is bound to this exchange is getting data as expected. But when I try to connect multiple consumers to one of these fanned out queues, I get the following message:<br>
&quot;Queue message_queue1 has an exclusive consumer. No more consumers allowed.&quot;<br><br>Here is the properties file that has the configuration:<br><br>fanout.properties:<br>--------------------<br>java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory<br>
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx/?brokerlist=&#39;tcp://xxxxx:5672&#39;<br><br># for producer<br>destination.fanoutQueue = BURL:fanout://testcollector.fanout//message_queue?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>
<br># for consumers<br>destination.fanoutQueue1 = BURL:fanout://testcollector.fanout//message_queue1?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br><br>destination.fanoutQueue2 = BURL:fanout://testcollector.fanout//message_queue2?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>
<br>destination.fanoutQueue3 = BURL:fanout://testcollector.fanout//message_queue3?durable=&#39;false&#39;&amp;autodelete=&#39;true&#39;&amp;exclusive=&#39;false&#39;<br>-------------------------<br><br>I tried to connect two consumers to message_queue1 and I get the following error:<br>
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to :org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4, commandCode=7, fieldIndex=0, description=resource-locked: Queue message_queue1 has an exclusive consumer. No more consumers allowed. (qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]<br>
<br>I checked the configuration on AMQP Server and it says that message_queue1 is a non-exclusive queue.<br>-bash-4.1$ qpid-stat -q<br>Queues<br>  queue                                  dur  autoDel  excl  msg   msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind<br>
  message_queue1                              Y                 1    22     21      15    324      309         1     2<br><br><br>Here is the code I am using.<br><br>Producer.java: (queueName=fanoutQueue)<br>----------------<br>
                Properties properties = new Properties();<br>        properties.load(this.getClass().getResourceAsStream(&quot;fanout.properties&quot;));<br><br>        //Create the initial context<br>        Context ctx = new InitialContext(properties);<br>
<br>        // look up destination and connection factory<br>        Destination destination = (Destination)ctx.lookup(queueName);<br>        ConnectionFactory conFac = (ConnectionFactory)ctx.lookup(&quot;qpidConnectionfactory&quot;);<br>
        <br>        Connection connection = conFac.createConnection();<br>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br>        <br>        MessageProducer messageProducer = session.createProducer(destination);<br>
        TextMessage message;<br><br>        // Send a series of messages in a loop<br>        int i=0;<br>        while(true) {<br>            message = session.createTextMessage(&quot;Hello world! &quot;+i);<br>            messageProducer.send(message, DeliveryMode.NON_PERSISTENT,    Message.DEFAULT_PRIORITY, 60*1000);<br>
            i++;<br>            Thread.sleep(1000);<br>        }<br>-------------------<br><br>Consumer.java: (queueName=fanoutQueue1)<br>-----------------<br>                Properties properties = new Properties();<br>        properties.load(this.getClass().getResourceAsStream(&quot;fanout.properties&quot;));<br>
<br>        //Create the initial context<br>        Context ctx = new InitialContext(properties);<br><br>        // look up destination and connection factory<br>        Destination destination = (Destination)ctx.lookup(queueName);<br>
        ConnectionFactory conFac = (ConnectionFactory)ctx.lookup(&quot;qpidConnectionfactory&quot;);<br>        <br>        Connection connection = conFac.createConnection();<br>        connection.setExceptionListener(new ExceptionListener()<br>
        {<br>            public void onException(JMSException jmse)<br>            {<br>                System.err.println(CLASS + &quot;: The sample received an exception through the ExceptionListener&quot;);<br>                System.exit(0);<br>
            }<br>        });<br><br>        System.out.println(CLASS + &quot;: Creating a non-transacted, auto-acknowledged session&quot;);<br>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);<br>
        <br>        MessageConsumer messageConsumer = session.createConsumer(destination);<br>        connection.start();<br>        <br>        while(true) {<br>            TextMessage message = (TextMessage)messageConsumer.receive(1000);<br>
            if(message != null)<br>                System.out.println(message.getText());<br>        }<br>-----------------<br><br>I am using QPID, but I figured if this is doable in RabbitMQ, then it should be possible in QPID too.<br>
Am I missing something here?<br><br>I really appreciate your help.<br><br>Thanks,<br>Uday<br><br>