Python Paho MQTT: 2.5 KB messages go unsent for days, while 0.1 KB messages send fine

iot mqtt aws-iot linux paho python
2021-06-22 10:48:35

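I have powerful dual-core IoT gateways in the field, with high-speed cellular modems and good internet connections, but they can't send 2.5 KB MQTT messages to my AWS IoT message broker. My program sends messages of various sizes. Messages of 0.1 KB or 0.2 KB succeed more than 99% of the time, 1.5 KB messages are about 50/50, and 2.5 KB messages succeed less than 10% of the time... as long as I'm not watching them (it gets weirder).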

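My gateways will fail to send 2.5 KB messages for days (while successfully sending the smaller 0.1 KB and 1.5 KB messages the whole time), but as soon as I VPN into a gateway with OpenVPN to investigate, it immediately sends the 2.5 KB messages. It's like asking my kids to do something while I'm away: the moment I come back, it gets done... so weird, and so frustrating!!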

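So I'm guessing this has something to do with my gateways' internet connection. I can stream Netflix movies on them, yet they can't send a 2.5 KB MQTT message... When I install software on them, they download megabytes of data within seconds. I'm also guessing it's not AWS IoT, because when I try to reproduce the problem from my development computer, the 2.5 KB messages always publish successfully to the AWS IoT message broker.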

Specs of the $580 gateways:

  • Axiomtek ICO120, dual-core x86
  • Ubuntu Linux 18.04 LTS
  • SIMCom SIM7600AH modem

Python code that connects to the AWS IoT message broker using the Paho MQTT library:


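import json
import logging
import ssl
import time
import uuid

import paho.mqtt.client as mqtt

# get_certs() is called in connect() below but is not shown in the post.


class PahoContainer: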
    def __init__(
        self,
        c,
        mqtt_broker,
        cert_dir="/home/user/certs",
        set_on_message=True,
        set_on_publish=True,
        aws_thing=None,
        set_will=True,
    ):
        """Connects a client to MQTT broker"""

        self.c = c
        self.mqtt_broker = mqtt_broker
        self.cert_dir = cert_dir
        self.set_on_message = set_on_message
        self.set_on_publish = set_on_publish
        self.aws_thing = aws_thing

        self.connect(set_will=set_will)

    def on_connect(self, client, userdata, flags, rc):
        """The callback for when the client receives a CONNACK response from the server."""

        self.c.logger.info(f"Paho connected with result code: {rc}")

        # A result code of 0 means the broker accepted the connection, so set connected_flag = True
        if rc == 0:
            self.c.logger.info("Setting Paho client.connected_flag = True")
            client.connected_flag = True

    def on_disconnect(self, client, userdata, rc):
        """
        The callback for a disconnection. You will need to reconnect as soon as possible.

        Since we run a network loop using loop_start() or loop_forever(), the re-connections are automatically handled.

        A new connection attempt is made automatically in the background. The delay between attempts
        is governed by reconnect_delay_set() (1 to 5 seconds with the values used in connect()).
        """
        self.c.logger.info(f"on_disconnect callback. Disconnection reason rc: '{rc}'")

        client.connected_flag = False
        client.disconnect_flag = True

    def on_message(self, client, userdata, msg):
        """The callback for when a PUBLISH message is received from the server"""
        self.c.logger.info(f"Paho msg.topic: {msg.topic}; str(msg.payload): {str(msg.payload)}")

    def on_publish(self, client, userdata, mid):
        """The callback for when a PUBLISH message is sent to the server"""
        self.c.logger.info(f"Paho on_publish callback for Message ID (mid): {mid}")

    def on_log(self, client, userdata, level, buf):
        """Callback to record log messages"""
        self.c.logger.info(f"on_log callback. Level: '{level}'; msg buf: '{buf}'")

    def try_connecting(self, broker, port, keepalive, try_x_times=20):
        """Try connecting up to try_x_times times before re-raising the last error"""
        counter = 0
        while True:
            counter += 1
            try:
                self.client.connect(broker, port=port, keepalive=keepalive)
            except Exception:
                x_more_times = try_x_times - counter
                if x_more_times == 0:
                    raise
                self.c.logger.exception(f"Problem connecting. Will try again {x_more_times} more time(s)...")
                time.sleep(0.1)
            else:
                break

    def connect(self, set_will=True):
        """Connect to the message broker server"""

        client_id = mqtt.base62(uuid.uuid4().int, padding=22)
        self.client = mqtt.Client(client_id=client_id, clean_session=True)

        # Set a will to be sent by the broker in case the client disconnects unexpectedly.
        # This must be called before connect() to have any effect.
        # topic: The topic that the will message should be published on.
        # payload: The message to send as a will. If not given, or set to None a
        # zero length message will be used as the will.
        if set_will:
            if self.aws_thing is None:
                self.c.logger.warning("set_will is True but there is no self.aws_thing, so it can't happen")
            else:
                topic_lwt = f'last_will/{self.aws_thing.upper()}'
                payload_lwt = json.dumps({"connected": 0})
                self.client.will_set(topic_lwt, payload=payload_lwt, qos=1, retain=False)

        # We MUST use this on_connect callback to set the client.connected_flag = True.
        # Otherwise we'll be in an infinite loop
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect

        # Enable logging using the standard python logging package.
        # This may be used at the same time as the on_log callback method
        # If logger is specified (default logger=None), then that logging.Logger object will be used;
        # otherwise one will be created automatically
        self.client.enable_logger(logger=self.c.logger)
        # self.client.enable_logger(logger=None)
        # Set the log level, if logger=None in enable_logger()
        self.client._logger.setLevel(logging.DEBUG)
        self.client.on_log = self.on_log
        # The client will automatically retry connection.
        # Between each attempt it will wait a number of seconds between min_delay and max_delay
        # When the connection is lost, initially the reconnection attempt is delayed of min_delay seconds.
        # It's doubled between subsequent attempt up to max_delay.
        # The delay is reset to min_delay when the connection complete (e.g. the CONNACK is received,
        # not just the TCP connection is established).
        self.client.reconnect_delay_set(min_delay=1, max_delay=5)
        # Set the maximum number of messages with QoS>0 that can be part way through their network flow at once.
        # Defaults to 20. Increasing this value will consume more memory but can increase throughput
        self.client.max_inflight_messages_set(10)
        # Set the maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue.
        # Defaults to 0. 0 means unlimited. When the queue is full, any further outgoing messages would be dropped.
        self.client.max_queued_messages_set(0)
        # Intended to set the time in seconds before a message with QoS>0 is retried, if the broker does not respond.
        # Caveat: as far as I know, recent paho-mqtt 1.x releases document this method as no longer used
        # (a no-op), so this call probably has no effect on retries.
        self.client.message_retry_set(2)

        if self.set_on_message:
            self.client.on_message = self.on_message
        if self.set_on_publish:
            self.client.on_publish = self.on_publish

        # Initialize client.connected_flag = False
        self.client.connected_flag = False
        self.c.logger.info(f"Connecting to broker: {self.mqtt_broker}")
        self.root_ca, self.device_cert, self.private_key = get_certs(self.c, self.cert_dir)
        self.client.tls_set(
            ca_certs=self.root_ca,
            certfile=self.device_cert,
            keyfile=self.private_key,
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None,
        )
        self.try_connecting(
            self.mqtt_broker,
            port=8883,
            keepalive=60,
            try_x_times=20
        )

        # We must start the loop before the while not client.connected_flag loop
        self.client.loop_start()

        # If we are not connected yet, wait a bit, then try again before returning the client
        while not self.client.connected_flag:
            seconds_to_sleep = 0.05
            self.c.logger.info(
                f"Waiting {seconds_to_sleep} seconds, then checking client.connected_flag again"
            )
            time.sleep(seconds_to_sleep)
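
The wait loop at the end of connect() spins forever if the CONNACK never arrives. A minimal sketch of a bounded wait, assuming the same Paho 1.x callback signatures used above; ConnectWaiter and its method names are hypothetical and not part of Paho:

```python
import threading


class ConnectWaiter:
    """Hypothetical helper: block until on_connect fires, but only up to a timeout."""

    def __init__(self):
        self._connected = threading.Event()
        self.last_rc = None

    def on_connect(self, client, userdata, flags, rc):
        # Same signature as PahoContainer.on_connect above.
        self.last_rc = rc
        if rc == 0:
            self._connected.set()

    def on_disconnect(self, client, userdata, rc):
        # Clear the flag so a later wait() blocks until the next successful connect.
        self._connected.clear()

    def wait(self, timeout_seconds):
        """Return True if connected within timeout_seconds, else False."""
        return self._connected.wait(timeout_seconds)
```

The two callbacks would be assigned to client.on_connect and client.on_disconnect before loop_start(), and connect() could then raise or log if wait(30) returns False instead of looping indefinitely.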

Simple code that sends a message to AWS IoT:

# The metrics_dict is a Python dictionary with 2.5 KB of key/value pairs
payload = json.dumps({"metrics": metrics_dict})

info = paho_container.client.publish(
    topic,
    payload,
    qos=1,
)
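
With QoS 1, publish() returns before the broker has acknowledged anything, so it helps to record the payload size and whether the PUBACK arrived. A rough sketch, assuming client behaves like a Paho 1.x Client (publish() returns an object with rc, mid and is_published()); publish_and_confirm and its parameters are made-up names for illustration:

```python
import json
import time


def publish_and_confirm(client, topic, payload_dict, timeout_seconds=10.0, poll_seconds=0.05):
    """Publish at QoS 1 and report whether the broker acknowledged it in time."""
    payload = json.dumps(payload_dict)
    size_bytes = len(payload.encode("utf-8"))
    info = client.publish(topic, payload, qos=1)

    if info.rc != 0:  # 0 is MQTT_ERR_SUCCESS in Paho
        return {"mid": info.mid, "size_bytes": size_bytes, "rc": info.rc, "confirmed": False}

    # Poll until the PUBACK has been processed or the deadline passes.
    deadline = time.monotonic() + timeout_seconds
    while not info.is_published() and time.monotonic() < deadline:
        time.sleep(poll_seconds)

    return {
        "mid": info.mid,
        "size_bytes": size_bytes,
        "rc": info.rc,
        "confirmed": info.is_published(),
    }
```

Logging the returned dictionary for every message would show directly how the confirmation rate varies with size_bytes.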

Linux NetworkManager (nmcli) commands used to create the GSM cellular connection:

sudo nmcli radio wwan on
sudo nmcli c add type gsm ifname '*' con-name 'my_conn' apn 'pda.bell.ca' connection.autoconnect yes ipv4.dns '8.8.8.8 8.8.4.4'
sudo nmcli c up

Edit, January 26, 2021:

Output of the ifconfig tun0 command for the OpenVPN connection (I have changed the IP addresses):

tun0: flags=4305<UP,POINTOPOINT,RUNNING,NOARP,MULTICAST>  mtu 1500
        inet 172.27.abc.def  netmask 255.255.248.0  destination 172.27.abc.def
        inet6 fe80::4597:4b9f:abcd:efgh  prefixlen 64  scopeid 0x20<link>
        unspec 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00  txqueuelen 100  (UNSPEC)
        RX packets 9235  bytes 2655505 (2.6 MB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 9768  bytes 3329110 (3.3 MB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

Output of the ifconfig wwp0s21f0u4i5 command for the GSM cellular connection, showing an MTU of 1500 bytes (I have changed the IP addresses):

wwp0s21f0u4i5: flags=4305<UP,POINTOPOINT,RUNNING,NOARP,MULTICAST>  mtu 1500
        inet 174.90.ghi.jkl  netmask 255.255.255.248  destination 174.90.ghi.jkl
        unspec 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00  txqueuelen 1000  (UNSPEC)
        RX packets 11784052  bytes 2475908779 (2.4 GB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 12009517  bytes 2104615202 (2.1 GB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

Output of nmcli c s user_apn (I have changed the IP addresses):

$ nmcli c s user_apn
connection.id:                          user_apn
connection.uuid:                        05ebc6d3-4fbb-4ddb-93fd-25fb57314ca2
connection.stable-id:                   --
connection.type:                        gsm
connection.interface-name:              cdc-wdm0
connection.autoconnect:                 yes
connection.autoconnect-priority:        0
connection.autoconnect-retries:         -1 (default)
connection.auth-retries:                -1
connection.timestamp:                   1611695541
connection.read-only:                   no
connection.permissions:                 --
connection.zone:                        --
connection.master:                      --
connection.slave-type:                  --
connection.autoconnect-slaves:          -1 (default)
connection.secondaries:                 --
connection.gateway-ping-timeout:        0
connection.metered:                     unknown
connection.lldp:                        default
connection.mdns:                        -1 (default)
ipv4.method:                            auto
ipv4.dns:                               8.8.8.8,8.8.4.4
ipv4.dns-search:                        --
ipv4.dns-options:                       ""
ipv4.dns-priority:                      0
ipv4.addresses:                         --
ipv4.gateway:                           --
ipv4.routes:                            --
ipv4.route-metric:                      -1
ipv4.route-table:                       0 (unspec)
ipv4.ignore-auto-routes:                no
ipv4.ignore-auto-dns:                   no
ipv4.dhcp-client-id:                    --
ipv4.dhcp-timeout:                      0 (default)
ipv4.dhcp-send-hostname:                yes
ipv4.dhcp-hostname:                     --
ipv4.dhcp-fqdn:                         --
ipv4.never-default:                     no
ipv4.may-fail:                          yes
ipv4.dad-timeout:                       -1 (default)
ipv6.method:                            auto
ipv6.dns:                               --
ipv6.dns-search:                        --
ipv6.dns-options:                       ""
ipv6.dns-priority:                      0
ipv6.addresses:                         --
ipv6.gateway:                           --
ipv6.routes:                            --
ipv6.route-metric:                      -1
ipv6.route-table:                       0 (unspec)
ipv6.ignore-auto-routes:                no
ipv6.ignore-auto-dns:                   no
ipv6.never-default:                     no
ipv6.may-fail:                          yes
ipv6.ip6-privacy:                       -1 (unknown)
ipv6.addr-gen-mode:                     stable-privacy
ipv6.dhcp-send-hostname:                yes
ipv6.dhcp-hostname:                     --
ipv6.token:                             --
gsm.number:                             *99#
gsm.username:                           --
gsm.password:                           <hidden>
gsm.password-flags:                     0 (none)
gsm.apn:                                wrmstatic.bell.ca.ioe
gsm.network-id:                         --
gsm.pin:                                <hidden>
gsm.pin-flags:                          0 (none)
gsm.home-only:                          no
gsm.device-id:                          --
gsm.sim-id:                             --
gsm.sim-operator-id:                    --
gsm.mtu:                                auto
proxy.method:                           none
proxy.browser-only:                     no
proxy.pac-url:                          --
proxy.pac-script:                       --
GENERAL.NAME:                           user_apn
GENERAL.UUID:                           05ebc6d3-4fbb-4ddb-93fd-25fb57314ca2
GENERAL.DEVICES:                        cdc-wdm0
GENERAL.STATE:                          activated
GENERAL.DEFAULT:                        yes
GENERAL.DEFAULT6:                       no
GENERAL.SPEC-OBJECT:                    --
GENERAL.VPN:                            no
GENERAL.DBUS-PATH:                      /org/freedesktop/NetworkManager/ActiveConnection/1
GENERAL.CON-PATH:                       /org/freedesktop/NetworkManager/Settings/1
GENERAL.ZONE:                           --
GENERAL.MASTER-PATH:                    --
IP4.ADDRESS[1]:                         174.90.123.456/29
IP4.GATEWAY:                            174.90.123.457
IP4.ROUTE[1]:                           dst = 174.90.123.452/29, nh = 0.0.0.0, mt = 700
IP4.ROUTE[2]:                           dst = 169.254.0.0/16, nh = 0.0.0.0, mt = 1000
IP4.ROUTE[3]:                           dst = 54.218.161.180/32, nh = 174.90.186.221, mt = 0
IP4.ROUTE[4]:                           dst = 0.0.0.0/0, nh = 174.90.186.221, mt = 700
IP4.DNS[1]:                             70.28.245.227
IP4.DNS[2]:                             184.151.118.254
IP4.DNS[3]:                             8.8.8.8
IP4.DNS[4]:                             8.8.4.4
IP6.GATEWAY:                            --

Edit, January 27, 2021, 9:00 AM MST: output of traceroute --mtu <broker>

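I'm trying to figure out whether this is a packet fragmentation problem related to the MTU of 1500, given that MQTT messages start failing at around 1.5 KB and almost always fail at 2.5 KB.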

$ traceroute --mtu abcdefg-ats.iot.us-west-2.amazonaws.com
traceroute to abcdefg-ats.iot.us-west-2.amazonaws.com (52.43.abc.def), 30 hops max, 65000 byte packets
 1  172.27.abc.def (172.27.abc.def)  78.980 ms F=1500  75.051 ms  77.459 ms
 2  ec2-50-112-abc-def.us-west-2.compute.amazonaws.com (50.112.abc.def)  101.733 ms ec2-34-221-abc-def.us-west-2.compute.amazonaws.com (34.221.abc.def)  78.166 ms ec2-50-112-abc-def.us-west-2.compute.amazonaws.com (50.112.abc.def)  93.053 ms
 3  * * *
 4  * * *
 5  * * *
 6  * * *
 7  * * *
 8  * * *
 9  * * *
10  * * *
11  * * *
12  * * *
13  * * *
14  * * *
15  * * *
16  * * *
17  * * *
18  * * *
19  * * *
20  * * *
21  * * *
22  * * *
23  * * *
24  * * *
25  * * *
26  * * *
27  * * *
28  * * *
29  * * *
30  * * *

Edit, January 27, 2021, 9:50 AM MST: ping command output

When I ping the AWS IoT message broker with 1300 bytes, it gets through every time:

$ ping -c 3 -s 1300 52.43.abc.def
PING 52.43.abc.def (52.43.abc.def) 1300(1328) bytes of data.
1308 bytes from 52.43.abc.def: icmp_seq=1 ttl=253 time=87.7 ms
1308 bytes from 52.43.abc.def: icmp_seq=2 ttl=253 time=99.7 ms
1308 bytes from 52.43.abc.def: icmp_seq=3 ttl=253 time=106 ms

However, when I ping the broker with 1400 bytes (1.4 KB), it times out! Why?

$ ping -c 3 -s 1400 52.43.abc.def
PING 52.43.abc.def (52.43.abc.def) 1400(1428) bytes of data.

--- 52.43.163.79 ping statistics ---
3 packets transmitted, 0 received, 100% packet loss, time 2051ms
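
The two ping runs bracket the largest packet that gets through. A small worked calculation, assuming plain IPv4 with no options (20-byte IP header, 8-byte ICMP header) and assuming the losses really are size-related; the function names are only illustrative:

```python
ICMP_HEADER_BYTES = 8
IPV4_HEADER_BYTES = 20  # assumption: no IP options


def ip_packet_size(ping_payload_bytes):
    """Total IPv4 packet size produced by `ping -s <ping_payload_bytes>`."""
    return ping_payload_bytes + ICMP_HEADER_BYTES + IPV4_HEADER_BYTES


def path_mtu_bounds(largest_ok_payload, smallest_failed_payload):
    """Return (lower, upper): the usable packet size is >= lower and < upper."""
    return ip_packet_size(largest_ok_payload), ip_packet_size(smallest_failed_payload)


print(path_mtu_bounds(1300, 1400))  # (1328, 1428)
```

The 1328 matches the "1300(1328) bytes of data" line that ping itself prints, so somewhere on the path packets between 1328 and 1428 bytes stop getting through, well below the 1500-byte MTU configured on both interfaces.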

Edit, January 27, 13:00 MST: ip route show output

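@hardillb asked whether the "default route" changes when OpenVPN (the tun0 interface) comes up. At first I wasn't sure what that meant, but now I think OpenVPN does change the default route. See the following ip route show output, which references tun0 (the OpenVPN network interface):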

$ ip route show
0.0.0.0/1 via 172.27.abc.def dev tun0
default via 10.74.abc.def dev wwp0s21f0u4i5 proto static metric 700
10.74.abc.def/30 dev wwp0s21f0u4i5 proto kernel scope link src 10.74.abc.def metric 700
54.218.abc.def via 10.74.abc.def dev wwp0s21f0u4i5
128.0.0.0/1 via 172.27.abc.def dev tun0
169.254.0.0/16 dev enp2s0 scope link metric 1000 linkdown
172.27.abc.def/21 dev tun0 proto kernel scope link src 172.27.abc.def
192.168.2.0/24 dev enp2s0 proto kernel scope link src 192.168.2.2 metric 100 linkdown
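
The 0.0.0.0/1 and 128.0.0.0/1 entries override the default route because the kernel prefers the most specific matching prefix. A simplified sketch of that longest-prefix selection: it models only prefix and device, ignores metrics and the redacted host routes, and uses 52.43.0.1 as a made-up stand-in for the redacted broker address:

```python
import ipaddress

ROUTES_WITH_VPN = [
    ("0.0.0.0/1", "tun0"),
    ("128.0.0.0/1", "tun0"),
    ("0.0.0.0/0", "wwp0s21f0u4i5"),  # the "default" route
]
ROUTES_WITHOUT_VPN = [("0.0.0.0/0", "wwp0s21f0u4i5")]


def pick_device(routes, destination):
    """Return the device of the most specific route containing destination."""
    addr = ipaddress.ip_address(destination)
    matches = []
    for prefix, device in routes:
        network = ipaddress.ip_network(prefix)
        if addr in network:
            matches.append((network.prefixlen, device))
    if not matches:
        raise LookupError(f"no route to {destination}")
    # Longest prefix wins: /1 beats /0.
    return max(matches)[1]


print(pick_device(ROUTES_WITH_VPN, "52.43.0.1"))     # tun0
print(pick_device(ROUTES_WITHOUT_VPN, "52.43.0.1"))  # wwp0s21f0u4i5
```

So while the VPN is up, broker traffic leaves through tun0 rather than directly over the cellular interface, which would fit the observation that large messages only get through while I'm connected.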
2 Answers

Based on what we've seen in the comments and edits, I think it may be worth setting a lower MTU on the cellular link.

Try running the following:

nmcli con mod "user_apn" gsm.mtu 1300

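This should cause the MQTT client (in combination with the network stack) to break the data up into smaller packets, which will hopefully make it all the way to the broker without being fragmented along the route.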
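
To see why a lower MTU could help, here is a rough sketch of the IP packet sizes a TCP stream gets cut into. It assumes 20-byte IPv4 and TCP headers with no options, ignores TLS and MQTT framing overhead, and takes 2560 bytes as a stand-in for a "2.5 KB" message; ip_packet_sizes is an illustrative name:

```python
IPV4_HEADER_BYTES = 20  # assumption: no IP options
TCP_HEADER_BYTES = 20   # assumption: no TCP options


def ip_packet_sizes(stream_bytes, mtu):
    """Sizes of the IP packets needed to carry stream_bytes of TCP data at a given MTU."""
    mss = mtu - IPV4_HEADER_BYTES - TCP_HEADER_BYTES
    sizes = []
    remaining = stream_bytes
    while remaining > 0:
        chunk = min(remaining, mss)
        sizes.append(chunk + IPV4_HEADER_BYTES + TCP_HEADER_BYTES)
        remaining -= chunk
    return sizes


print(ip_packet_sizes(100, 1500))   # [140]
print(ip_packet_sizes(2560, 1500))  # [1500, 1140]
print(ip_packet_sizes(2560, 1300))  # [1300, 1300, 80]
```

Under these assumptions a small message never produces a packet anywhere near the limit, a 2.5 KB message at MTU 1500 produces a full 1500-byte packet, and at MTU 1300 every packet stays below the 1328 bytes that the 1300-byte ping in the question showed to work.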

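There may be a hop along the path with an MTU of 1280. Try 1100 or 1200 and see whether that fixes the problem. If not, try finding an MTU finder application to work out the MTU from source to destination.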
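
If no MTU finder tool is at hand, the search can be approximated with ping and a binary search. This is only a sketch: it assumes Linux iputils ping (where -M do sets the Don't Fragment bit), assumes success is monotonic in packet size, and a single lost ping will mislead it; both function names are made up:

```python
import subprocess


def ping_fits(host, payload_bytes):
    """Send one unfragmentable ping; True if a reply came back."""
    result = subprocess.run(
        ["ping", "-c", "1", "-W", "2", "-M", "do", "-s", str(payload_bytes), host],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0


def largest_passing_payload(probe, low, high):
    """Largest size in [low, high] for which probe(size) is True, or None if low fails."""
    if not probe(low):
        return None
    while low < high:
        mid = (low + high + 1) // 2  # round up so the loop always makes progress
        if probe(mid):
            low = mid
        else:
            high = mid - 1
    return low
```

A call such as largest_passing_payload(lambda n: ping_fits("<broker>", n), 1000, 1472) returns a ping payload size; adding 28 bytes (IPv4 plus ICMP headers) gives the corresponding packet size to use as the MTU.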