[rabbitmq-discuss] amqp 1.0.4 blocking consume seems to block forever even though new messages are on the queue

servobit johnc at codecobblers.com
Thu Sep 13 19:07:51 BST 2012


Hi All.

When I start my AMQP 1.0.4 PHP client, assuming there are pending messages 
in the queue, I can read all of the pending messages from the queue.  I am 
doing this using consume( 'processMsg' ), which is a blocking call.

The strange thing is, after I process and ACK the pending messages on the 
queue, any new messages I add to the queue are not "picked up" by the 
consume callback.  In other words, it seems as if the exchange + channel 
connection has gone stale.

If I restart my client, the pending messages are picked up without a 
problem.

It appears the consume call expires (for lack of a better term) almost 
exactly at the 2-minute mark. 
My broker is SwiftMQ.  Unfortunately, I don't have access to config 
settings on the broker at the moment.
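
One way to pin down the 2-minute figure mentioned above would be to log how long each blocking call takes before it returns. This is only a minimal sketch: timedCall() is a hypothetical helper, not part of the amqp extension, and the closure below merely stands in for the real consume() call.

```php
<?php
// Hypothetical helper (not part of the amqp extension): run a blocking
// call and return its result together with the elapsed time in seconds.
function timedCall($fn)
{
    $start = microtime(true);
    $result = call_user_func($fn);
    $elapsed = microtime(true) - $start;
    return array($result, $elapsed);
}

// Stand-in for the blocking consume() call; a real client would invoke
// its own consume wrapper (e.g. readMsg()) inside the closure instead.
list($result, $elapsed) = timedCall(function () {
    usleep(50000); // pretend to block for 50 ms
    return false;
});

printf("call returned %s after %.3f s\n", var_export($result, true), $elapsed);
```

Wrapped around the readMsg() call in the main loop, a log line like this would show whether the call returns, throws, or simply never comes back after the suspected timeout.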

Any help would be greatly appreciated.  BTW, I saw the same problem with 
STOMP, which I believe is no longer supported anyway.


Here is my main loop:

            // run forever
            while(1) {

                try {

                    if ( $this->conn == null ){
                        self::init();
                    }
                    
                    if ( ! $this->channel->isConnected() ) {
                        self::trace(__METHOD__ . " " . " Channel is not connected, reconnecting...");
                        self::init( );
                    }

                    // keep our connection alive
                    if ( isset( $this->conn )){
                        if ( $this->conn->isConnected() == false ){
                            // reinit
                            self::init();
                        }
                    }
                    
                    $result = $this->readMsg( );

                }catch( Exception $e ){
                    self::trace(__METHOD__ . " Error: " . $e->getTraceAsString()  );
                }
            }

Here is the readMsg( ) method:

        /**
         * read a message from the Broker queue
         * @return  $result (true or false)
         */
public function readMsg(  ) {
            // read the messages from the queue, acknowledge if no errs so
            // the messages are removed from the queue.  If we don't ack, 
            // messages will remain in the queue and will be marked as 
            // "redelivered".
            
            // CALLBACK
            $result = $this->consumer->consume( 'processMsg' );
                    
            return $result;
          
}

And here is the callback:

/**
 * CALLBACK - Note we are called via queue->consume, which uses synchronous 
 * (blocking) I/O.
 * 
 * Process the envelope body.
 * @param $amqpEnvelope
 * @param $consumer
 * @return false when done - see the PHP AMQP docs for consume and returning 
 *         to the calling thread.
 * @throws Exception
 */
function processMsg( $amqpEnvelope, $consumer ) {

    try {
        
        if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
            return false; // still blocking
        }
        
        $payload = $amqpEnvelope->getBody( );
        
        if ($payload == null ){
            return false; // still blocking
        }

        $deliveryTag = $amqpEnvelope->getDeliveryTag();

        trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );
        
        $response = sendRemoteWebserviceRequest( $payload );
        
        processRemoteWebserviceResponse( $response );
        
        $consumer->ack( $deliveryTag );
        
        // return false indicating we are done (weird, but that's what the 
        // docs show)
        return false; 


    }catch( AMQPChannelException $e ) {
        throw $e;
    }catch( AMQPConnectionException $e ){
        throw $e;
    }catch( Exception $e ){
        // When known application errs occur, we need to ack the message 
        // and remove it from the queue.
        if ( strpos($amqpEnvelope->getBody( ), '<error>Title already exists' ) !== false){
            $consumer->ack( $amqpEnvelope->getDeliveryTag() );
        }
        throw $e;
    }
}
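
To illustrate the callback contract I am relying on (returning false hands control back to the caller of consume), here is a minimal sketch. fakeConsume() is a hypothetical stand-in that only models that contract; it does not talk to a broker, and unlike the real blocking call it returns when its local list is empty rather than waiting for more messages.

```php
<?php
// Hypothetical stand-in for the blocking consume() call: it hands each
// pending message to the callback and stops as soon as the callback
// returns false. It models the return-value contract only.
function fakeConsume(array &$pending, $callback)
{
    while (count($pending) > 0) {
        $msg = array_shift($pending);
        if (call_user_func($callback, $msg) === false) {
            return; // callback asked to stop consuming
        }
    }
}

$pending = array('m1', 'm2', 'm3');
$seen = array();

// Returning false gives control back after a single message, so the
// outer loop has to call consume again to receive the next one.
$oneAtATime = function ($msg) use (&$seen) {
    $seen[] = $msg;
    return false;
};

fakeConsume($pending, $oneAtATime);
// $seen is now array('m1'); 'm2' and 'm3' are still pending.
```

Under this model, my main loop re-enters consume once per message, which is why readMsg( ) is called inside while(1).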


Note that if I re-establish the connection / exchange / queue, this is a 
non-issue, although I don't want to have to do this.
FWIW, I have one subscriber and one queue.

Best,

John
