我有一个关于 mqtt-paho 以及使用基于多处理的进程创建 10K 客户端的可能性的问题。这是我现在的代码(我使用基于多处理的进程来创建线程):
import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
import sys
logging.basicConfig(level=logging.INFO)
init_time = time.time()
disconnected = 0
def Connect(client, broker, port, token, keepalive, run_forever=False):
connflag = False
delay = 5
print("connecting ",client)
badcount = 0 # counter for bad connection attempts
while not connflag:
print(logging.info("connecting to broker " + str(broker)))
# print("connecting to broker "+str(broker)+":"+str(port))
print("Attempts ", str(badcount))
time.sleep(2)
try:
client.username_pw_set(token)
client.connect(broker, port, keepalive)
time.sleep(1)
connflag = True
except:
pass
#client.badconnection_flag = True
#logging.info("connection failed " + str(badcount))
#time.sleep(5)
#badcount += 1
#if badcount >= 3 and not run_forever:
# return -1
# raise SystemExit # give up
return 0
def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
"""Will wait for a particular event gives up after period*wait_time, Default=10
seconds.Returns True if succesful False if fails"""
# running loop is true when using loop_start or loop_forever
client.running_loop = running_loop #
wcount = 0
while True:
logging.info("waiting" + msgType)
if msgType == "CONNACK":
if client.on_connect:
if client.connected_flag:
return True
if client.bad_connection_flag: #
return False
if msgType == "SUBACK":
if client.on_subscribe:
if client.suback_flag:
return True
if msgType == "MESSAGE":
if client.on_message:
if client.message_received_flag:
return True
if msgType == "PUBACK":
if client.on_publish:
if client.puback_flag:
return True
if not client.running_loop:
client.loop(.01) # check for messages manually
time.sleep(period)
wcount += 1
if wcount > wait_time:
print("return from wait loop taken too long")
return False
return True
def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
loop_delay=10, run_forever=False):
"""runs a loop that will auto reconnect and subscribe to topics
pass topics as a list of tuples. You can pass a function to be
called at set intervals determined by the loop_delay
"""
client.run_flag = True
client.broker = broker
print("running loop ")
client.reconnect_delay_set(min_delay=1, max_delay=12)
while client.run_flag: # loop forever
if client.bad_connection_flag:
break
if not client.connected_flag:
print("Connecting to " + broker)
if Connect(client, broker, port, token, keepalive, run_forever) != -1:
if not wait_for(client, "CONNACK"):
client.run_flag = True # break no connack
else: # connect fails
client.run_flag = False # break
print("quitting loop for broker ", broker)
client.loop(0.01)
if client.connected_flag and loop_function: # function to call
loop_function(client, loop_delay) # call function
time.sleep(1)
print("disconnecting from", broker)
if client.connected_flag:
client.disconnect()
client.connected_flag = False
def on_log(client, userdata, level, buf):
print(buf)
def on_connect(client, userdata, flags, rc):
if rc == 0:
client.connected_flag = True # set flag
for c in clients:
#print("connected OK")
pass
else:
print("Bad connection Returned code=", rc)
file1 = open("bad_connections.txt","a")#append mode
file1.write("Bad connection Returned code=%s \n" % rc)
file1.close()
client.loop_stop()
def on_disconnect(client, userdata, rc):
client.connected_flag = False # set flag
print("client disconnected ok")
def on_publish(client, userdata, mid):
print("In on_pub callback mid= ", mid)
def pub(client, loop_delay):
rmd_current = round(random.uniform(0.6, 50.0), 2)
rmd_pressure = round(random.uniform(0.6, 50.0), 2)
global init_time
if time.time() - init_time >= 3600:
rmd_mnc = round(random.uniform(5.0, 30.0), 2)
rmd_sdc = round(random.random(), 2)
rmd_mnp = round(random.uniform(5.0, 30.0), 2)
rmd_sdp = round(random.random(), 2)
client.publish('v1/devices/me/telemetry',
'{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
'"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
client.publish('v1/devices/me/telemetry',
'{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
'"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))
init_time = time.time()
else:
client.publish('v1/devices/me/telemetry',
'{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
'"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
print(datetime.datetime.now())
time.sleep(loop_delay)
def Create_connections(n_clients, threads):
for i in range(len(n_clients)):
cname = "client-" + n_clients[i]["name"]
t = int(time.time())
client_id = cname + str(t) # create unique client_id
client = mqtt.Client(client_id) # create new instance
n_clients[i]["client"] = client
n_clients[i]["client_id"] = client_id
n_clients[i]["cname"] = cname
broker_p = n_clients[i]["broker"]
port = n_clients[i]["port"]
token = n_clients[i]["token"]
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
#client.on_message = on_message
t = threading.Thread(target=client_loop, args=(client, broker_p, port, token, 600, pub))
threads.append(t)
t.start()
def main_loop(clients_loop):
mqtt.Client.connected_flag = False # create flag in class
mqtt.Client.bad_connection_flag = False # create flag in class
threads = []
print("Creating Connections ")
no_threads = threading.active_count()
print("current threads =", no_threads)
print("Publishing ")
Create_connections(clients_loop, threads)
print("All clients connected ")
no_threads = threading.active_count()
print("current threads =", no_threads)
print("starting main loop")
try:
while True:
time.sleep(10)
no_threads = threading.active_count()
print("current threads =", no_threads)
for c in clients_loop:
if not c["client"].connected_flag:
print("broker ", c["broker"], " is disconnected" , c["name"])
file2 = open("disconnects.txt","a")#append mode
file2.write("broker %s is disconnected %s \n" % (c["broker"], c["name"]))
file2.close()
time.sleep(1)
#sys.exit("A connection was dropped")
except KeyboardInterrupt:
print("ending")
for c in clients_loop:
c["client"].run_flag = False
time.sleep(10)
if __name__ == '__main__':
# In case the user is using a demo version or local version of thingsboard
things_location = input("What type of thingsboard installation are you working with (demo/local)? ")
if things_location == "demo":
type_install = "demo.thingsboard.io"
header = Things.get_credentials(things_location)
elif things_location == "local":
computer = input("Which computer? ")
type_install = "cseetprj%s.essex.ac.uk:8080" % computer
broker = "cseetprj%s.essex.ac.uk" % computer
header = Things.get_credentials("local", type_install)
else:
print("Error: Installation not supported")
my_devices = Things.get_devices_id(header, type_install)
clients = []
for device in my_devices:
device_info = {"broker": broker, "port": 1883, "name": device["name"],
"token": Things.get_device_token(device["id"]["id"], header, type_install)}
clients.append(device_info)
print(len(clients))
time.sleep(5)
if len(clients) >= 200:
print("Splitting devices to multiprocess")
split_by = math.ceil(len(clients) / 250)
split_clients = np.array_split(clients, split_by)
jobs = []
for idx, client_portion in enumerate(split_clients):
print("Starting process for portion %s" % (idx + 1))
p = multiprocessing.Process(target=main_loop, args = (client_portion,))
jobs.append(p)
p.start()
for job in jobs:
print("Ending process")
job.join()
当仅使用 1K 设备进行测试时,由于某种原因,我得到了一些 rc=3(这个数字有所不同,我不知道为什么),而另一个客户端最终断开连接(也不知道为什么)。代码有问题吗?我计划尝试为多达 10K 个设备发送数据,但我什至无法建立 1K 的恒定连接。
我拥有物联网平台(接收连接)的机器的规格如下:
- 处理器:Intel Core i5-3570 CPU @3.40GHz x 4
- 内存:8GB
- 磁盘大小 500GB