<html><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"><meta name="Generator" content="Microsoft Word 14 (filtered medium)"><style><!--
/* Font Definitions */
@font-face
        {font-family:Calibri;
        panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
        {margin:0in;
        margin-bottom:.0001pt;
        font-size:11.0pt;
        font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
        {mso-style-priority:99;
        color:blue;
        text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
        {mso-style-priority:99;
        color:purple;
        text-decoration:underline;}
span.EmailStyle17
        {mso-style-type:personal-compose;
        font-family:"Courier New";
        color:windowtext;}
.MsoChpDefault
        {mso-style-type:export-only;
        font-family:"Calibri","sans-serif";}
@page WordSection1
        {size:8.5in 11.0in;
        margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
        {page:WordSection1;}
--></style></head><body lang="EN-US" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Hi,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">I have a receiver that is waiting on an topic exchange and I can publish MQTT messages with Mosquitto with QOS=0 just fine from multiple clients, 1000s of messages.</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">When I publish with QOS=1 however, the client appears successful in publishing, but the receiver always stops at 20 messages.� I can kick off another client without stopping the receiver and it will also do the same thing, the receiver will show 20 messages and nothing more.</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Ideas?</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Adrian</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Receive_json.py</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">------------------</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">#!/usr/bin/env python</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import pika, traceback</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import json</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">prev_id = 0</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">def callback(ch, method, properties, body):</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� global prev_id</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� ch.basic_ack(delivery_tag=method.delivery_tag)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� msgBody = json.loads(body)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� print &quot; [x] %r:%r&quot; % (method.routing_key,msgBody,)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� if prev_id + 1 != int(msgBody[&#39;id&#39;]):</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� print &quot;Missing %d&quot; % (prev_id + 1) </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����prev_id = msgBody[&#39;id&#39;]</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� return</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">while True:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� try:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� connection = pika.BlockingConnection(pika.ConnectionParameters(</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��������������� host=&#39;localhost&#39;))</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� channel = connection.channel()</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� channel.exchange_declare(exchange=&#39;mqtt.topic&#39;,</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�������������������������������� type=&#39;topic&#39;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">������� result = channel.queue_declare(&quot;mqtt-json-queue&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� queue_name = result.method.queue</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� # topic here is what the mqtt plugin posts to</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">������� channel.queue_bind(exchange=&#39;mqtt.topic&#39;,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�������������������������� queue=queue_name,</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�������������������������� routing_key=&quot;json_routing_key&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� print &#39; [*] Waiting for logs. To exit press CTRL+C&#39;</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� channel.basic_consume(callback,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����������������������������� queue=queue_name</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����������������������������� )</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����������� </span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">��������channel.start_consuming()</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����except Exception, e:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� traceback.print_exc()</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Client</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">------</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import mosquitto</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import sys</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import time</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import random</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import json</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">def on_connect(self, mosq, obj, rc):</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� if rc == 0:</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">������� print(&quot;Connected successfully.&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">if len(sys.argv) != 4:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� print &quot;Usage: cnt sleep_s qos&quot;</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� exit(-1)</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">cnt = int(sys.argv[1])</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">sleep_s = float(sys.argv[2])</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">qos = int(sys.argv[3])</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">client = mosquitto.Mosquitto(&quot;test-client%d&quot; % random.randint(1,100))</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">print client._client_id</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">client.connect(&quot;localhost&quot;,port=1884)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">i = 1</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">while i &lt;=� cnt:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� print &quot;Sending JSON msg %d via MQTT&quot; % i</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� msg = {}</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� msg[&quot;id&quot;] = i</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����try :</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� client.publish(&quot;json_routing_key&quot;,json.dumps(msg),qos=qos)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� except :</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� print &quot;Unexpected publish error: &quot;,sys.exc_info()[0]</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� i = i + 1</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">������� </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">����time.sleep(sleep_s)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">�</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">time.sleep(2)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">try:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� client.disconnect()</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">except:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">��� print &quot;Unexpected disconnect error: &quot;,sys.exc_info()[0]</span></p>
<p class="MsoNormal">�</p></div></body></html>