[rabbitmq-discuss] Why does the queue drop other messages after the first is consumed?
kingsmasher1
raj.kumar.sanpui at gmail.com
Mon Dec 16 16:13:02 GMT 2013
Dear All,
Thanks for the response, yeah the message count in the GUI as well as the
rabbitmqctl shows as 0.
This is my consume method: (i just commented all the unbind calls to make
sure, after one consume the queue is not getting unbinded)
std::string amqConnection::consume() {
Debug("%s - \n", __FUNCTION__);
amqp_basic_consume(conn, 1, m_queuename, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
int rc = check_amqp_error(res, "consume");
if (rc != 0) return NULL;
amqp_frame_t frame;
int result;
amqp_basic_deliver_t *d;
amqp_basic_properties_t *p;
size_t body_target;
size_t body_received;
amqp_maybe_release_buffers(conn);
Debug("%s - wait frame\n", __FUNCTION__);
result = amqp_simple_wait_frame(conn, &frame);
Debug("%s Result %d\n",__FUNCTION__, result);
if (result < 0)
return "";
Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type,
frame.channel);
if (frame.frame_type != AMQP_FRAME_METHOD)
return "";
Debug("%s Method %s\n", __FUNCTION__,
amqp_method_name(frame.payload.method.id));
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
return "";
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,
(unsigned) d->delivery_tag,
(int) d->exchange.len, (char *) d->exchange.bytes,
(int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0)
return "";
if (frame.frame_type != AMQP_FRAME_HEADER) {
ErrLog( "%s Expected header!",__FUNCTION__);
return "";
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
Debug("%s Content-type: %.*s\n",__FUNCTION__,
(int) p->content_type.len, (char *) p->content_type.bytes);
}
Debug("%s ----\n", __FUNCTION__);
body_target = frame.payload.properties.body_size;
body_received = 0;
std::string response="";
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_BODY) {
ErrLog( " %s Expected body!", __FUNCTION__);
break;
}
body_received += frame.payload.body_fragment.len;
//assert(body_received <= body_target);
//amqp_dump(frame.payload.body_fragment.bytes,
// frame.payload.body_fragment.len);
response += std::string((char *) frame.payload.body_fragment.bytes,
body_received );
}
return response;
}
--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/Why-does-the-queue-drop-other-messages-after-the-first-is-consumed-tp32118p32122.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list