| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451 |
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\Scripts\pip.exe
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\python.exe
- from flask import Flask, render_template, request, redirect, url_for, session, jsonify, send_from_directory
- from datetime import datetime
- import os
- import json
- import threading
- import paho.mqtt.client as mqtt
- import logging
- from functools import wraps
- import random
- import math
- import time
- import subprocess
- import platform as sys_platform
- import shutil
- # 自定义
- from pos_manager import frame
- # 应用版本信息
- APP_VERSION = '1.0.12'
- # 应用日志文件路径
- LOG_FILE = 'app.log'
- # 配置日志
- # 自定义日志格式化器,避免与Flask内部日志重复
- class CustomFormatter(logging.Formatter):
- def format(self, record):
- # 删除record中的asctime属性,避免重复记录时间
- if hasattr(record, 'asctime'):
- delattr(record, 'asctime')
- return super().format(record)
- # 创建处理器
- file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
- stream_handler = logging.StreamHandler()
- # 设置格式器,只包含必要信息
- formatter = CustomFormatter('%(levelname)s: %(message)s')
- file_handler.setFormatter(formatter)
- stream_handler.setFormatter(formatter)
- # 配置根日志器
- logging.basicConfig(
- level=logging.INFO,
- handlers=[file_handler, stream_handler]
- )
- # 禁用Flask内部的请求日志
- werkzeug_logger = logging.getLogger('werkzeug')
- werkzeug_logger.setLevel(logging.WARNING)
- logger = logging.getLogger(__name__)
- # 记录应用启动时间
- app_start_time = datetime.now()
- app = Flask(__name__)
- app.secret_key = 'supersecretkey'
- app.config['UPLOAD_FOLDER'] = 'static/uploads'
- app.config['ALLOWED_EXTENSIONS'] = {'bin', 'zip', 'tar', 'rar'}
- app.config['DATA_FOLDER'] = 'data'
- app.config['APP_UPGREADE_FOLDER_TEMP'] = 'app_upgrade_temp'
- # 关键配置:禁用 ASCII 转义,确保中文正常显示
- app.config['JSON_AS_ASCII'] = False
- # 确保目录存在
- os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
- # 数据文件路径
- USERS_PATH = os.path.join(app.config['DATA_FOLDER'], 'users.json')
- FIRMWARES_PATH = os.path.join(app.config['DATA_FOLDER'], 'firmwares.json')
- DEVICES_PATH = os.path.join(app.config['DATA_FOLDER'], 'devices.json')
- PLATFORMS_PATH = os.path.join(app.config['DATA_FOLDER'], 'platforms.json')
- # 全局变量
- # MQTT客户端管理
- mqtt_clients = {}
- mqtt_threads = {}
- mqtt_lock = threading.Lock()
- mqtt_sub_topic = [("cpyypt/up/2006/#", 0)]
- mqtt_pub_topic = "cpyypt/down/2006/0000000101"
- @app.route('/favicon.ico')
- def favicon():
- return send_from_directory("static", "favicon.ico", mimetype='image/vnd.microsoft.icon')
- @app.before_request
- def update_last_activity():
- mqtt_last_activity['last_time'] = time.time()
- # logger.info(f"mqtt_last_activity: {mqtt_last_activity}")
- # 获取应用运行时长
- @app.route('/api/run_time')
- def get_run_time():
- global app_start_time
- current_time = datetime.now()
- run_time = current_time - app_start_time
-
- # 计算年、月、日、时、分、秒
- days = run_time.days
- seconds = run_time.seconds
- hours, remainder = divmod(seconds, 3600)
- minutes, seconds = divmod(remainder, 60)
-
- # 简化计算,假设每年365天,每月30天
- years = days // 365
- days_remaining = days % 365
- months = days_remaining // 30
- days = days_remaining % 30
-
- run_time_str = f"{years}年{months}月{days}日 {hours}时{minutes}分{seconds}秒"
- return jsonify({'run_time': run_time_str})
- # 上下文处理器
- @app.context_processor
- def inject_common_data():
- return {
- 'datetime': datetime,
- 'app_version': APP_VERSION
- }
- # 初始化数据
- def init_data():
- # 初始化用户数据
- if not os.path.exists(USERS_PATH):
- with open(USERS_PATH, 'w', encoding='utf-8') as f:
- json.dump({'admin': 'admin'}, f, ensure_ascii=False)
- # 初始化固件数据
- if not os.path.exists(FIRMWARES_PATH):
- init_firmwares = [
- {'id': 1, 'filename': 'pos.bin', 'upload_time': '2025-08-01 13:08:30', 'remark': '初始版本'},
- {'id': 2, 'filename': 'ITSF-POS V2.3.bin', 'upload_time': '2025-08-01 14:10:09', 'remark': '基础功能版'}
- ]
- with open(FIRMWARES_PATH, 'w', encoding='utf-8') as f:
- json.dump(init_firmwares, f, ensure_ascii=False, indent=2)
- # 初始化设备数据
- if not os.path.exists(DEVICES_PATH):
- init_devices = [
- {
- "id": 1,
- "dev_type": "2006",
- "dev_sn": "101",
- "firmware_ver": "pos12.bin",
- "ip": "192.168.1.101",
- "status": "正常",
- "online": "在线",
- "cloud_platform": "阿里云测试",
- "network_type": "WiFi",
- "wifi_ssid": "ssss_5G",
- "wifi_password": "eeeeeeeeee",
- "run_time":1000,
- "receive_sum":12,
- "send_sum":10,
- "abnormal_sum":2,
- "success_rate":83.33,
- "speed":83.33
- },
- {
- "id": 2,
- "dev_type": "2006",
- "dev_sn": "103",
- "firmware_ver": "pos1.bin",
- "ip": "192.168.1.102",
- "status": "正常",
- "online": "离线",
- "cloud_platform": "阿里云测试",
- "network_type": "WiFi",
- "wifi_ssid": "wbjw",
- "wifi_password": "2222",
- "run_time":1000,
- "receive_sum":12,
- "send_sum":10,
- "abnormal_sum":2,
- "success_rate":83.33,
- "speed":83.33
- }
- ]
- with open(DEVICES_PATH, 'w', encoding='utf-8') as f:
- json.dump(init_devices, f, ensure_ascii=False, indent=2)
- # 初始化云平台数据
- if not os.path.exists(PLATFORMS_PATH):
- platform_d = {
- "current_platform": "test_env",
- "platform_l": {
- "aliyun_prod_env": {
- "id": 1,
- "name": "阿里云生产",
- "status": "connected",
- "ip": "mqtt.cpyypt.cn",
- "port": 9000,
- "sub_topic ": mqtt_sub_topic,
- "pub_topic ": mqtt_pub_topic,
- "client_id": f"pos_mng_{random.randint(0, 100)}",
- "username": "cpyypt",
- "password": "1SvTlvm1VCawSzS"
- },
- "test_env": {
- "id": 2,
- "name": "阿里云测试",
- "status": "disconnected",
- "ip": "test-mqtt.cpyypt.cn",
- "port": 9000,
- "sub_topic ": mqtt_sub_topic,
- "pub_topic ": mqtt_pub_topic,
- "client_id": f"pos_mng_{random.randint(0, 100)}",
- "username": "admin",
- "password": "houjianwei"
- },
- "localize_a_env": {
- "id": 3,
- "name": "本地化A云平台",
- "status": "disconnected",
- "ip": "mqtt.localize_a_env.com",
- "port": 1883,
- "sub_topic ": mqtt_sub_topic,
- "pub_topic ": mqtt_pub_topic,
- "client_id": f"pos_mng_{random.randint(0, 100)}",
- "username": "localize_a_env_user",
- "password": "localize_a_env_pass"
- }
- }
- }
- # platform_d["platform_l"]["aliyun_prod_env"]["sub_topic"] = [("cpyypt/up/2006/#", 0)]
- # platform_d["platform_l"]["aliyun_prod_env"]["pub_topic"] = [("cpyypt/down/2006/#", 0)]
- # platform_d["platform_l"]["aliyun_prod_env"]["client_id"] = f"pos_mng_{random.randint(0, 100)}"
- with open(PLATFORMS_PATH, 'w', encoding='utf-8') as f:
- json.dump(platform_d, f, ensure_ascii=False, indent=2)
- # 数据加载函数
- def load_users():
- with open(USERS_PATH, 'r', encoding='utf-8') as f:
- return json.load(f)
- def load_firmwares():
- with open(FIRMWARES_PATH, 'r', encoding='utf-8') as f:
- return json.load(f)
- load_devices_first = True # 标记第1次加载
- def load_devices():
- if not os.path.exists(DEVICES_PATH):
- logger.info(f"设备配置文件不存在,初始化数据: {DEVICES_PATH}")
- init_data()
- with open(DEVICES_PATH, 'r', encoding='utf-8') as f:
- devices = json.load(f)
- # 程序启动时,强制将所有所有统计信息为0
- global load_devices_first
- if load_devices_first :
- load_devices_first = False # 第1次加载完成,标记为False
- for device in devices:
- device['online'] = '离线'
- device['realtime'] = 0
- device['run_time'] = 0
- device['receive_sum'] = 0
- device['send_sum'] = 0
- device['abnormal_sum'] = 0
- device['success_rate'] = 0
- device['speed'] = 0
-
- # 保存强制更新后的状态
- save_devices(devices)
-
- return devices
-
- load_platforms_first = True # 标记第1次加载
- def load_platforms():
- try:
- if not os.path.exists(PLATFORMS_PATH):
- logger.info(f"云平台配置文件不存在,初始化数据: {PLATFORMS_PATH}")
- init_data()
-
- # logger.info(f"加载云平台配置文件: {PLATFORMS_PATH}")
- with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
- file_content = f.read().strip()
- if not file_content:
- logger.error("云平台配置文件为空")
- # 如果文件为空,重新初始化
- init_data()
- with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f_new:
- platform_d = json.load(f_new)
- else:
- try:
- platform_d = json.loads(file_content)
- except json.JSONDecodeError as e:
- logger.error(f"解析云平台配置文件失败: {e}")
- # 尝试重新初始化
- init_data()
- with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f_new:
- platform_d = json.load(f_new)
-
- # 程序启动时,强制将所有云平台状态设置为已断开
- global load_platforms_first
- if load_platforms_first and 'platform_l' in platform_d:
- load_platforms_first = False # 第1次加载完成,标记为False
- for platform_id in platform_d['platform_l']:
- platform_d['platform_l'][platform_id]['status'] = 'disconnected'
- platform_d['platform_l'][platform_id]['client_id'] = "pos_mng_%s_%d" % (os.environ['COMPUTERNAME'], random.randint(0, 100))
-
- # 保存强制更新后的状态
- save_platforms(platform_d)
-
- return platform_d
- except Exception as e:
- logger.error(f"加载云平台配置时发生异常: {e}")
- # 尝试返回一个默认值或重新初始化
- init_data()
- with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
- return json.load(f)
- return platform_d
- def load_platforms_c():
- platform_d = load_platforms()
- platform_c = {}
- for platform_id in platform_d['platform_l']:
- platform_c[platform_id] = {}
- platform_c[platform_id]['name'] = platform_d['platform_l'][platform_id]['name']
- platform_c[platform_id]['status'] = platform_d['platform_l'][platform_id]['status']
- platform_c[platform_id]['ip'] = platform_d['platform_l'][platform_id]['ip']
- return platform_c
- # 数据保存函数
- def save_users(users):
- with open(USERS_PATH, 'w', encoding='utf-8') as f:
- json.dump(users, f, ensure_ascii=False, indent=2)
- def save_firmwares(firmwares):
- with open(FIRMWARES_PATH, 'w', encoding='utf-8') as f:
- json.dump(firmwares, f, ensure_ascii=False, indent=2)
- def save_devices(devices):
- with open(DEVICES_PATH, 'w', encoding='utf-8') as f:
- json.dump(devices, f, ensure_ascii=False, indent=2)
- def save_platforms(platform_d):
- with open(PLATFORMS_PATH, 'w', encoding='utf-8') as f:
- json.dump(platform_d, f, ensure_ascii=False, indent=2)
- # MQTT回调函数
- def on_connect(client, userdata, flags, rc):
- platform_id = userdata['platform_id']
- logger.info(f"MQTT连接成功 - 云平台: {platform_id}, 结果代码: {rc}")
- pub_topic = userdata['pub_topic']
- client.subscribe(mqtt_sub_topic)
- logger.info(f"已订阅主题: {mqtt_sub_topic}")
- with mqtt_lock:
- platform_d = load_platforms()
- if platform_id in platform_d['platform_l']:
- platform_d['platform_l'][platform_id]['status'] = 'connected'
- save_platforms(platform_d)
- def on_disconnect(client, userdata, rc):
- platform_id = userdata['platform_id']
- logger.info(f"MQTT断开连接 - 云平台: {platform_id}, 结果代码: {rc}")
- with mqtt_lock:
- platform_d = load_platforms()
- if platform_id in platform_d['platform_l']:
- platform_d['platform_l'][platform_id]['status'] = 'disconnected'
- save_platforms(platform_d)
- def on_message(client, userdata, msg):
- platform_id = userdata['platform_id']
- try:
- payload_str = msg.payload.decode('gb2312')
- except UnicodeDecodeError:
- try:
- payload_str = msg.payload.decode('utf-8')
- except UnicodeDecodeError:
- payload_str = msg.payload.hex().upper()
- # logger.info(f"收到消息 - 云平台: {platform_id}, 主题: {msg.topic}, 内容: {payload_str}")
- # 如果长时间未操作,自动断开MQTT连接
- platform_d = load_platforms()
- # """
- get_platform_status = next((value['status'] for key, value in platform_d['platform_l'].items() if key == platform_id), None)
- if get_platform_status == 'connected':
- mqtt_disconnect_time_out = 5*60
- if time.time() - mqtt_last_activity['last_time'] > mqtt_disconnect_time_out: # 5分钟无操作
- logger.info(f"云平台 {platform_id} 长时间未操作,超过{mqtt_disconnect_time_out}秒,自动断开连接")
- platform_d['platform_l'][platform_id]['status'] = 'disconnected'
- stop_mqtt_client(platform_id)
- logger.info(f"云平台 {platform_id} 自动断开连接,已完成")
- return
- # """
-
- dev_type = msg.topic.split("/")[-2]
- dev_sn = msg.topic.split("/")[-1].lstrip('0')
- devices = load_devices()
- device = next((item for item in devices if item['dev_sn'] == dev_sn), None)
- if device:
- if(device['online'] != '在线'):
- logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 上线")
- device['online'] = '在线'
- device['realtime'] = int(time.time())
- platform_name = next((value['name'] for key, value in platform_d['platform_l'].items() if key == platform_id), None)
- device['cloud_platform'] = platform_name
- save_devices(devices)
-
- if type(msg.payload)== bytes and len(msg.payload) > 4 and payload_str[0:4] == "FEFE":
- # if type(msg.payload)== bytes and len(msg.payload) > 4 and msg.payload[0:2] == 0xFEFE:
- try:
- header, body = frame.parse_data(client, msg.payload, msg.topic)
- if len(header) > 0 and len(body) > 0:
- pro_ver = header.get('pro_ver', None)
- msg_type1 = header.get('msg_type1', None)
- msg_type2 = header.get('msg_type2', None)
- r_body = {}
- r_body['master_type'] = body.get('master_type', None)
- r_body['master_sn'] = body.get('master_sn', None)
- r_body['app_version'] = body.get('app_version', None)
- r_body['reset_times'] = body.get('reset_times', None)
- r_body['last_reset_type'] = body.get('last_reset_type', None)
- r_body['uuid'] = body.get('uuid', None)
- r_body['run_time'] = body.get('run_time', None)
- r_body['receive_sum'] = body.get('receive_sum', None)
- r_body['send_sum'] = body.get('send_sum', None)
- r_body['network_type'] = body.get('network_type', None)
- r_body['cloud_platform'] = body.get('cloud_platform', None)
- r_body['sim_status'] = body.get('sim_status', None)
- r_body['free_fifo'] = body.get('free_fifo', None)
- r_body['ip'] = body.get('ip', None)
- # 至少有1个不为None,则继续
- if any(r_body.values()):
- # 先整理一下字段
- if r_body['network_type'] is not None:
- if r_body['network_type'] == 0:
- r_body['network_type'] = 'WiFi'
- elif r_body['network_type'] == 1:
- r_body['network_type'] = '有线网'
- else:
- r_body['network_type'] = '未知'
- if r_body['cloud_platform'] is not None:
- if r_body['cloud_platform'] == 0:
- r_body['cloud_platform'] = '阿里云生产'
- elif r_body['cloud_platform'] == 1:
- r_body['cloud_platform'] = '阿里云测试'
- else:
- r_body['cloud_platform'] = '未知'
- # 有变化,才更新
- is_changed = False
- for k, v in r_body.items():
- if v is not None:
- if k not in device or device[k] != v:
- is_changed = True
- device[k] = v
- if is_changed:
- logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 接收到MQTT消息,状态更新")
- save_devices(devices)
- except Exception as e:
- logger.error(f"解析数据失败 - 云平台: {platform_id}, 错误: {str(e)}")
- # MQTT客户端线程
- def mqtt_client_thread(platform_id):
- platform_d = load_platforms()
- if platform_id not in platform_d['platform_l']:
- logger.error(f"云平台 {platform_id} 不存在")
- return
- platform = platform_d['platform_l'][platform_id]
- client = mqtt.Client(client_id=platform['client_id'])
- client.username_pw_set(platform['username'], platform['password'])
-
- # platform['pub_topic'],
- # platform['sub_topic']
- client.user_data_set({
- 'platform_id': platform_id,
- 'pub_topic': mqtt_pub_topic,
- 'sub_topic': mqtt_sub_topic
- })
- client.on_connect = on_connect
- client.on_disconnect = on_disconnect
- client.on_message = on_message
- with mqtt_lock:
- mqtt_clients[platform_id] = client
- try:
- client.connect(platform['ip'], platform['port'], 60)
- client.loop_forever()
- except Exception as e:
- logger.error(f"MQTT线程错误 - 云平台: {platform_id}, 错误: {str(e)}")
- finally:
- # 确保客户端已断开连接
- try:
- client.disconnect()
- except Exception as e:
- logger.error(f"MQTT线程结束时断开连接失败 - 云平台: {platform_id}, 错误: {str(e)}")
-
- with mqtt_lock:
- if platform_id in mqtt_clients:
- del mqtt_clients[platform_id]
- if platform_id in mqtt_threads:
- del mqtt_threads[platform_id]
- try:
- client.disconnect()
- except:
- pass
- # MQTT控制函数
- def start_mqtt_client(platform_id):
- with mqtt_lock:
- if platform_id in mqtt_threads and mqtt_threads[platform_id].is_alive():
- logger.info(f"MQTT客户端已运行 - 云平台: {platform_id}")
- return True
- thread = threading.Thread(target=mqtt_client_thread, args=(platform_id,), daemon=True)
- with mqtt_lock:
- mqtt_threads[platform_id] = thread
- thread.start()
- logger.info(f"启动MQTT客户端线程 - 云平台: {platform_id}")
- return True
- mqtt_last_activity = {'last_time':time.time()}
- # 定期检查所有MQTT连接状态
- def check_all_mqtt_connections():
- global mqtt_status_check_timer
- try:
- with mqtt_lock:
- platform_d = load_platforms()
- if 'platform_l' not in platform_d:
- return
- for platform_id in platform_d['platform_l']:
- # 检查MQTT线程是否存活
- # is_connected = platform_id in mqtt_threads and mqtt_threads[platform_id].is_alive()
- is_connected = platform_id in mqtt_clients and mqtt_clients[platform_id].is_connected()
- current_status = platform_d['platform_l'][platform_id].get('status', '')
- # logger.info(f"检查云平台状态 - 云平台: {platform_id}, 当前状态: {current_status}, 是否连接: {is_connected}")
- # 如果状态不一致,则更新
- if (is_connected and current_status != 'connected') or (not is_connected and current_status != 'disconnected'):
- logger.info(f"参数:platform_id={platform_id}, is_connected={is_connected}, current_status ={current_status}")
- platform_d['platform_l'][platform_id]['status'] = 'connected' if is_connected else 'disconnected'
- logger.info(f"更新云平台状态 - 云平台: {platform_id}, 新状态: {platform_d['platform_l'][platform_id]['status']}")
- save_platforms(platform_d)
- except Exception as e:
- logger.error(f"检查MQTT连接状态出错: {str(e)}")
- finally:
- # 设置下一次检查(2秒后)
- mqtt_status_check_timer = threading.Timer(2, check_all_mqtt_connections)
- mqtt_status_check_timer.daemon = True
- mqtt_status_check_timer.start()
- # 启动定时检查
- mqtt_status_check_timer = None
- def start_mqtt_status_check():
- global mqtt_status_check_timer
- if mqtt_status_check_timer is not None:
- mqtt_status_check_timer.cancel()
- check_all_mqtt_connections()
- def stop_mqtt_client(platform_id):
- logger.info(f"断开MQTT连接 - 云平台: {platform_id}")
- client = None
- # 先获取客户端引用,尽量缩短锁的持有时间
- with mqtt_lock:
- if platform_id in mqtt_clients:
- client = mqtt_clients[platform_id]
- # 从字典中移除,避免其他线程访问
- del mqtt_clients[platform_id]
- # 在锁外执行disconnect,避免死锁
- if client:
- try:
- client.disconnect()
- logger.info(f"断开MQTT连接 - 云平台: {platform_id}")
- except Exception as e:
- logger.error(f"断开MQTT连接失败 - 云平台: {platform_id}, 错误: {str(e)}")
- with mqtt_lock:
- if platform_id in mqtt_threads:
- logger.info(f"等待MQTT线程结束 - 云平台: {platform_id}")
- return True
- def publish_mqtt_message(platform_id, message, pub_topic=mqtt_pub_topic):
- with mqtt_lock:
- if platform_id not in mqtt_clients:
- logger.error(f"MQTT客户端未连接 - 云平台: {platform_id}")
- return False
- client = mqtt_clients[platform_id]
- platform_d = load_platforms()
- # platform = platform_d['platform_l'].get(platform_id, {})
- # pub_topic = mqtt_pub_topic # platform.get('pub_topic', '/device/data')
- try:
- result = client.publish(pub_topic, message, qos=0)
- result.wait_for_publish()
- if(type(message) == str):
- logger.info(f"发布消息成功 - 云平台: {platform_id}, 主题: {pub_topic}, 消息: {message}")
- else:
- logger.info(f"发布消息成功 - 云平台: {platform_id}, 主题: {pub_topic}, 消息: {message.hex().upper()}")
- return True
- except Exception as e:
- logger.error(f"发布消息失败 - 云平台: {platform_id}, 错误: {str(e)}")
- return False
- # 辅助函数
- def allowed_file(filename):
- return '.' in filename and \
- filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
- # 登录验证装饰器
- def login_required(f):
- @wraps(f)
- def decorated_function(*args, **kwargs):
- if 'logged_in' not in session:
- return redirect(url_for('login', next=request.url))
- return f(*args, **kwargs)
- return decorated_function
- # 路由
- @app.route('/')
- @login_required
- def index():
- platform_d = load_platforms()
- return render_template(
- 'index.html',
- title='主页',
- # cccc platform_l=platform_d['platform_l'],
- platform_c=load_platforms_c(),
- current_platform=platform_d['current_platform']
- )
- @app.route('/api/device_detail/<dev_sn>')
- @login_required
- def get_device_detail(dev_sn):
- devices = load_devices()
- device = next((d for d in devices if d['dev_sn'] == dev_sn), None)
- if not device:
- return jsonify({'error': '设备不存在'}), 404
- device['abnormal_sum'] = int(device['receive_sum']) - int(device['send_sum'])
- device['success_rate'] = 0 if int(device['receive_sum']) == 0 else int(device['send_sum']) / int(device['receive_sum'])
- device['speed'] = 0 if int(device['run_time']) == 0 else int(device['send_sum']) / int(device['run_time'] * 100)
- return jsonify(device)
- @app.route('/login', methods=['GET', 'POST'])
- def login():
- if request.method == 'POST':
- username = request.form['username']
- password = request.form['password']
- users = load_users()
- if username in users and users[username] == password:
- session['logged_in'] = True
- session['username'] = username
- next_page = request.args.get('next', url_for('index'))
- return redirect(next_page)
- else:
- error = '用户名或密码错误'
- return render_template('login.html', error=error)
- return render_template('login.html', title='用户登录')
- @app.route('/logout')
- def logout():
- session.pop('logged_in', None)
- session.pop('username', None)
- return redirect(url_for('login'))
- @app.route('/app_manage')
- @login_required
- def app_manage():
- logger.info("进入应用管理页面")
- return render_template('app_manage.html', title='应用管理')
- # 应用管理相关API
- @app.route('/api/restart_app', methods=['POST'])
- @login_required
- def restart_app():
- try:
- logger.info("接收到重启应用请求")
- # 在实际应用中,这里需要实现安全的重启逻辑
- # 这里只是模拟重启
- time.sleep(2) # 模拟重启过程
- s_p = sys_platform.system().upper()
- if s_p == 'WINDOWS':
- os.system('python restart_app.py')
- elif s_p == 'LINUX':
- os.system('./sh_web_app_2006_Svr.sh restart')
- return jsonify({"status": "restarting"}), 200
- # logger.info("应用重启成功")
- # return jsonify({'success': True, 'message': '应用重启成功,请等待服务重新启动...'})
- except Exception as e:
- logger.error(f"应用重启失败: {str(e)}")
- return jsonify({'success': False, 'message': f'重启失败: {str(e)}'})
- @app.route('/api/update_app', methods=['POST'])
- @login_required
- def update_app():
- try:
- logger.info("接收到更新应用请求")
- if 'app_file' not in request.files:
- return jsonify({'success': False, 'message': '没有文件被上传'})
- file = request.files['app_file']
- if file.filename == '':
- return jsonify({'success': False, 'message': '没有选择文件'})
- # 检查文件类型
- if not (file.filename.endswith('.py') or (file.filename.endswith('.html') or file.filename.endswith('.zip'))):
- return jsonify({'success': False, 'message': '只支持Python文件(.py)、HTML文件(.html)或压缩包(.zip)'})
- # 保存文件到临时目录
- temp_dir = app.config['APP_UPGREADE_FOLDER_TEMP']
- os.makedirs(temp_dir, exist_ok=True)
- bk_dir = 'app_upgrade_bk'
- os.makedirs(bk_dir, exist_ok=True)
-
- if file.filename == 'web_app_2006.py':
- shutil.copy2(file.filename, os.path.join(bk_dir, file.filename+f'_{APP_VERSION}_bk_{time.time()}'))
- file.save(file.filename)
- elif file.filename == 'frame.py':
- shutil.copy2(os.path.join('pos_manager', file.filename), os.path.join(bk_dir, file.filename+f'_bk_{time.time()}'))
- file.save(os.path.join('pos_manager', file.filename))
- elif file.filename.endswith('.html') :
- shutil.copy2(os.path.join('templates', file.filename), os.path.join(bk_dir, file.filename+f'_bk_{time.time()}'))
- file.save(os.path.join('templates', file.filename))
- elif file.filename.endswith('.zip') :
- file.save(os.path.join(temp_dir, file.filename))
-
- '''
- # 解压文件
- with zipfile.ZipFile(os.path.join(temp_dir, file.filename), 'r') as zip_ref:
- zip_ref.extractall(temp_dir)
- # 移动解压后的文件到目标目录
- for item in os.listdir(temp_dir):
- s = os.path.join(temp_dir, item)
- d = os.path.join(app.config['APP_UPGREADE_FOLDER'], item)
- if os.path.isdir(s):
- shutil.move(s, d)
- else:
- shutil.copy2(s, d)
- # 删除临时文件
- os.remove(os.path.join(temp_dir, file.filename))
- '''
- # 在实际应用中,这里需要实现安全的更新逻辑
- # 例如验证文件完整性、备份原有文件、应用更新等
- logger.info(f"文件 {file.filename} 上传成功")
- return jsonify({'success': True, 'message': f'文件上传成功,重启后生效. {file.filename}, {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'})
- except Exception as e:
- logger.error(f"应用更新失败: {str(e)}")
- return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
- @app.route('/api/get_logs')
- @login_required
- def get_logs():
- try:
- lines = request.args.get('lines', '50')
- # logger.info(f"请求获取日志,行数: {lines}")
- get_log_file = LOG_FILE
- s_p = sys_platform.system().upper()
- '''
- if s_p == 'WINDOWS':
- get_log_file = LOG_FILE
- elif s_p == 'LINUX':
- get_log_file = 'log.log'
- '''
- # 检查日志文件是否存在
- if not os.path.exists(get_log_file):
- return f"日志文件 {get_log_file} 不存在"
- # 读取日志文件
- with open(get_log_file, 'r', encoding='utf-8') as f:
- if lines == 'all':
- logs = ''.join(f.readlines()[-500:]) # 最大500行
- else:
- try:
- line_count = int(lines)
- logs = ''.join(f.readlines()[-line_count:])
- except ValueError:
- logs = ''.join(f.readlines()[-50:]) # 默认50行
- return logs
- except Exception as e:
- logger.error(f"获取日志失败: {str(e)}")
- return f"获取日志失败: {str(e)}"
- @app.route('/firmware', methods=['GET', 'POST'])
- @login_required
- def firmware():
- firmwares = load_firmwares()
- if request.method == 'POST':
- if 'file' not in request.files:
- error = '未选择文件'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- file = request.files['file']
-
- remark = request.form.get('remark', '').strip()
- if file.filename == '':
- error = '未选择文件'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- if not remark:
- error = '备注信息不能为空'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- if file and allowed_file(file.filename):
- filename = file.filename
-
- # 检查文件名是否重复
- if any(f['filename'] == filename for f in firmwares):
- error = f'文件名 "{filename}" 已存在,请更改文件名后再上传'
- return render_template('firmware.html', firmwares=firmwares, error=error)
-
- # 检查文件大小
- file.seek(0, os.SEEK_END) # 移动指针到文件末尾
- file_size = file.tell() # 获取指针位置(即文件大小)
- file.seek(0) # 重置指针到文件开头
- # 打印文件大小
- logger.info(f"文件名: {filename}, 文件大小: {file_size} 字节")
- if file_size > 1 * 1024 * 1024: # 1MB
- error = f'文件大小不能超过1MB, 当前文件大小: {file_size} 字节'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- if file_size < 1 * 1024: # 1KB
- error = f'文件大小不能小于1KB, 当前文件大小: {file_size} 字节'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
- new_id = max(f['id'] for f in firmwares) + 1 if firmwares else 1
- firmwares.append({
- 'id': new_id,
- 'filename': filename,
- 'filesize': '%d 字节' % file_size,
- 'upload_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
- 'remark': remark
- })
- save_firmwares(firmwares)
- return redirect(url_for('firmware'))
- # 处理编辑操作
- edit_id = request.args.get('edit')
- if edit_id:
- new_remark = request.args.get('remark', '').strip()
- if new_remark:
- for f in firmwares:
- if f['id'] == int(edit_id):
- f['remark'] = new_remark
- break
- save_firmwares(firmwares)
- return redirect(url_for('firmware'))
- # 处理删除操作
- delete_id = request.args.get('delete')
- if delete_id:
- firmwares = [f for f in firmwares if f['id'] != int(delete_id)]
- save_firmwares(firmwares)
- return redirect(url_for('firmware'))
- # 处理下载操作
- download_id = request.args.get('download')
- if download_id:
- firmware = next((f for f in firmwares if f['id'] == int(download_id)), None)
- if firmware:
- filename = firmware['filename']
- file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
- if os.path.exists(file_path):
- return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
- else:
- error = f'文件 "{filename}" 不存在'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- else:
- error = f'未找到ID为 {download_id} 的固件'
- return render_template('firmware.html', firmwares=firmwares, error=error)
- return render_template('firmware.html', firmwares=firmwares, title='固件包管理')
- @app.route('/devices', methods=['GET', 'POST'])
- @login_required
- def devices():
- devices = load_devices()
- platform_l = load_platforms()['platform_l']
- firmwares = load_firmwares()
- if request.method == 'POST':
- # 处理设备配置保存
- if 'save_config' in request.form:
- config_type = request.form['config_type']
- dev_sn = request.form['dev_sn']
- target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
- if target_device:
- if config_type == 'all':
- # 处理保存所有配置
- target_device_wifi_ssid = request.form.get('ssid', '')
- target_device_wifi_password = request.form.get('wifi_password', '')
- target_device_cloud_platform = request.form.get('cloud-platform', '')
- target_device_network_type = request.form.get('network-type', '')
- log_msg = f"{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)}:"
- log_msg = log_msg + f"WiFi参数:{target_device_wifi_ssid},{target_device_wifi_password};"
- log_msg = log_msg + f"云平台参数:{target_device_cloud_platform};"
- log_msg = log_msg + f"网络类型参数:{target_device_network_type}"
-
- logger.info(log_msg)
- logger.info("TODO: 发送保存所有配置到设备")
- elif config_type == 'wifi':
- target_device['wifi_ssid'] = request.form.get('ssid', '')
- target_device['wifi_password'] = request.form.get('wifi_password', '')
- log_msg = f"WiFi参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['wifi_ssid']},{target_device['wifi_password']}"
- logger.info(log_msg)
- logger.info("TODO: 发送WiFi配置到设备")
-
- # platform_id = next((platform_id for platform_id in platform_l if platform_l[platform_id]['name'] == target_device['cloud_platform']), None)
- platform_id = load_platforms()['current_platform']
- pub_data = frame.get_msg_config_wifi(device=target_device,data="")
- pub_topic = f"cpyypt/down/{target_device['dev_type'].zfill(4)}/{target_device['dev_sn'].zfill(10)}"
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
-
- if not success: return render_template(
- 'device_config.html',
- device=target_device,
- # cccc platform_l=platform_l.values(),
- platform_c=load_platforms_c().values(),
- success=False,
- message=f"消息发布失败,MQTT客户端未连接 -- {platform_id},请返回主页 【连接】!"
- )
-
- elif config_type == 'platform':
- log_msg = f"云平台参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['cloud_platform']}"
- logger.info(log_msg)
- logger.info("TODO: 发送云平台配置到设备")
-
- # platform_id = next((platform_id for platform_id in platform_l if platform_l[platform_id]['name'] == target_device['cloud_platform']), None)
- platform_id = load_platforms()['current_platform']
- pub_data = frame.get_msg_config_platform(device=target_device,data=request.form.get('cloud-platform', ''))
- pub_topic = f"cpyypt/down/{target_device['dev_type'].zfill(4)}/{target_device['dev_sn'].zfill(10)}"
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
-
- if not success: return render_template(
- 'device_config.html',
- device=target_device,
- # cccc platform_l=platform_l.values(),
- platform_c=load_platforms_c().values(),
- success=False,
- message=f"消息发布失败,MQTT客户端未连接 -- {platform_id},请返回主页 【连接】!"
- )
- target_device['cloud_platform'] = request.form.get('cloud-platform', '')
- elif config_type == 'network':
- target_device['network_type'] = request.form.get('network-type', '')
- log_msg = f"网络类型参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['network_type']}"
- logger.info(log_msg)
- logger.info("TODO: 发送网络类型配置到设备")
-
- # platform_id = next((platform_id for platform_id in platform_l if platform_l[platform_id]['name'] == target_device['cloud_platform']), None)
- platform_id = load_platforms()['current_platform']
- pub_data = frame.get_msg_config_network(device=target_device,data="")
- pub_topic = f"cpyypt/down/{target_device['dev_type'].zfill(4)}/{target_device['dev_sn'].zfill(10)}"
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
-
- if not success: return render_template(
- 'device_config.html',
- device=target_device,
- # cccc platform_l=platform_l.values(),
- platform_c=load_platforms_c().values(),
- success=False,
- message=f"消息发布失败,MQTT客户端未连接 -- {platform_id},请返回主页 【连接】!"
- )
- save_devices(devices)
- return render_template(
- 'device_config.html',
- device=target_device,
- # cccc platform_l=platform_l.values(),
- platform_c=load_platforms_c().values(),
- success=True,
- message="操作成功,配置已下发!"
- )
- # 处理批量操作
- action = request.form.get('action')
- selected_sns = [dev_sn for dev_sn in request.form.getlist('dev_sn')]
- if action == 'restart':
- # 实际应用中这里会发送重启命令到设备
- logger.info(f"批量重启设备: {selected_sns}")
-
- for dev_sn in selected_sns:
- target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
- if target_device:
- platform_id = load_platforms()['current_platform']
- pub_data = frame.get_msg_restart(device=target_device,data="")
- pub_topic = f"cpyypt/down/{target_device['dev_type'].zfill(4)}/{target_device['dev_sn'].zfill(10)}"
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
-
- if not success:
- logger.error(f"消息发布失败,MQTT客户端未连接 -- {load_platforms_c()[platform_id]['name']},请返回主页 【连接】!")
-
- else:
- logger.info(f"消息发布成功 -- dev_sn:{dev_sn}")
- elif action == 'upgrade':
- firmware_id = request.form.get('firmware_id')
- firmware = next((fm for fm in firmwares if fm['id'] == int(firmware_id)), None)
- logger.info(f"批量升级设备 {selected_sns} 到固件 {firmware['filename']}")
-
- if not firmware:
- return jsonify({'status': 'error', 'message': '固件信息不存在'})
-
-
- # 获取文件大小
- bin_file=os.path.join(app.config['UPLOAD_FOLDER'], firmware['filename'])
- try:
- file_size = os.path.getsize(bin_file)
- logger.info(f"文件大小: {file_size} 字节")
- except FileNotFoundError:
- logger.error(f"错误: 文件 '{bin_file}' 不存在")
- return jsonify({'status': 'error', 'message': f'文件{bin_file}名不存在'})
- # 找到第1个被升级设备
- device = next((device for device in devices if device['dev_sn'] == selected_sns[0]), None)
- dev_platform = device.get('cloud_platform', '')
- platform_id = next((platform_id for platform_id in platform_l if platform_l[platform_id]['name'] == dev_platform), None)
- with open(bin_file, 'rb') as f:
- bin_data = f.read()
-
- dev_upgrade_dict['status'] = 'idel'
- dev_upgrade_dict['current_sn'] = ''
- dev_upgrade_dict['dev_type'] = device.get('dev_type', '2006')
- dev_upgrade_dict['dev_upgrade_sns'] = selected_sns
- dev_upgrade_dict['interval'] = 1
- dev_upgrade_dict['bin_data'] = bin_data
- dev_upgrade_dict['bin_len'] = file_size
- dev_upgrade_dict['pkg_size'] = 512
- dev_upgrade_dict['pkg_total'] = math.ceil(dev_upgrade_dict['bin_len'] / dev_upgrade_dict['pkg_size']) # 向上取整
- dev_upgrade_dict['pkg_cnt'] = 0
-
- dev_upgrade_dict['platform_id'] = platform_id
- start_dev_upgrade_check()
- return jsonify({'status': 'success', 'message': '升级任务已启动'})
- elif action == 'config':
- logger.info(f"批量配置设备: {selected_sns}")
- # 显示设备配置页面
- dev_sn = request.args.get('dev_sn')
- if dev_sn:
- target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
- if target_device:
- return render_template(
- 'device_config.html',
- device=target_device,
- # cccc platform_l=platform_l.values(),
- platform_c=load_platforms_c().values(),
- title='设备配置'
- )
- return render_template('devices.html', devices=devices, title='解密机配置')
- @app.route('/device_detail/<dev_sn>')
- @login_required
- def device_detail(dev_sn):
- devices = load_devices()
- device = next((d for d in devices if d['dev_sn'] == dev_sn), None)
- if not device:
- return '设备不存在', 404
- return render_template('device_detail.html', device=device, title='设备详情')
- @app.route('/api/get_firmwares')
- @login_required
- def get_firmwares():
- firmwares = load_firmwares()
- return jsonify(firmwares)
- # 云平台相关API
- @app.route('/api/select_platform', methods=['POST'])
- @login_required
- def select_platform():
- platform_id = request.json.get('platform_id')
- if not platform_id:
- return jsonify({'status': 'error', 'message': '请选择云平台'})
- platform_d = load_platforms()
- if platform_id not in platform_d['platform_l']:
- return jsonify({'status': 'error', 'message': '云平台不存在'})
- platform_d['current_platform'] = platform_id
- save_platforms(platform_d)
- selected_platform = platform_d['platform_l'][platform_id]
- return jsonify({
- 'status': 'success',
- 'platform': {
- 'id': platform_id,
- 'name': selected_platform['name'],
- 'status': selected_platform['status']
- }
- })
- @app.route('/api/toggle_connection', methods=['POST'])
- @login_required
- def toggle_connection():
- platform_id = request.json.get('platform_id')
- if not platform_id:
- return jsonify({'status': 'error', 'message': '请指定云平台'})
- platform_d = load_platforms()
- if platform_id not in platform_d['platform_l']:
- return jsonify({'status': 'error', 'message': '云平台不存在'})
- current_platform = platform_d['platform_l'][platform_id]
- if current_platform['status'] == 'connected':
- stop_mqtt_client(platform_id)
- new_status = 'disconnected'
- logger.info(f"云平台 {platform_id} 已断开")
- else:
- start_mqtt_client(platform_id)
- new_status = 'connected'
- logger.info(f"云平台 {platform_id} 已连接")
- current_platform['status'] = new_status
- save_platforms(platform_d)
- return jsonify({
- 'status': 'success',
- 'new_status': new_status,
- 'platform_name': current_platform['name']
- })
- @app.route('/api/get_platform_status', methods=['GET'])
- @login_required
- def get_platform_status():
- platform_id = request.args.get('platform_id')
- if not platform_id:
- return jsonify({"status": "error", "message": "未指定云平台ID"}), 400
- platform_d = load_platforms()
- if platform_id not in platform_d['platform_l']:
- return jsonify({"status": "error", "message": "云平台不存在"}), 404
- return jsonify({
- "status": "success",
- "platform": platform_d['platform_l'][platform_id]
- })
- @app.route('/api/publish_message', methods=['POST'])
- @login_required
- def publish_message():
- data = request.json
- platform_id = data.get('platform_id')
- message = data.get('message')
- if not platform_id or not message:
- return jsonify({"status": "error", "message": "云平台ID和消息内容都不能为空"}), 400
- success = publish_mqtt_message(platform_id, message)
- return jsonify({
- "status": "success" if success else "error",
- "message": "消息发布成功" if success else "消息发布失败"
- })
- # .......................
- def publish_message_dev(platform_id, data, pub_topic):
-
- if not platform_id or not data or not pub_topic:
- # return jsonify({"status": "error", "message": "云平台ID、消息内容、发布主题 都不能为空"}), 400
- logger.error(f"publish_message_dev: platform_id={platform_id}, data={data}, pub_topic={pub_topic}")
- return False
-
- success = publish_mqtt_message(platform_id, data, pub_topic=pub_topic)
- # if not success:
- # logger.error(f"publish_message_dev: platform_id={platform_id}, data={data}, pub_topic={pub_topic}, success={success}")
- return success
- # 定期检查设备状态
- dev_upgrade_dict = {
- 'status' : 'idel',
- 'current_sn' : '',
- 'dev_type' : '2006',
- 'interval' : 1,
- 'bin_data' : None,
- 'bin_len' : 0,
- 'pkg_size' : 512,
- 'pkg_total' : 0,
- 'pkg_cnt':0,
- 'remain':0,
- 'platform_id' : None,
- 'dev_upgrade_sns' : []
- }
- def dev_upgrade_dict_clr():
- dev_upgrade_dict['status'] = 'idel'
- dev_upgrade_dict['current_sn'] = ''
- dev_upgrade_dict['dev_type'] = '2006'
- dev_upgrade_dict['interval'] = 1
- dev_upgrade_dict['bin_data'] = None
- dev_upgrade_dict['bin_len'] = 0
- dev_upgrade_dict['pkg_size'] = 512
- dev_upgrade_dict['pkg_total'] = 0
- dev_upgrade_dict['pkg_cnt'] = 0
- dev_upgrade_dict['remain'] = 0
- dev_upgrade_dict['dev_upgrade_sns'] = []
- def dev_upgrade_check():
- devices = load_devices()
- if dev_upgrade_dict['current_sn'] == '':
- if len(dev_upgrade_dict['dev_upgrade_sns']) > 0:
- dev_upgrade_dict['current_sn'] = dev_upgrade_dict['dev_upgrade_sns'][0]
- else:
- dev_upgrade_dict['current_sn'] = ''
- else:
- device = next((device for device in devices if device['dev_sn'] == dev_upgrade_dict['current_sn']), None)
- # 此处为状态机,需要完善一下,可用。dev_upgrade_dict['status']
- # 升级 状态
- if dev_upgrade_dict['status'] == 'idel':
- logger.info(f'idel,do somethime,{device["dev_sn"]}')
- dev_upgrade_dict['status'] = 'reqeuest'
- dev_upgrade_dict['pkg_cnt'] = 0
- dev_upgrade_dict['remain'] = dev_upgrade_dict['bin_len']
- # 请求 状态
- elif dev_upgrade_dict['status'] == 'reqeuest':
- logger.info(f'request,do somethime,{device["dev_sn"]}')
- dev_upgrade_dict['status'] = 'upgrade'
- pub_data = frame.get_msg_upgrade_1001(device=device)
- pub_topic = f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"
- platform_id = load_platforms()['current_platform']
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
- if not success:
- dev_upgrade_dict_clr()
- logger.error(f"dev update failed: platform_id={device['cloud_platform']}, sn={device['dev_sn']}")
- return
- else:
- # time.sleep(2)
- dev_upgrade_dict['status'] = 'upgrade'
- dev_upgrade_dict['interval'] = 0.1
- # 升级 状态
- elif dev_upgrade_dict['status'] == 'upgrade':
- logger.info(f'upgrade,do somethime,{device["dev_sn"]},{dev_upgrade_dict["pkg_cnt"]}/{dev_upgrade_dict["pkg_total"]}')
- if dev_upgrade_dict['remain'] > 0:
- data = dev_upgrade_dict['bin_data'][
- dev_upgrade_dict['pkg_cnt'] * dev_upgrade_dict['pkg_size']: (dev_upgrade_dict['pkg_cnt'] + 1) *
- dev_upgrade_dict['pkg_size']]
- pub_data = frame.get_msg_upgrade_1030(device=device,
- fileSum=dev_upgrade_dict['bin_len'],
- maxPkgId=dev_upgrade_dict['pkg_total'] - 1,
- curPkgId=dev_upgrade_dict['pkg_cnt'],
- curPkgsize=len(data),
- data=data)
- pub_topic = f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"
- platform_id = load_platforms()['current_platform']
- success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
- if not success:
- dev_upgrade_dict_clr()
- logger.error(f"dev update failed: platform_id={device['cloud_platform']}, sn={device['dev_sn']}")
- return
- # time.sleep(0.1)
- dev_upgrade_dict['status'] = 'upgrade'
- else:
- dev_upgrade_dict['status'] = 'finish'
- dev_upgrade_dict['pkg_cnt'] += 1
- dev_upgrade_dict['remain'] = dev_upgrade_dict['bin_len'] - dev_upgrade_dict['pkg_cnt'] * dev_upgrade_dict[
- 'pkg_size']
- # 完成 状态
- elif dev_upgrade_dict['status'] == 'finish':
- logger.info(f'finish,do somethime,{device["dev_sn"]}')
- dev_upgrade_dict['status'] = 'idel'
- dev_upgrade_dict['pkg_cnt'] = 0
- dev_upgrade_dict['remain'] = 0
- dev_upgrade_dict['interval'] = 1
- dev_upgrade_dict['dev_upgrade_sns'].remove(dev_upgrade_dict['current_sn'])
- if len(dev_upgrade_dict['dev_upgrade_sns']) > 0:
- dev_upgrade_dict['current_sn'] = dev_upgrade_dict['dev_upgrade_sns'][0]
- else:
- dev_upgrade_dict['current_sn'] = ''
- save_devices(devices)
- if dev_upgrade_dict['current_sn'] == '' and len(dev_upgrade_dict['dev_upgrade_sns']) == 0:
- logger.info('dev_upgrade_check exit')
- else:
- dev_upgrade_check_timer = threading.Timer(dev_upgrade_dict['interval'], dev_upgrade_check)
- dev_upgrade_check_timer.daemon = True
- dev_upgrade_check_timer.start()
- # 启动定时检查 升级状态
- dev_upgrade_check_timer = None
- def start_dev_upgrade_check():
- global dev_upgrade_check_timer
- if dev_upgrade_check_timer is not None:
- dev_upgrade_check_timer.cancel()
- dev_upgrade_check()
- # 启动定时检查 在线状态
- dev_online_check_timer = None
- def dev_online_check():
- devices = load_devices()
- for device in devices:
- if device['online'] == '在线':
- if(int(time.time()) - device.get('realtime',0)) > 10:
- device['online'] = '离线'
- logger.warning(f"设备离线: 云平台:{device['cloud_platform']}, sn={device['dev_sn']}")
- dev_online_check_timer = threading.Timer(5, dev_online_check)
- dev_online_check_timer.daemon = True
- dev_online_check_timer.start()
- save_devices(devices)
- def start_dev_online_check():
- global dev_online_check_timer
- if dev_online_check_timer is not None:
- dev_online_check_timer.cancel()
- dev_online_check()
- # 启动定时检查 长时间未有客户端登录,MQTT连接自动断开
- mqtt_disconnect_check_timer = None
- def mqtt_disconnect_check():
- logger.info(f"mqtt_last_activity: {mqtt_last_activity}")
- def start_mqtt_disconnect_check():
- global mqtt_disconnect_check_timer
- if mqtt_disconnect_check_timer is not None:
- mqtt_disconnect_check_timer.cancel()
- mqtt_disconnect_check()
- if __name__ == '__main__':
- logger.info(f"APP_VERSION: {APP_VERSION}")
- init_data()
- # 启动 MQTT连接状态定时检查
- start_mqtt_status_check()
- # 启动 设备状态定时检查
- # start_dev_upgrade_check()
- # 启动 设备在线状态定时检查
- start_dev_online_check()
- # 启动 MQTT自动断开检查
- # start_mqtt_disconnect_check()
- app.run(host='0.0.0.0', port=9082, debug=False, threaded=True, use_reloader=False) # 启用多线程提高性能
|