| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902 |
- # -*- coding: gb2312 -*-
- import time
- from os import path, makedirs
- import sys
- from datetime import datetime
- from PyQt5.QtWidgets import QApplication, QMainWindow
- from PyQt5.QtCore import pyqtSignal
- from PyQt5.QtCore import QUrl
- from PyQt5.QtCore import QTimer
- from PyQt5.QtWebEngineWidgets import QWebEngineView
- from PyQt5.QtGui import QTextCursor
- from mainwindow import *
- import logging
- from platform import node
- from paho.mqtt import client as mqtt_client
- from json import loads as json_loads, dumps as json_dumps, JSONDecodeError
- from threading import Thread
- from requests import get as requests_get
- # pyinstaller.exe -D -n LogTT_tools_V0.0.0.2 main.py
- # pyinstaller.exe -w -F -n LogTT_tools_V0.0.0.3 main.py
- # pyinstaller.exe -w -F -n LogTT_tools_V0.0.0.3 main.py
# ---------------------------------------------------------------------------
# Module-level configuration and shared state.
# ---------------------------------------------------------------------------
from os import environ

# MQTT client identity: a fixed prefix plus this machine's network name.
client_id = "LogTT_tool_" + node()
g_client = mqtt_client.Client(client_id)

# Broker connection settings. Each value may be overridden through an
# environment variable; the defaults are the values this tool has always used.
# NOTE(review): the default password is embedded in source control -- consider
# rotating it and supplying it only through LOGTT_MQTT_PWD.
mqtt_broker = environ.get("LOGTT_MQTT_BROKER", 'test-mqtt.cpyypt.cn')
mqtt_port = int(environ.get("LOGTT_MQTT_PORT", "9000"))
mqtt_user = environ.get("LOGTT_MQTT_USER", 'admin')
mqtt_pwd = environ.get("LOGTT_MQTT_PWD", 'houjianwei')
mqtt_keepalive = 5
mqtt_timer_time_out = 3*1000

# Topics: subscribe to every uplink topic of device type 9002; the default
# publish topic addresses device 0000000001.
mqtt_subtopic = [("cpyypt/up/9002/#", 0)]
mqtt_pubtopic = "cpyypt/down/9002/0000000001"

# Tool version, written once per process as a header into each kind of log
# file. The two flags are cleared after the header has been written.
version = "0.0.0.3"
version_w_ascii = 1
version_w_hex = 1

error_msg = ""
info_msg = ""

# Payload prefixes that identify the kind of an uplink message.
start_ttl = "TTL_U="
start_rs232 = "RS232_U="
start_rs485 = "RS485_U="
start_fixed = "FIXED_INFO="
start_heart = "HEART_MSG="
start_pt = "PT_CMD="

# Per-device cache keyed by serial number; filled by MyWindow.parse_data.
data_buf = {}

# Diagnostic log of the tool itself (not the per-device traffic logs).
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='formater.log', level=logging.DEBUG, format=LOG_FORMAT)
logging.debug("\n\n................")
def get_log_file_name_ascii(sn):
    """Return the path of today's text log file for device ``sn``.

    The result has the form ``./log/LogTT_<sn>_ascii_<YYYY-MM-DD>.log`` where
    the date is the local date at the moment of the call.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    return "./log/LogTT_{}_ascii_{}.log".format(sn, today)
def get_log_file_name_hex(sn):
    """Return the path of today's hex-dump log file for device ``sn``.

    The result has the form ``./log/LogTT_<sn>_hex_<YYYY-MM-DD>.log`` where
    the date is the local date at the moment of the call.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    return "./log/LogTT_{}_hex_{}.log".format(sn, today)
def is_valid_json(json_str):
    """Return True if ``json_str`` parses as a JSON document, else False.

    Any input that ``json.loads`` rejects yields False: malformed text
    (``JSONDecodeError``, a ``ValueError``), undecodable bytes
    (``UnicodeDecodeError``, also a ``ValueError``) and non-string input such
    as ``None`` (``TypeError``). The function therefore never raises for bad
    input.
    """
    try:
        json_loads(json_str)
    except (ValueError, TypeError):
        return False
    return True
