<p>I developed an android app where it subscribes to a queue and also
publishes to other queues. At a time it publishes the same message to
two different queues, one of them is a queue named "Queue" and now from a
appfog instance i need to subscribe to the "Queue" and consume messages
and insert them in a mysql db.</p>
<p>I created a php standalone app for the above purpose with
codeigniter. By some reason the worker app loses its connection to
rabbitmq. i would like to know the best way to do this. How can a worker
app on appfog can sustain the application restarts.</p>
<p>what of kind of thing i need to use to solve the above problem.</p>
<p>I figured that the problem is not with rabbitmq connection. it is
with the code related to mysql inserts. i checked the crash logs of my
app and the error is "Mysql gone away". an example of php rabbitmq
consumer has call backs for receive message and register_shutdown. in
receive call back i can not use $this of code igniter because its out of
scope and i was using get_instance(). i am not sure how to call a
method from rabbitmq client receive call back function</p>
<p>The controller is <br></p><p><br></p><pre style="" class="default prettyprint prettyprinted"><code><span class="pun"><?</span><span class="pln">php
</span><span class="kwd">if</span><span class="pln"> </span><span class="pun">(!</span><span class="kwd">defined</span><span class="pun">(</span><span class="str">'BASEPATH'</span><span class="pun">))</span><span class="pln">
</span><span class="kwd">exit</span><span class="pun">(</span><span class="str">'No direct script access allowed'</span><span class="pun">);</span><span class="pln">
include</span><span class="pun">(</span><span class="pln">__DIR__ </span><span class="pun">.</span><span class="pln"> </span><span class="str">'/php-amqplib/config.php'</span><span class="pun">);</span><span class="pln">
</span><span class="kwd">use</span><span class="pln"> </span><span class="typ">PhpAmqpLib</span><span class="pln">\Connection\A</span><span class="typ">MQPConnection</span><span class="pun">;</span><span class="pln">
</span><span class="kwd">class</span><span class="pln"> </span><span class="typ">Welcome</span><span class="pln"> </span><span class="kwd">extends</span><span class="pln"> CI_Controller </span><span class="pun">{</span><span class="pln">
</span><span class="kwd">public</span><span class="pln"> </span><span class="kwd">function</span><span class="pln"> __construct</span><span class="pun">()</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
parent</span><span class="pun">::</span><span class="pln">__construct</span><span class="pun">();</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
</span><span class="kwd">public</span><span class="pln"> </span><span class="kwd">function</span><span class="pln"> index</span><span class="pun">()</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
</span><span class="com">//connect to rabbitmq and consume messages</span><span class="pln">
</span><span class="com">//insert messages to mysql</span><span class="pln">
</span><span class="com">//$this->messages = array();</span><span class="pln">
$exchange </span><span class="pun">=</span><span class="pln"> </span><span class="str">"router"</span><span class="pun">;</span><span class="pln">
$queue </span><span class="pun">=</span><span class="pln"> </span><span class="str">"abbiya"</span><span class="pun">;</span><span class="pln">
$conn </span><span class="pun">=</span><span class="pln"> </span><span class="kwd">new</span><span class="pln"> </span><span class="typ">AMQPConnection</span><span class="pun">(</span><span class="pln">HOST</span><span class="pun">,</span><span class="pln"> PORT</span><span class="pun">,</span><span class="pln"> USER</span><span class="pun">,</span><span class="pln"> PASS</span><span class="pun">,</span><span class="pln"> VHOST</span><span class="pun">);</span><span class="pln">
$ch </span><span class="pun">=</span><span class="pln"> $conn</span><span class="pun">-></span><span class="pln">channel</span><span class="pun">();</span><span class="pln">
</span><span class="com">/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">queue_declare</span><span class="pun">(</span><span class="pln">$queue</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">true</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">);</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">queue_bind</span><span class="pun">(</span><span class="pln">$queue</span><span class="pun">,</span><span class="pln"> $exchange</span><span class="pun">,</span><span class="pln"> $queue</span><span class="pun">);</span><span class="pln">
</span><span class="com">/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/</span><span class="pln">
$consumer_tag </span><span class="pun">=</span><span class="pln"> </span><span class="str">"abbiya"</span><span class="pun">;</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">basic_recover</span><span class="pun">(</span><span class="kwd">true</span><span class="pun">);</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">basic_consume</span><span class="pun">(</span><span class="pln">$queue</span><span class="pun">,</span><span class="pln"> $consumer_tag</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">false</span><span class="pun">,</span><span class="pln"> </span><span class="kwd">function</span><span class="pun">(</span><span class="pln">$msg</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
$message_body </span><span class="pun">=</span><span class="pln"> json_decode</span><span class="pun">(</span><span class="pln">$msg</span><span class="pun">-></span><span class="pln">body</span><span class="pun">);</span><span class="pln">
$msg</span><span class="pun">-></span><span class="pln">delivery_info</span><span class="pun">[</span><span class="str">'channel'</span><span class="pun">]-></span><span class="pln">
basic_ack</span><span class="pun">(</span><span class="pln">$msg</span><span class="pun">-></span><span class="pln">delivery_info</span><span class="pun">[</span><span class="str">'delivery_tag'</span><span class="pun">]);</span><span class="pln">
</span><span class="com">// Send a message with the string "quit" to cancel the consumer.</span><span class="pln">
</span><span class="kwd">if</span><span class="pln"> </span><span class="pun">(</span><span class="pln">$msg</span><span class="pun">-></span><span class="pln">body </span><span class="pun">===</span><span class="pln"> </span><span class="str">'quit'</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
$msg</span><span class="pun">-></span><span class="pln">delivery_info</span><span class="pun">[</span><span class="str">'channel'</span><span class="pun">]-></span><span class="pln">
basic_cancel</span><span class="pun">(</span><span class="pln">$msg</span><span class="pun">-></span><span class="pln">delivery_info</span><span class="pun">[</span><span class="str">'consumer_tag'</span><span class="pun">]);</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
$data </span><span class="pun">=</span><span class="pln"> array</span><span class="pun">(</span><span class="pln">
</span><span class="str">'sender_id'</span><span class="pln"> </span><span class="pun">=></span><span class="pln"> $message_body</span><span class="pun">-></span><span class="pln">r</span><span class="pun">,</span><span class="pln">
</span><span class="str">'receiver_id'</span><span class="pln"> </span><span class="pun">=></span><span class="pln"> $message_body</span><span class="pun">-></span><span class="pln">s</span><span class="pun">,</span><span class="pln">
</span><span class="str">'message_content'</span><span class="pln"> </span><span class="pun">=></span><span class="pln"> $message_body</span><span class="pun">-></span><span class="pln">m</span><span class="pun">,</span><span class="pln">
</span><span class="com">// 'sent_time' => $message_body->t,</span><span class="pln">
</span><span class="str">'status'</span><span class="pln"> </span><span class="pun">=></span><span class="pln"> </span><span class="lit">0</span><span class="pln">
</span><span class="pun">);</span><span class="pln">
$ci </span><span class="pun">=&</span><span class="pln"> get_instance</span><span class="pun">();</span><span class="pln">
$ci</span><span class="pun">-></span><span class="typ">Message_model</span><span class="pun">-></span><span class="pln">newMessage</span><span class="pun">(</span><span class="pln">$data</span><span class="pun">);</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
</span><span class="pun">);</span><span class="pln">
</span><span class="com">// Loop as long as the channel has callbacks registered</span><span class="pln">
</span><span class="kwd">while</span><span class="pln"> </span><span class="pun">(</span><span class="pln">count</span><span class="pun">(</span><span class="pln">$ch</span><span class="pun">-></span><span class="pln">callbacks</span><span class="pun">))</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">wait</span><span class="pun">();</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
register_shutdown_function</span><span class="pun">(</span><span class="kwd">function</span><span class="pun">()</span><span class="pln"> </span><span class="kwd">use</span><span class="pln"> </span><span class="pun">(</span><span class="pln">$ch</span><span class="pun">,</span><span class="pln"> $conn</span><span class="pun">)</span><span class="pln"> </span><span class="pun">{</span><span class="pln">
$ch</span><span class="pun">-></span><span class="pln">close</span><span class="pun">();</span><span class="pln">
$conn</span><span class="pun">-></span><span class="pln">close</span><span class="pun">();</span><span class="pln">
$this</span><span class="pun">-></span><span class="pln">index</span><span class="pun">();</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
</span><span class="pun">);</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
</span><span class="pun">}</span><span class="pln">
</span><span class="com">/* End of file welcome.php */</span><span class="pln">
</span><span class="com">/* Location: ./application/controllers/welcome.php */</span></code></pre>