[rabbitmq-discuss] RabbitMQ AMQP :- Socket error: Broken pipe on high load

dharmanshu at ophio.co.in dharmanshu at ophio.co.in
Sun Apr 7 15:20:54 BST 2013


Hey,
I am running RabbitMQ 3.02 on Ubuntu 12.04 with the PECL AMQP client.
There is 1 queue with 35 consumers. Each consumer fetches an XML object and
saves it to the DB.

There is also 1 reader that publishes messages using a for loop.

After publishing around 120,000 messages, the reader fails on publish with
the following error:
 
Socket error: Broken pipe.

The reader process then has to be killed manually and the broker has to be
restarted.
Any idea how to handle this?

Message Queue class

class AppMessageQueue {
    
    public function __construct() {
        $this->_createNewConnectionAndChannel();
    }

    public function __destruct() {
        if ($this->conn) {
            $this->conn->disconnect();
        }
    }
    
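    public function purge($queue) {
        // PECL AMQP has no AMQPChannel::queue_purge(); purge through an AMQPQueue instead.
        $q = new AMQPQueue($this->ch);
        $q->setName($queue);
        $q->purge();
    }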
    public function startConsuming(Parsable $parser) {
        $this->messageParser = $parser;
        //$ch->basic_consume(queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback);
        $this->queue = new AMQPQueue($this->ch);
        $this->queue->setName(MQ_QUEUE);
        $this->queue->declare();
        $this->queue->bind(MQ_EXCHANGE, MQ_ROUTING_KEY);
        $this->queue->consume(array($this, 'processMessage'), AMQP_AUTOACK);
    }
    
    public function processMessage($msg) {
        return $this->messageParser->parse($msg->getBody());
    }

    public function publishMessage($message) {
        try {
            $this->exchange->publish($message, MQ_ROUTING_KEY);
        } catch (Exception $e) {
            echo "Caught exception : " . $e->getMessage();
        }
    }

    protected function _createNewConnectionAndChannel() {
        $this->conn = new AMQPConnection(array(
            'host' => 'localhost',
            'port' => 5672,
            'login' => MQ_USER,
            'password' => MQ_PASS
        ));
        $this->conn->connect();
        $this->ch = new AMQPChannel($this->conn);
        $this->exchange = new AMQPExchange($this->ch);
        $this->exchange->setName(MQ_EXCHANGE);
        $this->exchange->setType('direct');
        $this->exchange->declare();
    }
 
}