[rabbitmq-discuss] Why is my consumer unable to receive any message? amqp_simple_wait_frame returns less than zero.

kingsmasher1 raj.kumar.sanpui at gmail.com
Thu Dec 12 20:11:09 GMT 2013


I am creating a queue using the RabbitMQ GUI, and declaring exchanges and
routing keys as needed. For some reason, "amqp_simple_wait_frame" always
returns a value less than 0, and my program does not consume any messages.

Please help.

Here is my code.

std::string consume(std::string queuename, std::string exchange, std::string
routingkey, amqp_connection_state_t conn) {

        //if (exchange.length() > 0) bindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
        //amqp_queue_bind(conn, 1,
        //                amqp_cstring_bytes(QUEUENAME),
        //                amqp_cstring_bytes(EXCHANGE),
        //                amqp_cstring_bytes(ROUTINGKEY),
        //                amqp_empty_table);
        amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename.c_str()),
amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
        amqp_get_rpc_reply(conn);

        amqp_frame_t frame;
        int result;

        amqp_basic_deliver_t *d;
        amqp_basic_properties_t *p;
        size_t body_target;
        size_t body_received;

        amqp_maybe_release_buffers(conn);
        result = amqp_simple_wait_frame(conn, &frame);
        if (result < 0) {
                printf("%s Returning as result < 0\n",__FUNCTION__);
                //if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
                return "";
        }

        printf("%s Frame type %d, channel %d\n",__FUNCTION__,
frame.frame_type, frame.channel);
        if (frame.frame_type != AMQP_FRAME_METHOD) {
                //if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
                return "";
        }

        printf("%s Method %s\n", __FUNCTION__,
amqp_method_name(frame.payload.method.id));
        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
                //if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
                return "";
        }

        d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
        printf("%s Delivery %u, exchange %.*s routingkey
%.*s\n",__FUNCTION__,
                        (unsigned) d->delivery_tag,
                        (int) d->exchange.len, (char *) d->exchange.bytes,
                                                                                          
     (int) d->routing_key.len, (char *) d->routing_key.bytes);

        result = amqp_simple_wait_frame(conn, &frame);
        if (result < 0) {
                //if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
                return "";
        }
        if (frame.frame_type != AMQP_FRAME_HEADER) {
                printf( "%s Expected header!",__FUNCTION__);
                //unbindQueue(queuename.c_str(), exchange.c_str(),
routingkey.c_str());
                return "";
        }
        p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
        if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
                printf("%s Content-type: %.*s\n",__FUNCTION__,
                                (int) p->content_type.len, (char *)
p->content_type.bytes);
        }
        //Debug("%s ----\n", __FUNCTION__);

        body_target = frame.payload.properties.body_size;
        body_received = 0;
        std::string response="";
        while (body_received < body_target) {
                result = amqp_simple_wait_frame(conn, &frame);
                if (result < 0)
                        break;

                if (frame.frame_type != AMQP_FRAME_BODY) {
                        printf( " %s Expected body!", __FUNCTION__);
                        break;
                }

                body_received += frame.payload.body_fragment.len;
                //assert(body_received <= body_target);

                //amqp_dump(frame.payload.body_fragment.bytes,
                //              frame.payload.body_fragment.len);
                response += std::string((char *)
frame.payload.body_fragment.bytes, body_received );
        }


        //if (exchange.length() > 0) unbindQueue(queuename.c_str(),
exchange.c_str(), routingkey.c_str());
        return response;
}

// Declares the exchange, binds the queue to it, consumes one message via
// consume() and prints it.
//
// NOTE(review): `conn` is used immediately after amqp_new_connection(). This
// function never opens a socket to the broker, never logs in, and never opens
// channel 1 before issuing channel-1 methods. That is very likely why
// amqp_simple_wait_frame() in consume() returns a negative value. The
// connection, login and channel-open steps (with the broker's host, port and
// credentials) need to be added here before the declare/bind calls.
int main()
{
  amqp_connection_state_t conn = amqp_new_connection();

  // Return values of the declare/bind calls are not checked here.
  amqp_exchange_declare(conn, 1,
                        amqp_cstring_bytes(EXCHANGE),
                        amqp_cstring_bytes(QUEUETYPE),
                        0, 1, amqp_empty_table);
  amqp_queue_bind(conn, 1,
                  amqp_cstring_bytes(QUEUENAME),
                  amqp_cstring_bytes(EXCHANGE),
                  amqp_cstring_bytes(ROUTINGKEY),
                  amqp_empty_table);

  std::string consumed = consume(QUEUENAME, EXCHANGE, ROUTINGKEY, conn);
  cout << "Received: ";
  cout << consumed;
}




--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/Why-is-my-consumer-unable-to-receive-any-message-amqp-simple-wait-frame-returns-less-than-zero-tp32037.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


More information about the rabbitmq-discuss mailing list