#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2026/3/16 15:00 # @Author : reenrr # @File : RK1106_server_test.py # @Desc : RK1106服务端(类形式,多线程),等待工控机调用 通信为JSON格式 --待测试,加了线程 ''' import socket import logging import json import threading from stepper_motor import motor_start, align_wire, cleanup, motor_stop # --------配置TCP服务端---------- HOST = "192.168.5.100" PORT = 8888 CONFIG_FILE = "motor_config.json" class RK1106Server: """ RK1106服务端类 在初始化时加载配置,提供配置管理和指令处理功能 """ def __init__(self): """初始化服务端,加载配置""" self.config = self._load_config() self.current_thread = None logging.info(f"服务初始化完成,加载配置:{self.config}") def _load_config(self): """ 从JSON文件加载电机配置 :return: 加载的配置字典 """ try: with open(CONFIG_FILE, "r") as f: return json.load(f) except FileNotFoundError: logging.warning(f"配置文件 {CONFIG_FILE} 未找到,将使用默认配置") return {"speed": 2500, "cycle": 10.0} except json.JSONDecodeError: logging.error(f"配置文件 {CONFIG_FILE} 格式错误") return {"speed": 2500, "cycle": 10.0} def _save_config(self): """ 将电机配置保存到JSON文件 """ try: with open(CONFIG_FILE, "w") as f: json.dump(self.config, f, indent=4, ensure_ascii=False) logging.info(f"配置已保存:{self.config}") except IOError as e: logging.error(f"配置文件 {CONFIG_FILE} 保存失败: {str(e)}") def _execute_motor_command(self, cmd_func: callable, *args, **kwargs): """ 在单独的线程中执行电机命令 :param cmd_func: 要执行的电机命令函数 :param args: 传递给cmd_func的位置参数 :param kwargs: 传递给cmd_func的关键字参数 """ try: result = cmd_func(*args, **kwargs) logging.info(f"电机命令执行完成: {result}") except Exception as e: logging.error(f"执行电机命令时出错: {str(e)}") def handle_setting(self, para_type: str, para_value: str) -> dict: """ 处理客户端发送的参数设置指令(cmd:"setting"),更新电机配置 :param para_type: 要设置的参数类型,仅支持"speed"或"cycle" :param para_value: 参数值字符串,将尝试转换为int(speed)或float(cycle) :return: 标准化相应字典dict """ try: if para_type == "speed": self.config["speed"] = int(para_value) elif para_type == "cycle": self.config["cycle"] = float(para_value) else: return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"} # 保存到文件 self._save_config() return {"Result": "1", "ErrMsg": "设置成功"} except ValueError as e: return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"} def handle_start(self, para_type: str, para_value: str) -> dict: """ 处理启动电机指令(cmd: "start"),使用当前配置运行电机 :param para_type:为"direction"时,使用"para_value"作为临时方向 :param para_value:为0或1 :return: 标准化相应字典dict """ try: if para_type == "direction": direction = int(para_value) if direction not in (0, 1): return {"Result": "0", "ErrMsg": "方向必须为0或1"} # 在新线程中启动电机 thread = threading.Thread( target=self._execute_motor_command, args=(motor_start,), kwargs={ "speed": self.config["speed"], "cycle": self.config["cycle"], "direction": direction } ) thread.start() dir_str = "正向" if direction == 1 else "负向" return {"Result": "1", "ErrMsg": f"电机启动指令已发送({dir_str})"} else: return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"} except Exception as e: return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"} def handle_stop(self) -> dict: """ 处理停止电机指令(cmd: "stop") :return: 标准化相应字典dict """ try: # 在新线程中停止电机 thread = threading.Thread( target=self._execute_motor_command, args=(motor_stop,) ) thread.start() return {"Result": "1", "ErrMsg": "电机停止指令已发送"} except Exception as e: return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"} def handle_align(self) -> dict: """ 处理线条对齐(挡板一来一回) :return: dict """ try: # 在新线程中对齐线条 thread = threading.Thread( target=self._execute_motor_command, args=(align_wire,), kwargs={ "speed": self.config['speed'], "cycle": self.config['cycle'] } ) thread.start() return {"Result": "1", "ErrMsg": "线条对齐指令已发送"} except Exception as e: return {"Result": "0", "ErrMsg": "线条对齐失败"} def parse_json_command(self, data: str) -> dict: """ 解析客户端发送的原始JSON字符串指令,并分发至对应处理函数 :param data: 客户端发送的原始JSON字符串 :return dict:标准化响应字典 """ try: cmd_obj = json.loads(data.strip()) except json.JSONDecodeError as e: return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"} cmd = cmd_obj.get("cmd", "").strip() para_type = cmd_obj.get("para_type", "").strip() para_value = cmd_obj.get("para_value", "").strip() if cmd == "setting": return self.handle_setting(para_type, para_value) elif cmd == "start": return self.handle_start(para_type, para_value) elif cmd == "stop": if para_type == "none" and para_value == "none": return self.handle_stop() else: return {"Result": "0", "ErrMsg": "stop指令参数必须为none"} elif cmd == "alignment": if para_type == "none" and para_value == "none": return self.handle_align() else: return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"} else: return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"} def run_server(self): """ 启动TCP服务端,监听指定端口,接收工控机连接并循环处理JSON指令 """ # 创建TCP socket server_socket = None conn = None try: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((HOST, PORT)) server_socket.listen(1) # 只允许1个工控机连接 logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...") while True: # 持续接受新连接 try: # 等待工控机连接 conn, addr = server_socket.accept() logging.info(f"[1106] 工控机已连接:{addr}") # 循环接收指令 while True: # 接收指令(最大1024字节) data = conn.recv(1024).decode() if not data: logging.warning("客户端断开连接") break logging.info(f"\n[1106] 收到工控机指令:{data}") # 解析指令 response_dict = self.parse_json_command(data) response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n # 发送响应给工控机 conn.sendall(response_json.encode("utf-8")) logging.info(f"[1106] 已发送响应:{response_json}") except ConnectionError: logging.info("客户端异常断开") except Exception as e: logging.info(f"处理连接时发生错误: {e}") finally: if conn is not None: conn.close() conn = None # 重置,避免重复关闭 logging.info("客户端连接已关闭,等待新连接...") except KeyboardInterrupt: logging.info("\n收到 Ctrl+C,正在关闭服务...") finally: if server_socket: server_socket.close() logging.info("服务已停止,监听 socket 已释放") # ----------测试接口---------- if __name__ == "__main__": SERVER = RK1106Server() SERVER.run_server()