using System;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns;
using System.Windows.Forms;
using System.Collections.Generic;
using System.Text;

namespace Oryo.eTaksi.ServicesModule.Services
{
    /// <summary>
    /// Consumes messages from a RabbitMQ server on a dedicated background thread and
    /// re-attempts the subscription, with a growing delay, until <see cref="Close"/> is called.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>count</c>, <c>UserName</c>, <c>Password</c>, <c>UserID</c> and
    /// <c>OnStatusUpdate</c> are used below but are not declared in this listing; they are
    /// presumably defined in code that was removed from it. <c>count</c> is treated here as an
    /// <c>int</c> counting consecutive attempts since the last successful subscription.
    /// </remarks>
    public class Subscriber
    {
        string serverAddress = "dev.rabbitmq.com:5672";
        string exchange = "sub";
        string exchangeType = "topic";
        string realm = "/data";
        string routingKey;
        string RabbitQueryID;

        // Both flags are shared between the worker thread and Close(); access them under DataLock.
        bool threadWorking = false;
        bool reconnect = true;
        private object DataLock = new object();

        ConnectionFactory cf = new ConnectionFactory();

        private IEntityTranslatorService _translator = null;

        /// <summary>Entity translator service, supplied through dependency injection.</summary>
        [ServiceDependency]
        public IEntityTranslatorService Translator
        {
            get { return _translator; }
            set { _translator = value; }
        }

        private Thread subsc = null;

        /// <summary>
        /// Worker-thread body: runs <see cref="Subscribe"/> repeatedly until
        /// <c>reconnect</c> is cleared, waiting between attempts.
        /// </summary>
        private void SubscribeThread()
        {
            bool working = true;
            while (working)
            {
                try
                {
                    // Subscribe() resets count to 0 once the subscription is bound, so after a
                    // failed attempt count holds the number of consecutive failures.
                    count++;
                    Subscribe();
                }
                catch (Exception e)
                {
                    System.Diagnostics.Debug.WriteLine("Exeption inside SubscribeThread: " + e);
                }
                finally
                {
                    System.Diagnostics.Debug.WriteLine("Calling finally inside SubscribeThread");
                }

                int delaySeconds = 0;
                lock (DataLock)
                {
                    if (this.reconnect)
                    {
                        System.Diagnostics.Debug.WriteLine("Reconnect was true...");
                        delaySeconds = BackoffSeconds(count);
                    }
                    else
                    {
                        System.Diagnostics.Debug.WriteLine("Reconnect was false, finishing...");
                    }
                }

                // Sleep without holding DataLock so that Close() is never blocked by the back-off.
                if (delaySeconds > 0)
                {
                    System.Threading.Thread.Sleep(delaySeconds * 1000);
                }

                // Re-read the flag after the wait: Close() may have been called in the meantime.
                lock (DataLock)
                {
                    working = this.reconnect;
                    this.threadWorking = working;
                }
            }
        }

        /// <summary>
        /// Returns the delay before the next attempt: the square of
        /// <paramref name="attempts"/>, capped at 20 seconds.
        /// </summary>
        /// <param name="attempts">Number of attempts made since the last successful subscription.</param>
        /// <returns>A delay in whole seconds, between 0 and 20.</returns>
        private static int BackoffSeconds(int attempts)
        {
            const int MaxBackoffSeconds = 20;

            // Square in 64-bit so a large attempt count cannot overflow into a negative delay.
            long squared = (long)attempts * attempts;
            return (int)Math.Min(squared, (long)MaxBackoffSeconds);
        }

        /// <summary>Creates and starts the background worker thread.</summary>
        void start()
        {
            this.subsc = new Thread(new ThreadStart(SubscribeThread));
            this.subsc.IsBackground = true;
            this.subsc.Name = "RabbitMQThread";
            this.threadWorking = true;
            subsc.Start();
        }

        /// <summary>
        /// Requests the worker to stop and closes the current connection, if there is one.
        /// </summary>
        /// <remarks>
        /// The worker thread is not joined; it is a background thread and leaves its loop the
        /// next time it observes that <c>reconnect</c> is false.
        /// </remarks>
        public void Close()
        {
            lock (DataLock)
            {
                this.reconnect = false;
                this.threadWorking = false;
            }

            // Work on a snapshot: the worker thread assigns this field when it connects.
            IConnection current = conn;
            if (current == null)
            {
                return;
            }

            try
            {
                current.Close();
            }
            catch (Exception e)
            {
                // Best effort: if a clean close fails (for example because the connection is
                // already closed or disposed), fall back to Abort().
                System.Diagnostics.Debug.WriteLine("Exception while closing connection: " + e);
                current.Abort();
            }
        }

        IConnection conn;

        /// <summary>
        /// Opens a connection, declares the queue, binds the subscription and dispatches each
        /// delivery to <see cref="OnNewMessage"/> until the subscription ends or a stop is requested.
        /// </summary>
        /// <remarks>
        /// All exceptions are caught and written to the debug log; the caller decides whether
        /// to try again.
        /// </remarks>
        void Subscribe()
        {
            bool continueWorking = true;
            try
            {
                ConnectionParameters pars = cf.Parameters;
                pars.UserName = UserName;
                pars.Password = Password;

                // TODO: CreateConnection is the place where this has been observed to hang.
                using (conn = cf.CreateConnection(serverAddress))
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        ch.QueueDeclare(RabbitQueryID, false, false, true, true, true, null);

                        Subscription sub = new Subscription(ch, RabbitQueryID, false, exchange, exchangeType, routingKey);
                        sub.Bind("ToAll", "fanout", "");
                        sub.Bind("Direct", "direct", UserID);

                        // The subscription is set up: reset the attempt counter used for back-off.
                        count = 0;

                        foreach (BasicDeliverEventArgs ee in sub)
                        {
                            OnNewMessage(ee);
                            sub.Ack(ee);

                            lock (DataLock)
                            {
                                continueWorking = this.threadWorking;
                            }

                            if (!continueWorking)
                            {
                                break;
                            }
                        }

                        // The enumeration ended without a stop request; report it as a disconnect.
                        if (continueWorking)
                        {
                            OnStatusUpdate("disconected...");
                        }
                    }
                }
            }
            catch (Exception e)
            {
                System.Diagnostics.Debug.WriteLine("Exeption inside subscribe thread2: " + e);
            }
        }

        /// <summary>Called on the worker thread for every delivery, before it is acknowledged.</summary>
        /// <param name="eventArgs">The delivery received from the subscription.</param>
        protected virtual void OnNewMessage(BasicDeliverEventArgs eventArgs)
        {
            //code removed
        }
    }
}