# -*- coding: gb2312 -*-
"""LogTT tools: PyQt5 desktop client for LogTT devices.

The window subscribes to the device uplink topics over MQTT, appends every
received payload to per-device log files under ``./log``, shows the traffic
in the main window and publishes ``PT_CMD=...`` / ``TTD_*`` commands to the
device downlink topics.
"""
import logging
import sys
import time
from datetime import datetime
from json import loads as json_loads, dumps as json_dumps, JSONDecodeError
from os import path, makedirs
from platform import node
from threading import Thread

from paho.mqtt import client as mqtt_client
from PyQt5 import QtCore
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtCore import QUrl
from PyQt5.QtCore import QTimer
from PyQt5.QtGui import QTextCursor
from PyQt5.QtWebEngineWidgets import QWebEngineView
from PyQt5.QtWidgets import QApplication, QMainWindow
from requests import get as requests_get
from requests import RequestException

from mainwindow import *

# Build commands:
# pyinstaller.exe -D -n LogTT_tools_V0.0.0.2 main.py
# pyinstaller.exe -w -F -n LogTT_tools_V0.0.0.3 main.py

client_id = "LogTT_tool_" + node()
# paho-mqtt 2.x requires the callback API version as first argument; the
# callbacks below use the version-1 signatures.  Older releases do not have
# CallbackAPIVersion and take the client id directly.
_callback_api = getattr(mqtt_client, "CallbackAPIVersion", None)
if _callback_api is not None:
    g_client = mqtt_client.Client(_callback_api.VERSION1, client_id)
else:
    g_client = mqtt_client.Client(client_id)

mqtt_broker = 'test-mqtt.cpyypt.cn'
mqtt_port = 9000
mqtt_user = 'admin'
# NOTE(review): broker credentials are hard-coded in the source; consider
# loading them from configuration instead.
mqtt_pwd = 'houjianwei'
mqtt_keepalive = 5
mqtt_timer_time_out = 3*1000
# mqtt_subtopic = [("cpyypt/up/9002/#", 0), ("cpyypt/down/9002/broadcast", 0)]
mqtt_subtopic = [("cpyypt/up/9002/#", 0)]
mqtt_pubtopic = "cpyypt/down/9002/0000000001"
mqtt_down_topic_prefix = "cpyypt/down/9002/"

version = "0.0.0.3"
# While > 0 the next write to an ascii / hex log file is preceded by a
# "version:" line; reset to 0 after that line has been written.
version_w_ascii = 1
version_w_hex = 1
error_msg = ""
info_msg = ""

# Payload prefixes recognised in received messages.
start_ttl = "TTL_U="
start_rs232 = "RS232_U="
start_rs485 = "RS485_U="
start_fixed = "FIXED_INFO="
start_heart = "HEART_MSG="
start_pt = "PT_CMD="

# Per-device cache, keyed by the SN taken from the topic:
# data_buf[sn] = {"base_info": {...}, "pos_info": {...},
#                 "heart_time_stamp": "HH:MM:SS.mmm"}
data_buf = {}

# (prefix, show/log as text, log as hex) for each recognised payload kind.
_PAYLOAD_KINDS = (
    (start_ttl, True, True),
    (start_rs232, False, True),
    (start_rs485, False, True),
    (start_fixed, True, False),
    (start_heart, True, False),
    (start_pt, True, False),
)

# base_info fields copied from a received JSON body into data_buf.
_BASE_INFO_KEYS = (
    "project", "version", "name", "tel", "dev_type", "dev_sn", "dev_pd",
    "reset_times", "imei", "muid", "iccid", "simid", "csq", "rssi", "snr",
    "rsrq", "rsrp", "uplinkB", "downlinkB",
)

# Coordinates used when a device has not reported a non-zero position.
_DEFAULT_LNG = 116.1666213
_DEFAULT_LAT = 39.9007607

# NOTE(review): the AMap web-service key is hard-coded; it is kept as the
# default of the map functions so their signatures stay unchanged.
_AMAP_KEY = 'ebe21d6ee4409542d070b8be762ab0a5'

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='formater.log', level=logging.DEBUG, format=LOG_FORMAT)
logging.debug("\n\n................")


def _timestamp():
    """Return the current local time as 'YYYY-mm-dd HH:MM:SS.mmm'."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


def get_log_file_name_ascii(sn):
    """Return today's text log file path for device *sn*."""
    return f"./log/LogTT_{sn}_ascii_" + datetime.now().strftime('%Y-%m-%d') + ".log"


def get_log_file_name_hex(sn):
    """Return today's hex log file path for device *sn*."""
    return f"./log/LogTT_{sn}_hex_" + datetime.now().strftime('%Y-%m-%d') + ".log"


