| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\Scripts\pip.exe
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\python.exe
- import sys
- import struct
- from binascii import *
- # 临时注释掉crcmod导入,用于测试弹窗功能
- from crcmod import *
- import random
- import re
- # bytes解包
- def my_unpack(s, data, i):
- global info_msg
- global error_msg
- temp_msg = ""
- if type(s) == str:
- np = struct.calcsize(s)
- try:
- val = struct.unpack(s, data[i:(i + np)])
- return i + np, val[0]
- except:
- temp_msg = "fun:" + sys._getframe().f_code.co_name + \
- ",line:" + f"{sys._getframe().f_lineno}." + \
- f"\n出错了!!!,s={s},i={i},data={data}"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- return i + np, 0
- elif type(s) == int:
- val = data[i:(i + s)]
- return i + s, val
- def convert_to_24byte_bytes(s):
- """将字符串转换为24字节的bytes,不足补0x00"""
- bytes_data = s.encode('utf-8')[:24] # 取前24字节
- return bytes_data.ljust(24, b'\x00') # 不足24字节用0x00填充
- def get_msg_restart(device, data):
- print(f"this is get_msg_restart:")
- # body
- pub_body = bytearray(struct.pack("H", 0x2006)) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(device['dev_sn'])))) # 注意 103 是指十进制 103
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1021)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- print(f"pub_frame=%s\n" % pub_frame.hex().upper())
- return pub_frame
- def get_msg_upgrade_1001(dev_type,dev_sn):
- print(f"this is get_msg_upgrade_1001:")
- # body
- pub_body = bytearray(struct.pack("H", (int(dev_type,16)))) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(dev_sn)))) # 注意 103 是指十进制 103
- pub_body = pub_body + bytearray(struct.pack("H", (int(dev_type,16)))) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(dev_sn)))) # 注意 103 是指十进制 103
- pub_body = pub_body + bytearray(struct.pack("H", (int(dev_type,16)))) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(dev_sn)))) # 注意 103 是指十进制 103
- pub_body = pub_body + bytearray(struct.pack("H", 0x1234))
- pub_body = pub_body + bytearray(struct.pack("B", 0x03))
- url = "http://123.bin"
- pub_body = pub_body + bytearray(struct.pack("B", len(url)))
- pub_body = pub_body + bytearray(url.encode())
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x04)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1001)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- return pub_frame
- def get_msg_upgrade_1030(dev_type, dev_sn, fileSum, maxPkgId, curPkgId, curPkgsize, data):
- print(f"this is get_msg_upgrade_1030:")
- # body
- pub_body = bytearray(struct.pack("H", int(dev_type,16))) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(dev_sn)))) # 注意 103 是指十进制 103
- pub_body = pub_body + bytearray(struct.pack("B", 0x03))
- pub_body = pub_body + bytearray(struct.pack("H", 0x1234))
- pub_body = pub_body + bytearray(struct.pack("I", fileSum))
- pub_body = pub_body + bytearray(struct.pack("H", maxPkgId))
- pub_body = pub_body + bytearray(struct.pack("H", curPkgId))
- pub_body = pub_body + bytearray(struct.pack("I", 0))
- pub_body = pub_body + bytearray(struct.pack("H", curPkgsize))
- # add something
- pub_body = pub_body + bytearray(data)
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x04)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1030)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- return pub_frame
- def get_msg_config_wifi(device, data):
- print(f"this is get_msg_config_wifi:")
- pub_body = bytearray(struct.pack("B", 0x02)) # WIFI配置文件编号索引
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 仅配置WIFI
- pub_body = pub_body + bytearray(struct.pack("H", 1)) # 2.4G信道
- pub_body = pub_body + bytearray(struct.pack("H", 36)) # 5G信道
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 使能5G
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 功率等级
- pub_body = pub_body + bytearray([0x43, 0x4E, 0x00]) # CN国家代码
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_ssid","未知")) # WIFI名称
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # WIFI加密类型
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_password","未知")) # WIFI密码
- if device.get("network_type","未知") == "WiFi":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 网络类型 0=未知,1=WIFI,2=有线网
- elif device.get("network_type","未知") == "有线网":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 网络类型 0=未知,1=WIFI,2=有线网
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 网络类型 0=未知,1=WIFI,2=有线网
- if device.get("cloud_platform","未知") == "阿里云生产":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- elif device.get("cloud_platform","未知") == "测试环境":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- pub_body = pub_body + bytearray(struct.pack("H", 0x00)) # 预留
- # body
- # pub_body = bytearray(struct.pack("H", 0x2006)) # 注意 2006 是指十六进制 0x2006
- # pub_body = pub_body + bytearray(bytes(struct.pack("I", int(device['dev_sn'])))) # 注意 103 是指十进制 103
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1006)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- print(f"pub_frame=%s\n" % pub_frame.hex().upper())
- return pub_frame
- def get_msg_config_platform(device, data):
- print(f"this is get_msg_config_platform:")
- # body
- pub_body = bytearray(struct.pack("B", 0x02)) # WIFI配置文件编号索引
- pub_body = pub_body + bytearray(struct.pack("B", 0x03)) # 仅配置平台
- pub_body = pub_body + bytearray(struct.pack("H", 1)) # 2.4G信道
- pub_body = pub_body + bytearray(struct.pack("H", 36)) # 5G信道
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 使能5G
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 功率等级
- pub_body = pub_body + bytearray([0x43, 0x4E, 0x00]) # CN国家代码
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_ssid","未知")) # WIFI名称
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # WIFI加密类型
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_password","未知")) # WIFI密码
- if device.get("network_type","未知") == "WiFi":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 网络类型 0=未知,1=WIFI,2=有线网
- elif device.get("network_type","未知") == "有线网":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 网络类型 0=未知,1=WIFI,2=有线网
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 网络类型 0=未知,1=WIFI,2=有线网
- if data == "阿里云生产":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- elif data == "测试环境":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- pub_body = pub_body + bytearray(struct.pack("H", 0x00)) # 预留
- # pub_body = pub_body + bytearray(struct.pack("H", 0x2006))
- # pub_body = pub_body + bytearray(bytes(struct.pack("I", int(device['dev_sn'])))) # 注意 103 是指十进制 103
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1006)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- print(f"pub_frame=%s\n" % pub_frame.hex().upper())
- return pub_frame
- def get_msg_config_network(device, data):
- print(f"this is get_msg_config_network:")
- pub_body = bytearray(struct.pack("B", 0x02)) # WIFI配置文件编号索引
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 仅配置设备
- pub_body = pub_body + bytearray(struct.pack("H", 1)) # 2.4G信道
- pub_body = pub_body + bytearray(struct.pack("H", 36)) # 5G信道
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 使能5G
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 功率等级
- pub_body = pub_body + bytearray([0x43, 0x4E, 0x00]) # CN国家代码
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_ssid","未知")) # WIFI名称
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # WIFI加密类型
- pub_body = pub_body + convert_to_24byte_bytes(device.get("wifi_password","未知")) # WIFI密码
- if device.get("network_type","未知") == "WiFi":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 网络类型 0=未知,1=WIFI,2=有线网
- elif device.get("network_type","未知") == "有线网":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 网络类型 0=未知,1=WIFI,2=有线网
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 网络类型 0=未知,1=WIFI,2=有线网
- if device.get("cloud_platform","未知") == "阿里云生产":
- pub_body = pub_body + bytearray(struct.pack("B", 0x01)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- elif device.get("cloud_platform","未知") == "测试环境":
- pub_body = pub_body + bytearray(struct.pack("B", 0x02)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- else:
- pub_body = pub_body + bytearray(struct.pack("B", 0x00)) # 云平台 0=未知,1=阿里云生产,2=测试环境
- pub_body = pub_body + bytearray(struct.pack("H", 0x00)) # 预留
- # body
- # pub_body = bytearray(struct.pack("H", 0x2006)) # 注意 2006 是指十六进制 0x2006
- # pub_body = pub_body + bytearray(bytes(struct.pack("I", int(device['dev_sn'])))) # 注意 103 是指十进制 103
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x1006)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body) + 2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- print(f"pub_frame=%s\n" % pub_frame.hex().upper())
- return pub_frame
- def get_msg_config_all(device,data):
- print(f"this is get_msg_config_all:")
- # body
- pub_body = bytearray(struct.pack("H", 0x2006)) # 注意 2006 是指十六进制 0x2006
- pub_body = pub_body + bytearray(bytes(struct.pack("I", int(device['dev_sn'])))) # 注意 103 是指十进制 103
- # add something
- # pub_header = bytearray.fromhex("FEFE01390100000363206A00")
-
- # header
- pub_header = bytearray(struct.pack("H", 0xFEFE)) # 协议头
- pub_header = pub_header + bytearray(struct.pack("B", 0x01)) # 协议版本
- pub_header = pub_header + bytearray(struct.pack("I", random.randint(0, 1000000000))) # 消息ID
- pub_header = pub_header + bytearray(struct.pack("B", 0x03)) # 一级类型
- pub_header = pub_header + bytearray(struct.pack("H", 0x2001)) # 二级类型
- pub_header = pub_header + bytearray(struct.pack("H", (len(pub_body)+2))) # 消息长度
- # 重新结算长度
- # pub_header[10:12] = bytearray(struct.pack("H", (len(pub_body))))
-
- # 添加crc16
- pub_frame = pub_header + pub_body
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(pub_frame)
- # 完整消息
- pub_frame = pub_frame + bytearray(struct.pack("<H", crc16))
- print(f"\nlen(pub_frame)=%d," % len(pub_frame))
- print(f"pub_frame=%s\n" % pub_frame.hex().upper())
-
- return pub_frame
- info_msg = ""
- error_msg = ""
- def parse_data(client, data, topic):
- global info_msg
- global error_msg
-
- header = {}
- body = {}
-
- flag = True
- while flag:
- info_msg = ""
- error_msg = ""
- temp_msg = ""
- crc16mod = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0000)
- crc16 = crc16mod(data[:-2])
-
- crc16_f = struct.unpack("H",data[-2:])[0]
- if crc16 != crc16_f :
- temp_msg = sys._getframe().f_code.co_name + \
- f"\nerror:crc16 != data[-2:]\n"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- flag = False
- break
- if len(data) < 12:
- temp_msg = sys._getframe().f_code.co_name + \
- f"\nerror:len(data) < 12\n"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- flag = False
- break
- temp = 0
- info_msg = "解析结果:"
- cmd = {}
- np = 0
- np, header['start'] = my_unpack("<H", data, np)
- np, header['pro_ver'] = my_unpack("B", data, np)
- np, header['msg_id'] = my_unpack("<I", data, np)
- np, header['msg_type1'] = my_unpack("B", data, np)
- np, header['msg_type2'] = my_unpack("<H", data, np)
- np, header['msg_Len'] = my_unpack("<H", data, np)
- info_msg = info_msg + f"协议版本:0x%02x," % (header['pro_ver'])
- # if header['msg_type1'] == 0x03 and header['msg_type2'] == 0x2051:
- # outMsg = outMsg + "\r\n\r\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\r\n\r\n"
- # for k, v in header.items():
- # outMsg = outMsg + str(k)+':' + ' '*(16-len(k)) + (f"0x%02X" % v) + ' '*(16 - len(f"0x%02X" % v)) + (f"%u" % v)
- # outMsg = outMsg + str(k) + ':' + (f"0x%X," % v) + (f"%u" % v)
- if header['start'] != 0xFEFE:
- temp_msg = "fun:" + sys._getframe().f_code.co_name + \
- ",line:" + f"{sys._getframe().f_lineno}." + \
- f"\nerror: frame.start"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- flag = False
- break
-
- if header['msg_Len'] + 12 != len(data):
- temp_msg = "fun:" + sys._getframe().f_code.co_name + \
- ",line:" + f"{sys._getframe().f_lineno}." + \
- f"\nerror: frame.len"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- flag = False
- break
- if header['pro_ver'] == 0x01:
- if header['msg_type1'] == 0x02:
- info_msg = info_msg + f",MT1=0x%02X," % (header['msg_type1']) + "状态类"
- if header['msg_type2'] == 0x2003:
- info_msg = info_msg + f",MT2=0x%04X," % (header['msg_type2']) + "解码板心跳上行"
- np, body['master_type'] = my_unpack("<H", data, np);
- body['master_type'] = f"%04X" % (body['master_type'])
- np, body['master_sn'] = my_unpack("<I", data, np);
- body['master_sn'] = f"%u" % (body['master_sn'])
- info_msg = info_msg + f",主设备:%s-%s" % (body['master_type'].zfill(4), body['master_sn'].zfill(10))
- np, body['slave_type'] = my_unpack("<H", data, np);
- body['slave_type'] = f"%04X" % (body['slave_type'])
- np, body['slave_sn'] = my_unpack("<I", data, np);
- body['slave_sn'] = f"%u" % (body['slave_sn'])
- info_msg = info_msg + f",从设备:%s-%s" % (body['slave_type'].zfill(4), body['slave_sn'].zfill(10))
- np, body['app_version'] = my_unpack("<I", data, np);
- body['app_version'] = f"%08X" % (body['app_version'])
- info_msg = info_msg + f",APP版本:%s" % (body['app_version'])
- np, body['reset_times'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",复位次数:%d" % (body['reset_times'])
- np, body['last_reset_type'] = my_unpack("B", data, np);
- body['last_reset_type'] = f"%02X" % (body['last_reset_type'])
- info_msg = info_msg + f",最后复位类型:0x%s" % (body['last_reset_type'])
- np, body['uuid'] = my_unpack(12, data, np);
- body['uuid'] = body['uuid'].hex().upper()
- info_msg = info_msg + f",uuid:%s" % (body['uuid'])
- np, body['run_time'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",运行时长:%d秒" % (body['run_time'])
- np, body['receive_sum'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",接收数据量:%d" % (body['receive_sum'])
- np, body['send_sum'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",发送数据量:%d" % (body['send_sum'])
- np, body['network_type'] = my_unpack("B", data, np);
- info_msg = info_msg + f",网络类型:0x%02X" % (body['network_type'])
- np, body['cloud_platform'] = my_unpack("B", data, np);
- info_msg = info_msg + f",云平台:0x%02X" % (body['cloud_platform'])
- np, body['sim_status'] = my_unpack("B", data, np);
- info_msg = info_msg + f",SIM卡状态:0x%02X" % (body['sim_status'])
- np, body['free_fifo'] = my_unpack("B", data, np);
- info_msg = info_msg + f",剩余资源:0x%02X" % (body['free_fifo'])
- np, body_ip = my_unpack("16s", data, np);
- find_ip = body_ip.rstrip(b'\0').decode('utf-8')
- body['ip'] = find_ip
- info_msg = info_msg + f",IP地址:%s" % (str(body['ip']))
- np, body['reserve'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",保留字段:%d" % (body['reserve'])
- elif header['msg_type1'] == 0x04:
- info_msg = info_msg + f",MT1=0x%02X," % (header['msg_type1']) + "状态类"
- if header['msg_type2'] == 0x2001:
- info_msg = info_msg + f",MT2=0x%04X," % (header['msg_type2']) + "升级状态上行"
- np, body['FirstLevelType'] = my_unpack("<H", data, np);
- body['FirstLevelType'] = f"%04X" % (body['FirstLevelType'])
- np, body['FirstLevelSn'] = my_unpack("<I", data, np);
- body['FirstLevelSn'] = f"%u" % (body['FirstLevelSn'])
- info_msg = info_msg + f",一级设备:%s-%s" % (body['FirstLevelType'].zfill(4), body['FirstLevelSn'].zfill(10))
-
- np, body['escalationDevType'] = my_unpack("<H", data, np);
- body['escalationDevType'] = f"%04X" % (body['escalationDevType'])
- np, body['escalationDevSn'] = my_unpack("<I", data, np);
- body['escalationDevSn'] = f"%u" % (body['escalationDevSn'])
- info_msg = info_msg + f",目标设备:%s-%s" % (body['escalationDevType'].zfill(4), body['escalationDevSn'].zfill(10))
- np, body['firmType'] = my_unpack("B", data, np);
- info_msg = info_msg + f",固件类型:0x%02X" % (body['firmType'])
-
- np, body['taskID'] = my_unpack("<H", data, np);
- info_msg = info_msg + f",任务ID:%u" % (body['taskID'])
- np, body['upgrade_status'] = my_unpack("B", data, np);
- info_msg = info_msg + f",升级状态:0x%02X" % (body['upgrade_status'])
-
- np, body['app_Version'] = my_unpack("<I", data, np);
- info_msg = info_msg + f",APP版本:0x%08X" % (body['app_Version'])
- flag = False
- # print(info_msg)
-
- return header, body, info_msg
|