<div dir="ltr">And the consume is called from another thread...<div><br></div><div><div>void *con(void *arg) {</div><div>� � � � � � � � amqp_bytes_t queue;</div><div>� � � � � � � � printf("%s %s\n",__FUNCTION__,(char *) arg);</div>

<div><br></div><div>� � � � � � � � Debug("%s() - Get an mq connection\n", __FUNCTION__);</div><div>� � � � � � � � amqConnection *conn = �util.getAMQConnection ();</div><div>� � � � � � � � if (conn == NULL) {</div>

<div>� � � � � � � � � � � � ErrLog("%s() - get connection failed\n", __FUNCTION__);</div><div>� � � � � � � � � � � � pthread_exit(0) ;</div><div>� � � � � � � � }</div><div>� � � � � � � � Debug("%s() - declare exchange %s\n", __FUNCTION__, EXCHANGE);</div>

<div>� � � � � � � � int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);</div><div>� � � � � � � � if (rc != 0) {</div><div>� � � � � � � � � � � � ErrLog("%s() - declare exchange %s %s failed\n", __FUNCTION__, EXCHANGE, QUEUETYPE);</div>

<div>� � � � � � � � � � � � util.returnAMQConnection(conn);</div><div>� � � � � � � � � � � � pthread_exit(0) ;</div><div>� � � � � � � � }</div><div><br></div><div>� � � � � � � � pthread_mutex_trylock(&mutex_lock);</div>

<div>� � � � � � � � printf("Consumer Thread %s: Lock acquired and trying to consume\n",(char*) arg);</div><div>� � � � � � � � messageCounter = conn->Qlength(QUEUENAME);</div><div>� � � � � � � � //while (messageCounter == 0)</div>

<div>� � � � � � � � {</div><div>� � � � � � � � � � � � printf("Consumer thread: %s waiting for signal as messageCounter = 0\n",(char *) arg);</div><div>� � � � � � � � � � � � //pthread_cond_wait(&cond_lock, &mutex_lock);</div>

<div>� � � � � � � � }</div><div>� � � � � � � � printf("Consumer thread: %s Got signal !!! %d messages in queue \n", (char *) arg, messageCounter);</div><div>� � � � � � � � Debug("%s() - consume �%s\n", __FUNCTION__,ROUTINGKEY );</div>

