| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- # python3.6
- import json
- import os
- import struct
- import getopt
- import random
- import sys
- from datetime import datetime
- import time
- from paho.mqtt import client as mqtt_client
- from binascii import *
- # import chardet
- # broker = 'vip.frp.wlphp.com'
- # port = 5801
- # topic = "/yw/#"
- # pyinstaller -F -n mqtt_sub_v0004 mqtt_sub.py
- # pyinstaller -F -n mqtt_sub_v0006 mqtt_sub.py
- # pyinstaller -F -n mqtt_c0_v2 mqtt_c0.py
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python36\Scripts\pyinstaller.exe -F -n code_sub code_sub.py
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python36-32\python code_sub.py
- test = {"mqtt_broker": "test-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "admin", "mqtt_pwd": "houjianwei"}
- prod = {"mqtt_broker": "mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "1SvTlvm1VCawSzS"}
- zz = {"mqtt_broker": "zz-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "oZ8hl3tYpMNEqHx"}
- mqtt_env = "test"
- # mqtt_env = "prod"
- mqtt_broker = 'test-mqtt.cpyypt.cn'
- mqtt_port = 9000
- mqtt_user = 'admin'
- mqtt_pwd = 'houjianwei'
- # mqtt_subtopic = 'cpyypt/logup/#'
- # mqtt_subtopic = [("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)]
- mqtt_subtopic = [("cpyypt/up/9002/#", 0), ("cpyypt/down/9002/broadcast", 0)]
- mqtt_hex = 1
- version = "0.0.0.2"
- version_w_ascii = 1
- version_w_hex = 1
- error_msg = ""
- info_msg = ""
- # generate client ID with pub prefix randomly
- client_id = f'LogTT_Svr_{random.randint(0, 100)}'
- # LogTT_SN = "0"
- def time_n2s(time_n): # 传入参数
- data_sj = time.localtime(time_n)
- time_str = time.strftime("%Y-%m-%d %H:%M:%S", data_sj) # 时间戳转换正常时间
- return time_str # 返回日期,格式为str
- def get_f2s(f, n):
- sf = f"%0.2f" % f
- if len(sf) < n:
- sf = " " * (n - len(sf)) + sf
- return sf
- def get_log_file_name_ascii(sn):
- # global
- log_file_name = f"./log/LogTT_{sn}_ascii_" + datetime.now().strftime('%Y-%m-%d') + ".log"
- return log_file_name
- def get_log_file_name_hex(sn):
- # global
- log_file_name = f"./log/LogTT_{sn}_hex_" + datetime.now().strftime('%Y-%m-%d') + ".log"
- return log_file_name
- def my_unpack(s, data, i):
- global info_msg
- global error_msg
- temp_msg = ""
- if type(s) == str:
- np = struct.calcsize(s)
- try:
- val = struct.unpack(s, data[i:(i + np)])
- return i + np, val[0]
- except:
- temp_msg = "fun:" + sys._getframe().f_code.co_name + \
- ",line:" + f"{sys._getframe().f_lineno}." + \
- f"\n出错了!!!,s={s},i={i},data={data}"
- error_msg = error_msg + temp_msg
- info_msg = info_msg + temp_msg
- return i + np, 0
- elif type(s) == int:
- val = data[i:(i + s)]
- return i + s, val
- def parse_data(data, topic):
- global info_msg
- global error_msg
- def subscribe(client: mqtt_client):
- global info_msg
- global error_msg
- def on_message(client, userdata, msg):
- LogTT_SN = str.split(msg.topic, "/")[-1]
- head = b""
- body = b""
- line_feed = ""
- start_ttl = "TTL_U="
- start_rs232 = "RS232_U="
- start_rs485 = "RS485_U="
- start_fixed = "FIXED_INFO="
- start_heart = "HEART_MSG="
- start_pt = "PT_CMD="
- flag_ascii = False
- flag_hex = True
- if msg.payload[:len(start_ttl)] == start_ttl.encode("utf-8"):
- flag_ascii = True
- flag_hex = True
- head = msg.payload[:len(start_ttl)]
- body = msg.payload[len(start_ttl):]
- elif msg.payload[:len(start_rs232)] == start_rs232.encode("utf-8"):
- flag_ascii = False
- flag_hex = True
- head = msg.payload[:len(start_rs232)]
- body = msg.payload[len(start_rs232):]
- elif msg.payload[:len(start_rs485)] == start_rs485.encode("utf-8"):
- flag_ascii = False
- flag_hex = True
- head = msg.payload[:len(start_rs485)]
- body = msg.payload[len(start_rs485):]
- elif msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
- flag_ascii = True
- flag_hex = False
- head = msg.payload[:len(start_fixed)]
- body = msg.payload[len(start_fixed):]
- elif msg.payload[:len(start_heart)] == start_heart.encode("utf-8"):
- flag_ascii = True
- flag_hex = False
- head = msg.payload[:len(start_heart)]
- body = msg.payload[len(start_heart):]
- elif msg.payload[:len(start_pt)] == start_pt.encode("utf-8"):
- flag_ascii = True
- flag_hex = False
- head = msg.payload[:len(start_pt)]
- body = msg.payload[len(start_pt):]
- else:
- flag_ascii = False
- flag_hex = True
- body = msg.payload
- if flag_ascii:
- try:
- body_str = body.decode('gb2312')
- except UnicodeDecodeError:
- try:
- body_str = body.decode('utf-8')
- except UnicodeDecodeError:
- body_str = body.hex().upper()
- head_str = head.decode('utf-8')
- msg_ascii = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
- msg_ascii = msg_ascii + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
- msg_ascii = msg_ascii.rstrip() + "\r\n"
- # 输出信息
- try:
- with open(get_log_file_name_ascii(LogTT_SN), 'a') as f:
- global version_w_ascii
- if version_w_ascii > 0:
- f.write("version:" + version + "\n")
- version_w_ascii = 0
- f.write(msg_ascii)
- f.close()
- except Exception as e:
- print(str(e))
- print(msg_ascii, end="")
- # FIXED_INFO={"base_info":{"name":"张三","uplinkB":1010,"rsrp":-99,"tel":"13888888888","reset_times":49,"project":"LogTT","csq":23,"downlinkB":573,"rsrq":-13,"dev_type":"9002","simid":0,"muid":"20230818221817A863212A0300364949","snr":-1,"version":"9002.4.002","iccid":"89860622330053052122","imei":"869861069998891","rssi":-66,"dev_pd":"20240329","dev_sn":1},"pos_info":{"lng":0,"lat":0}}
- if msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
- body_dict = json.loads(body_str)
- print(body_dict)
- if flag_hex:
- msg_hex = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
- msg_hex = msg_hex + f"topic:{msg.topic} msg:{head.decode('utf-8')}{body.hex().upper()}"
- # 输出信息
- try:
- with open(get_log_file_name_hex(LogTT_SN), 'a') as f:
- global version_w_hex
- if version_w_hex > 0:
- f.write("version:" + version + "\n")
- version_w_hex = 0
- f.write(msg_hex + "\r\n")
- f.close()
- except Exception as e:
- print(str(e))
- if not flag_ascii:
- print(msg_hex)
- client.subscribe(mqtt_subtopic)
- client.on_message = on_message
- def connect_mqtt() -> mqtt_client:
- def on_connect(client, userdata, flags, rc):
- if rc == 0:
- print("Connected to MQTT Broker!")
- subscribe(client)
- else:
- print("Failed to connect, return code %d\n", rc)
- client = mqtt_client.Client(client_id)
- client.username_pw_set(mqtt_user, mqtt_pwd)
- client.on_connect = on_connect
- client.connect(mqtt_broker, mqtt_port)
- return client
- def on_disconnect(client, userdata, rc):
- print("mqtt disconnect")
- def run():
- client = connect_mqtt()
- client.on_disconnect = on_disconnect
- # subscribe(client)
- client.loop_forever()
- def print_usage():
- print('')
- print('帮助信息:')
- print('--env=<prod生产环境/zz郑州政务云/test测试环境> ')
- print('')
- print('举例:')
- print('\r\ndbg_mqtt.exe --env=test\r\n')
- print('\r\ndbg_mqtt.exe --env=prod\r\n')
- print('\r\ndbg_mqtt.exe --env=zz\r\n')
- print('\r\ndbg_mqtt.exe --env=test --subtopic=cpyypt/up/0902/0000000003,cpyypt/down/0902/0000000003\r\n')
- print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0102/0000020142,cpyypt/down/0102/0000020142\r\n')
- print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0101/0000001048,cpyypt/down/0101/0000001048\r\n')
- print('')
- def main(argv):
- global mqtt_env
- global mqtt_broker
- global mqtt_port
- global mqtt_user
- global mqtt_pwd
- global mqtt_subtopic
- global mqtt_hex
- if not os.path.exists("log"):
- os.makedirs("log")
- # python sub2.py broker=mqtt.cpyypt.cn port=9000 user=admin pwd=houjianwei subtopic="/yw/#"
- print("dbg_mqtt version:" + version)
- print('输入的参数 argv=:', argv)
- try:
- opts, args = getopt.getopt(argv, "h", ["env=", "broker=", "port=", "user=", "pwd=", "subtopic=", "hex"])
- except getopt.GetoptError:
- print_usage()
- sys.exit(2)
- for opt, arg in opts:
- print('解析', opt, arg)
- if opt == '-h':
- print_usage()
- sys.exit()
- elif opt == "--env":
- mqtt_env = str(arg)
- elif opt == "--broker":
- mqtt_broker = str(arg)
- elif opt == "--port":
- mqtt_port = int(arg)
- elif opt == "--user":
- mqtt_user = str(arg)
- elif opt == "--pwd":
- mqtt_pwd = str(arg)
- elif opt == "--subtopic":
- st = str(arg).split(",")
- mqtt_subtopic = [(s, 0) for s in st]
- # mqtt_subtopic = str(arg).split(";") #[("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)] ("cpyypt/#", 0)
- print("mqtt_subtopic=", mqtt_subtopic)
- elif opt == "--hex":
- mqtt_hex = 1
- if mqtt_broker == '':
- print('【服务器地址】为空,请重新输入!')
- print_usage()
- sys.exit()
- if mqtt_port == 0:
- print('【服务器端口】为空,请重新输入!')
- print_usage()
- sys.exit()
- if mqtt_subtopic == '':
- print('【主题】为空,请重新输入!')
- print_usage()
- sys.exit()
- if mqtt_env == "prod":
- mqtt_broker = prod["mqtt_broker"]
- mqtt_port = prod["mqtt_port"]
- mqtt_user = prod["mqtt_user"]
- mqtt_pwd = prod["mqtt_pwd"]
- elif mqtt_env == "zz":
- mqtt_broker = zz["mqtt_broker"]
- mqtt_port = zz["mqtt_port"]
- mqtt_user = zz["mqtt_user"]
- mqtt_pwd = zz["mqtt_pwd"]
- else:
- mqtt_broker = test["mqtt_broker"]
- mqtt_port = test["mqtt_port"]
- mqtt_user = test["mqtt_user"]
- mqtt_pwd = test["mqtt_pwd"]
- run()
- if __name__ == '__main__':
- # run()
- main(sys.argv[1:])
|