# python3.6
"""MQTT debug subscriber.

Connects to one of the pre-configured brokers, subscribes to the configured
topics and appends every received message to per-device log files under
``./log`` (text, hex and "tt" variants), echoing it to stdout as well.
"""

import getopt
import json
import os
import random
import struct
import sys
import time
from binascii import *
from datetime import datetime

from paho.mqtt import client as mqtt_client

# import chardet

# broker = 'vip.frp.wlphp.com'
# port = 5801
# topic = "/yw/#"

# pyinstaller -F -n mqtt_sub_v0004 mqtt_sub.py
# pyinstaller -F -n mqtt_sub_v0006 mqtt_sub.py
# pyinstaller -F -n mqtt_c0_v2 mqtt_c0.py
# C:\Users\chx_s\AppData\Local\Programs\Python\Python36\Scripts\pyinstaller.exe -F -n code_sub code_sub.py
# C:\Users\chx_s\AppData\Local\Programs\Python\Python36-32\python code_sub.py

# NOTE(review): broker credentials are stored in plain text in this file;
# consider loading them from the environment or an untracked config file.
test = {"mqtt_broker": "test-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "admin", "mqtt_pwd": "houjianwei"}
prod = {"mqtt_broker": "mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "1SvTlvm1VCawSzS"}
zz = {"mqtt_broker": "zz-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "oZ8hl3tYpMNEqHx"}

# Maps the value of ``--env`` to its connection preset.
_ENV_PRESETS = {"test": test, "prod": prod, "zz": zz}

mqtt_env = "test"
# mqtt_env = "prod"
mqtt_broker = 'test-mqtt.cpyypt.cn'
mqtt_port = 9000
mqtt_user = 'admin'
mqtt_pwd = 'houjianwei'
# mqtt_subtopic = 'cpyypt/logup/#'
# mqtt_subtopic = [("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)]
mqtt_subtopic = [("cpyypt/up/9002/#", 0), ("cpyypt/down/9002/broadcast", 0)]
mqtt_hex = 1

version = "0.0.0.2"
# "Version header still pending" flags, one per log flavour. Each is cleared
# after the first successful write of that flavour in this process.
version_w_ascii = 1
version_w_hex = 1
version_w_tt = 1

error_msg = ""
info_msg = ""

# Generate a client ID with a random numeric suffix.
# NOTE: only 101 distinct suffixes are possible (randint(0, 100)).
client_id = f'LogTT_Svr_{random.randint(0, 100)}'

# LogTT_SN = "0"

# Known payload prefixes: (prefix bytes, log as text, log as hex).
_PAYLOAD_PREFIXES = (
    (b"TTL_U=", True, True),
    (b"RS232_U=", False, True),
    (b"RS485_U=", False, True),
    (b"FIXED_INFO=", True, False),
    (b"HEART_MSG=", True, False),
    (b"PT_CMD=", True, False),
)
_FIXED_INFO_PREFIX = b"FIXED_INFO="


def time_n2s(time_n):
    """Format the epoch timestamp *time_n* as local ``YYYY-mm-dd HH:MM:SS``."""
    data_sj = time.localtime(time_n)
    # Convert the broken-down local time to a readable string.
    return time.strftime("%Y-%m-%d %H:%M:%S", data_sj)


def get_f2s(f, n):
    """Format *f* with two decimals, left-padded with spaces to width *n*."""
    return ("%0.2f" % f).rjust(n)


def _log_file_name(sn, kind):
    """Build today's log path for device *sn* and log flavour *kind*."""
    return f"./log/LogTT_{sn}_{kind}_" + datetime.now().strftime('%Y-%m-%d') + ".log"


def get_log_file_name_ascii(sn):
    """Return today's text log path for device *sn*."""
    return _log_file_name(sn, "ascii")


def get_log_file_name_hex(sn):
    """Return today's hex log path for device *sn*."""
    return _log_file_name(sn, "hex")


def get_log_file_name_tt(sn):
    """Return today's "tt" log path for device *sn*."""
    return _log_file_name(sn, "tt")


def my_unpack(s, data, i):
    """Read one field from *data* starting at offset *i*.

    Args:
        s: Either a ``struct`` format string (the first unpacked value is
            returned) or an ``int`` byte count (the raw slice is returned).
        data: Bytes-like buffer to read from.
        i: Offset of the field inside *data*.

    Returns:
        ``(next_offset, value)``. When unpacking fails, the failure is
        appended to the module-level ``error_msg`` / ``info_msg`` strings and
        ``(next_offset, 0)`` is returned.

    Raises:
        TypeError: If *s* is neither ``str`` nor ``int``.
    """
    global info_msg
    global error_msg

    if isinstance(s, str):
        size = struct.calcsize(s)
        try:
            val = struct.unpack(s, data[i:(i + size)])
        except (struct.error, TypeError):
            # Best effort: record the failure and keep the offset advancing.
            temp_msg = "fun:" + sys._getframe().f_code.co_name + \
                       ",line:" + f"{sys._getframe().f_lineno}." + \
                       f"\n出错了!!!,s={s},i={i},data={data}"
            error_msg = error_msg + temp_msg
            info_msg = info_msg + temp_msg
            return i + size, 0
        return i + size, val[0]

    if isinstance(s, int):
        return i + s, data[i:(i + s)]

    raise TypeError(f"my_unpack: unsupported field spec {s!r}")


def parse_data(data, topic):
    """Placeholder; currently does nothing and returns ``None``."""
    global info_msg
    global error_msg


def _timestamp_prefix():
    """Return the ``<local time with milliseconds>-->:`` prefix of a log line."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"


def _split_payload(payload):
    """Split *payload* into ``(head, body, log_as_text, log_as_hex)``.

    ``head`` is the recognised prefix (``b""`` when none matches, in which
    case the whole payload is the body and only the hex log is used).
    """
    for prefix, as_text, as_hex in _PAYLOAD_PREFIXES:
        if payload[:len(prefix)] == prefix:
            return payload[:len(prefix)], payload[len(prefix):], as_text, as_hex
    return b"", payload, False, True


def _decode_body(body):
    """Decode *body* as gb2312, then utf-8, falling back to upper-case hex."""
    for encoding in ('gb2312', 'utf-8'):
        try:
            return body.decode(encoding)
        except UnicodeDecodeError:
            continue
    return body.hex().upper()


def _append_log(path, text, with_version):
    """Append *text* to *path*, preceded by the version line if requested.

    Logging is best effort: failures are printed, not raised.

    Returns:
        ``True`` when everything was written, ``False`` otherwise.
    """
    try:
        # backslashreplace keeps the line even when the platform's default
        # encoding cannot represent some character of the message.
        with open(path, 'a', errors="backslashreplace") as f:
            if with_version:
                f.write("version:" + version + "\n")
            f.write(text)
    except (OSError, UnicodeError) as e:
        print(str(e))
        return False
    return True


def _on_message(client, userdata, msg):
    """Log one received MQTT message to the matching file(s) and to stdout.

    The last topic segment is used as the device serial number in the log
    file names. Topics containing ``ttup`` go to the "tt" log as hex; other
    messages are routed to the text and/or hex log according to the payload
    prefix.
    """
    global version_w_ascii
    global version_w_hex
    global version_w_tt

    log_sn = msg.topic.split("/")[-1]

    if "ttup" in msg.topic:
        msg_tt = _timestamp_prefix() + f"topic:{msg.topic} msg:{msg.payload.hex().upper()}"
        # Output the message.
        if _append_log(get_log_file_name_tt(log_sn), msg_tt + "\r\n", version_w_tt > 0):
            version_w_tt = 0
        print(msg_tt)
        return

    head, body, flag_ascii, flag_hex = _split_payload(msg.payload)

    if flag_ascii:
        body_str = _decode_body(body)
        head_str = head.decode('utf-8')
        msg_ascii = _timestamp_prefix() + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
        msg_ascii = msg_ascii.rstrip() + "\r\n"
        # Output the message.
        if _append_log(get_log_file_name_ascii(log_sn), msg_ascii, version_w_ascii > 0):
            version_w_ascii = 0
        print(msg_ascii, end="")
        # FIXED_INFO={"base_info":{"name":"张三","uplinkB":1010,"rsrp":-99,"tel":"13888888888","reset_times":49,"project":"LogTT","csq":23,"downlinkB":573,"rsrq":-13,"dev_type":"9002","simid":0,"muid":"20230818221817A863212A0300364949","snr":-1,"version":"9002.4.002","iccid":"89860622330053052122","imei":"869861069998891","rssi":-66,"dev_pd":"20240329","dev_sn":1},"pos_info":{"lng":0,"lat":0}}
        if head == _FIXED_INFO_PREFIX:
            try:
                body_dict = json.loads(body_str)
            except ValueError as e:
                # A malformed payload is reported instead of raising out of
                # the callback.
                print(str(e))
            else:
                print(body_dict)

    if flag_hex:
        msg_hex = _timestamp_prefix() + f"topic:{msg.topic} msg:{head.decode('utf-8')}{body.hex().upper()}"
        # Output the message.
        if _append_log(get_log_file_name_hex(log_sn), msg_hex + "\r\n", version_w_hex > 0):
            version_w_hex = 0
        if not flag_ascii:
            print(msg_hex)


def subscribe(client: mqtt_client.Client):
    """Subscribe *client* to ``mqtt_subtopic`` and install the message logger."""
    client.subscribe(mqtt_subtopic)
    client.on_message = _on_message


def connect_mqtt() -> mqtt_client.Client:
    """Create the MQTT client, start connecting and return it.

    Subscriptions are made from the ``on_connect`` callback, i.e. each time
    the broker reports a successful connection.
    """

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
            subscribe(client)
        else:
            print("Failed to connect, return code %d\n" % rc)

    if hasattr(mqtt_client, "CallbackAPIVersion"):
        # paho-mqtt 2.x: the first positional argument is the callback API
        # version; VERSION1 keeps the callback signatures used in this file.
        client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)
    else:
        client = mqtt_client.Client(client_id)
    client.username_pw_set(mqtt_user, mqtt_pwd)
    client.on_connect = on_connect
    client.connect(mqtt_broker, mqtt_port)
    return client


def on_disconnect(client, userdata, rc):
    """Report that the MQTT connection was closed."""
    print("mqtt disconnect")


def run():
    """Connect to the broker and process network traffic until interrupted."""
    client = connect_mqtt()
    client.on_disconnect = on_disconnect
    # subscribe(client)
    client.loop_forever()


def print_usage():
    """Print the command-line help text."""
    print('')
    print('帮助信息:')
    print('--env= ')
    print('')
    print('举例:')
    print('\r\ndbg_mqtt.exe --env=test\r\n')
    print('\r\ndbg_mqtt.exe --env=prod\r\n')
    print('\r\ndbg_mqtt.exe --env=zz\r\n')
    print('\r\ndbg_mqtt.exe --env=test --subtopic=cpyypt/up/0902/0000000003,cpyypt/down/0902/0000000003\r\n')
    print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0102/0000020142,cpyypt/down/0102/0000020142\r\n')
    print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0101/0000001048,cpyypt/down/0101/0000001048\r\n')
    print('')


def main(argv):
    """Parse *argv*, resolve the connection settings and start the subscriber.

    ``--env`` selects a preset (``test``, ``prod`` or ``zz``; anything else
    falls back to ``test``). ``--broker``, ``--port``, ``--user`` and
    ``--pwd`` override individual values of that preset. ``--subtopic`` takes
    a comma-separated topic list.

    Exits with status 2 on invalid options or values, 0 after ``-h``.
    """
    global mqtt_env
    global mqtt_broker
    global mqtt_port
    global mqtt_user
    global mqtt_pwd
    global mqtt_subtopic
    global mqtt_hex

    os.makedirs("log", exist_ok=True)

    # python sub2.py broker=mqtt.cpyypt.cn port=9000 user=admin pwd=houjianwei subtopic="/yw/#"
    print("dbg_mqtt version:" + version)
    print('输入的参数 argv=:', argv)
    try:
        opts, _args = getopt.getopt(
            argv, "h", ["env=", "broker=", "port=", "user=", "pwd=", "subtopic=", "hex"])
    except getopt.GetoptError:
        print_usage()
        sys.exit(2)

    # Explicit connection options are collected first and applied on top of
    # the environment preset below, so they are not lost.
    overrides = {}
    for opt, arg in opts:
        print('解析', opt, arg)
        if opt == '-h':
            print_usage()
            sys.exit()
        elif opt == "--env":
            mqtt_env = str(arg)
        elif opt == "--broker":
            overrides["mqtt_broker"] = str(arg)
        elif opt == "--port":
            try:
                overrides["mqtt_port"] = int(arg)
            except ValueError:
                print('【服务器端口】为空,请重新输入!')
                print_usage()
                sys.exit(2)
        elif opt == "--user":
            overrides["mqtt_user"] = str(arg)
        elif opt == "--pwd":
            overrides["mqtt_pwd"] = str(arg)
        elif opt == "--subtopic":
            # e.g. [("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)]; empty items
            # (as produced by "--subtopic=" or a trailing comma) are dropped.
            mqtt_subtopic = [(s.strip(), 0) for s in str(arg).split(",") if s.strip()]
            print("mqtt_subtopic=", mqtt_subtopic)
        elif opt == "--hex":
            mqtt_hex = 1

    preset = _ENV_PRESETS.get(mqtt_env, test)
    mqtt_broker = overrides.get("mqtt_broker", preset["mqtt_broker"])
    mqtt_port = overrides.get("mqtt_port", preset["mqtt_port"])
    mqtt_user = overrides.get("mqtt_user", preset["mqtt_user"])
    mqtt_pwd = overrides.get("mqtt_pwd", preset["mqtt_pwd"])

    if not mqtt_broker:
        print('【服务器地址】为空,请重新输入!')
        print_usage()
        sys.exit(2)
    if mqtt_port == 0:
        print('【服务器端口】为空,请重新输入!')
        print_usage()
        sys.exit(2)
    if not mqtt_subtopic:
        print('【主题】为空,请重新输入!')
        print_usage()
        sys.exit(2)

    run()


if __name__ == '__main__':
    # run()
    main(sys.argv[1:])