node-red - 工作单元

物联网 MQTT 发布订阅者 节点红色
2021-05-29 05:32:08

我有一个 MQTT 输入节点,可以启动 PostgreSQL 数据库的数据库操作(SELECT 和 INSERT)。数据库操作是通过node-contrib-postgres-multi 完成的。由于这些操作由功能节点分隔,因此我将保存部分消息flow.setflow.get稍后检索例如,

  • 功能节点保存flow.set并生成SELECT查询。
  • 连接到 PostgreSQL 节点。
  • 解析输出,使用flow.get和生成INSERT语句。
  • 连接到 PostgreSQL 节点。

我不禁想象flow.setflow.get不同步。

目前,我正在模拟大约 20 台设备每秒发布数据,每次发布的时间戳增加 1 秒。绝对没有理由为什么生成的消息应该被复制。但是,数据库插入节点会因为 Node-RED 日志 ( .pm2/logs/red-out-0.log) 中看到的唯一索引冲突而失败

如果功能节点和数据库处理需要 2 秒,并且每秒接收 MQTT 消息 (QoS=0),MQTT 或 Node-RED 会缓冲它们吗?因此,每条收到的消息都被视为一个工作单元,直到它出错或“离开”流到数据库、HTTP 请求、MQTT 发布等。

1个回答

值得记住的是,NodeJS 世界中的一切都是异步的,这意味着没有任何东西会阻塞事件循环。

在这种情况下,听起来您收到了第一条传入消息,您在流上下文中使用固定键存储该消息,然后继续执行 Select 查询。此时,SQL 节点将最终执行一些网络 IO,该 IO 将在等待来自数据库的响应时阻塞,因此在等待时它将放弃执行上下文。

如果在等待新 MQTT 消息到达时,它将立即处理并传递给第一个函数节点,该节点将覆盖存储在流上下文中的内容,因为键是相同的。

当第一个 Select 语句返回时,第一条消息将移动到第二个函数节点,当它从流上下文中检索值时,它将是第二条消息而不是第一条消息。

解决这个问题的方法是不使用上下文来保持状态,而是将您想要保留的信息移动到对象msg.payload上的不同键msg行为良好的 Node-RED 节点应始终传递原始msg对象,并且默认情况下仅真正更改msg.payload(有例外,但它们倾向于记录更改的内容以及原因)。

msg对工作单元使用to hold 状态可确保它只能随着消息在流中传播而同步更改。