| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434 |
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\Scripts\pip.exe
- # C:\Users\chx_s\AppData\Local\Programs\Python\Python310\python.exe
- from flask import Flask, render_template, request, redirect, url_for, session, jsonify, send_from_directory
- from datetime import datetime
- import os
- import json
- import threading
- import paho.mqtt.client as mqtt
- import logging
- from functools import wraps
- import random
- import math
- import time
- import subprocess
- import platform as sys_platform
- import shutil
- # 自定义
- from pos_manager import frame
# Application version shown in the templates and used in backup file names.
APP_VERSION = '1.0.11'
# Path of the application log file (also served by /api/get_logs).
LOG_FILE = 'app.log'
# Logging: every record goes both to LOG_FILE (UTF-8) and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# Moment the application was started; /api/run_time reports the time elapsed since then.
app_start_time = datetime.now()
app = Flask(__name__)
# Session signing key. A deployment can (and should) supply its own secret through
# the FLASK_SECRET_KEY environment variable; the legacy hard-coded value is kept only
# as a fallback so existing installations keep working unchanged.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'supersecretkey')
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['ALLOWED_EXTENSIONS'] = {'bin', 'zip', 'tar', 'rar'}
app.config['DATA_FOLDER'] = 'data'
# NOTE(review): the key is spelled "UPGREADE"; it is read elsewhere under this exact
# name, so it is left as is.
app.config['APP_UPGREADE_FOLDER_TEMP'] = 'app_upgrade_temp'
# Disable ASCII escaping in JSON responses so that Chinese text is emitted as is.
app.config['JSON_AS_ASCII'] = False
# Make sure the upload and data directories exist.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
# Paths of the JSON data files.
USERS_PATH = os.path.join(app.config['DATA_FOLDER'], 'users.json')
FIRMWARES_PATH = os.path.join(app.config['DATA_FOLDER'], 'firmwares.json')
DEVICES_PATH = os.path.join(app.config['DATA_FOLDER'], 'devices.json')
PLATFORMS_PATH = os.path.join(app.config['DATA_FOLDER'], 'platforms.json')
# Global state
# MQTT client bookkeeping: one client and one worker thread per platform id,
# both dictionaries are guarded by mqtt_lock.
mqtt_clients = {}
mqtt_threads = {}
mqtt_lock = threading.Lock()
# Topics subscribed to after connecting, as (topic filter, qos) pairs.
mqtt_sub_topic = [("cpyypt/up/2006/#", 0)]
# Default topic used when publishing.
mqtt_pub_topic = "cpyypt/down/2006/0000000101"
@app.route('/favicon.ico')
def favicon():
    """Serve ``favicon.ico`` from the ``static`` directory."""
    icon_mimetype = 'image/vnd.microsoft.icon'
    return send_from_directory("static", "favicon.ico", mimetype=icon_mimetype)
@app.before_request
def update_last_activity():
    """Stamp ``mqtt_last_activity`` with the time of the incoming request.

    ``on_message`` compares this timestamp with the current time to decide
    whether an MQTT connection has been idle for too long.
    """
    now = time.time()
    mqtt_last_activity['last_time'] = now
# Report how long the application has been running.
@app.route('/api/run_time')
def get_run_time():
    """Return the application uptime as JSON: ``{'run_time': '<text>'}``.

    The text has the form "Y年M月D日 H时M分S秒". Years and months are an
    approximation: a year counts as 365 days and a month as 30 days.
    """
    elapsed = datetime.now() - app_start_time

    # Split the whole days into approximate years / months / days.
    years, days_left = divmod(elapsed.days, 365)
    months, days = divmod(days_left, 30)

    # Split the sub-day remainder into hours / minutes / seconds.
    hours, leftover = divmod(elapsed.seconds, 3600)
    minutes, seconds = divmod(leftover, 60)

    run_time_str = f"{years}年{months}月{days}日 {hours}时{minutes}分{seconds}秒"
    return jsonify({'run_time': run_time_str})
# Template context processor
@app.context_processor
def inject_common_data():
    """Expose ``datetime`` and ``app_version`` to every rendered template."""
    return dict(datetime=datetime, app_version=APP_VERSION)
# Data initialisation
def init_data():
    """Create each missing JSON data file with its built-in default content.

    Covers the users, firmwares, devices and cloud-platform files. A file
    that already exists is never touched, whatever its content.
    """

    def write_json(path, payload, **dump_options):
        # All data files are UTF-8 JSON with non-ASCII characters kept verbatim.
        with open(path, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, ensure_ascii=False, **dump_options)

    def device_entry(entry_id, dev_sn, firmware_ver, ip, online, wifi_ssid, wifi_password):
        # The two sample devices differ only in the fields passed as arguments.
        return {
            "id": entry_id,
            "dev_type": "2006",
            "dev_sn": dev_sn,
            "firmware_ver": firmware_ver,
            "ip": ip,
            "status": "正常",
            "online": online,
            "cloud_platform": "阿里云测试",
            "network_type": "WiFi",
            "wifi_ssid": wifi_ssid,
            "wifi_password": wifi_password,
            "run_time": 1000,
            "receive_sum": 12,
            "send_sum": 10,
            "abnormal_sum": 2,
            "success_rate": 83.33,
            "speed": 83.33,
        }

    def platform_entry(entry_id, name, status, ip, port, username, password):
        # NOTE(review): the "sub_topic " / "pub_topic " keys carry a trailing
        # space; they are written exactly as before so existing files and any
        # reader of these keys stay compatible — confirm whether it is a typo.
        return {
            "id": entry_id,
            "name": name,
            "status": status,
            "ip": ip,
            "port": port,
            "sub_topic ": mqtt_sub_topic,
            "pub_topic ": mqtt_pub_topic,
            "client_id": f"pos_mng_{random.randint(0, 100)}",
            "username": username,
            "password": password,
        }

    # Users: a single default account.
    if not os.path.exists(USERS_PATH):
        write_json(USERS_PATH, {'admin': 'admin'})

    # Firmwares: two sample records.
    if not os.path.exists(FIRMWARES_PATH):
        write_json(
            FIRMWARES_PATH,
            [
                {'id': 1, 'filename': 'pos.bin', 'upload_time': '2025-08-01 13:08:30', 'remark': '初始版本'},
                {'id': 2, 'filename': 'ITSF-POS V2.3.bin', 'upload_time': '2025-08-01 14:10:09', 'remark': '基础功能版'},
            ],
            indent=2,
        )

    # Devices: two sample records.
    if not os.path.exists(DEVICES_PATH):
        write_json(
            DEVICES_PATH,
            [
                device_entry(1, "101", "pos12.bin", "192.168.1.101", "在线", "ssss_5G", "eeeeeeeeee"),
                device_entry(2, "103", "pos1.bin", "192.168.1.102", "离线", "wbjw", "2222"),
            ],
            indent=2,
        )

    # Cloud platforms: three environments, with "test_env" selected.
    if not os.path.exists(PLATFORMS_PATH):
        write_json(
            PLATFORMS_PATH,
            {
                "current_platform": "test_env",
                "platform_l": {
                    "aliyun_prod_env": platform_entry(
                        1, "阿里云生产", "connected", "mqtt.cpyypt.cn", 9000,
                        "cpyypt", "1SvTlvm1VCawSzS"),
                    "test_env": platform_entry(
                        2, "阿里云测试", "disconnected", "test-mqtt.cpyypt.cn", 9000,
                        "admin", "houjianwei"),
                    "localize_a_env": platform_entry(
                        3, "本地化A云平台", "disconnected", "mqtt.localize_a_env.com", 1883,
                        "localize_a_env_user", "localize_a_env_pass"),
                },
            },
            indent=2,
        )
