[rabbitmq-discuss] amqp 1.0.4 blocking consume seems to block forever even though new messages are on the queue

servobit johnc at codecobblers.com
Thu Sep 13 19:07:51 BST 2012

Hi All.

When I start my AMQP 1.0.4 PHP client, assuming there are pending messages 
in the queue, I can read all of them.  I am doing this using 
consume( 'processMsg' ), which is a blocking call.

The strange thing is, after I process and ACK the pending messages on the 
queue, any new messages I add to the queue are not "picked up" by the 
consume callback.  In other words, it seems as if the exchange + channel 
connection has gone stale.

If I restart my client, the pending messages are picked up without a 

It appears the consume call expires (for lack of a better term) almost 
exactly at the 2-minute mark.

My broker is SwiftMQ.  Unfortunately, I don't have access to config 
settings on the broker at the moment.

Any help would be greatly appreciated.  BTW, I saw the same problem with 
STOMP, which I believe is no longer supported anyway.

Here is my main loop:

            // run forever
            while(1) {

                try {

                    if ( $this->conn == null ){
                        // ...
                    }
                    if ( ! $this->channel->isConnected() ) {
                        self::trace(__METHOD__ . " " . " Channel is not connected, reconnecting...");
                        self::init( );
                    }

                    // keep our connection alive
                    if ( isset( $this->conn )){
                        if ( $this->conn->isConnected() == false ){
                            // reinit
                        }
                    }
                    $result = $this->readMsg( );

                }catch( Exception $e ){
                    self::trace(__METHOD__ . " Error: " . $e->getTraceAsString()  );
                }
            }

Here is the readMsg( ) method:

        /**
         * read a message from the Broker queue
         * @return  $result (true or false)
         */
        public function readMsg(  ) {
            // read the messages from the queue, acknowledge if no errs so
            // the messages are removed from the queue.  If we don't ack, 
            // messages will remain in the queue and will be marked as 
            // CALLBACK
            $result = $this->consumer->consume( 'processMsg' );
            return $result;
        }

And here is the callback:

/**
 * CALLBACK - Note we are called via queue->consume which uses Synchronous
 * I/O (Blocking)
 * process the envelope body
 * @param $amqpEnvelope
 * @return false when done - see php AMQP docs for consume and returning to
 * the calling thread.
 * @throws Exception
 */
function processMsg( $amqpEnvelope, $consumer ) {

    try {
        if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
            return false; // still blocking
        }
        $payload = $amqpEnvelope->getBody( );
        if ($payload == null ){
            return false; // still blocking
        }

        $deliveryTag = $amqpEnvelope->getDeliveryTag();

        trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );
        $response = sendRemoteWebserviceRequest( $payload );
        processRemoteWebserviceResponse( $response );
        $consumer->ack( $deliveryTag );
        // return false indicating we are done (weird, but that's what the
        // docs show)
        return false;

    }catch( AMQPChannelException $e ) {
        throw $e;
    }catch( AMQPConnectionException $e ){
        throw $e;
    }catch( Exception $e ){
        // When known application errs occur, we need to ack the message
        // and remove it from the queue.
        if ( strpos($amqpEnvelope->getBody( ), '<error>Title already exists' ) !== false){
            $consumer->ack( $amqpEnvelope->getDeliveryTag() );
        }
        throw $e;
    }
}
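
To illustrate the return-value rule mentioned in the comments above: as I
read the php AMQP docs, consume() does not hand control back to the script
until the callback returns false. The sketch below is only a model of that
rule. It uses a stand-in class (FakeQueue, which is not part of the
extension) with an in-memory list instead of a broker. The class, its
methods, and both callbacks are hypothetical, and treating only a strict
false as "stop" is an assumption.

```php
<?php
// Stand-in for a queue object; NOT the real AMQPQueue. It only mimics the
// rule that consume() keeps delivering until the callback returns false.
class FakeQueue
{
    private $pending;
    public $acked = array();

    public function __construct(array $pending)
    {
        $this->pending = $pending;
    }

    public function consume($callback)
    {
        // A real consume() would block here waiting for the broker; the
        // stand-in simply stops once its in-memory list is empty.
        while (count($this->pending) > 0) {
            $msg = array_shift($this->pending);
            if (call_user_func($callback, $msg, $this) === false) {
                return; // the callback asked for control to be handed back
            }
        }
    }

    public function ack($msg)
    {
        $this->acked[] = $msg;
    }

    public function remaining()
    {
        return count($this->pending);
    }
}

// Acks and returns false after every message, like processMsg() does.
function stopAfterEach($msg, $queue)
{
    $queue->ack($msg);
    return false;
}

// Acks and returns true, so consume() keeps delivering.
function keepGoing($msg, $queue)
{
    $queue->ack($msg);
    return true;
}

$q1 = new FakeQueue(array('a', 'b', 'c'));
$q1->consume('stopAfterEach');
echo count($q1->acked), " acked, ", $q1->remaining(), " left\n";

$q2 = new FakeQueue(array('a', 'b', 'c'));
$q2->consume('keepGoing');
echo count($q2->acked), " acked, ", $q2->remaining(), " left\n";
```

With a callback that returns false, one consume() call handles a single
message ("1 acked, 2 left"), so the outer while(1) loop has to call
consume() again for each message. With a callback that returns true, the
same call drains the list ("3 acked, 0 left").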

Note that if I re-establish the connection / exchange / queue, this is a 
non-issue, although I don't want to have to do this.
FWIW, I have one subscriber and one queue.


