<html><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"><meta name="Generator" content="Microsoft Word 14 (filtered medium)"><style><!--
/* Font Definitions */
@font-face
        {font-family:Calibri;
        panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
        {margin:0in;
        margin-bottom:.0001pt;
        font-size:11.0pt;
        font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
        {mso-style-priority:99;
        color:blue;
        text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
        {mso-style-priority:99;
        color:purple;
        text-decoration:underline;}
span.EmailStyle17
        {mso-style-type:personal-compose;
        font-family:"Courier New";
        color:windowtext;}
.MsoChpDefault
        {mso-style-type:export-only;
        font-family:"Calibri","sans-serif";}
@page WordSection1
        {size:8.5in 11.0in;
        margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
        {page:WordSection1;}
--></style></head><body lang="EN-US" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Hi,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">I have a receiver that is waiting on a topic exchange and I can publish MQTT messages with Mosquitto with QOS=0 just fine from multiple clients, 1000s of messages.</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">When I publish with QOS=1 however, the client appears successful in publishing, but the receiver always stops at 20 messages.  I can kick off another client without stopping the receiver and it will also do the same thing, the receiver will show 20 messages and nothing more.</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Ideas?</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Adrian</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Receive_json.py</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">------------------</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">#!/usr/bin/env python</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import pika, traceback</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import json</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">prev_id = 0</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">def callback(ch, method, properties, body):</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    global prev_id</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    ch.basic_ack(delivery_tag=method.delivery_tag)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    msgBody = json.loads(body)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    print &quot; [x] %r:%r&quot; % (method.routing_key,msgBody,)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    if prev_id + 1 != int(msgBody[&#39;id&#39;]):</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        print &quot;Missing %d&quot; % (prev_id + 1) </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    prev_id = msgBody[&#39;id&#39;]</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    return</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">while True:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    try:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        connection = pika.BlockingConnection(pika.ConnectionParameters(</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                host=&#39;localhost&#39;))</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        channel = connection.channel()</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        channel.exchange_declare(exchange=&#39;mqtt.topic&#39;,</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                                 type=&#39;topic&#39;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">        result = channel.queue_declare(&quot;mqtt-json-queue&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        queue_name = result.method.queue</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        # topic here is what the mqtt plugin posts to</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">        channel.queue_bind(exchange=&#39;mqtt.topic&#39;,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                           queue=queue_name,</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                           routing_key=&quot;json_routing_key&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        print &#39; [*] Waiting for logs. To exit press CTRL+C&#39;</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        channel.basic_consume(callback,</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                              queue=queue_name</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">                              )</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">            </span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">        channel.start_consuming()</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    except Exception, e:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        traceback.print_exc()</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">Client</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">------</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import mosquitto</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import sys</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import time</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import random</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">import json</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">def on_connect(self, mosq, obj, rc):</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    if rc == 0:</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">        print(&quot;Connected successfully.&quot;)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">if len(sys.argv) != 4:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    print &quot;Usage: cnt sleep_s qos&quot;</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    exit(-1)</span></p><p class="MsoNormal">
<span style="font-family:&quot;Courier New&quot;">cnt = int(sys.argv[1])</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">sleep_s = float(sys.argv[2])</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">qos = int(sys.argv[3])</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">client = mosquitto.Mosquitto(&quot;test-client%d&quot; % random.randint(1,100))</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">print client._client_id</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">client.connect(&quot;localhost&quot;,port=1884)</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">i = 1</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">while i &lt;=  cnt:</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    print &quot;Sending JSON msg %d via MQTT&quot; % i</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    msg = {}</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    msg[&quot;id&quot;] = i</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    try :</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        client.publish(&quot;json_routing_key&quot;,json.dumps(msg),qos=qos)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    except :</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        print &quot;Unexpected publish error: &quot;,sys.exc_info()[0]</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    i = i + 1</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">        </span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    time.sleep(sleep_s)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;"> </span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">time.sleep(2)</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">try:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    client.disconnect()</span></p>
<p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">except:</span></p><p class="MsoNormal"><span style="font-family:&quot;Courier New&quot;">    print &quot;Unexpected disconnect error: &quot;,sys.exc_info()[0]</span></p>
<p class="MsoNormal"> </p></div></body></html>