<html><head><meta http-equiv="Content-Type" content="text/html; charset=us-ascii"><meta name="Generator" content="Microsoft Word 14 (filtered medium)"><style><!--
/* Font Definitions */
@font-face
        {font-family:Calibri;
        panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
        {margin:0in;
        margin-bottom:.0001pt;
        font-size:11.0pt;
        font-family:"Calibri","sans-serif";}
a:link, span.MsoHyperlink
        {mso-style-priority:99;
        color:blue;
        text-decoration:underline;}
a:visited, span.MsoHyperlinkFollowed
        {mso-style-priority:99;
        color:purple;
        text-decoration:underline;}
span.EmailStyle17
        {mso-style-type:personal-compose;
        font-family:"Courier New";
        color:windowtext;}
.MsoChpDefault
        {mso-style-type:export-only;
        font-family:"Calibri","sans-serif";}
@page WordSection1
        {size:8.5in 11.0in;
        margin:1.0in 1.0in 1.0in 1.0in;}
div.WordSection1
        {page:WordSection1;}
--></style></head><body lang="EN-US" link="blue" vlink="purple"><div class="WordSection1"><p class="MsoNormal"><span style="font-family:"Courier New"">Hi,</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">I have a receiver that is waiting on a topic exchange and I can publish MQTT messages with Mosquitto with QOS=0 just fine from multiple clients, 1000s of messages.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">When I publish with QOS=1, however, the client appears successful in publishing, but the receiver always stops at 20 messages. I can kick off another client without stopping the receiver and it will also do the same thing: the receiver will show 20 messages and nothing more.</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">Ideas?</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">Adrian</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">Receive_json.py</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">------------------</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">#!/usr/bin/env python</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">import pika, traceback</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">import json</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">prev_id = 0</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">def callback(ch, method, properties, body):</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> global prev_id</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> ch.basic_ack(delivery_tag=method.delivery_tag)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> msgBody = json.loads(body)</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> print " [x] %r:%r" % (method.routing_key,msgBody,)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> if prev_id + 1 != int(msgBody['id']):</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> print "Missing %d" % (prev_id + 1) </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> prev_id = msgBody['id']</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> return</span></p><p class="MsoNormal">
<span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">while True:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> try:</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> connection = pika.BlockingConnection(pika.ConnectionParameters(</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> host='localhost'))</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> channel = connection.channel()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> channel.exchange_declare(exchange='mqtt.topic',</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> type='topic')</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New""> result = channel.queue_declare("mqtt-json-queue")</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> queue_name = result.method.queue</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> # topic here is what the mqtt plugin posts to</span></p><p class="MsoNormal">
<span style="font-family:"Courier New""> channel.queue_bind(exchange='mqtt.topic',</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> queue=queue_name,</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> routing_key="json_routing_key")</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> print ' [*] Waiting for logs. To exit press CTRL+C'</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> channel.basic_consume(callback,</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> queue=queue_name</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> )</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal">
<span style="font-family:"Courier New""> channel.start_consuming()</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> except Exception, e:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> traceback.print_exc()</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">Client</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">------</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">import mosquitto</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">import sys</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">import time</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">import random</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">import json</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">def on_connect(self, mosq, obj, rc):</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> if rc == 0:</span></p><p class="MsoNormal">
<span style="font-family:"Courier New""> print("Connected successfully.")</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">if len(sys.argv) != 4:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> print "Usage: cnt sleep_s qos"</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> exit(-1)</span></p><p class="MsoNormal">
<span style="font-family:"Courier New"">cnt = int(sys.argv[1])</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">sleep_s = float(sys.argv[2])</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">qos = int(sys.argv[3])</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">client = mosquitto.Mosquitto("test-client%d" % random.randint(1,100))</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">print client._client_id</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">client.connect("localhost",port=1884)</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New"">i = 1</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">while i &lt;= cnt:</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> print "Sending JSON msg %d via MQTT" % i</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> msg = {}</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> msg["id"] = i</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> try :</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> client.publish("json_routing_key",json.dumps(msg),qos=qos)</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> except :</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> print "Unexpected publish error: ",sys.exc_info()[0]</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> i = i + 1</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New""> </span></p><p class="MsoNormal"><span style="font-family:"Courier New""> time.sleep(sleep_s)</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> </span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">time.sleep(2)</span></p><p class="MsoNormal"><span style="font-family:"Courier New"">try:</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> client.disconnect()</span></p>
<p class="MsoNormal"><span style="font-family:"Courier New"">except:</span></p><p class="MsoNormal"><span style="font-family:"Courier New""> print "Unexpected disconnect error: ",sys.exc_info()[0]</span></p>
<p class="MsoNormal"> </p></div></body></html>