<div dir="ltr"><div>My connecction fails to attempt, plz help me how to correct error. see exception in log file. and code is below.</div><div><br></div>Log file: <div><br></div><div><div>2013-10-15 07:15:26,943 [18] DEBUG adam602.container: AMQP <-- this is an amqp message published</div>
<div>2013-10-15 07:15:30,041 [18] DEBUG adam602.container: Exception: SharedQueue closed</div><div>A first chance exception of type 'System.IO.EndOfStreamException' occurred in RabbitMQ.Client.dll</div><div>The thread 0xed0 has exited with code 0 (0x0).</div>
<div>The thread 0xb364 has exited with code 0 (0x0).</div><div>The thread 0x2ef0 has exited with code 0 (0x0).</div><div>The program '[14372] adam602.vshost.exe: Managed' has exited with code 0 (0x0).</div><div><br>
</div><div><br></div><div>Code:</div><div><br></div><div><div>using System;</div><div>using System.Windows.Forms;</div><div>using System.Collections.Generic;</div><div>using System.Linq;</div><div>using System.Text;</div>
<div>using System.Messaging;</div><div>using RabbitMQ.Client;</div><div>using RabbitMQ.Client.Exceptions;</div><div>using System.Collections;</div><div><br></div><div>namespace adam602</div><div>{</div><div> public delegate void ProcessAmqpMessage(string webcmd, Hashtable inputHash);</div>
<div><br></div><div> class dispatchRabbitMq</div><div> {</div><div> protected IModel Model;</div><div> protected IConnection Connection;</div><div> protected string QueueName;</div><div> protected bool isConsuming;</div>
<div><br></div><div> // used to pass messages back to UI for processing</div><div> public event onReceiveMessage onMessageReceived;</div><div><br></div><div> //internal delegate to run the consuming queue on a seperate thread</div>
<div> private delegate void ConsumeDelegate();</div><div><br></div><div> //delegate to post to UI thread</div><div> private delegate void showMessageDelegate(string message);</div><div><br></div><div>
public ProcessAmqpMessage _processAmqpMessage = null;</div><div><br></div><div> public dispatchRabbitMq()</div><div> {</div><div> }</div><div><br></div><div> public void init()</div><div>
{</div><div> QueueName = "testqueue";</div><div> string exchange = "exch";</div><div><br></div><div> ConnectionFactory connectionFactory = new ConnectionFactory();</div>
<div> try</div><div> {</div><div> connectionFactory.HostName = globals.serverHost;</div><div> connectionFactory.Port = globals.amqpPort;</div><div> connectionFactory.UserName = globals.amqpUser;</div>
<div> connectionFactory.Password = globals.amqpPass;</div><div> connectionFactory.RequestedHeartbeat = 30;</div><div><br></div><div> // connectionFactory.Parameters.RequestedHeartbeat = 30; //if above line not works, then comment that line and uncomment this line</div>
<div><br></div><div><br></div><div> Connection = connectionFactory.CreateConnection();</div><div><br></div><div> </div><div> Model = Connection.CreateModel();</div><div> Model.QueueDeclare(QueueName, false, false, false, null);</div>
<div> Model.QueueBind(QueueName, exchange, "");</div><div> StartConsuming();</div><div> }</div><div> catch (Exception e)</div><div> {</div><div> Console.WriteLine("AMQP.init: " + e.Message);</div>
<div> </div><div> }</div><div><br></div><div> this.onMessageReceived += this.parseAmqpData;</div><div> </div><div> </div><div><br></div><div>}</div><div><br></div><div> private void StartConsuming()</div>
<div> {</div><div> isConsuming = true;</div><div> ConsumeDelegate c = new ConsumeDelegate(Consume);</div><div> c.BeginInvoke(null, null);</div><div> }</div><div><br></div><div>
<br></div><div> // new method for check if connection null then call init() method again for reconnectig x times</div><div> </div><div><br></div><div><br></div><div> private void Consume()</div><div> {</div>
<div> QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);</div><div> String consumerTag = Model.BasicConsume(QueueName, false, consumer);</div><div> //Console.WriteLine("CONSUMER-TAG: |" + consumerTag + "|");</div>
<div> while (isConsuming)</div><div> {</div><div> try</div><div> {</div><div> RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();</div>
<div> IBasicProperties props = e.BasicProperties;</div><div> byte[] body = e.Body;</div><div><br></div><div> // ... process the message</div><div> onMessageReceived(body);</div>
<div><br></div><div> Model.BasicAck(e.DeliveryTag, false);</div><div><br></div><div> }</div><div> catch (OperationInterruptedException ex)</div><div> {</div>
<div> // The consumer was removed, either through</div><div> // channel or connection closure, or through the</div><div> // action of IModel.BasicCancel().</div><div>
Console.WriteLine("OperationInterruptedException: " + ex.Message);</div><div> break;</div><div> }</div><div> catch (Exception exx)</div><div> {</div>
<div> Console.WriteLine("Exception: " + exx.Message);</div><div> break;</div><div> }</div><div> }</div><div><br></div><div> }</div><div><br>
</div><div> public void StopConsuming()</div><div> {</div><div> isConsuming = false;</div><div> if (Connection != null)</div><div> Connection.Close();</div><div> if (Model != null)</div>
<div> Model.Abort();</div><div> }</div><div><br></div><div> public void parseAmqpData(byte[] message)</div><div> {</div><div> string incomingString = System.Text.Encoding.UTF8.GetString(message);</div>
<div> incomingString.Trim();</div><div> incomingString = incomingString.Replace("\n", "");</div><div> Console.WriteLine(String.Format("AMQP <-- {0}", incomingString));</div>
<div> }</div><div> }</div><div>}</div></div><div><br></div><div><br clear="all"><div><br></div>-- <br><div dir="ltr"><font face="verdana, sans-serif">Best Regards</font><div><font face="verdana, sans-serif"><br>
</font><div><div><div><div><font face="verdana, sans-serif">Abdul Nasir Khayam</font></div></div></div></div></div><div><font face="verdana, sans-serif">Software Engineer</font></div><div><font face="verdana, sans-serif">Tel : 00923319449551</font></div>
</div>
</div></div></div>