| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651 |
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\Scripts\pip.exe
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\python.exe
- from flask import Flask, render_template, request, redirect, url_for, session, jsonify, send_from_directory
- from datetime import datetime, timedelta
- import os
- import json
- import threading
- import paho.mqtt.client as mqtt
- import logging
- from functools import wraps
- import random
- import math
- import time
- import subprocess
- import platform as sys_platform
- import shutil
- import traceback
- # 自定义
- from pos_manager import frame
- # Application version string
- APP_VERSION = '1.0.18'
- # Path of the application log file
- LOG_FILE = 'app.log'
# Logging setup.
# Custom formatter: application records get an "APP:<timestamp>" prefix, while
# Flask's internal (werkzeug) records keep the standard Formatter output.
class CustomFormatter(logging.Formatter):
    """Formatter that renders application and werkzeug records differently."""

    def format(self, record):
        """Return the text of one log line for *record*.

        Records whose logger name starts with ``werkzeug`` are delegated to
        ``logging.Formatter.format``. Every other record is rendered as
        ``APP:<local time> - <level> - <function>[line:<n>]: <message>``.

        The message is obtained with ``record.getMessage()`` so that
        ``logger.info("x=%s", x)`` style arguments are interpolated, and a
        traceback is appended when the record carries exception info.
        """
        if record.name.startswith('werkzeug'):
            # Flask's internal (werkzeug) log lines use the base formatting.
            return super().format(record)

        # The timestamp is also stored on the record, as the original code did.
        record.timestamp = 'APP:' + datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        line = (
            f'{record.timestamp} - {record.levelname} - '
            f'{record.funcName}[line:{record.lineno}]: {record.getMessage()}'
        )
        if record.exc_info:
            # Keep tracebacks from logger.exception(...) instead of dropping them.
            line = f'{line}\n{self.formatException(record.exc_info)}'
        return line
- # Create the log handlers (log file and console stream)
- file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
- stream_handler = logging.StreamHandler()
- # Attach the custom formatter to both handlers
- formatter = CustomFormatter('%(message)s')
- file_handler.setFormatter(formatter)
- stream_handler.setFormatter(formatter)
- # Configure the root logger
- logging.basicConfig(
- level=logging.INFO,
- handlers=[file_handler, stream_handler]
- )
- # Set the level of Flask's internal (werkzeug) logger
- werkzeug_logger = logging.getLogger('werkzeug')
- werkzeug_logger.setLevel(logging.INFO)
- logger = logging.getLogger(__name__)
- # Record the application start time (used to report uptime)
- app_start_time = time.time()
- app = Flask(__name__)
- app.secret_key = 'supersecretkey'  # NOTE(review): hard-coded session secret; consider loading it from configuration
- app.config['UPLOAD_FOLDER'] = 'static/uploads'
- app.config['ALLOWED_EXTENSIONS'] = {'bin', 'zip', 'tar', 'rar'}
- app.config['DATA_FOLDER'] = 'data'
- app.config['APP_UPGREADE_FOLDER_TEMP'] = 'app_upgrade_temp'  # key spelling ("UPGREADE") is read elsewhere in this file; do not rename in isolation
- # Important: disable ASCII escaping so Chinese text is emitted as-is in JSON
- app.config['JSON_AS_ASCII'] = False
- # Make sure the upload and data directories exist
- os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
- # Paths of the JSON data files
- USERS_PATH = os.path.join(app.config['DATA_FOLDER'], 'users.json')
- FIRMWARES_PATH = os.path.join(app.config['DATA_FOLDER'], 'firmwares.json')
- DEVICES_PATH = os.path.join(app.config['DATA_FOLDER'], 'devices.json')
- PLATFORMS_PATH = os.path.join(app.config['DATA_FOLDER'], 'platforms.json')
- # Module-level state
- # MQTT client bookkeeping: both dicts are keyed by platform id and guarded by mqtt_lock
- mqtt_clients = {}
- mqtt_threads = {}
- mqtt_lock = threading.Lock()
- mqtt_sub_topic = [("cpyypt/up/2006/#", 0)]
- mqtt_pub_topic = "cpyypt/down/2006/0000000101"
- # Timers: "*_timer" holds the pending threading.Timer, "*_timeout" its delay
- threading_timer = {
- 'mqtt_status_check_timer': None,
- 'mqtt_status_check_timeout': 2,
- 'dev_online_check_timer': None,
- 'dev_online_check_timeout': 5,
- 'per_second_timer': None,
- 'per_second_timeout': 2,
- }
- device_speed = {}  # per-device sample history keyed by dev_sn (filled in on_message)
- wr_devices_lock = threading.Lock()  # held by load_devices() while it reads the device file
@app.route('/favicon.ico')
def favicon():
    """Serve ``favicon.ico`` from the ``static`` directory."""
    icon_directory = "static"
    icon_filename = "favicon.ico"
    return send_from_directory(
        icon_directory,
        icon_filename,
        mimetype='image/vnd.microsoft.icon',
    )
@app.before_request
def update_last_activity():
    """Stamp ``mqtt_last_activity`` with the current time before every request."""
    now = time.time()
    mqtt_last_activity['last_time'] = now
def format_timedelta(seconds):
    """Render a duration given in seconds as Chinese day/hour/minute/second text.

    Components equal to zero are omitted and the remaining ones are joined
    with single spaces, e.g. ``"1天 8小时 37分钟 36秒"``. When every component
    is zero the result is ``"0秒"``.
    """
    span = timedelta(seconds=seconds)

    # timedelta normalises into days + seconds-within-day; split the latter.
    hour_part, leftover = divmod(span.seconds, 3600)
    minute_part, second_part = divmod(leftover, 60)

    components = (
        (span.days, "天"),
        (hour_part, "小时"),
        (minute_part, "分钟"),
        (second_part, "秒"),
    )
    pieces = [f"{amount}{unit}" for amount, unit in components if amount > 0]

    # Nothing to show means a zero-length duration.
    if not pieces:
        return "0秒"
    return ' '.join(pieces)
# Report how long the application has been running
@app.route('/api/run_time')
def get_run_time():
    """Return the uptime since ``app_start_time`` as ``{'run_time': <text>}``."""
    elapsed_seconds = int(time.time() - app_start_time)
    payload = {'run_time': format_timedelta(elapsed_seconds)}
    return jsonify(payload)
# Template context processor
@app.context_processor
def inject_common_data():
    """Make ``datetime`` and ``app_version`` available in every template."""
    shared_context = dict(datetime=datetime, app_version=APP_VERSION)
    return shared_context
# Data initialisation
def init_data():
    """Create each JSON data file that does not exist yet, with default content.

    Files that already exist are left untouched, even if they are empty or
    contain invalid JSON. Four files are handled: users, firmwares, devices
    and cloud platforms.
    """
    # Users: one default account.
    # NOTE(review): the default password is stored in plain text.
    if not os.path.exists(USERS_PATH):
        with open(USERS_PATH, 'w', encoding='utf-8') as f:
            json.dump({'admin': 'admin'}, f, ensure_ascii=False)

    # Firmwares: two sample entries.
    if not os.path.exists(FIRMWARES_PATH):
        init_firmwares = [
            {'id': 1, 'filename': 'pos.bin', 'upload_time': '2025-08-01 13:08:30', 'remark': '初始版本'},
            {'id': 2, 'filename': 'ITSF-POS V2.3.bin', 'upload_time': '2025-08-01 14:10:09', 'remark': '基础功能版'}
        ]
        with open(FIRMWARES_PATH, 'w', encoding='utf-8') as f:
            json.dump(init_firmwares, f, ensure_ascii=False, indent=2)

    # Devices: write the list returned by init_devices(). The previous code
    # passed the function object itself to json.dump, which raised TypeError
    # after the file had already been truncated.
    if not os.path.exists(DEVICES_PATH):
        default_devices = init_devices()
        with open(DEVICES_PATH, 'w', encoding='utf-8') as f:
            json.dump(default_devices, f, ensure_ascii=False, indent=2)

    # Cloud platforms.
    # NOTE(review): broker credentials are hard-coded here; consider moving
    # them to external configuration.
    # NOTE(review): the keys "sub_topic " and "pub_topic " carry a trailing
    # space. They are kept byte-for-byte because they are persisted data;
    # confirm whether any reader relies on them before renaming.
    if not os.path.exists(PLATFORMS_PATH):
        platform_d = {
            "current_platform": "test_env",
            "platform_l": {
                "aliyun_prod_env": {
                    "id": 1,
                    "name": "阿里云生产",
                    "status": "connected",
                    "ip": "mqtt.cpyypt.cn",
                    "port": 9000,
                    "sub_topic ": mqtt_sub_topic,
                    "pub_topic ": mqtt_pub_topic,
                    "client_id": f"pos_mng_{random.randint(0, 100)}",
                    "username": "cpyypt",
                    "password": "1SvTlvm1VCawSzS"
                },
                "test_env": {
                    "id": 2,
                    "name": "测试环境",
                    "status": "disconnected",
                    "ip": "test-mqtt.cpyypt.cn",
                    "port": 9000,
                    "sub_topic ": mqtt_sub_topic,
                    "pub_topic ": mqtt_pub_topic,
                    "client_id": f"pos_mng_{random.randint(0, 100)}",
                    "username": "admin",
                    "password": "houjianwei"
                },
                "localize_a_env": {
                    "id": 3,
                    "name": "本地化A云平台",
                    "status": "disconnected",
                    "ip": "mqtt.localize_a_env.com",
                    "port": 1883,
                    "sub_topic ": mqtt_sub_topic,
                    "pub_topic ": mqtt_pub_topic,
                    "client_id": f"pos_mng_{random.randint(0, 100)}",
                    "username": "localize_a_env_user",
                    "password": "localize_a_env_pass"
                }
            }
        }
        with open(PLATFORMS_PATH, 'w', encoding='utf-8') as f:
            json.dump(platform_d, f, ensure_ascii=False, indent=2)
