[rabbitmq-discuss] Why does the queue drop other messages after the first is consumed?

kingsmasher1 raj.kumar.sanpui at gmail.com
Mon Dec 16 16:13:02 GMT 2013


Dear All,

Thanks for the response, yeah the message count in the GUI as well as the
rabbitmqctl shows as 0.

This is my consume method: (i just commented all the unbind calls to make
sure, after one consume the queue is not getting unbinded)

std::string amqConnection::consume() {
	Debug("%s - \n", __FUNCTION__);
	amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
	amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
	int rc = check_amqp_error(res, "consume");
	if (rc != 0) return NULL;

	amqp_frame_t frame;
	int result;

	amqp_basic_deliver_t *d;
	amqp_basic_properties_t *p;
	size_t body_target;
	size_t body_received;

	amqp_maybe_release_buffers(conn);
	Debug("%s - wait frame\n", __FUNCTION__);
	result = amqp_simple_wait_frame(conn, &frame);
	Debug("%s Result %d\n",__FUNCTION__, result);
	if (result < 0)
		return "";

	Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type,
frame.channel);
	if (frame.frame_type != AMQP_FRAME_METHOD)
		return "";

	Debug("%s Method %s\n", __FUNCTION__,
amqp_method_name(frame.payload.method.id));
	if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
		return "";

	d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
	Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,
			(unsigned) d->delivery_tag,
			(int) d->exchange.len, (char *) d->exchange.bytes,
			(int) d->routing_key.len, (char *) d->routing_key.bytes);

	result = amqp_simple_wait_frame(conn, &frame);
	if (result < 0)
		return "";

	if (frame.frame_type != AMQP_FRAME_HEADER) {
		ErrLog( "%s Expected header!",__FUNCTION__);
		return "";
	}
	p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
	if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
		Debug("%s Content-type: %.*s\n",__FUNCTION__,
				(int) p->content_type.len, (char *) p->content_type.bytes);
	}
	Debug("%s ----\n", __FUNCTION__);

	body_target = frame.payload.properties.body_size;
	body_received = 0;
        std::string response="";
	while (body_received < body_target) {
		result = amqp_simple_wait_frame(conn, &frame);
		if (result < 0)
			break;

		if (frame.frame_type != AMQP_FRAME_BODY) {
			ErrLog( " %s Expected body!", __FUNCTION__);
			break;
		}

		body_received += frame.payload.body_fragment.len;
		//assert(body_received <= body_target);

		//amqp_dump(frame.payload.body_fragment.bytes,
		//		frame.payload.body_fragment.len);
	        response += std::string((char *) frame.payload.body_fragment.bytes,
body_received );
	}



	return response;
}



--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


More information about the rabbitmq-discuss mailing list