[rabbitmq-discuss] Searching for a client library/example: Problems getting a reliable connection in Python and Erlang

Daniel Neugebauer mailinglists at energiequant.de
Sun Nov 4 22:04:59 GMT 2012


While preparing my code for this post, I think I have now figured out
what was wrong with my Python scripts. I found some confused code in my
attempts to use amqplib, which now appears to work fine for my purpose
(I messed up the error handling; I really should not write code at 11pm
after a busy work day...). Sorry. :(

Kombu now works as well and indeed appears to reconnect automatically -
just what I wanted. I must have had some version conflict running it
from Apache as 1.5.1 was installed globally as well (unfortunately,
another application required it).

I would still appreciate it if you could take a few moments to scan over
my assumptions about what I've done wrong with Pika and why my Erlang
code doesn't work.


First, my setup: I'm running Python 2.7, Erlang R15B and RabbitMQ 2.8.4
on Linux. Connections are only machine-local (localhost) so far (I'm
planning to centralize logs via shovels later on). The script is meant
to be run via a piped CustomLog in Apache 2.2. Pika is at version 0.9.5,
amqplib at 1.0.2 - these versions may be a bit old by now but were recent
(or appeared to be so) when I started trying to get this working in August.

Pika was randomly spamming Apache's error_log with messages like the
following (with increasing byte and frame counts):

/usr/lib64/python2.7/site-packages/pika/connection.py:642: UserWarning:
Pika: Write buffer exceeded warning threshold at 2020 bytes and an
estimated 11 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))

They can be reproduced by running the following script (substitute the
connection data for the variables and a list of strings for
parameterKeys) and then stopping the RabbitMQ server. In August, I was
also getting them randomly after some uptime, but I can't reproduce
that now. Digging deeper, it seems the client does not notice that the
connection was lost and simply starts queueing messages. No exceptions
are thrown, so I'm unable to detect this; I had assumed that hook
functions for connection handling were optional.

When I enable heartbeats, the connection closes as soon as the client
sleeps for 2-3 seconds, which causes the same error messages. Thinking
about it now, maybe my use case strictly requires spawning a thread for
async handling, since I'm preventing heartbeats (and whatnot) from
being processed while blocking on readline()? It wasn't clear to me
from the documentation that this would be required (it would be
reasonable for Pika to spawn a thread for heartbeats itself), but it
would make sense. Anyway, closing connections after only a few seconds
seems a bit radical considering that messages could easily be delayed
on public networks, although I'm sure that's configurable somewhere.

I guess I should write an async client instead. I thought I could avoid
that since I'm fine with a blocking connection (Apache queues log
messages internally and replays them after a crash), but I must have
misunderstood the documentation: Pika's blocking connections seem to be
suitable only for very simple, "one shot" messaging without (too large)
delays in between.
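
A minimal sketch of the "separate thread" idea, under stated
assumptions: it uses only the Python 3 standard library, and `publish`
and `keep_alive` are hypothetical callbacks standing in for whatever the
AMQP client offers for sending a message and for servicing the
connection (heartbeats etc.). They are not Pika functions. A helper
thread blocks on readline() while the main loop wakes up at least every
`tick` seconds, so the connection object is only ever touched from one
thread.

```python
import queue
import threading

_EOF = object()  # sentinel placed on the queue when the input stream ends


def _reader(stream, lines):
    """Block on readline() in a helper thread and hand lines to the queue."""
    while True:
        line = stream.readline()
        if line == "":           # readline() returns '' only at end of file
            lines.put(_EOF)
            return
        lines.put(line.rstrip("\n"))


def pump(stream, publish, keep_alive, tick=1.0):
    """Forward lines from stream to publish(); call keep_alive() whenever
    no line arrived within `tick` seconds. Returns the number of lines sent.

    publish and keep_alive are hypothetical callbacks (assumptions)."""
    lines = queue.Queue()
    worker = threading.Thread(target=_reader, args=(stream, lines))
    worker.daemon = True         # do not keep the process alive at exit
    worker.start()

    sent = 0
    while True:
        try:
            item = lines.get(timeout=tick)
        except queue.Empty:
            keep_alive()         # idle: give the client a chance to do I/O
            continue
        if item is _EOF:
            return sent
        publish(item)
        sent += 1
```

Called as pump(sys.stdin, my_publish, my_keep_alive), the loop would
return once Apache closes the pipe; what `my_keep_alive` has to do
depends on the client library and is left open here.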

--->
import json
import sys  # needed for sys.stdin below

import pika

credentials = pika.PlainCredentials(amqpUser, amqpPassword)
connParameters = pika.ConnectionParameters(amqpHost, port = amqpPort,
virtual_host = amqpVHost, credentials = credentials, heartbeat =
amqpHeartbeat)

msgProperties = pika.BasicProperties(content_type = 'application/json',
delivery_mode = msgDeliveryMode)

while 1:
    conn = pika.BlockingConnection(connParameters)
    channel = conn.channel()

    while 1:
        logrecord = {}
        for parameterKey in parameterKeys:
            line = sys.stdin.readline().rstrip('\n')
            logrecord[parameterKey] = line

        msg = json.dumps(logrecord)

        channel.basic_publish(body = msg, exchange = exchangeName,
properties = msgProperties, routing_key = routingKey)
<---
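
One thing the loop above does not handle is end of input: readline()
returns an empty string once the pipe is closed, so the inner loop would
keep publishing empty records. A minimal sketch of the record-reading
step with EOF detection, assuming one input line per key as in the
script above. The helper name read_record is made up for illustration,
and the code is written for Python 3.

```python
def read_record(stream, keys):
    """Read one line per key from stream and return them as a dict.

    Returns None if the stream ends before the record is complete, so a
    partial record at end of input is dropped rather than published."""
    record = {}
    for key in keys:
        line = stream.readline()
        if line == "":            # '' means EOF; a blank line would be '\n'
            return None
        record[key] = line.rstrip("\n")
    return record
```

The publishing loop could then stop (or close the connection cleanly)
as soon as read_record(sys.stdin, parameterKeys) returns None.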

With heartbeat set to True, RabbitMQ says:

--->
=INFO REPORT==== 4-Nov-2012::21:08:48 ===
accepting AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672)

=ERROR REPORT==== 4-Nov-2012::21:09:02 ===
closing AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672):
{timeout,running}
<---


My attempt to connect via Erlang looks as follows (config loading removed):

--->
-module(stdin2amqp).

-include("amqp_client.hrl").

-export([main/1]).

-record(config, { amqpHost = "localhost",
                  amqpPort = 5670,
                  amqpVHost = "/",
                  amqpUser = "guest",
                  amqpPassword = "guest",
                  amqpHeartbeat = true,
                  exchangeName = "logs",
                  routingKeyFormat = "apache.access.%{vhost}",
                  msgDeliveryMode = 2,
                  parameterKeys = []
                }).

...

main(ConfigPath) ->
        {ok, Config} = omitted_config_loading(ConfigPath),
        {ok, Connection} =
amqp_connection:start(build_amqp_params_network(Config)),
        {ok, Channel} = amqp_connection:open_channel(Connection),

        Payload = "Test",
        Publish = #'basic.publish'{exchange = <<"apache-logs">>,
routing_key = <<"test">>},
        amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
        ok.

build_amqp_params_network(Config) ->
        #amqp_params_network{ username =
list_to_bitstring(Config#config.amqpUser),
                              password =
list_to_bitstring(Config#config.amqpPassword),
                              virtual_host =
list_to_bitstring(Config#config.amqpVHost),
                              host =
list_to_bitstring(Config#config.amqpHost),
                              port = Config#config.amqpPort
                            }.
<---

If I comment out the assignment of #amqp_params_network.host, I get
a connection that instantly fails with "Connection (<0.35.0>) closing:
internal error in channel (<0.45.0>): shutdown".

If I try to set any hostname, I only get: {"init terminating in
do_boot",{{badmatch,{error,unknown_host}},[{stdin2amqp,main,1,[{file,"src/stdin2amqp.erl"},{line,24}]},{init,start_it,1,[]},{init,start_em,1,[]}]}}

It doesn't make a difference if I pass a string through
list_to_bitstring or enter it directly as <<"localhost">>. The local
machine's hostname (short or full) or FQDNs (terminated by a dot) don't
work either. Neither do IPs (I even tried <<127, 0, 0, 1>> and <<1, 0,
0, 127>> hoping that maybe I would need the IP address as 4 bytes in
either endianness).

I suppose it's something very basic that I'm doing wrong here but I
don't see it.

Thanks,
Daniel

