Hey ,&nbsp;<div>So i am running rabbitmq 3.02 on Ubuntu 12.04. Using PECL AMQP client.&nbsp;</div><div>Using 1 queue, 35 consumers. The consumer fetches an xml object and saves it to the &nbsp;db.</div><div><br></div><div>And 1 reader that publishes messages using a for loop.</div><div><br></div><div>After publishing around 120,000 messages , the reader gives out the following error on publish.</div><div>&nbsp;</div><div>Socket error: Broken pipe.<br></div><div><br></div><div>The reader process then needs to be killed manually and the broker needs to be restarted.&nbsp;</div><div>Any idea how to handle this ?</div><div><br></div><div>Message Queue class</div><div><br></div><div><div>class AppMessageQueue {</div><div>&nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; public function __construct() {<br></div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;_createNewConnectionAndChannel();</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; public function __destruct() {</div><div>&nbsp; &nbsp; <span class="Apple-tab-span" style="white-space:pre">        </span>if ($this-&gt;conn) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; <span class="Apple-tab-span" style="white-space:pre">        </span>$this-&gt;conn-&gt;disconnect();</div><div>&nbsp; &nbsp; <span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp;&nbsp;</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>public function purge($queue) {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;ch-&gt;queue_purge($queue);</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><span class="Apple-tab-span" style="white-space:pre">        </span></div><div>&nbsp; &nbsp; public function startConsuming(Parsable $parser) {</div><div>&nbsp; &nbsp; &nbsp; &nbsp; $this-&gt;messageParser = $parser;</div><div>&nbsp; &nbsp; &nbsp; &nbsp; //$ch-&gt;basic_consume(queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;queue = new AMQPQueue($this-&gt;ch);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;queue-&gt;setName(MQ_QUEUE);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;queue-&gt;declare();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;queue-&gt;bind(MQ_EXCHANGE, MQ_ROUTING_KEY);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;queue-&gt;consume(array($this , 'processMessage') , AMQP_AUTOACK);</div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; public function processMessage($msg) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>return $this-&gt;messageParser-&gt;parse($msg-&gt;getBody());</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span></div><div>&nbsp; &nbsp; }</div><div>&nbsp; &nbsp;&nbsp;</div><div>&nbsp; &nbsp; public function publishMessage($message) {</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>try {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>$this-&gt;exchange-&gt;publish($message , MQ_ROUTING_KEY);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>} catch (Exception $e) {</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>echo "Caught exception : " . $e-&gt;getMessage();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>}</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">        </span>protected function _createNewConnectionAndChannel() {</div><div><br></div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;conn = new AMQPConnection(array(</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>'host' =&gt; 'localhost' ,</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>'port' =&gt; 5672 ,</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>'login' =&gt; &nbsp;MQ_USER,</div><div><span class="Apple-tab-span" style="white-space:pre">                        </span>'password' =&gt; MQ_PASS</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>));</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;conn-&gt;connect();</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;ch = new AMQPChannel($this-&gt;conn);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span></div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;exchange = new AMQPExchange($this-&gt;ch);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;exchange-&gt;setName(MQ_EXCHANGE);</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;exchange-&gt;setType('direct');</div><div><span class="Apple-tab-span" style="white-space:pre">                </span>$this-&gt;exchange-&gt;declare();</div><div><span class="Apple-tab-span" style="white-space:pre">        </span>}</div><div>&nbsp;</div><div>}</div></div>