我正在开发一个帮助应用程序来清理我的 Mosquitto MQTT 服务上保留的消息。我遇到的问题是如何使用Paho MQTT处理一次队列。
我知道怎么做
- 阻塞线程 (
loop_forever()
) - 异步查询队列 (
loop_start()
/loop_stop()
)
我想要做的是处理队列一次并退出(并处理以这种方式收集的每个主题,这些主题(最有可能)是保留的消息)
loop()
是最有希望的:
定期调用以处理网络事件。此调用在 select() 中等待,直到网络套接字可用于读取或写入(如果合适),然后处理传入/传出数据。
不幸的是,我在使用它时没有看到任何主题。
现在我的代码启动一个线程,等待 2 秒并停止它。它可以完成这项工作,但我想了解如何通过一次性处理干净利落地做到这一点:
import paho.mqtt.client as mqtt
import time
class MQTT:
def __init__(self):
print("initializing app")
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect("mqtt.example.com", 1883, 60)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
def on_connect(self, client, userdata, flags, rc):
print("connected to MQTT with result code " + str(rc))
self.client.subscribe("#")
def on_message(self, client, userdata, msg):
# EDIT: added a check for actually retained messages
if msg.retain:
print(f"removing retained {msg.topic}")
self.client.publish(msg.topic, retain=True)
MQTT()