# Data loading helpers
def load_users():
    """Return the parsed content of the users JSON file."""
    with open(USERS_PATH, 'r', encoding='utf-8') as users_file:
        users = json.load(users_file)
    return users
def load_firmwares():
    """Return the parsed content of the firmwares JSON file."""
    with open(FIRMWARES_PATH, 'r', encoding='utf-8') as firmwares_file:
        firmwares = json.load(firmwares_file)
    return firmwares
load_devices_first = True  # True until load_devices() has completed its first load


def load_devices():
    """Load the device list from ``DEVICES_PATH`` and return it.

    The whole operation runs while holding ``wr_devices_lock``.

    * If the file is missing, ``init_data()`` is called first.
    * If the file content is not valid JSON, the error is logged together
      with the names of the innermost callers, and the list is replaced by
      the defaults from ``init_devices()``.
    * On the first call after start-up every device is forced offline and
      its statistics are reset to 0; the result is saved back to disk.
    """
    global load_devices_first
    with wr_devices_lock:
        devices = []
        if not os.path.exists(DEVICES_PATH):
            logger.info(f"设备配置文件不存在,初始化数据: {DEVICES_PATH}")
            init_data()
        with open(DEVICES_PATH, 'r', encoding='utf-8') as f:
            f_r = f.read()
        try:
            devices = json.loads(f_r)
        except json.JSONDecodeError as e:
            logger.error(f"解析设备配置文件失败: e:{e},f_r:{f_r}")
            # Log up to three innermost call-stack entries. Slicing (rather
            # than indexing [-3]) cannot raise IndexError on a shallow stack,
            # which would otherwise escape from this handler.
            caller_names = [entry.name for entry in traceback.extract_stack()[-3:]]
            if caller_names:
                logger.info(f"调用信息 {' , '.join(caller_names)} ")
            # Fall back to the default device list.
            devices = init_devices()

        # On the first load after start-up, reset presence and statistics.
        if load_devices_first:
            load_devices_first = False
            for device in devices:
                device['online'] = '离线'
                device['realtime'] = 0
                device['run_time'] = 0
                device['receive_sum'] = 0
                device['send_sum'] = 0
                device['abnormal_sum'] = 0
                device['success_rate'] = 0
                device['speed'] = 0
            # Persist the reset state.
            save_devices(devices)

        return devices
-
load_platforms_first = True  # True until load_platforms() has completed its first load


def _reset_platforms_file():
    """Move an unusable platforms file aside and recreate it via init_data().

    init_data() only writes files that do not exist, so the empty or corrupt
    file has to be moved out of the way before defaults can be regenerated.
    """
    if os.path.exists(PLATFORMS_PATH):
        os.replace(PLATFORMS_PATH, PLATFORMS_PATH + '.bad')
    init_data()
    with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f_new:
        return json.load(f_new)


def load_platforms():
    """Load and return the cloud-platform configuration dictionary.

    * A missing file is created through ``init_data()``.
    * An empty or unparsable file is moved to ``<path>.bad`` and replaced
      by the defaults.
    * On the first call after start-up every platform is marked
      ``disconnected`` and receives a fresh ``client_id`` built from the
      host name and a random number; the result is saved.

    Any other exception is logged, ``init_data()`` is called and the file is
    read once more; an exception from that final read propagates.
    """
    global load_platforms_first
    try:
        if not os.path.exists(PLATFORMS_PATH):
            logger.info(f"云平台配置文件不存在,初始化数据: {PLATFORMS_PATH}")
            init_data()

        with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
            file_content = f.read().strip()

        if not file_content:
            logger.error("云平台配置文件为空")
            platform_d = _reset_platforms_file()
        else:
            try:
                platform_d = json.loads(file_content)
            except json.JSONDecodeError as e:
                logger.error(f"解析云平台配置文件失败: {e}")
                platform_d = _reset_platforms_file()

        # On the first load after start-up, force every platform to
        # "disconnected" and assign a new client id.
        if load_platforms_first and 'platform_l' in platform_d:
            load_platforms_first = False
            # COMPUTERNAME is only defined on Windows; fall back to the
            # network node name so that the reset also works elsewhere.
            host_name = os.environ.get('COMPUTERNAME') or sys_platform.node()
            for platform_id in platform_d['platform_l']:
                platform_d['platform_l'][platform_id]['status'] = 'disconnected'
                platform_d['platform_l'][platform_id]['client_id'] = "pos_mng_%s_%d" % (host_name, random.randint(0, 100))
            # Persist the reset state.
            save_platforms(platform_d)

        return platform_d
    except Exception as e:
        logger.error(f"加载云平台配置时发生异常: {e}")
        # Last resort: make sure the file exists and read it again.
        init_data()
        with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
def load_platforms_c():
    """Return a reduced view of the platforms: name, status and ip per id."""
    all_platforms = load_platforms()['platform_l']
    wanted_fields = ('name', 'status', 'ip')
    return {
        platform_id: {field: details[field] for field in wanted_fields}
        for platform_id, details in all_platforms.items()
    }
# Data saving helpers
def save_users(users):
    """Overwrite the users JSON file with *users* (UTF-8, 2-space indent)."""
    with open(USERS_PATH, 'w', encoding='utf-8') as out_file:
        json.dump(users, out_file, indent=2, ensure_ascii=False)
def save_firmwares(firmwares):
    """Overwrite the firmwares JSON file with *firmwares* (UTF-8, 2-space indent)."""
    with open(FIRMWARES_PATH, 'w', encoding='utf-8') as out_file:
        json.dump(firmwares, out_file, indent=2, ensure_ascii=False)
def save_devices(devices):
    """Overwrite the devices JSON file with *devices* (UTF-8, 2-space indent).

    ``wr_devices_lock`` is deliberately not acquired here: load_devices()
    calls this function while it already holds that non-reentrant lock.
    """
    with open(DEVICES_PATH, 'w', encoding='utf-8') as out_file:
        json.dump(devices, out_file, indent=2, ensure_ascii=False)
def save_platforms(platform_d):
    """Overwrite the platforms JSON file with *platform_d* (UTF-8, 2-space indent)."""
    with open(PLATFORMS_PATH, 'w', encoding='utf-8') as out_file:
        json.dump(platform_d, out_file, indent=2, ensure_ascii=False)
def add_device(devices, dev_type, dev_sn):
    """Append a new default device record to *devices* and save the list.

    Args:
        devices: list of device dicts; it is modified in place.
        dev_type: device type of the new record.
        dev_sn: device serial number of the new record.

    Returns:
        The same *devices* list (sorted by ``dev_sn``) after the record was
        added and saved, or ``None`` when a device with the same type and
        serial number already exists (nothing is changed in that case).
    """
    # Nothing to do if the device is already known.
    for device in devices:
        if device['dev_type'] == dev_type and device['dev_sn'] == dev_sn:
            logger.info(f"设备 {dev_type} {dev_sn} 已存在,无需添加")
            return None

    # The original literal listed the "realtime" key twice; one is enough.
    device = {
        "dev_type": dev_type,
        "dev_sn": dev_sn,
        "online": "离线",
        "realtime": 0,
        "run_time": 0,
        "receive_sum": 0,
        "send_sum": 0,
        "abnormal_sum": 0,
        "success_rate": 0,
        "speed": 0,
        "cloud_platform": "阿里云生产",
        "master_type": dev_type,
        "master_sn": dev_sn,
        "app_version": "未知",
        "upgrade_status": "未知",
        "reset_times": 0,
        "last_reset_type": "00",
        "uuid": "未知",
        "network_type": "未知",
        "wifi_ssid": "未知",
        "wifi_password": "未知",
        "sim_status": 1,
        "free_fifo": 0,
        "ip": "x.x.x.x",
    }
    devices.append(device)
    # Sort first, then save.
    devices.sort(key=lambda x: x['dev_sn'])
    save_devices(devices)
    return devices
