<html><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"><meta name="Generator" content="Microsoft Word 14 (filtered medium)"><style><!--
/* Font Definitions */
@font-face
        {font-family:Calibri;
        panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
        {margin:0in;
        margin-bottom:.0001pt;
        font-size:11.0pt;
        font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
        {mso-style-priority:99;
        color:blue;
        text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
        {mso-style-priority:99;
        color:purple;
        text-decoration:underline;}
span.EmailStyle17
        {mso-style-type:personal-compose;
        font-family:"Courier New","serif";
        color:windowtext;}
.MsoChpDefault
        {mso-style-type:export-only;
        font-family:"Calibri","sans-serif";}
@page WordSection1
        {size:8.5in 11.0in;
        margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
        {page:WordSection1;}
--></style></head><body lang="EN-US" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Hi, </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">I am testing a Mosquitto client publishing 100s of messages to RabbitMQ exchange via the MQTT plugin with different QoS levels and I am seeing issues with messages being received out of order and also what appears to be the TCP socket going down (seeing TCP FINs in tshark).� This is all done on localhost inside an Ubuntu VM</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">This happens when the inter-message delay is reduced to < 0.07s with QoS = 0 and < 0.5s with QoS = 1.� (</span>RabbitMQ 3.2.1, <acronym><span style="font-family:"Calibri","sans-serif"">Erlang R14B04)</span></acronym><span style="font-family:"Courier New","serif""> (Pika 0.9.13) (mosquito 1.2.2)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">The client consumer simply expects to see msgs received in order and prints an error if the msg id # is not +1 from the previous.� If I send in a similar manner via AMQP using pika client, there is no issue with messages being received out of order or seeing any socket disconnects (FINs)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">The problem gets worse with QoS=1 seeing the connection failing and the publisher stops receiving confirmations.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">In the example below 200 message are published with 0s delay between them at QoS=0.� All the messages are received up to 51, then 52 is missed, then every 10 messages are received and one skipped.� At the end of the log, it shows receiving the skipped messages being output in reversed order�� received 112, 102, 92, 82�all the way to 52�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">I�ve set the client.max_inflight_messages_set(0) in the mosquito client and the mqtt rabbitmq plugin config is set with prefetch=0 for infinite messaging which I think only applies for QoS=1.� Also in the mosquito client, I�ve trigger the mosquito client thread to run to process the publish cfm from the broker plugin. </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Any insights would be appreciated.� Seems like an issue with mqtt plugin as when I look at tcpdumps of the traces the mqtt messages are aggregated together into tcp messages with the low delays and with higher delays they are separated into individual tcp messages.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">I�ve included the example invocation, output and code below.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Adrian</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif"">Mosquitto MQTT Publisher:</span></u></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Example�send 200 messages with 0 delay at QoS=0</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><i><span style="font-family:"Courier New","serif"">% python mqtt_publisher_json.py 200 <b>0</b> <b>0</b></span></i></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 1 via MQTT</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 198 via MQTT</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">Sending JSON msg 199 via MQTT</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">Sending JSON msg 200 via MQTT</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<b><u><span style="font-family:"Courier New","serif"">AMQP Consumer using PIKA</span></u></b></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif""><span style="text-decoration:none">�</span></span></u></b></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Receive messages published on the mqtt topic exchange</span></b></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">�</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">% python amqp_receive_mqtt_json.py</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 1}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 2}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 3}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�all messages received here</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 50}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 51}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 53}� <- Got 53 instead of 52</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 52</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 54}� <- now receive 54�61 in order</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 55}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 56}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 57}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 58}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 59}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 60}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 61}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 63}� <- Got 63 instead of 62</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 62</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 64}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 65}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 66}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 67}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 68}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 69}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 70}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 71}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 73}� <- etc�</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 72</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 74}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 75}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 76}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 77}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 78}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 79}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 80}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 81}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 83}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 82</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 84}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 85}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 86}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 87}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 88}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 89}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 90}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 91}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 93}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 92</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 94}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 95}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 96}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 97}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 98}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 99}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 100}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 101}���� </span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�[x] 'mqtt_json_routing_key':{u'id': 103}� </span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 102</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 104}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 105}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 106}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 107}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 108}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 109}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 110}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 111}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 113}</span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif"">Missing 112</span></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 114}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 115}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�all messages received in order here�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 199}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 200}�� <b><- last message sent</b></span></p>
<p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 112}�� <- Now here come all the skipped messages in reverse order.</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 201</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 102}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 113</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 92}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 103</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 82}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 93</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 72}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 83</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 62}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 73</span></p><p class="MsoNormal"><b><span style="font-family:"Courier New","serif""> [x] 'mqtt_json_routing_key':{u'id': 52}</span></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">Missing 63</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<b><u><span style="font-family:"Courier New","serif"">mqtt_publisher_json.py:</span></u></b></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Example of mosquitto json publisher</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import mosquitto</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">import sys</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import time</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">import random</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">import json</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_connect(mosq, obj, rc):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� if rc == 0:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print("on_connect ok...")</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� else:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print("on_connect failed rc=",rc)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">def on_publish(mosq,obj,mid):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print("Message published: ", mid)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_disconnect(mosq, obj, rc):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print("Disconnected...")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">def on_log(mosq,userdata,level,buf):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print("on_log:",userdata,level,buf)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">if len(sys.argv) != 4:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print "Usage: cnt sleep_s qos"</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� exit(-1)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">cnt = int(sys.argv[1])</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">sleep_s = float(sys.argv[2])</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">qos = int(sys.argv[3])</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client = mosquitto.Mosquitto("test-client%d" % random.randint(1,100))</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">print client._client_id</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.on_connect = on_connect</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.on_disconnect = on_disconnect</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">#client.on_log = on_log</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#client.on_publish = on_publish</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.max_inflight_messages_set(0)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.connect("localhost",port=1884)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Start mosquitto thread to deal with inflight acks</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.loop_start()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Transmit a bunch of messages space sleep_s apart</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">i = 1</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">while i <=� cnt:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print "Sending JSON msg %d via MQTT" % i</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� msg = {}</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� #msg["msg"] = "JSON msg#%d via MQTT" % i</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� msg["id"] = i</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� try :</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� rc = client.publish("mqtt_json_routing_key",json.dumps(msg),qos=qos)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� except :</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print "Unexpected publish error: ",sys.exc_info()[0]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� i = i + 1</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">��� time.sleep(sleep_s)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Some delay for last message to go out</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">wait_s = 2</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">print "Waiting %d seconds for last message out..." % wait_s</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">time.sleep(wait_s)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Stop the threaded Mosquitto client</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">client.loop_stop()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif""># Now disconnect</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">try:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� client.disconnect()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">except:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� print "Unexpected disconnect error: ",sys.exc_info()[0]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><b><u><span style="font-family:"Courier New","serif"">amqp_receive_mqtt_json.py</span></u></b></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif""># Usage: python amqp_receive_mqtt_json [debug]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">#� [0/1 - debug enable/disable - default enable]</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">#� Consumer to receive AMQP messages from mqtt-json-queue from mqtt.topic exchange</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#� code expectes a json 'id' field in the message body and will report if the 'id'</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">#� is not +1 from previous</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">#</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">prev_id = 0</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">debug = 1</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">def callback(ch, method, properties, body):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� global prev_id, debug</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� ch.basic_ack(delivery_tag=method.delivery_tag)</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� msgBody = json.loads(body)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� if debug != 0:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print " [x] %r:%r" % (method.routing_key,msgBody,)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� if prev_id + 1 != int(msgBody['id']):</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print "Missing %d" % (prev_id + 1)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� prev_id = msgBody['id']</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� return</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">if len(sys.argv) > 1:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� debug = int(sys.argv[1])</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">while True:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� try:</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� connection = pika.BlockingConnection(pika.ConnectionParameters(</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��������������� host='localhost'))</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� channel = connection.channel()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� channel.exchange_declare(exchange='mqtt.topic',</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�������������������������������� type='topic')</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� result = channel.queue_declare("mqtt-json-queue")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� queue_name = result.method.queue</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� # topic here is what the mqtt plugin posts to</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� channel.queue_bind(exchange='mqtt.topic',</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�������������������������� queue=queue_name,</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�������������������������� routing_key="mqtt_json_routing_key")</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� print ' [*] Waiting for logs. To exit press CTRL+C'</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� channel.basic_consume(callback,</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">����������������������������� queue=queue_name</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">����������������������������� )</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� channel.start_consuming()</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">��� except Exception, e:</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">������� traceback.print_exc()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal"><span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">
<span style="font-family:"Courier New","serif"">�</span></p><p class="MsoNormal">�</p></div></body></html>