[rabbitmq-discuss] Searching for a client library/example: Problems getting a reliable connection in Python and Erlang

Laing, Michael P. Michael.Laing at nytimes.com
Sun Nov 4 23:32:58 GMT 2012


You should upgrade to pika 0.9.6 using pip. Also rabbitmq to 2.8.7 if you can.

'heartbeat' should be 'heartbeat_interval', at least in 0.9.6.

You need to import sys.

This works. I stopped it after a few seconds and 40,000 messages into my local cluster.

You will need to create the exchange and tap it with a queue.

Michael

import sys # add this
import pika, json

amqpUser = 'guest'
amqpPassword = 'guest'
amqpHost = 'localhost'
amqpPort = 5672
amqpVHost = '/'
amqpHeartbeat = 10 # interval in seconds
msgDeliveryMode = 1
exchangeName = 'my_log_exchange'
routingKey = 'rk'

parameterKeys = ['first line', 'second line']

"""
test_data in a file:

test log
another line
"""

credentials = pika.PlainCredentials(amqpUser, amqpPassword)

connParameters = pika.ConnectionParameters(
    amqpHost,
    port = amqpPort,
    virtual_host = amqpVHost,
    credentials = credentials,
    heartbeat_interval = amqpHeartbeat # change this
)

msgProperties = pika.BasicProperties(content_type = 'application/json',
delivery_mode = msgDeliveryMode)

while 1:
    conn = pika.BlockingConnection(connParameters)
    channel = conn.channel()

    while 1:
        logrecord = {}
        for parameterKey in parameterKeys:
            line = sys.stdin.readline().rstrip('\n')
            logrecord[parameterKey] = line

        msg = json.dumps(logrecord)

        channel.basic_publish(
            body = msg,
            exchange = exchangeName,
            properties = msgProperties,
            routing_key = routingKey
        )


On 11/4/12 5:04 PM, "Daniel Neugebauer" <mailinglists at energiequant.de<mailto:mailinglists at energiequant.de>> wrote:

Preparing my code for this post, I think I now figured out what was
wrong with my Python scripts. I found some confused code in my attempts
to use amqplib which now appears to work fine for my purpose (I messed
up error handling; I really should not implement at 11pm after a busy
work day...). Sorry. :(

Kombu now works as well and indeed appears to reconnect automatically -
just what I wanted. I must have had some version conflict running it
from Apache as 1.5.1 was installed globally as well (unfortunately,
another application required it).

I would still appreciate if you could take a few moments to scan over my
assumptions about what I've done wrong with Pika and why my Erlang code
doesn't work.


First, my setup: I'm running Python 2.7, Erlang R15B and RabbitMQ 2.8.4
on Linux, connections are only machine-local (localhost) so far (I'm
planning to centralize logs via shovels later on). The script should be
run via a piped CustomLog in Apache 2.2. Pika is at version 0.9.5,
amqplib at 1.0.2 - these versions may be a bit older now but were recent
(or appeared to be so) when I started trying to get this working in August.

The messages I got from Pika were randomly spamming Apache's error_log:
(with increasing byte and frame counts)

/usr/lib64/python2.7/site-packages/pika/connection.py:642: UserWarning:
Pika: Write buffer exceeded warning threshold at 2020 bytes and an
estimated 11 frames behind
  warn(message % (self.outbound_buffer.size, est_frames_behind))

They can be reproduced by running the following script (set/substitute
variables to connection data and parameterKeys with a list of strings)
and then stopping the RabbitMQ server. In August, I was getting them
randomly after some uptime as well but I couldn't reproduce that now. As
I'm digging deeper into it now, the client does not notice that the
connection was lost and starts queueing messages. I don't get any
exceptions thrown, so I'm unable to detect that as I thought hook
functions for connection handling were optional. When enabling
heartbeats, the connection closes as soon as the client is sleeping for
2-3 seconds which causes the same error messages. As I'm thinking about
it now, maybe my use case strictly requires spawning a thread for async
handling as I'm preventing heartbeats (and what not) from being
processed when blocking on readline()? It wasn't clear to me when
reading the documentation that this would be required (it would be
reasonable for Pika to spawn a thread for heartbeats itself) but it
would make sense. Anyway, closing connections after only a few seconds
seems a bit radical considering messages could be easily delayed on
public networks, although I'm sure that's configurable somewhere.

I guess I should write an async client instead. I thought I could avoid
that since I'm fine with a blocking connection (Apache queues log
messages internally and replays messages after a crash) but I must have
misunderstood the documentation and Pika's blocking connections are only
suitable for very simple, "one shot" messaging without (too large)
delays in between.