def init_devices():
    """Build the default device list, save it to disk and return it.

    The list contains six devices of type ``'2006'`` with serial numbers
    ``'101'`` to ``'106'``, all offline with zeroed statistics. The device
    file is overwritten through ``save_devices``.
    """
    devices = []
    dev_type = '2006'
    dev_sns = ['101', '102', '103', '104', '105', '106']
    for dev_sn in dev_sns:
        # The original literal listed the "realtime" key twice; one is enough.
        device = {
            "dev_type": dev_type,
            "dev_sn": dev_sn,
            "online": "离线",
            "realtime": 0,
            "run_time": 0,
            "receive_sum": 0,
            "send_sum": 0,
            "abnormal_sum": 0,
            "success_rate": 0,
            "speed": 0,
            "cloud_platform": "阿里云生产",
            "master_type": dev_type,
            "master_sn": dev_sn,
            "app_version": "未知",
            "upgrade_status": "未知",
            "reset_times": 0,
            "last_reset_type": "00",
            "uuid": "未知",
            "network_type": "未知",
            "wifi_ssid": "未知",
            "wifi_password": "未知",
            "sim_status": 1,
            "free_fifo": 0,
            "ip": "x.x.x.x",
        }
        devices.append(device)
    # Sort first, then save.
    devices.sort(key=lambda x: x['dev_sn'])
    save_devices(devices)
    return devices
# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    """Handle the broker's reply to a connection attempt.

    Args:
        client: the MQTT client the callback belongs to.
        userdata: dict set in mqtt_client_thread; ``platform_id`` is read.
        flags: connection flags passed by the library (unused).
        rc: connection result code; 0 means the connection was accepted.

    On success the client subscribes to ``mqtt_sub_topic`` and the platform
    is stored as ``connected``. A non-zero *rc* is logged as a failure and
    neither a subscription nor a status change is made.
    """
    platform_id = userdata['platform_id']
    if rc != 0:
        # The callback also fires for refused connections; do not report
        # those as connected.
        logger.error(f"MQTT连接失败 - 云平台: {platform_id}, 结果代码: {rc}")
        return

    logger.info(f"MQTT连接成功 - 云平台: {platform_id}, 结果代码: {rc}")
    client.subscribe(mqtt_sub_topic)
    logger.info(f"已订阅主题: {mqtt_sub_topic}")
    with mqtt_lock:
        platform_d = load_platforms()
        if platform_id in platform_d['platform_l']:
            platform_d['platform_l'][platform_id]['status'] = 'connected'
            save_platforms(platform_d)
def on_disconnect(client, userdata, rc):
    """Log the disconnect and store the platform status as ``disconnected``."""
    platform_id = userdata['platform_id']
    logger.info(f"MQTT断开连接 - 云平台: {platform_id}, 结果代码: {rc}")
    with mqtt_lock:
        platform_d = load_platforms()
        known_platforms = platform_d['platform_l']
        if platform_id not in known_platforms:
            return
        known_platforms[platform_id]['status'] = 'disconnected'
        save_platforms(platform_d)
- def on_message(client, userdata, msg):  # MQTT message callback: marks the sending device online and applies parsed frame fields to the device list
- platform_id = userdata['platform_id']
- try:
- payload_str = msg.payload.decode('gb2312')  # decode order: GB2312, then UTF-8, finally an upper-case hex dump
- except UnicodeDecodeError:
- try:
- payload_str = msg.payload.decode('utf-8')
- except UnicodeDecodeError:
- payload_str = msg.payload.hex().upper()
- # logger.info(f"收到消息 - 云平台: {platform_id}, 主题: {msg.topic}, 内容: {payload_str}")
- # Disconnect from MQTT automatically after a long period without HTTP requests
- platform_d = load_platforms()
- # """
- get_platform_status = next((value['status'] for key, value in platform_d['platform_l'].items() if key == platform_id), None)
- if get_platform_status == 'connected':
- mqtt_disconnect_time_out = 5*60
- if time.time() - mqtt_last_activity['last_time'] > mqtt_disconnect_time_out: # no activity for 5 minutes
- logger.info(f"云平台 {platform_id} 长时间未操作,超过{mqtt_disconnect_time_out}秒,自动断开连接")
- platform_d['platform_l'][platform_id]['status'] = 'disconnected'  # NOTE(review): no save_platforms() call follows this in-memory change — confirm
- stop_mqtt_client(platform_id)
- logger.info(f"云平台 {platform_id} 自动断开连接,已完成")
- return
- # """
- 
- dev_type = msg.topic.split("/")[-2]  # second-to-last topic segment
- dev_sn = msg.topic.split("/")[-1].lstrip('0')  # last topic segment with leading zeros removed
- devices = load_devices()
- device = None if devices == None else next((item for item in devices if item['dev_sn'] == dev_sn), None)
- if device == None:
- devices = init_devices()  # NOTE(review): init_devices() overwrites the device file with the default list — confirm this is intended for an unknown serial number
- device = next((item for item in devices if item['dev_sn'] == dev_sn), None)
- if device:
- if(device.get('online',None) != '在线'):
- logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 上线")
- device['online'] = '在线'
- device['realtime'] = int(time.time())
- platform_name = next((value['name'] for key, value in platform_d['platform_l'].items() if key == platform_id), None)
- device['cloud_platform'] = platform_name
- save_devices(devices)
- 
- if device and type(msg.payload)== bytes and len(msg.payload) > 4 and payload_str[0:4] == "FEFE":  # only payloads whose decoded text starts with "FEFE" are parsed as frames
- # if type(msg.payload)== bytes and len(msg.payload) > 4 and msg.payload[0:2] == 0xFEFE:
- try:
- header, body, info_msg = frame.parse_data(client, msg.payload, msg.topic)
- if len(header) > 0 and len(body) > 0:
- pro_ver = header.get('pro_ver', None)
- msg_type1 = header.get('msg_type1', None)
- msg_type2 = header.get('msg_type2', None)
- if pro_ver == 0x01:
- if msg_type1 == 0x02:
- if msg_type2 == 0x2003:
- r_body = {}  # fields copied from the parsed body; a missing field is stored as None
- r_body['master_type'] = body.get('master_type', None)
- r_body['master_sn'] = body.get('master_sn', None)
- r_body['app_version'] = body.get('app_version', None)
- r_body['reset_times'] = body.get('reset_times', None)
- r_body['last_reset_type'] = body.get('last_reset_type', None)
- r_body['uuid'] = body.get('uuid', None)
- r_body['run_time'] = body.get('run_time', None)
- r_body['receive_sum'] = body.get('receive_sum', None)
- r_body['send_sum'] = body.get('send_sum', None)
- r_body['network_type'] = body.get('network_type', None)
- r_body['cloud_platform'] = body.get('cloud_platform', None)
- r_body['sim_status'] = body.get('sim_status', None)
- r_body['free_fifo'] = body.get('free_fifo', None)
- r_body['ip'] = body.get('ip', None)
- 
- # Send counter: a value lower than the stored one is appended to a per-device log file
- device_send_sum = device.get('send_sum', None)
- if r_body['send_sum'] and device_send_sum:
- if r_body['send_sum'] < device_send_sum:
- os.makedirs('dev_log', exist_ok=True)
- with open(f"dev_log/{dev_sn.zfill(10)}.log", 'a', encoding='utf-8') as f:
- w_msg = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
- w_msg = w_msg + f"%s,%10u,%10u." % (dev_sn.zfill(10), device_send_sum, r_body['send_sum'])
- f.write(w_msg+ "\n")
- # Continue only if at least one value is truthy (any() tests truthiness, so 0 and '' do not count)
- if any(r_body.values()):
- # Convert coded fields into display strings first
- if r_body['network_type'] is not None:
- if r_body['network_type'] == 1:
- r_body['network_type'] = 'WiFi'
- elif r_body['network_type'] == 2:
- r_body['network_type'] = '有线网'
- else:
- r_body['network_type'] = '未知'
- if r_body['cloud_platform'] is not None:
- if r_body['cloud_platform'] == 1:
- r_body['cloud_platform'] = '阿里云生产'
- elif r_body['cloud_platform'] == 2:
- r_body['cloud_platform'] = '测试环境'
- else:
- r_body['cloud_platform'] = '未知'
- # Copy non-None fields into the device and remember whether anything changed
- is_changed = False
- for k, v in r_body.items():
- if v is not None:
- if k not in device or device[k] != v:
- is_changed = True
- device[k] = v
- if is_changed:
- logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 接收到MQTT消息,参数有更新")
- 
- if dev_sn not in device_speed:
- device_speed[dev_sn] = []
- device_speed[dev_sn].append({
- 'svr_run_time':time.time(),
- 'dev_run_time':r_body['run_time'],
- 'send_sum':r_body['send_sum'],
- 'receive_sum':r_body['receive_sum'],
- })
- while len(device_speed[dev_sn]) > 11:  # keep at most the 11 most recent samples
- device_speed[dev_sn].pop(0)
- if len(device_speed[dev_sn]) > 1:
- run_time_delta = device_speed[dev_sn][-1]['svr_run_time'] - device_speed[dev_sn][0]['svr_run_time']
- send_sum_delta = device_speed[dev_sn][-1]['send_sum'] - device_speed[dev_sn][0]['send_sum']
- speed = 0 if run_time_delta == 0 else (send_sum_delta / run_time_delta)  # send_sum change per second of server time between oldest and newest sample
- else:
- speed = 0
- device['speed'] = speed
- save_devices(devices)
- if msg_type1 == 0x04:
- if msg_type2 == 0x2001:
- upgrade_status = body.get('upgrade_status', None)
- device['upgrade_status'] = f"0x%02X" % (upgrade_status) if upgrade_status != None else '未知'
- save_devices(devices)
- logger.info(f"info_msg:{info_msg}")
- except Exception as e:
- logger.error(f"解析数据失败 - 云平台: {platform_id}, 错误: {str(e)}")
- logger.error(f"解析数据:{' '.join([payload_str[i:i+2] for i in range(0, len(payload_str), 2)])}")
# MQTT client thread
def mqtt_client_thread(platform_id):
    """Thread body: connect to one platform's broker and run its network loop.

    The platform settings (``client_id``, ``username``, ``password``, ``ip``,
    ``port``) come from ``load_platforms()``. The client is registered in
    ``mqtt_clients`` before connecting. When the loop ends, for any reason,
    the client is disconnected once and both the client and the thread entry
    are removed from the registries.
    """
    platform_d = load_platforms()
    if platform_id not in platform_d['platform_l']:
        logger.error(f"云平台 {platform_id} 不存在")
        return
    platform = platform_d['platform_l'][platform_id]
    client = mqtt.Client(client_id=platform['client_id'])
    client.username_pw_set(platform['username'], platform['password'])

    # The module-level topics are used; per-platform topics are not read.
    client.user_data_set({
        'platform_id': platform_id,
        'pub_topic': mqtt_pub_topic,
        'sub_topic': mqtt_sub_topic
    })
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    with mqtt_lock:
        mqtt_clients[platform_id] = client
    try:
        client.connect(platform['ip'], platform['port'], 60)
        client.loop_forever()
    except Exception as e:
        logger.error(f"MQTT线程错误 - 云平台: {platform_id}, 错误: {str(e)}")
    finally:
        # Make sure the client is disconnected. The original code repeated
        # this call a second time behind a bare "except: pass".
        try:
            client.disconnect()
        except Exception as e:
            logger.error(f"MQTT线程结束时断开连接失败 - 云平台: {platform_id}, 错误: {str(e)}")

        with mqtt_lock:
            mqtt_clients.pop(platform_id, None)
            mqtt_threads.pop(platform_id, None)
