Skip to content
  • MySensors
  • OpenHardware.io
  • Categories
  • Recent
  • Tags
  • Popular
Skins
  • Light
  • Brite
  • Cerulean
  • Cosmo
  • Flatly
  • Journal
  • Litera
  • Lumen
  • Lux
  • Materia
  • Minty
  • Morph
  • Pulse
  • Sandstone
  • Simplex
  • Sketchy
  • Spacelab
  • United
  • Yeti
  • Zephyr
  • Dark
  • Cyborg
  • Darkly
  • Quartz
  • Slate
  • Solar
  • Superhero
  • Vapor

  • Default (No Skin)
  • No Skin
Collapse
Brand Logo
  1. Home
  2. Development
  3. Node-red msg queue form Controller to Gateway over MQTT

Node-red msg queue form Controller to Gateway over MQTT

Scheduled Pinned Locked Moved Development
mqttgateway mqttqueue
1 Posts 1 Posters 25 Views 2 Watching
  • Oldest to Newest
  • Newest to Oldest
  • Most Votes
Reply
  • Reply as topic
Log in to reply
This topic has been deleted. Only users with topic management privileges can see it.
  • alexeliteA Offline
    alexeliteA Offline
    alexelite
    wrote on last edited by alexelite
    #1

    Short intro. I have a small Arduino device network based on MySensors with RFM69 radios, a rPi MySensors<->MQTT gateway, and a Home Assistant running on a PC server as controller.
    Often my HA sends multiple commands to turn on/off relays on remote devices and although the messages arrive OK and relays actuate, the gateway does not receive the acknowledge because it starts sending a new message. And so the controller does not receive confirmation of state change on all commands sent.
    The setup is based on development branch and no custom modifications.

    So after going from simple Atmega328 serial gateway via mqtt, esp8266 mqtt gateway, then a Atsamd21 serial gateway via mqtt, and now the rpi gateway, problem still there.

    My current solution is a queue implemented in Node-Red. Messages received form controller over mqtt are queued and sent one at a time. If ack is requested but not received in a fixed amount of time (500ms) the message is put back in queue for a total for 5 times. After 5 retries the message is dropped.

    This flow work fine so far, all messages are received and acknowledged in time.

    [{"id":"c00f16f5.dde598","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"f574f085.8c965","type":"simple-queue","z":"c00f16f5.dde598","name":"","firstMessageBypass":true,"bypassInterval":"0","x":410,"y":160,"wires":[["8f09ef2d.2f651"]]},{"id":"47303c08.4463d4","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-in-queue/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":200,"y":100,"wires":[["71271e36.f1b4c"]]},{"id":"8f09ef2d.2f651","type":"function","z":"c00f16f5.dde598","name":"process mys","func":"msg.queueLoops++;\nvar m = msg.topic;\nm = m.split('/');\nmsg.retain = false;\nflow.set('node_id',m[1]);\nflow.set('child_id',m[2]);\nflow.set('command',m[3]);\nflow.set('ack',m[4]);\nflow.set('type',m[5]);\nflow.set('payload',msg.payload);\nflow.set('receivedAt',msg.receivedAt);\nif (m[4] == \"0\" ){\n    //message does not requrire acknowladge\n    flow.set('ackRequested',0);\n    //msg.message=\"no ackRequested\";\n    node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - noAck\");\n} \nelse\n{\n    //message acknowladge requested\n    flow.set('ackRequested',1);\n    msg.message=\"ackRequested!\";\n    node.log(Date.now() + \":\" + msg.receivedAt +\"/\" + msg.topic + \" \" + msg.payload + \" - ackRequested\");\n}\nmsg.topic = msg.topic.replace('mysensors-in-queue', 'mysensors-in');\nreturn msg;","outputs":1,"noerr":0,"x":570,"y":160,"wires":[["8b425bd3.630458","9ede4d44.22f1f"]]},{"id":"8b425bd3.630458","type":"function","z":"c00f16f5.dde598","name":"check ack","func":"if (flow.get('ackRequested') === 0){\n    //no ack requested, trigger new message from queue. send only trigger command\n    msg = {trigger : 1};\n    return [msg,null];\n}\nelse{\n    //ack requested, start timer and wait for reply\n    msg.mqttPayload = msg.payload\n    return [null,msg];\n}","outputs":2,"noerr":0,"x":760,"y":160,"wires":[["e2913b85.f1b368"],["d843d4a.9199228"]]},{"id":"fd5a5b49.3ec838","type":"mqtt in","z":"c00f16f5.dde598","name":"","topic":"mysensors-out/#","qos":"2","datatype":"auto","broker":"65b2b296.362e3c","x":180,"y":400,"wires":[["c8e82d0.58cabd"]]},{"id":"9ede4d44.22f1f","type":"mqtt out","z":"c00f16f5.dde598","name":"mysensors-in","topic":"","qos":"","retain":"","broker":"65b2b296.362e3c","x":770,"y":100,"wires":[]},{"id":"e2913b85.f1b368","type":"link out","z":"c00f16f5.dde598","name":"Trigger","links":["c2fcc29e.2f944"],"x":1035,"y":160,"wires":[]},{"id":"c2fcc29e.2f944","type":"link in","z":"c00f16f5.dde598","name":"Trigger","links":["e2913b85.f1b368"],"x":255,"y":160,"wires":[["f574f085.8c965"]]},{"id":"d843d4a.9199228","type":"trigger","z":"c00f16f5.dde598","op1":"","op2":"true","op1type":"nul","op2type":"bool","duration":"500","extend":false,"units":"ms","reset":"","bytopic":"all","name":"timeout","x":660,"y":260,"wires":[["956936f9.17acd8"]]},{"id":"956936f9.17acd8","type":"function","z":"c00f16f5.dde598","name":"check status","func":"var maxLoops = 5; \nif (msg.payload === true){\n    //we got a timeout, no ack received\n    if (msg.queueLoops < maxLoops) {\n        //put message back in queue\n        //check queueLopps to avoid a loop and trigger new message from queue\n        msg.payload = msg.mqttPayload;\n        flow.set('ackRequested',0);\n        node.log(msg.topic + \" \" + msg.payload + \" - ack NOK\");\n        return msg;\n    }\n    else{\n        node.log(msg.topic + \" \" + msg.payload + \" - dropped\");\n        msg = {trigger : 1};\n        return msg; \n    }\n}\nif (msg.reset == 1){\n    //ack received\n    //timeout timer canceled\n    //trigger new message from queue\n    msg = {trigger : 1};\n    return msg; \n}\nreturn null;","outputs":1,"noerr":0,"x":850,"y":260,"wires":[["38d0af8e.532d2","e2913b85.f1b368"]]},{"id":"38d0af8e.532d2","type":"function","z":"c00f16f5.dde598","name":"","func":"//simple queue discards a message if trigger propery is set\n//so we send a delayed trigger if \nif (typeof msg.topic  != \"undefined\") {\n    msg = {trigger : 1};\n    return msg;\n}","outputs":1,"noerr":0,"x":790,"y":360,"wires":[["c9ee7809.97a808"]]},{"id":"c9ee7809.97a808","type":"delay","z":"c00f16f5.dde598","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":950,"y":360,"wires":[["e2913b85.f1b368"]]},{"id":"71271e36.f1b4c","type":"function","z":"c00f16f5.dde598","name":"Add info","func":"//we need a way to track how many times a message has been throw the queue to avoid an infinite loop\n//set received time of message\nmsg.receivedAt = Date.now();\n//set number of queue count to 0 for new message\nmsg.queueLoops = 0;\nreturn msg;","outputs":1,"noerr":0,"x":420,"y":100,"wires":[["f574f085.8c965"]]},{"id":"c8e82d0.58cabd","type":"function","z":"c00f16f5.dde598","name":"compare","func":"var m = msg.topic;\n\tm = m.split('/');\nif (flow.get('ackRequested') === 1){\n    if ((flow.get('node_id') === m[1]) &&\n        (flow.get('child_id') === m[2]) &&\n        (flow.get('command') === m[3]) &&\n        (flow.get('ack') === m[4]) &&\n        (flow.get('type') === m[5]) &&\n        (flow.get('payload') === msg.payload)){\n            //acknoladge received\n            flow.set('ackRequested',0);\n            msg.reset = 1;\n            node.log(  Date.now() +\":\"+\n                        flow.get('receivedAt',msg.receivedAt) +\"/\"+\n                        msg.topic + \" \" + msg.payload + \" - ack OK\");\n            msg.delay = Date.now() - flow.get('receivedAt',msg.receivedAt) ;\n            msg.node_id =  m[1];\n            return msg;\n        }\n}\nelse\n    msg.error = 1;\nreturn null;\n","outputs":1,"noerr":0,"x":480,"y":400,"wires":[["d843d4a.9199228","956936f9.17acd8"]]},{"id":"65b2b296.362e3c","type":"mqtt-broker","z":"","name":"","broker":"192.168.1.27","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
    

    requires node-red-contrib-simple-message-queue

    1 Reply Last reply
    1
    Reply
    • Reply as topic
    Log in to reply
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes


    22

    Online

    11.7k

    Users

    11.2k

    Topics

    113.0k

    Posts


    Copyright 2019 TBD   |   Forum Guidelines   |   Privacy Policy   |   Terms of Service
    • Login

    • Don't have an account? Register

    • Login or register to search.
    • First post
      Last post
    0
    • MySensors
    • OpenHardware.io
    • Categories
    • Recent
    • Tags
    • Popular