Files
zjsh_weight_system_ui/tcp_client.py
2025-11-16 11:58:22 +08:00

174 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/11/13 11:05
# @Author : reenrr
# @File : tcp_client.py
'''
import json
from PySide6.QtCore import QTimer, Slot, Signal, QObject
from PySide6.QtNetwork import QTcpSocket, QAbstractSocket
from datetime import datetime
# -----------
# 参数配置
# -----------
tcp_server_host = "127.0.0.1"
tcp_server_port = 8888
RECONNECT_INTERVAL = 2000 # 重连间隔(毫秒)
class TcpClient(QObject):
# 信号,用于向主窗口/重量弹窗发送最新重量数据
weight_updated = Signal(dict) # 重量更新信号
connection_status_changed = Signal(bool) # 连接状态更新信号
def __init__(self, parent=None):
super().__init__(parent)
self.is_running = False # 系统运行状态标记
self.latest_json_data = {} # 缓存服务端发送的最新JSON数据
self.statusWidgets = [] # 存储api_field + 对应的value标签
# ---------------
# TCP客户端核心配置
# ---------------
self.tcp_socket = QTcpSocket(self) # TCP socket实例
self.tcp_server_host = tcp_server_host
self.tcp_server_port = tcp_server_port
self.is_tcp_connected = False # TCP连接状态标记
self.has_connected_once = False # 连接至服务器至少一次标记(区别首次连接和断开后重连)
self.reconnect_count = 0 # 重连次数计数器
# 重连定时器每隔RECONNECT_INTERVAL毫秒重连一次
self.reconnect_timer = QTimer(self)
self.reconnect_timer.setInterval(RECONNECT_INTERVAL)
self.reconnect_timer.timeout.connect(self._reconnect_to_server) # 绑定重连函数
# 绑定TCP信号与槽事件驱动
self._bind_tcp_signals()
# ---------------------
# TCP连接相关函数
# ---------------------
def _connect_to_server(self):
"""主动发起连接(仅在未连接状态下有效"""
if not self.is_tcp_connected:
self.tcp_socket.abort() # 终止现有连接
self.tcp_socket.connectToHost(self.tcp_server_host, self.tcp_server_port)
print(f"发起连接请求:{self.tcp_server_host}:{self.tcp_server_port}")
def _reconnect_to_server(self):
"""重连执行函数:仅在未连接且未达最大次数时触发"""
if not self.is_tcp_connected:
self.reconnect_count += 1
# 日志优化每10次重连提示一次避免日志刷屏
if self.reconnect_count % 10 == 0:
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
else:
print(f"{self.reconnect_count}次重连请求:{self.tcp_server_host}:{self.tcp_server_port}")
self._connect_to_server()
def _bind_tcp_signals(self):
"""绑定TCP socket的核心信号连接、断开、接收数据、错误"""
# 连接成功信号
self.tcp_socket.connected.connect(self._on_tcp_connected)
# 断开连接信号
self.tcp_socket.disconnected.connect(self._on_tcp_disconnected)
# 接收数据信号(有新数据时触发)
self.tcp_socket.readyRead.connect(self._on_tcp_data_received)
# 错误信号(连接/通信出错时触发)
self.tcp_socket.errorOccurred.connect(self._on_tcp_error)
# ------------------
# TCP客户端核心功能
# ------------------
@Slot()
def _on_tcp_connected(self):
"""TCP连接成功回调"""
self.is_tcp_connected = True
self.has_connected_once = True
self.reconnect_timer.stop() # 停止重连定时器
self.reconnect_count = 0 # 重连计数器清零
self.is_running = True
print(f"TCP连接成功{self.tcp_server_host}:{self.tcp_server_port}")
# 发送连接状态信号
self.connection_status_changed.emit(True)
# 连接成功后,向服务器发送“请求初始数据”指令
self._send_tcp_request("get_initial_data")
@Slot()
def _on_tcp_disconnected(self):
"""TCP连接断开回调"""
self.is_tcp_connected = False
self.is_running = False
print(f"TCP连接断开{self.tcp_server_host}:{self.tcp_server_port}")
# 发送连接状态信号
self.connection_status_changed.emit(False)
# 发送重量清零信号
self.weight_updated.emit({"hopper_up_weight": 0, "hopper_down_weight": 0})
if not self.reconnect_timer.isActive(): # 未启动重连定时器时才启动
print(f"连接断开,将在{RECONNECT_INTERVAL / 1000}秒后启动重连...")
self.reconnect_timer.start()
@Slot()
def _on_tcp_data_received(self):
"""TCP数据接收回调服务器发送数据时触发"""
tcp_data = self.tcp_socket.readAll().data().decode("utf-8").strip()
print(f"TCP数据接收{tcp_data}")
# 解析数据
try:
status_data = json.loads(tcp_data)
self.latest_json_data = status_data
self.weight_updated.emit(status_data)
except json.JSONDecodeError as e:
print(f"TCP数据解析失败非JSON格式{e}, 原始数据:{tcp_data}")
except Exception as e:
print(f"TCP数据处理异常{e}")
@Slot(QAbstractSocket.SocketError)
def _on_tcp_error(self, error):
"""TCP错误回调"""
if not self.is_tcp_connected:
error_str = self.tcp_socket.errorString()
print(f"TCP错误{error_str}")
self.is_tcp_connected = False
self.is_running = False
# 发送连接断开信号
self.connection_status_changed.emit(False)
# 首次连接失败时,启动重连定时器
if not self.reconnect_timer.isActive():
print(f"将在{RECONNECT_INTERVAL / 1000}秒后启动重连")
self.reconnect_timer.start()
def _send_tcp_request(self, request_cmd="get_status"):
"""向TCP服务器发送请求指令"""
if not self.is_tcp_connected:
print("TCP连接未建立无法发送请求")
return
# 构造请求数据
request_data = json.dumps({
"cmd": request_cmd,
"timestamp": self.get_current_time(),
"client_info": "布料系统客户端"
}) + "\n" # 增加换行符作为数据结束标识
# 发送请求数据
self.tcp_socket.write(request_data.encode("utf-8"))
print(f"TCP请求发送{request_data.strip()}")
# ------------------
# 时间相关的通用方法
# ------------------
def get_current_time(self):
"""获取格式化的当前时间"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')