# MQTT control functions
def start_mqtt_client(platform_id):
    """Start the MQTT client thread for *platform_id* unless one is alive.

    The liveness check, the registration in ``mqtt_threads`` and the thread
    start happen in one critical section, so two concurrent callers cannot
    both start a thread for the same platform.

    Returns:
        Always ``True``.
    """
    with mqtt_lock:
        running = mqtt_threads.get(platform_id)
        if running is not None and running.is_alive():
            logger.info(f"MQTT客户端已运行 - 云平台: {platform_id}")
            return True
        thread = threading.Thread(target=mqtt_client_thread, args=(platform_id,), daemon=True)
        mqtt_threads[platform_id] = thread
        # The new thread waits for mqtt_lock until this block is left.
        thread.start()
    logger.info(f"启动MQTT客户端线程 - 云平台: {platform_id}")
    return True
mqtt_last_activity = {'last_time':time.time()}


# Periodically reconcile the stored status of every platform
def check_all_mqtt_connections():
    """Align each platform's stored status with its live client, then re-arm.

    A platform counts as connected when it has an entry in ``mqtt_clients``
    whose ``is_connected()`` is true. Whenever the stored status differs, it
    is corrected and the configuration is saved. Regardless of the outcome,
    a daemon timer is started that calls this function again after
    ``threading_timer['mqtt_status_check_timeout']`` seconds.
    """
    try:
        with mqtt_lock:
            platform_d = load_platforms()
            if 'platform_l' not in platform_d:
                return
            for platform_id, settings in platform_d['platform_l'].items():
                is_connected = platform_id in mqtt_clients and mqtt_clients[platform_id].is_connected()
                current_status = settings.get('status', '')
                expected_status = 'connected' if is_connected else 'disconnected'
                if current_status == expected_status:
                    continue
                # Stored status is out of date: fix it and persist.
                logger.info(f"参数:platform_id={platform_id}, is_connected={is_connected}, current_status ={current_status}")
                settings['status'] = expected_status
                logger.info(f"更新云平台状态 - 云平台: {platform_id}, 新状态: {platform_d['platform_l'][platform_id]['status']}")
                save_platforms(platform_d)
    except Exception as e:
        logger.error(f"检查MQTT连接状态出错: {str(e)}")
    finally:
        # Schedule the next check.
        next_timer = threading.Timer(threading_timer['mqtt_status_check_timeout'], check_all_mqtt_connections)
        threading_timer['mqtt_status_check_timer'] = next_timer
        next_timer.daemon = True
        next_timer.start()
def start_mqtt_status_check():
    """Cancel a pending status-check timer, if any, and run a check now."""
    pending_timer = threading_timer['mqtt_status_check_timer']
    if pending_timer is not None:
        pending_timer.cancel()
    check_all_mqtt_connections()
def stop_mqtt_client(platform_id):
    """Disconnect the MQTT client registered for *platform_id*, if there is one.

    The client is removed from ``mqtt_clients`` under the lock and then
    disconnected outside of it. The function does not wait for the client
    thread; it only logs when a thread entry is still registered.

    Returns:
        Always ``True``.
    """
    logger.info(f"断开MQTT连接 - 云平台: {platform_id}")
    # Take the client out of the registry while holding the lock only briefly.
    with mqtt_lock:
        client = mqtt_clients.pop(platform_id, None)
    # Disconnect outside the lock to avoid a deadlock.
    if client is not None:
        try:
            client.disconnect()
            logger.info(f"断开MQTT连接 - 云平台: {platform_id}")
        except Exception as e:
            logger.error(f"断开MQTT连接失败 - 云平台: {platform_id}, 错误: {str(e)}")
    with mqtt_lock:
        thread_still_registered = platform_id in mqtt_threads
        if thread_still_registered:
            logger.info(f"等待MQTT线程结束 - 云平台: {platform_id}")
    return True
def publish_mqtt_message(platform_id, message, pub_topic=mqtt_pub_topic):
    """Publish *message* with QoS 0 through the client of *platform_id*.

    Args:
        platform_id: key of the client in ``mqtt_clients``.
        message: payload handed to ``client.publish``.
        pub_topic: topic to publish to; defaults to ``mqtt_pub_topic``.

    Returns:
        ``True`` once the publish call and ``wait_for_publish()`` returned
        without raising; ``False`` when no client is registered for the
        platform or an exception occurred.
    """
    with mqtt_lock:
        client = mqtt_clients.get(platform_id)
    if client is None:
        logger.error(f"MQTT客户端未连接 - 云平台: {platform_id}")
        return False

    try:
        result = client.publish(pub_topic, message, qos=0)
        result.wait_for_publish()
    except Exception as e:
        logger.error(f"发布消息失败 - 云平台: {platform_id}, 错误: {str(e)}")
        return False

    # Binary payloads are logged as upper-case hex; anything else is logged
    # as-is. Previously .hex() was called on every non-str payload, which
    # raised for other types and reported a successful publish as a failure.
    if isinstance(message, (bytes, bytearray)):
        shown = message.hex().upper()
    else:
        shown = message
    logger.info(f"发布消息成功 - 云平台: {platform_id}, 主题: {pub_topic}, 消息: {shown}")
    return True
# Helper functions
def allowed_file(filename):
    """Return True when *filename* has a dot and its extension is allowed.

    The extension is the text after the last dot, compared case-insensitively
    against ``app.config['ALLOWED_EXTENSIONS']``.
    """
    _, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in app.config['ALLOWED_EXTENSIONS']
