[rabbitmq-discuss] deadlock at QueueingBasicConsumer.Queue.Dequeue
Emile Joubert
emile at rabbitmq.com
Mon Sep 27 09:17:22 BST 2010
Hi Yiming,
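Your Receive() method uses both BasicGet() and BasicConsume() to retrieve
messages. You only need BasicGet(). The library is blocking because you are
attempting to retrieve the message twice: the message already fetched by
BasicGet() is not delivered again to the consumer, so Dequeue() waits until
another message arrives.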
The .NET Client Library User Guide has more information about the
difference between polling and subscription in sections 2.7 and 2.8:
http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v2.1.0/rabbitmq-dotnet-client-2.1.0-user-guide.pdf
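As a rough illustration of the two styles, here is a minimal sketch that uses only the client calls already present in the quoted code (cf.Address, QueueDeclare(queuename), BasicGet(queuename, false), BasicConsume(queuename, null, consumer)). The class and method names are made up for this example, the signatures are assumed to match the client version in use, and the raw body is returned so that no serialization helper is needed. Each method uses one retrieval mechanism, never both.

```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper class, for illustration only.
public static class ReceiveSketch
{
    // Polling: ask the broker for at most one message.
    // Returns null immediately if the queue is empty; never blocks waiting.
    public static byte[] ReceiveByPolling(string serverAddress, string queuename)
    {
        ConnectionFactory cf = new ConnectionFactory();
        cf.Address = serverAddress;

        using (IConnection conn = cf.CreateConnection())
        using (IModel ch = conn.CreateModel())
        {
            ch.QueueDeclare(queuename);

            // noAck = false, so the message must be acknowledged explicitly.
            BasicGetResult result = ch.BasicGet(queuename, false);
            if (result == null)
            {
                return null; // nothing waiting in the queue
            }

            byte[] body = result.Body;
            ch.BasicAck(result.DeliveryTag, false);
            return body;
        }
    }

    // Subscription: register a consumer and wait for the broker to push a message.
    // Dequeue() blocks until a delivery arrives, so do not call BasicGet() first.
    public static byte[] ReceiveBySubscription(string serverAddress, string queuename)
    {
        ConnectionFactory cf = new ConnectionFactory();
        cf.Address = serverAddress;

        using (IConnection conn = cf.CreateConnection())
        using (IModel ch = conn.CreateModel())
        {
            ch.QueueDeclare(queuename);

            QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
            ch.BasicConsume(queuename, null, consumer);

            BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            byte[] body = e.Body;
            ch.BasicAck(e.DeliveryTag, false);
            return body;
        }
    }
}
```

For a one-shot receive like yours, the polling variant is the closer fit. Subscription is usually better suited to a long-lived consumer that calls Dequeue() in a loop on a single open channel.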
Regards
Emile
On 25/09/10 12:44, 鲍毅铭 wrote:
> Hi everyone,
>
> My call to QueueingBasicConsumer.Queue.Dequeue() blocks, but after
> several minutes the queue works well and messages can be dequeued.
>
> I dug into the source code and found that the call was blocked in
> Monitor.Wait(m_queue).
>
> I also got a System.Threading.ThreadAbortException triggered by
> System.Threading.Monitor.ObjWait(Boolean exitContext, Int32
> millisecondsTimeout, Object obj)
>
> Below is my code, written in C#. The version of RabbitMQ.Client.dll is
> 1.8.1.0.
>
> public static void Send<T>(T data, string serverAddress, string exchange,
>                            string queuename, string routingKey)
> {
>     byte[] message = SerializeToByteArray(data);
>     try
>     {
>         ConnectionFactory cf = new ConnectionFactory();
>         cf.Address = serverAddress;
>
>         using (IConnection conn = cf.CreateConnection())
>         {
>             using (IModel ch = conn.CreateModel())
>             {
>                 conn.AutoClose = true;
>
>                 ch.ExchangeDeclare(exchange, ExchangeType.Direct);
>                 ch.QueueDeclare(queuename);
>                 ch.QueueBind(queuename, exchange, routingKey, false, null);
>
>                 IBytesMessageBuilder ibm = new BytesMessageBuilder(ch);
>                 ibm.WriteBytes(message);
>
>                 ch.BasicPublish(exchange, routingKey,
>                     (IBasicProperties)ibm.GetContentHeader(),
>                     ibm.GetContentBody());
>
>                 if (ch.IsOpen)
>                 {
>                     ch.Close();
>                 }
>             }
>
>             if (conn.IsOpen)
>             {
>                 conn.Close();
>             }
>         }
>     }
>     catch (Exception exp)
>     {
>         throw exp;
>     }
> }
>
> public static T Receive<T>(string serverAddress, string queuename)
> {
>     T data = default(T);
>     ConnectionFactory cf = new ConnectionFactory();
>     cf.Address = serverAddress;
>     using (IConnection conn = cf.CreateConnection())
>     {
>         using (IModel ch = conn.CreateModel())
>         {
>             conn.AutoClose = true;
>
>             ch.QueueDeclare(queuename);
>
>             BasicGetResult result = ch.BasicGet(queuename, false);
>
>             QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
>             ch.BasicConsume(queuename, null, consumer);
>
>             BasicDeliverEventArgs e = consumer.Queue.Dequeue()
>                 as BasicDeliverEventArgs;
>             if (e != null)
>             {
>                 IBasicProperties props = e.BasicProperties;
>                 byte[] body = e.Body;
>                 data = DeserializeFromByteArray<T>(body);
>
>                 ch.BasicAck(e.DeliveryTag, false);
>             }
>
>             if (ch.IsOpen)
>             {
>                 ch.Close();
>             }
>         }
>
>         if (conn.IsOpen)
>         {
>             conn.Close();
>         }
>     }
>
>     return data;
> }
>
> I would appreciate it if anybody could help with this issue. Thank you
> in advance.
>
> Yiming