<html><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"><meta name="Generator" content="Microsoft Word 14 (filtered medium)"><style><!--
/* Font Definitions */
@font-face
{font-family:Calibri;
panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
{margin:0in;
margin-bottom:.0001pt;
font-size:11.0pt;
font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
{mso-style-priority:99;
color:blue;
text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
{mso-style-priority:99;
color:purple;
text-decoration:underline;}
span.EmailStyle17
{mso-style-type:personal-compose;
font-family:"Courier New","serif";
color:windowtext;}
.MsoChpDefault
{mso-style-type:export-only;
font-family:"Calibri","sans-serif";}
@page WordSection1
{size:8.5in 11.0in;
margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
{page:WordSection1;}
--></style></head><body lang="EN-US" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Hi, </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">I am testing a Mosquitto client publishing 100s of messages to RabbitMQ exchange via the MQTT plugin with different QoS levels and I am seeing issues with messages being received out of order and also what appears to be the TCP socket going down (seeing TCP FINs in tshark). This is all done on localhost inside an Ubuntu VM</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">This happens when the inter-message delay is reduced to < 0.07s with QoS = 0 and < 0.5s with QoS = 1. (</span>RabbitMQ 3.2.1, <acronym><span style="font-family:"Calibri","sans-serif"">Erlang R14B04)</span></acronym><span style="font-family:"Courier New","serif""> (Pika 0.9.13) (mosquito 1.2.2)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">The client consumer simply expects to see msgs received in order and prints an error if the msg id # is not +1 from the previous. If I send in a similar manner via AMQP using pika client, there is no issue with messages being received out of order or seeing any socket disconnects (FINs)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">The problem gets worse with QoS=1 seeing the connection failing and the publisher stops receiving confirmations.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">In the example below 200 message are published with 0s delay between them at QoS=0. All the messages are received up to 51, then 52 is missed, then every 10 messages are received and one skipped. At the end of the log, it shows receiving the skipped messages being output in reversed order… received 112, 102, 92, 82…all the way to 52…</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">I’ve set the client.max_inflight_messages_set(0) in the mosquito client and the mqtt rabbitmq plugin config is set with prefetch=0 for infinite messaging which I think only applies for QoS=1. Also in the mosquito client, I’ve trigger the mosquito client thread to run to process the publish cfm from the broker plugin. </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Any insights would be appreciated. Seems like an issue with mqtt plugin as when I look at tcpdumps of the traces the mqtt messages are aggregated together into tcp messages with the low delays and with higher delays they are separated into individual tcp messages.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">I’ve included the example invocation, output and code below.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Adrian</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif"">Mosquitto MQTT Publisher:</span></u></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Example…send 200 messages with 0 delay at QoS=0</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><i><span style="font-family:"Courier New","serif"">% python mqtt_publisher_json.py 200 <b>0</b> <b>0</b></span></i></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 1 via MQTT</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">…</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 198 via MQTT</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">Sending JSON msg 199 via MQTT</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 200 via MQTT</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<b><u><span style="font-family:"Courier New","serif"">AMQP Consumer using PIKA</span></u></b></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif""><span style="text-decoration:none"> </span></span></u></b></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Receive messages published on the mqtt topic exchange</span></b></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> </span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">% python amqp_receive_mqtt_json.py</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 1}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 2}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 3}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">…all messages received here</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 50}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 51}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 53} <- Got 53 instead of 52</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 52</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 54} <- now receive 54…61 in order</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 55}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 56}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 57}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 58}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 59}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 60}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 61}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 63} <- Got 63 instead of 62</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 62</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 64}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 65}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 66}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 67}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 68}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 69}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 70}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 71}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 73} <- etc…</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 72</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 74}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 75}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 76}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 77}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 78}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 79}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 80}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 81}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 83}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 82</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 84}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 85}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 86}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 87}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 88}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 89}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 90}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 91}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 93}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 92</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 94}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 95}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 96}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 97}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 98}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 99}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 100}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 101} </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 103} </span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 102</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 104}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 105}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 106}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 107}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 108}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 109}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 110}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 111}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 113}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 112</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 114}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 115}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">…all messages received in order here…</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 199}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 200} <b><- last message sent</b></span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 112} <- Now here come all the skipped messages in reverse order.</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 201</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 102}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 113</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 92}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 103</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 82}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 93</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 72}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 83</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 62}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 73</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 52}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 63</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<b><u><span style="font-family:"Courier New","serif"">mqtt_publisher_json.py:</span></u></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Example of mosquitto json publisher</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import mosquitto</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">import sys</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import time</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">import random</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import json</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_connect(mosq, obj, rc):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> if rc == 0:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print("on_connect ok...")</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> else:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print("on_connect failed rc=",rc)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">def on_publish(mosq,obj,mid):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print("Message published: ", mid)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_disconnect(mosq, obj, rc):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> print("Disconnected...")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_log(mosq,userdata,level,buf):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print("on_log:",userdata,level,buf)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">if len(sys.argv) != 4:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print "Usage: cnt sleep_s qos"</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> exit(-1)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">cnt = int(sys.argv[1])</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">sleep_s = float(sys.argv[2])</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">qos = int(sys.argv[3])</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client = mosquitto.Mosquitto("test-client%d" % random.randint(1,100))</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">print client._client_id</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.on_connect = on_connect</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.on_disconnect = on_disconnect</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">#client.on_log = on_log</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#client.on_publish = on_publish</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.max_inflight_messages_set(0)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.connect("localhost",port=1884)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Start mosquitto thread to deal with inflight acks</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.loop_start()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Transmit a bunch of messages space sleep_s apart</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">i = 1</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">while i <= cnt:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print "Sending JSON msg %d via MQTT" % i</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> msg = {}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> #msg["msg"] = "JSON msg#%d via MQTT" % i</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> msg["id"] = i</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> try :</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> rc = client.publish("mqtt_json_routing_key",json.dumps(msg),qos=qos)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> except :</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print "Unexpected publish error: ",sys.exc_info()[0]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> i = i + 1</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""> time.sleep(sleep_s)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Some delay for last message to go out</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">wait_s = 2</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">print "Waiting %d seconds for last message out..." % wait_s</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">time.sleep(wait_s)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Stop the threaded Mosquitto client</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.loop_stop()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Now disconnect</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">try:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> client.disconnect()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">except:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print "Unexpected disconnect error: ",sys.exc_info()[0]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif"">amqp_receive_mqtt_json.py</span></u></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Usage: python amqp_receive_mqtt_json [debug]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># [0/1 - debug enable/disable - default enable]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Consumer to receive AMQP messages from mqtt-json-queue from mqtt.topic exchange</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""># code expectes a json 'id' field in the message body and will report if the 'id'</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># is not +1 from previous</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">prev_id = 0</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">debug = 1</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def callback(ch, method, properties, body):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> global prev_id, debug</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> ch.basic_ack(delivery_tag=method.delivery_tag)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> msgBody = json.loads(body)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> if debug != 0:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print " [x] %r:%r" % (method.routing_key,msgBody,)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> if prev_id + 1 != int(msgBody['id']):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> print "Missing %d" % (prev_id + 1)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> prev_id = msgBody['id']</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> return</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">if len(sys.argv) > 1:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> debug = int(sys.argv[1])</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">while True:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> try:</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> connection = pika.BlockingConnection(pika.ConnectionParameters(</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> host='localhost'))</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> channel = connection.channel()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> channel.exchange_declare(exchange='mqtt.topic',</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> type='topic')</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> result = channel.queue_declare("mqtt-json-queue")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> queue_name = result.method.queue</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> # topic here is what the mqtt plugin posts to</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> channel.queue_bind(exchange='mqtt.topic',</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> queue=queue_name,</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> routing_key="mqtt_json_routing_key")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> print ' [*] Waiting for logs. To exit press CTRL+C'</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> channel.basic_consume(callback,</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> queue=queue_name</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> )</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> channel.start_consuming()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> except Exception, e:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> traceback.print_exc()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""> </span></p><p class="MsoNormal"> </p></div></body></html>