[rabbitmq-discuss] Connection Closing when used by more that one exchange

Marcus Vinitius Baffa mbaffa at opus.com.br
Tue Jun 7 18:12:58 BST 2011


Hi,

 

Iam new to RabbitMQ. I am developing my test applications and I have a
problem that maybe you can help me.

 

I have a Producer, in Flex, that send a message to a direct exchange,
sendMsgExc and a routing key with same name. The consumer, in C#, binds
the exchange and receives the message and sends a reply like this:

 

The message is received in an thread like this:

 

        private void receiveMessagesAmqp() {

 

            IConnection connection = null;

            IModel channel = null;

 

            try {

 

                var connectionFactory = new ConnectionFactory();

                connectionFactory.HostName = "localhost";

                connectionFactory.UserName = "guest";

                connectionFactory.Password = "guest";

 

                connection = connectionFactory.CreateConnection();

                channel = connection.CreateModel();

            }

            catch( Exception ) { }

 

            try {

 

                channel.ExchangeDeclare( "sendMsgExc",
ExchangeType.Direct, false );

                channel.QueueDeclare( "myQueue", false, false, false,
null );

                channel.QueueBind( "myQueue", "sendMsgExc", "sendMsgExc"
);

 

                QueueingBasicConsumer consumer = new
QueueingBasicConsumer( channel );

                String consumerTag = channel.BasicConsume( "myQueue",
false, consumer );

                RabbitMQ.Client.Events.BasicDeliverEventArgs
basicDeliveryEventArgs = null;

 

                while( true ) {

                    try {

                        basicDeliveryEventArgs =
(RabbitMQ.Client.Events.BasicDeliverEventArgs) consumer.Queue.Dequeue();

 

                        IBasicProperties props =
basicDeliveryEventArgs.BasicProperties;

                        byte[] body = basicDeliveryEventArgs.Body;

 

                        string messageContent = Encoding.UTF8.GetString(
basicDeliveryEventArgs.Body );

                        channel.BasicAck(
basicDeliveryEventArgs.DeliveryTag, false );

 

                        MessageVO message =
JsonConvert.DeserializeObject<MessageVO>( messageContent );

                        sendReply( message );

                    }

                    catch( OperationInterruptedException ex ) {

                        // The consumer was removed, either through

                        // channel or connection closure, or through the

                        // action of IModel.BasicCancel().

                        break;

                    }

                }

            }

            catch( Exception ) {

            }

            finally {

 

                channel.Close();

                connection.Close();

            }

                }

 

The sendReply method sends a message back to the Producer, like this:

 

        private void sendReply( MessageVO messageVO ) {

 

            var connectionFactory = new ConnectionFactory();

            connectionFactory.HostName = "localhost";

            connectionFactory.UserName = "guest";

            connectionFactory.Password = "guest";

 

            using( IConnection connection =
connectionFactory.CreateConnection() ) {

                using( IModel model = connection.CreateModel() ) {

 

                    messageVO.task.question = 8;

                    messageVO.task.answer += 10;

                    messageVO.task.msg = messageVO.task.msg + " -
Message Reply";

 

                    messageVO.task.errorMessage = new ErrorMessageVO();

                    messageVO.task.errorMessage.errorNumber = -10;

  messageVO.task.errorMessage.errorMessage =
messageVO.task.errorMessage.errorMessage 

+ " - Message Reply";

 

                    string message = JsonConvert.SerializeObject(
messageVO );

                    IBasicProperties basicProperties =
model.CreateBasicProperties();

                    model.BasicPublish( "ReplyExc", "ReplyExc", false,
false, basicProperties, Encoding.UTF8.GetBytes( 

 message ) );

                }

            }

        }

 

 

As you can see the sendReply opens another connection and channel to
send the reply back to the consumer. I know I can use the RPC pattern
but I need to send a message back like this.

 

When I try to define one connection that would be used by the
receiveMessageAmqp and the sendReply it stops working. 

 

That is the connection becomes global to the class, that implements a
thread, receiveMessageAmqp creates the connection and channel exactly in
the same manner.  The sendReply will now only to create the channel,
will not create a connection, and It will use the same connection
created by receiveMessageAmqp.

 

What happens is that the first message received everything works ok, the
message is received and the reply is sent. When another message is sent
by the producer the consumer receives the message and the line
channel.BasicAck( basicDeliveryEventArgs.DeliveryTag, false ); is
executed an exception is thrown:

base {RabbitMQ.Client.Exceptions.OperationInterruptedException} = {"The
AMQP operation was interrupted: AMQP close-reason, initiated by
Application, code=200, text=\"Connection close forced\", classId=0,
methodId=0, cause="}

 

Debugging the code I could verify that the finally block did not close
neither the connection nor the channel.

 

I presumed I could use one connection per thread and open as many as I
need channels. What is wrong here please ???

 

Thanks in advance.

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20110607/6fcc64a9/attachment-0001.htm>


More information about the rabbitmq-discuss mailing list