Node-RED message queue from Controller to Gateway over MQTT
-
Short intro: I have a small Arduino device network based on MySensors with RFM69 radios, a Raspberry Pi MySensors<->MQTT gateway, and Home Assistant running on a PC server as the controller.
Often my HA sends multiple commands to turn relays on/off on remote devices, and although the messages arrive OK and the relays actuate, the gateway does not receive the acknowledgement because it has already started sending a new message. As a result, the controller does not receive confirmation of the state change for every command sent.
The setup is based on the development branch with no custom modifications. I have gone from a simple ATmega328 serial gateway via MQTT, to an ESP8266 MQTT gateway, then an ATSAMD21 serial gateway via MQTT, and now the Raspberry Pi gateway, and the problem is still there.
My current solution is a queue implemented in Node-RED. Messages received from the controller over MQTT are queued and sent one at a time. If an ack is requested but not received within a fixed amount of time (500 ms), the message is put back in the queue, up to 5 times in total. After 5 retries the message is dropped.
This flow works fine so far; all messages are received and acknowledged in time.
[{"id":"c00f16f5.dde598","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"f574f085.8c965","type":"simple-queue","z":"c00f16f5.dde598","name":"","firstMessageBypass":true,"bypassInterval":"0","x":410,"y":160,"wires":[["8f09ef2d.2f651"]]},{"id":"47303c08.4463d4","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-in-queue/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":200,"y":100,"wires":[["71271e36.f1b4c"]]},{"id":"8f09ef2d.2f651","type":"function","z":"c00f16f5.dde598","name":"process mys","func":"msg.queueLoops++;\nvar m = msg.topic;\nm = m.split('/');\nmsg.retain = false;\nflow.set('node_id',m[1]);\nflow.set('child_id',m[2]);\nflow.set('command',m[3]);\nflow.set('ack',m[4]);\nflow.set('type',m[5]);\nflow.set('payload',msg.payload);\nflow.set('receivedAt',msg.receivedAt);\nif (m[4] == \"0\" ){\n //message does not requrire acknowladge\n flow.set('ackRequested',0);\n //msg.message=\"no ackRequested\";\n node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - noAck\");\n} \nelse\n{\n //message acknowladge requested\n flow.set('ackRequested',1);\n msg.message=\"ackRequested!\";\n node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - ackRequested\");\n}\nmsg.topic = msg.topic.replace('mysensors-in-queue', 'mysensors-in');\nreturn msg;","outputs":1,"noerr":0,"x":570,"y":160,"wires":[["8b425bd3.630458","9ede4d44.22f1f"]]},{"id":"8b425bd3.630458","type":"function","z":"c00f16f5.dde598","name":"check ack","func":"if (flow.get('ackRequested') === 0){\n //no ack requested, trigger new message from queue. 
send only trigger command\n msg = {trigger : 1};\n return [msg,null];\n}\nelse{\n //ack requested, start timer and wait for reply\n msg.mqttPayload = msg.payload\n return [null,msg];\n}","outputs":2,"noerr":0,"x":760,"y":160,"wires":[["e2913b85.f1b368"],["d843d4a.9199228"]]},{"id":"fd5a5b49.3ec838","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-out/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":180,"y":400,"wires":[["c8e82d0.58cabd"]]},{"id":"9ede4d44.22f1f","type":"mqtt out","z":"c00f16f5.dde598","name":"mysensors-in","topic":"","qos":"","retain":"","broker":"65b2b296.362e3c","x":770,"y":100,"wires":[]},{"id":"e2913b85.f1b368","type":"link out","z":"c00f16f5.dde598","name":"Trigger","links":["c2fcc29e.2f944"],"x":1035,"y":160,"wires":[]},{"id":"c2fcc29e.2f944","type":"link in","z":"c00f16f5.dde598","name":"Trigger","links":["e2913b85.f1b368"],"x":255,"y":160,"wires":[["f574f085.8c965"]]},{"id":"d843d4a.9199228","type":"trigger","z":"c00f16f5.dde598","op1":"","op2":"true","op1type":"nul","op2type":"bool","duration":"500","extend":false,"units":"ms","reset":"","bytopic":"all","name":"timeout","x":660,"y":260,"wires":[["956936f9.17acd8"]]},{"id":"956936f9.17acd8","type":"function","z":"c00f16f5.dde598","name":"check status","func":"var maxLoops = 5; \nif (msg.payload === true){\n //we got a timeout, no ack received\n if (msg.queueLoops < maxLoops) {\n //put message back in queue\n //check queueLopps to avoid a loop and trigger new message from queue\n msg.payload = msg.mqttPayload;\n flow.set('ackRequested',0);\n node.log(msg.topic + \" \" + msg.payload + \" - ack NOK\");\n return msg;\n }\n else{\n node.log(msg.topic + \" \" + msg.payload + \" - dropped\");\n msg = {trigger : 1};\n return msg; \n }\n}\nif (msg.reset == 1){\n //ack received\n //timeout timer canceled\n //trigger new message from queue\n msg = {trigger : 1};\n return msg; \n}\nreturn 
null;","outputs":1,"noerr":0,"x":850,"y":260,"wires":[["38d0af8e.532d2","e2913b85.f1b368"]]},{"id":"38d0af8e.532d2","type":"function","z":"c00f16f5.dde598","name":"","func":"//simple queue discards a message if trigger propery is set\n//so we send a delayed trigger if \nif (typeof msg.topic != \"undefined\") {\n msg = {trigger : 1};\n return msg;\n}","outputs":1,"noerr":0,"x":790,"y":360,"wires":[["c9ee7809.97a808"]]},{"id":"c9ee7809.97a808","type":"delay","z":"c00f16f5.dde598","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":950,"y":360,"wires":[["e2913b85.f1b368"]]},{"id":"71271e36.f1b4c","type":"function","z":"c00f16f5.dde598","name":"Add info","func":"//we need a way to track how many times a message has been throw the queue to avoid an infinite loop\n//set received time of message\nmsg.receivedAt = Date.now();\n//set number of queue count to 0 for new message\nmsg.queueLoops = 0;\nreturn msg;","outputs":1,"noerr":0,"x":420,"y":100,"wires":[["f574f085.8c965"]]},{"id":"c8e82d0.58cabd","type":"function","z":"c00f16f5.dde598","name":"compare","func":"var m = msg.topic;\n\tm = m.split('/');\nif (flow.get('ackRequested') === 1){\n if ((flow.get('node_id') === m[1]) &&\n (flow.get('child_id') === m[2]) &&\n (flow.get('command') === m[3]) &&\n (flow.get('ack') === m[4]) &&\n (flow.get('type') === m[5]) &&\n (flow.get('payload') === msg.payload)){\n //acknoladge received\n flow.set('ackRequested',0);\n msg.reset = 1;\n node.log( Date.now() +\":\"+\n flow.get('receivedAt',msg.receivedAt) +\"/\"+\n msg.topic + \" \" + msg.payload + \" - ack OK\");\n msg.delay = Date.now() - flow.get('receivedAt',msg.receivedAt) ;\n msg.node_id = m[1];\n return msg;\n }\n}\nelse\n msg.error = 1;\nreturn 
null;\n","outputs":1,"noerr":0,"x":480,"y":400,"wires":[["d843d4a.9199228","956936f9.17acd8"]]},{"id":"65b2b296.362e3c","type":"mqtt-broker","z":"","name":"","broker":"192.168.1.27","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
Note: this flow requires the node-red-contrib-simple-message-queue package to be installed.
Suggested Topics
-
Raspberry Pi and MQTT-OpenHAB
General Discussion • 27 Nov 2014, 17:33 • C.r.a.z.y. 30 Nov 2014, 17:59 -
WizNet5100 ethernet module chip temperature
Troubleshooting • 21 Jan 2015, 17:08 • bomber 25 Jan 2015, 22:13 -
Battery level to openHAB
OpenHAB • 20 Jan 2015, 19:25 • bomber 20 Jan 2015, 21:06 -
CC3000 and ethernet/MQTT gateway
Hardware • 29 Jan 2015, 14:09 • bomber 6 Jan 2016, 01:20 -
Yet another MQTT gateway for MySensors :)
Development • 11 Aug 2014, 20:52 • scurb 16 Aug 2014, 20:59 -
MQTT Client Gateway / Node Controller / OTA
Hardware • 19 Jul 2016, 15:01 • CrankyCoder 26 Jul 2017, 13:32 -
💬 Bed occupancy detector
OpenHardware.io • 4 Oct 2018, 19:18 • openhardware.io 4 Oct 2018, 19:18 -
Gateway MQTT first launch
General Discussion • 12 Jul 2018, 16:35 • miclane 12 Jul 2018, 17:21