[rabbitmq-discuss] MQTT Plugin - missed and received in wrong order with QoS=0
Adrian Yip
ayip at recondynamics.com
Tue Dec 3 22:38:43 GMT 2013
Hi,
I am testing a Mosquitto client that publishes hundreds of messages to a
RabbitMQ exchange via the MQTT plugin at different QoS levels. I am seeing
messages received out of order, and also what appears to be the TCP socket
going down (TCP FINs show up in tshark). This is all done on localhost
inside an Ubuntu VM.
This happens when the inter-message delay is reduced to < 0.07s with QoS =
0 and < 0.5s with QoS = 1. (RabbitMQ 3.2.1, Erlang R14B04) (Pika 0.9.13)
(mosquitto 1.2.2)
The consumer simply expects to see messages in order and prints an error
if the message id is not +1 from the previous one. If I send in a similar
manner via AMQP using the pika client, no messages arrive out of order and
there are no socket disconnects (FINs).
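A "+1 from the previous id" check reports a gap, but it cannot tell a lost message from one that only arrives late. As a minimal sketch (the function name `classify_arrivals` is made up for illustration and is not part of the scripts in this post), a checker could remember skipped ids and separate the two cases:

```python
def classify_arrivals(ids, first_expected=1):
    """Split a stream of message ids into late arrivals and ids never seen."""
    expected = first_expected
    pending = set()  # ids that were skipped and have not shown up yet
    late = []        # ids that showed up after a higher id had been seen
    for msg_id in ids:
        if msg_id == expected:
            expected += 1
        elif msg_id > expected:
            # Everything between the expected id and this one was skipped.
            pending.update(range(expected, msg_id))
            expected = msg_id + 1
        elif msg_id in pending:
            pending.discard(msg_id)
            late.append(msg_id)
        # Anything else is a duplicate, which this sketch ignores.
    return late, sorted(pending)


late, missing = classify_arrivals([1, 2, 4, 5, 3, 7])
print(late, missing)  # [3] [6]
```

Here id 3 is reported as late rather than lost, and only id 6 remains missing.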
The problem gets worse with QoS=1: the connection fails and the publisher
stops receiving confirmations.
In the example below, 200 messages are published with 0s delay between them
at QoS=0. All messages up to 51 are received, then 52 is missed, and after
that every tenth message (62, 72, … up to 112) is skipped. At the end of
the log, the skipped messages are delivered in reverse order: 112, 102, 92,
82 … all the way down to 52.
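To make the pattern concrete, the sketch below rebuilds the arrival order from the description above and replays the consumer's "+1" check against it. It assumes the elided stretches of the log are fully in order, as stated; the function names are illustrative only.

```python
def reconstructed_arrival_order(total=200, first_skipped=52, last_skipped=112, step=10):
    """Arrival order as described: every tenth id deferred, then delivered reversed."""
    skipped = list(range(first_skipped, last_skipped + 1, step))
    in_time = [i for i in range(1, total + 1) if i not in skipped]
    return in_time + skipped[::-1]


def missing_reports(order):
    """Replay the consumer's check: report prev_id + 1 whenever the next id differs."""
    reports, prev_id = [], 0
    for msg_id in order:
        if prev_id + 1 != msg_id:
            reports.append(prev_id + 1)
        prev_id = msg_id
    return reports


order = reconstructed_arrival_order()
print(sorted(order) == list(range(1, 201)))  # True: nothing is lost, only reordered
print(order[-7:])  # [112, 102, 92, 82, 72, 62, 52]
print(missing_reports(order))
# [52, 62, 72, 82, 92, 102, 112, 201, 113, 103, 93, 83, 73, 63]
```

Under that assumption all 200 ids are delivered exactly once. The trailing "Missing 201", "Missing 113", … lines in the log are a side effect of the simple check being applied to the late arrivals, not additional losses.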
I’ve called client.max_inflight_messages_set(0) in the mosquitto client, and
the RabbitMQ MQTT plugin config has prefetch=0 for unlimited in-flight
messages, which I think only applies to QoS=1. Also, in the mosquitto
client I start the client's network thread so that the publish
confirmations from the broker plugin get processed.
Any insights would be appreciated. This seems like an issue with the MQTT
plugin: when I look at tcpdump traces, the MQTT messages are aggregated
together into TCP segments at the low delays, while at higher delays each
MQTT message goes out in its own TCP segment.
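For reference, MQTT packets carry their own length in the fixed header, so several PUBLISH packets can share one TCP segment, or one packet can be split across segments, and a receiver still has to recover them in order. The following is a minimal Python 3 sketch of that framing for QoS 0 PUBLISH packets. It illustrates the MQTT 3.1 wire format only and makes no claim about how the RabbitMQ plugin parses its input; all function names are made up for this example.

```python
import json
import struct


def encode_remaining_length(n):
    """Encode the MQTT 'remaining length' field: 7 bits per byte, MSB = more bytes."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80
        out.append(byte)
        if n == 0:
            return bytes(out)


def encode_publish_qos0(topic, payload):
    """Build a QoS 0 PUBLISH packet: fixed header, topic length, topic, payload."""
    topic_bytes = topic.encode("utf-8")
    body = struct.pack("!H", len(topic_bytes)) + topic_bytes + payload
    return bytes([0x30]) + encode_remaining_length(len(body)) + body


def split_packets(buf):
    """Split a buffer into complete (first_byte, body) packets plus leftover bytes."""
    packets, pos = [], 0
    while pos < len(buf):
        length, multiplier, idx, header_done = 0, 1, pos + 1, False
        while idx < len(buf) and not header_done:
            byte = buf[idx]
            length += (byte & 0x7F) * multiplier
            multiplier *= 128
            idx += 1
            header_done = not (byte & 0x80)
        if not header_done or idx + length > len(buf):
            break  # incomplete packet: keep the bytes until more data arrives
        packets.append((buf[pos], buf[idx:idx + length]))
        pos = idx + length
    return packets, buf[pos:]


def decode_publish_qos0(body):
    """Return (topic, payload) from the body of a QoS 0 PUBLISH packet."""
    (topic_len,) = struct.unpack("!H", body[:2])
    return body[2:2 + topic_len].decode("utf-8"), body[2 + topic_len:]


# Three PUBLISH packets back to back, with the last one cut short,
# as could happen when TCP aggregates and splits the byte stream.
stream = b"".join(
    encode_publish_qos0("mqtt_json_routing_key", json.dumps({"id": i}).encode("utf-8"))
    for i in (51, 52, 53)
)
packets, leftover = split_packets(stream[:-3])
ids = [json.loads(decode_publish_qos0(body)[1].decode("utf-8"))["id"] for _, body in packets]
print(ids)  # [51, 52]
```

The bytes in `leftover` are kept and parsed again once the rest of the third packet arrives, so aggregation by itself should not change the order in which packets are recovered.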
I’ve included the example invocation, output and code below.
Adrian
*Mosquitto MQTT Publisher:*
*Example…send 200 messages with 0 delay at QoS=0*
*% python mqtt_publisher_json.py 200 0 0*
Sending JSON msg 1 via MQTT
…
Sending JSON msg 198 via MQTT
Sending JSON msg 199 via MQTT
Sending JSON msg 200 via MQTT
*AMQP Consumer using PIKA*
*Receive messages published on the mqtt topic exchange*
% python amqp_receive_mqtt_json.py
[x] 'mqtt_json_routing_key':{u'id': 1}
[x] 'mqtt_json_routing_key':{u'id': 2}
[x] 'mqtt_json_routing_key':{u'id': 3}
…all messages received here
[x] 'mqtt_json_routing_key':{u'id': 50}
[x] 'mqtt_json_routing_key':{u'id': 51}
[x] 'mqtt_json_routing_key':{u'id': 53} <- Got 53 instead of 52
*Missing 52*
[x] 'mqtt_json_routing_key':{u'id': 54} <- now receive 54…61 in order
[x] 'mqtt_json_routing_key':{u'id': 55}
[x] 'mqtt_json_routing_key':{u'id': 56}
[x] 'mqtt_json_routing_key':{u'id': 57}
[x] 'mqtt_json_routing_key':{u'id': 58}
[x] 'mqtt_json_routing_key':{u'id': 59}
[x] 'mqtt_json_routing_key':{u'id': 60}
[x] 'mqtt_json_routing_key':{u'id': 61}
[x] 'mqtt_json_routing_key':{u'id': 63} <- Got 63 instead of 62
*Missing 62*
[x] 'mqtt_json_routing_key':{u'id': 64}
[x] 'mqtt_json_routing_key':{u'id': 65}
[x] 'mqtt_json_routing_key':{u'id': 66}
[x] 'mqtt_json_routing_key':{u'id': 67}
[x] 'mqtt_json_routing_key':{u'id': 68}
[x] 'mqtt_json_routing_key':{u'id': 69}
[x] 'mqtt_json_routing_key':{u'id': 70}
[x] 'mqtt_json_routing_key':{u'id': 71}
[x] 'mqtt_json_routing_key':{u'id': 73} <- etc…
*Missing 72*
[x] 'mqtt_json_routing_key':{u'id': 74}
[x] 'mqtt_json_routing_key':{u'id': 75}
[x] 'mqtt_json_routing_key':{u'id': 76}
[x] 'mqtt_json_routing_key':{u'id': 77}
[x] 'mqtt_json_routing_key':{u'id': 78}
[x] 'mqtt_json_routing_key':{u'id': 79}
[x] 'mqtt_json_routing_key':{u'id': 80}
[x] 'mqtt_json_routing_key':{u'id': 81}
[x] 'mqtt_json_routing_key':{u'id': 83}
*Missing 82*
[x] 'mqtt_json_routing_key':{u'id': 84}
[x] 'mqtt_json_routing_key':{u'id': 85}
[x] 'mqtt_json_routing_key':{u'id': 86}
[x] 'mqtt_json_routing_key':{u'id': 87}
[x] 'mqtt_json_routing_key':{u'id': 88}
[x] 'mqtt_json_routing_key':{u'id': 89}
[x] 'mqtt_json_routing_key':{u'id': 90}
[x] 'mqtt_json_routing_key':{u'id': 91}
[x] 'mqtt_json_routing_key':{u'id': 93}
*Missing 92*
[x] 'mqtt_json_routing_key':{u'id': 94}
[x] 'mqtt_json_routing_key':{u'id': 95}
[x] 'mqtt_json_routing_key':{u'id': 96}
[x] 'mqtt_json_routing_key':{u'id': 97}
[x] 'mqtt_json_routing_key':{u'id': 98}
[x] 'mqtt_json_routing_key':{u'id': 99}
[x] 'mqtt_json_routing_key':{u'id': 100}
[x] 'mqtt_json_routing_key':{u'id': 101}
[x] 'mqtt_json_routing_key':{u'id': 103}
*Missing 102*
[x] 'mqtt_json_routing_key':{u'id': 104}
[x] 'mqtt_json_routing_key':{u'id': 105}
[x] 'mqtt_json_routing_key':{u'id': 106}
[x] 'mqtt_json_routing_key':{u'id': 107}
[x] 'mqtt_json_routing_key':{u'id': 108}
[x] 'mqtt_json_routing_key':{u'id': 109}
[x] 'mqtt_json_routing_key':{u'id': 110}
[x] 'mqtt_json_routing_key':{u'id': 111}
[x] 'mqtt_json_routing_key':{u'id': 113}
*Missing 112*
[x] 'mqtt_json_routing_key':{u'id': 114}
[x] 'mqtt_json_routing_key':{u'id': 115}
…all messages received in order here…
[x] 'mqtt_json_routing_key':{u'id': 199}
[x] 'mqtt_json_routing_key':{u'id': 200} *<- last message sent*
* [x] 'mqtt_json_routing_key':{u'id': 112} <- Now here come all the
skipped messages in reverse order.*
Missing 201
* [x] 'mqtt_json_routing_key':{u'id': 102}*
Missing 113
* [x] 'mqtt_json_routing_key':{u'id': 92}*
Missing 103
* [x] 'mqtt_json_routing_key':{u'id': 82}*
Missing 93
* [x] 'mqtt_json_routing_key':{u'id': 72}*
Missing 83
* [x] 'mqtt_json_routing_key':{u'id': 62}*
Missing 73
* [x] 'mqtt_json_routing_key':{u'id': 52}*
Missing 63
*mqtt_publisher_json.py:*
# Example of mosquitto json publisher
import mosquitto
import sys
import time
import random
import json

def on_connect(mosq, obj, rc):
    if rc == 0:
        print("on_connect ok...")
    else:
        print("on_connect failed rc=",rc)

def on_publish(mosq,obj,mid):
    print("Message published: ", mid)

def on_disconnect(mosq, obj, rc):
    print("Disconnected...")

def on_log(mosq,userdata,level,buf):
    print("on_log:",userdata,level,buf)

if len(sys.argv) != 4:
    print "Usage: cnt sleep_s qos"
    exit(-1)

cnt = int(sys.argv[1])
sleep_s = float(sys.argv[2])
qos = int(sys.argv[3])

client = mosquitto.Mosquitto("test-client%d" % random.randint(1,100))
print client._client_id
client.on_connect = on_connect
client.on_disconnect = on_disconnect
#client.on_log = on_log
#client.on_publish = on_publish
client.max_inflight_messages_set(0)
client.connect("localhost",port=1884)

# Start mosquitto thread to deal with inflight acks
client.loop_start()

# Transmit a bunch of messages spaced sleep_s apart
i = 1
while i <= cnt:
    print "Sending JSON msg %d via MQTT" % i
    msg = {}
    #msg["msg"] = "JSON msg#%d via MQTT" % i
    msg["id"] = i
    try :
        rc = client.publish("mqtt_json_routing_key",json.dumps(msg),qos=qos)
    except :
        print "Unexpected publish error: ",sys.exc_info()[0]
    i = i + 1
    time.sleep(sleep_s)

# Some delay for last message to go out
wait_s = 2
print "Waiting %d seconds for last message out..." % wait_s
time.sleep(wait_s)

# Stop the threaded Mosquitto client
client.loop_stop()

# Now disconnect
try:
    client.disconnect()
except:
    print "Unexpected disconnect error: ",sys.exc_info()[0]
*amqp_receive_mqtt_json.py*
# Usage: python amqp_receive_mqtt_json [debug]
#
# debug: 0 = disable, 1 = enable (default: enabled)
#
# Consumer to receive AMQP messages from mqtt-json-queue, bound to the
# mqtt.topic exchange.
# The code expects a json 'id' field in the message body and will report
# if the 'id' is not +1 from the previous one.
#
import json
import sys
import traceback
import pika

prev_id = 0
debug = 1

def callback(ch, method, properties, body):
    global prev_id, debug
    ch.basic_ack(delivery_tag=method.delivery_tag)
    msgBody = json.loads(body)
    if debug != 0:
        print " [x] %r:%r" % (method.routing_key,msgBody,)
    if prev_id + 1 != int(msgBody['id']):
        print "Missing %d" % (prev_id + 1)
    prev_id = msgBody['id']
    return

if len(sys.argv) > 1:
    debug = int(sys.argv[1])

# Reconnect and resume consuming if the connection fails
while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        channel = connection.channel()
        channel.exchange_declare(exchange='mqtt.topic',
                                 type='topic')
        result = channel.queue_declare("mqtt-json-queue")
        queue_name = result.method.queue
        # topic here is what the mqtt plugin posts to
        channel.queue_bind(exchange='mqtt.topic',
                           queue=queue_name,
                           routing_key="mqtt_json_routing_key")
        print ' [*] Waiting for logs. To exit press CTRL+C'
        channel.basic_consume(callback,
                              queue=queue_name
                              )
        channel.start_consuming()
    except Exception, e:
        traceback.print_exc()