From a8a35545cc50703652640c15468fff3afea85994 Mon Sep 17 00:00:00 2001 From: pengqi Date: Fri, 9 Jan 2026 17:52:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=99=E4=B8=8DNG=E6=B5=81=E7=A8=8B=EF=BC=9A?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BC=A0=E9=80=81=E5=B8=A6=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E3=80=81=E6=AD=A5=E8=BF=9B=E7=94=B5=E6=9C=BA=E7=A7=BB=E5=8A=A8?= =?UTF-8?q?=E3=80=81=E7=BA=BF=E6=9D=A1=E4=B8=AA=E6=95=B0=E6=96=B9=E4=BE=BF?= =?UTF-8?q?=E5=90=8E=E7=BB=AD=E5=A4=B9=E7=88=AA=E7=9A=84=E6=8A=93=E5=8F=96?= =?UTF-8?q?=E3=80=81=E6=B0=94=E5=8A=A8=E5=A4=B9=E7=88=AA=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DM/DM_Motor_test.py | 6 +- EMV/EMV_test.py | 13 +- RK1106/RK1106_server.py | 119 +++++++++++ RK1106/test.py | 3 +- client.py | 125 ++++++++++++ .../conveyor_master_controller2_test.py | 2 +- main_control.py | 185 ++++++++++++++---- 7 files changed, 410 insertions(+), 43 deletions(-) create mode 100644 RK1106/RK1106_server.py create mode 100644 client.py diff --git a/DM/DM_Motor_test.py b/DM/DM_Motor_test.py index 512cdeb..af6acb6 100644 --- a/DM/DM_Motor_test.py +++ b/DM/DM_Motor_test.py @@ -112,8 +112,8 @@ class DMMotorController: :param v_desired: 目标速度(rad/s, 范围[-30, 30]) """ try: - # 归零 + 发送运动指令 - self.motor_control.set_zero_position(self.motor) + # 发送运动指令 + # self.motor_control.set_zero_position(self.motor) self.motor_control.control_Pos_Vel(self.motor, p_desired, v_desired) time.sleep(0.1) print(f"✅ 位置-速度控制:位置={p_desired}rad | 速度={v_desired}rad/s") @@ -198,6 +198,7 @@ class DMMotorController: except Exception as e: print(f"修改电机限位参数失败: {str(e)}") + def __del__(self): """析构函数:确保程序退出时失能电机、关闭串口""" try: @@ -211,7 +212,6 @@ class DMMotorController: except Exception as e: print(f"ℹ️ 析构函数执行警告:{str(e)}") - def dm_motor_control(): # 1.创建电机控制器实例 motor_controller = DMMotorController( diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py index f7d94aa..688b967 100644 --- a/EMV/EMV_test.py +++ b/EMV/EMV_test.py @@ -100,6 +100,7 @@ class RelayController: self.press_sensors_thread = None # 保存按压开关线程对象 self.press_sensors_monitor_running = False # 按压传感器监听线程运行标志 + self.last_press_sensor_status = False # 记录传感器上一次状态,初始为无信号(用于上升沿检测) def send_command(self, command): """ @@ -291,12 +292,16 @@ class RelayController: # 检测两个传感器任意一个是否触发 press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors') press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors') - if press_sensor1_status or press_sensor2_status: + current_sensor_state = press_sensor1_status or press_sensor2_status + + # 上升沿触发(仅从无信号-->有信号时,才触发事件) + if current_sensor_state and not self.last_press_sensor_status: press_sensors_triggered.set() # 触发事件,通知主线程 logging.info("双压传感器触发:线条已落到传送带") - # 重置事件(等待下一次触发) - time.sleep(0.1) # 防重复触发 - press_sensors_triggered.clear() + + # 更新上一次传感器状态,为下一次上升沿检测做准备 + self.last_press_sensor_status = current_sensor_state + # 传感器检测间隔 time.sleep(check_interval) except Exception as e: logging.info(f"双压传感器监听异常: {e}") diff --git a/RK1106/RK1106_server.py b/RK1106/RK1106_server.py new file mode 100644 index 0000000..6cbae98 --- /dev/null +++ b/RK1106/RK1106_server.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/9 10:45 +# @Author : reenrr +# @File : RK1106_server.py +# @Desc : RK1106服务端,等待工控机调用 +''' +import socket +import logging +import sys +from test import motor_demo + + +# --------日志配置(终端+文件双输出)-------------- +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + # 核心新增:日志文件配置 + handlers=[ + # 1. 文件处理器:保存到.log文件 + logging.FileHandler( + "RK1106_server.log", # Buildroot推荐路径,临时测试可改/tmp/1106_server.log + mode='a', # 追加模式(不会覆盖历史日志) + encoding='utf-8' # 防止中文乱码(必加) + ), + # 2. 终端处理器:输出到控制台 + logging.StreamHandler(sys.stdout) + ] +) + +# --------配置TCP服务端---------- +HOST = "127.0.0.1" +PORT = 8888 + +# 程序映射表(指令表示 -> 执行函数) +PROG_MAP = { + # "STEPPER_TEST": motor_test_demo, + "test": motor_demo +} + +def parse_command(cmd_str: str) ->tuple[str, dict]: + """ + 解析工控机发送的指令字符串 + :param cmd_str: 指令字符串 + :return: 指令名称,参数字典 + """ + # 空指令处理 + if not cmd_str or cmd_str.strip() == "": + return "", {} + + # 分割指令标识和参数 + cmd_parts = cmd_str.strip().split("|", 1) + prog_id = cmd_parts[0].strip() + params = {} + + # 解析参数(格式:param1=val1¶m2=val2) + if len(cmd_parts) > 1 and cmd_parts[1].strip() != "": + param_str = cmd_parts[1].strip() + param_pairs = param_str.split("&") + for pair in param_pairs: + if "=" in pair: + key, value = pair.split("=", 1) # 处理值中含=的情况 + # 类型自动转换(数字/字符串) + try: + params[key.strip()] = int(value.strip()) + except ValueError: + try: + params[key.strip()] = float(value.strip()) + except ValueError: + params[key.strip()] = value.strip() + + return prog_id, params + +# ----------对外接口---------- +def server(): + # 创建TCP socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + # 允许端口复用 + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((HOST, PORT)) + server_socket.listen(1) # 只允许1个工控机连接 + logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...") + + # 等待工控机连接 + conn, addr = server_socket.accept() + with conn: + logging.info(f"[1106] 工控机已连接:{addr}") + # 循环接收指令 + while True: + # 接收指令(最大1024字节) + data = conn.recv(1024).decode() + logging.info(f"\n[1106] 收到工控机指令:{data}") + # 解析指令 + prog_id, params = parse_command(data) + logging.info(f"[1106] 解析结果 - 指令:{prog_id},参数:{params}") + + # 执行对应程序 + responses = "" + if prog_id in PROG_MAP: + try: + result = PROG_MAP[prog_id](**params) + response = f"SUCCESS|步进电机测试执行完成,结果:{result}" + logging.info(f"[1106] {response}") + except Exception as e: + response = f"FAIL|步进电机测试执行失败:{str(e)}" + logging.error(f"[1106] {response}", exc_info=True) + else: + response = f"FAIL|未知指令:{prog_id},支持指令:{list(PROG_MAP.keys())}" + logging.warning(f"[1106] {response}") + + # 发送响应给工控机 + conn.sendall(response.encode("utf-8")) + logging.info(f"[1106] 已发送响应:{response}") + +# ----------测试接口---------- +if __name__ == "__main__": + server() + diff --git a/RK1106/test.py b/RK1106/test.py index 0b3e378..2b0c77d 100644 --- a/RK1106/test.py +++ b/RK1106/test.py @@ -249,7 +249,7 @@ class StepperMotor: """析构函数:确保资源释放""" self.close() -# 使用示例 +# ----------对外接口----------- def motor_demo(): """电机控制示例""" motor = None @@ -287,6 +287,5 @@ def motor_demo(): motor.close() print("[OK] 程序退出完成") - if __name__ == '__main__': motor_demo() \ No newline at end of file diff --git a/client.py b/client.py new file mode 100644 index 0000000..d070886 --- /dev/null +++ b/client.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2026/1/9 10:49 +# @Author : reenrr +# @File : client.py +# @Desc : 工具工控机客户端 +""" +import socket +import logging +import time + +# -------- RK1106配置 ---------- +_1106_IP = "127.0.0.1" # 替换为3506的真实IP(本地测试用127.0.0.1) +_1106_PORT = 8888 # 和3506服务端一致的端口 +TIMEOUT = 10 # 通讯超时时间(秒) + + +class IndustrialPCClient: + """工控机客户端类,封装与1106的通讯逻辑""" + + def __init__(self, ip: str, port: int, timeout: int = 10): + self.ip = ip + self.port = port + self.timeout = timeout + self.socket = None # 通讯socket + + def connect(self) -> bool: + """连接1106服务端""" + try: + # 创建TCP socket + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout) # 设置超时 + # 连接3506 + self.socket.connect((self.ip, self.port)) + logging.info(f"[工控机] 成功连接1106:{self.ip}:{self.port}") + return True + except socket.timeout: + logging.error(f"[工控机] 连接1106超时({self.timeout}s)") + self.close() + return False + except ConnectionRefusedError: + logging.error(f"[工控机] 1106拒绝连接(请检查1106服务是否启动)") + self.close() + return False + except Exception as e: + logging.error(f"[工控机] 连接1106失败:{str(e)}", exc_info=True) + self.close() + return False + + def send_command(self, prog_id: str, **params) -> tuple[bool, str]: + """ + 向1106发送指令 + :param prog_id: 指令标识(如STEPPER_TEST) + :param params: 指令参数(如speed=50, steps=200) + :return: (是否成功, 响应信息) + """ + if not self.socket: + return False, "未连接1106,请先调用connect()" + + # 构造指令字符串(符合1106解析格式) + param_str = "&".join([f"{k}={v}" for k, v in params.items()]) + cmd = f"{prog_id}|{param_str}" if param_str else prog_id + + try: + # 发送指令(UTF-8编码) + self.socket.sendall(cmd.encode("utf-8")) + logging.info(f"[工控机] 发送指令:{cmd}") + + # 接收3506响应(最大1024字节) + resp = self.socket.recv(1024).decode("utf-8", errors="ignore").strip() + if not resp: + return False, "未收到1106响应" + + # 解析响应(SUCCESS/FAIL|描述) + if "|" in resp: + status, msg = resp.split("|", 1) + success = status == "SUCCESS" + logging.info(f"[工控机] 1106响应:{'成功' if success else '失败'} - {msg}") + return success, msg + else: + # 响应格式异常 + logging.warning(f"[工控机] 1106响应格式异常:{resp}") + return False, f"响应格式错误:{resp}" + + except socket.timeout: + logging.error(f"[工控机] 发送指令后超时({self.timeout}s)未收到响应") + return False, "通讯超时" + except Exception as e: + logging.error(f"[工控机] 发送指令失败:{str(e)}", exc_info=True) + return False, str(e) + + def close(self): + """关闭连接""" + if self.socket: + self.socket.close() + self.socket = None + logging.info("[工控机] 已断开与1106的连接") + +# ---------对外接口---------- +def control_stepper_motor(): + """示例:工控机调用1106的步进电机控制程序""" + # 初始化客户端 + client = IndustrialPCClient(_1106_IP, _1106_PORT, TIMEOUT) + + # 连接1106 + if not client.connect(): + return + + try: + # ========== 示例1:无参数调用步进电机测试 ========== + logging.info("\n===== 无参数调用步进电机测试 =====") + success, msg = client.send_command("test") + if not success: + logging.error(f"调用失败:{msg}") + + time.sleep(2) # 间隔2秒 可能根据实际效果进行延时调整 + + finally: + # 确保关闭连接 + client.close() + +# ---------测试接口---------- +if __name__ == "__main__": + control_stepper_motor() diff --git a/conveyor_controller/conveyor_master_controller2_test.py b/conveyor_controller/conveyor_master_controller2_test.py index 3bcc63a..111c06e 100644 --- a/conveyor_controller/conveyor_master_controller2_test.py +++ b/conveyor_controller/conveyor_master_controller2_test.py @@ -11,7 +11,7 @@ import threading import time from datetime import datetime import serial -from EMV import RelayController +from EMV.EMV_test import RelayController logging.getLogger("pymodbus").setLevel(logging.CRITICAL) diff --git a/main_control.py b/main_control.py index da24a4c..0416ce1 100644 --- a/main_control.py +++ b/main_control.py @@ -1,19 +1,19 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -''' +""" # @Time : 2025/12/12 11:05 # @Author : reenrr # @File : main_control.py # @Desc : 主控程序 -''' -import multiprocessing # 多进程模块 -import threading -from threading import Event +""" import time from EMV.EMV_test import press_sensors_triggered, control_solenoid, global_relay from visual_algorithm.visual_algorithm import flaw_detection import logging import os +from conveyor_controller.conveyor_master_controller2_test import conveyor_control +from client import IndustrialPCClient +from DM.DM_Motor_test import DMMotorController, Control_Type, dm_motor_control # ------------ 日志+参数配置 ------------ script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -29,59 +29,178 @@ logging.basicConfig( ] ) -# ------------全局事件------------- -manger = multiprocessing.Manager() -conveyor_start_event = manger.Event() +# -------------------------- 电机参数配置 -------------------------- +SLAVE_ID = 0x01 +MASTER_ID = 0x11 +PORT = 'COM10' +BAUDRATE = 921600 -def start_thread(): - """开启各种线程""" - global_relay.start_press_sensors_monitor() # 双压传感器监听线程 +class LineMainControlSystem: + """ + 线条主控系统类 + """ + def __init__(self): + """ + 构造方法:初始化类属性 + """ + # 客户端 + self.client = IndustrialPCClient(ip='127.0.0.1', port=8888, timeout=10) + # 连接服务端 + if not self.client.connect(): + return -def stop_thread(): - """关闭各种线程""" - global_relay.stop_press_sensors_monitor() # 双压传感器监听线程 + # 合格线条处理记数 + self.qualified_line_count = 0 + self.qualified_trigger_threshold = 5 + + # 创建达妙电机实例、参数 + self.motor_controller = DMMotorController( + slave_id=SLAVE_ID, + master_id=MASTER_ID, + port=PORT, + baudrate=BAUDRATE + ) + self.dm_motor_init() -def quality_testing(): - logging.info("线条开始质量检测:") + def dm_motor_init(self): + # 切换到位置-速度模式 + self.motor_controller.switch_control_mode(Control_Type.POS_VEL) - # 执行质量检测 - result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}) - if result == "qualified": + # 使能电机 + self.motor_controller.enable_motor() + + # 将电机位置恢复到初始位置(防止断电,电机停在某个位置上) + dm_motor_position = self.motor_controller.get_position() + logging.info(f"初始位置{dm_motor_position}") + # 将位置移动到0位 + self.motor_controller.control_pos_vel(p_desired=0, v_desired=30) + time.sleep(10) + + def start_thread(self): + """开启各种线程""" + logging.info("开启各类监听线程") + global_relay.start_press_sensors_monitor() # 双压传感器监听线程 + + def stop_thread(self): + """关闭各种线程""" + logging.info("关闭各类监听线程") + global_relay.stop_press_sensors_monitor() # 双压传感器监听线程 + + def handle_line(self): + try: + logging.info("线条开始质量检测:") + + # 执行质量检测 + result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}) + if result == "qualified": + self._handle_qualified_line() + elif result == "unqualified": + self._handle_unqualified_line() + + except Exception as e: + logging.error(f"质量检测失败:{e}", exc_info=True) + + finally: + # 关闭连接服务端 + if self.client: + self.client.close() + logging.info("关闭连接服务端") + + def _handle_qualified_line(self): + """ + 处理合格线条的流程 + """ result = "合格" - logging.info("该线条是否合格:", result) + logging.info(f"该线条是否合格:{result}") logging.info("等待线条落到传送带(双压传感器触发)...") + # 等待时间触发,超时时间设为10秒(避免无限等待) if press_sensors_triggered.wait(timeout=10): + press_sensors_triggered.clear() + logging.info("线条已落到传送带,控制两个传送带向前移动") - # 触发传送带启动事件 - conveyor_start_event.set() + # 传送带运行 + conveyor_control() + + # 验证传送带运行结果 + press_sensor1_status = global_relay.get_device_status('press_sensor1', 'sensors') + press_sensor2_status = global_relay.get_device_status('press_sensor2', 'sensors') + if not press_sensor1_status and not press_sensor2_status: + logging.info("传送带运行正常,线条已移动") + + self.qualified_line_count += 1 + logging.info(f"已处理合格线条{self.qualified_line_count}/{self.qualified_trigger_threshold}个") + + # 控制步进电机移动 + logging.info("控制步进电机移动") + success, msg = self.client.send_command("test") + if not success: + logging.error(f"调用失败:{msg}") + else: + logging.info("步进电机移动成功") + + # 判断线条个数 + if self.qualified_line_count >= self.qualified_trigger_threshold: + # 舵机、电机相关控制 + self.qualified_line_count = 0 + else: + logging.warning("传送带运行异常,线条未正常移动,不更新合格计数") else: - logging.info("超时警告:线条未落到传送带,请检查") - elif result == "unqualified": + logging.info("线条未落到传送带,请检查传感器或设备状态") + + def _handle_unqualified_line(self): + """ + 处理不合格线条的流程 + """ result = "不合格" logging.info("该线条是否合格:", result) logging.info("等待线条落到传送带上") + # 等待时间触发,超时时间设为10秒(避免无限等待) if press_sensors_triggered.wait(timeout=10): + press_sensors_triggered.clear() logging.info("线条已落到传送带") + + # 执行NG动作,控制电磁阀 logging.info("进入NG动作") - control_solenoid() # 执行NG动作,控制电磁阀 + control_solenoid() # 执行NG动作,控制电磁阀 logging.info("NG动作结束") - # logging.info("判断NG线条是否落入肥料区:") + + def gripper_control(self): + """ + 气动夹爪相关控制:达妙电机的移动、舵机的翻转 + """ + # 打开电磁阀3 + global_relay.open('solenoid_valve3', "devices") + + # 控制达妙电机收缩 + self.motor_controller.control_pos_vel(p_desired=282.6, v_desired=30) # 450mm 665-215 + time.sleep(10) + + + # -----------对外接口------------- def main_control(): - logging.info("开启各种线程") - start_thread() + main_control_system = LineMainControlSystem() - logging.info("开始摆放线条") + try: + logging.info("开启各种线程") + main_control_system.start_thread() - # 质量检测 - quality_testing() + while True: # 防止跳出循环 + logging.info("开始摆放线条") + + # 质量检测 + main_control_system.handle_line() + except KeyboardInterrupt: + logging.info("用户手动退出") + finally: + # 程序退出时,关闭各类线程 + main_control_system.stop_thread() + logging.info("主控程序已正常退出") - while True: # 防止跳出循环 - time.sleep(1) # ------------测试接口------------- if __name__ == '__main__':