Node-red msg queue form Controller to Gateway over MQTT



  • Short intro. I have a small Arduino device network based on MySensors with RFM69 radios, a rPi MySensors<->MQTT gateway, and a Home Assistant running on a PC server as controller.
    Often my HA sends multiple commands to turn on/off relays on remote devices and although the messages arrive OK and relays actuate, the gateway does not receive the acknowledge because it starts sending a new message. And so the controller does not receive confirmation of state change on all commands sent.
    The setup is based on development branch and no custom modifications.

    So after going from simple Atmega328 serial gateway via mqtt, esp8266 mqtt gateway, then a Atsamd21 serial gateway via mqtt, and now the rpi gateway, problem still there.

    My current solution is a queue implemented in Node-Red. Messages received form controller over mqtt are queued and sent one at a time. If ack is requested but not received in a fixed amount of time (500ms) the message is put back in queue for a total for 5 times. After 5 retries the message is dropped.

    This flow work fine so far, all messages are received and acknowledged in time.

    [{"id":"c00f16f5.dde598","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"f574f085.8c965","type":"simple-queue","z":"c00f16f5.dde598","name":"","firstMessageBypass":true,"bypassInterval":"0","x":410,"y":160,"wires":[["8f09ef2d.2f651"]]},{"id":"47303c08.4463d4","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-in-queue/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":200,"y":100,"wires":[["71271e36.f1b4c"]]},{"id":"8f09ef2d.2f651","type":"function","z":"c00f16f5.dde598","name":"process mys","func":"msg.queueLoops++;\nvar m = msg.topic;\nm = m.split('/');\nmsg.retain = false;\nflow.set('node_id',m[1]);\nflow.set('child_id',m[2]);\nflow.set('command',m[3]);\nflow.set('ack',m[4]);\nflow.set('type',m[5]);\nflow.set('payload',msg.payload);\nflow.set('receivedAt',msg.receivedAt);\nif (m[4] == \"0\" ){\n    //message does not requrire acknowladge\n    flow.set('ackRequested',0);\n    //msg.message=\"no ackRequested\";\n    node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - noAck\");\n} \nelse\n{\n    //message acknowladge requested\n    flow.set('ackRequested',1);\n    msg.message=\"ackRequested!\";\n    node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - ackRequested\");\n}\nmsg.topic = msg.topic.replace('mysensors-in-queue', 'mysensors-in');\nreturn msg;","outputs":1,"noerr":0,"x":570,"y":160,"wires":[["8b425bd3.630458","9ede4d44.22f1f"]]},{"id":"8b425bd3.630458","type":"function","z":"c00f16f5.dde598","name":"check ack","func":"if (flow.get('ackRequested') === 0){\n    //no ack requested, trigger new message from queue. send only trigger command\n    msg = {trigger : 1};\n    return [msg,null];\n}\nelse{\n    //ack requested, start timer and wait for reply\n    msg.mqttPayload = msg.payload\n    return [null,msg];\n}","outputs":2,"noerr":0,"x":760,"y":160,"wires":[["e2913b85.f1b368"],["d843d4a.9199228"]]},{"id":"fd5a5b49.3ec838","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-out/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":180,"y":400,"wires":[["c8e82d0.58cabd"]]},{"id":"9ede4d44.22f1f","type":"mqtt out","z":"c00f16f5.dde598","name":"mysensors-in","topic":"","qos":"","retain":"","broker":"65b2b296.362e3c","x":770,"y":100,"wires":[]},{"id":"e2913b85.f1b368","type":"link out","z":"c00f16f5.dde598","name":"Trigger","links":["c2fcc29e.2f944"],"x":1035,"y":160,"wires":[]},{"id":"c2fcc29e.2f944","type":"link in","z":"c00f16f5.dde598","name":"Trigger","links":["e2913b85.f1b368"],"x":255,"y":160,"wires":[["f574f085.8c965"]]},{"id":"d843d4a.9199228","type":"trigger","z":"c00f16f5.dde598","op1":"","op2":"true","op1type":"nul","op2type":"bool","duration":"500","extend":false,"units":"ms","reset":"","bytopic":"all","name":"timeout","x":660,"y":260,"wires":[["956936f9.17acd8"]]},{"id":"956936f9.17acd8","type":"function","z":"c00f16f5.dde598","name":"check status","func":"var maxLoops = 5; \nif (msg.payload === true){\n    //we got a timeout, no ack received\n    if (msg.queueLoops < maxLoops) {\n        //put message back in queue\n        //check queueLopps to avoid a loop and trigger new message from queue\n        msg.payload = msg.mqttPayload;\n        flow.set('ackRequested',0);\n        node.log(msg.topic + \" \" + msg.payload + \" - ack NOK\");\n        return msg;\n    }\n    else{\n        node.log(msg.topic + \" \" + msg.payload + \" - dropped\");\n        msg = {trigger : 1};\n        return msg; \n    }\n}\nif (msg.reset == 1){\n    //ack received\n    //timeout timer canceled\n    //trigger new message from queue\n    msg = {trigger : 1};\n    return msg; \n}\nreturn null;","outputs":1,"noerr":0,"x":850,"y":260,"wires":[["38d0af8e.532d2","e2913b85.f1b368"]]},{"id":"38d0af8e.532d2","type":"function","z":"c00f16f5.dde598","name":"","func":"//simple queue discards a message if trigger propery is set\n//so we send a delayed trigger if \nif (typeof msg.topic  != \"undefined\") {\n    msg = {trigger : 1};\n    return msg;\n}","outputs":1,"noerr":0,"x":790,"y":360,"wires":[["c9ee7809.97a808"]]},{"id":"c9ee7809.97a808","type":"delay","z":"c00f16f5.dde598","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":950,"y":360,"wires":[["e2913b85.f1b368"]]},{"id":"71271e36.f1b4c","type":"function","z":"c00f16f5.dde598","name":"Add info","func":"//we need a way to track how many times a message has been throw the queue to avoid an infinite loop\n//set received time of message\nmsg.receivedAt = Date.now();\n//set number of queue count to 0 for new message\nmsg.queueLoops = 0;\nreturn msg;","outputs":1,"noerr":0,"x":420,"y":100,"wires":[["f574f085.8c965"]]},{"id":"c8e82d0.58cabd","type":"function","z":"c00f16f5.dde598","name":"compare","func":"var m = msg.topic;\n\tm = m.split('/');\nif (flow.get('ackRequested') === 1){\n    if ((flow.get('node_id') === m[1]) &&\n        (flow.get('child_id') === m[2]) &&\n        (flow.get('command') === m[3]) &&\n        (flow.get('ack') === m[4]) &&\n        (flow.get('type') === m[5]) &&\n        (flow.get('payload') === msg.payload)){\n            //acknoladge received\n            flow.set('ackRequested',0);\n            msg.reset = 1;\n            node.log(  Date.now() +\":\"+\n                        flow.get('receivedAt',msg.receivedAt) +\"/\"+\n                        msg.topic + \" \" + msg.payload + \" - ack OK\");\n            msg.delay = Date.now() - flow.get('receivedAt',msg.receivedAt) ;\n            msg.node_id =  m[1];\n            return msg;\n        }\n}\nelse\n    msg.error = 1;\nreturn null;\n","outputs":1,"noerr":0,"x":480,"y":400,"wires":[["d843d4a.9199228","956936f9.17acd8"]]},{"id":"65b2b296.362e3c","type":"mqtt-broker","z":"","name":"","broker":"192.168.1.27","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
    

    requires node-red-contrib-simple-message-queue



Suggested Topics

50
Online

11.5k
Users

11.1k
Topics

112.7k
Posts