# LogTT_SvrV02.py (11 KB)
  1. # python3.6
  2. import json
  3. import os
  4. import struct
  5. import getopt
  6. import random
  7. import sys
  8. from datetime import datetime
  9. import time
  10. from paho.mqtt import client as mqtt_client
  11. from binascii import *
  12. # import chardet
  13. # broker = 'vip.frp.wlphp.com'
  14. # port = 5801
  15. # topic = "/yw/#"
  16. # pyinstaller -F -n mqtt_sub_v0004 mqtt_sub.py
  17. # pyinstaller -F -n mqtt_sub_v0006 mqtt_sub.py
  18. # pyinstaller -F -n mqtt_c0_v2 mqtt_c0.py
  19. # C:\Users\chx_s\AppData\Local\Programs\Python\Python36\Scripts\pyinstaller.exe -F -n code_sub code_sub.py
  20. # C:\Users\chx_s\AppData\Local\Programs\Python\Python36-32\python code_sub.py
  21. test = {"mqtt_broker": "test-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "admin", "mqtt_pwd": "houjianwei"}
  22. prod = {"mqtt_broker": "mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "1SvTlvm1VCawSzS"}
  23. zz = {"mqtt_broker": "zz-mqtt.cpyypt.cn", "mqtt_port": 9000, "mqtt_user": "cpyypt", "mqtt_pwd": "oZ8hl3tYpMNEqHx"}
  24. mqtt_env = "test"
  25. # mqtt_env = "prod"
  26. mqtt_broker = 'test-mqtt.cpyypt.cn'
  27. mqtt_port = 9000
  28. mqtt_user = 'admin'
  29. mqtt_pwd = 'houjianwei'
  30. # mqtt_subtopic = 'cpyypt/logup/#'
  31. # mqtt_subtopic = [("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)]
  32. mqtt_subtopic = [("cpyypt/up/9002/#", 0), ("cpyypt/down/9002/broadcast", 0)]
  33. mqtt_hex = 1
  34. version = "0.0.0.2"
  35. version_w_ascii = 1
  36. version_w_hex = 1
  37. error_msg = ""
  38. info_msg = ""
  39. # generate client ID with pub prefix randomly
  40. client_id = f'LogTT_Svr_{random.randint(0, 100)}'
  41. # LogTT_SN = "0"
  42. def time_n2s(time_n): # 传入参数
  43. data_sj = time.localtime(time_n)
  44. time_str = time.strftime("%Y-%m-%d %H:%M:%S", data_sj) # 时间戳转换正常时间
  45. return time_str # 返回日期,格式为str
  46. def get_f2s(f, n):
  47. sf = f"%0.2f" % f
  48. if len(sf) < n:
  49. sf = " " * (n - len(sf)) + sf
  50. return sf
  51. def get_log_file_name_ascii(sn):
  52. # global
  53. log_file_name = f"./log/LogTT_{sn}_ascii_" + datetime.now().strftime('%Y-%m-%d') + ".log"
  54. return log_file_name
  55. def get_log_file_name_hex(sn):
  56. # global
  57. log_file_name = f"./log/LogTT_{sn}_hex_" + datetime.now().strftime('%Y-%m-%d') + ".log"
  58. return log_file_name
  59. def my_unpack(s, data, i):
  60. global info_msg
  61. global error_msg
  62. temp_msg = ""
  63. if type(s) == str:
  64. np = struct.calcsize(s)
  65. try:
  66. val = struct.unpack(s, data[i:(i + np)])
  67. return i + np, val[0]
  68. except:
  69. temp_msg = "fun:" + sys._getframe().f_code.co_name + \
  70. ",line:" + f"{sys._getframe().f_lineno}." + \
  71. f"\n出错了!!!,s={s},i={i},data={data}"
  72. error_msg = error_msg + temp_msg
  73. info_msg = info_msg + temp_msg
  74. return i + np, 0
  75. elif type(s) == int:
  76. val = data[i:(i + s)]
  77. return i + s, val
  78. def parse_data(data, topic):
  79. global info_msg
  80. global error_msg
  81. def subscribe(client: mqtt_client):
  82. global info_msg
  83. global error_msg
  84. def on_message(client, userdata, msg):
  85. LogTT_SN = str.split(msg.topic, "/")[-1]
  86. head = b""
  87. body = b""
  88. line_feed = ""
  89. start_ttl = "TTL_U="
  90. start_rs232 = "RS232_U="
  91. start_rs485 = "RS485_U="
  92. start_fixed = "FIXED_INFO="
  93. start_heart = "HEART_MSG="
  94. start_pt = "PT_CMD="
  95. flag_ascii = False
  96. flag_hex = True
  97. if msg.payload[:len(start_ttl)] == start_ttl.encode("utf-8"):
  98. flag_ascii = True
  99. flag_hex = True
  100. head = msg.payload[:len(start_ttl)]
  101. body = msg.payload[len(start_ttl):]
  102. elif msg.payload[:len(start_rs232)] == start_rs232.encode("utf-8"):
  103. flag_ascii = False
  104. flag_hex = True
  105. head = msg.payload[:len(start_rs232)]
  106. body = msg.payload[len(start_rs232):]
  107. elif msg.payload[:len(start_rs485)] == start_rs485.encode("utf-8"):
  108. flag_ascii = False
  109. flag_hex = True
  110. head = msg.payload[:len(start_rs485)]
  111. body = msg.payload[len(start_rs485):]
  112. elif msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
  113. flag_ascii = True
  114. flag_hex = False
  115. head = msg.payload[:len(start_fixed)]
  116. body = msg.payload[len(start_fixed):]
  117. elif msg.payload[:len(start_heart)] == start_heart.encode("utf-8"):
  118. flag_ascii = True
  119. flag_hex = False
  120. head = msg.payload[:len(start_heart)]
  121. body = msg.payload[len(start_heart):]
  122. elif msg.payload[:len(start_pt)] == start_pt.encode("utf-8"):
  123. flag_ascii = True
  124. flag_hex = False
  125. head = msg.payload[:len(start_pt)]
  126. body = msg.payload[len(start_pt):]
  127. else:
  128. flag_ascii = False
  129. flag_hex = True
  130. body = msg.payload
  131. if flag_ascii:
  132. try:
  133. body_str = body.decode('gb2312')
  134. except UnicodeDecodeError:
  135. try:
  136. body_str = body.decode('utf-8')
  137. except UnicodeDecodeError:
  138. body_str = body.hex().upper()
  139. head_str = head.decode('utf-8')
  140. msg_ascii = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
  141. msg_ascii = msg_ascii + f"topic:{msg.topic} msg:{head_str}\r\n{body_str}"
  142. msg_ascii = msg_ascii.rstrip() + "\r\n"
  143. # 输出信息
  144. try:
  145. with open(get_log_file_name_ascii(LogTT_SN), 'a') as f:
  146. global version_w_ascii
  147. if version_w_ascii > 0:
  148. f.write("version:" + version + "\n")
  149. version_w_ascii = 0
  150. f.write(msg_ascii)
  151. f.close()
  152. except Exception as e:
  153. print(str(e))
  154. print(msg_ascii, end="")
  155. # FIXED_INFO={"base_info":{"name":"张三","uplinkB":1010,"rsrp":-99,"tel":"13888888888","reset_times":49,"project":"LogTT","csq":23,"downlinkB":573,"rsrq":-13,"dev_type":"9002","simid":0,"muid":"20230818221817A863212A0300364949","snr":-1,"version":"9002.4.002","iccid":"89860622330053052122","imei":"869861069998891","rssi":-66,"dev_pd":"20240329","dev_sn":1},"pos_info":{"lng":0,"lat":0}}
  156. if msg.payload[:len(start_fixed)] == start_fixed.encode("utf-8"):
  157. body_dict = json.loads(body_str)
  158. print(body_dict)
  159. if flag_hex:
  160. msg_hex = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + "-->:"
  161. msg_hex = msg_hex + f"topic:{msg.topic} msg:{head.decode('utf-8')}{body.hex().upper()}"
  162. # 输出信息
  163. try:
  164. with open(get_log_file_name_hex(LogTT_SN), 'a') as f:
  165. global version_w_hex
  166. if version_w_hex > 0:
  167. f.write("version:" + version + "\n")
  168. version_w_hex = 0
  169. f.write(msg_hex + "\r\n")
  170. f.close()
  171. except Exception as e:
  172. print(str(e))
  173. if not flag_ascii:
  174. print(msg_hex)
  175. client.subscribe(mqtt_subtopic)
  176. client.on_message = on_message
  177. def connect_mqtt() -> mqtt_client:
  178. def on_connect(client, userdata, flags, rc):
  179. if rc == 0:
  180. print("Connected to MQTT Broker!")
  181. subscribe(client)
  182. else:
  183. print("Failed to connect, return code %d\n", rc)
  184. client = mqtt_client.Client(client_id)
  185. client.username_pw_set(mqtt_user, mqtt_pwd)
  186. client.on_connect = on_connect
  187. client.connect(mqtt_broker, mqtt_port)
  188. return client
  189. def on_disconnect(client, userdata, rc):
  190. print("mqtt disconnect")
  191. def run():
  192. client = connect_mqtt()
  193. client.on_disconnect = on_disconnect
  194. # subscribe(client)
  195. client.loop_forever()
  196. def print_usage():
  197. print('')
  198. print('帮助信息:')
  199. print('--env=<prod生产环境/zz郑州政务云/test测试环境> ')
  200. print('')
  201. print('举例:')
  202. print('\r\ndbg_mqtt.exe --env=test\r\n')
  203. print('\r\ndbg_mqtt.exe --env=prod\r\n')
  204. print('\r\ndbg_mqtt.exe --env=zz\r\n')
  205. print('\r\ndbg_mqtt.exe --env=test --subtopic=cpyypt/up/0902/0000000003,cpyypt/down/0902/0000000003\r\n')
  206. print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0102/0000020142,cpyypt/down/0102/0000020142\r\n')
  207. print('\r\npython dbg_mqtt.py --env=prod --subtopic=cpyypt/up/0101/0000001048,cpyypt/down/0101/0000001048\r\n')
  208. print('')
  209. def main(argv):
  210. global mqtt_env
  211. global mqtt_broker
  212. global mqtt_port
  213. global mqtt_user
  214. global mqtt_pwd
  215. global mqtt_subtopic
  216. global mqtt_hex
  217. if not os.path.exists("log"):
  218. os.makedirs("log")
  219. # python sub2.py broker=mqtt.cpyypt.cn port=9000 user=admin pwd=houjianwei subtopic="/yw/#"
  220. print("dbg_mqtt version:" + version)
  221. print('输入的参数 argv=:', argv)
  222. try:
  223. opts, args = getopt.getopt(argv, "h", ["env=", "broker=", "port=", "user=", "pwd=", "subtopic=", "hex"])
  224. except getopt.GetoptError:
  225. print_usage()
  226. sys.exit(2)
  227. for opt, arg in opts:
  228. print('解析', opt, arg)
  229. if opt == '-h':
  230. print_usage()
  231. sys.exit()
  232. elif opt == "--env":
  233. mqtt_env = str(arg)
  234. elif opt == "--broker":
  235. mqtt_broker = str(arg)
  236. elif opt == "--port":
  237. mqtt_port = int(arg)
  238. elif opt == "--user":
  239. mqtt_user = str(arg)
  240. elif opt == "--pwd":
  241. mqtt_pwd = str(arg)
  242. elif opt == "--subtopic":
  243. st = str(arg).split(",")
  244. mqtt_subtopic = [(s, 0) for s in st]
  245. # mqtt_subtopic = str(arg).split(";") #[("cpyypt/up/#", 0), ("cpyypt/logup/#", 0)] ("cpyypt/#", 0)
  246. print("mqtt_subtopic=", mqtt_subtopic)
  247. elif opt == "--hex":
  248. mqtt_hex = 1
  249. if mqtt_broker == '':
  250. print('【服务器地址】为空,请重新输入!')
  251. print_usage()
  252. sys.exit()
  253. if mqtt_port == 0:
  254. print('【服务器端口】为空,请重新输入!')
  255. print_usage()
  256. sys.exit()
  257. if mqtt_subtopic == '':
  258. print('【主题】为空,请重新输入!')
  259. print_usage()
  260. sys.exit()
  261. if mqtt_env == "prod":
  262. mqtt_broker = prod["mqtt_broker"]
  263. mqtt_port = prod["mqtt_port"]
  264. mqtt_user = prod["mqtt_user"]
  265. mqtt_pwd = prod["mqtt_pwd"]
  266. elif mqtt_env == "zz":
  267. mqtt_broker = zz["mqtt_broker"]
  268. mqtt_port = zz["mqtt_port"]
  269. mqtt_user = zz["mqtt_user"]
  270. mqtt_pwd = zz["mqtt_pwd"]
  271. else:
  272. mqtt_broker = test["mqtt_broker"]
  273. mqtt_port = test["mqtt_port"]
  274. mqtt_user = test["mqtt_user"]
  275. mqtt_pwd = test["mqtt_pwd"]
  276. run()
  277. if __name__ == '__main__':
  278. # run()
  279. main(sys.argv[1:])