如何处理一次MQTT队列?

物联网 MQTT 蚊子 帕霍
2021-05-30 07:32:39

我正在开发一个帮助应用程序来清理我的 Mosquitto MQTT 服务上保留的消息。遇到的问题是如何使用Paho MQTT处理一次队列

我知道怎么做

  • 阻塞线程 ( loop_forever())
  • 异步查询队列 ( loop_start()/ loop_stop())

我想要做的是处理队列一次并退出(并处理以这种方式收集的每个主题,这些主题(最有可能)是保留的消息)

loop() 是最有希望的:

定期调用以处理网络事件。此调用在 select() 中等待,直到网络套接字可用于读取或写入(如果合适),然后处理传入/传出数据。

不幸的是,我在使用它时没有看到任何主题。

现在我的代码启动一个线程,等待 2 秒并停止它。它可以完成这项工作,但我想了解如何通过一次性处理干净利落地做到这一点:

import paho.mqtt.client as mqtt
import time

class MQTT:

    def __init__(self):
        print("initializing app")
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect("mqtt.example.com", 1883, 60)
        self.client.loop_start()
        time.sleep(2)
        self.client.loop_stop()

    def on_connect(self, client, userdata, flags, rc):
        print("connected to MQTT with result code " + str(rc))
        self.client.subscribe("#")

    def on_message(self, client, userdata, msg):
        # EDIT: added a check for actually retained messages
        if msg.retain:
            print(f"removing retained {msg.topic}")
            self.client.publish(msg.topic, retain=True)

MQTT()
1个回答

保留的消息只会传递一次(每个连接)。

并且在任何时候都只能有 1 条关于给定主题的保留消息。

所以只需连接,启动循环并订阅您感兴趣的主题。当消息被传递时,检查消息上的保留标志。如果消息被保留,那么您可以发布一条带有空负载的新消息,并且保留位设置为清除该主题。没有必要对网络循环做任何奇怪的事情。