The only thing I can think of is....definitely not acking properly but consumers are definitely re-processing items so it&#39;s like it acks but doesn&#39;t get removed out of memory?<div><br></div><div>What&#39;s the proper way to ack something in python? This is what I do:</div>

<div><br></div><div>self.channel.basic_ack(data.delivery_tag)<br></div><div><br></div><div>where data comes into the recv handler and channel is amqp.Connection(host=host, **info).channel()</div><div><div><br><div class="gmail_quote">

On Thu, Jul 30, 2009 at 1:15 PM, Suhail Doshi <span dir="ltr">&lt;<a href="mailto:digitalwarfare@gmail.com">digitalwarfare@gmail.com</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">

I attached some images that might provide better insight into the problem. You can basically see when everything went down and horribly wrong: memory usage, swap in/out, and load average.<div><br></div><div><font color="#888888">Suhail</font><div>

<div></div><div class="h5"><br><div>
<br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:54 PM, Suhail Doshi <span dir="ltr">&lt;<a href="mailto:digitalwarfare@gmail.com" target="_blank">digitalwarfare@gmail.com</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">


Here&#39;s the erlang module I use to send items to rabbit, it&#39;s largely adapted from code open sourced online:<div><br></div><div>Dpaste of it: <a href="http://dpaste.com/73447/" target="_blank">http://dpaste.com/73447/</a></div>


<div>
<br></div><div><div>-export([amqp_lifecycle/0, send_message/5, log/2]).</div><div><br></div><div>-include_lib(&quot;rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl&quot;).</div><div>-include_lib(&quot;rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl&quot;).</div>



<div>-include(&quot;rabbitmq-erlang-client/include/amqp_client.hrl&quot;).</div><div><br></div><div>-record(rabbit_info, {channel, ticket, exchange, routing_key}).</div><div><br></div><div>amqp_lifecycle() -&gt;</div><div>



    User = &quot;mixpanel&quot;,</div><div>    Password = &quot;mixpanel0816&quot;,</div><div>    Realm = &lt;&lt;&quot;mixpanel&quot;&gt;&gt;, %% virtual_host</div><div>    Connection = amqp_connection:start(User, Password, &quot;127.0.0.1&quot;, Realm),</div>



<div>    Channel = amqp_connection:open_channel(Connection),</div><div>    Access = #&#39;access.request&#39;{</div><div>        realm = Realm,</div><div>        exclusive = false,</div><div>        passive = true,</div>


<div>
        active = true,</div><div>        write = true,</div><div>        read = true</div><div>    },</div><div>    #&#39;access.request_ok&#39;{ticket = Ticket} = amqp_channel:call(Channel, Access),</div><div>    </div>


<div>
    Q = &lt;&lt;&quot;storage&quot;&gt;&gt;,</div><div>    X = &lt;&lt;&quot;records&quot;&gt;&gt;,</div><div>    BindKey = &lt;&lt;&quot;event&quot;&gt;&gt;,</div><div> </div><div>    QueueDeclare = #&#39;queue.declare&#39;{ticket = Ticket, queue = Q,</div>



<div>                                    passive = false, durable = true,</div><div>                                    exclusive = false, auto_delete = false,</div><div>                                    nowait = false, arguments = []},</div>



<div>                                    </div><div>    #&#39;queue.declare_ok&#39;{queue = Q} = amqp_channel:call(Channel, QueueDeclare),</div><div> </div><div>    ExchangeDeclare = #&#39;exchange.declare&#39;{ticket = Ticket,</div>



<div>                                          exchange = X, type = &lt;&lt;&quot;direct&quot;&gt;&gt;,</div><div>                                          passive = false, durable = true,</div><div>                                          auto_delete = false, internal = false,</div>



<div>                                          nowait = false, arguments = []},</div><div>                                          </div><div>    #&#39;exchange.declare_ok&#39;{} = amqp_channel:call(Channel, ExchangeDeclare),</div>



<div>    QueueBind = #&#39;queue.bind&#39;{ticket = Ticket,</div><div>                              queue = Q,</div><div>                              exchange = X,</div><div>                              routing_key = BindKey,</div>



<div>                              nowait = false, arguments = []},</div><div>    #&#39;queue.bind_ok&#39;{} = amqp_channel:call(Channel, QueueBind),</div><div>    </div><div>    RabbitInfo = #&#39;rabbit_info&#39;{</div>



<div>        channel = Channel,</div><div>        ticket = Ticket,</div><div>        exchange = X,</div><div>        routing_key = BindKey</div><div>    },</div><div>    RabbitInfo.</div><div>    </div><div>send_message(Channel, Ticket, X, RoutingKey, Payload) -&gt;</div>



