<div dir="ltr">And the consume is called from another thread...<div><br></div><div><div>void *con(void *arg) {</div><div> amqp_bytes_t queue;</div><div> printf("%s %s\n",__FUNCTION__,(char *) arg);</div>
<div><br></div><div> Debug("%s() - Get an mq connection\n", __FUNCTION__);</div><div> amqConnection *conn = util.getAMQConnection ();</div><div> if (conn == NULL) {</div>
<div> ErrLog("%s() - get connection failed\n", __FUNCTION__);</div><div> pthread_exit(0) ;</div><div> }</div><div> Debug("%s() - declare exchange %s\n", __FUNCTION__, EXCHANGE);</div>
<div> int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);</div><div> if (rc != 0) {</div><div> ErrLog("%s() - declare exchange %s %s failed\n", __FUNCTION__, EXCHANGE, QUEUETYPE);</div>
<div> util.returnAMQConnection(conn);</div><div> pthread_exit(0) ;</div><div> }</div><div><br></div><div> pthread_mutex_trylock(&mutex_lock);</div>
<div> printf("Consumer Thread %s: Lock acquired and trying to consume\n",(char*) arg);</div><div> messageCounter = conn->Qlength(QUEUENAME);</div><div> //while (messageCounter == 0)</div>
<div> {</div><div> printf("Consumer thread: %s waiting for signal as messageCounter = 0\n",(char *) arg);</div><div> //pthread_cond_wait(&cond_lock, &mutex_lock);</div>
<div> }</div><div> printf("Consumer thread: %s Got signal !!! %d messages in queue \n", (char *) arg, messageCounter);</div><div> Debug("%s() - consume %s\n", __FUNCTION__,ROUTINGKEY );</div>
<div><br></div><div> std::string message = conn->consume(QUEUENAME, EXCHANGE, ROUTINGKEY);</div><div> if (0 == message.length()) {</div><div> ErrLog("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div>
<div> printf("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div> rc = conn->deleteQueue(QUEUENAME);</div><div> if (rc != 0) {</div>
<div> ErrLog("%s() - delete queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div> }</div><div> util.returnAMQConnection(conn);</div>
<div> pthread_exit(0) ;</div><div> }</div><div>&lt;snip&gt;</div></div><div><br></div><div> At the end we call pthread_mutex_unlock() so that other threads can consume too.</div>
<div><br></div><div><br></div><div class="gmail_extra"><br><br><div class="gmail_quote">On Mon, Dec 16, 2013 at 9:43 PM, kingsmasher1 <span dir="ltr">&lt;<a href="mailto:raj.kumar.sanpui@gmail.com" target="_blank">raj.kumar.sanpui@gmail.com</a>&gt;</span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Dear All,<br>
<br>
Thanks for the response. Yes, the message count shown in both the GUI and<br>
rabbitmqctl is 0.<br>
<br>
This is my consume method (I just commented out all the unbind calls to make<br>
sure the queue is not getting unbound after one consume):<br>
<br>
std::string amqConnection::consume() {<br>
Debug("%s - \n", __FUNCTION__);<br>
amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,<br>
amqp_empty_table);<br>
amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);<br>
int rc = check_amqp_error(res, "consume");<br>
if (rc != 0) return NULL;<br>
<br>
amqp_frame_t frame;<br>
int result;<br>
<br>
amqp_basic_deliver_t *d;<br>
amqp_basic_properties_t *p;<br>
size_t body_target;<br>
size_t body_received;<br>
<br>
amqp_maybe_release_buffers(conn);<br>
Debug("%s - wait frame\n", __FUNCTION__);<br>
result = amqp_simple_wait_frame(conn, &frame);<br>
Debug("%s Result %d\n",__FUNCTION__, result);<br>
if (result < 0)<br>
return "";<br>
<br>
Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type,<br>
frame.channel);<br>
if (frame.frame_type != AMQP_FRAME_METHOD)<br>
return "";<br>
<br>
Debug("%s Method %s\n", __FUNCTION__,<br>
amqp_method_name(frame.payload.method.id));<br>
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)<br>
return "";<br>
<br>
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;<br>
Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,<br>
(unsigned) d->delivery_tag,<br>
(int) d->exchange.len, (char *) d->exchange.bytes,<br>
(int) d->routing_key.len, (char *) d->routing_key.bytes);<br>
<br>
result = amqp_simple_wait_frame(conn, &frame);<br>
if (result < 0)<br>
return "";<br>
<br>
if (frame.frame_type != AMQP_FRAME_HEADER) {<br>
ErrLog( "%s Expected header!",__FUNCTION__);<br>
return "";<br>
}<br>
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;<br>
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {<br>
Debug("%s Content-type: %.*s\n",__FUNCTION__,<br>
(int) p->content_type.len, (char *) p->content_type.bytes);<br>
}<br>
Debug("%s ----\n", __FUNCTION__);<br>
<br>
body_target = frame.payload.properties.body_size;<br>
body_received = 0;<br>
std::string response="";<br>
while (body_received < body_target) {<br>
result = amqp_simple_wait_frame(conn, &frame);<br>
if (result < 0)<br>
break;<br>
<br>
if (frame.frame_type != AMQP_FRAME_BODY) {<br>
ErrLog( " %s Expected body!", __FUNCTION__);<br>
break;<br>
}<br>
<br>
body_received += frame.payload.body_fragment.len;<br>
//assert(body_received <= body_target);<br>
<br>
//amqp_dump(frame.payload.body_fragment.bytes,<br>
// frame.payload.body_fragment.len);<br>
response += std::string((char *) frame.payload.body_fragment.bytes,<br>
body_received );<br>
}<br>
<br>
<br>
<br>
return response;<br>
}<br>
<br>
<br>
<br>
--<br>
View this message in context: <a href="http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html" target="_blank">http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html</a><br>
<div class="HOEnZb"><div class="h5">Sent from the RabbitMQ mailing list archive at Nabble.com.<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</div></div></blockquote></div><br></div></div>