# Data loading helpers
def load_users():
    """Return the parsed content of the users JSON file (``USERS_PATH``)."""
    with open(USERS_PATH, 'r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def load_firmwares():
    """Return the parsed content of the firmwares JSON file (``FIRMWARES_PATH``)."""
    with open(FIRMWARES_PATH, 'r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
load_devices_first = True  # True until load_devices() has run once


def load_devices():
    """Load the device list from ``DEVICES_PATH``.

    If the file is missing, ``init_data()`` is called first. On the very
    first call after program start every device is marked offline, its
    runtime statistics are zeroed, and the result is written back to disk.

    Returns:
        The list of device dictionaries.
    """
    global load_devices_first

    if not os.path.exists(DEVICES_PATH):
        logger.info(f"设备配置文件不存在,初始化数据: {DEVICES_PATH}")
        init_data()

    with open(DEVICES_PATH, 'r', encoding='utf-8') as handle:
        devices = json.load(handle)

    if not load_devices_first:
        return devices

    # First load since start-up: nothing is known to be online yet, so reset
    # the live state of every device and persist it.
    load_devices_first = False
    startup_state = {
        'online': '离线',
        'realtime': 0,
        'run_time': 0,
        'receive_sum': 0,
        'send_sum': 0,
        'abnormal_sum': 0,
        'success_rate': 0,
        'speed': 0,
    }
    for entry in devices:
        entry.update(startup_state)
    save_devices(devices)
    return devices
-
load_platforms_first = True  # True until load_platforms() has reset the statuses once


def load_platforms():
    """Load the cloud-platform configuration from ``PLATFORMS_PATH``.

    A missing file is created through ``init_data()``. If the file is empty
    or not valid JSON the problem is logged, ``init_data()`` is called and
    the file is read a second time; because ``init_data()`` only creates
    missing files, this acts as a retry of the read.

    On the first successful load after program start every platform is
    marked ``'disconnected'``, receives a fresh ``client_id`` built from the
    host name and a random number, and the result is saved.

    Returns:
        The platform dictionary (``current_platform`` / ``platform_l``).

    Raises:
        Whatever the final fallback read raises when the file stays unreadable.
    """
    global load_platforms_first
    try:
        if not os.path.exists(PLATFORMS_PATH):
            logger.info(f"云平台配置文件不存在,初始化数据: {PLATFORMS_PATH}")
            init_data()

        # Read the text and close the file before any recovery step runs.
        with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
            file_content = f.read().strip()

        platform_d = None
        if not file_content:
            logger.error("云平台配置文件为空")
        else:
            try:
                platform_d = json.loads(file_content)
            except json.JSONDecodeError as e:
                logger.error(f"解析云平台配置文件失败: {e}")

        if platform_d is None:
            # Empty or unparsable content: re-initialise and read once more.
            init_data()
            with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f_new:
                platform_d = json.load(f_new)

        # At program start force every platform to "disconnected".
        if load_platforms_first and 'platform_l' in platform_d:
            load_platforms_first = False
            # COMPUTERNAME exists only on Windows; indexing os.environ directly
            # raised KeyError elsewhere and aborted this reset, so fall back
            # to the network node name.
            host_name = os.environ.get('COMPUTERNAME') or sys_platform.node() or 'unknown'
            for platform in platform_d['platform_l'].values():
                platform['status'] = 'disconnected'
                platform['client_id'] = "pos_mng_%s_%d" % (host_name, random.randint(0, 100))
            # Persist the forced state.
            save_platforms(platform_d)

        return platform_d
    except Exception as e:
        logger.error(f"加载云平台配置时发生异常: {e}")
        # Last resort: make sure the file exists and return whatever it holds.
        init_data()
        with open(PLATFORMS_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
def load_platforms_c():
    """Return a reduced view of the platforms: name, status and ip per platform id.

    Connection credentials and the other fields of each platform entry are
    left out of the result.
    """
    exposed_fields = ('name', 'status', 'ip')
    return {
        platform_id: {field: entry[field] for field in exposed_fields}
        for platform_id, entry in load_platforms()['platform_l'].items()
    }
# Data saving helpers
def save_users(users):
    """Write ``users`` to ``USERS_PATH`` as indented UTF-8 JSON."""
    serialized = json.dumps(users, ensure_ascii=False, indent=2)
    with open(USERS_PATH, 'w', encoding='utf-8') as handle:
        handle.write(serialized)
def save_firmwares(firmwares):
    """Write ``firmwares`` to ``FIRMWARES_PATH`` as indented UTF-8 JSON."""
    serialized = json.dumps(firmwares, ensure_ascii=False, indent=2)
    with open(FIRMWARES_PATH, 'w', encoding='utf-8') as handle:
        handle.write(serialized)
def save_devices(devices):
    """Write ``devices`` to ``DEVICES_PATH`` as indented UTF-8 JSON."""
    serialized = json.dumps(devices, ensure_ascii=False, indent=2)
    with open(DEVICES_PATH, 'w', encoding='utf-8') as handle:
        handle.write(serialized)
def save_platforms(platform_d):
    """Write ``platform_d`` to ``PLATFORMS_PATH`` as indented UTF-8 JSON."""
    serialized = json.dumps(platform_d, ensure_ascii=False, indent=2)
    with open(PLATFORMS_PATH, 'w', encoding='utf-8') as handle:
        handle.write(serialized)
# MQTT callbacks
def on_connect(client, userdata, flags, rc):
    """Handle the broker's answer to a connection attempt.

    Args:
        client: The MQTT client the callback belongs to.
        userdata: Dictionary installed by ``mqtt_client_thread``; its
            ``'platform_id'`` entry identifies the platform.
        flags: Response flags from the broker (unused).
        rc: Connection result code; ``0`` means the connection was accepted.

    A non-zero ``rc`` is a refused connection, so in that case the failure
    is logged and neither the subscription nor the ``'connected'`` status
    is applied. On success the client subscribes to ``mqtt_sub_topic`` and
    the platform status is stored as ``'connected'``.
    """
    platform_id = userdata['platform_id']
    if rc != 0:
        logger.error(f"MQTT连接失败 - 云平台: {platform_id}, 结果代码: {rc}")
        return

    logger.info(f"MQTT连接成功 - 云平台: {platform_id}, 结果代码: {rc}")
    client.subscribe(mqtt_sub_topic)
    logger.info(f"已订阅主题: {mqtt_sub_topic}")

    with mqtt_lock:
        platform_d = load_platforms()
        if platform_id in platform_d['platform_l']:
            platform_d['platform_l'][platform_id]['status'] = 'connected'
            save_platforms(platform_d)
def on_disconnect(client, userdata, rc):
    """Log a disconnection and store the platform status as ``'disconnected'``.

    Args:
        client: The MQTT client the callback belongs to (unused).
        userdata: Dictionary whose ``'platform_id'`` entry names the platform.
        rc: Disconnection result code, included in the log message.
    """
    platform_id = userdata['platform_id']
    logger.info(f"MQTT断开连接 - 云平台: {platform_id}, 结果代码: {rc}")
    with mqtt_lock:
        platform_d = load_platforms()
        entry = platform_d['platform_l'].get(platform_id)
        if entry is not None:
            entry['status'] = 'disconnected'
            save_platforms(platform_d)
# Fields copied from a parsed frame body into the stored device record.
_REPORT_FIELDS = (
    'master_type', 'master_sn', 'app_version', 'reset_times', 'last_reset_type',
    'uuid', 'run_time', 'receive_sum', 'send_sum', 'network_type',
    'cloud_platform', 'sim_status', 'free_fifo', 'ip',
)
# Display names for the numeric codes reported by the device.
_NETWORK_TYPE_NAMES = {0: 'WiFi', 1: '有线网'}
_CLOUD_PLATFORM_NAMES = {0: '阿里云生产', 1: '阿里云测试'}
# Seconds without any web request after which a connected platform is dropped.
_MQTT_IDLE_TIMEOUT = 5 * 60


def _decode_payload(payload):
    """Decode ``payload`` as GB2312, then UTF-8; fall back to upper-case hex."""
    for encoding in ('gb2312', 'utf-8'):
        try:
            return payload.decode(encoding)
        except UnicodeDecodeError:
            continue
    return payload.hex().upper()


def _extract_report(body):
    """Pick the reported fields out of ``body`` and map numeric codes to names.

    Fields absent from ``body`` are present in the result with value ``None``.
    """
    report = {field: body.get(field, None) for field in _REPORT_FIELDS}
    if report['network_type'] is not None:
        report['network_type'] = _NETWORK_TYPE_NAMES.get(report['network_type'], '未知')
    if report['cloud_platform'] is not None:
        report['cloud_platform'] = _CLOUD_PLATFORM_NAMES.get(report['cloud_platform'], '未知')
    return report


def on_message(client, userdata, msg):
    """Process one MQTT message received from a device.

    Steps:
      1. If the platform is stored as connected and no web request has been
         seen for ``_MQTT_IDLE_TIMEOUT`` seconds, disconnect it and stop.
      2. Take the device serial number from the last topic segment (leading
         zeros removed); when that device is known, mark it online, stamp
         ``realtime`` and record the platform name.
      3. If the payload looks like a frame (its text form starts with
         ``"FEFE"``), parse it with ``frame.parse_data`` and merge the
         reported fields into the device record, saving only on change.

    Args:
        client: The MQTT client that received the message.
        userdata: Dictionary whose ``'platform_id'`` entry names the platform.
        msg: The received message (``topic`` and ``payload`` are used).
    """
    platform_id = userdata['platform_id']
    payload_str = _decode_payload(msg.payload)

    # Automatically disconnect after a long period without user activity.
    platform_d = load_platforms()
    platform_entry = platform_d['platform_l'].get(platform_id)
    if platform_entry is not None and platform_entry['status'] == 'connected':
        if time.time() - mqtt_last_activity['last_time'] > _MQTT_IDLE_TIMEOUT:
            logger.info(f"云平台 {platform_id} 长时间未操作,超过{_MQTT_IDLE_TIMEOUT}秒,自动断开连接")
            # The stored status is updated by on_disconnect once the client drops.
            stop_mqtt_client(platform_id)
            logger.info(f"云平台 {platform_id} 自动断开连接,已完成")
            return

    dev_sn = msg.topic.split("/")[-1].lstrip('0')
    devices = load_devices()
    device = next((item for item in devices if item['dev_sn'] == dev_sn), None)
    if device:
        if device['online'] != '在线':
            logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 上线")
        device['online'] = '在线'
        device['realtime'] = int(time.time())
        device['cloud_platform'] = platform_entry['name'] if platform_entry is not None else None
        save_devices(devices)

    is_frame = (
        isinstance(msg.payload, bytes)
        and len(msg.payload) > 4
        and payload_str[0:4] == "FEFE"
    )
    if not is_frame:
        return

    try:
        header, body = frame.parse_data(client, msg.payload, msg.topic)
        if len(header) == 0 or len(body) == 0:
            return
        report = _extract_report(body)
        # Continue only when at least one field was reported. The test is
        # "is not None" rather than truthiness, so a legitimate value of 0
        # is not mistaken for "no data".
        if all(value is None for value in report.values()):
            return
        # A frame from a device that is not in the device list has no record
        # to update.
        if device is None:
            return
        # Write back only when something actually changed.
        is_changed = False
        for key, value in report.items():
            if value is None:
                continue
            if key not in device or device[key] != value:
                is_changed = True
                device[key] = value
        if is_changed:
            logger.info(f"云平台:{platform_id}, 设备 {dev_sn} 接收到MQTT消息,状态更新")
            save_devices(devices)
    except Exception as e:
        logger.error(f"解析数据失败 - 云平台: {platform_id}, 错误: {str(e)}")
# MQTT client worker thread
def mqtt_client_thread(platform_id):
    """Run the MQTT network loop for one platform until it is disconnected.

    Creates a client from the stored platform settings, registers it in
    ``mqtt_clients``, connects and blocks in ``loop_forever()``. When the
    loop ends (normally or through an error) the client is disconnected and
    this thread's own registrations are removed.

    Args:
        platform_id: Key of the platform inside ``platform_l``.
    """
    platform_d = load_platforms()
    if platform_id not in platform_d['platform_l']:
        logger.error(f"云平台 {platform_id} 不存在")
        return
    platform = platform_d['platform_l'][platform_id]

    client = mqtt.Client(client_id=platform['client_id'])
    client.username_pw_set(platform['username'], platform['password'])
    # The topics come from the module-level defaults, not from the platform entry.
    client.user_data_set({
        'platform_id': platform_id,
        'pub_topic': mqtt_pub_topic,
        'sub_topic': mqtt_sub_topic
    })
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    with mqtt_lock:
        mqtt_clients[platform_id] = client

    try:
        client.connect(platform['ip'], platform['port'], 60)
        client.loop_forever()
    except Exception as e:
        logger.error(f"MQTT线程错误 - 云平台: {platform_id}, 错误: {str(e)}")
    finally:
        # Make sure the client is disconnected (once; failures are logged).
        try:
            client.disconnect()
        except Exception as e:
            logger.error(f"MQTT线程结束时断开连接失败 - 云平台: {platform_id}, 错误: {str(e)}")

        # Remove only the entries that belong to this thread, so that a client
        # or thread registered for the same platform in the meantime survives.
        with mqtt_lock:
            if mqtt_clients.get(platform_id) is client:
                del mqtt_clients[platform_id]
            if mqtt_threads.get(platform_id) is threading.current_thread():
                del mqtt_threads[platform_id]
# MQTT control functions
def start_mqtt_client(platform_id):
    """Start the MQTT worker thread for ``platform_id`` unless one is running.

    The liveness check, the registration in ``mqtt_threads`` and the thread
    start happen under a single acquisition of ``mqtt_lock``, so two
    concurrent callers cannot both start a thread for the same platform.

    Args:
        platform_id: Key of the platform inside ``platform_l``.

    Returns:
        Always ``True``.
    """
    with mqtt_lock:
        running = mqtt_threads.get(platform_id)
        if running is not None and running.is_alive():
            logger.info(f"MQTT客户端已运行 - 云平台: {platform_id}")
            return True
        thread = threading.Thread(target=mqtt_client_thread, args=(platform_id,), daemon=True)
        mqtt_threads[platform_id] = thread
        # The worker takes mqtt_lock itself only after it has loaded its
        # settings; it simply waits until this block has been left.
        thread.start()
    logger.info(f"启动MQTT客户端线程 - 云平台: {platform_id}")
    return True
# Time of the most recent web request; refreshed before every request.
mqtt_last_activity = {'last_time':time.time()}


# Periodic check of all MQTT connection states
def check_all_mqtt_connections():
    """Reconcile the stored platform statuses with the live MQTT clients.

    For every configured platform the stored ``status`` is compared with
    whether a registered client reports ``is_connected()``; a mismatch is
    corrected and saved. Whatever happens, the function re-arms itself to
    run again two seconds later on a daemon timer.
    """
    global mqtt_status_check_timer
    try:
        with mqtt_lock:
            platform_d = load_platforms()
            if 'platform_l' not in platform_d:
                return
            for platform_id, entry in platform_d['platform_l'].items():
                live_client = mqtt_clients.get(platform_id)
                is_connected = live_client is not None and live_client.is_connected()
                current_status = entry.get('status', '')
                expected_status = 'connected' if is_connected else 'disconnected'
                if current_status == expected_status:
                    continue
                # Stored state disagrees with the live state: correct and persist it.
                logger.info(f"参数:platform_id={platform_id}, is_connected={is_connected}, current_status ={current_status}")
                entry['status'] = expected_status
                logger.info(f"更新云平台状态 - 云平台: {platform_id}, 新状态: {platform_d['platform_l'][platform_id]['status']}")
                save_platforms(platform_d)
    except Exception as e:
        logger.error(f"检查MQTT连接状态出错: {str(e)}")
    finally:
        # Schedule the next check (in 2 seconds).
        next_timer = threading.Timer(2, check_all_mqtt_connections)
        next_timer.daemon = True
        mqtt_status_check_timer = next_timer
        next_timer.start()
# Start of the periodic check
mqtt_status_check_timer = None  # timer of the next scheduled check, if any


def start_mqtt_status_check():
    """Cancel a pending status-check timer, if any, and run a check right away.

    The check schedules its own follow-up run.
    """
    global mqtt_status_check_timer
    pending = mqtt_status_check_timer
    if pending is not None:
        pending.cancel()
    check_all_mqtt_connections()
def stop_mqtt_client(platform_id):
    """Disconnect the MQTT client registered for ``platform_id``, if there is one.

    The client is taken out of ``mqtt_clients`` while holding ``mqtt_lock``
    and disconnected after the lock has been released. The worker thread is
    not joined; its presence is only logged.

    Args:
        platform_id: Key of the platform inside ``platform_l``.

    Returns:
        Always ``True``.
    """
    logger.info(f"断开MQTT连接 - 云平台: {platform_id}")

    # Unregister under the lock so no other thread picks the client up again;
    # the lock is held only for this lookup.
    with mqtt_lock:
        client = mqtt_clients.pop(platform_id, None)

    # Disconnect outside the lock to avoid a deadlock.
    if client:
        try:
            client.disconnect()
        except Exception as e:
            logger.error(f"断开MQTT连接失败 - 云平台: {platform_id}, 错误: {str(e)}")
        else:
            logger.info(f"断开MQTT连接 - 云平台: {platform_id}")

    with mqtt_lock:
        thread_registered = platform_id in mqtt_threads
        if thread_registered:
            logger.info(f"等待MQTT线程结束 - 云平台: {platform_id}")
    return True
def publish_mqtt_message(platform_id, message, pub_topic=mqtt_pub_topic):
    """Publish ``message`` through the MQTT client of ``platform_id``.

    Args:
        platform_id: Key of the platform whose client is used.
        message: Payload to send; a ``str`` is logged as is, anything else
            is logged through its ``hex()`` representation.
        pub_topic: Topic to publish to; defaults to ``mqtt_pub_topic``.

    Returns:
        ``True`` when the message was handed over and sent, ``False`` when
        no client is registered, the publish request was rejected, or an
        error occurred.
    """
    with mqtt_lock:
        client = mqtt_clients.get(platform_id)
    if client is None:
        logger.error(f"MQTT客户端未连接 - 云平台: {platform_id}")
        return False

    try:
        result = client.publish(pub_topic, message, qos=0)
        # A rejected request (for instance: not connected) will never be
        # reported as published, so bail out instead of waiting for it.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            logger.error(f"发布消息失败 - 云平台: {platform_id}, 错误: {mqtt.error_string(result.rc)}")
            return False
        result.wait_for_publish()
        shown = message if isinstance(message, str) else message.hex().upper()
        logger.info(f"发布消息成功 - 云平台: {platform_id}, 主题: {pub_topic}, 消息: {shown}")
        return True
    except Exception as e:
        logger.error(f"发布消息失败 - 云平台: {platform_id}, 错误: {str(e)}")
        return False
# Helper functions
def allowed_file(filename):
    """Tell whether ``filename`` has an extension listed in ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in app.config['ALLOWED_EXTENSIONS']
# Login check decorator
def login_required(f):
    """Wrap a view so that it is only reachable with a logged-in session.

    Without ``'logged_in'`` in the session the visitor is redirected to the
    login page, which receives the requested URL as its ``next`` parameter.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        login_url = url_for('login', next=request.url)
        return redirect(login_url)
    return decorated_function
# Routes
@app.route('/')
@login_required
def index():
    """Render the home page with the platform overview and the selected platform."""
    platform_d = load_platforms()
    page_context = {
        'title': '主页',
        'platform_c': load_platforms_c(),
        'current_platform': platform_d['current_platform'],
    }
    return render_template('index.html', **page_context)
def _counter_as_int(value):
    """Convert a stored counter to ``int``; ``None`` and ``''`` count as 0."""
    if value is None or value == '':
        return 0
    return int(value)


@app.route('/api/device_detail/<dev_sn>')
@login_required
def get_device_detail(dev_sn):
    """Return one device record as JSON, with derived statistics filled in.

    Derived fields:
      * ``abnormal_sum``  = ``receive_sum - send_sum``
      * ``success_rate``  = ``send_sum / receive_sum`` (0 when nothing was received)
      * ``speed``         = ``send_sum / (run_time * 100)`` (0 when ``run_time`` is 0)

    Counters that are missing or empty are treated as 0 instead of raising.

    Args:
        dev_sn: Serial number of the device, as stored in ``dev_sn``.

    Returns:
        The device as JSON, or ``{'error': ...}`` with status 404 when no
        device has that serial number.
    """
    devices = load_devices()
    device = next((d for d in devices if d['dev_sn'] == dev_sn), None)
    if not device:
        return jsonify({'error': '设备不存在'}), 404

    received = _counter_as_int(device.get('receive_sum'))
    sent = _counter_as_int(device.get('send_sum'))

    run_time = device.get('run_time')
    if run_time is None or run_time == '':
        run_time = 0
    elif isinstance(run_time, str):
        # Convert before scaling: multiplying the string itself by 100 would
        # repeat its text 100 times instead of scaling the number.
        run_time = int(run_time)

    device['abnormal_sum'] = received - sent
    device['success_rate'] = 0 if received == 0 else sent / received
    device['speed'] = 0 if int(run_time) == 0 else sent / int(run_time * 100)
    return jsonify(device)
def _is_safe_redirect_target(target):
    """Tell whether ``target`` points at this site (same scheme family and host).

    ``target`` is resolved against the URL root of the current request, so
    both relative paths and absolute URLs of this host are accepted, while
    URLs leading to any other host are rejected.
    """
    from urllib.parse import urljoin, urlparse

    site = urlparse(request.host_url)
    candidate = urlparse(urljoin(request.host_url, target))
    return candidate.scheme in ('http', 'https') and candidate.netloc == site.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check the submitted credentials (POST).

    On success the session is marked as logged in and the visitor is sent
    to the ``next`` URL, provided it belongs to this site; otherwise, or
    when ``next`` is absent, to the home page. On failure the form is shown
    again with an error message.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        users = load_users()
        if username in users and users[username] == password:
            session['logged_in'] = True
            session['username'] = username
            # ``next`` comes from the query string and is attacker-controllable:
            # never redirect to a foreign host.
            next_page = request.args.get('next')
            if not next_page or not _is_safe_redirect_target(next_page):
                next_page = url_for('index')
            return redirect(next_page)
        error = '用户名或密码错误'
        return render_template('login.html', error=error)
    return render_template('login.html', title='用户登录')
@app.route('/logout')
def logout():
    """Remove the login markers from the session and go to the login page."""
    for session_key in ('logged_in', 'username'):
        session.pop(session_key, None)
    return redirect(url_for('login'))
@app.route('/app_manage')
@login_required
def app_manage():
    """Log the visit and render the application management page."""
    logger.info("进入应用管理页面")
    page = render_template('app_manage.html', title='应用管理')
    return page
# Application management API
@app.route('/api/restart_app', methods=['POST'])
@login_required
def restart_app():
    """Run the platform-specific restart command after a two-second pause.

    On Windows ``python restart_app.py`` is executed, on Linux
    ``./sh_web_app_2006_Svr.sh restart``; on any other system no command
    is run.

    Returns:
        ``{"status": "restarting"}`` with status 200, or a
        ``success``/``message`` JSON object when an exception occurred.
    """
    restart_commands = {
        'WINDOWS': 'python restart_app.py',
        'LINUX': './sh_web_app_2006_Svr.sh restart',
    }
    try:
        logger.info("接收到重启应用请求")
        time.sleep(2)  # pause before the restart command is issued
        command = restart_commands.get(sys_platform.system().upper())
        if command is not None:
            os.system(command)
        return jsonify({"status": "restarting"}), 200
    except Exception as e:
        logger.error(f"应用重启失败: {str(e)}")
        return jsonify({'success': False, 'message': f'重启失败: {str(e)}'})
@app.route('/api/update_app', methods=['POST'])
@login_required
def update_app():
    """Receive an uploaded application file and put it in place.

    Accepted uploads (form field ``app_file``):
      * ``web_app_2006.py`` — replaces the file of that name in the working directory
      * ``frame.py``        — replaces ``pos_manager/frame.py``
      * ``*.html``          — stored in ``templates/``
      * ``*.zip``           — stored in the temporary upgrade folder (not unpacked)

    Before a file is overwritten, the existing version is copied to
    ``app_upgrade_bk``. The change takes effect after a restart.

    Returns:
        A JSON object with ``success`` and ``message``.
    """
    try:
        logger.info("接收到更新应用请求")
        if 'app_file' not in request.files:
            return jsonify({'success': False, 'message': '没有文件被上传'})
        file = request.files['app_file']
        if file.filename == '':
            return jsonify({'success': False, 'message': '没有选择文件'})

        # The name is supplied by the client: keep only its last component so
        # that it cannot point outside the intended directories.
        filename = os.path.basename(file.filename.replace('\\', '/'))

        # Check the file type.
        if not filename.endswith(('.py', '.html', '.zip')):
            return jsonify({'success': False, 'message': '只支持Python文件(.py)、HTML文件(.html)或压缩包(.zip)'})

        temp_dir = app.config['APP_UPGREADE_FOLDER_TEMP']
        os.makedirs(temp_dir, exist_ok=True)
        bk_dir = 'app_upgrade_bk'
        os.makedirs(bk_dir, exist_ok=True)

        # Decide where the upload goes and under which name the current
        # version is backed up (None: nothing to back up).
        if filename == 'web_app_2006.py':
            target_path = filename
            backup_name = filename + f'_{APP_VERSION}_bk_{time.time()}'
        elif filename == 'frame.py':
            target_path = os.path.join('pos_manager', filename)
            backup_name = filename + f'_bk_{time.time()}'
        elif filename.endswith('.html'):
            target_path = os.path.join('templates', filename)
            backup_name = filename + f'_bk_{time.time()}'
        elif filename.endswith('.zip'):
            # TODO: the archive is only stored; unpacking and installing its
            # content is not implemented.
            target_path = os.path.join(temp_dir, filename)
            backup_name = None
        else:
            # Any other .py file has no known destination; report that instead
            # of claiming success without saving anything.
            return jsonify({'success': False, 'message': f'不支持更新该Python文件: {filename}'})

        # A brand-new file (e.g. a new template) has nothing to back up.
        if backup_name is not None and os.path.exists(target_path):
            shutil.copy2(target_path, os.path.join(bk_dir, backup_name))
        file.save(target_path)

        logger.info(f"文件 {filename} 上传成功")
        return jsonify({'success': True, 'message': f'文件上传成功,重启后生效. {filename}, {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'})
    except Exception as e:
        logger.error(f"应用更新失败: {str(e)}")
        return jsonify({'success': False, 'message': f'更新失败: {str(e)}'})
@app.route('/api/get_logs')
@login_required
def get_logs():
    """Return the tail of the application log as plain text.

    Query parameter ``lines``:
      * ``'all'``            — the last 500 lines
      * a positive integer   — that many last lines
      * anything else (missing, not a number, zero or negative) — the last 50 lines

    Returns:
        The selected log lines, or a message text when the log file does
        not exist or cannot be read.
    """
    from collections import deque

    default_count = 50
    max_count_for_all = 500
    try:
        lines = request.args.get('lines', '50')
        get_log_file = LOG_FILE
        # Check that the log file exists.
        if not os.path.exists(get_log_file):
            return f"日志文件 {get_log_file} 不存在"

        if lines == 'all':
            line_count = max_count_for_all
        else:
            try:
                line_count = int(lines)
            except ValueError:
                line_count = default_count
            # Zero or a negative count used as a slice start would select
            # (almost) the whole file; treat it as an invalid request.
            if line_count <= 0:
                line_count = default_count

        # A bounded deque keeps only the requested tail while the file is
        # streamed, instead of loading every line into memory.
        with open(get_log_file, 'r', encoding='utf-8') as f:
            return ''.join(deque(f, maxlen=line_count))
    except Exception as e:
        logger.error(f"获取日志失败: {str(e)}")
        return f"获取日志失败: {str(e)}"
@app.route('/firmware', methods=['GET', 'POST'])
@login_required
def firmware():
    """Firmware package management page.

    POST uploads a firmware file (form fields ``file`` and ``remark``);
    the file must have an allowed extension, a unique name and a size
    between 1 KB and 1 MB.

    GET shows the list, or performs one action selected by a query
    parameter: ``edit=<id>&remark=<text>``, ``delete=<id>`` or
    ``download=<id>``.
    """
    firmwares = load_firmwares()

    def render_error(message):
        # Show the list again together with an error message.
        return render_template('firmware.html', firmwares=firmwares, error=message)

    def parse_id(raw_value):
        # Ids arrive as text in the query string; None marks an invalid one.
        try:
            return int(raw_value)
        except (TypeError, ValueError):
            return None

    if request.method == 'POST':
        if 'file' not in request.files:
            return render_error('未选择文件')
        file = request.files['file']
        remark = request.form.get('remark', '').strip()
        if file.filename == '':
            return render_error('未选择文件')
        if not remark:
            return render_error('备注信息不能为空')

        # The name is supplied by the client: keep only its last component so
        # that the file cannot be written outside the upload folder.
        filename = os.path.basename(file.filename.replace('\\', '/'))
        if not filename or not allowed_file(filename):
            return render_error(f'不支持的文件类型: "{filename}"')

        # Reject duplicate file names.
        if any(item['filename'] == filename for item in firmwares):
            return render_error(f'文件名 "{filename}" 已存在,请更改文件名后再上传')

        # Determine the size by seeking to the end, then rewind for saving.
        file.seek(0, os.SEEK_END)
        file_size = file.tell()
        file.seek(0)
        logger.info(f"文件名: {filename}, 文件大小: {file_size} 字节")
        if file_size > 1 * 1024 * 1024:  # 1MB
            return render_error(f'文件大小不能超过1MB, 当前文件大小: {file_size} 字节')
        if file_size < 1 * 1024:  # 1KB
            return render_error(f'文件大小不能小于1KB, 当前文件大小: {file_size} 字节')

        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        new_id = max(item['id'] for item in firmwares) + 1 if firmwares else 1
        firmwares.append({
            'id': new_id,
            'filename': filename,
            'filesize': '%d 字节' % file_size,
            'upload_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'remark': remark
        })
        save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Edit the remark of one entry.
    edit_id = request.args.get('edit')
    if edit_id:
        target_id = parse_id(edit_id)
        if target_id is None:
            return render_error(f'无效的固件ID: {edit_id}')
        new_remark = request.args.get('remark', '').strip()
        if new_remark:
            for item in firmwares:
                if item['id'] == target_id:
                    item['remark'] = new_remark
                    break
            save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Delete one entry (the uploaded file itself is left on disk).
    delete_id = request.args.get('delete')
    if delete_id:
        target_id = parse_id(delete_id)
        if target_id is None:
            return render_error(f'无效的固件ID: {delete_id}')
        firmwares = [item for item in firmwares if item['id'] != target_id]
        save_firmwares(firmwares)
        return redirect(url_for('firmware'))

    # Download the file of one entry.
    download_id = request.args.get('download')
    if download_id:
        target_id = parse_id(download_id)
        entry = None
        if target_id is not None:
            entry = next((item for item in firmwares if item['id'] == target_id), None)
        if entry is None:
            return render_error(f'未找到ID为 {download_id} 的固件')
        filename = entry['filename']
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(file_path):
            return render_error(f'文件 "{filename}" 不存在')
        return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)

    return render_template('firmware.html', firmwares=firmwares, title='固件包管理')
def _devices_publish(device, pub_data):
    """Publish ``pub_data`` on ``device``'s downlink topic via the current platform.

    Returns:
        A ``(success, platform_id)`` tuple: the result of
        ``publish_message_dev`` and the platform id that was used.
    """
    platform_id = load_platforms()['current_platform']
    pub_topic = f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"
    success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
    return success, platform_id


def _devices_render_config(device, **context):
    """Render ``device_config.html`` for ``device`` plus extra template context."""
    return render_template(
        'device_config.html',
        device=device,
        platform_c=load_platforms_c().values(),
        **context,
    )


def _devices_render_publish_failure(device, platform_id):
    """Render the config page with the "publish failed" message."""
    return _devices_render_config(
        device,
        success=False,
        message=f"消息发布失败,MQTT客户端未连接 -- {platform_id},请返回主页 【连接】!",
    )


@app.route('/devices', methods=['GET', 'POST'])
@login_required
def devices():
    """Device list page, single-device configuration and batch actions.

    POST with ``save_config``: apply the ``config_type`` section
    (``all`` / ``wifi`` / ``platform`` / ``network``) to the device named by
    ``dev_sn``, publish it, persist the device list and render the result.

    POST with ``action``: ``restart`` publishes a restart message to every
    selected device; ``upgrade`` loads the chosen firmware into
    ``dev_upgrade_dict`` and starts the upgrade state machine, answering with
    JSON; ``config`` is only logged.

    GET with ``?dev_sn=``: render the configuration page for that device.
    Anything else renders the device list.
    """
    devices = load_devices()
    platform_l = load_platforms()['platform_l']
    firmwares = load_firmwares()

    if request.method == 'POST':
        # --- single-device configuration form ---
        if 'save_config' in request.form:
            config_type = request.form['config_type']
            dev_sn = request.form['dev_sn']
            target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
            if target_device:
                dev_label = f"{target_device['dev_type'].zfill(4)}-{target_device['dev_sn'].zfill(10)}"
                # NOTE(review): the WiFi password is written to the log in
                # clear text by the 'all' and 'wifi' branches below.
                if config_type == 'all':
                    # Only logged for now; nothing is sent to the device.
                    wifi_ssid = request.form.get('ssid', '')
                    wifi_password = request.form.get('wifi_password', '')
                    cloud_platform = request.form.get('cloud-platform', '')
                    network_type = request.form.get('network-type', '')
                    log_msg = (
                        f"{dev_label}:"
                        f"WiFi参数:{wifi_ssid},{wifi_password};"
                        f"云平台参数:{cloud_platform};"
                        f"网络类型参数:{network_type}"
                    )
                    logger.info(log_msg)
                    logger.info("TODO: 发送保存所有配置到设备")
                elif config_type == 'wifi':
                    # The record is updated first; the frame builder receives
                    # the already-updated device.
                    target_device['wifi_ssid'] = request.form.get('ssid', '')
                    target_device['wifi_password'] = request.form.get('wifi_password', '')
                    logger.info(
                        f"WiFi参数:{dev_label},{target_device['wifi_ssid']},{target_device['wifi_password']}"
                    )
                    logger.info("TODO: 发送WiFi配置到设备")
                    success, platform_id = _devices_publish(
                        target_device,
                        frame.get_msg_config_wifi(device=target_device, data=""),
                    )
                    if not success:
                        return _devices_render_publish_failure(target_device, platform_id)
                elif config_type == 'platform':
                    # Logs the platform currently stored on the device; the
                    # new value is only stored after a successful publish.
                    logger.info(f"云平台参数:{dev_label},{target_device['cloud_platform']}")
                    logger.info("TODO: 发送云平台配置到设备")
                    new_platform = request.form.get('cloud-platform', '')
                    success, platform_id = _devices_publish(
                        target_device,
                        frame.get_msg_config_platform(device=target_device, data=new_platform),
                    )
                    if not success:
                        return _devices_render_publish_failure(target_device, platform_id)
                    target_device['cloud_platform'] = new_platform
                elif config_type == 'network':
                    target_device['network_type'] = request.form.get('network-type', '')
                    logger.info(f"网络类型参数:{dev_label},{target_device['network_type']}")
                    logger.info("TODO: 发送网络类型配置到设备")
                    success, platform_id = _devices_publish(
                        target_device,
                        frame.get_msg_config_network(device=target_device, data=""),
                    )
                    if not success:
                        return _devices_render_publish_failure(target_device, platform_id)

                save_devices(devices)
                return _devices_render_config(
                    target_device,
                    success=True,
                    message="操作成功,配置已下发!",
                )

        # --- batch actions ---
        action = request.form.get('action')
        selected_sns = request.form.getlist('dev_sn')
        if action == 'restart':
            logger.info(f"批量重启设备: {selected_sns}")
            for dev_sn in selected_sns:
                target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
                if not target_device:
                    continue
                success, platform_id = _devices_publish(
                    target_device,
                    frame.get_msg_restart(device=target_device, data=""),
                )
                if success:
                    logger.info(f"消息发布成功 -- dev_sn:{dev_sn}")
                else:
                    # platform_id may be unset/unknown on exactly this path,
                    # so look the display name up defensively.
                    platform_name = load_platforms_c().get(platform_id, {}).get('name', platform_id)
                    logger.error(f"消息发布失败,MQTT客户端未连接 -- {platform_name},请返回主页 【连接】!")
        elif action == 'upgrade':
            # A missing or non-numeric firmware id is treated as "not found".
            try:
                firmware_id = int(request.form.get('firmware_id'))
            except (TypeError, ValueError):
                firmware_id = None
            firmware = None
            if firmware_id is not None:
                firmware = next((fm for fm in firmwares if fm['id'] == firmware_id), None)
            if not firmware:
                return jsonify({'status': 'error', 'message': '固件信息不存在'})
            logger.info(f"批量升级设备 {selected_sns} 到固件 {firmware['filename']}")

            # Read the image once; its length is the authoritative file size.
            bin_file = os.path.join(app.config['UPLOAD_FOLDER'], firmware['filename'])
            try:
                with open(bin_file, 'rb') as f:
                    bin_data = f.read()
            except FileNotFoundError:
                logger.error(f"错误: 文件 '{bin_file}' 不存在")
                return jsonify({'status': 'error', 'message': f'文件{bin_file}名不存在'})
            file_size = len(bin_data)
            logger.info(f"文件大小: {file_size} 字节")

            # The first selected device supplies the device type and platform.
            if not selected_sns:
                return jsonify({'status': 'error', 'message': '未选择设备'})
            device = next((device for device in devices if device['dev_sn'] == selected_sns[0]), None)
            if not device:
                return jsonify({'status': 'error', 'message': '设备不存在'})
            dev_platform = device.get('cloud_platform', '')
            platform_id = next(
                (pid for pid in platform_l if platform_l[pid]['name'] == dev_platform),
                None,
            )

            pkg_size = 512
            dev_upgrade_dict.update({
                'status': 'idel',
                'current_sn': '',
                'dev_type': device.get('dev_type', '2006'),
                'dev_upgrade_sns': selected_sns,
                'interval': 1,
                'bin_data': bin_data,
                'bin_len': file_size,
                'pkg_size': pkg_size,
                'pkg_total': math.ceil(file_size / pkg_size),  # round up
                'pkg_cnt': 0,
                'platform_id': platform_id,
            })
            start_dev_upgrade_check()
            return jsonify({'status': 'success', 'message': '升级任务已启动'})
        elif action == 'config':
            logger.info(f"批量配置设备: {selected_sns}")

    # --- configuration page for one device (``?dev_sn=...``) ---
    dev_sn = request.args.get('dev_sn')
    if dev_sn:
        target_device = next((device for device in devices if device['dev_sn'] == dev_sn), None)
        if target_device:
            return _devices_render_config(target_device, title='设备配置')

    return render_template('devices.html', devices=devices, title='解密机配置')
@app.route('/device_detail/<dev_sn>')
@login_required
def device_detail(dev_sn):
    """Render the detail page of the device whose serial number is ``dev_sn``.

    Returns a plain-text 404 response when no stored device has that serial
    number.
    """
    for candidate in load_devices():
        if candidate['dev_sn'] == dev_sn:
            return render_template('device_detail.html', device=candidate, title='设备详情')
    return '设备不存在', 404
@app.route('/api/get_firmwares')
@login_required
def get_firmwares():
    """Return the stored firmware list as a JSON response."""
    return jsonify(load_firmwares())
# Cloud-platform related API
@app.route('/api/select_platform', methods=['POST'])
@login_required
def select_platform():
    """Make the platform named in the JSON body the current platform.

    Expects a JSON object with a ``platform_id`` key. Responds with a JSON
    object whose ``status`` is ``'error'`` (missing or unknown id) or
    ``'success'`` together with the selected platform's id, name and status.
    """
    # A missing, malformed or non-object body is treated like an empty one so
    # the caller gets the regular JSON error instead of a server error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    platform_id = payload.get('platform_id')
    if not platform_id:
        return jsonify({'status': 'error', 'message': '请选择云平台'})

    platform_d = load_platforms()
    if platform_id not in platform_d['platform_l']:
        return jsonify({'status': 'error', 'message': '云平台不存在'})

    platform_d['current_platform'] = platform_id
    save_platforms(platform_d)

    selected_platform = platform_d['platform_l'][platform_id]
    return jsonify({
        'status': 'success',
        'platform': {
            'id': platform_id,
            'name': selected_platform['name'],
            'status': selected_platform['status'],
        },
    })
@app.route('/api/toggle_connection', methods=['POST'])
@login_required
def toggle_connection():
    """Connect or disconnect the MQTT client of the platform in the JSON body.

    Expects a JSON object with a ``platform_id`` key. A platform whose stored
    status is ``'connected'`` is stopped, any other status leads to a start.
    The new status is persisted and returned with the platform name.
    """
    # A missing, malformed or non-object body is treated like an empty one so
    # the caller gets the regular JSON error instead of a server error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    platform_id = payload.get('platform_id')
    if not platform_id:
        return jsonify({'status': 'error', 'message': '请指定云平台'})

    platform_d = load_platforms()
    if platform_id not in platform_d['platform_l']:
        return jsonify({'status': 'error', 'message': '云平台不存在'})

    current_platform = platform_d['platform_l'][platform_id]
    if current_platform['status'] == 'connected':
        stop_mqtt_client(platform_id)
        new_status = 'disconnected'
        logger.info(f"云平台 {platform_id} 已断开")
    else:
        # NOTE(review): the result of start_mqtt_client is not inspected, so
        # 'connected' is recorded even if the start did not succeed - confirm.
        start_mqtt_client(platform_id)
        new_status = 'connected'
        logger.info(f"云平台 {platform_id} 已连接")

    current_platform['status'] = new_status
    save_platforms(platform_d)
    return jsonify({
        'status': 'success',
        'new_status': new_status,
        'platform_name': current_platform['name'],
    })
@app.route('/api/get_platform_status', methods=['GET'])
@login_required
def get_platform_status():
    """Return the stored record of the platform given by ``?platform_id=``.

    Responds with 400 when the query parameter is missing or empty and with
    404 when the id is not among the stored platforms.
    """
    platform_id = request.args.get('platform_id')
    if platform_id:
        known_platforms = load_platforms()['platform_l']
        if platform_id in known_platforms:
            return jsonify({
                "status": "success",
                "platform": known_platforms[platform_id]
            })
        return jsonify({"status": "error", "message": "云平台不存在"}), 404
    return jsonify({"status": "error", "message": "未指定云平台ID"}), 400
@app.route('/api/publish_message', methods=['POST'])
@login_required
def publish_message():
    """Publish ``message`` from the JSON body through ``platform_id``'s client.

    Responds with 400 when either field is missing or empty; otherwise the
    JSON ``status`` reflects the result of ``publish_mqtt_message``.
    """
    # A missing, malformed or non-object body is treated like an empty one so
    # the caller gets the 400 below instead of a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    platform_id = data.get('platform_id')
    message = data.get('message')
    if not platform_id or not message:
        return jsonify({"status": "error", "message": "云平台ID和消息内容都不能为空"}), 400

    success = publish_mqtt_message(platform_id, message)
    return jsonify({
        "status": "success" if success else "error",
        "message": "消息发布成功" if success else "消息发布失败",
    })
# Internal (non-route) publish helper.
def publish_message_dev(platform_id, data, pub_topic):
    """Publish ``data`` on ``pub_topic`` through the given platform.

    Returns ``False`` (after logging the arguments) when any of the three
    arguments is falsy; otherwise returns whatever ``publish_mqtt_message``
    returns.
    """
    if platform_id and data and pub_topic:
        return publish_mqtt_message(platform_id, data, pub_topic=pub_topic)

    logger.error(f"publish_message_dev: platform_id={platform_id}, data={data}, pub_topic={pub_topic}")
    return False
# Shared state of the periodic firmware-upgrade state machine.
dev_upgrade_dict = dict(
    status='idel',        # current state-machine state
    current_sn='',        # serial number being upgraded ('' = none)
    dev_type='2006',
    interval=1,           # seconds until the next state-machine tick
    bin_data=None,        # firmware image bytes
    bin_len=0,            # firmware image length in bytes
    pkg_size=512,         # bytes per packet
    pkg_total=0,          # number of packets
    pkg_cnt=0,            # index of the next packet to send
    remain=0,             # bytes still to send
    platform_id=None,
    dev_upgrade_sns=[],   # queue of serial numbers to upgrade
)
def dev_upgrade_dict_clr():
    """Reset the upgrade state in ``dev_upgrade_dict`` to its idle values.

    The dict is updated in place. NOTE(review): ``platform_id`` is not reset
    here - confirm whether that is intentional.
    """
    dev_upgrade_dict.update({
        'status': 'idel',
        'current_sn': '',
        'dev_type': '2006',
        'interval': 1,
        'bin_data': None,
        'bin_len': 0,
        'pkg_size': 512,
        'pkg_total': 0,
        'pkg_cnt': 0,
        'remain': 0,
        'dev_upgrade_sns': [],
    })
def _dev_upgrade_next_device():
    """Drop the current SN from the queue and select the next one, if any."""
    queue = dev_upgrade_dict['dev_upgrade_sns']
    dev_upgrade_dict['status'] = 'idel'
    dev_upgrade_dict['pkg_cnt'] = 0
    dev_upgrade_dict['remain'] = 0
    dev_upgrade_dict['interval'] = 1
    # The queue may have been replaced by a new upgrade request meanwhile.
    if dev_upgrade_dict['current_sn'] in queue:
        queue.remove(dev_upgrade_dict['current_sn'])
    dev_upgrade_dict['current_sn'] = queue[0] if queue else ''


def dev_upgrade_check():
    """Run one tick of the firmware-upgrade state machine and re-arm its timer.

    States stored in ``dev_upgrade_dict['status']``:

    * ``'idel'``     - reset the packet counters, go to ``'reqeuest'``.
    * ``'reqeuest'`` - publish the upgrade request, go to ``'upgrade'``.
    * ``'upgrade'``  - publish one firmware packet per tick until nothing
      remains, then go to ``'finish'``.
    * ``'finish'``   - move on to the next queued serial number.

    A failed publish clears the whole upgrade state and stops the timer chain.
    The next tick is scheduled after ``dev_upgrade_dict['interval']`` seconds
    while a device is selected or the queue is non-empty.
    """
    # Without this the timer below would be a local, and
    # start_dev_upgrade_check() could never cancel a running chain.
    global dev_upgrade_check_timer

    devices = load_devices()
    if dev_upgrade_dict['current_sn'] == '':
        if len(dev_upgrade_dict['dev_upgrade_sns']) > 0:
            dev_upgrade_dict['current_sn'] = dev_upgrade_dict['dev_upgrade_sns'][0]
        else:
            dev_upgrade_dict['current_sn'] = ''
    else:
        device = next(
            (device for device in devices if device['dev_sn'] == dev_upgrade_dict['current_sn']),
            None,
        )
        if device is None:
            # Unknown serial number: skip it instead of crashing the timer
            # thread and leaving the queue stuck.
            logger.error(f"dev update failed: device not found, sn={dev_upgrade_dict['current_sn']}")
            _dev_upgrade_next_device()
        elif dev_upgrade_dict['status'] == 'idel':
            logger.info(f'idel,do somethime,{device["dev_sn"]}')
            dev_upgrade_dict['status'] = 'reqeuest'
            dev_upgrade_dict['pkg_cnt'] = 0
            dev_upgrade_dict['remain'] = dev_upgrade_dict['bin_len']
        elif dev_upgrade_dict['status'] == 'reqeuest':
            logger.info(f'request,do somethime,{device["dev_sn"]}')
            pub_data = frame.get_msg_upgrade_1001(device=device)
            pub_topic = f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"
            platform_id = load_platforms()['current_platform']
            success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
            if not success:
                dev_upgrade_dict_clr()
                logger.error(
                    f"dev update failed: platform_id={device.get('cloud_platform', '')}, sn={device['dev_sn']}"
                )
                return
            dev_upgrade_dict['status'] = 'upgrade'
            # Packets are sent at a faster tick rate than the idle poll.
            dev_upgrade_dict['interval'] = 0.1
        elif dev_upgrade_dict['status'] == 'upgrade':
            logger.info(f'upgrade,do somethime,{device["dev_sn"]},{dev_upgrade_dict["pkg_cnt"]}/{dev_upgrade_dict["pkg_total"]}')
            if dev_upgrade_dict['remain'] > 0:
                start = dev_upgrade_dict['pkg_cnt'] * dev_upgrade_dict['pkg_size']
                data = dev_upgrade_dict['bin_data'][start:start + dev_upgrade_dict['pkg_size']]
                pub_data = frame.get_msg_upgrade_1030(
                    device=device,
                    fileSum=dev_upgrade_dict['bin_len'],
                    maxPkgId=dev_upgrade_dict['pkg_total'] - 1,
                    curPkgId=dev_upgrade_dict['pkg_cnt'],
                    curPkgsize=len(data),
                    data=data,
                )
                pub_topic = f"cpyypt/down/{device['dev_type'].zfill(4)}/{device['dev_sn'].zfill(10)}"
                platform_id = load_platforms()['current_platform']
                success = publish_message_dev(platform_id=platform_id, data=pub_data, pub_topic=pub_topic)
                if not success:
                    dev_upgrade_dict_clr()
                    logger.error(
                        f"dev update failed: platform_id={device.get('cloud_platform', '')}, sn={device['dev_sn']}"
                    )
                    return
                dev_upgrade_dict['status'] = 'upgrade'
            else:
                dev_upgrade_dict['status'] = 'finish'
            # Advance the packet cursor; 'finish' resets both counters.
            dev_upgrade_dict['pkg_cnt'] += 1
            dev_upgrade_dict['remain'] = (
                dev_upgrade_dict['bin_len'] - dev_upgrade_dict['pkg_cnt'] * dev_upgrade_dict['pkg_size']
            )
        elif dev_upgrade_dict['status'] == 'finish':
            logger.info(f'finish,do somethime,{device["dev_sn"]}')
            _dev_upgrade_next_device()

    # NOTE(review): the device list is written back on every tick although
    # this function does not modify it - confirm whether this is needed.
    save_devices(devices)

    if dev_upgrade_dict['current_sn'] == '' and len(dev_upgrade_dict['dev_upgrade_sns']) == 0:
        logger.info('dev_upgrade_check exit')
    else:
        dev_upgrade_check_timer = threading.Timer(dev_upgrade_dict['interval'], dev_upgrade_check)
        dev_upgrade_check_timer.daemon = True
        dev_upgrade_check_timer.start()
# Timer handle for the periodic upgrade-state check.
dev_upgrade_check_timer = None


def start_dev_upgrade_check():
    """Cancel the stored upgrade timer (if any) and run one check right away."""
    global dev_upgrade_check_timer
    pending = dev_upgrade_check_timer
    if pending is not None:
        pending.cancel()
    dev_upgrade_check()
# Timer handle for the periodic online-state check.
dev_online_check_timer = None


def dev_online_check():
    """Mark silent devices as offline, re-arm the 5 s timer and save the list.

    A device whose ``online`` field is ``'在线'`` is switched to ``'离线'`` when
    its ``realtime`` timestamp (0 if absent) is more than 10 seconds old.
    """
    # Without this the timer below would be a local, and
    # start_dev_online_check() could never cancel a running chain.
    global dev_online_check_timer

    devices = load_devices()
    now = int(time.time())
    for device in devices:
        # .get(): one record without an 'online' key must not kill the
        # periodic check (the next timer is only armed after this loop).
        if device.get('online') == '在线' and now - device.get('realtime', 0) > 10:
            device['online'] = '离线'
            logger.warning(f"设备离线: 云平台:{device.get('cloud_platform', '')}, sn={device['dev_sn']}")

    # Re-arm before saving so a failing save does not stop the chain.
    dev_online_check_timer = threading.Timer(5, dev_online_check)
    dev_online_check_timer.daemon = True
    dev_online_check_timer.start()
    save_devices(devices)
def start_dev_online_check():
    """Cancel the stored online-check timer (if any) and run one check now."""
    global dev_online_check_timer
    pending = dev_online_check_timer
    if pending is not None:
        pending.cancel()
    dev_online_check()
# Timer handle for the MQTT auto-disconnect check (meant to disconnect MQTT
# when no client has logged in for a long time).
mqtt_disconnect_check_timer = None


def mqtt_disconnect_check():
    """Log the current value of ``mqtt_last_activity``.

    Only logging happens here; this function does not disconnect anything.
    """
    message = f"mqtt_last_activity: {mqtt_last_activity}"
    logger.info(message)
def start_mqtt_disconnect_check():
    """Cancel the stored disconnect-check timer (if any) and run one check now."""
    global mqtt_disconnect_check_timer
    pending = mqtt_disconnect_check_timer
    if pending is not None:
        pending.cancel()
    mqtt_disconnect_check()
if __name__ == '__main__':
    logger.info(f"APP_VERSION: {APP_VERSION}")
    init_data()

    # Background checks started at boot: MQTT connection status and device
    # online status.
    start_mqtt_status_check()
    start_dev_online_check()

    # Not started here:
    #   - start_dev_upgrade_check(): started on demand by the batch
    #     "upgrade" action of the /devices view.
    #   - start_mqtt_disconnect_check(): MQTT auto-disconnect check, disabled.

    # threaded=True: serve requests on multiple threads for better throughput.
    app.run(
        host='0.0.0.0',
        port=9082,
        debug=False,
        threaded=True,
        use_reloader=False,
    )
|