[rabbitmq-discuss] erlang-client issue, discarding content?
Suhail Doshi
digitalwarfare at gmail.com
Thu Jul 30 22:17:00 BST 2009
The only thing I can think of is... I'm definitely not acking properly, but
consumers are definitely re-processing items, so it's as if the ack goes
through but the message doesn't get removed from memory?
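To make that question concrete, here is a toy model of the bookkeeping one would expect on the broker side when consumers use explicit acks. It is not RabbitMQ's actual implementation. A delivered message stays in an "unacked" table until basic_ack arrives with its delivery tag, and anything still unacked when the channel closes goes back on the ready list. The names ToyQueue, deliver and channel_closed are made up for illustration.

```python
from collections import deque


class ToyQueue:
    """Toy in-memory model of explicit-ack bookkeeping (hypothetical, not RabbitMQ code)."""

    def __init__(self):
        self.ready = deque()   # messages waiting for a consumer
        self.unacked = {}      # delivery_tag -> message, delivered but not yet acked
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand the next ready message to a consumer; returns (delivery_tag, body)."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body   # still held in memory until acked
        return tag, body

    def basic_ack(self, delivery_tag):
        """Forget the message for this tag; only now can its memory be released."""
        del self.unacked[delivery_tag]

    def channel_closed(self):
        """Put everything still unacked back on the ready list (order is a toy choice)."""
        for tag in sorted(self.unacked):
            self.ready.append(self.unacked.pop(tag))


q = ToyQueue()
q.publish("event-1")
q.publish("event-2")
tag1, _ = q.deliver()
tag2, _ = q.deliver()
q.basic_ack(tag1)        # event-1 is gone for good
q.channel_closed()       # event-2 was never acked, so it becomes ready again
print(len(q.unacked), list(q.ready))
```

In this model, a message that is delivered but never acked keeps occupying memory even though it no longer counts as "ready", and it is processed again after a reconnect.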
What's the proper way to ack something in Python? This is what I do:
    self.channel.basic_ack(data.delivery_tag)
where data is the message that comes into the recv handler and channel is
    amqp.Connection(host=host, **info).channel()
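A minimal sketch of the ack-after-processing pattern, written so that it runs without a broker. It assumes py-amqplib-style objects: a channel with basic_ack(delivery_tag) and messages that expose delivery_tag and body. FakeChannel, FakeMessage, make_handler and process are stand-ins invented for this example. With a real channel the consumer would also need to be registered with no_ack=False, otherwise the broker does not wait for acks at all.

```python
class FakeChannel:
    """Stand-in for a real channel; records which delivery tags were acked."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeMessage:
    """Stand-in for a delivered message."""

    def __init__(self, delivery_tag, body):
        self.delivery_tag = delivery_tag
        self.body = body


def make_handler(channel, process):
    """Build a receive callback that acks only after process() succeeds."""
    def on_message(msg):
        try:
            process(msg.body)
        except Exception:
            # No ack: the broker keeps the message as unacknowledged and
            # can redeliver it once this channel goes away.
            return False
        channel.basic_ack(msg.delivery_tag)
        return True
    return on_message


def process(body):
    # Hypothetical work function; rejects empty payloads to show the no-ack path.
    if not body:
        raise ValueError("empty payload")


channel = FakeChannel()
handler = make_handler(channel, process)
handler(FakeMessage(1, "event"))   # processed, then acked
handler(FakeMessage(2, ""))        # processing fails, so no ack is sent
print(channel.acked)
```

Acking after the work is done trades possible duplicate processing for not losing messages. If a handler returns without ever reaching basic_ack, those messages stay unacknowledged on the server.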
On Thu, Jul 30, 2009 at 1:15 PM, Suhail Doshi <digitalwarfare at gmail.com> wrote:
> I attached some images that might provide better insight into the problem.
> You can basically see when everything went down and horribly wrong: memory
> usage, swap in/out, and load average.
> Suhail
>
>
> On Thu, Jul 30, 2009 at 12:54 PM, Suhail Doshi <digitalwarfare at gmail.com> wrote:
>
>> Here's the Erlang module I use to send items to rabbit; it's largely
>> adapted from code open sourced online.
>> Dpaste of it: http://dpaste.com/73447/
>>
>> -export([amqp_lifecycle/0, send_message/5, log/2]).
>>
>> -include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl").
>>
>> -include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl").
>> -include("rabbitmq-erlang-client/include/amqp_client.hrl").
>>
>> -record(rabbit_info, {channel, ticket, exchange, routing_key}).
>>
>> amqp_lifecycle() ->
>>     User = "mixpanel",
>>     Password = "mixpanel0816",
>>     Realm = <<"mixpanel">>, %% virtual_host
>>     Connection = amqp_connection:start(User, Password, "127.0.0.1", Realm),
>>     Channel = amqp_connection:open_channel(Connection),
>>     Access = #'access.request'{
>>         realm = Realm,
>>         exclusive = false,
>>         passive = true,
>>         active = true,
>>         write = true,
>>         read = true
>>     },
>>     #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access),
>>
>>     Q = <<"storage">>,
>>     X = <<"records">>,
>>     BindKey = <<"event">>,
>>
>>     QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
>>                                     passive = false, durable = true,
>>                                     exclusive = false, auto_delete = false,
>>                                     nowait = false, arguments = []},
>>
>>     #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QueueDeclare),
>>
>>     ExchangeDeclare = #'exchange.declare'{ticket = Ticket,
>>                                           exchange = X, type = <<"direct">>,
>>                                           passive = false, durable = true,
>>                                           auto_delete = false, internal = false,
>>                                           nowait = false, arguments = []},
>>
>>     #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
>>     QueueBind = #'queue.bind'{ticket = Ticket,
>>                               queue = Q,
>>                               exchange = X,
>>                               routing_key = BindKey,
>>                               nowait = false, arguments = []},
>>     #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
>>
>>     RabbitInfo = #'rabbit_info'{
>>         channel = Channel,
>>         ticket = Ticket,
>>         exchange = X,
>>         routing_key = BindKey
>>     },
>>     RabbitInfo.
>>
>> send_message(Channel, Ticket, X, RoutingKey, Payload) ->
>>     BasicPublish = #'basic.publish'{ticket = Ticket,
>>                                     exchange = X,
>>                                     routing_key = RoutingKey,
>>                                     mandatory = false,
>>                                     immediate = false},
>>     BasicProperties = amqp_util:basic_properties(),
>>     Properties = BasicProperties#'P_basic'{delivery_mode = 2}, %% Persistence plz
>>     Content = #content{class_id = 60,
>>                        properties = Properties,
>>                        properties_bin = none,
>>                        payload_fragments_rev = [Payload]
>>                       },
>>     amqp_channel:cast(Channel, BasicPublish, Content).
>>
>> log(Key, Value) ->
>>     io:format("~p: ~p~n", [Key, Value]).
>>
>>
>> I do the following to send items to the queue:
>>
>> event_queue_data(QueueInfo, Data) ->
>>     send_message(
>>         QueueInfo#'rabbit_info'.channel,
>>         QueueInfo#'rabbit_info'.ticket,
>>         QueueInfo#'rabbit_info'.exchange,
>>         QueueInfo#'rabbit_info'.routing_key,
>>         list_to_binary(Data)
>>     ),
>>     1.
>>
>> On Thu, Jul 30, 2009 at 12:52 PM, Suhail Doshi <digitalwarfare at gmail.com> wrote:
>>
>>> mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name consumers messages messages_ready
>>>
>>> Listing queues ...
>>> storage 5 1 0
>>> ...done.
>>>
>>> I currently have it running again after a restart, and that's usually the
>>> response I get: only 0-5 items in the queue, since it's being processed.
>>>
>>> When the error was occurring and I checked the queue, there were
>>> effectively *zero* items in it according to the command I ran above.
>>> In Python I definitely do acknowledge items via:
>>>
>>> self.channel.basic_ack(data.delivery_tag)
>>>
>>> where self.channel is amqp.Connection(host=host, **info).channel()
>>>
>>> Suhail
>>>
>>> On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <
>>> matthias at lshift.net> wrote:
>>>
>>>> Suhail Doshi wrote:
>>>>
>>>>> Looking in my logs shows a large gap in time, in bold is where it is:
>>>>>
>>>>> =INFO REPORT==== 30-Jul-2009::17:26:56 ===
>>>>> alarm_handler: {set,{system_memory_high_watermark,[]}}
>>>>>
>>>>
>>>> This indicates that rabbit got close to running out of memory and told
>>>> all connected clients to stop sending any more messages. Hence the
>>>> "Discarding ..." message you were seeing in the erlang client. Your
>>>> consumers should continue to receive messages, albeit slowly if rabbit is
>>>> swapping.
>>>>
>>>> Perhaps your consumers are not acknowledging received messages, causing
>>>> them to keep piling up at the server?
>>>>
>>>> I recommend checking the queue lengths/sizes with 'rabbitmqctl
>>>> list_queues'.
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Matthias.
>>>>
--
http://mixpanel.com
Blog: http://blog.mixpanel.com