Node-red msg queue form Controller to Gateway over MQTT
-
Short intro. I have a small Arduino device network based on MySensors with RFM69 radios, a rPi MySensors<->MQTT gateway, and a Home Assistant running on a PC server as controller.
Often my HA sends multiple commands to turn on/off relays on remote devices and although the messages arrive OK and relays actuate, the gateway does not receive the acknowledge because it starts sending a new message. And so the controller does not receive confirmation of state change on all commands sent.
The setup is based on development branch and no custom modifications.So after going from simple Atmega328 serial gateway via mqtt, esp8266 mqtt gateway, then a Atsamd21 serial gateway via mqtt, and now the rpi gateway, problem still there.
My current solution is a queue implemented in Node-Red. Messages received form controller over mqtt are queued and sent one at a time. If ack is requested but not received in a fixed amount of time (500ms) the message is put back in queue for a total for 5 times. After 5 retries the message is dropped.
This flow work fine so far, all messages are received and acknowledged in time.
[{"id":"c00f16f5.dde598","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"f574f085.8c965","type":"simple-queue","z":"c00f16f5.dde598","name":"","firstMessageBypass":true,"bypassInterval":"0","x":410,"y":160,"wires":[["8f09ef2d.2f651"]]},{"id":"47303c08.4463d4","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-in-queue/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":200,"y":100,"wires":[["71271e36.f1b4c"]]},{"id":"8f09ef2d.2f651","type":"function","z":"c00f16f5.dde598","name":"process mys","func":"msg.queueLoops++;\nvar m = msg.topic;\nm = m.split('/');\nmsg.retain = false;\nflow.set('node_id',m[1]);\nflow.set('child_id',m[2]);\nflow.set('command',m[3]);\nflow.set('ack',m[4]);\nflow.set('type',m[5]);\nflow.set('payload',msg.payload);\nflow.set('receivedAt',msg.receivedAt);\nif (m[4] == \"0\" ){\n //message does not requrire acknowladge\n flow.set('ackRequested',0);\n //msg.message=\"no ackRequested\";\n node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - noAck\");\n} \nelse\n{\n //message acknowladge requested\n flow.set('ackRequested',1);\n msg.message=\"ackRequested!\";\n node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - ackRequested\");\n}\nmsg.topic = msg.topic.replace('mysensors-in-queue', 'mysensors-in');\nreturn msg;","outputs":1,"noerr":0,"x":570,"y":160,"wires":[["8b425bd3.630458","9ede4d44.22f1f"]]},{"id":"8b425bd3.630458","type":"function","z":"c00f16f5.dde598","name":"check ack","func":"if (flow.get('ackRequested') === 0){\n //no ack requested, trigger new message from queue. send only trigger command\n msg = {trigger : 1};\n return [msg,null];\n}\nelse{\n //ack requested, start timer and wait for reply\n msg.mqttPayload = msg.payload\n return [null,msg];\n}","outputs":2,"noerr":0,"x":760,"y":160,"wires":[["e2913b85.f1b368"],["d843d4a.9199228"]]},{"id":"fd5a5b49.3ec838","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-out/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":180,"y":400,"wires":[["c8e82d0.58cabd"]]},{"id":"9ede4d44.22f1f","type":"mqtt out","z":"c00f16f5.dde598","name":"mysensors-in","topic":"","qos":"","retain":"","broker":"65b2b296.362e3c","x":770,"y":100,"wires":[]},{"id":"e2913b85.f1b368","type":"link out","z":"c00f16f5.dde598","name":"Trigger","links":["c2fcc29e.2f944"],"x":1035,"y":160,"wires":[]},{"id":"c2fcc29e.2f944","type":"link in","z":"c00f16f5.dde598","name":"Trigger","links":["e2913b85.f1b368"],"x":255,"y":160,"wires":[["f574f085.8c965"]]},{"id":"d843d4a.9199228","type":"trigger","z":"c00f16f5.dde598","op1":"","op2":"true","op1type":"nul","op2type":"bool","duration":"500","extend":false,"units":"ms","reset":"","bytopic":"all","name":"timeout","x":660,"y":260,"wires":[["956936f9.17acd8"]]},{"id":"956936f9.17acd8","type":"function","z":"c00f16f5.dde598","name":"check status","func":"var maxLoops = 5; \nif (msg.payload === true){\n //we got a timeout, no ack received\n if (msg.queueLoops < maxLoops) {\n //put message back in queue\n //check queueLopps to avoid a loop and trigger new message from queue\n msg.payload = msg.mqttPayload;\n flow.set('ackRequested',0);\n node.log(msg.topic + \" \" + msg.payload + \" - ack NOK\");\n return msg;\n }\n else{\n node.log(msg.topic + \" \" + msg.payload + \" - dropped\");\n msg = {trigger : 1};\n return msg; \n }\n}\nif (msg.reset == 1){\n //ack received\n //timeout timer canceled\n //trigger new message from queue\n msg = {trigger : 1};\n return msg; \n}\nreturn null;","outputs":1,"noerr":0,"x":850,"y":260,"wires":[["38d0af8e.532d2","e2913b85.f1b368"]]},{"id":"38d0af8e.532d2","type":"function","z":"c00f16f5.dde598","name":"","func":"//simple queue discards a message if trigger propery is set\n//so we send a delayed trigger if \nif (typeof msg.topic != \"undefined\") {\n msg = {trigger : 1};\n return msg;\n}","outputs":1,"noerr":0,"x":790,"y":360,"wires":[["c9ee7809.97a808"]]},{"id":"c9ee7809.97a808","type":"delay","z":"c00f16f5.dde598","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":950,"y":360,"wires":[["e2913b85.f1b368"]]},{"id":"71271e36.f1b4c","type":"function","z":"c00f16f5.dde598","name":"Add info","func":"//we need a way to track how many times a message has been throw the queue to avoid an infinite loop\n//set received time of message\nmsg.receivedAt = Date.now();\n//set number of queue count to 0 for new message\nmsg.queueLoops = 0;\nreturn msg;","outputs":1,"noerr":0,"x":420,"y":100,"wires":[["f574f085.8c965"]]},{"id":"c8e82d0.58cabd","type":"function","z":"c00f16f5.dde598","name":"compare","func":"var m = msg.topic;\n\tm = m.split('/');\nif (flow.get('ackRequested') === 1){\n if ((flow.get('node_id') === m[1]) &&\n (flow.get('child_id') === m[2]) &&\n (flow.get('command') === m[3]) &&\n (flow.get('ack') === m[4]) &&\n (flow.get('type') === m[5]) &&\n (flow.get('payload') === msg.payload)){\n //acknoladge received\n flow.set('ackRequested',0);\n msg.reset = 1;\n node.log( Date.now() +\":\"+\n flow.get('receivedAt',msg.receivedAt) +\"/\"+\n msg.topic + \" \" + msg.payload + \" - ack OK\");\n msg.delay = Date.now() - flow.get('receivedAt',msg.receivedAt) ;\n msg.node_id = m[1];\n return msg;\n }\n}\nelse\n msg.error = 1;\nreturn null;\n","outputs":1,"noerr":0,"x":480,"y":400,"wires":[["d843d4a.9199228","956936f9.17acd8"]]},{"id":"65b2b296.362e3c","type":"mqtt-broker","z":"","name":"","broker":"192.168.1.27","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
requires node-red-contrib-simple-message-queue
Suggested Topics
-
sendBatteryLevel: wrong nodeId?
Bug Reports • 5 Jan 2015, 08:50 • Xander 5 Jan 2015, 11:08 -
Need some minor help with MQTT strings
Troubleshooting • 22 Dec 2015, 23:09 • drock1985 23 Dec 2015, 16:01 -
💬 Sonoff relay using MySensors ESP8266 wifi or mqtt gateway
OpenHardware.io • 20 Jan 2017, 16:20 • openhardware.io 29 Oct 2018, 08:03 -
Gateway or mqtt?
General Discussion • 21 Nov 2016, 03:03 • Michael_K 22 Nov 2016, 09:05 -
Battery level to openHAB
OpenHAB • 20 Jan 2015, 19:25 • bomber 20 Jan 2015, 21:06 -
Strange value being sent from controller using mixed temp/relay node
Bug Reports • 2 Feb 2015, 13:56 • Gambituk 27 Mar 2015, 22:44 -
MQTT getting data from database - New Domotic Project
Development • 31 May 2017, 18:05 • achumpitazc 31 May 2017, 19:21 -
ESP8266 gateway crashes while the node is in FOTA progress.
Troubleshooting • 25 Nov 2017, 00:22 • Abdu Sahin 13 Dec 2017, 20:50