[rabbitmq-discuss] Why is the "select" always getting timed out in consumer?

kingsmasher1 raj.kumar.sanpui at gmail.com
Fri Dec 13 08:36:28 GMT 2013


I badly need some help. I have been struggling with this for the last couple
of nights but could not make it work.

I have a program in which a separate thread publishes a message, and a
consumer accepts it after ensuring that the socket is readable (select) and
that there are no frames enqueued (amqp_frames_enqueued) and no data in the
buffer (amqp_data_in_buffer).
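
To make the readiness check concrete, here is a minimal sketch of what I
expect select to do, with a pipe standing in for the broker socket. The names
wait_readable and demo_pipe_readiness are made up for this illustration; they
are not part of rabbitmq-c or of my program.

```cpp
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: wait until fd is readable or the timeout expires.
// Returns what select() returns: 1 if readable, 0 on timeout, -1 on error.
int wait_readable(int fd, long sec, long usec) {
    fd_set read_flags;
    FD_ZERO(&read_flags);
    FD_SET(fd, &read_flags);

    struct timeval timeout;
    timeout.tv_sec = sec;
    timeout.tv_usec = usec;

    return select(fd + 1, &read_flags, NULL, NULL, &timeout);
}

// Hypothetical demo: the read end of a pipe is not readable until the
// write end has actually sent something.
bool demo_pipe_readiness() {
    int fds[2];
    if (pipe(fds) != 0)
        return false;

    // Nothing written yet, so this should time out after 100 ms.
    int before = wait_readable(fds[0], 0, 100000);

    // Write one byte, then check again.
    bool wrote = (write(fds[1], "x", 1) == 1);
    int after = wait_readable(fds[0], 0, 100000);

    close(fds[0]);
    close(fds[1]);
    return wrote && before == 0 && after == 1;
}
```

As far as I understand, select only reports the descriptor as readable once
the other end has written bytes to it, so a timeout means nothing arrived on
the socket within the interval.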

Although the log says the message is indeed published to the queue, the
select always times out.
I followed a similar example posted on this forum, but it does not work for
me: I always get a timeout.

Please help!

Here is my consumer code, up to the point where it times out.


void *con(void *arg) {
                amqp_bytes_t queue;
                std::string message;
                printf("%s %s\n",__FUNCTION__,(char *) arg);

                Debug("%s() - Get an mq connection\n", __FUNCTION__);
                amqConnection *conn =  util.getAMQConnection ();
                if (conn == NULL) {
                        ErrLog("%s() - get connection failed\n", __FUNCTION__);
                        pthread_exit(0) ;
                }
                Debug("%s() - declare exchange %s\n", __FUNCTION__, EXCHANGE);
                int rc = conn->declareExchange(EXCHANGE, QUEUETYPE, 1);
                if (rc != 0) {
                        ErrLog("%s() - declare exchange %s %s failed\n", __FUNCTION__, EXCHANGE, QUEUETYPE);
                        util.returnAMQConnection(conn);
                        pthread_exit(0) ;
                }

                Debug("%s() - consume  %s\n", __FUNCTION__,ROUTINGKEY );
                amqp_connection_state_t *state;
                state = conn->getConnectionState( );
                if (state == NULL) {
                        printf("Could not receive connection state object\n");
                        /* Do not dereference a NULL state below. */
                        util.returnAMQConnection(conn);
                        pthread_exit(0) ;
                }

                if (!amqp_frames_enqueued(*state) && !amqp_data_in_buffer(*state)) {
                         int sock = amqp_get_sockfd(*state);
                         printf("socket: %d\n", sock);
                         int flags = fcntl(sock, F_GETFL, 0);
                         fcntl(sock, F_SETFL, flags | O_NONBLOCK);
                         int ret = 0;
                         do {
                                struct timeval timeout;
                                fd_set read_flags;
                                FD_ZERO(&read_flags);
                                FD_SET(sock, &read_flags);
                                /* Wait up to five seconds. */
                                timeout.tv_sec = 5;
                                timeout.tv_usec = 0;
                                ret = select(sock+1, &read_flags, NULL, NULL, &timeout);
                                if (ret == -1)
                                        printf("select: %s\n", strerror(errno));
                                else if (ret == 0)
                                        printf("select timedout\n");
                                else if (FD_ISSET(sock, &read_flags)) {
                                        printf("Flag is set\n");
                                        message = conn->consume(QUEUENAME, EXCHANGE, ROUTINGKEY);
                                        if (0 == message.length()) {
                                                ErrLog("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);
                                                printf("%s() - consume queue %s failed\n", __FUNCTION__, QUEUENAME);
                                                rc = conn->deleteQueue(QUEUENAME);
                                                if (rc != 0) {
                                                        ErrLog("%s() - delete queue %s failed\n", __FUNCTION__, QUEUENAME);
                                                }
                                                util.returnAMQConnection(conn);
                                                pthread_exit(0) ;

                                        }
                                }
                        } while (ret == 0);
                }




--
View this message in context: http://rabbitmq.1065348.n5.nabble.com/Why-is-the-select-always-getting-timed-out-in-consumer-tp32046.html
Sent from the RabbitMQ mailing list archive at Nabble.com.

