<html><head></head><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; color: rgb(0, 0, 0); font-size: 14px; font-family: Calibri, sans-serif; "><div>You should upgrade to pika 0.9.6 using pip. Also rabbitmq to 2.8.7 if you can.</div><div><br></div><div>'heartbeat' should be 'heartbeat_interval', at least in 0.9.6.</div><div><br></div><div>You need to import sys.</div><div><br></div><div>This works. I stopped it after a few seconds and 40,000 messages into my local cluster.</div><div><br></div><div>You will need to create the exchange and tap it with a queue.</div><div><br></div><div>Michael</div><div><br></div><div><div><span style="font-family: Courier; ">import sys # add this</span></div><div><span style="font-family: Courier; ">import pika, json</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">amqpUser = 'guest'</span></div><div><span style="font-family: Courier; ">amqpPassword = 'guest'</span></div><div><span style="font-family: Courier; ">amqpHost = 'localhost'</span></div><div><span style="font-family: Courier; ">amqpPort = 5672</span></div><div><span style="font-family: Courier; ">amqpVHost = '/'</span></div><div><span style="font-family: Courier; ">amqpHeartbeat = 10 # interval in seconds</span></div><div><span style="font-family: Courier; ">msgDeliveryMode = 1</span></div><div><span style="font-family: Courier; ">exchangeName = 'my_log_exchange'</span></div><div><span style="font-family: Courier; ">routingKey = 'rk'</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">parameterKeys = ['first line', 'second line']</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">"""</span></div><div><span style="font-family: Courier; ">test_data in a file:</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">test log</span></div><div><span style="font-family: Courier; ">another line</span></div><div><span style="font-family: Courier; ">"""</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">credentials = pika.PlainCredentials(amqpUser, amqpPassword)</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">connParameters = pika.ConnectionParameters(</span></div><div><span style="font-family: Courier; "> amqpHost, </span></div><div><span style="font-family: Courier; "> port = amqpPort,</span></div><div><span style="font-family: Courier; "> virtual_host = amqpVHost, </span></div><div><span style="font-family: Courier; "> credentials = credentials,</span></div><div><span style="font-family: Courier; "> heartbeat_interval = amqpHeartbeat # change this</span></div><div><span style="font-family: Courier; ">)</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">msgProperties = pika.BasicProperties(content_type = 'application/json',</span></div><div><span style="font-family: Courier; ">delivery_mode = msgDeliveryMode)</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; ">while 1:</span></div><div><span style="font-family: Courier; "> conn = pika.BlockingConnection(connParameters)</span></div><div><span style="font-family: Courier; "> channel = conn.channel()</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; "> while 1:</span></div><div><span style="font-family: Courier; "> logrecord = {}</span></div><div><span style="font-family: Courier; "> for parameterKey in parameterKeys:</span></div><div><span style="font-family: Courier; "> line = sys.stdin.readline().rstrip('\n')</span></div><div><span style="font-family: Courier; "> logrecord[parameterKey] = line</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; "> msg = json.dumps(logrecord)</span></div><div><span style="font-family: Courier; "><br></span></div><div><span style="font-family: Courier; "> channel.basic_publish(</span></div><div><span style="font-family: Courier; "> body = msg, </span></div><div><span style="font-family: Courier; "> exchange = exchangeName,</span></div><div><span style="font-family: Courier; "> properties = msgProperties, </span></div><div><span style="font-family: Courier; "> routing_key = routingKey</span></div><div><span style="font-family: Courier; "> )</span></div></div><div><br></div><div><br></div><div>On 11/4/12 5:04 PM, "Daniel Neugebauer" <<a href="mailto:mailinglists@energiequant.de">mailinglists@energiequant.de</a>> wrote:</div><div><br></div><blockquote id="MAC_OUTLOOK_ATTRIBUTION_BLOCKQUOTE" style="BORDER-LEFT: #b5c4df 5 solid; PADDING:0 0 0 5; MARGIN:0 0 0 5;"><div>Preparing my code for this post, I think I now figured out what was</div><div>wrong with my Python scripts. I found some confused code in my attempts</div><div>to use amqplib which now appears to work fine for my purpose (I messed</div><div>up error handling; I really should not implement at 11pm after a busy</div><div>work day...). Sorry. :(</div><div><br></div><div>Kombu now works as well and indeed appears to reconnect automatically -</div><div>just what I wanted. I must have had some version conflict running it</div><div>from Apache as 1.5.1 was installed globally as well (unfortunately,</div><div>another application required it).</div><div><br></div><div>I would still appreciate if you could take a few moments to scan over my</div><div>assumptions about what I've done wrong with Pika and why my Erlang code</div><div>doesn't work.</div><div><br></div><div><br></div><div>First, my setup: I'm running Python 2.7, Erlang R15B and RabbitMQ 2.8.4</div><div>on Linux, connections are only machine-local (localhost) so far (I'm</div><div>planning to centralize logs via shovels later on). The script should be</div><div>run via a piped CustomLog in Apache 2.2. Pika is at version 0.9.5,</div><div>amqplib at 1.0.2 - these versions may be a bit older now but were recent</div><div>(or appeared to be so) when I started trying to get this working in August.</div><div><br></div><div>The messages I got from Pika were randomly spamming Apache's error_log:</div><div>(with increasing byte and frame counts)</div><div><br></div><div>/usr/lib64/python2.7/site-packages/pika/connection.py:642: UserWarning:</div><div>Pika: Write buffer exceeded warning threshold at 2020 bytes and an</div><div>estimated 11 frames behind</div><div> warn(message % (self.outbound_buffer.size, est_frames_behind))</div><div><br></div><div>They can be reproduced by running the following script (set/substitute</div><div>variables to connection data and parameterKeys with a list of strings)</div><div>and then stopping the RabbitMQ server. In August, I was getting them</div><div>randomly after some uptime as well but I couldn't reproduce that now. As</div><div>I'm digging deeper into it now, the client does not notice that the</div><div>connection was lost and starts queueing messages. I don't get any</div><div>exceptions thrown, so I'm unable to detect that as I thought hook</div><div>functions for connection handling were optional. When enabling</div><div>heartbeats, the connection closes as soon as the client is sleeping for</div><div>2-3 seconds which causes the same error messages. As I'm thinking about</div><div>it now, maybe my use case strictly requires spawning a thread for async</div><div>handling as I'm preventing heartbeats (and what not) from being</div><div>processed when blocking on readline()? It wasn't clear to me when</div><div>reading the documentation that this would be required (it would be</div><div>reasonable for Pika to spawn a thread for heartbeats itself) but it</div><div>would make sense. Anyway, closing connections after only a few seconds</div><div>seems a bit radical considering messages could be easily delayed on</div><div>public networks, although I'm sure that's configurable somewhere.</div><div><br></div><div>I guess I should write an async client instead. I thought I could avoid</div><div>that since I'm fine with a blocking connection (Apache queues log</div><div>messages internally and replays messages after a crash) but I must have</div><div>misunderstood the documentation and Pika's blocking connections are only</div><div>suitable for very simple, "one shot" messaging without (too large)</div><div>delays in between.</div><div><br></div><div>---></div><div>import pika, json</div><div><br></div><div>credentials = pika.PlainCredentials(amqpUser, amqpPassword)</div><div>connParameters = pika.ConnectionParameters(amqpHost, port = amqpPort,</div><div>virtual_host = amqpVHost, credentials = credentials, heartbeat =</div><div>amqpHeartbeat)</div><div><br></div><div>msgProperties = pika.BasicProperties(content_type = 'application/json',</div><div>delivery_mode = msgDeliveryMode)</div><div><br></div><div>while 1:</div><div> conn = pika.BlockingConnection(connParameters)</div><div> channel = conn.channel()</div><div><br></div><div> while 1:</div><div> logrecord = {}</div><div> for parameterKey in parameterKeys:</div><div> line = sys.stdin.readline().rstrip('\n')</div><div> logrecord[parameterKey] = line</div><div><br></div><div> msg = json.dumps(logrecord)</div><div><br></div><div> channel.basic_publish(body = msg, exchange = exchangeName,</div><div>properties = msgProperties, routing_key = routingKey)</div><div><---</div><div><br></div><div>With heartbeat set to True, RabbitMQ says:</div><div><br></div><div>---></div><div>=INFO REPORT==== 4-Nov-2012::21:08:48 ===</div><div>accepting AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672)</div><div><br></div><div>=ERROR REPORT==== 4-Nov-2012::21:09:02 ===</div><div>closing AMQP connection <0.1809.0> (127.0.0.1:58931 -> 127.0.0.1:5672):</div><div>{timeout,running}</div><div><---</div><div><br></div><div><br></div><div>My attempt to connect via Erlang looks as follows: (removed config loading)</div><div><br></div><div>---></div><div>-module(stdin2amqp).</div><div><br></div><div>-include("amqp_client.hrl").</div><div><br></div><div>-export([main/1]).</div><div><br></div><div>-record(config, { amqpHost = "localhost",</div><div> amqpPort = 5670,</div><div> amqpVHost = "/",</div><div> amqpUser = "guest",</div><div> amqpPassword = "guest",</div><div> amqpHeartbeat = true,</div><div> exchangeName = "logs",</div><div> routingKeyFormat = "apache.access.%{vhost}",</div><div> msgDeliveryMode = 2,</div><div> parameterKeys = []</div><div> }).</div><div><br></div><div>...</div><div><br></div><div>main(ConfigPath) -></div><div> {ok, Config} = omitted_config_loading(ConfigPath),</div><div> {ok, Connection} =</div><div>amqp_connection:start(build_amqp_params_network(Config)),</div><div> {ok, Channel} = amqp_connection:open_channel(Connection),</div><div><br></div><div> Payload = "Test",</div><div> Publish = #'basic.publish'{exchange = <<"apache-logs">>,</div><div>routing_key = <<"test">>},</div><div> amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),</div><div> ok.</div><div><br></div><div>build_amqp_params_network(Config) -></div><div> #amqp_params_network{ username =</div><div>list_to_bitstring(Config#config.amqpUser),</div><div> password =</div><div>list_to_bitstring(Config#config.amqpPassword),</div><div> virtual_host =</div><div>list_to_bitstring(Config#config.amqpVHost),</div><div> host =</div><div>list_to_bitstring(Config#config.amqpHost),</div><div> port = Config#config.amqpPort</div><div> }.</div><div><---</div><div><br></div><div>If I comment the assignment of #amqp_params_network.host out, I can get</div><div>a connection that instantly fails with "Connection (<0.35.0>) closing:</div><div>internal error in channel (<0.45.0>): shutdown".</div><div><br></div><div>If I try to set any hostname, I only get: {"init terminating in</div><div>do_boot",{{badmatch,{error,unknown_host}},[{stdin2amqp,main,1,[{file,"src/stdin2amqp.erl"},{line,24}]},{init,start_it,1,[]},{init,start_em,1,[]}]}}</div><div><br></div><div>It doesn't make a difference if I pass a string through</div><div>list_to_bitstring or enter it directly as <<"localhost">>. The local</div><div>machine's hostname (short or full) or FQDNs (terminated by a dot) don't</div><div>work either. Neither do IPs (I even tried <<127, 0, 0, 1>> and <<1, 0,</div><div>0, 127>> hoping that maybe I would need the IP address as 4 bytes in</div><div>either endianess).</div><div><br></div><div>I suppose it's something very basic that I'm doing wrong here but I</div><div>don't see it.</div><div><br></div><div>Thanks,</div><div>Daniel</div><div>_______________________________________________</div><div>rabbitmq-discuss mailing list</div><div><a href="mailto:rabbitmq-discuss@lists.rabbitmq.com">rabbitmq-discuss@lists.rabbitmq.com</a></div><div><a href="https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss">https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss</a></div><div><br></div></blockquote></body></html>