如何使用 AWS IoT 为作业队列设置主要和故障转移 MQTT 订阅者?

物联网 MQTT aws-iot aws
2021-06-09 00:04:41

我有一个系统,其中客户端(我们称之为 ClientA)可以向特定的 MQTT 主题发布请求。如果重要的话,经纪人是亚马逊网络服务。然后我有另一个客户端(我们称之为 MainSubscriber),它总是订阅同一个主题,这样它就可以从 ClientA 获取请求并做一些工作,最后变成数据库操作。如果重要的话,数据库是 DynamoDB。

由于主订阅者可能并不总是可访问/在线,因此希望有一个故障转移订阅者作为主订阅者的故障转移备份。这个想法是,如果主订阅者没有及时处理请求,那么故障转移订阅者将启动并执行等效的工作/数据库操作。挑战在于“工作”和由此产生的“数据库操作”不能被主订阅者和故障转移订阅者复制。

这是该系统的逻辑系统架构图。

                   -----> MainSubscriber ----
                  /                          \
ClientA --> Broker                            ---> Database
                  \                          /
                   ---> FailoverSubscriber --

显然,这样的系统存在一些挑战:

  1. 主订阅者如何向故障转移订阅者表明它正在处理请求?
  2. 故障转移订阅者如何检测到主订阅者没有收到请求并需要开始处理它?
  3. 故障转移订阅者如何阻止主订阅者突然恢复在线并接收请求?
  4. 如何处理主订阅者和故障转移订阅者之间的同步问题?

如果这样的方案已经存在现有的解决方案,我宁愿不必重新发明轮子。所以,我的第一个问题是是否已经有一些东西了?

如果没有,那么我正在考虑使用具有强一致性读取的 DynamoDB 作为主订阅者和故障转移订阅者之间的中介。那么,我的第二个问题是是否有任何完善的计划来做到这一点?

2个回答

根据AWS SQS 文档(正如您所说的代理是 AWS),这应该是原生的:

收到消息后,它立即保留在队列中。为了防止其他消费者再次处理消息,Amazon SQS 设置了可见性超时,在此期间 Amazon SQS 阻止其他消费组件接收和处理消息。

问题是根据您的最大处理时间找到正确的可见性超时。

您仍然有很小的机会订阅者处理相同的消息,在这种情况下,您的订阅者代码应该尝试为数据库创建幂等输出(至少是相同的主键),并且在尝试插入相同记录时应该优雅地处理失败。

您可能想了解AWS SQS 的死信队列的概念来自 AWS 文档:

死信队列是其他(源)队列可以针对无法成功处理(使用)的消息的队列。您可以在死信队列中留出并隔离这些消息,以确定它们处理失败的原因。

所以,如果你让主订阅者从普通队列中监听,而让次订阅者从死信队列中监听,故障转移问题就应该解决了。

此外,有了这个,您的第 1、2 和 3 个问题都得到了解决。在这种情况下,主要和次要订阅者不需要相互交谈。

此外,在 Tensibai 的回答基础上,请确保您的订阅者代码编写为一次接收一条消息,如果由于visibility timeout


缺点是它会导致处理延迟,消息仅在一段时间后进入死信队列。

所以,如果你不想那样,那么你可以继续 Tensibai 的回答。如果您可以容忍这一点,而不是使用额外的 Dynamo 表进行状态检查,那么您可以使用它。