diff --git a/Fedding.py b/Fedding.py deleted file mode 100644 index 625f15a..0000000 --- a/Fedding.py +++ /dev/null @@ -1,1153 +0,0 @@ -import socket -import binascii -import time -import threading -import struct -import cv2 -import os -from pymodbus.client import ModbusTcpClient -from pymodbus.exceptions import ModbusException -# 添加视觉模块路径 -import sys - -sys.path.append(os.path.join(os.path.dirname(__file__), 'src', 'vision')) -# 导入视觉处理模块 -from vision.anger_caculate import predict_obb_best_angle -from vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize, YOLO - - -class FeedingControlSystem: - def __init__(self, relay_host='192.168.0.18', relay_port=50000): - # 网络继电器配置 - self.relay_host = relay_host - self.relay_port = relay_port - self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port) - - # 继电器映射 - self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动 - self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门 - self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门 - self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱 - self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱 - - # 继电器命令(原始Socket)mudbus TCP模式 - self.relay_commands = { - self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, - self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, - self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'}, - self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, - self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'} - } - - # 读取状态命令 - self.read_status_command = '000000000006010100000008' - - # 设备位映射 - self.device_bit_map = { - self.DOOR_UPPER: 0, - self.DOOR_LOWER_1: 1, - self.DOOR_LOWER_2: 2, - self.BREAK_ARCH_UPPER: 3, - self.BREAK_ARCH_LOWER: 4 - } - - # 变频器配置(Modbus RTU 协议) - self.inverter_config = { - 'slave_id': 1, - 'frequency_register': 0x01, # 2001H - 'start_register': 0x00, # 2000H - 'stop_register': 0x00, # 2000H(用于停机) - 'start_command': 0x0013, # 正转点动运行 - 'stop_command': 0x0001 # 停机 - } - - # 变送器配置(Modbus RTU) - self.transmitter_config = { - 1: { # 上料斗 - 'slave_id': 1, - 'weight_register': 0x01, - 'register_count': 2 - },#发出去的内容01 03 00 01 00 02 - 2: { # 下料斗 - 'slave_id': 2, - 'weight_register': 0x01, - 'register_count': 2 - }#发出去的内容02 03 00 01 00 02 - } - - # 系统状态 - self._running = False - self._monitor_thread = None - self._visual_control_thread = None - self._alignment_check_thread = None - - # 下料控制相关 - self.min_required_weight = 500 # 模具车最小需要重量(kg) - self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中) - self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:等待上料, 4:等待模具车对齐 - self.lower_feeding_cycle = 0 # 下料斗下料循环次数 (1, 2, 3) - self.upper_feeding_count = 0 # 上料斗已下料次数 (0, 1, 2) - self.last_upper_weight = 0 - self.last_lower_weight = 0 - self.last_weight_time = time.time() - self.target_vehicle_weight = 5000 # 目标模具车重量(kg) - self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多 - self.single_batch_weight = 2500 # 单次下料重量(kg) - - - #夹角状态 - self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery - - # 错误计数 - self.upper_weight_error_count = 0 - self.lower_weight_error_count = 0 - self.max_error_count = 3 - - # 下料阶段频率(Hz) - self.inverter_max_frequency = 400.0#频率最大值 - self.frequencies = [220.0, 230.0, 240.0] - - # 视觉系统接口 - self.overflow_detected = False # 堆料检测 - self.door_opening_large = False # 夹角 - self.vehicle_aligned = False # 模具车是否对齐 - - # 视觉控制参数 - self.angle_threshold = 60.0 # 角度阈值,超过此值认为开口过大 - self.target_angle = 20.0 # 目标角度 - self.min_angle = 10.0 # 最小角度 - self.max_angle = 80.0 # 最大角度 - self.angle_tolerance = 5.0 # 角度容差 - self.visual_control_enabled = True # 视觉控制使能 - self.last_angle = None # 上次检测角度 - self.visual_check_interval = 1.0 # 视觉检查间隔(秒) - self.alignment_check_interval = 0.5 # 对齐检查间隔(秒) - - # 模型路径配置 - self.angle_model_path = "vision/models/angle.pt" - self.overflow_model_path = "vision/models/overflow.pt" - self.alignment_model_path = "vision/models/alig.pt" # 模具车对齐检测模型 - self.roi_file_path = "vision/roi_coordinates/1_rois.txt" - - # 模型实例 - self.angle_model = None # 夹角检测模型实例 - self.overflow_model = None # 堆料检测模型实例 - self.alignment_model = None # 对齐检测模型实例 - - # 摄像头相关配置 - self.camera = None - self.camera_type = "ip" - self.camera_ip = "192.168.1.51" - self.camera_port = 554 - self.camera_username = "admin" - self.camera_password = "XJ123456" - self.camera_channel = 1 - self.current_image_path = "current_frame.jpg" - - def set_camera_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1): - """ - 设置摄像头配置 - :param ip: 网络摄像头IP地址 - :param port: 网络摄像头端口 - :param username: 网络摄像头用户名 - :param password: 网络摄像头密码 - :param channel: 摄像头通道号 - """ - self.camera_type = camera_type - if ip: - self.camera_ip = ip - if port: - self.camera_port = port - if username: - self.camera_username = username - if password: - self.camera_password = password - self.camera_channel = channel - - def set_angle_parameters(self, target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0): - """ - 设置角度控制参数 - """ - self.target_angle = target_angle - self.min_angle = min_angle - self.max_angle = max_angle - self.angle_threshold = threshold - - def set_feeding_parameters(self, target_vehicle_weight=5000, upper_buffer_weight=500, single_batch_weight=2500): - """ - 设置下料参数 - :param target_vehicle_weight: 目标模具车重量(kg) - :param upper_buffer_weight: 上料斗缓冲重量(kg) - :param single_batch_weight: 单次下料重量(kg) - """ - self.target_vehicle_weight = target_vehicle_weight - self.upper_buffer_weight = upper_buffer_weight - self.single_batch_weight = single_batch_weight - - def load_all_models(self): - """ - 加载所有视觉检测模型 - """ - success = True - - # 加载夹角检测模型 - try: - if not os.path.exists(self.angle_model_path): - print(f"夹角检测模型不存在: {self.angle_model_path}") - success = False - else: - # 注意:angle.pt模型通过predict_obb_best_angle函数使用,不需要预加载 - print(f"夹角检测模型路径: {self.angle_model_path}") - except Exception as e: - print(f"检查夹角检测模型失败: {e}") - success = False - - # 加载堆料检测模型 - try: - if not os.path.exists(self.overflow_model_path): - print(f"堆料检测模型不存在: {self.overflow_model_path}") - success = False - else: - self.overflow_model = YOLO(self.overflow_model_path) - print(f"成功加载堆料检测模型: {self.overflow_model_path}") - except Exception as e: - print(f"加载堆料检测模型失败: {e}") - success = False - - # 加载对齐检测模型 - try: - if not os.path.exists(self.alignment_model_path): - print(f"对齐检测模型不存在: {self.alignment_model_path}") - success = False - else: - self.alignment_model = YOLO(self.alignment_model_path) - print(f"成功加载对齐检测模型: {self.alignment_model_path}") - except Exception as e: - print(f"加载对齐检测模型失败: {e}") - success = False - - return success - - def send_relay_command(self, command_hex): - """发送原始Socket命令""" - try: - byte_data = binascii.unhexlify(command_hex) - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.connect((self.relay_host, self.relay_port)) - sock.send(byte_data) - response = sock.recv(1024) - print(f"收到继电器响应: {binascii.hexlify(response)}") - return response - except Exception as e: - print(f"继电器通信错误: {e}") - return None - - def get_relay_status(self): - """获取继电器状态""" - response = self.send_relay_command(self.read_status_command) - status_dict = {} - - if response and len(response) >= 10: - status_byte = response[9] - status_bin = f"{status_byte:08b}"[::-1] - for key, bit_index in self.device_bit_map.items(): - status_dict[key] = status_bin[bit_index] == '1' - else: - print("读取继电器状态失败") - - return status_dict - - def control_relay(self, device, action): - """控制继电器""" - if device in self.relay_commands and action in self.relay_commands[device]: - print(f"控制继电器 {device} {action}") - self.send_relay_command(self.relay_commands[device][action]) - time.sleep(0.1) - else: - print(f"无效设备或动作: {device}, {action}") - - def read_transmitter_data_via_relay(self, transmitter_id): - """读取变送器数据(Modbus TCP 转 RS485)""" - try: - if transmitter_id not in self.transmitter_config: - print(f"无效变送器ID: {transmitter_id}") - return None - - config = self.transmitter_config[transmitter_id] - - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return None - - result = self.relay_modbus_client.read_holding_registers( - address=config['weight_register'], - count=config['register_count'], - slave=config['slave_id']#转发给哪台变送器 - ) - - if isinstance(result, Exception): - print(f"读取变送器 {transmitter_id} 失败: {result}") - return None - - # 根据图片示例,正确解析数据 - if config['register_count'] == 2:#读两个寄存器 - # 获取原始字节数组 - raw_data = result.registers - # 组合成32位整数 - weight = (raw_data[0] << 16) + raw_data[1] - weight = weight / 1000.0 # 单位转换为千克 - elif config['register_count'] == 1: - weight = float(result.registers[0]) - else: - print(f"不支持的寄存器数量: {config['register_count']}") - return None - - print(f"变送器 {transmitter_id} 读取重量: {weight}kg") - return weight - - except ModbusException as e: - print(f"Modbus通信错误: {e}") - return None - except Exception as e: - print(f"数据解析错误: {e}") - return None - finally: - self.relay_modbus_client.close() - - def set_inverter_frequency_via_relay(self, frequency): - """设置变频器频率""" - try: - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return False - - # 使用最大频率变量计算百分比 - percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例 - value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数 - - # 限制范围 - value = max(-10000, min(10000, value)) - - result = self.relay_modbus_client.write_register( - self.inverter_config['frequency_register'], - value, - slave=self.inverter_config['slave_id'] - ) - - if isinstance(result, Exception): - print(f"设置频率失败: {result}") - return False - - print(f"设置变频器频率为 {frequency}Hz") - return True - except ModbusException as e: - print(f"变频器Modbus通信错误: {e}") - return False - finally: - self.relay_modbus_client.close() - - def control_inverter_via_relay(self, action): - try: - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return False - - if action == 'start': - result = self.relay_modbus_client.write_register( - address=self.inverter_config['start_register'], - value=self.inverter_config['start_command'], - slave=self.inverter_config['slave_id'] - ) - print("启动变频器") - elif action == 'stop': - result = self.relay_modbus_client.write_register( - address=self.inverter_config['start_register'], - value=self.inverter_config['stop_command'], - slave=self.inverter_config['slave_id'] - ) - print("停止变频器") - else: - print(f"无效操作: {action}") - return False - - if isinstance(result, Exception): - print(f"控制失败: {result}") - return False - - return True - except ModbusException as e: - print(f"变频器控制错误: {e}") - return False - finally: - self.relay_modbus_client.close() - - def check_upper_material_request(self): - """检查是否需要要料""" - current_weight = self.read_transmitter_data_via_relay(1) - - if current_weight is None: - self.upper_weight_error_count += 1 - print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}") - if self.upper_weight_error_count >= self.max_error_count: - print("警告:上料斗传感器连续读取失败,请检查连接") - return False - - self.upper_weight_error_count = 0 - # 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量 - if current_weight < (self.single_batch_weight + self.min_required_weight): - print("上料斗重量不足,通知搅拌楼要料") - self.request_material_from_mixing_building() # 请求搅拌楼下料 - return True - return False - - def request_material_from_mixing_building(self): - """ - 请求搅拌楼下料(待完善) - TODO: 与同事对接通信协议 - """ - print("发送要料请求至搅拌楼...") - self.return_upper_door_to_default() - # 这里需要与同事对接具体的通信方式 - # 可能是Modbus写寄存器、TCP通信、HTTP请求等 - pass - - def wait_for_mixing_building_material(self): - """ - 等待搅拌楼下料完成(待完善) - TODO: 与同事对接信号接收 - """ - print("等待搅拌楼下料完成...") - # 这里需要与同事对接具体的信号接收方式 - # 可能是Modbus读寄存器、TCP通信、HTTP请求等 - # 模拟等待 - time.sleep(5) - print("搅拌楼下料完成") - self.move_upper_door_over_lower() - return True - - def move_upper_door_over_lower(self): - """移动上料斗到下料斗上方""" - print("移动上料斗到下料斗上方") - self.control_relay(self.DOOR_UPPER, 'open') - self.upper_door_position = 'over_lower' - - def return_upper_door(self): - """返回上料斗到搅拌楼""" - print("上料斗返回搅拌楼") - self.control_relay(self.DOOR_UPPER, 'close') - self.upper_door_position = 'returning' - - def return_upper_door_to_default(self): - """上料斗回到默认位置(搅拌楼下接料位置)""" - print("上料斗回到默认位置") - self.control_relay(self.DOOR_UPPER, 'close') - self.upper_door_position = 'default' - - def start_lower_feeding(self): - """开始分步下料""" - if self.lower_feeding_stage != 0: - print("下料已在进行中") - return - - # 检查关键设备是否可连接 - if not self._check_device_connectivity(): - print("关键设备连接失败,无法开始下料") - return - - print("开始分步下料过程") - # 重置计数器 - self.lower_feeding_cycle = 0 # 用于记录三阶段下料次数 - self.upper_feeding_count = 0 # 用于记录上料次数 - - # 第一次上料(总共需要上料2次) - self.transfer_material_from_upper_to_lower() - - # 等待模具车对齐并开始第一轮下料 - self.lower_feeding_stage = 4 # 从等待模具车对齐开始 - self.wait_for_vehicle_alignment() - - def _check_device_connectivity(self): - """检查关键设备连接状态""" - try: - # 检查网络继电器连接 - test_response = self.send_relay_command(self.read_status_command) - if not test_response: - print("网络继电器连接失败") - return False - - # 检查变频器连接 - if not self.relay_modbus_client.connect(): - print("无法连接到网络继电器Modbus服务") - return False - - # 尝试读取变频器一个寄存器(测试连接) - test_result = self.relay_modbus_client.read_holding_registers( - address=0x00, - count=1, - slave=self.inverter_config['slave_id'] - ) - - if isinstance(test_result, Exception): - print("变频器连接测试失败") - return False - - # 检查下料斗变送器连接 - test_weight = self.read_transmitter_data_via_relay(2) - if test_weight is None: - print("下料斗变送器连接失败") - return False - - self.relay_modbus_client.close() - return True - except Exception as e: - print(f"设备连接检查失败: {e}") - return False - - def transfer_material_from_upper_to_lower(self): - """上料斗向下料斗下料(基于上料斗重量传感器控制)""" - print(f"上料斗向下料斗下料 (第 {self.upper_feeding_count + 1} 次)") - - # 记录下料前的重量 - initial_upper_weight = self.read_transmitter_data_via_relay(1) - - # 如果无法读取重量,直接报错 - if initial_upper_weight is None: - raise Exception("无法读取上料斗重量传感器数据,下料操作终止") - - target_upper_weight = initial_upper_weight - self.single_batch_weight - target_upper_weight = max(target_upper_weight, 0) # 确保不低于0 - - print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg") - - # 确保下料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_2, 'close') - # 打开上料斗出砼门 - self.control_relay(self.DOOR_LOWER_1, 'open') - - # 等待物料流入下料斗,基于上料斗重量变化控制 - start_time = time.time() - timeout = 30 # 30秒超时 - - while time.time() - start_time < timeout: - current_upper_weight = self.read_transmitter_data_via_relay(1) - - # 如果无法读取重量,继续尝试 - if current_upper_weight is None: - print("无法读取上料斗重量,继续尝试...") - time.sleep(1) - continue - - print(f"上料斗当前重量: {current_upper_weight:.2f}kg") - - # 如果达到目标重量,则关闭上料斗出砼门 - if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围 - print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg") - break - elif time.time() - start_time > 25: # 如果25秒后重量变化过小 - weight_change = initial_upper_weight - current_upper_weight - if weight_change < 100: # 如果重量变化小于100kg - print("重量变化过小,可能存在堵塞,交由监控系统处理...") - # 不再在这里直接处理破拱,而是依靠监控系统处理 - break - - time.sleep(1) - - # 关闭上料斗出砼门 - self.control_relay(self.DOOR_LOWER_1, 'close') - - # 验证下料结果 - final_upper_weight = self.read_transmitter_data_via_relay(1) - if final_upper_weight is not None: - actual_transferred = initial_upper_weight - final_upper_weight - print(f"实际下料重量: {actual_transferred:.2f}kg") - - # 增加上料计数 - self.upper_feeding_count += 1 - print("上料斗下料完成") - - # 关闭上料斗出砼门 - self.control_relay(self.DOOR_LOWER_1, 'close') - - # 验证下料结果 - final_upper_weight = self.read_transmitter_data_via_relay(1) - if final_upper_weight is not None: - actual_transferred = initial_upper_weight - final_upper_weight - print(f"实际下料重量: {actual_transferred:.2f}kg") - - # 增加上料计数 - self.upper_feeding_count += 1 - print("上料斗下料完成") - - def wait_for_vehicle_alignment(self): - """等待模具车对齐""" - print("等待模具车对齐...") - self.lower_feeding_stage = 4 - - while self.lower_feeding_stage == 4 and self._running: - if self.vehicle_aligned: - print("模具车已对齐,开始下料") - self.lower_feeding_stage = 1 - self.feeding_stage_one() - break - time.sleep(self.alignment_check_interval) - - def feeding_stage_one(self): - """第一阶段下料:下料斗向模具车下料(低速)""" - print("开始第一阶段下料:下料斗低速下料") - self.set_inverter_frequency_via_relay(self.frequencies[0]) - self.control_inverter_via_relay('start') - - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - # 打开下料斗出砼门 - self.control_relay(self.DOOR_LOWER_2, 'open') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程,而不是当前批次 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 1: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 2 - self.feeding_stage_two() - break - time.sleep(2) - - def feeding_stage_two(self): - """第二阶段下料:下料斗向模具车下料(中速)""" - print("开始第二阶段下料:下料斗中速下料") - self.set_inverter_frequency_via_relay(self.frequencies[1]) - - # 保持下料斗出砼门打开 - self.control_relay(self.DOOR_LOWER_2, 'open') - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 2: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 3 - self.feeding_stage_three() - break - time.sleep(2) - - def feeding_stage_three(self): - """第三阶段下料:下料斗向模具车下料(高速)""" - print("开始第三阶段下料:下料斗高速下料") - self.set_inverter_frequency_via_relay(self.frequencies[2]) - - # 保持下料斗出砼门打开 - self.control_relay(self.DOOR_LOWER_2, 'open') - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 3: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 4 - self.finish_current_batch() - break - time.sleep(2) - - def finish_current_batch(self): - """完成当前批次下料""" - print("当前批次下料完成,关闭出砼门") - self.control_inverter_via_relay('stop') - self.control_relay(self.DOOR_LOWER_1, 'close') - self.control_relay(self.DOOR_LOWER_2, 'close') - - - # 增加三阶段下料轮次计数 - self.lower_feeding_cycle += 1 - - # 检查是否完成两轮三阶段下料(总共5吨) - if self.lower_feeding_cycle >= 2: - # 完成整个5吨下料任务 - print("完成两轮三阶段下料,5吨下料任务完成") - self.finish_feeding_process() - return - - # 如果只完成一轮三阶段下料,进行第二次上料 - print("第一轮三阶段下料完成,准备第二次上料") - # 上料斗第二次向下料斗下料 - try: - self.transfer_material_from_upper_to_lower() - except Exception as e: - print(f"第二次上料失败: {e}") - print("停止下料流程") - self.finish_feeding_process() # 出现严重错误时结束整个流程 - return - - # 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车) - print("第二次上料完成,继续三阶段下料") - self.lower_feeding_stage = 1 # 直接进入第一阶段下料 - self.feeding_stage_one() # 开始第二轮第一阶段下料 - - - def finish_feeding_process(self): - """完成整个下料流程""" - print("整个下料流程完成") - self.lower_feeding_stage = 0 - self.lower_feeding_cycle = 0 - self.upper_feeding_count = 0 - self.return_upper_door_to_default() - - def handle_overflow_control(self, overflow_detected, door_opening_large): - """处理溢料控制""" - if overflow_detected and door_opening_large: - print("检测到溢料且出砼门开口较大,调小出砼门") - self.control_relay(self.DOOR_LOWER_1, 'close') - time.sleep(0.1) - self.control_relay(self.DOOR_LOWER_1, 'open') - time.sleep(0.1) - - def is_lower_door_open(self): - """检查出砼门是否打开""" - return self.lower_feeding_stage in [1, 2] # 只有在下料阶段才认为门是打开的 - - def check_arch_blocking(self): - """检查是否需要破拱""" - current_time = time.time() - - # 检查下料斗破拱(只有在下料过程中才检查) - if self.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查 - lower_weight = self.read_transmitter_data_via_relay(2) - if lower_weight is not None: - # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) - if (abs(lower_weight - self.last_lower_weight) < 0.1) and \ - (current_time - self.last_weight_time) > 10: - print("下料斗可能堵塞,启动破拱") - self.control_relay(self.BREAK_ARCH_LOWER, 'open') - time.sleep(2) - self.control_relay(self.BREAK_ARCH_LOWER, 'close') - - self.last_lower_weight = lower_weight - - # 检查上料斗破拱(在上料斗向下料斗下料时检查) - if (self.upper_door_position == 'over_lower' and - self.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱 - upper_weight = self.read_transmitter_data_via_relay(1) - if upper_weight is not None: - # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) - if (abs(upper_weight - self.last_upper_weight) < 0.1) and \ - (current_time - self.last_weight_time) > 10: - print("上料斗可能堵塞,启动破拱") - self.control_relay(self.BREAK_ARCH_UPPER, 'open') - time.sleep(2) - self.control_relay(self.BREAK_ARCH_UPPER, 'close') - - self.last_upper_weight = upper_weight - - # 更新最后读取时间 - if (self.read_transmitter_data_via_relay(1) is not None or - self.read_transmitter_data_via_relay(2) is not None): - self.last_weight_time = current_time - - def monitor_system(self): - """监控系统状态""" - while self._running: - try: - self.check_upper_material_request() - self.check_arch_blocking() - time.sleep(1) - except Exception as e: - print(f"监控线程错误: {e}") - - def setup_camera_capture(self, camera_index=0): - """ - 设置摄像头捕获 - :param camera_index: USB摄像头索引或IP摄像头配置 - """ - try: - rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01" - self.camera = cv2.VideoCapture(rtsp_url) - - if not self.camera.isOpened(): - print(f"无法打开网络摄像头: {rtsp_url}") - return False - print(f"网络摄像头初始化成功,地址: {rtsp_url}") - return True - except Exception as e: - print(f"摄像头设置失败: {e}") - return False - - def capture_current_frame(self): - """捕获当前帧并返回numpy数组""" - try: - if self.camera is None: - print("摄像头未初始化") - return None - - ret, frame = self.camera.read() - if ret: - return frame - else: - print("无法捕获图像帧") - return None - except Exception as e: - print(f"图像捕获失败: {e}") - return None - - def detect_overflow_from_image(self, image_array): - """通过图像检测是否溢料(接受numpy数组)""" - try: - # 检查模型是否已加载 - if self.overflow_model is None: - print("堆料检测模型未加载") - return False - - rois = load_global_rois(self.roi_file_path) - - if not rois: - print("没有有效的ROI配置") - return False - - if image_array is None: - print("输入图像为空") - return False - - crops = crop_and_resize(image_array, rois, 640) - for roi_resized, _ in crops: - final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4) - if "大堆料" in final_class or "小堆料" in final_class: - return True - - return False - except Exception as e: - print(f"溢料检测失败: {e}") - return False - - def detect_vehicle_alignment(self, image_array): - """通过图像检测模具车是否对齐(接受numpy数组)""" - try: - # 检查模型是否已加载 - if self.alignment_model is None: - print("对齐检测模型未加载") - return False - - if image_array is None: - print("输入图像为空") - return False - - # 直接使用模型进行推理 - results = self.alignment_model(image_array) - pared_probs = results[0].probs.data.cpu().numpy().flatten() - - # 类别0: 未对齐, 类别1: 对齐 - class_id = int(pared_probs.argmax()) - confidence = float(pared_probs[class_id]) - - # 只有当对齐且置信度>95%时才认为对齐 - if class_id == 1 and confidence > 0.95: - return True - return False - except Exception as e: - print(f"对齐检测失败: {e}") - return False - - def get_current_door_angle(self, image=None, image_path=None): - """ - 通过视觉系统获取当前出砼门角度 - :param image: 图像数组(numpy array) - :param image_path: 图片路径 - """ - try: - # 检查模型是否已加载 - if self.angle_model is None: - print("夹角检测模型未加载") - return None - - angle_deg, _ = predict_obb_best_angle( - model=self.angle_model, # 传递预加载的模型实例 - image=image, # 传递图像数组 - image_path=image_path # 或传递图像路径 - ) - return angle_deg - except Exception as e: - print(f"角度检测失败: {e}") - return None - - def alignment_check_loop(self): - """ - 模具车对齐检查循环 - """ - while self._running: - try: - # 只在需要检查对齐时才检查 - if self.lower_feeding_stage == 4: - current_frame = self.capture_current_frame() - if current_frame is not None: - self.vehicle_aligned = self.detect_vehicle_alignment(current_frame) - if self.vehicle_aligned: - print("检测到模具车对齐") - else: - print("模具车未对齐") - time.sleep(self.alignment_check_interval) - except Exception as e: - print(f"对齐检查循环错误: {e}") - time.sleep(self.alignment_check_interval) - - def visual_control_loop(self): - """ - 视觉控制主循环 - """ - - while self._running and self.visual_control_enabled: - try: - current_frame = self.capture_current_frame() - if current_frame is None: - print("无法获取当前图像,跳过本次调整") - time.sleep(self.visual_check_interval) - continue - - # 检测是否溢料 - overflow = self.detect_overflow_from_image(current_frame) - - # 获取当前角度 - current_angle = self.get_current_door_angle(image=current_frame) - - if current_angle is None: - print("无法获取当前角度,跳过本次调整") - time.sleep(self.visual_check_interval) - continue - - print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}") - - # 状态机控制逻辑 - if self.angle_control_mode == "normal": - # 正常模式 - if overflow and current_angle > self.angle_threshold: - # 检测到堆料且角度过大,进入角度减小模式 - print("检测到堆料且角度过大,关闭出砼门开始减小角度") - self.control_relay(self.DOOR_LOWER_2, 'close') - self.angle_control_mode = "reducing" - else: - # 保持正常开门 - self.control_relay(self.DOOR_LOWER_2, 'open') - - elif self.angle_control_mode == "reducing": - # 角度减小模式 - if current_angle <= self.target_angle + self.angle_tolerance: - # 角度已达到目标范围 - if overflow: - # 仍有堆料,进入维持模式 - print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式") - self.angle_control_mode = "maintaining" - self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门 - else: - # 无堆料,恢复正常模式 - print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "recovery" - - elif self.angle_control_mode == "maintaining": - # 维持模式 - 使用脉冲控制 - if not overflow: - # 堆料已消除,恢复正常模式 - print("堆料已消除,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "recovery" - else: - # 继续维持角度控制 - self.pulse_control_door_for_maintaining() - - elif self.angle_control_mode == "recovery":#打开夹爪的过程中又堆料了 - # 恢复模式 - 逐步打开门 - if overflow: - # 又出现堆料,回到角度减小模式 - print("恢复过程中又检测到堆料,回到角度减小模式") - self.angle_control_mode = "maintaining" - else: - # 堆料已消除,恢复正常模式 - print("堆料已消除,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "normal" - - self.last_angle = current_angle - time.sleep(self.visual_check_interval) - - except Exception as e: - print(f"视觉控制循环错误: {e}") - time.sleep(self.visual_check_interval) - - def pulse_control_door_for_maintaining(self): - """ - 用于维持模式的脉冲控制 - 保持角度在目标范围内 - """ - print("执行维持脉冲控制") - # 关门1秒 - self.control_relay(self.DOOR_LOWER_2, 'close') - time.sleep(1.0) - # 开门1秒 - self.control_relay(self.DOOR_LOWER_2, 'open') - time.sleep(1.0) - - def start_visual_control(self): - """ - 启动视觉控制线程 - """ - if not self.visual_control_enabled: - print("视觉控制未启用") - return - - print("启动视觉控制线程") - self._visual_control_thread = threading.Thread( - target=self.visual_control_loop, - daemon=True - ) - self._visual_control_thread.start() - return self._visual_control_thread - - def start_alignment_check(self): - """ - 启动模具车对齐检查线程 - """ - print("启动模具车对齐检查线程") - self._alignment_check_thread = threading.Thread( - target=self.alignment_check_loop, - daemon=True - ) - self._alignment_check_thread.start() - return self._alignment_check_thread - - def start(self): - """启动系统""" - if self._running: - print("系统已在运行") - return - print("启动控制系统") - self._running = True - self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True) - self._monitor_thread.start() - - def stop(self): - """停止系统""" - if not self._running: - print("系统未在运行") - return - print("停止控制系统") - self._running = False - if self._monitor_thread is not None: - self._monitor_thread.join() - if self._visual_control_thread is not None: - self._visual_control_thread.join() - if self._alignment_check_thread is not None: - self._alignment_check_thread.join() - if self.camera is not None: - self.camera.release() - print("控制系统已停止") - - -# 使用示例 -if __name__ == "__main__": - system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000) - - # 设置角度控制参数 - system.set_angle_parameters( - target_angle=20.0, - min_angle=10.0, - max_angle=80.0, - threshold=60.0 - ) - - # 设置下料参数 - system.set_feeding_parameters( - target_vehicle_weight=5000, # 5吨 - upper_buffer_weight=500, # 0.5吨缓冲 - single_batch_weight=2500 # 每次下2.5吨 - ) - - # 设置摄像头配置 - system.set_camera_config( - camera_type="ip", - ip="192.168.1.51", - port=554, - username="admin", - password="XJ123456", - channel=1 - ) - - # 初始化摄像头 - if not system.setup_camera_capture(): - print("摄像头初始化失败") - exit(1) - - # 加载所有模型 - if not system.load_all_models(): - print("模型加载失败") - exit(1) - - # 启动系统监控 - system.start() - - # 启动视觉控制 - system.start_visual_control() - - # 启动对齐检查 - system.start_alignment_check() - - print("系统准备就绪,5秒后开始下料...") - time.sleep(5) - system.start_lower_feeding() # 启动下料流程 - - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - print("收到停止信号") - except Exception as e: - print(f"系统错误: {e}") - finally: - system.stop() diff --git a/core/feeding_system.py b/core/feeding_system.py deleted file mode 100644 index 12c7961..0000000 --- a/core/feeding_system.py +++ /dev/null @@ -1,1114 +0,0 @@ -# core/feeding_system.py -import socket -import binascii -import time -import threading -import cv2 -import os -from pymodbus.client import ModbusTcpClient -from pymodbus.exceptions import ModbusException - -# 添加视觉模块路径 -import sys - -sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'vision')) - -# 导入视觉处理模块 -from vision.anger_caculate import predict_obb_best_angle -from vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize -from vision.alignment_detector import detect_vehicle_alignment - - -class FeedingControlSystem: - def __init__(self, relay_host='192.168.0.18', relay_port=50000): - # 网络继电器配置 - self.relay_host = relay_host - self.relay_port = relay_port - self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port) - - # 继电器映射 - self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动 - self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门 - self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门 - self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱 - self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱 - - # 继电器命令(原始Socket)mudbus TCP模式 - self.relay_commands = { - self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, - self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, - self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'}, - self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, - self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'} - } - - # 读取状态命令 - self.read_status_command = '000000000006010100000008' - - # 设备位映射 - self.device_bit_map = { - self.DOOR_UPPER: 0, - self.DOOR_LOWER_1: 1, - self.DOOR_LOWER_2: 2, - self.BREAK_ARCH_UPPER: 3, - self.BREAK_ARCH_LOWER: 4 - } - - # 变频器配置(Modbus RTU 协议) - self.inverter_config = { - 'slave_id': 1, - 'frequency_register': 0x01, # 2001H - 'start_register': 0x00, # 2000H - 'stop_register': 0x00, # 2000H(用于停机) - 'start_command': 0x0013, # 正转点动运行 - 'stop_command': 0x0001 # 停机 - } - - # 变送器配置(Modbus RTU) - self.transmitter_config = { - 1: { # 上料斗 - 'slave_id': 1, - 'weight_register': 0x01, - 'register_count': 2 - }, - 2: { # 下料斗 - 'slave_id': 2, - 'weight_register': 0x01, - 'register_count': 2 - } - } - - # 系统状态 - self._running = False - self._monitor_thread = None - self._visual_control_thread = None - self._alignment_check_thread = None - - # 下料控制相关 - self.min_required_weight = 500 # 模具车最小需要重量(kg) - self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中) - self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:等待上料, 4:等待模具车对齐 - self.lower_feeding_cycle = 0 # 下料斗下料循环次数 (1, 2, 3) - self.upper_feeding_count = 0 # 上料斗已下料次数 (0, 1, 2) - self.last_upper_weight = 0 - self.last_lower_weight = 0 - self.last_weight_time = time.time() - self.target_vehicle_weight = 5000 # 目标模具车重量(kg) - self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多 - self.single_batch_weight = 2500 # 单次下料重量(kg) - - # 夹角状态 - self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery - - # 错误计数 - self.upper_weight_error_count = 0 - self.lower_weight_error_count = 0 - self.max_error_count = 3 - - # 下料阶段频率(Hz) - self.inverter_max_frequency = 400.0 # 频率最大值 - self.frequencies = [220.0, 230.0, 240.0] - - # 视觉系统接口 - self.overflow_detected = False # 堆料检测 - self.door_opening_large = False # 夹角 - self.vehicle_aligned = False # 模具车是否对齐 - - # 视觉控制参数 - self.angle_threshold = 60.0 # 角度阈值,超过此值认为开口过大 - self.target_angle = 20.0 # 目标角度 - self.min_angle = 10.0 # 最小角度 - self.max_angle = 80.0 # 最大角度 - self.angle_tolerance = 5.0 # 角度容差 - self.visual_control_enabled = True # 视觉控制使能 - self.last_angle = None # 上次检测角度 - self.visual_check_interval = 1.0 # 视觉检查间隔(秒) - self.alignment_check_interval = 0.5 # 对齐检查间隔(秒) - - # 模型路径配置 - from config.settings import Settings - settings = Settings() - self.angle_model_path = settings.angle_model_path - self.overflow_model_path = settings.overflow_model_path - self.alignment_model_path = settings.alignment_model_path - self.roi_file_path = settings.roi_file_path - - # 模型实例 - self.angle_model = None # 夹角检测模型实例 - self.overflow_model = None # 堆料检测模型实例 - self.alignment_model = None # 对齐检测模型实例 - - # 摄像头相关配置 - self.camera = None - self.camera_type = "ip" - self.camera_ip = "192.168.1.51" - self.camera_port = 554 - self.camera_username = "admin" - self.camera_password = "XJ123456" - self.camera_channel = 1 - self.current_image_path = "current_frame.jpg" - - def initialize(self): - """初始化系统""" - print("初始化控制系统...") - - # 设置摄像头配置 - self.set_camera_config( - camera_type=self.camera_type, - ip=self.camera_ip, - port=self.camera_port, - username=self.camera_username, - password=self.camera_password, - channel=self.camera_channel - ) - - # 初始化摄像头 - if not self.setup_camera_capture(): - raise Exception("摄像头初始化失败") - - # 加载所有模型 - if not self.load_all_models(): - raise Exception("模型加载失败") - - # 启动系统监控 - self.start() - - # 启动视觉控制 - self.start_visual_control() - - # 启动对齐检查 - self.start_alignment_check() - - print("控制系统初始化完成") - - def set_camera_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1): - """ - 设置摄像头配置 - :param ip: 网络摄像头IP地址 - :param port: 网络摄像头端口 - :param username: 网络摄像头用户名 - :param password: 网络摄像头密码 - :param channel: 摄像头通道号 - """ - self.camera_type = camera_type - if ip: - self.camera_ip = ip - if port: - self.camera_port = port - if username: - self.camera_username = username - if password: - self.camera_password = password - self.camera_channel = channel - - def set_angle_parameters(self, target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0): - """ - 设置角度控制参数 - """ - self.target_angle = target_angle - self.min_angle = min_angle - self.max_angle = max_angle - self.angle_threshold = threshold - - def set_feeding_parameters(self, target_vehicle_weight=5000, upper_buffer_weight=500, single_batch_weight=2500): - """ - 设置下料参数 - :param target_vehicle_weight: 目标模具车重量(kg) - :param upper_buffer_weight: 上料斗缓冲重量(kg) - :param single_batch_weight: 单次下料重量(kg) - """ - self.target_vehicle_weight = target_vehicle_weight - self.upper_buffer_weight = upper_buffer_weight - self.single_batch_weight = single_batch_weight - - def load_all_models(self): - """ - 加载所有视觉检测模型 - """ - success = True - - # 加载夹角检测模型 - try: - if not os.path.exists(self.angle_model_path): - print(f"夹角检测模型不存在: {self.angle_model_path}") - success = False - else: - # 注意:angle.pt模型通过predict_obb_best_angle函数使用,不需要预加载 - print(f"夹角检测模型路径: {self.angle_model_path}") - except Exception as e: - print(f"检查夹角检测模型失败: {e}") - success = False - - # 加载堆料检测模型 - try: - from ultralytics import YOLO - if not os.path.exists(self.overflow_model_path): - print(f"堆料检测模型不存在: {self.overflow_model_path}") - success = False - else: - self.overflow_model = YOLO(self.overflow_model_path) - print(f"成功加载堆料检测模型: {self.overflow_model_path}") - except Exception as e: - print(f"加载堆料检测模型失败: {e}") - success = False - - # 加载对齐检测模型 - try: - from ultralytics import YOLO - if not os.path.exists(self.alignment_model_path): - print(f"对齐检测模型不存在: {self.alignment_model_path}") - success = False - else: - self.alignment_model = YOLO(self.alignment_model_path) - print(f"成功加载对齐检测模型: {self.alignment_model_path}") - except Exception as e: - print(f"加载对齐检测模型失败: {e}") - success = False - - return success - - def send_relay_command(self, command_hex): - """发送原始Socket命令""" - try: - byte_data = binascii.unhexlify(command_hex) - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.connect((self.relay_host, self.relay_port)) - sock.send(byte_data) - response = sock.recv(1024) - print(f"收到继电器响应: {binascii.hexlify(response)}") - return response - except Exception as e: - print(f"继电器通信错误: {e}") - return None - - def get_relay_status(self): - """获取继电器状态""" - response = self.send_relay_command(self.read_status_command) - status_dict = {} - - if response and len(response) >= 10: - status_byte = response[9] - status_bin = f"{status_byte:08b}"[::-1] - for key, bit_index in self.device_bit_map.items(): - status_dict[key] = status_bin[bit_index] == '1' - else: - print("读取继电器状态失败") - - return status_dict - - def control_relay(self, device, action): - """控制继电器""" - if device in self.relay_commands and action in self.relay_commands[device]: - print(f"控制继电器 {device} {action}") - self.send_relay_command(self.relay_commands[device][action]) - time.sleep(0.1) - else: - print(f"无效设备或动作: {device}, {action}") - - def read_transmitter_data_via_relay(self, transmitter_id): - """读取变送器数据(Modbus TCP 转 RS485)""" - try: - if transmitter_id not in self.transmitter_config: - print(f"无效变送器ID: {transmitter_id}") - return None - - config = self.transmitter_config[transmitter_id] - - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return None - - result = self.relay_modbus_client.read_holding_registers( - address=config['weight_register'], - count=config['register_count'], - slave=config['slave_id'] - ) - - if isinstance(result, Exception): - print(f"读取变送器 {transmitter_id} 失败: {result}") - return None - - # 根据图片示例,正确解析数据 - if config['register_count'] == 2: - # 获取原始字节数组 - raw_data = result.registers - # 组合成32位整数 - weight = (raw_data[0] << 16) + raw_data[1] - weight = weight / 1000.0 # 单位转换为千克 - elif config['register_count'] == 1: - weight = float(result.registers[0]) - else: - print(f"不支持的寄存器数量: {config['register_count']}") - return None - - print(f"变送器 {transmitter_id} 读取重量: {weight}kg") - return weight - - except ModbusException as e: - print(f"Modbus通信错误: {e}") - return None - except Exception as e: - print(f"数据解析错误: {e}") - return None - finally: - self.relay_modbus_client.close() - - def set_inverter_frequency_via_relay(self, frequency): - """设置变频器频率""" - try: - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return False - - # 使用最大频率变量计算百分比 - percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例 - value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数 - - # 限制范围 - value = max(-10000, min(10000, value)) - - result = self.relay_modbus_client.write_register( - self.inverter_config['frequency_register'], - value, - slave=self.inverter_config['slave_id'] - ) - - if isinstance(result, Exception): - print(f"设置频率失败: {result}") - return False - - print(f"设置变频器频率为 {frequency}Hz") - return True - except ModbusException as e: - print(f"变频器Modbus通信错误: {e}") - return False - finally: - self.relay_modbus_client.close() - - def control_inverter_via_relay(self, action): - try: - if not self.relay_modbus_client.connect(): - print("无法连接网络继电器Modbus服务") - return False - - if action == 'start': - result = self.relay_modbus_client.write_register( - address=self.inverter_config['start_register'], - value=self.inverter_config['start_command'], - slave=self.inverter_config['slave_id'] - ) - print("启动变频器") - elif action == 'stop': - result = self.relay_modbus_client.write_register( - address=self.inverter_config['start_register'], - value=self.inverter_config['stop_command'], - slave=self.inverter_config['slave_id'] - ) - print("停止变频器") - else: - print(f"无效操作: {action}") - return False - - if isinstance(result, Exception): - print(f"控制失败: {result}") - return False - - return True - except ModbusException as e: - print(f"变频器控制错误: {e}") - return False - finally: - self.relay_modbus_client.close() - - def check_upper_material_request(self): - """检查是否需要要料""" - current_weight = self.read_transmitter_data_via_relay(1) - - if current_weight is None: - self.upper_weight_error_count += 1 - print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}") - if self.upper_weight_error_count >= self.max_error_count: - print("警告:上料斗传感器连续读取失败,请检查连接") - return False - - self.upper_weight_error_count = 0 - # 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量 - if current_weight < (self.single_batch_weight + self.min_required_weight): - print("上料斗重量不足,通知搅拌楼要料") - self.request_material_from_mixing_building() # 请求搅拌楼下料 - return True - return False - - def request_material_from_mixing_building(self): - """ - 请求搅拌楼下料(待完善) - TODO: 与同事对接通信协议 - """ - print("发送要料请求至搅拌楼...") - self.return_upper_door_to_default() - # 这里需要与同事对接具体的通信方式 - # 可能是Modbus写寄存器、TCP通信、HTTP请求等 - pass - - def wait_for_mixing_building_material(self): - """ - 等待搅拌楼下料完成(待完善) - TODO: 与同事对接信号接收 - """ - print("等待搅拌楼下料完成...") - # 这里需要与同事对接具体的信号接收方式 - # 可能是Modbus读寄存器、TCP通信、HTTP请求等 - # 模拟等待 - time.sleep(5) - print("搅拌楼下料完成") - self.move_upper_door_over_lower() - return True - - def move_upper_door_over_lower(self): - """移动上料斗到下料斗上方""" - print("移动上料斗到下料斗上方") - self.control_relay(self.DOOR_UPPER, 'open') - self.upper_door_position = 'over_lower' - - def return_upper_door(self): - """返回上料斗到搅拌楼""" - print("上料斗返回搅拌楼") - self.control_relay(self.DOOR_UPPER, 'close') - self.upper_door_position = 'returning' - - def return_upper_door_to_default(self): - """上料斗回到默认位置(搅拌楼下接料位置)""" - print("上料斗回到默认位置") - self.control_relay(self.DOOR_UPPER, 'close') - self.upper_door_position = 'default' - - def start_lower_feeding(self): - """开始分步下料""" - if self.lower_feeding_stage != 0: - print("下料已在进行中") - return - - # 检查关键设备是否可连接 - if not self._check_device_connectivity(): - print("关键设备连接失败,无法开始下料") - return - - print("开始分步下料过程") - # 重置计数器 - self.lower_feeding_cycle = 0 # 用于记录三阶段下料次数 - self.upper_feeding_count = 0 # 用于记录上料次数 - - # 第一次上料(总共需要上料2次) - self.transfer_material_from_upper_to_lower() - - # 等待模具车对齐并开始第一轮下料 - self.lower_feeding_stage = 4 # 从等待模具车对齐开始 - self.wait_for_vehicle_alignment() - - def _check_device_connectivity(self): - """检查关键设备连接状态""" - try: - # 检查网络继电器连接 - test_response = self.send_relay_command(self.read_status_command) - if not test_response: - print("网络继电器连接失败") - return False - - # 检查变频器连接 - if not self.relay_modbus_client.connect(): - print("无法连接到网络继电器Modbus服务") - return False - - # 尝试读取变频器一个寄存器(测试连接) - test_result = self.relay_modbus_client.read_holding_registers( - address=0x00, - count=1, - slave=self.inverter_config['slave_id'] - ) - - if isinstance(test_result, Exception): - print("变频器连接测试失败") - return False - - # 检查下料斗变送器连接 - test_weight = self.read_transmitter_data_via_relay(2) - if test_weight is None: - print("下料斗变送器连接失败") - return False - - self.relay_modbus_client.close() - return True - except Exception as e: - print(f"设备连接检查失败: {e}") - return False - - def transfer_material_from_upper_to_lower(self): - """上料斗向下料斗下料(基于上料斗重量传感器控制)""" - print(f"上料斗向下料斗下料 (第 {self.upper_feeding_count + 1} 次)") - - # 记录下料前的重量 - initial_upper_weight = self.read_transmitter_data_via_relay(1) - - # 如果无法读取重量,直接报错 - if initial_upper_weight is None: - raise Exception("无法读取上料斗重量传感器数据,下料操作终止") - - target_upper_weight = initial_upper_weight - self.single_batch_weight - target_upper_weight = max(target_upper_weight, 0) # 确保不低于0 - - print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg") - - # 确保下料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_2, 'close') - # 打开上料斗出砼门 - self.control_relay(self.DOOR_LOWER_1, 'open') - - # 等待物料流入下料斗,基于上料斗重量变化控制 - start_time = time.time() - timeout = 30 # 30秒超时 - - while time.time() - start_time < timeout: - current_upper_weight = self.read_transmitter_data_via_relay(1) - - # 如果无法读取重量,继续尝试 - if current_upper_weight is None: - print("无法读取上料斗重量,继续尝试...") - time.sleep(1) - continue - - print(f"上料斗当前重量: {current_upper_weight:.2f}kg") - - # 如果达到目标重量,则关闭上料斗出砼门 - if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围 - print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg") - break - elif time.time() - start_time > 25: # 如果25秒后重量变化过小 - weight_change = initial_upper_weight - current_upper_weight - if weight_change < 100: # 如果重量变化小于100kg - print("重量变化过小,可能存在堵塞,交由监控系统处理...") - # 不再在这里直接处理破拱,而是依靠监控系统处理 - break - - time.sleep(1) - - # 关闭上料斗出砼门 - self.control_relay(self.DOOR_LOWER_1, 'close') - - # 验证下料结果 - final_upper_weight = self.read_transmitter_data_via_relay(1) - if final_upper_weight is not None: - actual_transferred = initial_upper_weight - final_upper_weight - print(f"实际下料重量: {actual_transferred:.2f}kg") - - # 增加上料计数 - self.upper_feeding_count += 1 - print("上料斗下料完成") - - def wait_for_vehicle_alignment(self): - """等待模具车对齐""" - print("等待模具车对齐...") - self.lower_feeding_stage = 4 - - while self.lower_feeding_stage == 4 and self._running: - if self.vehicle_aligned: - print("模具车已对齐,开始下料") - self.lower_feeding_stage = 1 - self.feeding_stage_one() - break - time.sleep(self.alignment_check_interval) - - def feeding_stage_one(self): - """第一阶段下料:下料斗向模具车下料(低速)""" - print("开始第一阶段下料:下料斗低速下料") - self.set_inverter_frequency_via_relay(self.frequencies[0]) - self.control_inverter_via_relay('start') - - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - # 打开下料斗出砼门 - self.control_relay(self.DOOR_LOWER_2, 'open') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程,而不是当前批次 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 1: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 2 - self.feeding_stage_two() - break - time.sleep(2) - - def feeding_stage_two(self): - """第二阶段下料:下料斗向模具车下料(中速)""" - print("开始第二阶段下料:下料斗中速下料") - self.set_inverter_frequency_via_relay(self.frequencies[1]) - - # 保持下料斗出砼门打开 - self.control_relay(self.DOOR_LOWER_2, 'open') - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 2: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 3 - self.feeding_stage_three() - break - time.sleep(2) - - def feeding_stage_three(self): - """第三阶段下料:下料斗向模具车下料(高速)""" - print("开始第三阶段下料:下料斗高速下料") - self.set_inverter_frequency_via_relay(self.frequencies[2]) - - # 保持下料斗出砼门打开 - self.control_relay(self.DOOR_LOWER_2, 'open') - # 确保上料斗出砼门关闭 - self.control_relay(self.DOOR_LOWER_1, 'close') - - start_time = time.time() - initial_weight = self.read_transmitter_data_via_relay(2) - if initial_weight is None: - print("无法获取初始重量,取消下料") - self.finish_feeding_process() # 直接结束整个流程 - return - - target_weight = initial_weight + self.single_batch_weight - - while self.lower_feeding_stage == 3: - current_weight = self.read_transmitter_data_via_relay(2) - if current_weight is None: - self.lower_weight_error_count += 1 - if self.lower_weight_error_count >= self.max_error_count: - print("下料斗传感器连续读取失败,停止下料") - self.finish_feeding_process() # 直接结束整个流程 - return - else: - self.lower_weight_error_count = 0 - - if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 4 - self.finish_current_batch() - break - time.sleep(2) - - def finish_current_batch(self): - """完成当前批次下料""" - print("当前批次下料完成,关闭出砼门") - self.control_inverter_via_relay('stop') - self.control_relay(self.DOOR_LOWER_1, 'close') - self.control_relay(self.DOOR_LOWER_2, 'close') - - # 增加三阶段下料轮次计数 - self.lower_feeding_cycle += 1 - - # 检查是否完成两轮三阶段下料(总共5吨) - if self.lower_feeding_cycle >= 2: - # 完成整个5吨下料任务 - print("完成两轮三阶段下料,5吨下料任务完成") - self.finish_feeding_process() - return - - # 如果只完成一轮三阶段下料,进行第二次上料 - print("第一轮三阶段下料完成,准备第二次上料") - # 上料斗第二次向下料斗下料 - try: - self.transfer_material_from_upper_to_lower() - except Exception as e: - print(f"第二次上料失败: {e}") - print("停止下料流程") - self.finish_feeding_process() # 出现严重错误时结束整个流程 - return - - # 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车) - print("第二次上料完成,继续三阶段下料") - self.lower_feeding_stage = 1 # 直接进入第一阶段下料 - self.feeding_stage_one() # 开始第二轮第一阶段下料 - - def finish_feeding_process(self): - """完成整个下料流程""" - print("整个下料流程完成") - self.lower_feeding_stage = 0 - self.lower_feeding_cycle = 0 - self.upper_feeding_count = 0 - self.return_upper_door_to_default() - - def handle_overflow_control(self, overflow_detected, door_opening_large): - """处理溢料控制""" - if overflow_detected and door_opening_large: - print("检测到溢料且出砼门开口较大,调小出砼门") - self.control_relay(self.DOOR_LOWER_1, 'close') - time.sleep(0.1) - self.control_relay(self.DOOR_LOWER_1, 'open') - time.sleep(0.1) - - def is_lower_door_open(self): - """检查出砼门是否打开""" - return self.lower_feeding_stage in [1, 2] # 只有在下料阶段才认为门是打开的 - - def check_arch_blocking(self): - """检查是否需要破拱""" - current_time = time.time() - - # 检查下料斗破拱(只有在下料过程中才检查) - if self.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查 - lower_weight = self.read_transmitter_data_via_relay(2) - if lower_weight is not None: - # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) - if (abs(lower_weight - self.last_lower_weight) < 0.1) and \ - (current_time - self.last_weight_time) > 10: - print("下料斗可能堵塞,启动破拱") - self.control_relay(self.BREAK_ARCH_LOWER, 'open') - time.sleep(2) - self.control_relay(self.BREAK_ARCH_LOWER, 'close') - - self.last_lower_weight = lower_weight - - # 检查上料斗破拱(在上料斗向下料斗下料时检查) - if (self.upper_door_position == 'over_lower' and - self.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱 - upper_weight = self.read_transmitter_data_via_relay(1) - if upper_weight is not None: - # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) - if (abs(upper_weight - self.last_upper_weight) < 0.1) and \ - (current_time - self.last_weight_time) > 10: - print("上料斗可能堵塞,启动破拱") - self.control_relay(self.BREAK_ARCH_UPPER, 'open') - time.sleep(2) - self.control_relay(self.BREAK_ARCH_UPPER, 'close') - - self.last_upper_weight = upper_weight - - # 更新最后读取时间 - if (self.read_transmitter_data_via_relay(1) is not None or - self.read_transmitter_data_via_relay(2) is not None): - self.last_weight_time = current_time - - def monitor_system(self): - """监控系统状态""" - while self._running: - try: - self.check_upper_material_request() - self.check_arch_blocking() - time.sleep(1) - except Exception as e: - print(f"监控线程错误: {e}") - - def setup_camera_capture(self, camera_index=0): - """ - 设置摄像头捕获 - :param camera_index: USB摄像头索引或IP摄像头配置 - """ - try: - rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01" - self.camera = cv2.VideoCapture(rtsp_url) - - if not self.camera.isOpened(): - print(f"无法打开网络摄像头: {rtsp_url}") - return False - print(f"网络摄像头初始化成功,地址: {rtsp_url}") - return True - except Exception as e: - print(f"摄像头设置失败: {e}") - return False - - def capture_current_frame(self): - """捕获当前帧并返回numpy数组""" - try: - if self.camera is None: - print("摄像头未初始化") - return None - - ret, frame = self.camera.read() - if ret: - return frame - else: - print("无法捕获图像帧") - return None - except Exception as e: - print(f"图像捕获失败: {e}") - return None - - def detect_overflow_from_image(self, image_array): - """通过图像检测是否溢料(接受numpy数组)""" - try: - # 检查模型是否已加载 - if self.overflow_model is None: - print("堆料检测模型未加载") - return False - - rois = load_global_rois(self.roi_file_path) - - if not rois: - print("没有有效的ROI配置") - return False - - if image_array is None: - print("输入图像为空") - return False - - crops = crop_and_resize(image_array, rois, 640) - for roi_resized, _ in crops: - final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4) - if "大堆料" in final_class or "小堆料" in final_class: - return True - - return False - except Exception as e: - print(f"溢料检测失败: {e}") - return False - - def detect_vehicle_alignment(self, image_array): - """通过图像检测模具车是否对齐(接受numpy数组)""" - try: - # 检查模型是否已加载 - if self.alignment_model is None: - print("对齐检测模型未加载") - return False - - if image_array is None: - print("输入图像为空") - return False - - # 直接使用模型进行推理 - results = self.alignment_model(image_array) - pared_probs = results[0].probs.data.cpu().numpy().flatten() - - # 类别0: 未对齐, 类别1: 对齐 - class_id = int(pared_probs.argmax()) - confidence = float(pared_probs[class_id]) - - # 只有当对齐且置信度>95%时才认为对齐 - if class_id == 1 and confidence > 0.95: - return True - return False - except Exception as e: - print(f"对齐检测失败: {e}") - return False - - def get_current_door_angle(self, image=None, image_path=None): - """ - 通过视觉系统获取当前出砼门角度 - :param image: 图像数组(numpy array) - :param image_path: 图片路径 - """ - try: - # 检查模型是否已加载 - if self.angle_model is None: - print("夹角检测模型未加载") - return None - - angle_deg, _ = predict_obb_best_angle( - model=self.angle_model, # 传递预加载的模型实例 - image=image, # 传递图像数组 - image_path=image_path # 或传递图像路径 - ) - return angle_deg - except Exception as e: - print(f"角度检测失败: {e}") - return None - - def alignment_check_loop(self): - """ - 模具车对齐检查循环 - """ - while self._running: - try: - # 只在需要检查对齐时才检查 - if self.lower_feeding_stage == 4: - current_frame = self.capture_current_frame() - if current_frame is not None: - self.vehicle_aligned = self.detect_vehicle_alignment(current_frame) - if self.vehicle_aligned: - print("检测到模具车对齐") - else: - print("模具车未对齐") - time.sleep(self.alignment_check_interval) - except Exception as e: - print(f"对齐检查循环错误: {e}") - time.sleep(self.alignment_check_interval) - - def visual_control_loop(self): - """ - 视觉控制主循环 - """ - - while self._running and self.visual_control_enabled: - try: - current_frame = self.capture_current_frame() - if current_frame is None: - print("无法获取当前图像,跳过本次调整") - time.sleep(self.visual_check_interval) - continue - - # 检测是否溢料 - overflow = self.detect_overflow_from_image(current_frame) - - # 获取当前角度 - current_angle = self.get_current_door_angle(image=current_frame) - - if current_angle is None: - print("无法获取当前角度,跳过本次调整") - time.sleep(self.visual_check_interval) - continue - - print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}") - - # 状态机控制逻辑 - if self.angle_control_mode == "normal": - # 正常模式 - if overflow and current_angle > self.angle_threshold: - # 检测到堆料且角度过大,进入角度减小模式 - print("检测到堆料且角度过大,关闭出砼门开始减小角度") - self.control_relay(self.DOOR_LOWER_2, 'close') - self.angle_control_mode = "reducing" - else: - # 保持正常开门 - self.control_relay(self.DOOR_LOWER_2, 'open') - - elif self.angle_control_mode == "reducing": - # 角度减小模式 - if current_angle <= self.target_angle + self.angle_tolerance: - # 角度已达到目标范围 - if overflow: - # 仍有堆料,进入维持模式 - print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式") - self.angle_control_mode = "maintaining" - self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门 - else: - # 无堆料,恢复正常模式 - print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "normal" - - elif self.angle_control_mode == "maintaining": - # 维持模式 - 使用脉冲控制 - if not overflow: - # 堆料已消除,恢复正常模式 - print("堆料已消除,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "normal" - else: - # 继续维持角度控制 - self.pulse_control_door_for_maintaining() - - elif self.angle_control_mode == "recovery": # 打开夹爪的过程中又堆料了 - # 恢复模式 - 逐步打开门 - if overflow: - # 又出现堆料,回到角度减小模式 - print("恢复过程中又检测到堆料,回到角度减小模式") - self.angle_control_mode = "maintaining" - else: - # 堆料已消除,恢复正常模式 - print("堆料已消除,恢复正常模式") - self.control_relay(self.DOOR_LOWER_2, 'open') - self.angle_control_mode = "normal" - - self.last_angle = current_angle - time.sleep(self.visual_check_interval) - - except Exception as e: - print(f"视觉控制循环错误: {e}") - time.sleep(self.visual_check_interval) - - def pulse_control_door_for_maintaining(self): - """ - 用于维持模式的脉冲控制 - 保持角度在目标范围内 - """ - print("执行维持脉冲控制") - # 关门1秒 - self.control_relay(self.DOOR_LOWER_2, 'close') - time.sleep(1.0) - # 开门1秒 - self.control_relay(self.DOOR_LOWER_2, 'open') - time.sleep(1.0) - - def start_visual_control(self): - """ - 启动视觉控制线程 - """ - if not self.visual_control_enabled: - print("视觉控制未启用") - return - - print("启动视觉控制线程") - self._visual_control_thread = threading.Thread( - target=self.visual_control_loop, - daemon=True - ) - self._visual_control_thread.start() - return self._visual_control_thread - - def start_alignment_check(self): - """ - 启动模具车对齐检查线程 - """ - print("启动模具车对齐检查线程") - self._alignment_check_thread = threading.Thread( - target=self.alignment_check_loop, - daemon=True - ) - self._alignment_check_thread.start() - return self._alignment_check_thread - - def start(self): - """启动系统""" - if self._running: - print("系统已在运行") - return - print("启动控制系统") - self._running = True - self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True) - self._monitor_thread.start() - - def stop(self): - """停止系统""" - if not self._running: - print("系统未在运行") - return - print("停止控制系统") - self._running = False - if self._monitor_thread is not None: - self._monitor_thread.join() - if self._visual_control_thread is not None: - self._visual_control_thread.join() - if self._alignment_check_thread is not None: - self._alignment_check_thread.join() - if self.camera is not None: - self.camera.release() - print("控制系统已停止") diff --git a/feeding/process.py b/feeding/process.py index 5565198..91ed086 100644 --- a/feeding/process.py +++ b/feeding/process.py @@ -1,4 +1,3 @@ -# feeding/process.py class FeedingProcess: def __init__(self, relay_controller, inverter_controller, transmitter_controller, vision_detector, @@ -17,6 +16,11 @@ class FeedingProcess: print("下料已在进行中") return + # 检查关键设备是否可连接 + if not self._check_device_connectivity(): + print("关键设备连接失败,无法开始下料") + return + print("开始分步下料过程") # 重置计数器 self.state.lower_feeding_cycle = 0 @@ -29,6 +33,43 @@ class FeedingProcess: self.state.lower_feeding_stage = 4 self.wait_for_vehicle_alignment() + def _check_device_connectivity(self): + """检查关键设备连接状态""" + try: + # 检查网络继电器连接 + test_response = self.relay_controller.send_command(self.relay_controller.read_status_command) + if not test_response: + print("网络继电器连接失败") + return False + + # 检查变频器连接 + if not self.relay_controller.modbus_client.connect(): + print("无法连接到网络继电器Modbus服务") + return False + + # 尝试读取变频器一个寄存器(测试连接) + test_result = self.relay_controller.modbus_client.read_holding_registers( + address=0x00, + count=1, + slave=self.inverter_controller.config['slave_id'] + ) + + if isinstance(test_result, Exception): + print("变频器连接测试失败") + return False + + # 检查下料斗变送器连接 + test_weight = self.transmitter_controller.read_data(2) + if test_weight is None: + print("下料斗变送器连接失败") + return False + + self.relay_controller.modbus_client.close() + return True + except Exception as e: + print(f"设备连接检查失败: {e}") + return False + def transfer_material_from_upper_to_lower(self): """上料斗向下料斗下料""" print(f"上料斗向下料斗下料 (第 {self.state.upper_feeding_count + 1} 次)")