[rabbitmq-discuss] erlang-client issue, discarding content?

Suhail Doshi digitalwarfare at gmail.com
Thu Jul 30 20:54:40 BST 2009


Here's the Erlang module I use to send items to rabbit. It's largely adapted
from open-source code I found online.
It's also on dpaste: http://dpaste.com/73447/

-export([amqp_lifecycle/0, send_message/5, log/2]).

-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl").
-include("rabbitmq-erlang-client/include/amqp_client.hrl").

-record(rabbit_info, {channel, ticket, exchange, routing_key}).

amqp_lifecycle() ->
    User = "mixpanel",
    Password = "mixpanel0816",
    Realm = <<"mixpanel">>, %% virtual_host
    Connection = amqp_connection:start(User, Password, "127.0.0.1", Realm),
    Channel = amqp_connection:open_channel(Connection),
    Access = #'access.request'{
        realm = Realm,
        exclusive = false,
        passive = true,
        active = true,
        write = true,
        read = true
    },
    #'access.request_ok'{ticket = Ticket} =
        amqp_channel:call(Channel, Access),

    Q = <<"storage">>,
    X = <<"records">>,
    BindKey = <<"event">>,

    QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
                                    passive = false, durable = true,
                                    exclusive = false, auto_delete = false,
                                    nowait = false, arguments = []},

    #'queue.declare_ok'{queue = Q} =
        amqp_channel:call(Channel, QueueDeclare),

    ExchangeDeclare = #'exchange.declare'{ticket = Ticket,
                                          exchange = X, type = <<"direct">>,
                                          passive = false, durable = true,
                                          auto_delete = false, internal = false,
                                          nowait = false, arguments = []},

    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
    QueueBind = #'queue.bind'{ticket = Ticket,
                              queue = Q,
                              exchange = X,
                              routing_key = BindKey,
                              nowait = false, arguments = []},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),

    RabbitInfo = #'rabbit_info'{
        channel = Channel,
        ticket = Ticket,
        exchange = X,
        routing_key = BindKey
    },
    RabbitInfo.

send_message(Channel, Ticket, X, RoutingKey, Payload) ->
    BasicPublish = #'basic.publish'{ticket = Ticket,
                                    exchange = X,
                                    routing_key = RoutingKey,
                                    mandatory = false,
                                    immediate = false},
    BasicProperties = amqp_util:basic_properties(),
    %% Persistence plz
    Properties = BasicProperties#'P_basic'{delivery_mode = 2},
    Content = #content{class_id = 60,
         properties = Properties,
         properties_bin = none,
         payload_fragments_rev = [Payload]
        },
    amqp_channel:cast(Channel, BasicPublish, Content).

log(Key,Value) ->
    io:format("~p: ~p~n",[Key,Value]).


I do the following to send items to the queue:

event_queue_data(QueueInfo, Data) ->
    send_message(
        QueueInfo#'rabbit_info'.channel,
        QueueInfo#'rabbit_info'.ticket,
        QueueInfo#'rabbit_info'.exchange,
        QueueInfo#'rabbit_info'.routing_key,
        list_to_binary(Data)
    ),
    1.
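
One thing worth spelling out about the code above: event_queue_data/2
returns 1 whatever happens, and send_message/5 publishes with
amqp_channel:cast/3, so the caller never learns whether a message was
actually sent. Going by Matthias's explanation quoted below, the client
discards content while the broker has told it to stop sending. Here is a toy
model of that behaviour in plain Erlang. It is only a sketch: the module
name flow_sketch and the event tuples are made up for illustration, and
this is not the client's real code.

```erlang
-module(flow_sketch).
-export([run/1]).

%% Toy model of a publisher under flow control (hypothetical, not the
%% real erlang-client implementation).
%%
%% Events :: [{flow, boolean()} | {publish, term()}]
%% Returns {Sent, Discarded}, each in the order the publishes were made.
run(Events) ->
    %% Start with flow active, i.e. publishing is allowed.
    {_Active, Sent, Discarded} =
        lists:foldl(fun step/2, {true, [], []}, Events),
    {lists:reverse(Sent), lists:reverse(Discarded)}.

%% The broker switches flow on or off.
step({flow, Active}, {_Old, Sent, Discarded}) ->
    {Active, Sent, Discarded};
%% Flow is active: the message goes out.
step({publish, Payload}, {true, Sent, Discarded}) ->
    {true, [Payload | Sent], Discarded};
%% Flow is stopped: the message is dropped and the caller is not told.
step({publish, Payload}, {false, Sent, Discarded}) ->
    {false, Sent, [Payload | Discarded]}.
```

For example, flow_sketch:run([{publish, a}, {flow, false}, {publish, b},
{flow, true}, {publish, c}]) returns {[a,c],[b]}: the message published
while flow was stopped is lost without the caller noticing.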

On Thu, Jul 30, 2009 at 12:52 PM, Suhail Doshi <digitalwarfare at gmail.com> wrote:

> mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name
> consumers messages messages_ready
>
> Listing queues ...
> storage 5 1 0
> ...done.
>
> I currently have it running again after a restart and that's usually the
> response I get, usually only 0-5 items in the queue, since it's being
> processed.
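
To read the row quoted above: the columns come back in the order they were
requested, so "storage 5 1 0" means 5 consumers, 1 message in total and 0
messages ready. As I understand the rabbitmqctl documentation, messages
counts ready plus not-yet-acknowledged messages, so the difference is what
has been delivered but not acked yet. A small sketch for checking that; the
module name queue_row and both function names are made up for illustration.

```erlang
-module(queue_row).
-export([parse/1, outstanding/1]).

%% Parse one row printed by
%%   rabbitmqctl list_queues name consumers messages messages_ready
%% Columns are whitespace-separated, in the order requested.
%% Assumes the queue name itself contains no whitespace.
parse(Line) ->
    [Name, Consumers, Messages, Ready] = string:tokens(Line, " \t"),
    {Name,
     list_to_integer(Consumers),
     list_to_integer(Messages),
     list_to_integer(Ready)}.

%% Messages the broker still holds that are not ready for delivery,
%% i.e. (by my reading of the docs) delivered but not acknowledged.
outstanding({_Name, _Consumers, Messages, Ready}) ->
    Messages - Ready.
```

queue_row:outstanding(queue_row:parse("storage 5 1 0")) gives 1. If
consumers were not acking, I would expect that number to keep growing.
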
>
> When the error was occurring and I checked the queue, there were
> effectively *zero* items in it according to the command I ran above.
> In Python I definitely do acknowledge items via:
>
> self.channel.basic_ack(data.delivery_tag)
>
> where self.channel is amqp.Connection(host=host, **info).channel()
>
> Suhail
>
> On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <matthias at lshift.net> wrote:
>
>> Suhail Doshi wrote:
>>
>>> Looking in my logs shows a large gap in time, in bold is where it is:
>>>
>>> =INFO REPORT==== 30-Jul-2009::17:26:56 ===
>>>    alarm_handler: {set,{system_memory_high_watermark,[]}}
>>>
>>
>> This indicates that rabbit got close to running out of memory and told all
>> connected clients to stop sending any more messages. Hence the "Discarding
>> ..." message you were seeing in the erlang client. Your consumers should
>> continue to receive messages, albeit slowly if rabbit is swapping.
>>
>> Perhaps your consumers are not acknowledging received messages, causing
>> them to keep piling up at the server?
>>
>> I recommend checking the queue lengths/sizes with 'rabbitmqctl
>> list_queues'.
>>
>>
>> Regards,
>>
>> Matthias.
>>
>
>
>
> --
> http://mixpanel.com
> Blog: http://blog.mixpanel.com
>



-- 
http://mixpanel.com
Blog: http://blog.mixpanel.com