[rabbitmq-discuss] Why does the queue drop other messages after the first is consumed?
Raj Kumar Sanpui
raj.kumar.sanpui at gmail.com
Mon Dec 16 16:27:28 GMT 2013
And the consume is called from another thread...
void *con(void *arg) {
amqp_bytes_t queue;
printf("%s %s\n",__FUNCTION__,(char *) arg);
Debug("%s() - Get an mq connection\n", __FUNCTION__);
amqConnection *conn = util.getAMQConnection ();
if (conn == NULL) {
ErrLog("%s() - get connection failed\n",
__FUNCTION__);
pthread_exit(0) ;
}
Debug("%s() - declare exchange %s\n", __FUNCTION__,
EXCHANGE);
int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);
if (rc != 0) {
ErrLog("%s() - declare exchange %s %s failed\n",
__FUNCTION__, EXCHANGE, QUEUETYPE);
util.returnAMQConnection(conn);
pthread_exit(0) ;
}
pthread_mutex_trylock(&mutex_lock);
printf("Consumer Thread %s: Lock acquired and trying to
consume\n",(char*) arg);
messageCounter = conn->Qlength(QUEUENAME);
//while (messageCounter == 0)
{
printf("Consumer thread: %s waiting for signal as
messageCounter = 0\n",(char *) arg);
//pthread_cond_wait(&cond_lock, &mutex_lock);
}
printf("Consumer thread: %s Got signal !!! %d messages in
queue \n", (char *) arg, messageCounter);
Debug("%s() - consume %s\n", __FUNCTION__,ROUTINGKEY );
std::string message = conn->consume(QUEUENAME, EXCHANGE,
ROUTINGKEY);
if (0 == message.length()) {
ErrLog("%s() - consume queue %s failed\n",
__FUNCTION__, QUEUENAME);
printf("%s() - consume queue %s failed\n",
__FUNCTION__, QUEUENAME);
rc = conn->deleteQueue(QUEUENAME);
if (rc != 0) {
ErrLog("%s() - delete queue %s failed\n",
__FUNCTION__, QUEUENAME);
}
util.returnAMQConnection(conn);
pthread_exit(0) ;
}
<snip>
At the end we call pthread_mutex_unlock( ) so that other threads can
consume too !!
On Mon, Dec 16, 2013 at 9:43 PM, kingsmasher1 <raj.kumar.sanpui at gmail.com>wrote:
> Dear All,
>
> Thanks for the response, yeah the message count in the GUI as well as the
> rabbitmqctl shows as 0.
>
> This is my consume method: (i just commented all the unbind calls to make
> sure, after one consume the queue is not getting unbinded)
>
> std::string amqConnection::consume() {
> Debug("%s - \n", __FUNCTION__);
> amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,
> amqp_empty_table);
> amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
> int rc = check_amqp_error(res, "consume");
> if (rc != 0) return NULL;
>
> amqp_frame_t frame;
> int result;
>
> amqp_basic_deliver_t *d;
> amqp_basic_properties_t *p;
> size_t body_target;
> size_t body_received;
>
> amqp_maybe_release_buffers(conn);
> Debug("%s - wait frame\n", __FUNCTION__);
> result = amqp_simple_wait_frame(conn, &frame);
> Debug("%s Result %d\n",__FUNCTION__, result);
> if (result < 0)
> return "";
>
> Debug("%s Frame type %d, channel %d\n",__FUNCTION__,
> frame.frame_type,
> frame.channel);
> if (frame.frame_type != AMQP_FRAME_METHOD)
> return "";
>
> Debug("%s Method %s\n", __FUNCTION__,
> amqp_method_name(frame.payload.method.id));
> if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
> return "";
>
> d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
> Debug("%s Delivery %u, exchange %.*s routingkey
> %.*s\n",__FUNCTION__,
> (unsigned) d->delivery_tag,
> (int) d->exchange.len, (char *) d->exchange.bytes,
> (int) d->routing_key.len, (char *)
> d->routing_key.bytes);
>
> result = amqp_simple_wait_frame(conn, &frame);
> if (result < 0)
> return "";
>
> if (frame.frame_type != AMQP_FRAME_HEADER) {
> ErrLog( "%s Expected header!",__FUNCTION__);
> return "";
> }
> p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
> if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
> Debug("%s Content-type: %.*s\n",__FUNCTION__,
> (int) p->content_type.len, (char *)
> p->content_type.bytes);
> }
> Debug("%s ----\n", __FUNCTION__);
>
> body_target = frame.payload.properties.body_size;
> body_received = 0;
> std::string response="";
> while (body_received < body_target) {
> result = amqp_simple_wait_frame(conn, &frame);
> if (result < 0)
> break;
>
> if (frame.frame_type != AMQP_FRAME_BODY) {
> ErrLog( " %s Expected body!", __FUNCTION__);
> break;
> }
>
> body_received += frame.payload.body_fragment.len;
> //assert(body_received <= body_target);
>
> //amqp_dump(frame.payload.body_fragment.bytes,
> // frame.payload.body_fragment.len);
> response += std::string((char *)
> frame.payload.body_fragment.bytes,
> body_received );
> }
>
>
>
> return response;
> }
>
>
>
> --
> View this message in context:
> http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html
> Sent from the RabbitMQ mailing list archive at Nabble.com.
> _______________________________________________
> rabbitmq-discuss mailing list
> rabbitmq-discuss at lists.rabbitmq.com
> https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20131216/443242f3/attachment.html>
More information about the rabbitmq-discuss
mailing list