From bd7820df76b7c4e7e07f9906f011bf69e40b9f40 Mon Sep 17 00:00:00 2001 From: cdeyw <827523911@qq.com> Date: Mon, 29 Sep 2025 09:19:30 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Fedding.py | 1153 +++++++++++++++++ core/state.py | 1 + .../test_inverter_controller.cpython-39.pyc | Bin 0 -> 4587 bytes tests/test_feeding_process.py | 41 +- tests/test_inverter_controller.py | 161 +++ tests/test_relay_controller.py | 109 ++ tests/test_transmitter_controller.py | 165 +++ 7 files changed, 1609 insertions(+), 21 deletions(-) create mode 100644 Fedding.py create mode 100644 tests/__pycache__/test_inverter_controller.cpython-39.pyc create mode 100644 tests/test_inverter_controller.py create mode 100644 tests/test_relay_controller.py create mode 100644 tests/test_transmitter_controller.py diff --git a/Fedding.py b/Fedding.py new file mode 100644 index 0000000..378608d --- /dev/null +++ b/Fedding.py @@ -0,0 +1,1153 @@ +import socket +import binascii +import time +import threading +import struct +import cv2 +import os +from pymodbus.client import ModbusTcpClient +from pymodbus.exceptions import ModbusException +# 添加视觉模块路径 +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), 'src', 'vision')) +# 导入视觉处理模块 +from vision.anger_caculate import predict_obb_best_angle +from vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize, YOLO + + +class FeedingControlSystem: + def __init__(self, relay_host='192.168.0.18', relay_port=50000): + # 网络继电器配置 + self.relay_host = relay_host + self.relay_port = relay_port + self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port) + + # 继电器映射 + self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动 + self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门 + self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门 + self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱 + self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱 + + # 继电器命令(原始Socket)mudbus TCP模式 + self.relay_commands = { + self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, + self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, + self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'}, + self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, + self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'} + } + + # 读取状态命令 + self.read_status_command = '000000000006010100000008' + + # 设备位映射 + self.device_bit_map = { + self.DOOR_UPPER: 0, + self.DOOR_LOWER_1: 1, + self.DOOR_LOWER_2: 2, + self.BREAK_ARCH_UPPER: 3, + self.BREAK_ARCH_LOWER: 4 + } + + # 变频器配置(Modbus RTU 协议) + self.inverter_config = { + 'slave_id': 1, + 'frequency_register': 0x01, # 2001H + 'start_register': 0x00, # 2000H + 'stop_register': 0x00, # 2000H(用于停机) + 'start_command': 0x0013, # 正转点动运行 + 'stop_command': 0x0001 # 停机 + } + + # 变送器配置(Modbus RTU) + self.transmitter_config = { + 1: { # 上料斗 + 'slave_id': 1, + 'weight_register': 0x01, + 'register_count': 2 + },#发出去的内容01 03 00 01 00 02 + 2: { # 下料斗 + 'slave_id': 2, + 'weight_register': 0x01, + 'register_count': 2 + }#发出去的内容02 03 00 01 00 02 + } + + # 系统状态 + self._running = False + self._monitor_thread = None + self._visual_control_thread = None + self._alignment_check_thread = None + + # 下料控制相关 + self.min_required_weight = 500 # 模具车最小需要重量(kg) + self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中) + self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:等待上料, 4:等待模具车对齐 + self.lower_feeding_cycle = 0 # 下料斗下料循环次数 (1, 2, 3) + self.upper_feeding_count = 0 # 上料斗已下料次数 (0, 1, 2) + self.last_upper_weight = 0 + self.last_lower_weight = 0 + self.last_weight_time = time.time() + self.target_vehicle_weight = 5000 # 目标模具车重量(kg) + self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多 + self.single_batch_weight = 2500 # 单次下料重量(kg) + + + #夹角状态 + self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery + + # 错误计数 + self.upper_weight_error_count = 0 + self.lower_weight_error_count = 0 + self.max_error_count = 3 + + # 下料阶段频率(Hz) + self.inverter_max_frequency = 400.0#频率最大值 + self.frequencies = [220.0, 230.0, 240.0] + + # 视觉系统接口 + self.overflow_detected = False # 堆料检测 + self.door_opening_large = False # 夹角 + self.vehicle_aligned = False # 模具车是否对齐 + + # 视觉控制参数 + self.angle_threshold = 60.0 # 角度阈值,超过此值认为开口过大 + self.target_angle = 20.0 # 目标角度 + self.min_angle = 10.0 # 最小角度 + self.max_angle = 80.0 # 最大角度 + self.angle_tolerance = 5.0 # 角度容差 + self.visual_control_enabled = True # 视觉控制使能 + self.last_angle = None # 上次检测角度 + self.visual_check_interval = 1.0 # 视觉检查间隔(秒) + self.alignment_check_interval = 0.5 # 对齐检查间隔(秒) + + # 模型路径配置 + self.angle_model_path = "vision/models/angle.pt" + self.overflow_model_path = "vision/models/overflow.pt" + self.alignment_model_path = "vision/models/alig.pt" # 模具车对齐检测模型 + self.roi_file_path = "vision/roi_coordinates/1_rois.txt" + + # 模型实例 + self.angle_model = None # 夹角检测模型实例 + self.overflow_model = None # 堆料检测模型实例 + self.alignment_model = None # 对齐检测模型实例 + + # 摄像头相关配置 + self.camera = None + self.camera_type = "ip" + self.camera_ip = "192.168.1.51" + self.camera_port = 554 + self.camera_username = "admin" + self.camera_password = "XJ123456" + self.camera_channel = 1 + # self.current_image_path = "current_frame.jpg" + + def set_camera_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1): + """ + 设置摄像头配置 + :param ip: 网络摄像头IP地址 + :param port: 网络摄像头端口 + :param username: 网络摄像头用户名 + :param password: 网络摄像头密码 + :param channel: 摄像头通道号 + """ + self.camera_type = camera_type + if ip: + self.camera_ip = ip + if port: + self.camera_port = port + if username: + self.camera_username = username + if password: + self.camera_password = password + self.camera_channel = channel + + def set_angle_parameters(self, target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0): + """ + 设置角度控制参数 + """ + self.target_angle = target_angle + self.min_angle = min_angle + self.max_angle = max_angle + self.angle_threshold = threshold + + def set_feeding_parameters(self, target_vehicle_weight=5000, upper_buffer_weight=500, single_batch_weight=2500): + """ + 设置下料参数 + :param target_vehicle_weight: 目标模具车重量(kg) + :param upper_buffer_weight: 上料斗缓冲重量(kg) + :param single_batch_weight: 单次下料重量(kg) + """ + self.target_vehicle_weight = target_vehicle_weight + self.upper_buffer_weight = upper_buffer_weight + self.single_batch_weight = single_batch_weight + + def load_all_models(self): + """ + 加载所有视觉检测模型 + """ + success = True + + # 加载夹角检测模型 + try: + if not os.path.exists(self.angle_model_path): + print(f"夹角检测模型不存在: {self.angle_model_path}") + success = False + else: + # 注意:angle.pt模型通过predict_obb_best_angle函数使用,不需要预加载 + print(f"夹角检测模型路径: {self.angle_model_path}") + except Exception as e: + print(f"检查夹角检测模型失败: {e}") + success = False + + # 加载堆料检测模型 + try: + if not os.path.exists(self.overflow_model_path): + print(f"堆料检测模型不存在: {self.overflow_model_path}") + success = False + else: + self.overflow_model = YOLO(self.overflow_model_path) + print(f"成功加载堆料检测模型: {self.overflow_model_path}") + except Exception as e: + print(f"加载堆料检测模型失败: {e}") + success = False + + # 加载对齐检测模型 + try: + if not os.path.exists(self.alignment_model_path): + print(f"对齐检测模型不存在: {self.alignment_model_path}") + success = False + else: + self.alignment_model = YOLO(self.alignment_model_path) + print(f"成功加载对齐检测模型: {self.alignment_model_path}") + except Exception as e: + print(f"加载对齐检测模型失败: {e}") + success = False + + return success + + def send_relay_command(self, command_hex): + """发送原始Socket命令""" + try: + byte_data = binascii.unhexlify(command_hex) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((self.relay_host, self.relay_port)) + sock.send(byte_data) + response = sock.recv(1024) + print(f"收到继电器响应: {binascii.hexlify(response)}") + return response + except Exception as e: + print(f"继电器通信错误: {e}") + return None + + def get_relay_status(self): + """获取继电器状态""" + response = self.send_relay_command(self.read_status_command) + status_dict = {} + + if response and len(response) >= 10: + status_byte = response[9] + status_bin = f"{status_byte:08b}"[::-1] + for key, bit_index in self.device_bit_map.items(): + status_dict[key] = status_bin[bit_index] == '1' + else: + print("读取继电器状态失败") + + return status_dict + + def control_relay(self, device, action): + """控制继电器""" + if device in self.relay_commands and action in self.relay_commands[device]: + print(f"控制继电器 {device} {action}") + self.send_relay_command(self.relay_commands[device][action]) + time.sleep(0.1) + else: + print(f"无效设备或动作: {device}, {action}") + + def read_transmitter_data_via_relay(self, transmitter_id): + """读取变送器数据(Modbus TCP 转 RS485)""" + try: + if transmitter_id not in self.transmitter_config: + print(f"无效变送器ID: {transmitter_id}") + return None + + config = self.transmitter_config[transmitter_id] + + if not self.relay_modbus_client.connect(): + print("无法连接网络继电器Modbus服务") + return None + + result = self.relay_modbus_client.read_holding_registers( + address=config['weight_register'], + count=config['register_count'], + slave=config['slave_id']#转发给哪台变送器 + ) + + if isinstance(result, Exception): + print(f"读取变送器 {transmitter_id} 失败: {result}") + return None + + # 根据图片示例,正确解析数据 + if config['register_count'] == 2:#读两个寄存器 + # 获取原始字节数组 + raw_data = result.registers + # 组合成32位整数 + weight = (raw_data[0] << 16) + raw_data[1] + weight = weight / 1000.0 # 单位转换为千克 + elif config['register_count'] == 1: + weight = float(result.registers[0]) + else: + print(f"不支持的寄存器数量: {config['register_count']}") + return None + + print(f"变送器 {transmitter_id} 读取重量: {weight}kg") + return weight + + except ModbusException as e: + print(f"Modbus通信错误: {e}") + return None + except Exception as e: + print(f"数据解析错误: {e}") + return None + finally: + self.relay_modbus_client.close() + + def set_inverter_frequency_via_relay(self, frequency): + """设置变频器频率""" + try: + if not self.relay_modbus_client.connect(): + print("无法连接网络继电器Modbus服务") + return False + + # 使用最大频率变量计算百分比 + percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例 + value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数 + + # 限制范围 + value = max(-10000, min(10000, value)) + + result = self.relay_modbus_client.write_register( + self.inverter_config['frequency_register'], + value, + slave=self.inverter_config['slave_id'] + ) + + if isinstance(result, Exception): + print(f"设置频率失败: {result}") + return False + + print(f"设置变频器频率为 {frequency}Hz") + return True + except ModbusException as e: + print(f"变频器Modbus通信错误: {e}") + return False + finally: + self.relay_modbus_client.close() + + def control_inverter_via_relay(self, action): + try: + if not self.relay_modbus_client.connect(): + print("无法连接网络继电器Modbus服务") + return False + + if action == 'start': + result = self.relay_modbus_client.write_register( + address=self.inverter_config['start_register'], + value=self.inverter_config['start_command'], + slave=self.inverter_config['slave_id'] + ) + print("启动变频器") + elif action == 'stop': + result = self.relay_modbus_client.write_register( + address=self.inverter_config['start_register'], + value=self.inverter_config['stop_command'], + slave=self.inverter_config['slave_id'] + ) + print("停止变频器") + else: + print(f"无效操作: {action}") + return False + + if isinstance(result, Exception): + print(f"控制失败: {result}") + return False + + return True + except ModbusException as e: + print(f"变频器控制错误: {e}") + return False + finally: + self.relay_modbus_client.close() + + def check_upper_material_request(self): + """检查是否需要要料""" + current_weight = self.read_transmitter_data_via_relay(1) + + if current_weight is None: + self.upper_weight_error_count += 1 + print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}") + if self.upper_weight_error_count >= self.max_error_count: + print("警告:上料斗传感器连续读取失败,请检查连接") + return False + + self.upper_weight_error_count = 0 + # 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量 + if current_weight < (self.single_batch_weight + self.min_required_weight): + print("上料斗重量不足,通知搅拌楼要料") + self.request_material_from_mixing_building() # 请求搅拌楼下料 + return True + return False + + def request_material_from_mixing_building(self): + """ + 请求搅拌楼下料(待完善) + TODO: 与同事对接通信协议 + """ + print("发送要料请求至搅拌楼...") + self.return_upper_door_to_default() + # 这里需要与同事对接具体的通信方式 + # 可能是Modbus写寄存器、TCP通信、HTTP请求等 + pass + + def wait_for_mixing_building_material(self): + """ + 等待搅拌楼下料完成(待完善) + TODO: 与同事对接信号接收 + """ + print("等待搅拌楼下料完成...") + # 这里需要与同事对接具体的信号接收方式 + # 可能是Modbus读寄存器、TCP通信、HTTP请求等 + # 模拟等待 + time.sleep(5) + print("搅拌楼下料完成") + self.move_upper_door_over_lower() + return True + + def move_upper_door_over_lower(self): + """移动上料斗到下料斗上方""" + print("移动上料斗到下料斗上方") + self.control_relay(self.DOOR_UPPER, 'open') + self.upper_door_position = 'over_lower' + + def return_upper_door(self): + """返回上料斗到搅拌楼""" + print("上料斗返回搅拌楼") + self.control_relay(self.DOOR_UPPER, 'close') + self.upper_door_position = 'returning' + + def return_upper_door_to_default(self): + """上料斗回到默认位置(搅拌楼下接料位置)""" + print("上料斗回到默认位置") + self.control_relay(self.DOOR_UPPER, 'close') + self.upper_door_position = 'default' + + def start_lower_feeding(self): + """开始分步下料""" + if self.lower_feeding_stage != 0: + print("下料已在进行中") + return + + # 检查关键设备是否可连接 + if not self._check_device_connectivity(): + print("关键设备连接失败,无法开始下料") + return + + print("开始分步下料过程") + # 重置计数器 + self.lower_feeding_cycle = 0 # 用于记录三阶段下料次数 + self.upper_feeding_count = 0 # 用于记录上料次数 + + # 第一次上料(总共需要上料2次) + self.transfer_material_from_upper_to_lower() + + # 等待模具车对齐并开始第一轮下料 + self.lower_feeding_stage = 4 # 从等待模具车对齐开始 + self.wait_for_vehicle_alignment() + + def _check_device_connectivity(self): + """检查关键设备连接状态""" + try: + # 检查网络继电器连接 + test_response = self.send_relay_command(self.read_status_command) + if not test_response: + print("网络继电器连接失败") + return False + + # 检查变频器连接 + if not self.relay_modbus_client.connect(): + print("无法连接到网络继电器Modbus服务") + return False + + # 尝试读取变频器一个寄存器(测试连接) + test_result = self.relay_modbus_client.read_holding_registers( + address=0x00, + count=1, + slave=self.inverter_config['slave_id'] + ) + + if isinstance(test_result, Exception): + print("变频器连接测试失败") + return False + + # 检查下料斗变送器连接 + test_weight = self.read_transmitter_data_via_relay(2) + if test_weight is None: + print("下料斗变送器连接失败") + return False + + self.relay_modbus_client.close() + return True + except Exception as e: + print(f"设备连接检查失败: {e}") + return False + + def transfer_material_from_upper_to_lower(self): + """上料斗向下料斗下料(基于上料斗重量传感器控制)""" + print(f"上料斗向下料斗下料 (第 {self.upper_feeding_count + 1} 次)") + + # 记录下料前的重量 + initial_upper_weight = self.read_transmitter_data_via_relay(1) + + # 如果无法读取重量,直接报错 + if initial_upper_weight is None: + raise Exception("无法读取上料斗重量传感器数据,下料操作终止") + + target_upper_weight = initial_upper_weight - self.single_batch_weight + target_upper_weight = max(target_upper_weight, 0) # 确保不低于0 + + print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg") + + # 确保下料斗出砼门关闭 + self.control_relay(self.DOOR_LOWER_2, 'close') + # 打开上料斗出砼门 + self.control_relay(self.DOOR_LOWER_1, 'open') + + # 等待物料流入下料斗,基于上料斗重量变化控制 + start_time = time.time() + timeout = 30 # 30秒超时 + + while time.time() - start_time < timeout: + current_upper_weight = self.read_transmitter_data_via_relay(1) + + # 如果无法读取重量,继续尝试 + if current_upper_weight is None: + print("无法读取上料斗重量,继续尝试...") + time.sleep(1) + continue + + print(f"上料斗当前重量: {current_upper_weight:.2f}kg") + + # 如果达到目标重量,则关闭上料斗出砼门 + if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围 + print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg") + break + elif time.time() - start_time > 25: # 如果25秒后重量变化过小 + weight_change = initial_upper_weight - current_upper_weight + if weight_change < 100: # 如果重量变化小于100kg + print("重量变化过小,可能存在堵塞,交由监控系统处理...") + # 不再在这里直接处理破拱,而是依靠监控系统处理 + break + + time.sleep(1) + + # 关闭上料斗出砼门 + self.control_relay(self.DOOR_LOWER_1, 'close') + + # 验证下料结果 + final_upper_weight = self.read_transmitter_data_via_relay(1) + if final_upper_weight is not None: + actual_transferred = initial_upper_weight - final_upper_weight + print(f"实际下料重量: {actual_transferred:.2f}kg") + + # 增加上料计数 + self.upper_feeding_count += 1 + print("上料斗下料完成") + + # 关闭上料斗出砼门 + self.control_relay(self.DOOR_LOWER_1, 'close') + + # 验证下料结果 + final_upper_weight = self.read_transmitter_data_via_relay(1) + if final_upper_weight is not None: + actual_transferred = initial_upper_weight - final_upper_weight + print(f"实际下料重量: {actual_transferred:.2f}kg") + + # 增加上料计数 + self.upper_feeding_count += 1 + print("上料斗下料完成") + + def wait_for_vehicle_alignment(self): + """等待模具车对齐""" + print("等待模具车对齐...") + self.lower_feeding_stage = 4 + + while self.lower_feeding_stage == 4 and self._running: + if self.vehicle_aligned: + print("模具车已对齐,开始下料") + self.lower_feeding_stage = 1 + self.feeding_stage_one() + break + time.sleep(self.alignment_check_interval) + + def feeding_stage_one(self): + """第一阶段下料:下料斗向模具车下料(低速)""" + print("开始第一阶段下料:下料斗低速下料") + self.set_inverter_frequency_via_relay(self.frequencies[0]) + self.control_inverter_via_relay('start') + + # 确保上料斗出砼门关闭 + self.control_relay(self.DOOR_LOWER_1, 'close') + # 打开下料斗出砼门 + self.control_relay(self.DOOR_LOWER_2, 'open') + + start_time = time.time() + initial_weight = self.read_transmitter_data_via_relay(2) + if initial_weight is None: + print("无法获取初始重量,取消下料") + self.finish_feeding_process() # 直接结束整个流程,而不是当前批次 + return + + target_weight = initial_weight + self.single_batch_weight + + while self.lower_feeding_stage == 1: + current_weight = self.read_transmitter_data_via_relay(2) + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗传感器连续读取失败,停止下料") + self.finish_feeding_process() # 直接结束整个流程 + return + else: + self.lower_weight_error_count = 0 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 2 + self.feeding_stage_two() + break + time.sleep(2) + + def feeding_stage_two(self): + """第二阶段下料:下料斗向模具车下料(中速)""" + print("开始第二阶段下料:下料斗中速下料") + self.set_inverter_frequency_via_relay(self.frequencies[1]) + + # 保持下料斗出砼门打开 + self.control_relay(self.DOOR_LOWER_2, 'open') + # 确保上料斗出砼门关闭 + self.control_relay(self.DOOR_LOWER_1, 'close') + + start_time = time.time() + initial_weight = self.read_transmitter_data_via_relay(2) + if initial_weight is None: + print("无法获取初始重量,取消下料") + self.finish_feeding_process() # 直接结束整个流程 + return + + target_weight = initial_weight + self.single_batch_weight + + while self.lower_feeding_stage == 2: + current_weight = self.read_transmitter_data_via_relay(2) + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗传感器连续读取失败,停止下料") + self.finish_feeding_process() # 直接结束整个流程 + return + else: + self.lower_weight_error_count = 0 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 3 + self.feeding_stage_three() + break + time.sleep(2) + + def feeding_stage_three(self): + """第三阶段下料:下料斗向模具车下料(高速)""" + print("开始第三阶段下料:下料斗高速下料") + self.set_inverter_frequency_via_relay(self.frequencies[2]) + + # 保持下料斗出砼门打开 + self.control_relay(self.DOOR_LOWER_2, 'open') + # 确保上料斗出砼门关闭 + self.control_relay(self.DOOR_LOWER_1, 'close') + + start_time = time.time() + initial_weight = self.read_transmitter_data_via_relay(2) + if initial_weight is None: + print("无法获取初始重量,取消下料") + self.finish_feeding_process() # 直接结束整个流程 + return + + target_weight = initial_weight + self.single_batch_weight + + while self.lower_feeding_stage == 3: + current_weight = self.read_transmitter_data_via_relay(2) + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗传感器连续读取失败,停止下料") + self.finish_feeding_process() # 直接结束整个流程 + return + else: + self.lower_weight_error_count = 0 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 4 + self.finish_current_batch() + break + time.sleep(2) + + def finish_current_batch(self): + """完成当前批次下料""" + print("当前批次下料完成,关闭出砼门") + self.control_inverter_via_relay('stop') + self.control_relay(self.DOOR_LOWER_1, 'close') + self.control_relay(self.DOOR_LOWER_2, 'close') + + + # 增加三阶段下料轮次计数 + self.lower_feeding_cycle += 1 + + # 检查是否完成两轮三阶段下料(总共5吨) + if self.lower_feeding_cycle >= 2: + # 完成整个5吨下料任务 + print("完成两轮三阶段下料,5吨下料任务完成") + self.finish_feeding_process() + return + + # 如果只完成一轮三阶段下料,进行第二次上料 + print("第一轮三阶段下料完成,准备第二次上料") + # 上料斗第二次向下料斗下料 + try: + self.transfer_material_from_upper_to_lower() + except Exception as e: + print(f"第二次上料失败: {e}") + print("停止下料流程") + self.finish_feeding_process() # 出现严重错误时结束整个流程 + return + + # 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车) + print("第二次上料完成,继续三阶段下料") + self.lower_feeding_stage = 1 # 直接进入第一阶段下料 + self.feeding_stage_one() # 开始第二轮第一阶段下料 + + + def finish_feeding_process(self): + """完成整个下料流程""" + print("整个下料流程完成") + self.lower_feeding_stage = 0 + self.lower_feeding_cycle = 0 + self.upper_feeding_count = 0 + self.return_upper_door_to_default() + + def handle_overflow_control(self, overflow_detected, door_opening_large): + """处理溢料控制""" + if overflow_detected and door_opening_large: + print("检测到溢料且出砼门开口较大,调小出砼门") + self.control_relay(self.DOOR_LOWER_1, 'close') + time.sleep(0.1) + self.control_relay(self.DOOR_LOWER_1, 'open') + time.sleep(0.1) + + def is_lower_door_open(self): + """检查出砼门是否打开""" + return self.lower_feeding_stage in [1, 2] # 只有在下料阶段才认为门是打开的 + + def check_arch_blocking(self): + """检查是否需要破拱""" + current_time = time.time() + + # 检查下料斗破拱(只有在下料过程中才检查) + if self.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查 + lower_weight = self.read_transmitter_data_via_relay(2) + if lower_weight is not None: + # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) + if (abs(lower_weight - self.last_lower_weight) < 0.1) and \ + (current_time - self.last_weight_time) > 10: + print("下料斗可能堵塞,启动破拱") + self.control_relay(self.BREAK_ARCH_LOWER, 'open') + time.sleep(2) + self.control_relay(self.BREAK_ARCH_LOWER, 'close') + + self.last_lower_weight = lower_weight + + # 检查上料斗破拱(在上料斗向下料斗下料时检查) + if (self.upper_door_position == 'over_lower' and + self.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱 + upper_weight = self.read_transmitter_data_via_relay(1) + if upper_weight is not None: + # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) + if (abs(upper_weight - self.last_upper_weight) < 0.1) and \ + (current_time - self.last_weight_time) > 10: + print("上料斗可能堵塞,启动破拱") + self.control_relay(self.BREAK_ARCH_UPPER, 'open') + time.sleep(2) + self.control_relay(self.BREAK_ARCH_UPPER, 'close') + + self.last_upper_weight = upper_weight + + # 更新最后读取时间 + if (self.read_transmitter_data_via_relay(1) is not None or + self.read_transmitter_data_via_relay(2) is not None): + self.last_weight_time = current_time + + def monitor_system(self): + """监控系统状态""" + while self._running: + try: + self.check_upper_material_request() + self.check_arch_blocking() + time.sleep(1) + except Exception as e: + print(f"监控线程错误: {e}") + + def setup_camera_capture(self, camera_index=0): + """ + 设置摄像头捕获 + :param camera_index: USB摄像头索引或IP摄像头配置 + """ + try: + rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01" + self.camera = cv2.VideoCapture(rtsp_url) + + if not self.camera.isOpened(): + print(f"无法打开网络摄像头: {rtsp_url}") + return False + print(f"网络摄像头初始化成功,地址: {rtsp_url}") + return True + except Exception as e: + print(f"摄像头设置失败: {e}") + return False + + def capture_current_frame(self): + """捕获当前帧并返回numpy数组""" + try: + if self.camera is None: + print("摄像头未初始化") + return None + + ret, frame = self.camera.read() + if ret: + return frame + else: + print("无法捕获图像帧") + return None + except Exception as e: + print(f"图像捕获失败: {e}") + return None + + def detect_overflow_from_image(self, image_array): + """通过图像检测是否溢料(接受numpy数组)""" + try: + # 检查模型是否已加载 + if self.overflow_model is None: + print("堆料检测模型未加载") + return False + + rois = load_global_rois(self.roi_file_path) + + if not rois: + print("没有有效的ROI配置") + return False + + if image_array is None: + print("输入图像为空") + return False + + crops = crop_and_resize(image_array, rois, 640) + for roi_resized, _ in crops: + final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4) + if "大堆料" in final_class or "小堆料" in final_class: + return True + + return False + except Exception as e: + print(f"溢料检测失败: {e}") + return False + + def detect_vehicle_alignment(self, image_array): + """通过图像检测模具车是否对齐(接受numpy数组)""" + try: + # 检查模型是否已加载 + if self.alignment_model is None: + print("对齐检测模型未加载") + return False + + if image_array is None: + print("输入图像为空") + return False + + # 直接使用模型进行推理 + results = self.alignment_model(image_array) + pared_probs = results[0].probs.data.cpu().numpy().flatten() + + # 类别0: 未对齐, 类别1: 对齐 + class_id = int(pared_probs.argmax()) + confidence = float(pared_probs[class_id]) + + # 只有当对齐且置信度>95%时才认为对齐 + if class_id == 1 and confidence > 0.95: + return True + return False + except Exception as e: + print(f"对齐检测失败: {e}") + return False + + def get_current_door_angle(self, image=None, image_path=None): + """ + 通过视觉系统获取当前出砼门角度 + :param image: 图像数组(numpy array) + :param image_path: 图片路径 + """ + try: + # 检查模型是否已加载 + if self.angle_model is None: + print("夹角检测模型未加载") + return None + + angle_deg, _ = predict_obb_best_angle( + model=self.angle_model, # 传递预加载的模型实例 + image=image, # 传递图像数组 + image_path=image_path # 或传递图像路径 + ) + return angle_deg + except Exception as e: + print(f"角度检测失败: {e}") + return None + + def alignment_check_loop(self): + """ + 模具车对齐检查循环 + """ + while self._running: + try: + # 只在需要检查对齐时才检查 + if self.lower_feeding_stage == 4: + current_frame = self.capture_current_frame() + if current_frame is not None: + self.vehicle_aligned = self.detect_vehicle_alignment(current_frame) + if self.vehicle_aligned: + print("检测到模具车对齐") + else: + print("模具车未对齐") + time.sleep(self.alignment_check_interval) + except Exception as e: + print(f"对齐检查循环错误: {e}") + time.sleep(self.alignment_check_interval) + + def visual_control_loop(self): + """ + 视觉控制主循环 + """ + + while self._running and self.visual_control_enabled: + try: + current_frame = self.capture_current_frame() + if current_frame is None: + print("无法获取当前图像,跳过本次调整") + time.sleep(self.visual_check_interval) + continue + + # 检测是否溢料 + overflow = self.detect_overflow_from_image(current_frame) + + # 获取当前角度 + current_angle = self.get_current_door_angle(image=current_frame) + + if current_angle is None: + print("无法获取当前角度,跳过本次调整") + time.sleep(self.visual_check_interval) + continue + + print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}") + + # 状态机控制逻辑 + if self.angle_control_mode == "normal": + # 正常模式 + if overflow and current_angle > self.angle_threshold: + # 检测到堆料且角度过大,进入角度减小模式 + print("检测到堆料且角度过大,关闭出砼门开始减小角度") + self.control_relay(self.DOOR_LOWER_2, 'close') + self.angle_control_mode = "reducing" + else: + # 保持正常开门 + self.control_relay(self.DOOR_LOWER_2, 'open') + + elif self.angle_control_mode == "reducing": + # 角度减小模式 + if current_angle <= self.target_angle + self.angle_tolerance: + # 角度已达到目标范围 + if overflow: + # 仍有堆料,进入维持模式 + print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式") + self.angle_control_mode = "maintaining" + self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门 + else: + # 无堆料,恢复正常模式 + print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "recovery" + + elif self.angle_control_mode == "maintaining": + # 维持模式 - 使用脉冲控制 + if not overflow: + # 堆料已消除,恢复正常模式 + print("堆料已消除,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "recovery" + else: + # 继续维持角度控制 + self.pulse_control_door_for_maintaining() + + elif self.angle_control_mode == "recovery":#打开夹爪的过程中又堆料了 + # 恢复模式 - 逐步打开门 + if overflow: + # 又出现堆料,回到角度减小模式 + print("恢复过程中又检测到堆料,回到角度减小模式") + self.angle_control_mode = "maintaining" + else: + # 堆料已消除,恢复正常模式 + print("堆料已消除,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "normal" + + self.last_angle = current_angle + time.sleep(self.visual_check_interval) + + except Exception as e: + print(f"视觉控制循环错误: {e}") + time.sleep(self.visual_check_interval) + + def pulse_control_door_for_maintaining(self): + """ + 用于维持模式的脉冲控制 + 保持角度在目标范围内 + """ + print("执行维持脉冲控制") + # 关门1秒 + self.control_relay(self.DOOR_LOWER_2, 'close') + time.sleep(1.0) + # 开门1秒 + self.control_relay(self.DOOR_LOWER_2, 'open') + time.sleep(1.0) + + def start_visual_control(self): + """ + 启动视觉控制线程 + """ + if not self.visual_control_enabled: + print("视觉控制未启用") + return + + print("启动视觉控制线程") + self._visual_control_thread = threading.Thread( + target=self.visual_control_loop, + daemon=True + ) + self._visual_control_thread.start() + return self._visual_control_thread + + def start_alignment_check(self): + """ + 启动模具车对齐检查线程 + """ + print("启动模具车对齐检查线程") + self._alignment_check_thread = threading.Thread( + target=self.alignment_check_loop, + daemon=True + ) + self._alignment_check_thread.start() + return self._alignment_check_thread + + def start(self): + """启动系统""" + if self._running: + print("系统已在运行") + return + print("启动控制系统") + self._running = True + self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True) + self._monitor_thread.start() + + def stop(self): + """停止系统""" + if not self._running: + print("系统未在运行") + return + print("停止控制系统") + self._running = False + if self._monitor_thread is not None: + self._monitor_thread.join() + if self._visual_control_thread is not None: + self._visual_control_thread.join() + if self._alignment_check_thread is not None: + self._alignment_check_thread.join() + if self.camera is not None: + self.camera.release() + print("控制系统已停止") + + +# 使用示例 +if __name__ == "__main__": + system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000) + + # 设置角度控制参数 + system.set_angle_parameters( + target_angle=20.0, + min_angle=10.0, + max_angle=80.0, + threshold=60.0 + ) + + # 设置下料参数 + system.set_feeding_parameters( + target_vehicle_weight=5000, # 5吨 + upper_buffer_weight=500, # 0.5吨缓冲 + single_batch_weight=2500 # 每次下2.5吨 + ) + + # 设置摄像头配置 + system.set_camera_config( + camera_type="ip", + ip="192.168.1.51", + port=554, + username="admin", + password="XJ123456", + channel=1 + ) + + # 初始化摄像头 + if not system.setup_camera_capture(): + print("摄像头初始化失败") + exit(1) + + # 加载所有模型 + if not system.load_all_models(): + print("模型加载失败") + exit(1) + + # 启动系统监控 + system.start() + + # 启动视觉控制 + system.start_visual_control() + + # 启动对齐检查 + system.start_alignment_check() + + print("系统准备就绪,5秒后开始下料...") + time.sleep(5) + system.start_lower_feeding() # 启动下料流程 + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("收到停止信号") + except Exception as e: + print(f"系统错误: {e}") + finally: + system.stop() diff --git a/core/state.py b/core/state.py index 4c1c9d1..cdf3d83 100644 --- a/core/state.py +++ b/core/state.py @@ -9,6 +9,7 @@ class SystemState: self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:第三阶段, 4:等待模具车对齐 self.lower_feeding_cycle = 0 # 下料斗下料循环次数 self.upper_feeding_count = 0 # 上料斗已下料次数 + self.upper_feeding_max = 2 #上料斗最大下料次数 # 重量相关 self.last_upper_weight = 0 diff --git a/tests/__pycache__/test_inverter_controller.cpython-39.pyc b/tests/__pycache__/test_inverter_controller.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8dc9f548a05e8d602dcae20d7bc8222dda5da5ea GIT binary patch literal 4587 zcmc(j+iw(A7{KSu%ECmf*?i_F{oPtfj&rMqR0d2-{HLKZreY=7oYsTGkcruc2lFqY34iEJu~zBZs*jD z#Uc`1d*Z)N?As_wzhR^G(x5Q}w|y1_l_<$dI+k*l4w_n3UAwBnD){>C?m9=CZO!019TmfQM#UPfHFom!l!Ip7k3P<_=A@BsFiIr@9G~Mxa=_9M!)`1ms2S<3#Q>&? zkb`B3SMy>pK{V|3;X*(=;*n!Ws zg*57m)u({A28)w!z-G?7#a>v}4uX&rjYO2GZJl`yipeXe;oAyoXui`@8SV_p$RNQK zNwY_#q}C2l93`1xqbR73ME?Sz-9>E)9zmv4Mpxpt>|>2_uI z9#kr`UsrBlsC@jzI81_Ta!A+gta*$X7S$7DoE)T6xQ{%&awKqRXL0+I=7-IYIiAK2raidT|xKc3aJ-ZZurH zg}O(fLctX>LfwYqs58<@GF|Tp7^qZ?QSux)i3%6<${NO)S}Ra*mBZ683u2D~%2QjM zYU%FJix2OD{uVEttIl4ood0%o5uOW|HIzz51VbK%rW*oYd2j|d%9&H*dE=OwEiup6 z;>Wq=u!h2f4%`@+a|`)|N8U)A5Xz_lp^Lc@uWyu>*u27{@EE$pMAuxwY) z76H_^ykVEpX=dAx!x}c~P*S2KNTNzqiIX^)dcK=#I!63P zMcL13Qu06IznZd@!z$Kbm;M;M+qbS~dcl9_ls}HN*e<@5fZ4)tGxiDV-Ga3?h&ww#6k!N5yO}FOiXMSq*{IT{|3}Ew zzO`fYFSk0xzZ;#~Aj)1K=!g^@vzA&fFI3LosWcL-7ocHRf$1_e-x8iz#VX=9?jLz~ zv|daF-^96wmyc^P^*v-pT4V3#)5> z5Nu6qWHPIed9Itiby~;==HE*;BK6dAe$%R|SIgBav#^)0ZP?D=fxYyo9)~?FY;ve! zrg58-N`{-+zs0;n#(5H1iCFgt)-WP=jCxyr===|uM0jm!^Dpxsp=*tCE$GA+ zx7wbeao5o`LpOOfGBX$CI;Dz7e9*vLaNE^PzjzF2=l0TCxUE=eb#(tI@pi!FOSv5P zJ-i#UMw;c1;nX&h>%?rkk6C*z{0x!#66@Zz0Bc_+m5jTZVdTvmGYmIk7%-uf#d_2* z@bKY{a6Iqv1d44acA(gW0?)ntDHMB9JdNTR6wjh)COc8Xh|7Z@uw|o*X|fj9Vp=cU zQ7sN7)?#7~zE{q)@`jNdaJ3Q~SP+OFtJel`Tg@@Dcsq6lj4*RT literal 0 HcmV?d00001 diff --git a/tests/test_feeding_process.py b/tests/test_feeding_process.py index 4a248c1..b38a666 100644 --- a/tests/test_feeding_process.py +++ b/tests/test_feeding_process.py @@ -4,17 +4,17 @@ from unittest.mock import patch, MagicMock import sys import os -# 添加src目录到Python路径 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -from src.control.feeding_controller import FeedingControlSystem +from feeding.process import FeedingProcess class TestFeedingProcess(unittest.TestCase): - @patch('src.control.feeding_process.RelayController') - @patch('src.control.feeding_process.InverterController') - @patch('src.control.feeding_process.TransmitterController') + @patch('feeding.process.RelayController') + @patch('feeding.process.InverterController') + @patch('feeding.process.TransmitterController') def test_initialization(self, mock_transmitter, mock_inverter, mock_relay): """测试初始化""" # 创建模拟对象 @@ -28,28 +28,27 @@ class TestFeedingProcess(unittest.TestCase): mock_transmitter.return_value = mock_transmitter_instance # 创建系统实例 - system = FeedingControlSystem() + system = FeedingProcess() # 验证初始化 self.assertIsNotNone(system) - self.assertFalse(system._running) + self.assertFalse(system.state.running) def test_set_feeding_parameters(self): """测试设置下料参数""" - with patch('src.control.feeding_process.RelayController'), \ - patch('src.control.feeding_process.InverterController'), \ - patch('src.control.feeding_process.TransmitterController'): - system = FeedingControlSystem() - system.set_feeding_parameters( - target_vehicle_weight=3000, - upper_buffer_weight=300, - single_batch_weight=1500 - ) + with patch('feeding.process.RelayController'), \ + patch('feeding.process.InverterController'), \ + patch('feeding.process.TransmitterController'): + system = FeedingProcess() + # 通过settings修改参数 + system.settings.single_batch_weight = 1500 + system.settings.min_required_weight = 300 + system.settings.target_vehicle_weight = 3000 - self.assertEqual(system.target_vehicle_weight, 3000) - self.assertEqual(system.upper_buffer_weight, 300) - self.assertEqual(system.single_batch_weight, 1500) + self.assertEqual(system.settings.target_vehicle_weight, 3000) + self.assertEqual(system.settings.min_required_weight, 300) + self.assertEqual(system.settings.single_batch_weight, 1500) if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file diff --git a/tests/test_inverter_controller.py b/tests/test_inverter_controller.py new file mode 100644 index 0000000..d2c24ad --- /dev/null +++ b/tests/test_inverter_controller.py @@ -0,0 +1,161 @@ +import unittest +from unittest.mock import patch, MagicMock +import sys +import os + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from hardware.inverter import InverterController +from pymodbus.exceptions import ModbusException + + +class TestInverterController(unittest.TestCase): + + def setUp(self): + """测试前的准备工作""" + # 创建模拟的继电器控制器 + self.mock_relay = MagicMock() + self.mock_relay.modbus_client = MagicMock() + + # 创建变频器控制器实例 + self.inverter = InverterController(relay_controller=self.mock_relay) + + def test_inverter_initialization(self): + """测试变频器控制器初始化""" + self.assertEqual(self.inverter.relay_controller, self.mock_relay) + self.assertEqual(self.inverter.max_frequency, 400.0) + + # 检查配置 + self.assertIn('slave_id', self.inverter.config) + self.assertIn('frequency_register', self.inverter.config) + self.assertIn('start_register', self.inverter.config) + self.assertIn('stop_register', self.inverter.config) + self.assertIn('start_command', self.inverter.config) + self.assertIn('stop_command', self.inverter.config) + + def test_set_frequency_success(self): + """测试设置频率成功""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + mock_result = MagicMock() + self.mock_relay.modbus_client.write_register.return_value = mock_result + + # 设置频率 + result = self.inverter.set_frequency(200.0) + + # 验证调用 + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.write_register.assert_called_once() + self.assertTrue(result) + + def test_set_frequency_connection_failed(self): + """测试设置频率时连接失败""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = False + + # 设置频率 + result = self.inverter.set_frequency(200.0) + + # 验证结果 + self.assertFalse(result) + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.write_register.assert_not_called() + + def test_set_frequency_modbus_exception(self): + """测试设置频率时Modbus异常""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误") + + # 设置频率 + result = self.inverter.set_frequency(200.0) + + # 验证结果 + self.assertFalse(result) + + def test_set_frequency_value_clamping(self): + """测试频率值限制""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + mock_result = MagicMock() + self.mock_relay.modbus_client.write_register.return_value = mock_result + + # 测试超过最大值的频率 + result = self.inverter.set_frequency(500.0) # 超过400.0最大值 + + # 验证调用 + self.assertTrue(result) + self.mock_relay.modbus_client.write_register.assert_called_once() + + def test_control_start_success(self): + """测试启动变频器成功""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + mock_result = MagicMock() + self.mock_relay.modbus_client.write_register.return_value = mock_result + + # 启动变频器 + result = self.inverter.control('start') + + # 验证调用 + self.assertTrue(result) + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.write_register.assert_called_once() + + def test_control_stop_success(self): + """测试停止变频器成功""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + mock_result = MagicMock() + self.mock_relay.modbus_client.write_register.return_value = mock_result + + # 停止变频器 + result = self.inverter.control('stop') + + # 验证调用 + self.assertTrue(result) + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.write_register.assert_called_once() + + def test_control_invalid_action(self): + """测试无效的控制动作""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + + # 使用无效动作 + result = self.inverter.control('invalid_action') + + # 验证结果 + self.assertFalse(result) + self.mock_relay.modbus_client.connect.assert_not_called() + self.mock_relay.modbus_client.write_register.assert_not_called() + + def test_control_connection_failed(self): + """测试控制时连接失败""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = False + + # 启动变频器 + result = self.inverter.control('start') + + # 验证结果 + self.assertFalse(result) + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.write_register.assert_not_called() + + def test_control_modbus_exception(self): + """测试控制时Modbus异常""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误") + + # 启动变频器 + result = self.inverter.control('start') + + # 验证结果 + self.assertFalse(result) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_relay_controller.py b/tests/test_relay_controller.py new file mode 100644 index 0000000..ca030e8 --- /dev/null +++ b/tests/test_relay_controller.py @@ -0,0 +1,109 @@ +import unittest +from unittest.mock import patch, MagicMock +import sys +import os + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from hardware.relay import RelayController + + +class TestRelayController(unittest.TestCase): + + def setUp(self): + """测试前的准备工作""" + self.relay_host = '192.168.0.18' + self.relay_port = 50000 + self.relay = RelayController(host=self.relay_host, port=self.relay_port) + + def test_relay_initialization(self): + """测试继电器控制器初始化""" + self.assertEqual(self.relay.host, self.relay_host) + self.assertEqual(self.relay.port, self.relay_port) + self.assertIsNotNone(self.relay.modbus_client) + + # 检查设备映射 + self.assertIn(RelayController.DOOR_UPPER, self.relay.device_bit_map) + self.assertIn(RelayController.DOOR_LOWER_1, self.relay.device_bit_map) + self.assertIn(RelayController.DOOR_LOWER_2, self.relay.device_bit_map) + self.assertIn(RelayController.BREAK_ARCH_UPPER, self.relay.device_bit_map) + self.assertIn(RelayController.BREAK_ARCH_LOWER, self.relay.device_bit_map) + + @patch('hardware.relay.socket.socket') + def test_send_command_success(self, mock_socket_class): + """测试发送命令成功""" + # 配置模拟对象 + mock_socket_instance = MagicMock() + mock_socket_class.return_value.__enter__.return_value = mock_socket_instance + mock_socket_instance.recv.return_value = b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x00' + + # 发送命令 + response = self.relay.send_command(self.relay.read_status_command) + + # 验证调用 + mock_socket_instance.connect.assert_called_once_with((self.relay_host, self.relay_port)) + mock_socket_instance.send.assert_called_once() + self.assertIsNotNone(response) + + @patch('hardware.relay.socket.socket') + def test_send_command_exception(self, mock_socket_class): + """测试发送命令异常处理""" + # 配置模拟对象以引发异常 + mock_socket_class.return_value.__enter__.side_effect = Exception("网络错误") + + # 发送命令 + response = self.relay.send_command(self.relay.read_status_command) + + # 验证结果 + self.assertIsNone(response) + + def test_control_valid_device_action(self): + """测试控制有效设备和动作""" + with patch.object(self.relay, 'send_command') as mock_send: + # 测试打开上料斗门 + self.relay.control(RelayController.DOOR_UPPER, 'open') + mock_send.assert_called_once() + + def test_control_invalid_device(self): + """测试控制无效设备""" + with patch.object(self.relay, 'send_command') as mock_send: + # 测试无效设备 + self.relay.control('invalid_device', 'open') + mock_send.assert_not_called() + + def test_control_invalid_action(self): + """测试控制无效动作""" + with patch.object(self.relay, 'send_command') as mock_send: + # 测试无效动作 + self.relay.control(RelayController.DOOR_UPPER, 'invalid_action') + mock_send.assert_not_called() + + @patch.object(RelayController, 'send_command') + def test_get_status_success(self, mock_send_command): + """测试获取状态成功""" + # 模拟返回的有效响应 + mock_send_command.return_value = b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x1F' + + status = self.relay.get_status() + + # 验证返回的状态字典 + self.assertIsInstance(status, dict) + self.assertIn(RelayController.DOOR_UPPER, status) + mock_send_command.assert_called_once_with(self.relay.read_status_command) + + @patch.object(RelayController, 'send_command') + def test_get_status_failure(self, mock_send_command): + """测试获取状态失败""" + # 模拟返回的无效响应 + mock_send_command.return_value = None + + status = self.relay.get_status() + + # 验证返回空状态 + self.assertIsInstance(status, dict) + self.assertEqual(len(status), 0) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_transmitter_controller.py b/tests/test_transmitter_controller.py new file mode 100644 index 0000000..8300026 --- /dev/null +++ b/tests/test_transmitter_controller.py @@ -0,0 +1,165 @@ +import unittest +from unittest.mock import patch, MagicMock +import sys +import os + +# 添加项目根目录到Python路径 +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from hardware.transmitter import TransmitterController +from pymodbus.exceptions import ModbusException + + +class TestTransmitterController(unittest.TestCase): + + def setUp(self): + """测试前的准备工作""" + # 创建模拟的继电器控制器 + self.mock_relay = MagicMock() + self.mock_relay.modbus_client = MagicMock() + + # 创建变送器控制器实例 + self.transmitter = TransmitterController(relay_controller=self.mock_relay) + + def test_transmitter_initialization(self): + """测试变送器控制器初始化""" + self.assertEqual(self.transmitter.relay_controller, self.mock_relay) + + # 检查配置 + self.assertIn(1, self.transmitter.config) # 上料斗 + self.assertIn(2, self.transmitter.config) # 下料斗 + + # 检查上料斗配置 + upper_config = self.transmitter.config[1] + self.assertEqual(upper_config['slave_id'], 1) + self.assertEqual(upper_config['weight_register'], 0x01) + self.assertEqual(upper_config['register_count'], 2) + + # 检查下料斗配置 + lower_config = self.transmitter.config[2] + self.assertEqual(lower_config['slave_id'], 2) + self.assertEqual(lower_config['weight_register'], 0x01) + self.assertEqual(lower_config['register_count'], 2) + + def test_read_data_valid_transmitter_id(self): + """测试读取有效变送器ID的数据""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + + # 模拟读取结果 + mock_result = MagicMock() + mock_result.registers = [0, 1500] # 表示1500kg + self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result + + # 读取上料斗数据 + weight = self.transmitter.read_data(1) + + # 验证调用 + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.read_holding_registers.assert_called_once() + self.assertEqual(weight, 1.5) # 1500kg / 1000 = 1.5kg + + def test_read_data_invalid_transmitter_id(self): + """测试读取无效变送器ID的数据""" + # 读取无效ID的数据 + weight = self.transmitter.read_data(99) + + # 验证结果 + self.assertIsNone(weight) + self.mock_relay.modbus_client.connect.assert_not_called() + self.mock_relay.modbus_client.read_holding_registers.assert_not_called() + + def test_read_data_connection_failed(self): + """测试读取数据时连接失败""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = False + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 + self.assertIsNone(weight) + self.mock_relay.modbus_client.connect.assert_called_once() + self.mock_relay.modbus_client.read_holding_registers.assert_not_called() + + def test_read_data_modbus_exception(self): + """测试读取数据时Modbus异常""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + self.mock_relay.modbus_client.read_holding_registers.side_effect = ModbusException("Modbus错误") + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 + self.assertIsNone(weight) + + def test_read_data_register_count_2(self): + """测试读取2个寄存器的数据""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + + # 模拟读取结果 (高位寄存器=1,低位寄存器=0xE848,表示128000) + mock_result = MagicMock() + mock_result.registers = [1, 0xE848] # 1 * 65536 + 59464 = 125000 + self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 (125000 / 1000 = 125kg) + self.assertEqual(weight, 125.0) + + def test_read_data_register_count_1(self): + """测试读取1个寄存器的数据(模拟配置变更)""" + # 临时修改配置以测试单寄存器读取 + self.transmitter.config[1]['register_count'] = 1 + + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + + # 模拟读取结果 + mock_result = MagicMock() + mock_result.registers = [1500] # 表示1500kg + self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 + self.assertEqual(weight, 1500.0) # 1500kg + + def test_read_data_invalid_register_count(self): + """测试不支持的寄存器数量""" + # 临时修改配置以测试无效寄存器数量 + self.transmitter.config[1]['register_count'] = 3 + + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + + # 模拟读取结果 + mock_result = MagicMock() + mock_result.registers = [0, 0, 0] + self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 + self.assertIsNone(weight) + + def test_read_data_result_is_exception(self): + """测试读取结果为异常对象""" + # 配置模拟对象 + self.mock_relay.modbus_client.connect.return_value = True + self.mock_relay.modbus_client.read_holding_registers.return_value = Exception("读取错误") + + # 读取数据 + weight = self.transmitter.read_data(1) + + # 验证结果 + self.assertIsNone(weight) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file