Hi All.<div><br></div><div>When I start my AMQP 1.0.4 PHP client, Assuming there are pending messages in the queue, &nbsp;I can read all of the pending messages from the queue. &nbsp;I am doing this using consume( 'processMsg') which is a blocking call.</div><div><br></div><div>The strange thing is, after I process and ACK the pending messages on the Queue, any new messages I add to the queue are not "picked up" by the consume callback. &nbsp;In other words, it seems as if the exchange + channel conn has gone stale.</div><div><br></div><div>If I restart my client, the pending messages are picked up without a problem.</div><div><br></div><div>It appears the consume call expires (for lack of better term) almost exactly @ the 2 minute mark.&nbsp;</div><div>My broker is SwiftMQ. &nbsp;Unfortunately, I don't have access to config settings on the broker at the moment.</div><div><br></div><div>Any help would be greatly appreciated. &nbsp;BTW, I saw the same problem with STOMP, which I believe is no longer supported anyway.</div><div><br></div><div><br></div><div>Here is my main loop:</div><div><br></div><div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // run forever</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while(1) {</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try {</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ( $this-&gt;conn == null ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self::init();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ( ! $this-&gt;channel-&gt;isConnected() ) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self::trace(__METHOD__ . " " . " Channel is not connected, reconnecting...");</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self::init( );</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // keep our connection alive</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ( isset( $this-&gt;conn )){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ( $this-&gt;conn-&gt;isConnected() == false ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // reinit</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self::init();</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $result = $this-&gt;readMsg( );</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }catch( Exception $e ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self::trace(__METHOD__ . " Error: " . $e-&gt;getTraceAsString() &nbsp;);</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }</div></div><div><br></div><div>Here is the readMsg( ) method:</div><div><br></div><div><div>&nbsp; &nbsp; &nbsp; &nbsp; /**</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* read a message from the Broker queue</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;* @return &nbsp;$result (true or false)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;*/</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public function readMsg( &nbsp;) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // read the messages from the queue, acknowledge if no errs so</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the messages are removed from the queue. &nbsp;If we don't ack,&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // messages will remain in the queue and will be marked as "redelivered".</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // CALLBACK</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $result = $this-&gt;consumer-&gt;consume( 'processMsg' );</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return $result;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div></div><div><br></div><div>And here is the callback:</div><div><br></div><div><div>/**</div><div>&nbsp;* CALLBACK - Note we are called via queue-&gt;consume which uses Synchronous I/O (Blocking)</div><div>&nbsp;*&nbsp;</div><div>* process the envelope body</div><div>* @param $amqpEnvelope</div><div>* @return false when done - see php AMQP docs for consume and returning to the calling thread.</div><div>* @throws Exception</div><div>*/</div><div>function processMsg( $amqpEnvelope, $consumer ) {</div><div><br></div><div>&nbsp; &nbsp; try {</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if ( $amqpEnvelope == null || empty($amqpEnvelope) ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return false; // still blocking</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; $payload = $amqpEnvelope-&gt;getBody( );</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if ($payload == null ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return false; // still blocking</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; $deliveryTag = $amqpEnvelope-&gt;getDeliveryTag();</div><div><br></div><div>&nbsp; &nbsp; &nbsp; &nbsp; trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; $response = sendRemoteWebserviceRequest( $payload );</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; processRemoteWebserviceResponse( $response );</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; $consumer-&gt;ack( $deliveryTag );</div><div>&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; // return false indicating we are done (weird, but that's what the docs show)</div><div>&nbsp; &nbsp; &nbsp; &nbsp; return false;&nbsp;</div><div><br></div><div><br></div><div>&nbsp; &nbsp; }catch( AMQPChannelException $e ) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; throw $e;</div><div>&nbsp; &nbsp; }catch( AMQPConnectionException $e ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; throw $e;</div><div>&nbsp; &nbsp; }catch( Exception $e ){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; //</div><div>&nbsp; &nbsp; &nbsp; &nbsp; // When known application errs occur, we need to ack the message and remove</div><div>&nbsp; &nbsp; &nbsp; &nbsp; // it from the queue.</div><div>&nbsp; &nbsp; &nbsp; &nbsp; if ( strpos($amqpEnvelope-&gt;getBody( ), '&lt;error&gt;Title already exists' ) !== false){</div><div>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $consumer-&gt;ack( $amqpEnvelope-&gt;getDeliveryTag() );</div><div>&nbsp; &nbsp; &nbsp; &nbsp; }</div><div>&nbsp; &nbsp; &nbsp; &nbsp; throw $e;</div><div>&nbsp; &nbsp; }</div><div>}</div></div><div><br></div><div><br></div><div>Note that if I re-establish the connection / exchange / queue that this is a non-issue. &nbsp;Although I don't want to have to do this.</div><div>FWIW, I have one subscriber and one queue.</div><div><br></div><div>Best,</div><div><br></div><div>John</div><div><br></div>