[rabbitmq-discuss] Why is my consumer unable to receive any message? amqp_simple_wait_frame returns less than zero.
kingsmasher1
raj.kumar.sanpui at gmail.com
Thu Dec 12 20:11:09 GMT 2013
I created a queue using the RabbitMQ management GUI and declared the
exchange and routing keys as needed. For some reason, "amqp_simple_wait_frame"
always returns a value less than 0, and my program does not consume any messages.
Please help.
Here is my code.
std::string consume(std::string queuename, std::string exchange, std::string
routingkey, amqp_connection_state_t conn) {
//if (exchange.length() > 0) bindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
//amqp_queue_bind(conn, 1,
// amqp_cstring_bytes(QUEUENAME),
// amqp_cstring_bytes(EXCHANGE),
// amqp_cstring_bytes(ROUTINGKEY),
// amqp_empty_table);
amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename.c_str()),
amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_get_rpc_reply(conn);
amqp_frame_t frame;
int result;
amqp_basic_deliver_t *d;
amqp_basic_properties_t *p;
size_t body_target;
size_t body_received;
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
printf("%s Returning as result < 0\n",__FUNCTION__);
//if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
return "";
}
printf("%s Frame type %d, channel %d\n",__FUNCTION__,
frame.frame_type, frame.channel);
if (frame.frame_type != AMQP_FRAME_METHOD) {
//if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
return "";
}
printf("%s Method %s\n", __FUNCTION__,
amqp_method_name(frame.payload.method.id));
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
//if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
return "";
}
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("%s Delivery %u, exchange %.*s routingkey
%.*s\n",__FUNCTION__,
(unsigned) d->delivery_tag,
(int) d->exchange.len, (char *) d->exchange.bytes,
(int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
//if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
return "";
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
printf( "%s Expected header!",__FUNCTION__);
//unbindQueue(queuename.c_str(), exchange.c_str(),
routingkey.c_str());
return "";
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("%s Content-type: %.*s\n",__FUNCTION__,
(int) p->content_type.len, (char *)
p->content_type.bytes);
}
//Debug("%s ----\n", __FUNCTION__);
body_target = frame.payload.properties.body_size;
body_received = 0;
std::string response="";
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0)
break;
if (frame.frame_type != AMQP_FRAME_BODY) {
printf( " %s Expected body!", __FUNCTION__);
break;
}
body_received += frame.payload.body_fragment.len;
//assert(body_received <= body_target);
//amqp_dump(frame.payload.body_fragment.bytes,
// frame.payload.body_fragment.len);
response += std::string((char *)
frame.payload.body_fragment.bytes, body_received );
}
//if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
return response;
}
int main()
{
amqp_connection_state_t conn;
conn = amqp_new_connection();
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(EXCHANGE),
amqp_cstring_bytes(QUEUETYPE),\
0, 1, amqp_empty_table);
std::string queuename="myPrefetchQ";
amqp_queue_bind(conn, 1,
amqp_cstring_bytes(QUEUENAME),
amqp_cstring_bytes(EXCHANGE),
amqp_cstring_bytes(ROUTINGKEY),
amqp_empty_table);
std::string consumed = consume(QUEUENAME, EXCHANGE, \
ROUTINGKEY, conn);
cout << "Received: ";
cout << consumed;
}
--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/Why-is-my-consumer-unable-to-receive-any-message-amqp-simple-wait-frame-returns-less-than-zero-tp32037.html
Sent from the RabbitMQ mailing list archive at Nabble.com.
More information about the rabbitmq-discuss
mailing list