--->
import pika, json

credentials = pika.PlainCredentials(amqpUser, amqpPassword)
connParameters = pika.ConnectionParameters(amqpHost, port = amqpPort,
virtual_host = amqpVHost, credentials = credentials, heartbeat =
amqpHeartbeat)

msgProperties = pika.BasicProperties(content_type = 'application/json',
delivery_mode = msgDeliveryMode)

while 1:
    conn = pika.BlockingConnection(connParameters)
    channel = conn.channel()

    while 1:
        logrecord = {}
        for parameterKey in parameterKeys:
            line = sys.stdin.readline().rstrip('\n')
            logrecord[parameterKey] = line

        msg = json.dumps(logrecord)

        channel.basic_publish(body = msg, exchange = exchangeName,
properties = msgProperties, routing_key = routingKey)
<---

With heartbeat set to True, RabbitMQ says:

--->
=INFO REPORT==== 4-Nov-2012::21:08:48 ===
accepting AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672)

=ERROR REPORT==== 4-Nov-2012::21:09:02 ===
closing AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672):
{timeout,running}
<---


My attempt to connect via Erlang looks as follows: (removed config loading)

--->
-module(stdin2amqp).

-include("amqp_client.hrl").

-export([main/1]).

-record(config, { amqpHost = "localhost",
                  amqpPort = 5670,
                  amqpVHost = "/",
                  amqpUser = "guest",
                  amqpPassword = "guest",
                  amqpHeartbeat = true,
                  exchangeName = "logs",
                  routingKeyFormat = "apache.access.%{vhost}",
                  msgDeliveryMode = 2,
                  parameterKeys = []
                }).

...

main(ConfigPath) ->
        {ok, Config} = omitted_config_loading(ConfigPath),
        {ok, Connection} =
amqp_connection:start(build_amqp_params_network(Config)),
        {ok, Channel} = amqp_connection:open_channel(Connection),

        Payload = "Test",
        Publish = #'basic.publish'{exchange = <<"apache-logs">>,
routing_key = <<"test">>},
        amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
        ok.

build_amqp_params_network(Config) ->
        #amqp_params_network{ username =
list_to_bitstring(Config#config.amqpUser),
                              password =
list_to_bitstring(Config#config.amqpPassword),
                              virtual_host =
list_to_bitstring(Config#config.amqpVHost),
                              host =
list_to_bitstring(Config#config.amqpHost),
                              port = Config#config.amqpPort
                            }.
<---

If I comment the assignment of #amqp_params_network.host out, I can get
a connection that instantly fails with "Connection (<0.35.0>) closing:
internal error in channel (<0.45.0>): shutdown".

If I try to set any hostname, I only get: {"init terminating in
do_boot",{{badmatch,{error,unknown_host}},[{stdin2amqp,main,1,[{file,"src/stdin2amqp.erl"},{line,24}]},{init,start_it,1,[]},{init,start_em,1,[]}]}}

It doesn't make a difference if I pass a string through
list_to_bitstring or enter it directly as <<"localhost">>. The local
machine's hostname (short or full) or FQDNs (terminated by a dot) don't
work either. Neither do IPs (I even tried <<127, 0, 0, 1>> and <<1, 0,
0, 127>> hoping that maybe I would need the IP address as 4 bytes in
either endianess).

I suppose it's something very basic that I'm doing wrong here but I
don't see it.

Thanks,
Daniel
_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss at lists.rabbitmq.com<mailto:rabbitmq-discuss at lists.rabbitmq.com>
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20121104/c331476b/attachment.htm>


More information about the rabbitmq-discuss mailing list