<div><br></div><div>� � � � � � � � std::string message = conn->consume(QUEUENAME, EXCHANGE, ROUTINGKEY);</div><div>� � � � � � � � if (0 == message.length()) {</div><div>� � � � � � � � � � � � ErrLog("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div>

<div>� � � � � � � � � � � � printf("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div>� � � � � � � � � � � � rc = conn->deleteQueue(QUEUENAME);</div><div>� � � � � � � � � � � � if (rc != 0) {</div>

<div>� � � � � � � � � � � � � � � � ErrLog("%s() - delete queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div>� � � � � � � � � � � � }</div><div>� � � � � � � � � � � � util.returnAMQConnection(conn);</div>

<div>� � � � � � � � � � � � pthread_exit(0) ;</div><div>� � � � � � � � }</div><div><snip></div></div><div><br></div><div>� �At the end we call pthread_mutex_unlock( ) so that other threads can consume too !!</div>

<div><br></div><div><br></div><div class="gmail_extra"><br><br><div class="gmail_quote">On Mon, Dec 16, 2013 at 9:43 PM, kingsmasher1 <span dir="ltr"><<a href="mailto:raj.kumar.sanpui@gmail.com" target="_blank">raj.kumar.sanpui@gmail.com</a>></span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Dear All,<br>
<br>
Thanks for the response, yeah the message count in the GUI as well as the<br>
rabbitmqctl shows as 0.<br>
<br>
This is my consume method: (i just commented all the unbind calls to make<br>
sure, after one consume the queue is not getting unbinded)<br>
<br>
std::string amqConnection::consume() {<br>
� � � � Debug("%s - \n", __FUNCTION__);<br>
� � � � amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,<br>
amqp_empty_table);<br>
� � � � amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);<br>
� � � � int rc = check_amqp_error(res, "consume");<br>
� � � � if (rc != 0) return NULL;<br>
<br>
� � � � amqp_frame_t frame;<br>
� � � � int result;<br>
<br>
� � � � amqp_basic_deliver_t *d;<br>
� � � � amqp_basic_properties_t *p;<br>
� � � � size_t body_target;<br>
� � � � size_t body_received;<br>
<br>
� � � � amqp_maybe_release_buffers(conn);<br>
� � � � Debug("%s - wait frame\n", __FUNCTION__);<br>
� � � � result = amqp_simple_wait_frame(conn, &frame);<br>
� � � � Debug("%s Result %d\n",__FUNCTION__, result);<br>
� � � � if (result < 0)<br>
� � � � � � � � return "";<br>
<br>
� � � � Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type,<br>
frame.channel);<br>
� � � � if (frame.frame_type != AMQP_FRAME_METHOD)<br>
� � � � � � � � return "";<br>
<br>
� � � � Debug("%s Method %s\n", __FUNCTION__,<br>
amqp_method_name(<a href="http://frame.payload.method.id" target="_blank">frame.payload.method.id</a>));<br>
� � � � if (<a href="http://frame.payload.method.id" target="_blank">frame.payload.method.id</a> != AMQP_BASIC_DELIVER_METHOD)<br>
� � � � � � � � return "";<br>
<br>
� � � � d = (amqp_basic_deliver_t *) frame.payload.method.decoded;<br>
� � � � Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,<br>
� � � � � � � � � � � � (unsigned) d->delivery_tag,<br>
� � � � � � � � � � � � (int) d->exchange.len, (char *) d->exchange.bytes,<br>
� � � � � � � � � � � � (int) d->routing_key.len, (char *) d->routing_key.bytes);<br>
<br>
� � � � result = amqp_simple_wait_frame(conn, &frame);<br>
� � � � if (result < 0)<br>
� � � � � � � � return "";<br>
<br>
� � � � if (frame.frame_type != AMQP_FRAME_HEADER) {<br>
� � � � � � � � ErrLog( "%s Expected header!",__FUNCTION__);<br>
� � � � � � � � return "";<br>
� � � � }<br>
� � � � p = (amqp_basic_properties_t *) frame.payload.properties.decoded;<br>
� � � � if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {<br>
� � � � � � � � Debug("%s Content-type: %.*s\n",__FUNCTION__,<br>
� � � � � � � � � � � � � � � � (int) p->content_type.len, (char *) p->content_type.bytes);<br>
� � � � }<br>
� � � � Debug("%s ----\n", __FUNCTION__);<br>
<br>
� � � � body_target = frame.payload.properties.body_size;<br>
� � � � body_received = 0;<br>
� � � � std::string response="";<br>
� � � � while (body_received < body_target) {<br>
� � � � � � � � result = amqp_simple_wait_frame(conn, &frame);<br>
� � � � � � � � if (result < 0)<br>
� � � � � � � � � � � � break;<br>
<br>
� � � � � � � � if (frame.frame_type != AMQP_FRAME_BODY) {<br>
� � � � � � � � � � � � ErrLog( " %s Expected body!", __FUNCTION__);<br>
� � � � � � � � � � � � break;<br>
� � � � � � � � }<br>
<br>
� � � � � � � � body_received += frame.payload.body_fragment.len;<br>
� � � � � � � � //assert(body_received <= body_target);<br>
<br>
� � � � � � � � //amqp_dump(frame.payload.body_fragment.bytes,<br>
� � � � � � � � // � � � � � � �frame.payload.body_fragment.len);<br>
� � � � � � � � response += std::string((char *) frame.payload.body_fragment.bytes,<br>
body_received );<br>
� � � � }<br>
<br>
<br>
<br>
� � � � return response;<br>
}<br>
<br>
<br>
<br>
--<br>
View this message in context: <a href="http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html" target="_blank">http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html</a><br>


<div class="HOEnZb"><div class="h5">Sent from the RabbitMQ mailing list archive at Nabble.com.<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</div></div></blockquote></div><br></div></div>