[rabbitmq-discuss] MQTT Plugin - missed and received in wrong order with QoS=0

Adrian Yip ayip at recondynamics.com
Tue Dec 3 22:38:43 GMT 2013


Hi,



I am testing a Mosquitto client publishing hundreds of messages to a
RabbitMQ exchange via the MQTT plugin with different QoS levels. I am seeing
messages received out of order, and also what appears to be the TCP socket
going down (TCP FINs show up in tshark).  This is all done on localhost
inside an Ubuntu VM.



This happens when the inter-message delay is reduced to < 0.07s with QoS =
0 and < 0.5s with QoS = 1.  (RabbitMQ 3.2.1, Erlang R14B04, Pika 0.9.13,
Mosquitto 1.2.2)



The consumer simply expects to receive messages in order and prints an
error if a message id is not +1 from the previous one.  If I send in a
similar manner via AMQP using the pika client, messages are never received
out of order and I see no socket disconnects (FINs).



The problem gets worse with QoS=1: the connection fails and the publisher
stops receiving confirmations.



In the example below 200 messages are published with 0s delay between them
at QoS=0.  All the messages up to 51 are received, then 52 is missed, and
after that every tenth message (62, 72, … up to 112) is skipped.  At the end
of the log, the skipped messages arrive in reverse order: 112, 102, 92, 82…
all the way down to 52.
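
The consumer's +1 check cannot tell a late arrival from a real gap, which is
why the tail of the log further down prints lines such as "Missing 201".  The
following is a minimal sketch of a checker that separates the two cases.  The
class name SequenceTracker and the arrival list are illustrative assumptions:
the list is rebuilt from the pattern described above, not taken from a
capture.

```python
class SequenceTracker(object):
    """Classifies message ids that are expected to arrive as 1, 2, 3, ..."""

    def __init__(self):
        self.highest = 0      # highest id seen so far
        self.pending = set()  # ids that were skipped and have not arrived yet
        self.late = []        # skipped ids that arrived later, in arrival order
        self.duplicates = []  # ids seen more than once

    def observe(self, msg_id):
        if msg_id == self.highest + 1:
            self.highest = msg_id
            return "in_order"
        if msg_id > self.highest:
            # Every id between the previous highest and this one was skipped.
            self.pending.update(range(self.highest + 1, msg_id))
            self.highest = msg_id
            return "gap"
        if msg_id in self.pending:
            self.pending.discard(msg_id)
            self.late.append(msg_id)
            return "late"
        self.duplicates.append(msg_id)
        return "duplicate"


# Arrival order rebuilt from the description: 52, 62, ..., 112 are skipped
# and then show up after message 200 in reverse order.
skipped = list(range(52, 113, 10))
arrivals = [i for i in range(1, 201) if i not in skipped] + skipped[::-1]

tracker = SequenceTracker()
results = [tracker.observe(i) for i in arrivals]

print("gaps seen:     %d" % results.count("gap"))
print("late arrivals: %r" % tracker.late)
print("still missing: %r" % sorted(tracker.pending))
```

With this classification the run described above would report seven gaps,
seven late arrivals and nothing lost, i.e. a reordering rather than dropped
messages.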



I’ve set client.max_inflight_messages_set(0) in the Mosquitto client, and
the RabbitMQ MQTT plugin config is set with prefetch=0 for unlimited
messages, which I think only applies to QoS=1.  Also, in the Mosquitto
client, I start the client's network thread (loop_start()) so that it
processes the publish confirmations from the broker plugin.
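
One way to check whether every publish is actually confirmed is to track the
message ids explicitly instead of relying on a fixed delay.  The sketch below
is only an illustration: PendingPublishes is a hypothetical helper, and it
assumes that the Mosquitto Python module's publish() returns an (rc, mid)
tuple and that on_publish is called with that mid.  The demo replaces the
broker with a plain thread so that it runs without any MQTT connection.

```python
import threading
import time


class PendingPublishes(object):
    """Tracks message ids that were sent but not yet confirmed."""

    def __init__(self):
        self._cond = threading.Condition()
        self._sent = set()   # sent, confirmation not seen yet
        self._early = set()  # confirmed before sent() recorded them

    def sent(self, mid):
        with self._cond:
            if mid in self._early:
                self._early.discard(mid)
            else:
                self._sent.add(mid)
            self._cond.notify_all()

    def confirmed(self, mid):
        # Assumed to be called from the network thread's on_publish callback.
        with self._cond:
            if mid in self._sent:
                self._sent.discard(mid)
            else:
                self._early.add(mid)
            self._cond.notify_all()

    def outstanding(self):
        with self._cond:
            return len(self._sent)

    def wait_all(self, timeout):
        """Block until nothing is outstanding; return False on timeout."""
        deadline = time.time() + timeout
        with self._cond:
            while self._sent:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return False
                self._cond.wait(remaining)
            return True


# Demo: a thread stands in for the broker and confirms 200 message ids.
pending = PendingPublishes()
mids = list(range(1, 201))


def fake_broker():
    for mid in mids:
        pending.confirmed(mid)


broker_thread = threading.Thread(target=fake_broker)
broker_thread.start()
for mid in mids:
    pending.sent(mid)
all_confirmed = pending.wait_all(timeout=5.0)
broker_thread.join()
print("all confirmed: %s, outstanding: %d" % (all_confirmed, pending.outstanding()))
```

In the publisher this would presumably be wired up by calling
pending.sent(mid) with the mid returned from client.publish(), calling
pending.confirmed(mid) from on_publish, and calling pending.wait_all() before
loop_stop().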



Any insights would be appreciated.  This seems like an issue with the MQTT
plugin: when I look at tcpdump traces, several MQTT messages are aggregated
into a single TCP segment at the low delays, while at higher delays each
MQTT message goes out in its own TCP segment.
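
Several MQTT messages sharing one TCP segment is normal for a stream
transport, so the receiving side has to split the byte stream using the
"remaining length" field of the MQTT fixed header.  The sketch below shows
that framing for MQTT 3.1 PUBLISH packets.  It is an illustration only: the
function names are hypothetical, and it says nothing about how the RabbitMQ
plugin itself parses its input.

```python
import json

PUBLISH = 3  # MQTT message type, carried in the upper nibble of byte 1


def encode_remaining_length(n):
    """Variable-length encoding: 7 bits per byte, top bit means 'more'."""
    out = bytearray()
    while True:
        digit = n % 128
        n //= 128
        if n > 0:
            digit |= 0x80
        out.append(digit)
        if n == 0:
            return bytes(out)


def encode_publish(topic, payload, qos=0, mid=0):
    topic_b = topic.encode("utf-8")
    body = bytearray([len(topic_b) >> 8, len(topic_b) & 0xFF]) + topic_b
    if qos > 0:
        body += bytearray([mid >> 8, mid & 0xFF])  # message id only for QoS > 0
    body += payload
    header = bytearray([(PUBLISH << 4) | (qos << 1)])
    return bytes(header + encode_remaining_length(len(body)) + body)


def split_packets(buf):
    """Return (complete packets, leftover bytes of a partial packet)."""
    buf = bytearray(buf)
    packets, pos = [], 0
    while len(buf) - pos >= 2:
        length, multiplier, i, done = 0, 1, pos + 1, False
        # The length field is at most 4 bytes; an unterminated one is
        # treated as incomplete here rather than rejected as malformed.
        while i < len(buf) and i < pos + 5:
            digit = buf[i]
            length += (digit & 0x7F) * multiplier
            multiplier *= 128
            i += 1
            if not digit & 0x80:
                done = True
                break
        if not done or len(buf) - i < length:
            break  # wait for more bytes from the socket
        packets.append(bytes(buf[pos:i + length]))
        pos = i + length
    return packets, bytes(buf[pos:])


def decode_publish(packet):
    """Return (topic, qos, mid, payload) for one complete PUBLISH packet."""
    packet = bytearray(packet)
    qos = (packet[0] >> 1) & 0x03
    i = 1
    while packet[i] & 0x80:  # skip the remaining-length bytes
        i += 1
    i += 1
    tlen = (packet[i] << 8) | packet[i + 1]
    i += 2
    topic = bytes(packet[i:i + tlen]).decode("utf-8")
    i += tlen
    mid = None
    if qos > 0:
        mid = (packet[i] << 8) | packet[i + 1]
        i += 2
    return topic, qos, mid, bytes(packet[i:])


# Three QoS 0 messages arrive as two socket reads; the second read starts
# in the middle of the third packet.
stream = b"".join(
    encode_publish("mqtt_json_routing_key", json.dumps({"id": i}).encode("utf-8"))
    for i in (1, 2, 3))
first_read, second_read = stream[:-4], stream[-4:]
packets, leftover = split_packets(first_read)
more, leftover2 = split_packets(leftover + second_read)
ids = [json.loads(decode_publish(p)[3].decode("utf-8"))["id"]
       for p in packets + more]
print(ids)
```

If framing is done this way, aggregation alone should not change the order
in which messages are decoded, so the reordering seen here would have to
come from a later stage.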



I’ve included the example invocation, output and code below.



Adrian





*Mosquitto MQTT Publisher:*



*Example…send 200 messages with 0 delay at QoS=0*



*% python mqtt_publisher_json.py 200 0 0*



Sending JSON msg 1 via MQTT



…



Sending JSON msg 198 via MQTT

Sending JSON msg 199 via MQTT

Sending JSON msg 200 via MQTT





*AMQP Consumer using PIKA*



*Receive messages published on the mqtt topic exchange*



% python amqp_receive_mqtt_json.py

[x] 'mqtt_json_routing_key':{u'id': 1}

[x] 'mqtt_json_routing_key':{u'id': 2}

[x] 'mqtt_json_routing_key':{u'id': 3}

…all messages received here

[x] 'mqtt_json_routing_key':{u'id': 50}

[x] 'mqtt_json_routing_key':{u'id': 51}

[x] 'mqtt_json_routing_key':{u'id': 53}  <- Got 53 instead of 52

*Missing 52*

[x] 'mqtt_json_routing_key':{u'id': 54}  <- now receive 54…61 in order

[x] 'mqtt_json_routing_key':{u'id': 55}

[x] 'mqtt_json_routing_key':{u'id': 56}

[x] 'mqtt_json_routing_key':{u'id': 57}

[x] 'mqtt_json_routing_key':{u'id': 58}

[x] 'mqtt_json_routing_key':{u'id': 59}

[x] 'mqtt_json_routing_key':{u'id': 60}

[x] 'mqtt_json_routing_key':{u'id': 61}

[x] 'mqtt_json_routing_key':{u'id': 63}  <- Got 63 instead of 62

*Missing 62*

[x] 'mqtt_json_routing_key':{u'id': 64}

[x] 'mqtt_json_routing_key':{u'id': 65}

[x] 'mqtt_json_routing_key':{u'id': 66}

[x] 'mqtt_json_routing_key':{u'id': 67}

[x] 'mqtt_json_routing_key':{u'id': 68}

[x] 'mqtt_json_routing_key':{u'id': 69}

[x] 'mqtt_json_routing_key':{u'id': 70}

[x] 'mqtt_json_routing_key':{u'id': 71}

[x] 'mqtt_json_routing_key':{u'id': 73}  <- etc…

*Missing 72*

[x] 'mqtt_json_routing_key':{u'id': 74}

[x] 'mqtt_json_routing_key':{u'id': 75}

[x] 'mqtt_json_routing_key':{u'id': 76}

[x] 'mqtt_json_routing_key':{u'id': 77}

[x] 'mqtt_json_routing_key':{u'id': 78}

[x] 'mqtt_json_routing_key':{u'id': 79}

[x] 'mqtt_json_routing_key':{u'id': 80}

[x] 'mqtt_json_routing_key':{u'id': 81}

[x] 'mqtt_json_routing_key':{u'id': 83}

*Missing 82*

[x] 'mqtt_json_routing_key':{u'id': 84}

[x] 'mqtt_json_routing_key':{u'id': 85}

[x] 'mqtt_json_routing_key':{u'id': 86}

[x] 'mqtt_json_routing_key':{u'id': 87}

[x] 'mqtt_json_routing_key':{u'id': 88}

[x] 'mqtt_json_routing_key':{u'id': 89}

[x] 'mqtt_json_routing_key':{u'id': 90}

[x] 'mqtt_json_routing_key':{u'id': 91}

[x] 'mqtt_json_routing_key':{u'id': 93}

*Missing 92*

[x] 'mqtt_json_routing_key':{u'id': 94}

[x] 'mqtt_json_routing_key':{u'id': 95}

[x] 'mqtt_json_routing_key':{u'id': 96}

[x] 'mqtt_json_routing_key':{u'id': 97}

[x] 'mqtt_json_routing_key':{u'id': 98}

[x] 'mqtt_json_routing_key':{u'id': 99}

[x] 'mqtt_json_routing_key':{u'id': 100}

[x] 'mqtt_json_routing_key':{u'id': 101}

 [x] 'mqtt_json_routing_key':{u'id': 103}

*Missing 102*

[x] 'mqtt_json_routing_key':{u'id': 104}

[x] 'mqtt_json_routing_key':{u'id': 105}

[x] 'mqtt_json_routing_key':{u'id': 106}

[x] 'mqtt_json_routing_key':{u'id': 107}

[x] 'mqtt_json_routing_key':{u'id': 108}

[x] 'mqtt_json_routing_key':{u'id': 109}

[x] 'mqtt_json_routing_key':{u'id': 110}

[x] 'mqtt_json_routing_key':{u'id': 111}

[x] 'mqtt_json_routing_key':{u'id': 113}

*Missing 112*

[x] 'mqtt_json_routing_key':{u'id': 114}

[x] 'mqtt_json_routing_key':{u'id': 115}



…all messages received in order here…



[x] 'mqtt_json_routing_key':{u'id': 199}

[x] 'mqtt_json_routing_key':{u'id': 200}   *<- last message sent*

* [x] 'mqtt_json_routing_key':{u'id': 112}   <- Now here come all the skipped messages in reverse order.*

Missing 201

* [x] 'mqtt_json_routing_key':{u'id': 102}*

Missing 113

* [x] 'mqtt_json_routing_key':{u'id': 92}*

Missing 103

* [x] 'mqtt_json_routing_key':{u'id': 82}*

Missing 93

* [x] 'mqtt_json_routing_key':{u'id': 72}*

Missing 83

* [x] 'mqtt_json_routing_key':{u'id': 62}*

Missing 73

* [x] 'mqtt_json_routing_key':{u'id': 52}*

Missing 63



*mqtt_publisher_json.py:*



# Example of mosquitto json publisher
# Usage: python mqtt_publisher_json.py cnt sleep_s qos
import mosquitto
import sys
import time
import random
import json


def on_connect(mosq, obj, rc):
    if rc == 0:
        print("on_connect ok...")
    else:
        print("on_connect failed rc=%s" % rc)


def on_publish(mosq, obj, mid):
    print("Message published: %s" % mid)


def on_disconnect(mosq, obj, rc):
    print("Disconnected...")


def on_log(mosq, userdata, level, buf):
    print("on_log: %s %s %s" % (userdata, level, buf))


if len(sys.argv) != 4:
    print "Usage: cnt sleep_s qos"
    sys.exit(-1)
cnt = int(sys.argv[1])        # number of messages to send
sleep_s = float(sys.argv[2])  # delay between messages, in seconds
qos = int(sys.argv[3])        # MQTT QoS level

client = mosquitto.Mosquitto("test-client%d" % random.randint(1, 100))
print client._client_id
client.on_connect = on_connect
client.on_disconnect = on_disconnect
#client.on_log = on_log
#client.on_publish = on_publish

# 0 = no limit on the number of in-flight messages
client.max_inflight_messages_set(0)
client.connect("localhost", port=1884)

# Start mosquitto thread to deal with inflight acks
client.loop_start()

# Transmit a bunch of messages spaced sleep_s apart
i = 1
while i <= cnt:
    print "Sending JSON msg %d via MQTT" % i
    msg = {}
    #msg["msg"] = "JSON msg#%d via MQTT" % i
    msg["id"] = i

    try:
        rc = client.publish("mqtt_json_routing_key", json.dumps(msg), qos=qos)
    except Exception:
        print "Unexpected publish error: ", sys.exc_info()[0]
    i = i + 1

    time.sleep(sleep_s)

# Some delay for last message to go out
wait_s = 2
print "Waiting %d seconds for last message out..." % wait_s
time.sleep(wait_s)

# Stop the threaded Mosquitto client
client.loop_stop()

# Now disconnect
try:
    client.disconnect()
except Exception:
    print "Unexpected disconnect error: ", sys.exc_info()[0]



*amqp_receive_mqtt_json.py*



# Usage: python amqp_receive_mqtt_json.py [debug]
#
#  [0/1 - debug disable/enable - default enable]
#
#  Consumer to receive AMQP messages from mqtt-json-queue from the
#  mqtt.topic exchange.
#  The code expects a json 'id' field in the message body and will report
#  if the 'id' is not +1 from the previous one.
#
import json
import sys
import traceback

import pika

prev_id = 0
debug = 1


def callback(ch, method, properties, body):
    global prev_id, debug

    ch.basic_ack(delivery_tag=method.delivery_tag)
    msgBody = json.loads(body)
    if debug != 0:
        print " [x] %r:%r" % (method.routing_key, msgBody,)
    # Report the id that was expected whenever the sequence is broken
    if prev_id + 1 != int(msgBody['id']):
        print "Missing %d" % (prev_id + 1)
    prev_id = int(msgBody['id'])


if len(sys.argv) > 1:
    debug = int(sys.argv[1])

# Reconnect and resume consuming after any error
while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()

        channel.exchange_declare(exchange='mqtt.topic',
                                 type='topic')

        result = channel.queue_declare("mqtt-json-queue")
        queue_name = result.method.queue

        # topic here is what the mqtt plugin posts to
        channel.queue_bind(exchange='mqtt.topic',
                           queue=queue_name,
                           routing_key="mqtt_json_routing_key")

        print ' [*] Waiting for logs. To exit press CTRL+C'

        channel.basic_consume(callback,
                              queue=queue_name
                              )

        channel.start_consuming()

    except Exception:
        traceback.print_exc()