class MyWindow(QMainWindow, Ui_MainWindow):
    """Main window of the LogTT MQTT debugging tool.

    The window connects to the configured MQTT broker, logs every uplink
    message to per-device files under ``./log``, shows the traffic and the
    cached device information, and builds/sends ``PT_CMD`` configuration
    commands and raw pass-through data to a selected device.

    MQTT callbacks run on the paho network thread; they hand data to the GUI
    through the Qt signals declared below.

    The pasted source had lost its indentation, so statement nesting was
    reconstructed from the code's meaning; the reading that most affects
    behaviour is that ``parse_data`` refreshes the views on every message.
    """

    map_refresh_signal = pyqtSignal(int, float, float)
    rx_data_signal = pyqtSignal(str, str, str)
    mqtt_connect_state_signal = pyqtSignal(str)
    log_display_update_signal = pyqtSignal(str)

    # Known payload prefixes: (prefix, log as text, log as hex dump).
    _PAYLOAD_PREFIXES = (
        (start_ttl, True, True),
        (start_rs232, False, True),
        (start_rs485, False, True),
        (start_fixed, True, False),
        (start_heart, True, False),
        (start_pt, True, False),
    )

    # "base_info" fields copied verbatim into the cache when they are truthy.
    # "dev_sn" is handled separately because the topic SN is stored instead.
    _BASE_INFO_KEYS = (
        "project", "version", "name", "tel", "dev_type", "dev_pd",
        "reset_times", "imei", "muid", "iccid", "simid", "csq", "rssi",
        "snr", "rsrq", "rsrp", "uplinkB", "downlinkB",
    )

    def __init__(self, parent=None):
        """Build the UI, prepare the log directory and start the MQTT loop."""
        print("__init__")
        logging.info("__init__")
        super(MyWindow, self).__init__(parent)
        self.setupUi(self)
        self.initUI()
        self.set_svr_env = "test"
        self.sn_current = 0
        self.mqtt_connect_state = "未连接"

        # Embedded browser used to show the static map.
        self.browser = QWebEngineView()
        self.horizontalLayout.layout().addWidget(self.browser)
        self.horizontalLayoutWidget.setGeometry(QtCore.QRect(0, 0, 1024, 600))
        self.pushButton_set_subscribe.clicked.connect(lambda: self.mqtt_subscribe(g_client))

        # Create the log directory BEFORE the network thread starts, so the
        # first received messages can already be written to disk.
        makedirs("log", exist_ok=True)

        self.textEdit_log_0rx_display.setText("")
        self.textEdit_log_0rx_display.moveCursor(QTextCursor.End)
        self.textEdit_log_0rx_display.document().setMaximumBlockCount(10000)
        self.display_clear()

        self.mqtt_connect(g_client)
        self.mqtt_subscribe(g_client)
        self.mqtt_disconnect(g_client)
        self.mqtt_thread = Thread(target=g_client.loop_forever, daemon=True)
        self.mqtt_thread.start()

    def initUI(self):
        """Set the window title and connect widget signals to their slots."""
        print("initUI")
        logging.info("initUI")
        self.setWindowTitle("LogTT_tools:test-" + version + ",未连接")
        self.comboBox_log_sn.activated.connect(self.display_update)
        self.comboBox_log_sn.activated.connect(self.sn_update_something)
        self.pushButton_log_rx_clr.clicked.connect(self.log_rx_clr)
        self.map_refresh_signal.connect(self.map_refresh)
        self.rx_data_signal.connect(self.parse_data)
        self.mqtt_connect_state_signal.connect(self.mainWindow_title_update)
        self.log_display_update_signal.connect(self.log_display_update)
        self.pushButton_set_create.clicked.connect(self.set_create)
        self.pushButton_set_send.clicked.connect(self.set_send)
        self.pushButton_set_create_send.clicked.connect(self.set_create_send)
        # The original called self.connect(), which does not exist on PyQt5
        # objects; the connect button is meant to (re)connect to the broker.
        self.pushButton_set_connect.clicked.connect(lambda: self.mqtt_connect(g_client))
        self.comboBox_set_svr.activated.connect(self.set_svr)
        self.checkBox_set_io_up_enable.clicked.connect(self.set_io_up_able)
        self.checkBox_set_io_up_disable.clicked.connect(self.set_io_up_disable)
        self.checkBox_set_io_down_enable.clicked.connect(self.set_io_down_able)
        self.checkBox_set_io_down_disable.clicked.connect(self.set_io_down_disable)
        self.checkBox_set_io_parameter.clicked.connect(self.set_io_parameter)
        self.checkBox_set_io_cfg.clicked.connect(self.set_io_cfg)
        self.checkBox_set_default_tx_sn.clicked.connect(self.set_default_tx_sn)
        self.lineEdit_tx_sn.textChanged.connect(self.set_tx_sn_change)
        self.pushButton_lsb_refresh.clicked.connect(self.lsb_get_fixed_info)
        self.pushButton_log_send.clicked.connect(self.log_send)
        self.pushButton_log_refresh.clicked.connect(self.log_refresh)
        self.pushButton_log_2rs485_0on.clicked.connect(self.log_rs485_up_enable)
        self.pushButton_log_2rs485_1off.clicked.connect(self.log_rs485_up_disable)

    @staticmethod
    def _timestamp():
        """Return the local time as ``YYYY-MM-DD HH:MM:SS.mmm``."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]

    def timer_test_f(self, str):
        """Print a timestamp and restart ``self.timer_test_t`` for 3 seconds.

        NOTE(review): ``timer_test_t`` is not created anywhere in this class,
        and nothing here calls this method -- confirm whether it is still
        needed.
        """
        print("mqtt timer_test_f," + self._timestamp())
        self.timer_test_t.start(3*1000)

    def mqtt_timer_f(self):
        """Print a timestamp (placeholder for a reconnect timer callback)."""
        print("mqtt mqtt_timer_f," + self._timestamp())

    def log_rx_clr(self):
        """Clear the received-traffic display."""
        self.textEdit_log_0rx_display.setText("")

    def mainWindow_title_update(self, state):
        """Show the connection ``state`` in the title; subscribe once connected."""
        self.mqtt_connect_state = state
        self.setWindowTitle("LogTT_tools:" + "test-" + version + "," + state)
        if state == "已连接":
            self.mqtt_subscribe(g_client)

    def mqtt_connect(self, client):
        """Set credentials, install ``on_connect`` and connect to the broker.

        Connection failures are reported on stdout and never raised, so the
        GUI keeps running when the broker is unreachable.
        """
        print("mqtt mqtt_connect," + self._timestamp())

        def on_connect(client, userdata, flags, rc):
            # Runs on the MQTT thread; the signal carries the state to the GUI.
            if rc == 0:
                print("Connected to MQTT Broker!" + self._timestamp())
                self.mqtt_connect_state_signal.emit("已连接")
            else:
                print(f"Failed to connect, return code {rc}\n")
                self.mqtt_connect_state_signal.emit("连接失败")

        try:
            client.username_pw_set(mqtt_user, mqtt_pwd)
            client.on_connect = on_connect
            client.connect(mqtt_broker, mqtt_port, mqtt_keepalive)
            print("[√]连接成功," + self._timestamp())
        except Exception as e:
            # Best effort: report the reason and stay alive.
            print(f"[!]Connect error: {e}")
            print("[×]连接失败," + self._timestamp())

    def mqtt_disconnect(self, client):
        """Install the ``on_disconnect`` callback on ``client``.

        Despite its name this method does not disconnect; it only registers
        the handler that reports a lost connection to the GUI.
        """
        print("mqtt mqtt_disconnect," + self._timestamp())

        def on_disconnect(client, userdata, rc):
            if rc != 0:
                print("Unexpected disconnection." + self._timestamp())
            else:
                print("Disconnected from MQTT." + self._timestamp())
            # Hand the state change over to the UI thread.
            self.mqtt_connect_state_signal.emit("已断开")

        client.on_disconnect = on_disconnect

    @classmethod
    def _split_payload(cls, payload):
        """Split ``payload`` into ``(head, body, log_as_text, log_as_hex)``.

        ``head`` is the recognised prefix (empty when none matches) and
        ``body`` is the remainder. Unknown payloads are logged as hex only.
        """
        for prefix, as_text, as_hex in cls._PAYLOAD_PREFIXES:
            marker = prefix.encode("utf-8")
            if payload[:len(marker)] == marker:
                return marker, payload[len(marker):], as_text, as_hex
        return b"", payload, False, True

    @staticmethod
    def _decode_body(body):
        """Decode ``body`` as gb2312, then utf-8, else return its hex dump."""
        for encoding in ('gb2312', 'utf-8'):
            try:
                return body.decode(encoding)
            except UnicodeDecodeError:
                continue
        return body.hex().upper()

    @staticmethod
    def _append_log(file_name, text, hex_log):
        """Append ``text`` to ``file_name``, best effort.

        The tool version is written once per process for each kind of log
        (text / hex); the module-level flags record that it has been written.
        Failures are printed and swallowed so message handling continues.
        """
        global version_w_ascii, version_w_hex
        try:
            with open(file_name, 'a') as f:
                if hex_log:
                    if version_w_hex > 0:
                        f.write("version:" + version + "\n")
                        version_w_hex = 0
                else:
                    if version_w_ascii > 0:
                        f.write("version:" + version + "\n")
                        version_w_ascii = 0
                f.write(text)
        except Exception as e:
            print(str(e))

    def mqtt_subscribe(self, client):
        """Install the message handler and subscribe to ``mqtt_subtopic``."""
        print("mqtt mqtt_subscribe," + self._timestamp())

        def on_message(client, userdata, msg):
            # Runs on the MQTT thread. The device SN is the last topic level.
            # NOTE(review): the combo box is read here from the network
            # thread; consider moving that filter into a GUI-thread slot.
            LogTT_SN = msg.topic.split("/")[-1]
            head, body, flag_ascii, flag_hex = self._split_payload(msg.payload)
            head_str = head.decode('utf-8')

            if flag_ascii:
                body_str = self._decode_body(body)
                msg_ascii = self._timestamp() + "-->:"
                msg_ascii = msg_ascii + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
                msg_ascii = msg_ascii.rstrip() + "\r\n"
                self._append_log(get_log_file_name_ascii(LogTT_SN), msg_ascii, False)
                print(msg_ascii, end="")
                # Show the text when no device is selected or it is the
                # selected device.
                if (self.comboBox_log_sn.currentIndex() == -1
                        or self.comboBox_log_sn.currentText() == LogTT_SN):
                    self.log_display_update_signal.emit(msg_ascii)
                self.rx_data_signal.emit(LogTT_SN, head_str, body_str)

            if flag_hex:
                msg_hex = self._timestamp() + "-->:"
                msg_hex = msg_hex + f"topic:{msg.topic} msg:{head_str}{body.hex().upper()}"
                self._append_log(get_log_file_name_hex(LogTT_SN), msg_hex + "\r\n", True)
                # Hex-only messages are shown as hex; messages that were
                # already shown as text are not repeated.
                if not flag_ascii:
                    print(msg_hex)
                    self.log_display_update_signal.emit(msg_hex)

        client.on_message = on_message
        client.subscribe(mqtt_subtopic)
        print("[√]订阅成功," + self._timestamp())

    def mqtt_publish_test(self, client):
        """Publish a fixed test string to ``mqtt_pubtopic`` and print the result."""
        print("mqtt mqtt_publish_test," + self._timestamp())
        try:
            msg = "qqqqqqqqqqqqqqq"
            result = client.publish(mqtt_pubtopic, msg)
            # result[0] is the return code; 0 means the message was queued.
            if result[0] == 0:
                print(f"[√]发送成功, Send `{msg}` to topic `{mqtt_pubtopic}`")
            else:
                print(f"[×]发送失败, Failed to send message to topic {mqtt_pubtopic}")
        except Exception:
            print("[!]请先连接服务器!")

    def parse_data(self, sn, head, body):
        """Cache device information found in a received text message.

        ``body`` is parsed as JSON when possible. Truthy fields of its
        ``base_info`` / ``pos_info`` objects are copied into ``data_buf[sn]``.
        Missing sections and non-object JSON are ignored instead of raising,
        because an exception escaping a Qt slot terminates the application.
        The heartbeat timestamp and the views are refreshed for every message.

        Example payload body::

            {"base_info": {"name": "...", "version": "9002.4.002", ...},
             "pos_info": {"lng": 0, "lat": 0}}
        """
        entry = data_buf.setdefault(sn, {})
        if entry.get("base_info") is None:
            entry["base_info"] = {}
        if entry.get("pos_info") is None:
            entry["pos_info"] = {}
        entry["heart_time_stamp"] = datetime.now().strftime('%H:%M:%S.%f')[:-3]

        try:
            body_dict = json_loads(body)
        except (ValueError, TypeError):
            body_dict = None

        if isinstance(body_dict, dict):
            base_info = body_dict.get("base_info")
            if isinstance(base_info, dict) and base_info:
                cached_base = entry["base_info"]
                for key in self._BASE_INFO_KEYS:
                    value = base_info.get(key)
                    if value:
                        cached_base[key] = value
                # The SN taken from the topic is stored, not the reported one.
                if base_info.get("dev_sn"):
                    cached_base["dev_sn"] = str(sn)

            pos_info = body_dict.get("pos_info")
            if isinstance(pos_info, dict) and pos_info:
                cached_pos = entry["pos_info"]
                # A coordinate of 0 is falsy and deliberately not stored.
                for key in ("lng", "lat"):
                    value = pos_info.get(key)
                    if value:
                        cached_pos[key] = value

        # Refresh the views.
        self.data_update(sn)

    def data_update(self, sn):
        """Refresh the device selector, the info labels and the device list."""
        self.comboBox_log_sn_add(sn)
        self.display_update(sn)
        self.dev_list_refresh(sn)
        # Automatic map refresh on new data is disabled; the map is refreshed
        # on request through lsb_get_fixed_info().

    def comboBox_log_sn_change(self):
        """Refresh the info labels for the newly selected device."""
        self.display_update()

    def comboBox_log_sn_add(self, sn):
        """Add ``sn`` to the device selector, keeping the entries sorted.

        The current selection is preserved. SN ``0000000116`` is never added
        (the reason is not visible in this file).
        """
        if sn == "0000000116":
            return
        combo = self.comboBox_log_sn
        if combo.findText(sn) != -1:
            return
        combo.addItem(sn)
        current_text = combo.currentText()
        # Re-label the items in sorted order, then restore the selection.
        ordered = sorted(combo.itemText(i) for i in range(combo.count()))
        for index, text in enumerate(ordered):
            combo.setItemText(index, text)
        combo.setCurrentText(current_text)

    def display_base_info_clear(self):
        """Reset the base-information labels to their bare captions."""
        self.label_log_01dev_type.setText("设备类型:")
        self.label_log_02sn.setText("设备SN:")
        self.label_log_03reset_times.setText("复位次数:")
        self.label_log_04app_ver.setText("固件版本:")
        self.label_log_05name.setText("姓名:")
        self.label_log_06telephone.setText("电话:")
        self.label_log_09imei.setText("IMEI:")
        self.label_log_10iccid.setText("ICCID:")
        self.label_log_11rssi.setText("信号强度:")
        self.label_log_12snr.setText("信噪比:")

    def display_pos_info_clear(self):
        """Reset the position labels to their bare captions."""
        self.label_log_07longitude.setText("经度:")
        # Caption corrected from "维度" (dimension) to "纬度" (latitude).
        self.label_log_08latitude.setText("纬度:")

    def display_clear(self):
        """Reset all device-information labels and the group title."""
        self.display_base_info_clear()
        self.display_pos_info_clear()
        self.groupBox_log_1base.setTitle("日志设备基础信息")

    def display_update(self, sn=0):
        """Show the cached information of the currently selected device.

        ``sn`` is accepted for signal compatibility but not used; the device
        is always taken from the selector. Values are converted with ``str``
        so a numeric JSON value cannot break the string concatenation. The
        labels of the second tab (``label_lsb_*``) mirror the first.
        """
        sn_cur = self.comboBox_log_sn.currentText()
        if sn_cur in data_buf:
            entry = data_buf[sn_cur]
            base_info = entry.get("base_info")
            if base_info:
                self.label_log_01dev_type.setText("设备类型:" + str(base_info.get("dev_type", "")))
                self.label_log_02sn.setText("设备SN:" + str(sn_cur))
                self.label_log_03reset_times.setText("复位次数:" + str(base_info.get("reset_times", 0)))
                self.label_log_04app_ver.setText("固件版本:" + str(base_info.get("version", "")))
                self.label_log_05name.setText("姓名:" + str(base_info.get("name", "")))
                self.label_log_06telephone.setText("电话:" + str(base_info.get("tel", "")))
                self.label_log_09imei.setText("IMEI:" + str(base_info.get("imei", "")))
                self.label_log_10iccid.setText("ICCID:" + str(base_info.get("iccid", "")))
                self.label_log_11rssi.setText("信号强度:" + str(base_info.get("rssi", 0)))
                self.label_log_12snr.setText("信噪比:" + str(base_info.get("snr", 0)))
            else:
                self.display_base_info_clear()

            pos_info = entry.get("pos_info")
            if pos_info:
                # A stored 0 falls back to the default coordinate.
                lng = pos_info.get("lng", 0) if pos_info.get("lng", 0) != 0 else 116.1666213
                lat = pos_info.get("lat", 0) if pos_info.get("lat", 0) != 0 else 39.9007607
                self.label_log_07longitude.setText("经度:" + str(lng))
                self.label_log_08latitude.setText("纬度:" + str(lat))
            else:
                self.display_pos_info_clear()
            self.groupBox_log_1base.setTitle("日志设备基础信息:" + str(entry.get("heart_time_stamp", "")))
        else:
            self.display_clear()

        # Mirror everything onto the second tab.
        mirrored_labels = (
            (self.label_lsb_01dev_type, self.label_log_01dev_type),
            (self.label_lsb_02sn, self.label_log_02sn),
            (self.label_lsb_03reset_times, self.label_log_03reset_times),
            (self.label_lsb_04app_ver, self.label_log_04app_ver),
            (self.label_lsb_05name, self.label_log_05name),
            (self.label_lsb_06telephone, self.label_log_06telephone),
            (self.label_lsb_09imei, self.label_log_09imei),
            (self.label_lsb_10iccid, self.label_log_10iccid),
            (self.label_lsb_11rssi, self.label_log_11rssi),
            (self.label_lsb_12snr, self.label_log_12snr),
            (self.label_lsb_07longitude, self.label_log_07longitude),
            (self.label_lsb_08latitude, self.label_log_08latitude),
        )
        for target, source in mirrored_labels:
            target.setText(source.text())
        self.groupBox_lsb_1base.setTitle(self.groupBox_log_1base.title())

    def sn_update_something(self):
        """Copy the selected SN (without leading zeros) into the TX SN field.

        Only done when "use default TX SN" is checked. A selection that is
        not a plain integer is ignored instead of raising inside the slot.
        """
        if not self.checkBox_set_default_tx_sn.isChecked():
            return
        try:
            sn_num = int(self.comboBox_log_sn.currentText())
        except ValueError:
            return
        self.lineEdit_tx_sn.setText(str(sn_num))

    def dev_list_refresh(self, sn):
        """Add ``sn`` to the sorted, newline-separated device list if missing."""
        dev_list = self.plainTextEdit_log_dev_list.toPlainText().split()
        if sn in dev_list:
            return
        dev_list.append(sn)
        dev_text = "".join(str(dev) + "\n" for dev in sorted(dev_list))
        self.plainTextEdit_log_dev_list.setPlainText(dev_text)

    def mqtt_send_data(self, client, pub_topic, userdata):
        """Publish ``userdata`` to ``pub_topic`` and show the outcome in the log view."""
        try:
            result = client.publish(pub_topic, userdata)
            # result[0] is the return code; 0 means the message was queued.
            if result[0] == 0:
                msg = f"[√]发送成功, Send `{userdata}` to topic `{pub_topic}`"
            else:
                msg = f"[×]发送失败, Failed to send message to topic {pub_topic}"
        except Exception:
            msg = "[!]请先连接服务器!"
        self.log_display_update_signal.emit(msg + "\n")

    def log_display_update(self, text):
        """Append ``text`` to the log view, following the tail only if the
        cursor was already at the end before the append."""
        view = self.textEdit_log_0rx_display
        at_end = view.textCursor().position() == len(view.toPlainText())
        view.append(text)
        if at_end:
            view.moveCursor(QTextCursor.End)

    def log_rx_buf_max(self):
        """Apply the line limit typed by the user, clamped to 100..100000.

        Unparsable input falls back to 10000. When the value is clamped the
        field is rewritten with the clamped number.
        """
        try:
            max_num = int(self.lineEdit_log_rx_buf_max.text())
        except ValueError:
            max_num = 10000
        if max_num <= 100:
            max_num = 100
            # setText() needs a str; passing the int raised TypeError.
            self.lineEdit_log_rx_buf_max.setText(str(max_num))
        if max_num >= 100000:
            max_num = 100000
            self.lineEdit_log_rx_buf_max.setText(str(max_num))
        self.textEdit_log_0rx_display.document().setMaximumBlockCount(max_num)

    def _send_to_selected_device(self, send_data):
        """Publish ``send_data`` to the downlink topic of the selected device.

        Does nothing when no device is selected.
        """
        if self.comboBox_log_sn.currentIndex() == -1:
            return
        send_topic = "cpyypt/down/9002/" + self.comboBox_log_sn.currentText()
        self.mqtt_send_data(g_client, send_topic, send_data)

    def log_refresh(self):
        """Ask the selected device to report its fixed information."""
        self._send_to_selected_device('PT_CMD={"get_fixed_info": "true"}')

    def log_send(self):
        """Send the TX text box content to the selected device.

        The command header encodes the target interface (TTL / RS232 / RS485)
        and the data format (``_A=`` text, ``_H=`` hex). In text mode a CRLF
        is appended when the corresponding box is checked.
        """
        if self.comboBox_log_sn.currentIndex() == -1:
            return
        headers = {0: 'TTD_TTL', 1: 'TTD_RS232', 2: 'TTD_RS485'}
        cmd_header = headers.get(self.comboBox_tx_io_select.currentIndex(), "TTD_TTL")
        cmd_tail = ""
        if self.radioButton_log_1tx_ascii.isChecked():
            cmd_header += "_A="
            if self.checkBox_r_n.isChecked():
                cmd_tail = "\r\n"
        else:
            cmd_header += "_H="
        send_data = cmd_header + self.textEdit_log_1tx.toPlainText() + cmd_tail
        self._send_to_selected_device(send_data)

    def log_rs485_up_enable(self):
        """Tell the selected device to enable RS485 uplink reporting."""
        self._send_to_selected_device('PT_CMD={"rs485_cfg": {"up_enable": 1}}')

    def log_rs485_up_disable(self):
        """Tell the selected device to disable RS485 uplink reporting."""
        self._send_to_selected_device('PT_CMD={"rs485_cfg": {"up_enable": 0}}')

    def map_refresh(self, sn=0, longitude=116.1798005, latitude=39.9030239):
        """Refresh the address text and the static map for a coordinate."""
        print("map_refresh")
        self.map_get_regeocode(sn, longitude, latitude)
        self.map_load_static_map(sn, longitude, latitude)

    def map_get_regeocode(self, sn=0, longitude=116.1798005, latitude=39.9030239, key='ebe21d6ee4409542d070b8be762ab0a5'):
        """Reverse-geocode a coordinate with the AMap REST API and show it.

        The request has a 5 second timeout because it runs on the GUI thread.
        Network errors and unexpected responses are shown as a failure text
        rather than raised.

        NOTE(review): the default API key is embedded in source.
        """
        from requests import RequestException

        url = f"https://restapi.amap.com/v3/geocode/regeo?key={key}&location={longitude},{latitude}"
        text = "逆向地理编码失败"
        try:
            data = requests_get(url, timeout=5).json()
            if data['status'] == '1':
                geo_result = data['regeocode']
                component = geo_result['addressComponent']
                text = "信号基站位置信息:"
                text = text + f"\n省: {component['province']}"
                text = text + f"\n市: {component['city']}"
                text = text + f"\n区: {component['district']}"
                text = text + f"\n地址: {geo_result['formatted_address']}"
        except (RequestException, ValueError, KeyError, TypeError) as e:
            print(str(e))
        print(text)
        self.textEdit_lsb_address.setText(text)

    def map_load_static_map(self, sn=0, longitude=116.1798005, latitude=39.9030239, key='ebe21d6ee4409542d070b8be762ab0a5'):
        """Load an AMap static map centred and marked at the coordinate."""
        location_str = f'{longitude},{latitude}'
        url = f"https://restapi.amap.com/v3/staticmap?location={location_str}&zoom=13&size=1024*600&markers=mid,,A:{location_str}&key={key}"
        print(url)
        self.browser.load(QUrl(url))

    def lsb_get_fixed_info(self):
        """Show the selected device on the map, or request its information.

        When a position is cached the map is refreshed from it; otherwise a
        ``get_fixed_info`` command is sent to the device.
        """
        if self.comboBox_log_sn.currentIndex() == -1:
            return
        sn_cur = self.comboBox_log_sn.currentText()
        pos_info = (data_buf.get(sn_cur) or {}).get("pos_info")
        if pos_info:
            lng = pos_info.get("lng", 116.1666213)
            lat = pos_info.get("lat", 39.9007607)
            self.map_refresh(sn_cur, lng, lat)
            return
        send_topic = "cpyypt/down/9002/" + sn_cur
        send_data = 'PT_CMD={"get_fixed_info": "true"}'
        self.mqtt_send_data(g_client, send_topic, send_data)

    def set_svr(self):
        """Fill the server fields with the preset of the chosen environment."""
        index_cur = self.comboBox_set_svr.currentIndex()
        if index_cur == 0:
            self.lineEdit_set_0ip.setText("test-******")
            self.lineEdit_set_1port.setText("******")
            self.lineEdit_set_2usr.setText("******")
            self.lineEdit_set_3password.setText("******")
            self.set_svr_env = "test"
        elif index_cur == 1:
            self.lineEdit_set_0ip.setText("prod-******")
            self.lineEdit_set_1port.setText("******")
            self.lineEdit_set_2usr.setText("******")
            self.lineEdit_set_3password.setText("******")
            self.set_svr_env = "prod"
        else:
            self.set_svr_env = "usr_def"

    def _put_int(self, target, key, line_edit):
        """Store the integer typed in ``line_edit`` under ``target[key]``.

        Empty input is skipped. Non-numeric input is reported in the log view
        and skipped, instead of raising inside the slot.
        """
        text = line_edit.text()
        if text == "":
            return
        try:
            target[key] = int(text)
        except ValueError:
            self.log_display_update_signal.emit(f"[!]{key} 不是有效整数: {text}\n")

    def set_create(self):
        """Build a ``PT_CMD=`` JSON command from the checked options.

        Example results::

            PT_CMD={"imei_check":"869861069999337","reboot":"true","dev_cfg":{...}}
            PT_CMD={"reboot":"true","dev_cfg":{...},"reg_cfg":{"name":"...","tel":"..."}}

        The command is written to the TX text box; nothing is sent here.
        """
        set_dict = {}
        # Bind the command to one IMEI.
        if self.checkBox_set_bind_imei.isChecked():
            set_dict["imei_check"] = self.lineEdit_0imei.text()
        # Request the fixed information.
        if self.checkBox_set_get_fixed_info.isChecked():
            set_dict["get_fixed_info"] = "true"
        # Restore default parameters.
        if self.checkBox_set_default_cfg.isChecked():
            set_dict["default_cfg"] = "true"
        # Reboot.
        if self.checkBox_set_reboot.isChecked():
            set_dict["reboot"] = "true"
        # Device information.
        if self.checkBox_set_dev_cfg.isChecked():
            set_dict["dev_cfg"] = {
                "dev_sn": self.lineEdit_set_dev_sn.text(),
                "dev_pd": self.lineEdit_set_dev_pd.text(),
            }
        # Registration information.
        if self.checkBox_set_reg_cfg.isChecked():
            set_dict["reg_cfg"] = {
                "name": self.lineEdit_set_reg_name.text(),
                "tel": self.lineEdit_set_reg_tel.text(),
            }
        # Server information.
        if self.checkBox_set_svr_cfg.isChecked():
            set_dict["svr_cfg"] = {
                "svr_ip": self.lineEdit_set_svr_cfg.text(),
                "svr_port": self.lineEdit_set_svr_port.text(),
                "svr_usr_name": self.lineEdit_set_svr_usr_name.text(),
                "svr_usr_pwd": self.lineEdit_set_svr_usr_pwd.text(),
            }
        # Interface parameters.
        if self.checkBox_set_io_cfg.isChecked():
            io_name = (self.comboBox_set_io_select.currentText() + "_cfg").lower()
            io_cfg = {}
            set_dict[io_name] = io_cfg
            # If both boxes of a pair are checked, "disable" wins.
            if self.checkBox_set_io_up_enable.isChecked():
                io_cfg["up_enable"] = 1
            if self.checkBox_set_io_up_disable.isChecked():
                io_cfg["up_enable"] = 0
            if self.checkBox_set_io_down_enable.isChecked():
                io_cfg["down_enable"] = 1
            if self.checkBox_set_io_down_disable.isChecked():
                io_cfg["down_enable"] = 0
            if self.checkBox_set_io_parameter.isChecked():
                self._put_int(io_cfg, "baud_rate", self.lineEdit_set_baud_rate)
                self._put_int(io_cfg, "data_bits", self.lineEdit_set_data_bits)
                self._put_int(io_cfg, "stop_bits", self.lineEdit_set_stop_bits)
                self._put_int(io_cfg, "partiy", self.lineEdit_set_partiy)
                # bit_order, buff_size, rs485_gpio, rs485_level and
                # rs485_delay are currently not sent.

        set_str = json_dumps(set_dict, ensure_ascii=False)
        print(set_str)
        # "{}" has length 2, so an empty command is not written.
        if len(set_str) > 4:
            self.textEdit_set_tx.setText("PT_CMD=" + set_str)

    def set_send(self):
        """Publish the TX text box content to the topic in the topic field."""
        print("mqtt set_send," + self._timestamp())
        send_topic = self.lineEdit_5pub_topic.text()
        send_data = self.textEdit_set_tx.toPlainText()
        self.mqtt_send_data(g_client, send_topic, send_data)

    def set_create_send(self):
        """Build the command and send it immediately."""
        self.set_create()
        self.set_send()

    def _uncheck_io_cfg_if_unused(self):
        """Uncheck the interface-config box when no interface option is set."""
        options = (
            self.checkBox_set_io_parameter,
            self.checkBox_set_io_up_enable,
            self.checkBox_set_io_up_disable,
            self.checkBox_set_io_down_enable,
            self.checkBox_set_io_down_disable,
        )
        if not any(box.isChecked() for box in options):
            self.checkBox_set_io_cfg.setChecked(False)

    def set_io_up_able(self):
        """Handle "uplink enable": it excludes "uplink disable"."""
        if self.checkBox_set_io_up_enable.isChecked():
            self.checkBox_set_io_up_disable.setChecked(False)
            self.checkBox_set_io_cfg.setChecked(True)
        self._uncheck_io_cfg_if_unused()

    def set_io_up_disable(self):
        """Handle "uplink disable": it excludes "uplink enable"."""
        if self.checkBox_set_io_up_disable.isChecked():
            self.checkBox_set_io_up_enable.setChecked(False)
            self.checkBox_set_io_cfg.setChecked(True)
        self._uncheck_io_cfg_if_unused()

    def set_io_down_able(self):
        """Handle "downlink enable": it excludes "downlink disable"."""
        if self.checkBox_set_io_down_enable.isChecked():
            self.checkBox_set_io_down_disable.setChecked(False)
            self.checkBox_set_io_cfg.setChecked(True)
        self._uncheck_io_cfg_if_unused()

    def set_io_down_disable(self):
        """Handle "downlink disable": it excludes "downlink enable"."""
        if self.checkBox_set_io_down_disable.isChecked():
            self.checkBox_set_io_down_enable.setChecked(False)
            self.checkBox_set_io_cfg.setChecked(True)
        self._uncheck_io_cfg_if_unused()

    def set_io_parameter(self):
        """Handle the "serial parameters" box."""
        if self.checkBox_set_io_parameter.isChecked():
            self.checkBox_set_io_cfg.setChecked(True)
        self._uncheck_io_cfg_if_unused()

    def set_io_cfg(self):
        """Clear every interface option when the interface-config box is unchecked.

        The original tested the "serial parameters" box here, which made its
        own first statement a no-op and left options checked after the
        config box had been switched off.
        """
        if not self.checkBox_set_io_cfg.isChecked():
            self.checkBox_set_io_parameter.setChecked(False)
            self.checkBox_set_io_up_enable.setChecked(False)
            self.checkBox_set_io_up_disable.setChecked(False)
            self.checkBox_set_io_down_enable.setChecked(False)
            self.checkBox_set_io_down_disable.setChecked(False)

    def set_default_tx_sn(self):
        """Lock the TX SN and topic fields while "use default TX SN" is checked."""
        editable = not self.checkBox_set_default_tx_sn.isChecked()
        self.lineEdit_tx_sn.setEnabled(editable)
        self.lineEdit_5pub_topic.setEnabled(editable)

    def set_tx_sn_change(self):
        """Rebuild the publish topic from the TX SN, zero-padded to 10 digits."""
        sn_str = self.lineEdit_tx_sn.text().zfill(10)
        self.lineEdit_5pub_topic.setText("cpyypt/down/9002/" + sn_str)
def main():
    """Create the Qt application, show the main window and run the event loop.

    ``sys.exit`` is called with the event loop's return code; the resulting
    ``SystemExit`` is caught here and reported on stdout, so this message is
    printed on every exit and the function returns normally afterwards.
    """
    application = QApplication(sys.argv)
    window = MyWindow()
    window.show()
    try:
        exit_code = application.exec_()
        sys.exit(exit_code)
    except SystemExit as exc:
        # Hook for extra shutdown work (saving state, releasing resources).
        print(f"程序异常退出: {exc}")


if __name__ == '__main__':
    main()
|