<div dir="ltr">And the consume is called from another thread...<div><br></div><div><div>void *con(void *arg) {</div><div>                amqp_bytes_t queue;</div><div>                printf("%s %s\n",__FUNCTION__,(char *) arg);</div>

<div><br></div><div>                Debug("%s() - Get an mq connection\n", __FUNCTION__);</div><div>                amqConnection *conn =  util.getAMQConnection ();</div><div>                if (conn == NULL) {</div>

<div>                        ErrLog("%s() - get connection failed\n", __FUNCTION__);</div><div>                        pthread_exit(0) ;</div><div>                }</div><div>                Debug("%s() - declare exchange %s\n", __FUNCTION__, EXCHANGE);</div>

<div>                int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);</div><div>                if (rc != 0) {</div><div>                        ErrLog("%s() - declare exchange %s %s failed\n", __FUNCTION__, EXCHANGE, QUEUETYPE);</div>

<div>                        util.returnAMQConnection(conn);</div><div>                        pthread_exit(0) ;</div><div>                }</div><div><br></div><div>                pthread_mutex_trylock(&mutex_lock);</div>

<div>                printf("Consumer Thread %s: Lock acquired and trying to consume\n",(char*) arg);</div><div>                messageCounter = conn->Qlength(QUEUENAME);</div><div>                //while (messageCounter == 0)</div>

<div>                {</div><div>                        printf("Consumer thread: %s waiting for signal as messageCounter = 0\n",(char *) arg);</div><div>                        //pthread_cond_wait(&cond_lock, &mutex_lock);</div>

<div>                }</div><div>                printf("Consumer thread: %s Got signal !!! %d messages in queue \n", (char *) arg, messageCounter);</div><div>                Debug("%s() - consume  %s\n", __FUNCTION__,ROUTINGKEY );</div>

<div><br></div><div>                std::string message = conn->consume(QUEUENAME, EXCHANGE, ROUTINGKEY);</div><div>                if (0 == message.length()) {</div><div>                        ErrLog("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div>

<div>                        printf("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div>                        rc = conn->deleteQueue(QUEUENAME);</div><div>                        if (rc != 0) {</div>

<div>                                ErrLog("%s() - delete queue %s failed\n", __FUNCTION__, QUEUENAME);</div><div>                        }</div><div>                        util.returnAMQConnection(conn);</div>

<div>                        pthread_exit(0) ;</div><div>                }</div><div>&lt;snip&gt;</div></div><div><br></div><div>   At the end we call pthread_mutex_unlock() so that other threads can consume too.</div>

<div><br></div><div><br></div><div class="gmail_extra"><br><br><div class="gmail_quote">On Mon, Dec 16, 2013 at 9:43 PM, kingsmasher1 <span dir="ltr"><<a href="mailto:raj.kumar.sanpui@gmail.com" target="_blank">raj.kumar.sanpui@gmail.com</a>></span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Dear All,<br>
<br>
Thanks for the response. Yes, the message count shows as 0 in both the GUI<br>
and rabbitmqctl.<br>
<br>
This is my consume method (I just commented out all the unbind calls to make<br>
sure that the queue is not getting unbound after one consume):<br>
<br>
std::string amqConnection::consume() {<br>
        Debug("%s - \n", __FUNCTION__);<br>
        amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,<br>
amqp_empty_table);<br>
        amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);<br>
        int rc = check_amqp_error(res, "consume");<br>
        if (rc != 0) return NULL;<br>
<br>
        amqp_frame_t frame;<br>
        int result;<br>
<br>
        amqp_basic_deliver_t *d;<br>
        amqp_basic_properties_t *p;<br>
        size_t body_target;<br>
        size_t body_received;<br>
<br>
        amqp_maybe_release_buffers(conn);<br>
        Debug("%s - wait frame\n", __FUNCTION__);<br>
        result = amqp_simple_wait_frame(conn, &frame);<br>
        Debug("%s Result %d\n",__FUNCTION__, result);<br>
        if (result < 0)<br>
                return "";<br>
<br>
        Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type,<br>
frame.channel);<br>
        if (frame.frame_type != AMQP_FRAME_METHOD)<br>
                return "";<br>
<br>
        Debug("%s Method %s\n", __FUNCTION__,<br>
amqp_method_name(<a href="http://frame.payload.method.id" target="_blank">frame.payload.method.id</a>));<br>
        if (<a href="http://frame.payload.method.id" target="_blank">frame.payload.method.id</a> != AMQP_BASIC_DELIVER_METHOD)<br>
                return "";<br>
<br>
        d = (amqp_basic_deliver_t *) frame.payload.method.decoded;<br>
        Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,<br>
                        (unsigned) d->delivery_tag,<br>
                        (int) d->exchange.len, (char *) d->exchange.bytes,<br>
                        (int) d->routing_key.len, (char *) d->routing_key.bytes);<br>
<br>
        result = amqp_simple_wait_frame(conn, &frame);<br>
        if (result < 0)<br>
                return "";<br>
<br>
        if (frame.frame_type != AMQP_FRAME_HEADER) {<br>
                ErrLog( "%s Expected header!",__FUNCTION__);<br>
                return "";<br>
        }<br>
        p = (amqp_basic_properties_t *) frame.payload.properties.decoded;<br>
        if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {<br>
                Debug("%s Content-type: %.*s\n",__FUNCTION__,<br>
                                (int) p->content_type.len, (char *) p->content_type.bytes);<br>
        }<br>
        Debug("%s ----\n", __FUNCTION__);<br>
<br>
        body_target = frame.payload.properties.body_size;<br>
        body_received = 0;<br>
        std::string response="";<br>
        while (body_received < body_target) {<br>
                result = amqp_simple_wait_frame(conn, &frame);<br>
                if (result < 0)<br>
                        break;<br>
<br>
                if (frame.frame_type != AMQP_FRAME_BODY) {<br>
                        ErrLog( " %s Expected body!", __FUNCTION__);<br>
                        break;<br>
                }<br>
<br>
                body_received += frame.payload.body_fragment.len;<br>
                //assert(body_received <= body_target);<br>
<br>
                //amqp_dump(frame.payload.body_fragment.bytes,<br>
                //              frame.payload.body_fragment.len);<br>
                response += std::string((char *) frame.payload.body_fragment.bytes,<br>
body_received );<br>
        }<br>
<br>
<br>
<br>
        return response;<br>
}<br>
<br>
<br>
<br>
--<br>
View this message in context: <a href="http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html" target="_blank">http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html</a><br>


<div class="HOEnZb"><div class="h5">Sent from the RabbitMQ mailing list archive at Nabble.com.<br>
_______________________________________________<br>
rabbitmq-discuss mailing list<br>
<a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a><br>
<a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss" target="_blank">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a><br>
</div></div></blockquote></div><br></div></div>