# Decorator that enforces a logged-in session
def login_required(f):
    """Wrap view *f* so that it only runs when ``logged_in`` is in the session.

    Otherwise the client is redirected to the ``login`` view, with the
    requested URL passed as the ``next`` query parameter.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        login_url = url_for('login', next=request.url)
        return redirect(login_url)
    return decorated_function
# Routes
@app.route('/')
@login_required
def index():
    """Render the home page with the reduced platform list and current platform."""
    platform_d = load_platforms()
    template_args = {
        'title': '主页',
        'platform_c': load_platforms_c(),
        'current_platform': platform_d['current_platform'],
    }
    return render_template('index.html', **template_args)
@app.route('/api/device_detail/<dev_sn>')
@login_required
def get_device_detail(dev_sn):
    """Return one device as JSON, enriched with derived statistics.

    Responds with 404 and an error object when no device has the given
    serial number. Adds ``abnormal_sum`` (received minus sent),
    ``success_rate`` (sent as a percentage of received, 0 when nothing was
    received) and ``run_time_str`` (formatted run time).
    """
    matches = [d for d in load_devices() if d['dev_sn'] == dev_sn]
    device = matches[0] if matches else None
    if not device:
        return jsonify({'error': '设备不存在'}), 404

    received = int(device['receive_sum'])
    sent = int(device['send_sum'])
    device['abnormal_sum'] = received - sent
    if received == 0:
        device['success_rate'] = 0
    else:
        device['success_rate'] = 100 * sent / received
    device['run_time_str'] = format_timedelta(int(device['run_time']))
    return jsonify(device)
def _is_same_host_url(target, host_url):
    """Return True when *target*, resolved against *host_url*, stays on that host."""
    from urllib.parse import urljoin, urlparse

    base = urlparse(host_url)
    resolved = urlparse(urljoin(host_url, target))
    return resolved.scheme in ('http', 'https') and resolved.netloc == base.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check the submitted credentials (POST).

    On success the session is marked as logged in and the client is
    redirected to the ``next`` query parameter, but only when that URL points
    to this host; otherwise to the index page. This prevents the login page
    from being used as an open redirect. On failure the form is shown again
    with an error message.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        users = load_users()
        # NOTE(review): passwords are stored and compared in plain text.
        if username in users and users[username] == password:
            session['logged_in'] = True
            session['username'] = username
            next_page = request.args.get('next')
            if not next_page or not _is_same_host_url(next_page, request.host_url):
                next_page = url_for('index')
            return redirect(next_page)
        error = '用户名或密码错误'
        return render_template('login.html', title='用户登录', error=error)
    return render_template('login.html', title='用户登录')
@app.route('/logout')
def logout():
    """Remove the login markers from the session and go to the login page."""
    for session_key in ('logged_in', 'username'):
        session.pop(session_key, None)
    return redirect(url_for('login'))
@app.route('/app_manage')
@login_required
def app_manage():
    """Log the visit and render the application management page."""
    logger.info("进入应用管理页面")
    page = render_template('app_manage.html', title='应用管理')
    return page
# Application management API
@app.route('/api/ping', methods=['GET'])
def ping():
    """Simple liveness endpoint: always answers 200 with a status object."""
    body = {"status": "ok", "message": "服务器正常运行中"}
    return jsonify(body), 200
@app.route('/api/restart_app', methods=['POST'])
@login_required
def restart_app():
    """Launch the platform-specific restart command without waiting for it.

    Windows: ``start python restart_app.py`` through the shell.
    Linux: ``./sh_web_app_2006_Svr.sh restart``.

    The command is spawned before the response is returned. Returns the
    "restarting" status with 200 when the command was launched, or a
    ``success: False`` object when the operating system is not supported or
    launching raised an exception.
    """
    try:
        logger.info("接收到重启应用请求")
        s_p = sys_platform.system().upper()
        if s_p == 'WINDOWS':
            # "start" is a shell built-in, hence shell=True; all arguments
            # are constants.
            subprocess.Popen(['start', 'python', 'restart_app.py'], shell=True)
        elif s_p == 'LINUX':
            # The child's output is discarded. Pipes that nobody reads could
            # fill up and block the restart script.
            subprocess.Popen(
                ['./sh_web_app_2006_Svr.sh', 'restart'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        else:
            # Previously this case reported "restarting" without doing anything.
            logger.error(f"应用重启失败: 不支持的操作系统 {s_p}")
            return jsonify({'success': False, 'message': f'重启失败: 不支持的操作系统 {s_p}'})
        logger.info("重启命令已发送,应用将很快重启")
        response = jsonify({
            "status": "restarting",
            "message": "应用重启已开始,请等待服务重新启动..."
        })
        return response, 200
    except Exception as e:
        logger.error(f"应用重启失败: {str(e)}")
        return jsonify({'success': False, 'message': f'重启失败: {str(e)}'})
@app.route('/api/update_app', methods=['POST'])
@login_required
def update_app():
    """Receive an uploaded application file and store it at its target path.

    Accepted uploads (form field ``app_file``):

    * ``web_app_2006.py`` -> current directory
    * ``frame.py``        -> ``pos_manager/``
    * ``*.html``          -> ``templates/``
    * ``*.zip``           -> temporary upgrade folder (stored only; it is
      not extracted)

    An existing target file is first copied into ``app_upgrade_bk``. Only the
    base name of the client-supplied filename is used, so the upload cannot
    be written outside the directories listed above.

    Returns a JSON object with ``success`` and ``message``.
    """
    try:
        logger.info("接收到更新应用请求")
        if 'app_file' not in request.files:
            return jsonify({'success': False, 'message': '没有文件被上传'})
        file = request.files['app_file']
        if file.filename == '':
            return jsonify({'success': False, 'message': '没有选择文件'})

        # The filename comes from the client: drop any directory components
        # (both separator styles) to prevent path traversal.
        filename = os.path.basename(file.filename.replace('\\', '/'))

        # Check the file type.
        if not filename.endswith(('.py', '.html', '.zip')):
            return jsonify({'success': False, 'message': '只支持Python文件(.py)、HTML文件(.html)或压缩包(.zip)'})

        temp_dir = app.config['APP_UPGREADE_FOLDER_TEMP']
        os.makedirs(temp_dir, exist_ok=True)
        bk_dir = 'app_upgrade_bk'
        os.makedirs(bk_dir, exist_ok=True)

        # Work out where the file goes and what its backup is called.
        backup_name = None
        if filename == 'web_app_2006.py':
            target_path = filename
            backup_name = filename + f'_{APP_VERSION}_bk_{time.time()}'
        elif filename == 'frame.py':
            target_path = os.path.join('pos_manager', filename)
            backup_name = filename + f'_bk_{time.time()}'
        elif filename.endswith('.html'):
            target_path = os.path.join('templates', filename)
            backup_name = filename + f'_bk_{time.time()}'
        elif filename.endswith('.zip'):
            # TODO: archives are only stored; extraction is not implemented.
            target_path = os.path.join(temp_dir, filename)
        else:
            # Other .py names used to be dropped silently while the response
            # still claimed success.
            return jsonify({'success': False, 'message': f'不支持更新该Python文件: {filename}'})

        # Back up only what exists, so that a brand-new template can be added.
        if backup_name is not None and os.path.exists(target_path):
            shutil.copy2(target_path, os.path.join(bk_dir, backup_name))
        file.save(target_path)

        logger.info(f"文件 {filename} 上传成功")
        return jsonify({'success': True, 'message': f'文件上传成功,重启后生效. {filename}, {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'})
    except Exception as e:
        logger.error(f"应用更新失败: {str(e)}")
        return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
@app.route('/api/get_logs')
@login_required
def get_logs():
    """Return the tail of the application log file as plain text.

    Query parameters:
        lines: ``"all"`` for the last 500 lines, or a positive integer for
            that many trailing lines. Missing, non-numeric, zero or negative
            values fall back to the default of 50 lines.

    Returns:
        The requested log lines joined into one string, or a human-readable
        error string when the file is missing or cannot be read.
    """
    from collections import deque

    try:
        lines = request.args.get('lines', '50')
        get_log_file = LOG_FILE

        # Check that the log file exists.
        if not os.path.exists(get_log_file):
            return f"日志文件 {get_log_file} 不存在"

        if lines == 'all':
            line_count = 500  # "all" is capped at 500 lines
        else:
            try:
                line_count = int(lines)
            except ValueError:
                line_count = 50  # default
            # A slice like [-0:] would return the whole file and a negative
            # count would skip the head instead of tailing; use the default.
            if line_count <= 0:
                line_count = 50

        # A bounded deque keeps only the last `line_count` lines while
        # streaming, instead of loading the whole file into memory.
        with open(get_log_file, 'r', encoding='utf-8') as f:
            logs = ''.join(deque(f, maxlen=line_count))
        return logs
    except Exception as e:
        logger.error(f"获取日志失败: {str(e)}")
        return f"获取日志失败: {str(e)}"
@app.route('/firmware', methods=['GET', 'POST'])
@login_required
def firmware():
    """Firmware package management page.

    * ``POST`` uploads a firmware file (form fields ``file`` and ``remark``).
      The file must pass ``allowed_file``, have a unique name and be between
      1 KB and 1 MB in size.
    * ``GET`` with ``?edit=<id>&remark=<text>`` updates a record's remark.
    * ``GET`` with ``?delete=<id>`` removes a record from the list.
    * ``GET`` with ``?download=<id>`` sends the stored file as an attachment.
    * A plain ``GET`` renders the firmware list.

    Returns:
        A rendered ``firmware.html`` page (with ``error`` set on failure), a
        redirect back to this page after a successful change, or the file
        download response.
    """
    firmwares = load_firmwares()

    def render_error(error):
        # All failure paths re-render the list page with an error banner.
        return render_template('firmware.html', firmwares=firmwares, error=error)

    def parse_id(raw):
        # IDs arrive as query-string text; None signals "not a number".
        try:
            return int(raw)
        except (TypeError, ValueError):
            return None

    if request.method == 'POST':
        if 'file' not in request.files:
            return render_error('未选择文件')
        file = request.files['file']

        remark = request.form.get('remark', '').strip()
        if file.filename == '':
            return render_error('未选择文件')
        if not remark:
            return render_error('备注信息不能为空')

        # The filename is client-controlled: keep only the final path
        # component so the save below cannot escape the upload folder.
        filename = os.path.basename(file.filename.replace('\\', '/'))
        if not filename or not allowed_file(filename):
            # Previously a rejected file type was dropped without feedback.
            return render_error(f'不支持的文件类型: {file.filename}')

        # Reject duplicate file names.
        if any(f['filename'] == filename for f in firmwares):
            return render_error(f'文件名 "{filename}" 已存在,请更改文件名后再上传')

        # Measure the upload by seeking to its end, then rewind for saving.
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(0)
        logger.info(f"文件名: {filename}, 文件大小: {file_size} 字节")
        if file_size > 1 * 1024 * 1024:  # 1MB
            return render_error(f'文件大小不能超过1MB, 当前文件大小: {file_size} 字节')
        if file_size < 1 * 1024:  # 1KB
            return render_error(f'文件大小不能小于1KB, 当前文件大小: {file_size} 字节')

        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        new_id = max((f['id'] for f in firmwares), default=0) + 1
        firmwares.append({
            'id': new_id,
            'filename': filename,
            'filesize': '%d 字节' % file_size,
            'upload_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'remark': remark
        })
        save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Edit the remark of an existing record.
    edit_id = request.args.get('edit')
    if edit_id:
        wanted = parse_id(edit_id)
        if wanted is None:
            return render_error(f'无效的固件ID: {edit_id}')
        new_remark = request.args.get('remark', '').strip()
        if new_remark:
            for f in firmwares:
                if f['id'] == wanted:
                    f['remark'] = new_remark
                    break
            save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Delete a record (the stored file itself is left untouched).
    delete_id = request.args.get('delete')
    if delete_id:
        wanted = parse_id(delete_id)
        if wanted is None:
            return render_error(f'无效的固件ID: {delete_id}')
        firmwares = [f for f in firmwares if f['id'] != wanted]
        save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Download the file behind a record.
    download_id = request.args.get('download')
    if download_id:
        wanted = parse_id(download_id)
        record = None
        if wanted is not None:
            record = next((f for f in firmwares if f['id'] == wanted), None)
        if not record:
            return render_error(f'未找到ID为 {download_id} 的固件')
        filename = record['filename']
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(file_path):
            return render_error(f'文件 "{filename}" 不存在')
        return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)

    return render_template('firmware.html', firmwares=firmwares, title='固件包管理')
def _device_down_topic(device):
    """Return the downlink MQTT topic for *device*.

    The topic embeds the device type zero-padded to 4 characters and the
    serial number zero-padded to 10 characters.
    """
    return f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"


def _render_device_config(device, **context):
    """Render the per-device configuration page with the platform choices."""
    return render_template(
        'device_config.html',
        device=device,
        platform_c=load_platforms_c().values(),
        **context
    )


def _publish_device_config(target_device, pub_data):
    """Publish a configuration frame to *target_device* via the current platform.

    Returns:
        ``None`` when the publish succeeded, otherwise the rendered
        configuration page carrying the failure message.
    """
    platform_id = load_platforms()['current_platform']
    success = publish_message_dev(
        platform_id=platform_id,
        data=pub_data,
        pub_topic=_device_down_topic(target_device),
    )
    if success:
        return None
    return _render_device_config(
        target_device,
        success=False,
        message=f"消息发布失败,MQTT客户端未连接 -- {platform_id},请返回主页 【连接】!"
    )


def _restart_devices(devices, selected_sns):
    """Publish a restart frame to every selected device that is known."""
    logger.info(f"批量重启设备: {selected_sns}")
    for dev_sn in selected_sns:
        target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
        if not target_device:
            continue
        platform_id = load_platforms()['current_platform']
        pub_data = frame.get_msg_restart(device=target_device, data="")
        success = publish_message_dev(
            platform_id=platform_id,
            data=pub_data,
            pub_topic=_device_down_topic(target_device),
        )
        if not success:
            logger.error(f"消息发布失败,MQTT客户端未连接 -- {load_platforms_c()[platform_id]['name']},请返回主页 【连接】!")
        else:
            logger.info(f"消息发布成功 -- dev_sn:{dev_sn}")


def _start_batch_upgrade(devices, firmwares, platform_l, selected_sns, firmware_id):
    """Validate a batch-upgrade request, load the image and start the upgrade timer.

    Returns:
        A JSON response: ``status == 'success'`` once the upgrade state has
        been initialised and the periodic check started, otherwise
        ``status == 'error'`` with a ``message``.
    """
    # Validate everything before touching it: the firmware id may be missing
    # or non-numeric, the record may not exist, and the selection may be
    # empty or refer to an unknown device.
    try:
        wanted_id = int(firmware_id)
    except (TypeError, ValueError):
        return jsonify({'status': 'error', 'message': '固件信息不存在'})
    firmware = next((fm for fm in firmwares if fm['id'] == wanted_id), None)
    if not firmware:
        return jsonify({'status': 'error', 'message': '固件信息不存在'})
    logger.info(f"批量升级设备 {selected_sns} 到固件 {firmware['filename']}")

    if not selected_sns:
        return jsonify({'status': 'error', 'message': '未选择设备'})
    # The first selected device determines the device type and platform.
    device = next((device for device in devices if device['dev_sn'] == selected_sns[0]), None)
    if device is None:
        return jsonify({'status': 'error', 'message': f'设备 {selected_sns[0]} 不存在'})

    # Read the image once; its length is the file size, so size and content
    # cannot disagree.
    bin_file = os.path.join(app.config['UPLOAD_FOLDER'], firmware['filename'])
    try:
        with open(bin_file, 'rb') as f:
            bin_data = f.read()
    except FileNotFoundError:
        logger.error(f"错误: 文件 '{bin_file}' 不存在")
        return jsonify({'status': 'error', 'message': f'文件{bin_file}名不存在'})
    file_size = len(bin_data)
    logger.info(f"文件大小: {file_size} 字节")

    dev_platform = device.get('cloud_platform', '')
    platform_id = next(
        (platform_id for platform_id in platform_l if platform_l[platform_id]['name'] == dev_platform),
        None,
    )

    # Reset the shared upgrade state for the new batch.
    dev_upgrade_dict['status'] = 'idel'
    dev_upgrade_dict['current_sn'] = ''
    dev_upgrade_dict['dev_type'] = device.get('dev_type', '2006')
    dev_upgrade_dict['dev_upgrade_sns'] = selected_sns
    dev_upgrade_dict['interval'] = 1
    dev_upgrade_dict['bin_data'] = bin_data
    dev_upgrade_dict['bin_len'] = file_size
    dev_upgrade_dict['pkg_size'] = 512
    # Number of packets, rounded up.
    dev_upgrade_dict['pkg_total'] = math.ceil(dev_upgrade_dict['bin_len'] / dev_upgrade_dict['pkg_size'])
    dev_upgrade_dict['pkg_cnt'] = 0
    dev_upgrade_dict['platform_id'] = platform_id

    for dev_sn in selected_sns:
        target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
        if target_device:
            target_device['upgrade_status'] = '升级中...'
    save_devices(devices)
    start_dev_upgrade_check()
    return jsonify({'status': 'success', 'message': '升级任务已启动'})


@app.route('/devices', methods=['GET', 'POST'])
@login_required
def devices():
    """Device list page, per-device configuration and batch actions.

    ``POST`` with ``save_config`` applies one configuration group
    (``config_type`` of ``all``, ``wifi``, ``platform`` or ``network``) to the
    device named by ``dev_sn`` and renders the configuration page with the
    outcome. Otherwise ``POST`` handles the batch ``action`` (``restart``,
    ``upgrade`` or ``config``) for the selected ``dev_sn`` values.
    ``GET`` with ``?dev_sn=...`` shows the configuration page for that
    device; anything else renders the device list.
    """
    devices = load_devices()
    platform_l = load_platforms()['platform_l']
    firmwares = load_firmwares()

    if request.method == 'POST':
        # Save a single device's configuration.
        if 'save_config' in request.form:
            config_type = request.form['config_type']
            dev_sn = request.form['dev_sn']
            target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
            if target_device:
                if config_type == 'all':
                    # "Save all" currently only logs the submitted values.
                    target_device_wifi_ssid = request.form.get('ssid', '')
                    target_device_wifi_password = request.form.get('wifi_password', '')
                    target_device_cloud_platform = request.form.get('cloud-platform', '')
                    target_device_network_type = request.form.get('network-type', '')
                    # NOTE(review): the WiFi password is written to the log
                    # in clear text here and in the 'wifi' branch below.
                    log_msg = f"{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)}:"
                    log_msg = log_msg + f"WiFi参数:{target_device_wifi_ssid},{target_device_wifi_password};"
                    log_msg = log_msg + f"云平台参数:{target_device_cloud_platform};"
                    log_msg = log_msg + f"网络类型参数:{target_device_network_type}"

                    logger.info(log_msg)
                    logger.info("TODO: 发送保存所有配置到设备")
                elif config_type == 'wifi':
                    target_device['wifi_ssid'] = request.form.get('ssid', '')
                    target_device['wifi_password'] = request.form.get('wifi_password', '')
                    log_msg = f"WiFi参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['wifi_ssid']},{target_device['wifi_password']}"
                    logger.info(log_msg)
                    logger.info("TODO: 发送WiFi配置到设备")

                    failure = _publish_device_config(
                        target_device,
                        frame.get_msg_config_wifi(device=target_device, data=""),
                    )
                    if failure is not None:
                        return failure
                elif config_type == 'platform':
                    log_msg = f"云平台参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['cloud_platform']}"
                    logger.info(log_msg)
                    logger.info("TODO: 发送云平台配置到设备")

                    failure = _publish_device_config(
                        target_device,
                        frame.get_msg_config_platform(
                            device=target_device,
                            data=request.form.get('cloud-platform', ''),
                        ),
                    )
                    if failure is not None:
                        return failure
                    # Record the new platform only after the device was told.
                    target_device['cloud_platform'] = request.form.get('cloud-platform', '')
                elif config_type == 'network':
                    target_device['network_type'] = request.form.get('network-type', '')
                    log_msg = f"网络类型参数:{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)},{target_device['network_type']}"
                    logger.info(log_msg)
                    logger.info("TODO: 发送网络类型配置到设备")

                    failure = _publish_device_config(
                        target_device,
                        frame.get_msg_config_network(device=target_device, data=""),
                    )
                    if failure is not None:
                        return failure

                save_devices(devices)
                return _render_device_config(
                    target_device,
                    success=True,
                    message="操作成功,配置已下发!"
                )

        # Batch operations on the selected devices.
        action = request.form.get('action')
        selected_sns = list(request.form.getlist('dev_sn'))
        if action == 'restart':
            _restart_devices(devices, selected_sns)
        elif action == 'upgrade':
            return _start_batch_upgrade(
                devices,
                firmwares,
                platform_l,
                selected_sns,
                request.form.get('firmware_id'),
            )
        elif action == 'config':
            logger.info(f"批量配置设备: {selected_sns}")

    # Show the configuration page for one device when requested.
    dev_sn = request.args.get('dev_sn')
    if dev_sn:
        target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
        if target_device:
            return _render_device_config(target_device, title='设备配置')

    return render_template('devices.html', devices=devices, title='解密机配置')
@app.route('/device_detail/<dev_sn>')
@login_required
def device_detail(dev_sn):
    """Render the detail page for the device whose serial number is *dev_sn*.

    Returns:
        The rendered ``device_detail.html`` page, or a plain-text message
        with HTTP 404 when no stored device has that serial number.
    """
    for candidate in load_devices():
        if candidate['dev_sn'] == dev_sn:
            return render_template('device_detail.html', device=candidate, title='设备详情')
    return '设备不存在', 404
@app.route('/api/get_firmwares')
@login_required
def get_firmwares():
    """Return the stored firmware records as a JSON response."""
    return jsonify(load_firmwares())
- # 云平台相关API
@app.route('/api/select_platform', methods=['POST'])
@login_required
def select_platform():
    """Make the platform named in the JSON body the current cloud platform.

    Expects a JSON body with ``platform_id``. On success the choice is
    persisted via ``save_platforms`` and the platform's id, name and status
    are echoed back; otherwise a JSON error message is returned.
    """
    platform_id = request.json.get('platform_id')
    if not platform_id:
        return jsonify({'status': 'error', 'message': '请选择云平台'})

    platform_d = load_platforms()
    known_platforms = platform_d['platform_l']
    if platform_id not in known_platforms:
        return jsonify({'status': 'error', 'message': '云平台不存在'})

    platform_d['current_platform'] = platform_id
    save_platforms(platform_d)

    chosen = known_platforms[platform_id]
    summary = {
        'id': platform_id,
        'name': chosen['name'],
        'status': chosen['status']
    }
    return jsonify({'status': 'success', 'platform': summary})
@app.route('/api/toggle_connection', methods=['POST'])
@login_required
def toggle_connection():
    """Connect or disconnect the MQTT client of the given cloud platform.

    Expects a JSON body with ``platform_id``. A platform whose stored status
    is ``'connected'`` has its client stopped; any other status starts the
    client. The new status is stored and returned together with the
    platform name.
    """
    platform_id = request.json.get('platform_id')
    if not platform_id:
        return jsonify({'status': 'error', 'message': '请指定云平台'})

    platform_d = load_platforms()
    if platform_id not in platform_d['platform_l']:
        return jsonify({'status': 'error', 'message': '云平台不存在'})

    entry = platform_d['platform_l'][platform_id]
    was_connected = entry['status'] == 'connected'
    if was_connected:
        stop_mqtt_client(platform_id)
        new_status = 'disconnected'
        logger.info(f"云平台 {platform_id} 已断开")
    else:
        start_mqtt_client(platform_id)
        new_status = 'connected'
        logger.info(f"云平台 {platform_id} 已连接")

    # Persist the flipped status.
    entry['status'] = new_status
    save_platforms(platform_d)

    result = {
        'status': 'success',
        'new_status': new_status,
        'platform_name': entry['name']
    }
    return jsonify(result)
@app.route('/api/get_platform_status', methods=['GET'])
@login_required
def get_platform_status():
    """Return the stored record of the platform given by ``?platform_id=``.

    Returns:
        JSON with ``status == "success"`` and the platform record, HTTP 400
        when no id was supplied, or HTTP 404 when the id is unknown.
    """
    platform_id = request.args.get('platform_id')
    if not platform_id:
        return jsonify({"status": "error", "message": "未指定云平台ID"}), 400

    known_platforms = load_platforms()['platform_l']
    if platform_id not in known_platforms:
        return jsonify({"status": "error", "message": "云平台不存在"}), 404

    body = {
        "status": "success",
        "platform": known_platforms[platform_id]
    }
    return jsonify(body)
@app.route('/api/publish_message', methods=['POST'])
@login_required
def publish_message():
    """Publish a message through the MQTT client of the given platform.

    Expects a JSON body with ``platform_id`` and ``message``; responds with
    HTTP 400 when either is missing or empty. Otherwise the outcome of
    ``publish_mqtt_message`` is reported as a JSON status/message pair.
    """
    payload = request.json
    platform_id = payload.get('platform_id')
    message = payload.get('message')
    if not (platform_id and message):
        return jsonify({"status": "error", "message": "云平台ID和消息内容都不能为空"}), 400

    if publish_mqtt_message(platform_id, message):
        outcome = {"status": "success", "message": "消息发布成功"}
    else:
        outcome = {"status": "error", "message": "消息发布失败"}
    return jsonify(outcome)
- # .......................
def publish_message_dev(platform_id, data, pub_topic):
    """Publish *data* to *pub_topic* through the given platform's MQTT client.

    Args:
        platform_id: Identifier of the cloud platform to publish through.
        data: Message payload to publish.
        pub_topic: MQTT topic to publish to.

    Returns:
        ``False`` (after logging an error) when any argument is empty;
        otherwise whatever ``publish_mqtt_message`` returns.
    """
    if platform_id and data and pub_topic:
        return publish_mqtt_message(platform_id, data, pub_topic=pub_topic)

    logger.error(f"publish_message_dev: platform_id={platform_id}, data={data}, pub_topic={pub_topic}")
    return False
- # 定期检查设备状态
# Shared, module-level state of the device firmware-upgrade process.
dev_upgrade_dict = {
    'status': 'idel',        # current state name of the upgrade process
    'current_sn': '',        # serial number being upgraded ('' = none)
    'dev_type': '2006',      # device type used when building frames/topics
    'interval': 1,           # seconds until the next periodic check
    'bin_data': None,        # firmware image bytes
    'bin_len': 0,            # length of the firmware image in bytes
    'pkg_size': 512,         # bytes of image data per packet
    'pkg_total': 0,          # number of packets for the whole image
    'pkg_cnt': 0,            # index of the next packet to send
    'remain': 0,             # image bytes not yet sent
    'platform_id': None,     # platform selected when the upgrade was requested
    'dev_upgrade_sns': [],   # serial numbers still waiting to be upgraded
}
def dev_upgrade_dict_clr():
    """Reset the progress fields of ``dev_upgrade_dict`` in place.

    Only ``status``, ``interval``, ``pkg_cnt``, ``remain`` and
    ``dev_upgrade_sns`` are reset; ``current_sn``, ``dev_type`` and the
    firmware image fields (``bin_data``, ``bin_len``, ``pkg_size``,
    ``pkg_total``) are deliberately left as they are.
    """
    dev_upgrade_dict.update({
        'status': 'idel',
        'interval': 1,
        'pkg_cnt': 0,
        'remain': 0,
        'dev_upgrade_sns': [],
    })
def dev_upgrade_check():
    """Advance the firmware-upgrade state machine by one step and reschedule.

    Each call handles one state of ``dev_upgrade_dict['status']`` for the
    current device:

    * ``'idel'``     -> prepare counters, move to ``'reqeuest'``
    * ``'reqeuest'`` -> publish the upgrade-start frame, move to ``'upgrade'``
    * ``'upgrade'``  -> publish the next image packet, or move to ``'finish'``
      once nothing remains
    * ``'finish'``   -> drop the device from the queue and pick the next one

    While a device is being processed or devices are still queued, the
    function re-arms itself with a daemon ``threading.Timer``. A failed
    publish resets the progress state and stops the cycle.
    """
    # The timer must be stored in the module-level name; without this
    # declaration the assignment at the bottom created a local variable, so
    # start_dev_upgrade_check() could never cancel a pending timer and a
    # second upgrade request started a parallel timer chain.
    global dev_upgrade_check_timer

    state = dev_upgrade_dict

    # Pick the next queued device when none is in progress.
    if state['current_sn'] == '':
        if len(state['dev_upgrade_sns']) > 0:
            state['current_sn'] = state['dev_upgrade_sns'][0]
        else:
            state['current_sn'] = ''

    if state['current_sn'] != '':
        # Idle: prepare the counters for a fresh transfer.
        if state['status'] == 'idel':
            logger.info(f"status = idel,等待升级,{state['current_sn']}")
            state['status'] = 'reqeuest'
            state['pkg_cnt'] = 0
            state['remain'] = state['bin_len']
        # Request: tell the device that an upgrade is starting.
        elif state['status'] == 'reqeuest':
            logger.info(f"status = request,下发升级开始指令,{state['current_sn']}")
            state['status'] = 'upgrade'
            pub_data = frame.get_msg_upgrade_1001(dev_type=state['dev_type'], dev_sn=state['current_sn'])
            pub_topic = f"cpyypt/down/{state['dev_type'].zfill(4)}/{state['current_sn'].zfill(10)}"
            platform_id = load_platforms()['current_platform']
            success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
            if not success:
                # current_sn survives the reset so it can still be logged.
                dev_upgrade_dict_clr()
                logger.error(f"dev update failed: platform_id={platform_id}, sn={state['current_sn']}")
                return
            state['status'] = 'upgrade'
            # Packets are sent at a faster pace than the idle polling.
            state['interval'] = 0.2
        # Upgrade: send one packet per call until nothing remains.
        elif state['status'] == 'upgrade':
            logger.info(f'status = upgrade,升级包发送中,{state["current_sn"]},{state["pkg_cnt"]}/{state["pkg_total"]}')
            if state['remain'] > 0:
                start = state['pkg_cnt'] * state['pkg_size']
                data = state['bin_data'][start:start + state['pkg_size']]
                pub_data = frame.get_msg_upgrade_1030(dev_type=state['dev_type'],
                                                      dev_sn=state['current_sn'],
                                                      fileSum=state['bin_len'],
                                                      maxPkgId=state['pkg_total'] - 1,
                                                      curPkgId=state['pkg_cnt'],
                                                      curPkgsize=len(data),
                                                      data=data)
                pub_topic = f"cpyypt/down/{state['dev_type'].zfill(4)}/{state['current_sn'].zfill(10)}"
                platform_id = load_platforms()['current_platform']
                success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
                if not success:
                    dev_upgrade_dict_clr()
                    logger.error(f"升级失败,发布MQTT消息失败: platform_id={platform_id}, sn={state['current_sn']}")
                    return
                state['status'] = 'upgrade'
            else:
                state['status'] = 'finish'
            state['pkg_cnt'] += 1
            state['remain'] = state['bin_len'] - state['pkg_cnt'] * state['pkg_size']
        # Finish: reset counters and move on to the next queued device.
        elif state['status'] == 'finish':
            logger.info(f"status = finish,升级完成,{state['current_sn']}")
            state['status'] = 'idel'
            state['pkg_cnt'] = 0
            state['remain'] = 0
            state['interval'] = 1
            # The queue may have been replaced by a new request meanwhile;
            # list.remove would raise if the serial is no longer in it.
            if state['current_sn'] in state['dev_upgrade_sns']:
                state['dev_upgrade_sns'].remove(state['current_sn'])
            if len(state['dev_upgrade_sns']) > 0:
                state['current_sn'] = state['dev_upgrade_sns'][0]
            else:
                state['current_sn'] = ''

    if state['current_sn'] == '' and len(state['dev_upgrade_sns']) == 0:
        logger.info('未检测到待升级设备')
    else:
        dev_upgrade_check_timer = threading.Timer(state['interval'], dev_upgrade_check)
        dev_upgrade_check_timer.daemon = True
        dev_upgrade_check_timer.start()
- # 启动定时检查 升级状态
# Module-level handle for the upgrade-check timer; None until one is stored.
dev_upgrade_check_timer = None


def start_dev_upgrade_check():
    """Cancel the timer held in ``dev_upgrade_check_timer``, if any, then run
    ``dev_upgrade_check`` immediately."""
    global dev_upgrade_check_timer
    pending = dev_upgrade_check_timer
    if pending is not None:
        pending.cancel()
    dev_upgrade_check()
- # 启动定时检查 在线状态
def dev_online_check():
    """Periodic pass over the online devices: detect timeouts, refresh speed.

    For every device whose ``online`` field is ``'在线'``:

    * it is marked ``'离线'`` when its ``realtime`` stamp is more than 10
      seconds older than the current time;
    * a sample of ``run_time``/``send_sum`` is appended to the device's
      history in ``device_speed`` (at most 11 samples are kept) and ``speed``
      is recomputed as the ``send_sum`` delta divided by the ``run_time``
      delta between the oldest and newest sample (0 with fewer than three
      samples or a non-positive time delta).

    The device list is saved once at the end if anything changed, and the
    function always re-arms itself with a daemon ``threading.Timer``.
    """
    try:
        devices = load_devices()
        changed = False
        for device in devices:
            if device.get('online', None) != '在线':
                continue

            # Online state: no report for more than 10 seconds means offline.
            if (int(time.time()) - device.get('realtime', 0)) > 10:
                device['online'] = '离线'
                logger.warning(f"设备离线: 云平台:{device['cloud_platform']}, sn={device['dev_sn']}")
                changed = True

            # Speed: keep a short sliding window of samples per device.
            dev_sn = device['dev_sn']
            history = device_speed.setdefault(dev_sn, [])
            if device.get('send_sum', 0) != 0:
                history.append({
                    'svr_run_time': time.time(),
                    'dev_run_time': device['run_time'],
                    'send_sum': device['send_sum'],
                })
            # Keep only the newest 11 samples (no-op for shorter histories).
            del history[:-11]

            if len(history) > 2:
                run_time_delta = history[-1]['dev_run_time'] - history[0]['dev_run_time']
                send_sum_delta = history[-1]['send_sum'] - history[0]['send_sum']
                speed = 0 if run_time_delta <= 0 else (send_sum_delta / run_time_delta)
            else:
                speed = 0
            if device.get('speed', 0) != speed:
                device['speed'] = speed
                changed = True

        # One save per pass instead of one save of the whole list per
        # changed device.
        if changed:
            save_devices(devices)
    finally:
        # Re-arm even when the pass failed, so a single bad record cannot
        # stop the periodic check; the exception itself still propagates.
        threading_timer['dev_online_check_timer'] = threading.Timer(threading_timer['dev_online_check_timeout'], dev_online_check)
        threading_timer['dev_online_check_timer'].daemon = True
        threading_timer['dev_online_check_timer'].start()
def start_dev_online_check():
    """Cancel the stored online-check timer, if any, then run
    ``dev_online_check`` immediately."""
    pending = threading_timer['dev_online_check_timer']
    if pending is not None:
        pending.cancel()
    dev_online_check()
- # 类似守护进程 所有状态
def per_second():
    """Watchdog: restart the periodic checks whose timers have stopped.

    Checks the MQTT-status timer and the device-online timer stored in
    ``threading_timer``; a timer that is ``None`` or no longer alive is
    restarted through its ``start_*`` function. The watchdog then re-arms
    itself with a daemon ``threading.Timer`` using
    ``threading_timer['per_second_timeout']``.
    """
    try:
        # MQTT connection status check.
        mqtt_timer = threading_timer['mqtt_status_check_timer']
        if mqtt_timer is None or not mqtt_timer.is_alive():
            logger.warning("mqtt_status_check_timer 发现定时器停止了, 需要重新启动")
            start_mqtt_status_check()

        # Device online status check.
        online_timer = threading_timer['dev_online_check_timer']
        if online_timer is None or not online_timer.is_alive():
            logger.warning("dev_online_check_timer 发现定时器停止了, 需要重新启动")
            start_dev_online_check()
    finally:
        # The restart calls run the checks synchronously; if one of them
        # raises, the watchdog must still re-arm itself or it would stop
        # for good. The exception itself still propagates.
        threading_timer['per_second_timer'] = threading.Timer(threading_timer['per_second_timeout'], per_second)
        threading_timer['per_second_timer'].daemon = True
        threading_timer['per_second_timer'].start()
def _main():
    """Initialise data, start the periodic checks and run the web server."""
    logger.info(f"APP_VERSION: {APP_VERSION}")
    init_data()

    # Start the periodic checks: MQTT connection status, device online
    # status, then the watchdog that supervises both.
    start_mqtt_status_check()
    start_dev_online_check()
    per_second()

    # Threaded mode is enabled for better throughput; pass threaded=False
    # to run single-threaded instead.
    app.run(host='0.0.0.0', port=9082, debug=False, threaded=True, use_reloader=False)


if __name__ == '__main__':
    _main()
|