#!/usr/bin/python3 """ # @Time : 2025/12/12 11:05 # @Author : reenrr # @File : COM_TCP.py # @Desc : TCP客户端 """ import json import logging import socket import threading import time import Constant class TCPClient: def __init__(self, ip, port): # -------连接状态与错误统计属性------- self.error_count=0 # 通信错误计数器(记录连续通信失败次数,用于触发重连逻辑) self.connected = False # TCP连接状态 self.IPAddress = ip self.port = port self.thread_signal = True # 通信循环运行信号 self.client_socket = None # TCP套接字对象 def CreatConnect(self): """ 建立TCP连接方法:创建并初始化TCP套接字,与目标服务端建立网络连接 若已有连接,先关闭旧连接再创建新连接,避免套接字资源泄露 """ if self.client_socket: self.client_socket.close() self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client_socket.settimeout(5) self.client_socket.connect((self.IPAddress, self.port)) def is_Connect(self): """ 检测当前TCP连接是否有效 """ try: # 发送空字节流,仅用于检测连接可用性 self.client_socket.send(b'') return True except OSError: return False def run(self): """ 维持TCP长连接,循环执行状态查询和执行发送,实现自动重连 """ while self.thread_signal: time.sleep(0.4) # 控制通讯循环频率,避免过于频繁 self.connected = (self.error_count <= 3) try: if self.send_Status() and self.send_Command(): self.error_count = 0 except Exception as e: logging.ERROR(f'COM_TCP: {str(e)}') self.error_count += 1 # 触发自动重连 if self.error_count> 5: print("Error: 机械臂控制程序中TCPClient is not connected") try: self.CreatConnect() logging.INFO(Constant.str_tcp_reconnect) except OSError as e1: # 捕获套接字特定异常(错误码10056:已存在连接,无法再次连接) if e1.errno == 10056: self.client_socket.close() print("Error: 机械臂控制程序中TCPClient is not connected_1") logging.ERROR(Constant.str_tcp_connect_error) except Exception as e2: print(e2) def close(self): self.thread_signal = False self.client_socket.close() def send_Command(self): return False def send_Status(self): return False