[rabbitmq-discuss] Why is the "select" always getting timed out in consumer?
kingsmasher1
raj.kumar.sanpui at gmail.com
Fri Dec 13 16:08:05 GMT 2013
Thanks a lot for the reply Alan.
It seems I made a basic mistake: my bindQueue call happens inside the
consume method, and I was planning to call consume only after the
"select()" check succeeded. As you suggested, that is probably why the
messages are dropped, since the queue is not bound to the exchange until
consume runs.
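To illustrate why the order matters, here is a minimal sketch of the routing
behaviour. It is a toy in-memory model, not RabbitMQ or rabbitmq-c code:
ToyExchange, bind and publish are hypothetical names, and it matches routing
keys exactly rather than implementing topic wildcards. It only shows that a
message published before any binding exists has nowhere to go.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy stand-in for a broker exchange (hypothetical, for illustration only).
struct ToyExchange {
    // routing key -> names of the queues bound with that key
    std::map<std::string, std::vector<std::string>> bindings;
    // queue name -> messages routed to that queue
    std::map<std::string, std::vector<std::string>> queues;

    void bind(const std::string& queue, const std::string& key) {
        bindings[key].push_back(queue);
    }

    // Returns how many queues received the message; 0 means it was dropped.
    std::size_t publish(const std::string& key, const std::string& msg) {
        auto it = bindings.find(key);
        if (it == bindings.end()) return 0;  // no binding yet: nothing to route to
        for (const auto& q : it->second) queues[q].push_back(msg);
        return it->second.size();
    }
};
```

In this model, publishing with key "pname" before bind("MyprefetchQ", "pname")
returns 0, and the same publish after the bind returns 1. That matches what I
am seeing: anything sent before consume (and therefore before bindQueue) never
reaches MyprefetchQ.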
By the way, here are the functions we use. They come from existing old
code that has no "is there a message in the queue" check, which is what I
am trying to add.
#define EXCHANGE "topicex"
#define QUEUETYPE "topic"
#define ROUTINGKEY "pname"
#define QUEUENAME "MyprefetchQ"
void createPrefetchQueue(void) {
    amqConnection *conn = util.getAMQConnection();
    int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);
    if (rc != 0) {
        ErrLog("%s() - declare exchange %s %s failed\n",
               __FUNCTION__, EXCHANGE, QUEUETYPE);
        util.returnAMQConnection(conn);
        pthread_exit(0);
    }
    // Create my prefetch queue
    rc = conn->createQueue(QUEUENAME, 0, 1, 1, 0);
    if (rc != 0) {
        ErrLog("%s() - create MyprefetchQ %s failed\n", __FUNCTION__,
               QUEUENAME);
        util.returnAMQConnection(conn);
        pthread_exit(0);
    }
}
int amqConnection::createQueue(std::string queue, int passive, int durable,
                               int exclusive, int autoDelete) {
    Debug("%s - Create queue %s \n", __FUNCTION__, queue.c_str());
    /*
     * amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel,
     *     amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable,
     *     amqp_boolean_t exclusive, amqp_boolean_t auto_delete,
     *     amqp_table_t arguments)
     */
    int channel = 1;
    // Declare the queue with the flags requested by the caller.
    amqp_queue_declare(conn, channel, amqp_cstring_bytes(queue.c_str()),
                       passive, durable, exclusive, autoDelete,
                       amqp_empty_table);
    amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
    return check_amqp_error(res, "create queue");
}
Binding:
int amqConnection::bindQueue(const char* queue, const char* exchange,
                             const char* bindingkey) {
    Debug("%s() - %s %s %s\n", __FUNCTION__, queue, exchange, bindingkey);
    amqp_queue_bind(conn, 1,
                    amqp_cstring_bytes(queue),
                    amqp_cstring_bytes(exchange),
                    amqp_cstring_bytes(bindingkey),
                    amqp_empty_table);
    amqp_rpc_reply_t res = amqp_get_rpc_reply(conn);
    return check_amqp_error(res, "bind queue");
}
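std::string amqConnection::consume(amqp_bytes_t queuename, std::string exchange,
                                   std::string routingkey) {
    // amqp_bytes_t is not guaranteed to be NUL-terminated, so copy it into a
    // std::string before logging it or passing it on as a C string.
    std::string queue(static_cast<const char*>(queuename.bytes), queuename.len);
    Debug("%s q %s ex %s rk %s- \n", __FUNCTION__, queue.c_str(),
          exchange.c_str(), routingkey.c_str());
    // The bind happens here, so nothing is routed to the queue until
    // consume() is first called.
    if (exchange.length() > 0)
        bindQueue(queue.c_str(), exchange.c_str(), routingkey.c_str());
    <snip>
}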
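For the "message in queue or not" check itself, this is roughly the kind of
helper I have in mind: a minimal sketch of a select() call with a timeout on
one file descriptor. The name wait_readable and the millisecond parameter are
my own, not part of the existing code. With rabbitmq-c the descriptor would
presumably come from amqp_get_sockfd(conn), and amqp_frames_enqueued(conn) and
amqp_data_in_buffer(conn) would need to be checked first, because the library
may already hold data that select() cannot see. The broker also sends no
deliveries until the queue is bound and basic.consume has been issued, so
calling this before consume will just time out.

```cpp
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Waits until fd becomes readable or the timeout expires.
// Returns 1 if readable, 0 on timeout, -1 on error.
// fd must be below FD_SETSIZE, as select() requires.
int wait_readable(int fd, int timeout_ms) {
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);

    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    // The first argument is the highest descriptor in the set plus one.
    int rc = select(fd + 1, &readfds, nullptr, nullptr, &tv);
    if (rc < 0) return -1;
    return (rc > 0 && FD_ISSET(fd, &readfds)) ? 1 : 0;
}
```

The fd_set and timeval are rebuilt on every call because select() may modify
both, so they cannot be reused across iterations of a polling loop.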