I've attached some images that might give better insight into the problem. You can see when everything went horribly wrong: memory usage, swap in/out, and load average.<div><br></div><div>Suhail<br><div>
<br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:54 PM, Suhail Doshi <span dir="ltr"><<a href="mailto:digitalwarfare@gmail.com">digitalwarfare@gmail.com</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex;">
Here's the Erlang module I use to send items to Rabbit; it's largely adapted from code open-sourced online:<div><br></div><div>Dpaste of it: <a href="http://dpaste.com/73447/" target="_blank">http://dpaste.com/73447/</a></div>
<div>
<br></div><div><div>-export([amqp_lifecycle/0, send_message/5, log/2]).</div><div><br></div><div>-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl").</div><div>-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl").</div>
<div>-include("rabbitmq-erlang-client/include/amqp_client.hrl").</div><div><br></div><div>-record(rabbit_info, {channel, ticket, exchange, routing_key}).</div><div><br></div><div>amqp_lifecycle() -></div><div>
User = "mixpanel",</div><div> Password = "mixpanel0816",</div><div> Realm = <<"mixpanel">>, %% virtual_host</div><div> Connection = amqp_connection:start(User, Password, "127.0.0.1", Realm),</div>
<div> Channel = amqp_connection:open_channel(Connection),</div><div> Access = #'access.request'{</div><div> realm = Realm,</div><div> exclusive = false,</div><div> passive = true,</div>
<div>
active = true,</div><div> write = true,</div><div> read = true</div><div> },</div><div> #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access),</div><div> </div>
<div>
Q = <<"storage">>,</div><div> X = <<"records">>,</div><div> BindKey = <<"event">>,</div><div> </div><div> QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,</div>
<div> passive = false, durable = true,</div><div> exclusive = false, auto_delete = false,</div><div> nowait = false, arguments = []},</div>
<div> </div><div> #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QueueDeclare),</div><div> </div><div> ExchangeDeclare = #'exchange.declare'{ticket = Ticket,</div>
<div> exchange = X, type = <<"direct">>,</div><div> passive = false, durable = true,</div><div> auto_delete = false, internal = false,</div>
<div> nowait = false, arguments = []},</div><div> </div><div> #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),</div>
<div> QueueBind = #'queue.bind'{ticket = Ticket,</div><div> queue = Q,</div><div> exchange = X,</div><div> routing_key = BindKey,</div>
<div> nowait = false, arguments = []},</div><div> #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),</div><div> </div><div> RabbitInfo = #'rabbit_info'{</div>
<div> channel = Channel,</div><div> ticket = Ticket,</div><div> exchange = X,</div><div> routing_key = BindKey</div><div> },</div><div> RabbitInfo.</div><div> </div><div>send_message(Channel, Ticket, X, RoutingKey, Payload) -></div>
<div> BasicPublish = #'basic.publish'{ticket = Ticket,</div><div> exchange = X,</div><div> routing_key = RoutingKey,</div><div> mandatory = false,</div>
<div> immediate = false},</div><div> BasicProperties = amqp_util:basic_properties(),</div><div> Properties = BasicProperties#'P_basic'{delivery_mode=2}, %% Persistence plz</div>
<div> Content = #content{class_id = 60,</div><div> properties = Properties,</div><div> properties_bin = none,</div><div> payload_fragments_rev = [Payload]</div><div> },</div><div> amqp_channel:cast(Channel, BasicPublish, Content).</div>
<div><br></div><div>log(Key,Value) -></div><div> io:format("~p: ~p~n",[Key,Value]).</div><div><br></div><div><br></div><div>I do the following to send items to the queue:</div><div><br></div><div><div>event_queue_data(QueueInfo, Data) -></div>
<div> send_message(</div><div> QueueInfo#'rabbit_info'.channel,</div><div> QueueInfo#'rabbit_info'.ticket,</div><div> QueueInfo#'rabbit_info'.exchange,</div><div> QueueInfo#'rabbit_info'.routing_key,</div>
<div> list_to_binary(Data)</div><div> ),</div><div> 1.</div></div><div><div></div><div class="h5"><br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:52 PM, Suhail Doshi <span dir="ltr"><<a href="mailto:digitalwarfare@gmail.com" target="_blank">digitalwarfare@gmail.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div>mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name consumers messages messages_ready</div><div>
<br></div><div>Listing queues ...</div><div>storage<span style="white-space:pre">        </span>5<span style="white-space:pre">        </span>1<span style="white-space:pre">        </span>0</div>
<div>...done.</div><div><br></div><div>I currently have it running again after a restart, and that's the response I usually get: only 0-5 items in the queue, since it's being processed.</div><div><br></div>
<div>When the error was occurring and I checked the queue, there were effectively *zero* items in it according to the command above. In Python I definitely do acknowledge items via:</div><div><br>
</div><div>self.channel.basic_ack(data.delivery_tag)</div><div><br></div><div>where self.channel is amqp.Connection(host=host, **info).channel()</div><div><br></div><font color="#888888"><div>Suhail</div></font><div><div>
</div><div><br><div class="gmail_quote">On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <span dir="ltr"><<a href="mailto:matthias@lshift.net" target="_blank">matthias@lshift.net</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div>Suhail Doshi wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
My logs show a large gap in time; the bold text marks where it is:<br>
<br>
=INFO REPORT==== 30-Jul-2009::17:26:56 ===<br>
alarm_handler: {set,{system_memory_high_watermark,[]}}<br>
</blockquote>
<br></div>
This indicates that rabbit got close to running out of memory and told all connected clients to stop sending any more messages. Hence the "Discarding ..." message you were seeing in the erlang client. Your consumers should continue to receive messages, albeit slowly if rabbit is swapping.<br>
<br>
Perhaps your consumers are not acknowledging received messages, causing them to keep piling up at the server?<br>
<br>
I recommend checking the queue lengths/sizes with 'rabbitmqctl list_queues'.<br>
<br>
<br>
Regards,<br><font color="#888888">
<br>
Matthias.<br>
</font></blockquote></div><br><br clear="all"><br></div></div><div><div></div><div>-- <br><a href="http://mixpanel.com" target="_blank">http://mixpanel.com</a><br>Blog: <a href="http://blog.mixpanel.com" target="_blank">http://blog.mixpanel.com</a><br>
</div></div></blockquote></div><br>
</div></div></div>
</blockquote></div><br>
</div></div>
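<div><br></div><div>For anyone who wants to script the queue-length check suggested in this thread, here is a minimal sketch in Python. It assumes the output of 'rabbitmqctl list_queues' is tab-separated, wrapped in the "Listing queues ..." and "...done." banner lines, and lists the columns in the order they were requested, as in the output quoted above. The names parse_list_queues and not_ready are hypothetical helpers made up for this example; they are not part of RabbitMQ or any client library.</div>

```python
from typing import Dict, List, Union

Row = Dict[str, Union[str, int]]


def parse_list_queues(output: str, columns: List[str]) -> List[Row]:
    """Parse tab-separated `rabbitmqctl list_queues` output into dicts.

    `columns` must list the info items in the order they were passed to
    rabbitmqctl (assumption: the tool prints them in that same order).
    """
    rows: List[Row] = []
    for line in output.splitlines():
        stripped = line.strip()
        # Skip blank lines and the "Listing queues ..." / "...done." banners.
        if not stripped or stripped.startswith("Listing queues") or stripped.startswith("..."):
            continue
        fields = stripped.split("\t")
        if len(fields) != len(columns):
            continue  # not a data row in the expected shape
        row: Row = {}
        for column, value in zip(columns, fields):
            # Keep the queue name as text; convert numeric counters to int.
            if column != "name" and value.isdigit():
                row[column] = int(value)
            else:
                row[column] = value
        rows.append(row)
    return rows


def not_ready(row: Row) -> int:
    """Messages the broker still holds that are not ready for delivery.

    Typically these have been delivered to a consumer and are awaiting
    acknowledgement (assumption about how the broker counts `messages`).
    """
    return int(row["messages"]) - int(row["messages_ready"])


if __name__ == "__main__":
    sample = "Listing queues ...\nstorage\t5\t1\t0\n...done.\n"
    for queue in parse_list_queues(sample, ["name", "consumers", "messages", "messages_ready"]):
        print(queue["name"], "not ready:", not_ready(queue))
```

<div>A not_ready count that keeps growing while messages_ready stays near zero would be consistent with consumers that receive messages but never acknowledge them. A count that stays small, as in the output above, points away from that explanation.</div>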