def is_valid_json(json_str):
    """Return True when *json_str* can be parsed as JSON, else False."""
    try:
        json_loads(json_str)
    except (JSONDecodeError, TypeError):
        return False
    return True


def split_payload(payload):
    """Split a received payload into ``(head, body, as_text, as_hex)``.

    *head* is the recognised prefix (``b""`` when none matches) and *body*
    the remaining bytes.  Unrecognised payloads are handled as hex only.
    """
    for prefix, as_text, as_hex in _PAYLOAD_KINDS:
        raw_prefix = prefix.encode("utf-8")
        if payload.startswith(raw_prefix):
            return raw_prefix, payload[len(raw_prefix):], as_text, as_hex
    return b"", payload, False, True


def decode_body(body):
    """Decode *body* as gb2312, then utf-8; fall back to upper-case hex."""
    for encoding in ("gb2312", "utf-8"):
        try:
            return body.decode(encoding)
        except UnicodeDecodeError:
            continue
    return body.hex().upper()


def append_log(file_name, text, is_hex):
    """Append *text* to *file_name*, preceded once by the version line.

    Failures are printed and logged instead of raised, because this runs in
    the MQTT callback and must not stop message processing.
    """
    global version_w_ascii, version_w_hex
    write_version = (version_w_hex if is_hex else version_w_ascii) > 0
    try:
        # errors="replace" keeps a character the locale encoding cannot
        # represent from discarding the whole line.
        with open(file_name, 'a', errors='replace') as f:
            if write_version:
                f.write("version:" + version + "\n")
                if is_hex:
                    version_w_hex = 0
                else:
                    version_w_ascii = 0
            f.write(text)
    except (OSError, UnicodeError) as e:
        print(str(e))
        logging.exception("failed to write %s", file_name)


