#!/usr/bin/env python # -*- coding: utf-8 -*- """ # @Time : 2026/1/9 10:49 # @Author : reenrr # @File : client.py # @Desc : 工具工控机客户端 """ import socket import logging import time # -------- RK1106配置 ---------- _1106_IP = "127.0.0.1" # 替换为3506的真实IP(本地测试用127.0.0.1) _1106_PORT = 8888 # 和3506服务端一致的端口 TIMEOUT = 10 # 通讯超时时间(秒) class IndustrialPCClient: """工控机客户端类,封装与1106的通讯逻辑""" def __init__(self, ip: str, port: int, timeout: int = 10): self.ip = ip self.port = port self.timeout = timeout self.socket = None # 通讯socket def connect(self) -> bool: """连接1106服务端""" try: # 创建TCP socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(self.timeout) # 设置超时 # 连接3506 self.socket.connect((self.ip, self.port)) logging.info(f"[工控机] 成功连接1106:{self.ip}:{self.port}") return True except socket.timeout: logging.error(f"[工控机] 连接1106超时({self.timeout}s)") self.close() return False except ConnectionRefusedError: logging.error(f"[工控机] 1106拒绝连接(请检查1106服务是否启动)") self.close() return False except Exception as e: logging.error(f"[工控机] 连接1106失败:{str(e)}", exc_info=True) self.close() return False def send_command(self, prog_id: str, **params) -> tuple[bool, str]: """ 向1106发送指令 :param prog_id: 指令标识(如STEPPER_TEST) :param params: 指令参数(如speed=50, steps=200) :return: (是否成功, 响应信息) """ if not self.socket: return False, "未连接1106,请先调用connect()" # 构造指令字符串(符合1106解析格式) param_str = "&".join([f"{k}={v}" for k, v in params.items()]) cmd = f"{prog_id}|{param_str}" if param_str else prog_id try: # 发送指令(UTF-8编码) self.socket.sendall(cmd.encode("utf-8")) logging.info(f"[工控机] 发送指令:{cmd}") # 接收3506响应(最大1024字节) resp = self.socket.recv(1024).decode("utf-8", errors="ignore").strip() if not resp: return False, "未收到1106响应" # 解析响应(SUCCESS/FAIL|描述) if "|" in resp: status, msg = resp.split("|", 1) success = status == "SUCCESS" logging.info(f"[工控机] 1106响应:{'成功' if success else '失败'} - {msg}") return success, msg else: # 响应格式异常 logging.warning(f"[工控机] 1106响应格式异常:{resp}") return False, f"响应格式错误:{resp}" except socket.timeout: logging.error(f"[工控机] 发送指令后超时({self.timeout}s)未收到响应") return False, "通讯超时" except Exception as e: logging.error(f"[工控机] 发送指令失败:{str(e)}", exc_info=True) return False, str(e) def close(self): """关闭连接""" if self.socket: self.socket.close() self.socket = None logging.info("[工控机] 已断开与1106的连接") # ---------对外接口---------- def control_stepper_motor(): """示例:工控机调用1106的步进电机控制程序""" # 初始化客户端 client = IndustrialPCClient(_1106_IP, _1106_PORT, TIMEOUT) # 连接1106 if not client.connect(): return try: # ========== 示例1:无参数调用步进电机测试 ========== logging.info("\n===== 无参数调用步进电机测试 =====") success, msg = client.send_command("test") if not success: logging.error(f"调用失败:{msg}") time.sleep(2) # 间隔2秒 可能根据实际效果进行延时调整 finally: # 确保关闭连接 client.close() # ---------测试接口---------- if __name__ == "__main__": control_stepper_motor()