[rabbitmq-discuss] Unable to share fanned out queue between two consumers

Uday Mitra udaymitra4u at gmail.com
Thu Jul 14 06:01:56 BST 2011


Hi All,

I have a fanout exchange with several queues bound to it. When I send data
to the exchange, every bound queue receives it as expected. But when I try
to attach multiple consumers to one of these fanned out queues, I get the
following message:
"Queue message_queue1 has an exclusive consumer. No more consumers allowed."

Here is the properties file that has the configuration:

fanout.properties:
--------------------
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
connectionfactory.qpidConnectionfactory = amqp://guest:guest@xxxxxxx/?brokerlist='tcp://xxxxx:5672'

# for producer
destination.fanoutQueue = BURL:fanout://testcollector.fanout//message_queue?durable='false'&autodelete='true'&exclusive='false'

# for consumers
destination.fanoutQueue1 = BURL:fanout://testcollector.fanout//message_queue1?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue2 = BURL:fanout://testcollector.fanout//message_queue2?durable='false'&autodelete='true'&exclusive='false'

destination.fanoutQueue3 = BURL:fanout://testcollector.fanout//message_queue3?durable='false'&autodelete='true'&exclusive='false'
-------------------------
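
As a sanity check on the entries above, the options in each binding URL can be read back programmatically. This is a minimal sketch, assuming the layout <exchange class>://<exchange name>/<destination>/<queue>?<options> that the entries appear to follow; BindingUrlSketch is a hypothetical helper and not part of the Qpid client API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Qpid client API.
class BindingUrlSketch {
    final String exchangeClass;
    final String exchangeName;
    final String queueName;
    final Map<String, String> options = new LinkedHashMap<>();

    BindingUrlSketch(String value) {
        // Drop the "BURL:" prefix used in the properties file, if present.
        String url = value.startsWith("BURL:") ? value.substring(5) : value;
        int schemeEnd = url.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("not a binding URL: " + value);
        }
        exchangeClass = url.substring(0, schemeEnd);

        int queryStart = url.indexOf('?');
        String path = queryStart < 0
                ? url.substring(schemeEnd + 3)
                : url.substring(schemeEnd + 3, queryStart);

        // Assumed path layout: <exchange>/<destination>/<queue>; destination may be empty.
        String[] parts = path.split("/", -1);
        exchangeName = parts[0];
        queueName = parts.length > 2 ? parts[2] : "";

        if (queryStart >= 0) {
            for (String pair : url.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq < 0) {
                    continue;
                }
                String v = pair.substring(eq + 1);
                // Strip the surrounding single quotes from the option value.
                if (v.length() >= 2 && v.startsWith("'") && v.endsWith("'")) {
                    v = v.substring(1, v.length() - 1);
                }
                options.put(pair.substring(0, eq), v);
            }
        }
    }

    // Returns false when the option is absent or is anything other than "true".
    boolean flag(String name) {
        return Boolean.parseBoolean(options.get(name));
    }

    public static void main(String[] args) {
        BindingUrlSketch burl = new BindingUrlSketch(
                "BURL:fanout://testcollector.fanout//message_queue1"
                + "?durable='false'&autodelete='true'&exclusive='false'");
        System.out.println(burl.exchangeClass + " " + burl.exchangeName + " " + burl.queueName);
        System.out.println("exclusive=" + burl.flag("exclusive"));
    }
}
```

Parsed this way, the fanoutQueue1 entry names queue message_queue1 on exchange testcollector.fanout with exclusive set to false, which matches what the broker reports for the queue itself.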

When I connect two consumers to message_queue1, I get the following
error:
INFO org.apache.qpid.client.AMQConnection - Closing AMQConnection due to
:org.apache.qpid.AMQException: ch=0 id=0
ExecutionException(errorCode=RESOURCE_LOCKED, commandId=6, classCode=4,
commandCode=7, fieldIndex=0, description=resource-locked: Queue
message_queue1 has an exclusive consumer. No more consumers allowed.
(qpid/broker/Queue.cpp:385), errorInfo={}) [error code 405: Already exists]

I checked the configuration on the AMQP server, and it reports that
message_queue1 is a non-exclusive queue.
-bash-4.1$ qpid-stat -q
Queues
  queue           dur  autoDel  excl  msg  msgIn  msgOut  bytes  bytesIn  bytesOut  cons  bind
  message_queue1       Y                1     22      21     15      324       309     1     2


Here is the code I am using.

Producer.java: (queueName=fanoutQueue)
----------------
        Properties properties = new Properties();
        properties.load(this.getClass().getResourceAsStream("fanout.properties"));

        // Create the initial context
        Context ctx = new InitialContext(properties);

        // Look up destination and connection factory
        Destination destination = (Destination) ctx.lookup(queueName);
        ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory");

        Connection connection = conFac.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageProducer messageProducer = session.createProducer(destination);
        TextMessage message;

        // Send one message per second in a loop; each message is
        // non-persistent and has a 60-second time-to-live
        int i = 0;
        while (true) {
            message = session.createTextMessage("Hello world! " + i);
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT,
                    Message.DEFAULT_PRIORITY, 60 * 1000);
            i++;
            Thread.sleep(1000);
        }
-------------------

Consumer.java: (queueName=fanoutQueue1)
-----------------
        Properties properties = new Properties();
        properties.load(this.getClass().getResourceAsStream("fanout.properties"));

        // Create the initial context
        Context ctx = new InitialContext(properties);

        // Look up destination and connection factory
        Destination destination = (Destination) ctx.lookup(queueName);
        ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory");

        Connection connection = conFac.createConnection();
        connection.setExceptionListener(new ExceptionListener()
        {
            public void onException(JMSException jmse)
            {
                System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
                System.exit(0);
            }
        });

        System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageConsumer messageConsumer = session.createConsumer(destination);
        connection.start();

        // Poll with a 1-second timeout; receive() returns null if nothing arrived
        while (true) {
            TextMessage message = (TextMessage) messageConsumer.receive(1000);
            if (message != null) {
                System.out.println(message.getText());
            }
        }
-----------------

I am using QPID, but I figured if this is doable in RabbitMQ, then it should
be possible in QPID too.
Am I missing something here?
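
For reference, the behaviour being asked for (every bound queue gets a copy of each message, and consumers sharing one queue split its messages between them) can be sketched in memory as below. This is only an illustrative model: FanoutSketch is a hypothetical class, it does not talk to any broker, and the round-robin delivery is an assumption made for simplicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory model only; none of this is Qpid or RabbitMQ API.
class FanoutSketch {
    // queue name -> messages waiting to be consumed
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange (no-op if it is already bound).
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // A fanout exchange copies each message to every bound queue.
    void publish(String message) {
        for (Queue<String> q : queues.values()) {
            q.add(message);
        }
    }

    // Consumers sharing one queue compete: each message goes to exactly one
    // of them. Delivery is modelled here as simple round-robin.
    List<List<String>> drain(String queueName, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            received.add(new ArrayList<>());
        }
        Queue<String> q = queues.get(queueName);
        int next = 0;
        while (!q.isEmpty()) {
            received.get(next).add(q.poll());
            next = (next + 1) % consumerCount;
        }
        return received;
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("message_queue1");
        exchange.bind("message_queue2");
        for (int i = 0; i < 4; i++) {
            exchange.publish("Hello world! " + i);
        }
        // Two consumers share message_queue1; one consumer reads message_queue2.
        System.out.println(exchange.drain("message_queue1", 2));
        System.out.println(exchange.drain("message_queue2", 1));
    }
}
```

In this model the two consumers on message_queue1 each see half of the messages, while the single consumer on message_queue2 sees all of them.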

I really appreciate your help.

Thanks,
Uday