class MyWindow(QMainWindow, Ui_MainWindow):
    """Main window: MQTT receive/log/display plus command composition."""

    map_refresh_signal = pyqtSignal(int, float, float)
    rx_data_signal = pyqtSignal(str, str, str)
    mqtt_connect_state_signal = pyqtSignal(str)
    log_display_update_signal = pyqtSignal(str)

    def __init__(self, parent=None):
        """Build the UI, connect to the broker and start the MQTT loop thread."""
        print("__init__")
        logging.info("__init__")
        super(MyWindow, self).__init__(parent)
        self.setupUi(self)
        self.initUI()
        self.set_svr_env = "test"
        self.sn_current = 0
        self.mqtt_connect_state = "未连接"

        self.browser = QWebEngineView()
        self.horizontalLayout.layout().addWidget(self.browser)
        self.horizontalLayoutWidget.setGeometry(QtCore.QRect(0, 0, 1024, 600))

        self.pushButton_set_subscribe.clicked.connect(lambda: self.mqtt_subscribe(g_client))

        # The log directory must exist before the MQTT thread can deliver
        # the first message.
        makedirs("log", exist_ok=True)

        self.mqtt_connect(g_client)
        self.mqtt_subscribe(g_client)
        self.mqtt_disconnect(g_client)
        self.mqtt_thread = Thread(target=g_client.loop_forever, daemon=True)
        self.mqtt_thread.start()

        self.textEdit_log_0rx_display.setText("")
        self.textEdit_log_0rx_display.moveCursor(QTextCursor.End)
        self.textEdit_log_0rx_display.document().setMaximumBlockCount(10000)
        self.display_clear()

    def initUI(self):
        """Set the window title and wire all widget signals to their slots."""
        print("initUI")
        logging.info("initUI")
        self.setWindowTitle("LogTT_tools:test-" + version + ",未连接")

        self.comboBox_log_sn.activated.connect(self.display_update)
        self.comboBox_log_sn.activated.connect(self.sn_update_something)
        self.pushButton_log_rx_clr.clicked.connect(self.log_rx_clr)

        # Signals emitted from the MQTT thread and handled in the GUI thread.
        self.map_refresh_signal.connect(self.map_refresh)
        self.rx_data_signal.connect(self.parse_data)
        self.mqtt_connect_state_signal.connect(self.mainWindow_title_update)
        self.log_display_update_signal.connect(self.log_display_update)

        self.pushButton_set_create.clicked.connect(self.set_create)
        self.pushButton_set_send.clicked.connect(self.set_send)
        self.pushButton_set_create_send.clicked.connect(self.set_create_send)
        # PyQt5 objects have no old-style connect() method, so the button
        # must call the window's own mqtt_connect().
        self.pushButton_set_connect.clicked.connect(lambda: self.mqtt_connect(g_client))
        self.comboBox_set_svr.activated.connect(self.set_svr)

        self.checkBox_set_io_up_enable.clicked.connect(self.set_io_up_able)
        self.checkBox_set_io_up_disable.clicked.connect(self.set_io_up_disable)
        self.checkBox_set_io_down_enable.clicked.connect(self.set_io_down_able)
        self.checkBox_set_io_down_disable.clicked.connect(self.set_io_down_disable)
        self.checkBox_set_io_parameter.clicked.connect(self.set_io_parameter)
        self.checkBox_set_io_cfg.clicked.connect(self.set_io_cfg)

        self.checkBox_set_default_tx_sn.clicked.connect(self.set_default_tx_sn)
        self.lineEdit_tx_sn.textChanged.connect(self.set_tx_sn_change)

        self.pushButton_lsb_refresh.clicked.connect(self.lsb_get_fixed_info)

        self.pushButton_log_send.clicked.connect(self.log_send)
        self.pushButton_log_refresh.clicked.connect(self.log_refresh)
        self.pushButton_log_2rs485_0on.clicked.connect(self.log_rs485_up_enable)
        self.pushButton_log_2rs485_1off.clicked.connect(self.log_rs485_up_disable)

    def timer_test_f(self, str):
        """Debug hook: print a timestamp and restart the test timer for 3 s."""
        print("mqtt timer_test_f," + _timestamp())
        # timer_test_t is not created anywhere in this file, so only
        # restart it when some other code has attached it.
        timer = getattr(self, "timer_test_t", None)
        if timer is not None:
            timer.start(3*1000)

    def mqtt_timer_f(self):
        """Debug hook: print a timestamp."""
        print("mqtt mqtt_timer_f," + _timestamp())

    def log_rx_clr(self):
        """Clear the receive display."""
        self.textEdit_log_0rx_display.setText("")

    def mainWindow_title_update(self, state):
        """Show *state* in the window title; subscribe again once connected."""
        self.mqtt_connect_state = state
        self.setWindowTitle("LogTT_tools:" + "test-" + version + "," + state)
        if state == "已连接":
            self.mqtt_subscribe(g_client)

    def mqtt_connect(self, client):
        """Set credentials and the on_connect callback, then connect *client*.

        Connection errors are reported (console, log file, window title)
        rather than raised.
        """
        print("mqtt mqtt_connect," + _timestamp())

        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print("Connected to MQTT Broker!" + _timestamp())
                self.mqtt_connect_state_signal.emit("已连接")
            else:
                print("Failed to connect, return code %d\n" % rc)
                self.mqtt_connect_state_signal.emit("连接失败")

        try:
            client.username_pw_set(mqtt_user, mqtt_pwd)
            client.on_connect = on_connect
            client.connect(mqtt_broker, mqtt_port, mqtt_keepalive)
            print("[√]连接成功," + _timestamp())
        except Exception:
            # UI boundary: a failed connect must not abort window start-up.
            logging.exception("MQTT connect failed")
            print("[!]Connect error")
            print("[×]连接失败," + _timestamp())
            self.mqtt_connect_state_signal.emit("连接失败")

    def mqtt_disconnect(self, client):
        """Install the on_disconnect callback on *client*."""
        print("mqtt mqtt_disconnect," + _timestamp())

        def on_disconnect(client, userdata, rc):
            if rc != 0:
                print("Unexpected disconnection." + _timestamp())
            else:
                print("Disconnected from MQTT." + _timestamp())
            # Hand the state over to the GUI thread.
            self.mqtt_connect_state_signal.emit("已断开")

        client.on_disconnect = on_disconnect

    def mqtt_subscribe(self, client):
        """Install the message handler and subscribe to ``mqtt_subtopic``."""
        print("mqtt mqtt_subscribe," + _timestamp())
        client.on_message = self._handle_message
        result = client.subscribe(mqtt_subtopic)
        if result[0] == 0:
            print("[√]订阅成功," + _timestamp())
        else:
            # For example when called before the connection is established.
            print("[×]订阅失败," + _timestamp())

    def _handle_message(self, client, userdata, msg):
        """MQTT on_message callback: log the payload and forward it to the GUI."""
        sn = msg.topic.split("/")[-1]
        head, body, as_text, as_hex = split_payload(msg.payload)
        head_str = head.decode("utf-8")

        if as_text:
            body_str = decode_body(body)
            msg_ascii = _timestamp() + "-->:"
            msg_ascii = msg_ascii + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
            msg_ascii = msg_ascii.rstrip() + "\r\n"
            append_log(get_log_file_name_ascii(sn), msg_ascii, False)
            print(msg_ascii, end="")
            # Only show text of the selected device (or of every device
            # while nothing is selected).
            # NOTE(review): the combo box is read from the MQTT thread here.
            if (self.comboBox_log_sn.currentIndex() == -1
                    or self.comboBox_log_sn.currentText() == sn):
                self.log_display_update_signal.emit(msg_ascii)
            self.rx_data_signal.emit(sn, head_str, body_str)

        if as_hex:
            msg_hex = _timestamp() + "-->:"
            msg_hex = msg_hex + f"topic:{msg.topic} msg:{head_str}{body.hex().upper()}"
            append_log(get_log_file_name_hex(sn), msg_hex + "\r\n", True)
            if not as_text:
                print(msg_hex)
                self.log_display_update_signal.emit(msg_hex)

    def mqtt_publish_test(self, client):
        """Publish a fixed test message to ``mqtt_pubtopic``."""
        print("mqtt mqtt_publish_test," + _timestamp())
        try:
            msg = "qqqqqqqqqqqqqqq"
            result = client.publish(mqtt_pubtopic, msg)
            # result[0] is the publish status, 0 meaning success.
            if result[0] == 0:
                print(f"[√]发送成功, Send `{msg}` to topic `{mqtt_pubtopic}`")
            else:
                print(f"[×]发送失败, Failed to send message to topic {mqtt_pubtopic}")
        except Exception:
            logging.exception("MQTT test publish failed")
            print("[!]请先连接服务器!")

    def parse_data(self, sn, head, body):
        """Cache the fields of a JSON *body* for device *sn* and refresh the UI.

        Example body:
        FIXED_INFO={"base_info":{"project":"LogTT","dev_type":"9002",
        "version":"9002.4.002","dev_sn":1,...},"pos_info":{"lng":0,"lat":0}}
        Bodies that are not a JSON object only update the time stamp.
        """
        entry = data_buf.setdefault(sn, {})
        if entry.get("base_info") is None:
            entry["base_info"] = {}
        if entry.get("pos_info") is None:
            entry["pos_info"] = {}
        entry["heart_time_stamp"] = datetime.now().strftime('%H:%M:%S.%f')[:-3]

        try:
            body_dict = json_loads(body)
        except (JSONDecodeError, TypeError):
            body_dict = None

        # A body such as "123" is valid JSON but not an object, and either
        # section may be missing, so check the types before reading.
        if isinstance(body_dict, dict):
            base_info = body_dict.get("base_info")
            if isinstance(base_info, dict):
                for key in _BASE_INFO_KEYS:
                    # Empty / zero values do not overwrite cached ones.
                    if base_info.get(key):
                        # dev_sn is stored as the SN taken from the topic.
                        entry["base_info"][key] = str(sn) if key == "dev_sn" else base_info[key]
            pos_info = body_dict.get("pos_info")
            if isinstance(pos_info, dict):
                for key in ("lng", "lat"):
                    if pos_info.get(key):
                        entry["pos_info"][key] = pos_info[key]

        # Refresh the widgets.
        self.data_update(sn)

    def data_update(self, sn):
        """Refresh the SN selector, the info labels and the device list."""
        self.comboBox_log_sn_add(sn)
        self.display_update(sn)
        self.dev_list_refresh(sn)
        # The automatic map refresh for the selected device is disabled;
        # the map is refreshed on request by lsb_get_fixed_info().

    def comboBox_log_sn_change(self):
        """Refresh the info labels for the selected device."""
        self.display_update()

    def comboBox_log_sn_add(self, sn):
        """Add *sn* to the selector (kept sorted) if it is not listed yet."""
        # NOTE(review): this SN is never listed; the reason is not visible
        # in this file.
        if sn == "0000000116":
            return
        if self.comboBox_log_sn.findText(sn) != -1:
            return
        self.comboBox_log_sn.addItem(sn)
        current_text = self.comboBox_log_sn.currentText()
        # Re-label the items in sorted order, then restore the selection.
        count = self.comboBox_log_sn.count()
        ordered = sorted(self.comboBox_log_sn.itemText(i) for i in range(count))
        for i, text in enumerate(ordered):
            self.comboBox_log_sn.setItemText(i, text)
        self.comboBox_log_sn.setCurrentText(current_text)

    def display_base_info_clear(self):
        """Reset the base-info labels to their bare captions."""
        self.label_log_01dev_type.setText("设备类型:")
        self.label_log_02sn.setText("设备SN:")
        self.label_log_03reset_times.setText("复位次数:")
        self.label_log_04app_ver.setText("固件版本:")
        self.label_log_05name.setText("姓名:")
        self.label_log_06telephone.setText("电话:")
        self.label_log_09imei.setText("IMEI:")
        self.label_log_10iccid.setText("ICCID:")
        self.label_log_11rssi.setText("信号强度:")
        self.label_log_12snr.setText("信噪比:")

    def display_pos_info_clear(self):
        """Reset the position labels to their bare captions."""
        self.label_log_07longitude.setText("经度:")
        self.label_log_08latitude.setText("纬度:")

    def display_clear(self):
        """Reset all info labels and the group-box title."""
        self.display_base_info_clear()
        self.display_pos_info_clear()
        self.groupBox_log_1base.setTitle("日志设备基础信息")

    def display_update(self, sn=0):
        """Show the cached data of the selected device on both label sets.

        *sn* is accepted because signals pass an argument, but is not used:
        the device shown is always the one selected in the combo box.
        """
        sn_cur = self.comboBox_log_sn.currentText()
        entry = data_buf.get(sn_cur)
        if entry is None:
            self.display_clear()
        else:
            base_info = entry.get("base_info")
            if base_info:
                # str() guards against non-string JSON values.
                self.label_log_01dev_type.setText("设备类型:" + str(base_info.get("dev_type", "")))
                self.label_log_02sn.setText("设备SN:" + str(sn_cur))
                self.label_log_03reset_times.setText("复位次数:" + str(base_info.get("reset_times", 0)))
                self.label_log_04app_ver.setText("固件版本:" + str(base_info.get("version", "")))
                self.label_log_05name.setText("姓名:" + str(base_info.get("name", "")))
                self.label_log_06telephone.setText("电话:" + str(base_info.get("tel", "")))
                self.label_log_09imei.setText("IMEI:" + str(base_info.get("imei", "")))
                self.label_log_10iccid.setText("ICCID:" + str(base_info.get("iccid", "")))
                self.label_log_11rssi.setText("信号强度:" + str(base_info.get("rssi", 0)))
                self.label_log_12snr.setText("信噪比:" + str(base_info.get("snr", 0)))
            else:
                self.display_base_info_clear()

            pos_info = entry.get("pos_info")
            if pos_info:
                # A missing or zero coordinate falls back to the default.
                lng = pos_info.get("lng") or _DEFAULT_LNG
                lat = pos_info.get("lat") or _DEFAULT_LAT
                self.label_log_07longitude.setText("经度:" + str(lng))
                self.label_log_08latitude.setText("纬度:" + str(lat))
            else:
                self.display_pos_info_clear()
            self.groupBox_log_1base.setTitle(
                "日志设备基础信息:" + str(entry.get("heart_time_stamp", "")))

        # Mirror the "log" labels onto the "lsb" tab.
        mirrored = (
            (self.label_lsb_01dev_type, self.label_log_01dev_type),
            (self.label_lsb_02sn, self.label_log_02sn),
            (self.label_lsb_03reset_times, self.label_log_03reset_times),
            (self.label_lsb_04app_ver, self.label_log_04app_ver),
            (self.label_lsb_05name, self.label_log_05name),
            (self.label_lsb_06telephone, self.label_log_06telephone),
            (self.label_lsb_09imei, self.label_log_09imei),
            (self.label_lsb_10iccid, self.label_log_10iccid),
            (self.label_lsb_11rssi, self.label_log_11rssi),
            (self.label_lsb_12snr, self.label_log_12snr),
            (self.label_lsb_07longitude, self.label_log_07longitude),
            (self.label_lsb_08latitude, self.label_log_08latitude),
        )
        for target, source in mirrored:
            target.setText(source.text())
        self.groupBox_lsb_1base.setTitle(self.groupBox_log_1base.title())

    def sn_update_something(self):
        """Copy the selected SN (as a plain number) into the tx-SN field."""
        if not self.checkBox_set_default_tx_sn.isChecked():
            return
        sn_text = self.comboBox_log_sn.currentText()
        try:
            sn_num = int(sn_text)
        except ValueError:
            # The SN comes from the topic and may be empty or non-numeric.
            logging.warning("selected SN %r is not numeric", sn_text)
            return
        self.lineEdit_tx_sn.setText(str(sn_num))

    def dev_list_refresh(self, sn):
        """Add *sn* to the sorted device list text if it is missing."""
        dev_list = self.plainTextEdit_log_dev_list.toPlainText().split()
        if sn in dev_list:
            return
        dev_list.append(sn)
        dev_text = "".join(str(dev) + "\n" for dev in sorted(dev_list))
        self.plainTextEdit_log_dev_list.setPlainText(dev_text)

    def mqtt_send_data(self, client, pub_topic, userdata):
        """Publish *userdata* to *pub_topic* and show the outcome in the log view."""
        try:
            result = client.publish(pub_topic, userdata)
            # result[0] is the publish status, 0 meaning success.
            if result[0] == 0:
                msg = f"[√]发送成功, Send `{userdata}` to topic `{pub_topic}`"
            else:
                msg = f"[×]发送失败, Failed to send message to topic {pub_topic}"
        except Exception:
            logging.exception("MQTT publish to %s failed", pub_topic)
            msg = "[!]请先连接服务器!"
        self.log_display_update_signal.emit(msg + "\n")

    def _selected_sn(self):
        """Return the SN selected in the combo box, or None without selection."""
        if self.comboBox_log_sn.currentIndex() == -1:
            return None
        return self.comboBox_log_sn.currentText()

    def _send_to_selected(self, send_data):
        """Publish *send_data* to the down topic of the selected device, if any."""
        sn_cur = self._selected_sn()
        if sn_cur is not None:
            self.mqtt_send_data(g_client, mqtt_down_topic_prefix + sn_cur, send_data)

    def log_display_update(self, text):
        """Append *text*; keep following the end only if the cursor was there."""
        display = self.textEdit_log_0rx_display
        cursor_at_end = display.textCursor().position() == len(display.toPlainText())
        display.append(text)
        if cursor_at_end:
            display.moveCursor(QTextCursor.End)

    def log_rx_buf_max(self):
        """Apply the display line limit from the line edit, clamped to 100..100000."""
        try:
            max_num = int(self.lineEdit_log_rx_buf_max.text())
        except ValueError:
            max_num = 10000
        clamped = min(max(max_num, 100), 100000)
        if clamped != max_num:
            # setText() needs a string; passing the int raised TypeError.
            self.lineEdit_log_rx_buf_max.setText(str(clamped))
        self.textEdit_log_0rx_display.document().setMaximumBlockCount(clamped)

    def log_refresh(self):
        """Ask the selected device for its fixed info."""
        self._send_to_selected('PT_CMD={"get_fixed_info": "true"}')

    def log_send(self):
        """Send the tx text to the selected device on the chosen interface."""
        if self._selected_sn() is None:
            return
        headers = {0: 'TTD_TTL', 1: 'TTD_RS232', 2: 'TTD_RS485'}
        cmd_header = headers.get(self.comboBox_tx_io_select.currentIndex(), "TTD_TTL")
        cmd_tail = ""
        if self.radioButton_log_1tx_ascii.isChecked():
            cmd_header = cmd_header + "_A="
            if self.checkBox_r_n.isChecked():
                cmd_tail = "\r\n"
        else:
            cmd_header = cmd_header + "_H="
        cmd_body = self.textEdit_log_1tx.toPlainText()
        self._send_to_selected(cmd_header + cmd_body + cmd_tail)

    def log_rs485_up_enable(self):
        """Send the RS485 up_enable=1 command to the selected device."""
        self._send_to_selected('PT_CMD={"rs485_cfg": {"up_enable": 1}}')

    def log_rs485_up_disable(self):
        """Send the RS485 up_enable=0 command to the selected device."""
        self._send_to_selected('PT_CMD={"rs485_cfg": {"up_enable": 0}}')

    def map_refresh(self, sn=0, longitude=116.1798005, latitude=39.9030239):
        """Update the address text and the static map for the coordinates."""
        print("map_refresh")
        self.map_get_regeocode(sn, longitude, latitude)
        self.map_load_static_map(sn, longitude, latitude)

    def map_get_regeocode(self, sn=0, longitude=116.1798005, latitude=39.9030239,
                          key=_AMAP_KEY):
        """Reverse-geocode the coordinates via AMap and show the address text."""
        url = f"https://restapi.amap.com/v3/geocode/regeo?key={key}&location={longitude},{latitude}"
        text = "信号基站位置信息:"
        try:
            # This runs in the GUI thread, so bound the wait.
            response = requests_get(url, timeout=5)
            data = response.json()
            if data['status'] == '1':
                geo_result = data['regeocode']
                text = text + f"\n省: {geo_result['addressComponent']['province']}"
                text = text + f"\n市: {geo_result['addressComponent']['city']}"
                text = text + f"\n区: {geo_result['addressComponent']['district']}"
                text = text + f"\n地址: {geo_result['formatted_address']}"
            else:
                text = "逆向地理编码失败"
        except (RequestException, ValueError, KeyError, TypeError):
            # Network failure or an unexpected response shape.
            logging.exception("reverse geocoding failed")
            text = "逆向地理编码失败"
        print(text)
        self.textEdit_lsb_address.setText(text)

    def map_load_static_map(self, sn=0, longitude=116.1798005, latitude=39.9030239,
                            key=_AMAP_KEY):
        """Load an AMap static map centred and marked at the coordinates."""
        location_str = f'{longitude},{latitude}'
        url = f"https://restapi.amap.com/v3/staticmap?location={location_str}&zoom=13&size=1024*600&markers=mid,,A:{location_str}&key={key}"
        print(url)
        self.browser.load(QUrl(url))

    def lsb_get_fixed_info(self):
        """Show the cached position on the map, or request the fixed info."""
        sn_cur = self._selected_sn()
        if sn_cur is None:
            return
        pos_info = (data_buf.get(sn_cur) or {}).get("pos_info")
        if pos_info:
            lng = pos_info.get("lng", _DEFAULT_LNG)
            lat = pos_info.get("lat", _DEFAULT_LAT)
            self.map_refresh(sn_cur, lng, lat)
        else:
            # Nothing cached yet: ask the device.
            send_topic = mqtt_down_topic_prefix + sn_cur
            send_data = 'PT_CMD={"get_fixed_info": "true"}'
            self.mqtt_send_data(g_client, send_topic, send_data)

    def set_svr(self):
        """Fill the server fields for the selected environment."""
        index_cur = self.comboBox_set_svr.currentIndex()
        if index_cur == 0:
            self.lineEdit_set_0ip.setText("test-******")
            self.lineEdit_set_1port.setText("******")
            self.lineEdit_set_2usr.setText("******")
            self.lineEdit_set_3password.setText("******")
            self.set_svr_env = "test"
        elif index_cur == 1:
            self.lineEdit_set_0ip.setText("prod-******")
            self.lineEdit_set_1port.setText("******")
            self.lineEdit_set_2usr.setText("******")
            self.lineEdit_set_3password.setText("******")
            self.set_svr_env = "prod"
        else:
            self.set_svr_env = "usr_def"

    def set_create(self):
        """Build the ``PT_CMD=`` JSON command from the checked options.

        Example result:
        PT_CMD={"imei_check":"869861069999337","reboot":"true",
        "dev_cfg":{"dev_sn":"5","dev_pd":"20240329"}}

        Returns False when a numeric interface parameter is not an integer
        (the command text is then left unchanged), True otherwise.
        """
        set_dict = {}
        # Bind the command to an IMEI.
        if self.checkBox_set_bind_imei.isChecked():
            set_dict["imei_check"] = self.lineEdit_0imei.text()
        # Request the fixed info.
        if self.checkBox_set_get_fixed_info.isChecked():
            set_dict["get_fixed_info"] = "true"
        # Restore the default parameters.
        if self.checkBox_set_default_cfg.isChecked():
            set_dict["default_cfg"] = "true"
        # Reboot.
        if self.checkBox_set_reboot.isChecked():
            set_dict["reboot"] = "true"
        # Device info.
        if self.checkBox_set_dev_cfg.isChecked():
            set_dict["dev_cfg"] = {
                "dev_sn": self.lineEdit_set_dev_sn.text(),
                "dev_pd": self.lineEdit_set_dev_pd.text(),
            }
        # Registration info.
        if self.checkBox_set_reg_cfg.isChecked():
            set_dict["reg_cfg"] = {
                "name": self.lineEdit_set_reg_name.text(),
                "tel": self.lineEdit_set_reg_tel.text(),
            }
        # Server info.
        if self.checkBox_set_svr_cfg.isChecked():
            set_dict["svr_cfg"] = {
                "svr_ip": self.lineEdit_set_svr_cfg.text(),
                "svr_port": self.lineEdit_set_svr_port.text(),
                "svr_usr_name": self.lineEdit_set_svr_usr_name.text(),
                "svr_usr_pwd": self.lineEdit_set_svr_usr_pwd.text(),
            }
        # Interface parameters.
        if self.checkBox_set_io_cfg.isChecked():
            io_name = (self.comboBox_set_io_select.currentText() + "_cfg").lower()
            io_cfg = {}
            if self.checkBox_set_io_up_enable.isChecked():
                io_cfg["up_enable"] = 1
            if self.checkBox_set_io_up_disable.isChecked():
                io_cfg["up_enable"] = 0
            if self.checkBox_set_io_down_enable.isChecked():
                io_cfg["down_enable"] = 1
            if self.checkBox_set_io_down_disable.isChecked():
                io_cfg["down_enable"] = 0
            if self.checkBox_set_io_parameter.isChecked():
                # "partiy" is the key spelling the existing command uses.
                # bit_order, buff_size, rs485_gpio, rs485_level and
                # rs485_delay are currently not sent.
                numeric_fields = (
                    ("baud_rate", self.lineEdit_set_baud_rate),
                    ("data_bits", self.lineEdit_set_data_bits),
                    ("stop_bits", self.lineEdit_set_stop_bits),
                    ("partiy", self.lineEdit_set_partiy),
                )
                for field, line_edit in numeric_fields:
                    value = line_edit.text()
                    if value == "":
                        continue
                    try:
                        io_cfg[field] = int(value)
                    except ValueError:
                        # Report bad input instead of raising inside a slot.
                        self.log_display_update_signal.emit(
                            f"[!]参数 {field} 不是有效整数: {value}\n")
                        return False
            set_dict[io_name] = io_cfg

        set_str = json_dumps(set_dict, ensure_ascii=False)
        print(set_str)
        if set_dict:
            self.textEdit_set_tx.setText("PT_CMD=" + set_str)
        return True

    def set_send(self):
        """Publish the command text to the topic shown in the topic field."""
        print("mqtt set_send," + _timestamp())
        send_topic = self.lineEdit_5pub_topic.text()
        send_data = self.textEdit_set_tx.toPlainText()
        self.mqtt_send_data(g_client, send_topic, send_data)

    def set_create_send(self):
        """Build the command and send it unless building reported bad input."""
        if self.set_create() is not False:
            self.set_send()

    def _io_option_clicked(self, clicked_box, opposite_box=None):
        """Keep the io_cfg master check box in step with its options.

        Checking an option unchecks its opposite and checks io_cfg; when no
        option is left checked, io_cfg is unchecked.
        """
        if clicked_box.isChecked():
            if opposite_box is not None:
                opposite_box.setChecked(False)
            self.checkBox_set_io_cfg.setChecked(True)
        options = (
            self.checkBox_set_io_parameter,
            self.checkBox_set_io_up_enable,
            self.checkBox_set_io_up_disable,
            self.checkBox_set_io_down_enable,
            self.checkBox_set_io_down_disable,
        )
        if not any(box.isChecked() for box in options):
            self.checkBox_set_io_cfg.setChecked(False)

    def set_io_up_able(self):
        """Slot for the "up enable" check box."""
        self._io_option_clicked(self.checkBox_set_io_up_enable,
                                self.checkBox_set_io_up_disable)

    def set_io_up_disable(self):
        """Slot for the "up disable" check box."""
        self._io_option_clicked(self.checkBox_set_io_up_disable,
                                self.checkBox_set_io_up_enable)

    def set_io_down_able(self):
        """Slot for the "down enable" check box."""
        self._io_option_clicked(self.checkBox_set_io_down_enable,
                                self.checkBox_set_io_down_disable)

    def set_io_down_disable(self):
        """Slot for the "down disable" check box."""
        self._io_option_clicked(self.checkBox_set_io_down_disable,
                                self.checkBox_set_io_down_enable)

    def set_io_parameter(self):
        """Slot for the "parameter" check box."""
        self._io_option_clicked(self.checkBox_set_io_parameter)

    def set_io_cfg(self):
        """Slot for the io_cfg check box: unchecking it clears every option."""
        # The previous condition tested the parameter box, which made its
        # own first statement a no-op; the master box is the one clicked.
        if not self.checkBox_set_io_cfg.isChecked():
            self.checkBox_set_io_parameter.setChecked(False)
            self.checkBox_set_io_up_enable.setChecked(False)
            self.checkBox_set_io_up_disable.setChecked(False)
            self.checkBox_set_io_down_enable.setChecked(False)
            self.checkBox_set_io_down_disable.setChecked(False)

    def set_default_tx_sn(self):
        """Lock the tx-SN and topic fields while "default tx SN" is checked."""
        editable = not self.checkBox_set_default_tx_sn.isChecked()
        self.lineEdit_tx_sn.setEnabled(editable)
        self.lineEdit_5pub_topic.setEnabled(editable)

    def set_tx_sn_change(self):
        """Rebuild the publish topic from the tx SN, zero-padded to 10 digits."""
        sn_str = self.lineEdit_tx_sn.text().zfill(10)
        self.lineEdit_5pub_topic.setText(mqtt_down_topic_prefix + sn_str)


def main():
    """Create the application and main window and run the Qt event loop."""
    app = QApplication(sys.argv)
    myWin = MyWindow()
    myWin.show()
    exit_code = app.exec_()
    # Report only real failures and hand the exit code to the caller.
    if exit_code != 0:
        print(f"程序异常退出: {exit_code}")
    sys.exit(exit_code)


if __name__ == '__main__':
    main()