Paho-mqtt 最大客户端数量

物联网 MQTT Paho
2021-06-12 15:08:44

我有一个关于 paho-mqtt 的问题:能否借助 multiprocessing 启动多个进程,从而建立 10K 个客户端连接?这是我现在的代码(我用 multiprocessing 启动进程,每个进程再为其负责的每个客户端创建一个线程):

import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
import sys
# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)

# Timestamp of the last hourly statistics publish; read and reset by pub().
init_time = time.time()
# NOTE(review): not referenced anywhere else in the visible code.
disconnected = 0

def Connect(client, broker, port, token, keepalive, run_forever=False,
            retry_delay=2, settle_delay=1):
    """Connect ``client`` to ``broker``, retrying until an attempt succeeds.

    Args:
        client: MQTT client object exposing ``username_pw_set`` and
            ``connect``.
        broker: Host name of the broker.
        port: TCP port of the broker.
        token: Access token, passed to ``username_pw_set`` as the user name.
        keepalive: Keepalive interval in seconds handed to ``connect``.
        run_forever: Accepted for interface compatibility; currently unused
            because every failed attempt is retried regardless.
        retry_delay: Seconds to pause before each connection attempt.
        settle_delay: Seconds to pause after a successful ``connect`` call.

    Returns:
        0 once ``client.connect`` has returned without raising.  The function
        never returns -1; it keeps retrying instead.
    """
    print("connecting ", client)
    badcount = 0  # counter for bad connection attempts
    while True:
        logging.info("connecting to broker " + str(broker))
        print("Attempts ", str(badcount))
        time.sleep(retry_delay)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
        except Exception as exc:
            # Catch Exception rather than everything so that Ctrl-C and
            # SystemExit still stop the retry loop, and record why the
            # attempt failed instead of silently discarding the error.
            badcount += 1
            logging.warning("connection to %s failed (attempt %d): %s",
                            broker, badcount, exc)
            continue
        time.sleep(settle_delay)
        return 0


def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Poll ``client`` until the flag for ``msgType`` is set or time runs out.

    Args:
        client: MQTT client object.  The flag attributes read here are plain
            attributes that the callbacks are expected to set; a flag that
            has never been set is treated as False.
        msgType: One of "CONNACK", "SUBACK", "MESSAGE" or "PUBACK".  Any
            other value simply waits until the timeout.
        period: Seconds to sleep between polls.
        wait_time: Number of polls allowed before giving up; the function
            returns False after ``wait_time + 1`` polls (about 16 seconds
            with the defaults).
        running_loop: True when the network loop is already being driven by
            ``loop_start``/``loop_forever``; otherwise ``client.loop`` is
            called manually on every poll.

    Returns:
        True when the awaited flag is set.  False on timeout, or, for
        "CONNACK", as soon as ``bad_connection_flag`` is set.
    """
    # Awaited event -> (callback that must be installed, flag it should set).
    watched = {
        "CONNACK": ("on_connect", "connected_flag"),
        "SUBACK": ("on_subscribe", "suback_flag"),
        "MESSAGE": ("on_message", "message_received_flag"),
        "PUBACK": ("on_publish", "puback_flag"),
    }
    callback_name, flag_name = watched.get(msgType, (None, None))

    client.running_loop = running_loop
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        # Only trust a flag when the callback that sets it is installed.
        if callback_name is not None and getattr(client, callback_name, None):
            if getattr(client, flag_name, False):
                return True
            if msgType == "CONNACK" and getattr(client, "bad_connection_flag", False):
                return False

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False


def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """Drive one client: (re)connect when needed, pump the network loop and
    call ``loop_function`` while connected.

    The loop runs while ``client.run_flag`` is true and stops early when
    ``client.bad_connection_flag`` is set.  On exit the client is
    disconnected if it is still flagged as connected.

    ``loop_function``, when given, is called as
    ``loop_function(client, loop_delay)`` on every pass while connected.
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag and not client.bad_connection_flag:

        if not client.connected_flag:
            print("Connecting to " + broker)
            status = Connect(client, broker, port, token, keepalive, run_forever)
            if status == -1:
                # connect gave up: leave the loop on the next check
                client.run_flag = False
                print("quitting loop for  broker ", broker)
            elif not wait_for(client, "CONNACK"):
                # NOTE(review): no CONNACK arrived, yet the flag stays True,
                # so the loop carries on and tries to connect again --
                # confirm this retry is intended rather than a stop.
                client.run_flag = True

        client.loop(0.01)

        if client.connected_flag and loop_function:
            loop_function(client, loop_delay)

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False


def on_log(client, userdata, level, buf):
    """Log callback: write the log text ``buf`` to stdout.

    ``client``, ``userdata`` and ``level`` are accepted but not used.
    """
    print(buf)


def on_connect(client, userdata, flags, rc):
    """Connection callback: record the outcome of a connection attempt.

    Args:
        client: The client whose connection attempt completed.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the connection was accepted.

    On success ``client.connected_flag`` is set.  On any other result the
    code is printed, appended to ``bad_connections.txt`` and
    ``client.loop_stop()`` is called.
    """
    # Only the client passed in is touched.  Module-level names created
    # under the ``__main__`` guard do not exist in spawned worker processes,
    # so referring to them here would raise NameError.
    if rc == 0:
        client.connected_flag = True  # set flag
        return

    print("Bad connection Returned code=", rc)
    # The context manager closes the file even if the write fails.
    with open("bad_connections.txt", "a") as bad_log:  # append mode
        bad_log.write("Bad connection Returned code=%s \n" % rc)
    client.loop_stop()


