[rabbitmq-discuss] Why does the queue drop other messages after the first is consumed?

Raj Kumar Sanpui raj.kumar.sanpui at gmail.com
Mon Dec 16 16:27:28 GMT 2013


And the consume is called from another thread...

void *con(void *arg) {
                amqp_bytes_t queue;
                printf("%s %s\n",__FUNCTION__,(char *) arg);

                Debug("%s() - Get an mq connection\n", __FUNCTION__);
                amqConnection *conn =  util.getAMQConnection ();
                if (conn == NULL) {
                        ErrLog("%s() - get connection failed\n",
__FUNCTION__);
                        pthread_exit(0) ;
                }
                Debug("%s() - declare exchange %s\n", __FUNCTION__,
EXCHANGE);
                int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);
                if (rc != 0) {
                        ErrLog("%s() - declare exchange %s %s failed\n",
__FUNCTION__, EXCHANGE, QUEUETYPE);
                        util.returnAMQConnection(conn);
                        pthread_exit(0) ;
                }

                pthread_mutex_trylock(&mutex_lock);
                printf("Consumer Thread %s: Lock acquired and trying to
consume\n",(char*) arg);
                messageCounter = conn->Qlength(QUEUENAME);
                //while (messageCounter == 0)
                {
                        printf("Consumer thread: %s waiting for signal as
messageCounter = 0\n",(char *) arg);
                        //pthread_cond_wait(&cond_lock, &mutex_lock);
                }
                printf("Consumer thread: %s Got signal !!! %d messages in
queue \n", (char *) arg, messageCounter);
                Debug("%s() - consume  %s\n", __FUNCTION__,ROUTINGKEY );

                std::string message = conn->consume(QUEUENAME, EXCHANGE,
ROUTINGKEY);
                if (0 == message.length()) {
                        ErrLog("%s() - consume queue %s failed\n",
__FUNCTION__, QUEUENAME);
                        printf("%s() - consume queue %s failed\n",
__FUNCTION__, QUEUENAME);
                        rc = conn->deleteQueue(QUEUENAME);
                        if (rc != 0) {
                                ErrLog("%s() - delete queue %s failed\n",
__FUNCTION__, QUEUENAME);
                        }
                        util.returnAMQConnection(conn);
                        pthread_exit(0) ;
                }
<snip>

   At the end we call pthread_mutex_unlock() so that other threads can
consume too.




On Mon, Dec 16, 2013 at 9:43 PM, kingsmasher1 <raj.kumar.sanpui at gmail.com> wrote:

> Dear All,
>
> Thanks for the response. Yes, the message count shows as 0 in both the GUI
> and rabbitmqctl.
>
> This is my consume method (I commented out all the unbind calls to make
> sure the queue is not getting unbound after one consume):
>
> // Starts a consumer on m_queuename (channel 1) and returns the body of the
> // first message delivered to it.
> //
> // Returns an empty string when the consume request fails, when a frame
> // cannot be read, or when the frames do not arrive in the expected
> // method -> header -> body order. If reading stops part-way through the
> // body, the bytes received so far are returned.
> //
> // NOTE(review): the no_ack argument of amqp_basic_consume is passed as 1
> // here. With no_ack set the broker presumably treats every message as
> // acknowledged as soon as it is sent, so messages pushed to this consumer
> // but never read by this function would no longer be counted in the
> // queue - confirm against the rabbitmq-c / AMQP basic.consume docs.
> std::string amqConnection::consume() {
>         Debug("%s - \n", __FUNCTION__);
>         amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,
>                         amqp_empty_table);
>         amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
>         int rc = check_amqp_error(res, "consume");
>         // Was `return NULL`, which builds a std::string from a null pointer
>         // (undefined behaviour). Use the same empty-string convention as
>         // every other failure path below.
>         if (rc != 0)
>                 return "";
>
>         amqp_frame_t frame;
>
>         amqp_maybe_release_buffers(conn);
>         Debug("%s - wait frame\n", __FUNCTION__);
>         int result = amqp_simple_wait_frame(conn, &frame);
>         Debug("%s Result %d\n",__FUNCTION__, result);
>         if (result < 0)
>                 return "";
>
>         // First frame must be the basic.deliver method frame.
>         Debug("%s Frame type %d, channel %d\n",__FUNCTION__,
>                         frame.frame_type, frame.channel);
>         if (frame.frame_type != AMQP_FRAME_METHOD)
>                 return "";
>
>         Debug("%s Method %s\n", __FUNCTION__,
>                         amqp_method_name(frame.payload.method.id));
>         if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
>                 return "";
>
>         const amqp_basic_deliver_t *d =
>                         static_cast<amqp_basic_deliver_t *>(frame.payload.method.decoded);
>         Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,
>                         (unsigned) d->delivery_tag,
>                         (int) d->exchange.len, (char *) d->exchange.bytes,
>                         (int) d->routing_key.len, (char *) d->routing_key.bytes);
>
>         // Second frame must be the content header carrying the body size.
>         result = amqp_simple_wait_frame(conn, &frame);
>         if (result < 0)
>                 return "";
>
>         if (frame.frame_type != AMQP_FRAME_HEADER) {
>                 ErrLog( "%s Expected header!",__FUNCTION__);
>                 return "";
>         }
>         const amqp_basic_properties_t *p =
>                         static_cast<amqp_basic_properties_t *>(frame.payload.properties.decoded);
>         if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
>                 Debug("%s Content-type: %.*s\n",__FUNCTION__,
>                                 (int) p->content_type.len, (char *) p->content_type.bytes);
>         }
>         Debug("%s ----\n", __FUNCTION__);
>
>         // Remaining frames carry the body, possibly split into fragments.
>         const size_t body_target = frame.payload.properties.body_size;
>         size_t body_received = 0;
>         std::string response;
>         while (body_received < body_target) {
>                 result = amqp_simple_wait_frame(conn, &frame);
>                 if (result < 0)
>                         break;
>
>                 if (frame.frame_type != AMQP_FRAME_BODY) {
>                         ErrLog( " %s Expected body!", __FUNCTION__);
>                         break;
>                 }
>
>                 // Append exactly this fragment. The previous code used the
>                 // running total (body_received) as the length, which reads
>                 // past the end of every fragment after the first.
>                 const size_t fragment_len = frame.payload.body_fragment.len;
>                 response.append(
>                                 static_cast<const char *>(frame.payload.body_fragment.bytes),
>                                 fragment_len);
>                 body_received += fragment_len;
>         }
>
>         return response;
> }
>
>
>
> --
> View this message in context:
> http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html
> Sent from the RabbitMQ mailing list archive at Nabble.com.
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20131216/443242f3/attachment.html>


More information about the rabbitmq-discuss mailing list