[rabbitmq-discuss] connection problem. i want reconnection if failed

Abdul Nasir Khayam khayamabdulnasir at gmail.com
Tue Oct 15 13:49:10 BST 2013


My connecction fails to attempt, plz help me how to correct error. see
exception in log file. and code is below.

Log file:

2013-10-15 07:15:26,943 [18] DEBUG adam602.container: AMQP <-- this is an
amqp message published
2013-10-15 07:15:30,041 [18] DEBUG adam602.container: Exception:
SharedQueue closed
A first chance exception of type 'System.IO.EndOfStreamException' occurred
in RabbitMQ.Client.dll
The thread 0xed0 has exited with code 0 (0x0).
The thread 0xb364 has exited with code 0 (0x0).
The thread 0x2ef0 has exited with code 0 (0x0).
The program '[14372] adam602.vshost.exe: Managed' has exited with code 0
(0x0).


Code:

using System;
using System.Windows.Forms;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Messaging;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System.Collections;

namespace adam602
{
    public delegate void ProcessAmqpMessage(string webcmd, Hashtable
inputHash);

    class dispatchRabbitMq
    {
        protected IModel Model;
        protected IConnection Connection;
        protected string QueueName;
        protected bool isConsuming;

        // used to pass messages back to UI for processing
        public event onReceiveMessage onMessageReceived;

        //internal delegate to run the consuming queue on a seperate thread
        private delegate void ConsumeDelegate();

        //delegate to post to UI thread
        private delegate void showMessageDelegate(string message);

        public ProcessAmqpMessage _processAmqpMessage = null;

        public dispatchRabbitMq()
        {
        }

        public void init()
        {
            QueueName = "testqueue";
            string exchange = "exch";

            ConnectionFactory connectionFactory = new ConnectionFactory();
            try
            {
                connectionFactory.HostName = globals.serverHost;
                connectionFactory.Port = globals.amqpPort;
                connectionFactory.UserName = globals.amqpUser;
                connectionFactory.Password = globals.amqpPass;
                connectionFactory.RequestedHeartbeat = 30;

                //   connectionFactory.Parameters.RequestedHeartbeat = 30;
//if above line not works, then comment that line and uncomment this line


                Connection = connectionFactory.CreateConnection();


                Model = Connection.CreateModel();
                Model.QueueDeclare(QueueName, false, false, false, null);
                Model.QueueBind(QueueName, exchange, "");
                StartConsuming();
            }
            catch (Exception e)
            {
                Console.WriteLine("AMQP.init: " + e.Message);

            }

            this.onMessageReceived += this.parseAmqpData;



}

        private void StartConsuming()
        {
            isConsuming = true;
            ConsumeDelegate c = new ConsumeDelegate(Consume);
            c.BeginInvoke(null, null);
        }


      // new method for check if connection null then call init() method
again for reconnectig x times



        private void Consume()
        {
            QueueingBasicConsumer consumer = new
QueueingBasicConsumer(Model);
            String consumerTag = Model.BasicConsume(QueueName, false,
consumer);
            //Console.WriteLine("CONSUMER-TAG: |" + consumerTag + "|");
            while (isConsuming)
            {
                try
                {
                    RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    IBasicProperties props = e.BasicProperties;
                    byte[] body = e.Body;

                    // ... process the message
                    onMessageReceived(body);

                    Model.BasicAck(e.DeliveryTag, false);

                }
                catch (OperationInterruptedException ex)
                {
                    // The consumer was removed, either through
                    // channel or connection closure, or through the
                    // action of IModel.BasicCancel().
                    Console.WriteLine("OperationInterruptedException: " +
ex.Message);
                    break;
                }
                catch (Exception exx)
                {
                    Console.WriteLine("Exception: " + exx.Message);
                    break;
                }
            }

        }

        public void StopConsuming()
        {
            isConsuming = false;
            if (Connection != null)
                Connection.Close();
            if (Model != null)
                Model.Abort();
        }

        public void parseAmqpData(byte[] message)
        {
            string incomingString =
System.Text.Encoding.UTF8.GetString(message);
            incomingString.Trim();
            incomingString = incomingString.Replace("\n", "");
            Console.WriteLine(String.Format("AMQP <-- {0}",
incomingString));
        }
    }
}



-- 
Best Regards

Abdul Nasir Khayam
Software Engineer
Tel : 00923319449551
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20131015/91cd114c/attachment.htm>


More information about the rabbitmq-discuss mailing list