LogTT_SvrV02.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # python3.6
  2. import json
  3. import os
  4. import struct
  5. import getopt
  6. import random
  7. import sys
  8. from datetime import datetime
  9. import time
  10. from paho.mqtt import client as mqtt_client
  11. from binascii import *
  12. # import chardet
  13. # broker = 'vip.frp.wlphp.com'
  14. # port = 5801
  15. # topic = "/yw/#"
  16. # pyinstaller -F -n mqtt_sub_v0004 mqtt_sub.py
  17. # pyinstaller -F -n mqtt_sub_v0006 mqtt_sub.py
  18. # pyinstaller -F -n mqtt_c0_v2 mqtt_c0.py
  19. # C:\Users\chx_s\AppData\Local\Programs\Python\Python36\Scripts\pyinstaller.exe -F -n code_sub code_sub.py
  20. # C:\Users\chx_s\AppData\Local\Programs\Python\Python36-32\python code_sub.py
  21. test = {"mqtt_broker": "test-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "admin", "mqtt_pwd": "houjianwei"}
  22. prod = {"mqtt_broker": "mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "1SvTlvm1VCawSzS"}
  23. zz = {"mqtt_broker": "zz-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "oZ8hl3tYpMNEqHx"}
  24. mqtt_env = "test"
  25. # mqtt_env = "prod"
  26. mqtt_broker = 'test-mqtt.cpyypt.cn'
  27. mqtt_port = 9000
  28. mqtt_user = 'admin'
  29. mqtt_pwd = 'houjianwei'
  30. # mqtt_subtopic = 'cpyypt/logup/#'
  31. # mqtt_subtopic = [("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)]
  32. mqtt_subtopic = [("cpyypt/up/9002/#", 0), ("cpyypt/down/9002/broadcast", 0)]
  33. mqtt_hex = 1
  34. version = "0.0.0.2"
  35. version_w_ascii = 1
  36. version_w_hex = 1
  37. version_w_tt = 1
  38. error_msg = ""
  39. info_msg = ""
  40. # generate client ID with pub prefix randomly
  41. client_id = f'LogTT_Svr_{random.randint(0, 100)}'
  42. # LogTT_SN = "0"
  43. def time_n2s(time_n): # 传入参数
  44. data_sj = time.localtime(time_n)
  45. time_str = time.strftime("%Y-%m-%d %H:%M:%S", data_sj) # 时间戳转换正常时间
  46. return time_str # 返回日期,格式为str
  47. def get_f2s(f, n):
  48. sf = f"%0.2f" % f
  49. if len(sf) < n:
  50. sf = " " * (n - len(sf)) + sf
  51. return sf
  52. def get_log_file_name_ascii(sn):
  53. # global
  54. log_file_name = f"./log/LogTT_{sn}_ascii_" + datetime.now().strftime('%Y-%m-%d') + ".log"
  55. return log_file_name
  56. def get_log_file_name_hex(sn):
  57. # global
  58. log_file_name = f"./log/LogTT_{sn}_hex_" + datetime.now().strftime('%Y-%m-%d') + ".log"
  59. return log_file_name
  60. def get_log_file_name_tt(sn):
  61. # global
  62. log_file_name = f"./log/LogTT_{sn}_tt_" + datetime.now().strftime('%Y-%m-%d') + ".log"
  63. return log_file_name
  64. def my_unpack(s, data, i):
  65. global info_msg
  66. global error_msg
  67. temp_msg = ""
  68. if type(s) == str:
  69. np = struct.calcsize(s)
  70. try:
  71. val = struct.unpack(s, data[i:(i + np)])
  72. return i + np, val[0]
  73. except:
  74. temp_msg = "fun:" + sys._getframe().f_code.co_name + \
  75. ",line:" + f"{sys._getframe().f_lineno}." + \
  76. f"\n出错了!!!,s={s},i={i},data={data}"
  77. error_msg = error_msg + temp_msg
  78. info_msg = info_msg + temp_msg
  79. return i + np, 0
  80. elif type(s) == int:
  81. val = data[i:(i + s)]
  82. return i + s, val
  83. def parse_data(data, topic):
  84. global info_msg
  85. global error_msg
  86. def subscribe(client: mqtt_client):
  87. global info_msg
  88. global error_msg
  89. def on_message(client, userdata, msg):
  90. LogTT_SN = str.split(msg.topic, "/")[-1]
  91. head = b""
  92. body = b""
  93. line_feed = ""
  94. start_ttl = "TTL_U="
  95. start_rs232 = "RS232_U="
  96. start_rs485 = "RS485_U="
  97. start_fixed = "FIXED_INFO="
  98. start_heart = "HEART_MSG="
  99. start_pt = "PT_CMD="
  100. flag_ascii = False
  101. flag_hex = True
  102. if msg.topic.find("ttup") > 0:
  103. msg_tt = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
  104. msg_tt = msg_tt + f"topic:{msg.topic} msg:{msg.payload.hex().upper()}"
  105. # 输出信息
  106. try:
  107. with open(get_log_file_name_tt(LogTT_SN), 'a') as f:
  108. global version_w_tt
  109. if version_w_tt > 0:
  110. f.write("version:" + version + "\n")
  111. version_w_tt = 0
  112. f.write(msg_tt + "\r\n")
  113. f.close()
  114. except Exception as e:
  115. print(str(e))
  116. print(msg_tt)
  117. return
  118. if msg.payload[:len(start_ttl)] == start_ttl.encode("utf-8"):
  119. flag_ascii = True
  120. flag_hex = True
  121. head = msg.payload[:len(start_ttl)]
  122. body = msg.payload[len(start_ttl):]
  123. elif msg.payload[:len(start_rs232)] == start_rs232.encode("utf-8"):
  124. flag_ascii = False
  125. flag_hex = True
  126. head = msg.payload[:len(start_rs232)]
  127. body = msg.payload[len(start_rs232):]
  128. elif msg.payload[:len(start_rs485)] == start_rs485.encode("utf-8"):
  129. flag_ascii = False
  130. flag_hex = True
  131. head = msg.payload[:len(start_rs485)]
  132. body = msg.payload[len(start_rs485):]
  133. elif msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
  134. flag_ascii = True
  135. flag_hex = False
  136. head = msg.payload[:len(start_fixed)]
  137. body = msg.payload[len(start_fixed):]
  138. elif msg.payload[:len(start_heart)] == start_heart.encode("utf-8"):
  139. flag_ascii = True
  140. flag_hex = False
  141. head = msg.payload[:len(start_heart)]
  142. body = msg.payload[len(start_heart):]
  143. elif msg.payload[:len(start_pt)] == start_pt.encode("utf-8"):
  144. flag_ascii = True
  145. flag_hex = False
  146. head = msg.payload[:len(start_pt)]
  147. body = msg.payload[len(start_pt):]
  148. else:
  149. flag_ascii = False
  150. flag_hex = True
  151. body = msg.payload
  152. if flag_ascii:
  153. try:
  154. body_str = body.decode('gb2312')
  155. except UnicodeDecodeError:
  156. try:
  157. body_str = body.decode('utf-8')
  158. except UnicodeDecodeError:
  159. body_str = body.hex().upper()
  160. head_str = head.decode('utf-8')
  161. msg_ascii = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
  162. msg_ascii = msg_ascii + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
  163. msg_ascii = msg_ascii.rstrip() + "\r\n"
  164. # 输出信息
  165. try:
  166. with open(get_log_file_name_ascii(LogTT_SN), 'a') as f:
  167. global version_w_ascii
  168. if version_w_ascii > 0:
  169. f.write("version:" + version + "\n")
  170. version_w_ascii = 0
  171. f.write(msg_ascii)
  172. f.close()
  173. except Exception as e:
  174. print(str(e))
  175. print(msg_ascii, end="")
  176. # FIXED_INFO={"base_info":{"name":"张三","uplinkB":1010,"rsrp":-99,"tel":"13888888888","reset_times":49,"project":"LogTT","csq":23,"downlinkB":573,"rsrq":-13,"dev_type":"9002","simid":0,"muid":"20230818221817A863212A0300364949","snr":-1,"version":"9002.4.002","iccid":"89860622330053052122","imei":"869861069998891","rssi":-66,"dev_pd":"20240329","dev_sn":1},"pos_info":{"lng":0,"lat":0}}
  177. if msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
  178. body_dict = json.loads(body_str)
  179. print(body_dict)
  180. if flag_hex:
  181. msg_hex = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
  182. msg_hex = msg_hex + f"topic:{msg.topic} msg:{head.decode('utf-8')}{body.hex().upper()}"
  183. # 输出信息
  184. try:
  185. with open(get_log_file_name_hex(LogTT_SN), 'a') as f:
  186. global version_w_hex
  187. if version_w_hex > 0:
  188. f.write("version:" + version + "\n")
  189. version_w_hex = 0
  190. f.write(msg_hex + "\r\n")
  191. f.close()
  192. except Exception as e:
  193. print(str(e))
  194. if not flag_ascii:
  195. print(msg_hex)
  196. client.subscribe(mqtt_subtopic)
  197. client.on_message = on_message
  198. def connect_mqtt() -> mqtt_client:
  199. def on_connect(client, userdata, flags, rc):
  200. if rc == 0:
  201. print("Connected to MQTT Broker!")
  202. subscribe(client)
  203. else:
  204. print("Failed to connect, return code %d\n", rc)
  205. client = mqtt_client.Client(client_id)
  206. client.username_pw_set(mqtt_user, mqtt_pwd)
  207. client.on_connect = on_connect
  208. client.connect(mqtt_broker, mqtt_port)
  209. return client
  210. def on_disconnect(client, userdata, rc):
  211. print("mqtt disconnect")
  212. def run():
  213. client = connect_mqtt()
  214. client.on_disconnect = on_disconnect
  215. # subscribe(client)
  216. client.loop_forever()
  217. def print_usage():
  218. print('')
  219. print('帮助信息:')
  220. print('--env=<prod生产环境/zz郑州政务云/test测试环境> ')
  221. print('')
  222. print('举例:')
  223. print('\r\ndbg_mqtt.exe --env=test\r\n')
  224. print('\r\ndbg_mqtt.exe --env=prod\r\n')
  225. print('\r\ndbg_mqtt.exe --env=zz\r\n')
  226. print('\r\ndbg_mqtt.exe --env=test --subtopic=cpyypt/up/0902/0000000003,cpyypt/down/0902/0000000003\r\n')
  227. print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0102/0000020142,cpyypt/down/0102/0000020142\r\n')
  228. print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0101/0000001048,cpyypt/down/0101/0000001048\r\n')
  229. print('')
  230. def main(argv):
  231. global mqtt_env
  232. global mqtt_broker
  233. global mqtt_port
  234. global mqtt_user
  235. global mqtt_pwd
  236. global mqtt_subtopic
  237. global mqtt_hex
  238. if not os.path.exists("log"):
  239. os.makedirs("log")
  240. # python sub2.py broker=mqtt.cpyypt.cn port=9000 user=admin pwd=houjianwei subtopic="/yw/#"
  241. print("dbg_mqtt version:" + version)
  242. print('输入的参数 argv=:', argv)
  243. try:
  244. opts, args = getopt.getopt(argv, "h", ["env=", "broker=", "port=", "user=", "pwd=", "subtopic=", "hex"])
  245. except getopt.GetoptError:
  246. print_usage()
  247. sys.exit(2)
  248. for opt, arg in opts:
  249. print('解析', opt, arg)
  250. if opt == '-h':
  251. print_usage()
  252. sys.exit()
  253. elif opt == "--env":
  254. mqtt_env = str(arg)
  255. elif opt == "--broker":
  256. mqtt_broker = str(arg)
  257. elif opt == "--port":
  258. mqtt_port = int(arg)
  259. elif opt == "--user":
  260. mqtt_user = str(arg)
  261. elif opt == "--pwd":
  262. mqtt_pwd = str(arg)
  263. elif opt == "--subtopic":
  264. st = str(arg).split(",")
  265. mqtt_subtopic = [(s, 0) for s in st]
  266. # mqtt_subtopic = str(arg).split(";") #[("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)] ("cpyypt/#", 0)
  267. print("mqtt_subtopic=", mqtt_subtopic)
  268. elif opt == "--hex":
  269. mqtt_hex = 1
  270. if mqtt_broker == '':
  271. print('【服务器地址】为空,请重新输入!')
  272. print_usage()
  273. sys.exit()
  274. if mqtt_port == 0:
  275. print('【服务器端口】为空,请重新输入!')
  276. print_usage()
  277. sys.exit()
  278. if mqtt_subtopic == '':
  279. print('【主题】为空,请重新输入!')
  280. print_usage()
  281. sys.exit()
  282. if mqtt_env == "prod":
  283. mqtt_broker = prod["mqtt_broker"]
  284. mqtt_port = prod["mqtt_port"]
  285. mqtt_user = prod["mqtt_user"]
  286. mqtt_pwd = prod["mqtt_pwd"]
  287. elif mqtt_env == "zz":
  288. mqtt_broker = zz["mqtt_broker"]
  289. mqtt_port = zz["mqtt_port"]
  290. mqtt_user = zz["mqtt_user"]
  291. mqtt_pwd = zz["mqtt_pwd"]
  292. else:
  293. mqtt_broker = test["mqtt_broker"]
  294. mqtt_port = test["mqtt_port"]
  295. mqtt_user = test["mqtt_user"]
  296. mqtt_pwd = test["mqtt_pwd"]
  297. run()
  298. if __name__ == '__main__':
  299. # run()
  300. main(sys.argv[1:])