[rabbitmq-discuss] erlang-client issue, discarding content?

Suhail Doshi digitalwarfare at gmail.com
Thu Jul 30 22:17:00 BST 2009


The only thing I can think of is....definitely not acking properly but
consumers are definitely re-processing items so it's like it acks but
doesn't get removed out of memory?
What's the proper way to ack something in python? This is what I do:

self.channel.basic_ack(data.delivery_tag)

where data comes into the recv handler and channel is
amqp.Connection(host=host, **info).channel()

On Thu, Jul 30, 2009 at 1:15 PM, Suhail Doshi <digitalwarfare at gmail.com>wrote:

> I attached some images that might provide better insight into the problem.
> You can basically see when everything went down and horribly wrong: memory
> usage, swap in/out, and load average.
> Suhail
>
>
> On Thu, Jul 30, 2009 at 12:54 PM, Suhail Doshi <digitalwarfare at gmail.com>wrote:
>
>> Here's the erlang module I use to send items to rabbit, it's largely
>> adapted from code open sourced online:
>> Dpaste of it: http://dpaste.com/73447/
>>
>> -export([amqp_lifecycle/0, send_message/5, log/2]).
>>
>> -include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl").
>>
>> -include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl").
>> -include("rabbitmq-erlang-client/include/amqp_client.hrl").
>>
>> -record(rabbit_info, {channel, ticket, exchange, routing_key}).
>>
>> amqp_lifecycle() ->
>>     User = "mixpanel",
>>     Password = "mixpanel0816",
>>     Realm = <<"mixpanel">>, %% virtual_host
>>     Connection = amqp_connection:start(User, Password, "127.0.0.1",
>> Realm),
>>     Channel = amqp_connection:open_channel(Connection),
>>     Access = #'access.request'{
>>         realm = Realm,
>>         exclusive = false,
>>         passive = true,
>>          active = true,
>>         write = true,
>>         read = true
>>     },
>>     #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel,
>> Access),
>>
>>      Q = <<"storage">>,
>>     X = <<"records">>,
>>     BindKey = <<"event">>,
>>
>>     QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
>>                                     passive = false, durable = true,
>>                                     exclusive = false, auto_delete =
>> false,
>>                                     nowait = false, arguments = []},
>>
>>     #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel,
>> QueueDeclare),
>>
>>     ExchangeDeclare = #'exchange.declare'{ticket = Ticket,
>>                                           exchange = X, type =
>> <<"direct">>,
>>                                           passive = false, durable = true,
>>                                           auto_delete = false, internal =
>> false,
>>                                           nowait = false, arguments = []},
>>
>>     #'exchange.declare_ok'{} = amqp_channel:call(Channel,
>> ExchangeDeclare),
>>     QueueBind = #'queue.bind'{ticket = Ticket,
>>                               queue = Q,
>>                               exchange = X,
>>                               routing_key = BindKey,
>>                               nowait = false, arguments = []},
>>     #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
>>
>>     RabbitInfo = #'rabbit_info'{
>>         channel = Channel,
>>         ticket = Ticket,
>>         exchange = X,
>>         routing_key = BindKey
>>     },
>>     RabbitInfo.
>>
>> send_message(Channel, Ticket, X, RoutingKey, Payload) ->
>>     BasicPublish = #'basic.publish'{ticket = Ticket,
>>                                     exchange = X,
>>                                     routing_key = RoutingKey,
>>                                     mandatory = false,
>>                                     immediate = false},
>>     BasicProperties = amqp_util:basic_properties(),
>>     Properties = BasicProperties#'P_basic'{delivery_mode=2}, %%
>> Persistence plz
>>     Content = #content{class_id = 60,
>>          properties = Properties,
>>          properties_bin = none,
>>          payload_fragments_rev = [Payload]
>>         },
>>     amqp_channel:cast(Channel, BasicPublish, Content).
>>
>> log(Key,Value) ->
>>     io:format("~p: ~p~n",[Key,Value]).
>>
>>
>> I do the following to send items to the queue:
>>
>> event_queue_data(QueueInfo, Data) ->
>>     send_message(
>>         QueueInfo#'rabbit_info'.channel,
>>         QueueInfo#'rabbit_info'.ticket,
>>         QueueInfo#'rabbit_info'.exchange,
>>         QueueInfo#'rabbit_info'.routing_key,
>>         list_to_binary(Data)
>>     ),
>>     1.
>>
>> On Thu, Jul 30, 2009 at 12:52 PM, Suhail Doshi <digitalwarfare at gmail.com>wrote:
>>
>>> mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name
>>> consumers messages messages_ready
>>>
>>> Listing queues ...
>>> storage 5 1 0
>>> ...done.
>>>
>>> I currently have it running again after a restart and that's usually the
>>> response I get, usually only 0-5 items in the queue, since it's being
>>> processed.
>>>
>>> When the error was occurring and I checked the queue, there were
>>> effectively *zero* items in the queue according to that command I ran above
>>> in the queue. In python I definitely do acknowledge items via:
>>>
>>> self.channel.basic_ack(data.delivery_tag)
>>>
>>> where self.channel is amqp.Connection(host=host, **info).channel()
>>>
>>> Suhail
>>>
>>> On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <
>>> matthias at lshift.net> wrote:
>>>
>>>> Suhail Doshi wrote:
>>>>
>>>>> Looking in my logs shows a large gap in time, in bold is where it is:
>>>>>
>>>>> =INFO REPORT==== 30-Jul-2009::17:26:56 ===
>>>>>    alarm_handler: {set,{system_memory_high_watermark,[]}}
>>>>>
>>>>
>>>> This indicates that rabbit got close to running out of memory and told
>>>> all connected clients to stop sending any more messages. Hence the
>>>> "Discarding ..." message you were seeing in the erlang client. Your
>>>> consumers should continue to receive messages, albeit slowly if rabbit is
>>>> swapping.
>>>>
>>>> Perhaps your consumers are not acknowledging received messages, causing
>>>> them to keep piling up at the server?
>>>>
>>>> I recommend checking the queue lengths/sizes with 'rabbitmqctl
>>>> list_queues'.
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Matthias.
>>>>
>>>
>>>
>>>
>>> --
>>> http://mixpanel.com
>>> Blog: http://blog.mixpanel.com
>>>
>>
>>
>>
>> --
>> http://mixpanel.com
>> Blog: http://blog.mixpanel.com
>>
>
>
>
> --
> http://mixpanel.com
> Blog: http://blog.mixpanel.com
>



-- 
http://mixpanel.com
Blog: http://blog.mixpanel.com
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20090730/7ecfd676/attachment.htm 


More information about the rabbitmq-discuss mailing list