[rabbitmq-discuss] amqp 1.0.4 blocking consume seems to block forever even though new messages are on the queue
servobit
johnc at codecobblers.com
Thu Sep 13 19:07:51 BST 2012
Hi All.
When I start my AMQP 1.0.4 PHP client, Assuming there are pending messages
in the queue, I can read all of the pending messages from the queue. I am
doing this using consume( 'processMsg') which is a blocking call.
The strange thing is, after I process and ACK the pending messages on the
Queue, any new messages I add to the queue are not "picked up" by the
consume callback. In other words, it seems as if the exchange + channel
conn has gone stale.
If I restart my client, the pending messages are picked up without a
problem.
It appears the consume call expires (for lack of better term) almost
exactly @ the 2 minute mark.
My broker is SwiftMQ. Unfortunately, I don't have access to config
settings on the broker at the moment.
Any help would be greatly appreciated. BTW, I saw the same problem with
STOMP, which I believe is no longer supported anyway.
Here is my main loop:
// run forever
while(1) {
try {
if ( $this->conn == null ){
self::init();
}
if ( ! $this->channel->isConnected() ) {
self::trace(__METHOD__ . " " . " Channel is not
connected, reconnecting...");
self::init( );
}
// keep our connection alive
if ( isset( $this->conn )){
if ( $this->conn->isConnected() == false ){
// reinit
self::init();
}
}
$result = $this->readMsg( );
}catch( Exception $e ){
self::trace(__METHOD__ . " Error: " .
$e->getTraceAsString() );
}
}
Here is the readMsg( ) method:
/**
* read a message from the Broker queue
* @return $result (true or false)
*/
public function readMsg( ) {
// read the messages from the queue, acknowledge if no errs so
// the messages are removed from the queue. If we don't ack,
// messages will remain in the queue and will be marked as
"redelivered".
// CALLBACK
$result = $this->consumer->consume( 'processMsg' );
return $result;
}
And here is the callback:
/**
* CALLBACK - Note we are called via queue->consume which uses Synchronous
I/O (Blocking)
*
* process the envelope body
* @param $amqpEnvelope
* @return false when done - see php AMQP docs for consume and returning to
the calling thread.
* @throws Exception
*/
function processMsg( $amqpEnvelope, $consumer ) {
try {
if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
return false; // still blocking
}
$payload = $amqpEnvelope->getBody( );
if ($payload == null ){
return false; // still blocking
}
$deliveryTag = $amqpEnvelope->getDeliveryTag();
trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );
$response = sendRemoteWebserviceRequest( $payload );
processRemoteWebserviceResponse( $response );
$consumer->ack( $deliveryTag );
// return false indicating we are done (weird, but that's what the
docs show)
return false;
}catch( AMQPChannelException $e ) {
throw $e;
}catch( AMQPConnectionException $e ){
throw $e;
}catch( Exception $e ){
//
// When known application errs occur, we need to ack the message
and remove
// it from the queue.
if ( strpos($amqpEnvelope->getBody( ), '<error>Title already
exists' ) !== false){
$consumer->ack( $amqpEnvelope->getDeliveryTag() );
}
throw $e;
}
}
Note that if I re-establish the connection / exchange / queue that this is
a non-issue. Although I don't want to have to do this.
FWIW, I have one subscriber and one queue.
Best,
John
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120913/056df8d6/attachment.htm>
More information about the rabbitmq-discuss
mailing list