<div>    BasicPublish = #&#39;basic.publish&#39;{ticket = Ticket,</div><div>                                    exchange = X,</div><div>                                    routing_key = RoutingKey,</div><div>                                    mandatory = false,</div>



<div>                                    immediate = false},</div><div>    BasicProperties = amqp_util:basic_properties(),</div><div>    Properties = BasicProperties#&#39;P_basic&#39;{delivery_mode=2}, %% Persistence plz</div>



<div>    Content = #content{class_id = 60,</div><div>         properties = Properties,</div><div>         properties_bin = none,</div><div>         payload_fragments_rev = [Payload]</div><div>        },</div><div>    amqp_channel:cast(Channel, BasicPublish, Content).</div>



<div><br></div><div>log(Key,Value) -&gt;</div><div>    io:format(&quot;~p: ~p~n&quot;,[Key,Value]).</div><div><br></div><div><br></div><div>I do the following to send items to the queue:</div><div><br></div><div><div>event_queue_data(QueueInfo, Data) -&gt;</div>



<div>    send_message(</div><div>        QueueInfo#&#39;rabbit_info&#39;.channel,</div><div>        QueueInfo#&#39;rabbit_info&#39;.ticket,</div><div>        QueueInfo#&#39;rabbit_info&#39;.exchange,</div><div>        QueueInfo#&#39;rabbit_info&#39;.routing_key,</div>



<div>        list_to_binary(Data)</div><div>    ),</div><div>    1.</div></div><div><div></div><div><br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:52 PM, Suhail Doshi <span dir="ltr">&lt;<a href="mailto:digitalwarfare@gmail.com" target="_blank">digitalwarfare@gmail.com</a>&gt;</span> wrote:<br>



<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div>mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name consumers messages messages_ready</div><div>



<br></div><div>Listing queues ...</div><div>storage<span style="white-space:pre">        </span>5<span style="white-space:pre">        </span>1<span style="white-space:pre">        </span>0</div>
<div>...done.</div><div><br></div><div>I currently have it running again after a restart and that&#39;s usually the response I get, usually only 0-5 items in the queue, since it&#39;s being processed.</div><div><br></div>




<div>When the error was occurring and I checked the queue, there were effectively *zero* items in the queue according to that command I ran above in the queue. In python I definitely do acknowledge items via:</div><div><br>




</div><div>self.channel.basic_ack(data.delivery_tag)</div><div><br></div><div>where self.channel is amqp.Connection(host=host, **info).channel()</div><div><br></div><font color="#888888"><div>Suhail</div></font><div><div>



</div><div><br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <span dir="ltr">&lt;<a href="mailto:matthias@lshift.net" target="_blank">matthias@lshift.net</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div>Suhail Doshi wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
Looking in my logs shows a large gap in time, in bold is where it is:<br>
<br>
=INFO REPORT==== 30-Jul-2009::17:26:56 ===<br>
    alarm_handler: {set,{system_memory_high_watermark,[]}}<br>
</blockquote>
<br></div>
This indicates that rabbit got close to running out of memory and told all connected clients to stop sending any more messages. Hence the &quot;Discarding ...&quot; message you were seeing in the erlang client. Your consumers should continue to receive messages, albeit slowly if rabbit is swapping.<br>





<br>
Perhaps your consumers are not acknowledging received messages, causing them to keep piling up at the server?<br>
<br>
I recommend checking the queue lengths/sizes with &#39;rabbitmqctl list_queues&#39;.<br>
<br>
<br>
Regards,<br><font color="#888888">
<br>
Matthias.<br>
</font></blockquote></div><br><br clear="all"><br></div></div><div><div></div><div>-- <br><a href="http://mixpanel.com" target="_blank">http://mixpanel.com</a><br>Blog: <a href="http://blog.mixpanel.com" target="_blank">http://blog.mixpanel.com</a><br>




</div></div></blockquote></div><br><br clear="all"><br>-- <br><a href="http://mixpanel.com" target="_blank">http://mixpanel.com</a><br>Blog: <a href="http://blog.mixpanel.com" target="_blank">http://blog.mixpanel.com</a><br>



</div></div></div>
</blockquote></div><br><br clear="all"><br>-- <br><a href="http://mixpanel.com" target="_blank">http://mixpanel.com</a><br>Blog: <a href="http://blog.mixpanel.com" target="_blank">http://blog.mixpanel.com</a><br>
</div></div></div></div>
</blockquote></div><br><br clear="all"><br>-- <br><a href="http://mixpanel.com">http://mixpanel.com</a><br>Blog: <a href="http://blog.mixpanel.com">http://blog.mixpanel.com</a><br>
</div></div>