def on_disconnect(client, userdata, rc):
    """Disconnect callback: clear the connected flag and report the reason.

    Args:
        client: The client that was disconnected.
        userdata: Unused.
        rc: Disconnect result code; 0 is reported as a clean disconnect,
            anything else as an unexpected one.
    """
    client.connected_flag = False  # set flag
    if rc == 0:
        print("client disconnected ok")
    else:
        # Do not claim "ok" for a dropped connection; show the code so the
        # cause can be investigated.
        print("client disconnected unexpectedly, rc=", rc)


def on_publish(client, userdata, mid):
    """Publish callback: print the message id ``mid`` that was reported.

    ``client`` and ``userdata`` are accepted but not used.
    """
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):
    """Publish one random telemetry sample for ``client``, then sleep.

    Every call publishes a Current/Pressure reading.  When at least 3600
    seconds have passed since the module-level ``init_time``, a second
    message with mean/standard-deviation values is published as well and
    ``init_time`` is reset.  Finally the current time is printed and the
    function sleeps for ``loop_delay`` seconds.
    """
    # NOTE(review): init_time is a single module-level timer, so every
    # caller in the process reads and resets the same value -- confirm that
    # a per-client timer is not what was intended.
    global init_time

    topic = 'v1/devices/me/telemetry'

    current = round(random.uniform(0.6, 50.0), 2)
    pressure = round(random.uniform(0.6, 50.0), 2)
    readings = ('{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (current, pressure))

    stats = None
    if time.time() - init_time >= 3600:
        mean_current = round(random.uniform(5.0, 30.0), 2)
        sd_current = round(random.random(), 2)
        mean_pressure = round(random.uniform(5.0, 30.0), 2)
        sd_pressure = round(random.random(), 2)
        stats = ('{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                 '"MnP": "%s", "SdP": "%s"}' % (mean_current, sd_current, mean_pressure, sd_pressure))

    client.publish(topic, readings)
    if stats is not None:
        client.publish(topic, stats)
        init_time = time.time()

    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections(n_clients, threads):
    """Create one MQTT client and one worker thread per entry of ``n_clients``.

    Each entry is a dict providing "name", "broker", "port" and "token".
    The created client, its id and its base name are stored back into the
    entry under "client", "client_id" and "cname".  Every started thread
    runs ``client_loop`` with ``pub`` as its loop function and is appended
    to ``threads``.
    """
    for entry in n_clients:
        cname = "client-" + entry["name"]
        # the current time in whole seconds is appended to build the id
        client_id = cname + str(int(time.time()))
        client = mqtt.Client(client_id)  # create new instance

        entry.update(client=client, client_id=client_id, cname=cname)

        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message

        worker = threading.Thread(
            target=client_loop,
            args=(client, entry["broker"], entry["port"], entry["token"], 600, pub),
        )
        threads.append(worker)
        worker.start()

def main_loop(clients_loop):
    """Start a worker thread per client and monitor their connection flags.

    Args:
        clients_loop: Sequence of client description dicts, as expected by
            ``Create_connections``.

    Every 10 seconds the active thread count is printed and each client
    whose ``connected_flag`` is false is reported on stdout and appended to
    ``disconnects.txt``.  On KeyboardInterrupt every client's ``run_flag``
    is cleared so the worker loops can finish, and the function returns
    after a 10 second pause.
    """
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    print("current threads =", threading.active_count())
    print("Publishing ")
    Create_connections(clients_loop, threads)

    # The threads have only been started at this point; each one connects
    # in the background.
    print("All clients connected ")
    print("current threads =", threading.active_count())
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            print("current threads =", threading.active_count())
            for c in clients_loop:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected" , c["name"])
                    # The context manager closes the file even if the
                    # write fails.
                    with open("disconnects.txt", "a") as drop_log:  # append mode
                        drop_log.write("broker %s is disconnected %s \n" % (c["broker"], c["name"]))
                    time.sleep(1)
                    #sys.exit("A connection was dropped")

    except KeyboardInterrupt:
        print("ending")
        for c in clients_loop:
            c["client"].run_flag = False

    time.sleep(10)

if __name__ == '__main__':
    # Entry point: look up the devices registered on the chosen ThingsBoard
    # installation, build one client description per device, and spread the
    # clients over worker processes that each run main_loop().

    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "demo":
        type_install = "demo.thingsboard.io"
        # assumes the demo MQTT broker uses the same host name as the REST
        # API -- TODO confirm
        broker = type_install
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        # Stop here: nothing below can work without credentials and a broker.
        sys.exit("Error: Installation not supported")

    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    print(len(clients))
    time.sleep(5)
    if len(clients) >= 200:
        print("Splitting devices to multiprocess")
        split_by = math.ceil(len(clients) / 250)
        split_clients = np.array_split(clients, split_by)
    else:
        # Small device sets run in a single worker process.
        split_clients = [clients]

    jobs = []
    for idx, client_portion in enumerate(split_clients):
        print("Starting process for portion %s" % (idx + 1))
        p = multiprocessing.Process(target=main_loop, args=(client_portion,))
        jobs.append(p)
        p.start()

    for job in jobs:
        print("Ending process")
        job.join()

仅用 1K 个设备进行测试时,不知为何有一些连接返回了 rc=3(每次出现的数量都不一样,我不知道原因),另外还有一些客户端最终断开了连接(同样不知道原因)。代码有问题吗?我计划为多达 10K 个设备发送数据,但现在连 1K 个稳定的连接都无法维持。

运行物联网平台(即接收连接的一端)的机器规格如下:

  • 处理器:Intel Core i5-3570 CPU @3.40GHz x 4
  • 内存:8GB
  • 磁盘大小 500GB
0个回答
没有发现任何回复~