diff --git a/Fedding.py b/Fedding.py index 562d6f9..0e12105 100644 --- a/Fedding.py +++ b/Fedding.py @@ -1,4 +1,3 @@ -# Fedding.py (修正版 - 统一模型管理) import socket import binascii import time @@ -56,23 +55,25 @@ class FeedingControlSystem: # 变频器配置(Modbus RTU 协议) self.inverter_config = { 'slave_id': 1, - 'frequency_register': 0x01, # 寄存器地址:0x01(对应2001H) - 'start_register': 0x00, # 启动命令:0x0001(正转运行) - 'stop_register': 0x01 # 停止命令:0x0000(停机) + 'frequency_register': 0x01, # 2001H + 'start_register': 0x00, # 2000H + 'stop_register': 0x00, # 2000H(用于停机) + 'start_command': 0x0013, # 正转点动运行 + 'stop_command': 0x0001 # 停机 } # 变送器配置(Modbus RTU) self.transmitter_config = { 1: { # 上料斗 'slave_id': 1, - 'weight_register': 0x00, + 'weight_register': 0x01, 'register_count': 2 - }, + },#发出去的内容01 03 00 01 00 02 2: { # 下料斗 'slave_id': 2, - 'weight_register': 0x00, + 'weight_register': 0x01, 'register_count': 2 - } + }#发出去的内容02 03 00 01 00 02 } # 系统状态 @@ -93,13 +94,18 @@ class FeedingControlSystem: self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多 self.single_batch_weight = 2500 # 单次下料重量(kg) + + #夹角状态 + self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery + # 错误计数 self.upper_weight_error_count = 0 self.lower_weight_error_count = 0 self.max_error_count = 3 # 下料阶段频率(Hz) - self.frequencies = [30.0, 40.0, 50.0] + self.inverter_max_frequency = 400.0#频率最大值 + self.frequencies = [220.0, 230.0, 240.0] # 视觉系统接口 self.overflow_detected = False # 堆料检测 @@ -277,17 +283,20 @@ class FeedingControlSystem: result = self.relay_modbus_client.read_holding_registers( address=config['weight_register'], count=config['register_count'], - slave=config['slave_id'] + slave=config['slave_id']#转发给哪台变送器 ) if isinstance(result, Exception): print(f"读取变送器 {transmitter_id} 失败: {result}") return None - if config['register_count'] == 2: - # 解析为 32 位整数(大端序) - weight_bytes = struct.pack('>HH', result.registers[0], result.registers[1]) - weight = struct.unpack('>I', weight_bytes)[0] / 100.0 # 假设单位是 kg,精度两位 + # 根据图片示例,正确解析数据 + if config['register_count'] == 2:#读两个寄存器 + # 获取原始字节数组 + raw_data = result.registers + # 组合成32位整数 + weight = (raw_data[0] << 16) + raw_data[1] + weight = weight / 1000.0 # 单位转换为千克 elif config['register_count'] == 1: weight = float(result.registers[0]) else: @@ -313,7 +322,12 @@ class FeedingControlSystem: print("无法连接网络继电器Modbus服务") return False - value = int(frequency * 100) + # 使用最大频率变量计算百分比 + percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例 + value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数 + + # 限制范围 + value = max(-10000, min(10000, value)) result = self.relay_modbus_client.write_register( self.inverter_config['frequency_register'], @@ -334,7 +348,6 @@ class FeedingControlSystem: self.relay_modbus_client.close() def control_inverter_via_relay(self, action): - """控制变频器启停""" try: if not self.relay_modbus_client.connect(): print("无法连接网络继电器Modbus服务") @@ -342,15 +355,15 @@ class FeedingControlSystem: if action == 'start': result = self.relay_modbus_client.write_register( - self.inverter_config['start_register'], - 1, + address=self.inverter_config['start_register'], + value=self.inverter_config['start_command'], slave=self.inverter_config['slave_id'] ) print("启动变频器") elif action == 'stop': result = self.relay_modbus_client.write_register( - self.inverter_config['stop_register'], - 0, + address=self.inverter_config['start_register'], + value=self.inverter_config['stop_command'], slave=self.inverter_config['slave_id'] ) print("停止变频器") @@ -537,7 +550,7 @@ class FeedingControlSystem: """第二阶段下料(剩余2.5吨)""" print("开始第二阶段下料 (2/2)") self.upper_feeding_count = 2 - self.set_inverter_frequency_via_relay(self.frequencies[1]) + self.set_inverter_frequency_via_relay(self.lower_feeding_frequencies[1]) start_time = time.time() initial_weight = self.read_transmitter_data_via_relay(2) @@ -560,7 +573,39 @@ class FeedingControlSystem: self.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: - self.lower_feeding_stage = 3 + self.lower_feeding_stage = 3 # 改为跳转到第三阶段 + self.feeding_stage_three() # 调用第三阶段 + break + time.sleep(2) + + def feeding_stage_three(self): + """第三阶段下料""" + print("开始第三阶段下料 (3/3)") + self.upper_feeding_count = 3 + self.set_inverter_frequency_via_relay(self.lower_feeding_frequencies[2]) # 使用第三个频率 + + start_time = time.time() + initial_weight = self.read_transmitter_data_via_relay(2) + if initial_weight is None: + print("无法获取初始重量,取消下料") + self.finish_current_batch() + return + + target_weight = initial_weight + self.single_batch_weight + + while self.lower_feeding_stage == 3: + current_weight = self.read_transmitter_data_via_relay(2) + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗传感器连续读取失败,停止下料") + self.finish_current_batch() + return + else: + self.lower_weight_error_count = 0 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 4 self.finish_current_batch() break time.sleep(2) @@ -710,7 +755,7 @@ class FeedingControlSystem: crops = crop_and_resize(image_array, rois, 640) for roi_resized, _ in crops: final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4) - if "大堆料" in final_class or "浇筑满" in final_class: + if "大堆料" in final_class or "小堆料" in final_class: return True return False @@ -732,11 +777,11 @@ class FeedingControlSystem: # 直接使用模型进行推理 results = self.alignment_model(image_array) - pred_probs = results[0].probs.data.cpu().numpy().flatten() + pared_probs = results[0].probs.data.cpu().numpy().flatten() # 类别0: 未对齐, 类别1: 对齐 - class_id = int(pred_probs.argmax()) - confidence = float(pred_probs[class_id]) + class_id = int(pared_probs.argmax()) + confidence = float(pared_probs[class_id]) # 只有当对齐且置信度>95%时才认为对齐 if class_id == 1 and confidence > 0.95: @@ -746,9 +791,11 @@ class FeedingControlSystem: print(f"对齐检测失败: {e}") return False - def get_current_door_angle(self, image_path): + def get_current_door_angle(self, image=None, image_path=None): """ 通过视觉系统获取当前出砼门角度 + :param image: 图像数组(numpy array) + :param image_path: 图片路径 """ try: # 检查模型是否已加载 @@ -758,38 +805,14 @@ class FeedingControlSystem: angle_deg, _ = predict_obb_best_angle( model=self.angle_model, # 传递预加载的模型实例 - image_path=image_path + image=image, # 传递图像数组 + image_path=image_path # 或传递图像路径 ) return angle_deg except Exception as e: print(f"角度检测失败: {e}") return None - def adjust_door_angle(self, current_angle, target_angle): - """ - 根据当前角度和目标角度调整出砼门 - """ - angle_diff = abs(current_angle - target_angle) - - if angle_diff <= self.angle_tolerance: - print(f"角度已在目标范围内: {current_angle:.2f}°") - return True - - if current_angle > target_angle: - # 需要减小角度 - 关闭DO2 - print(f"角度 {current_angle:.2f}° 过大,调整至 {target_angle}°,关闭出砼门") - self.control_relay(self.DOOR_LOWER_2, 'close') - time.sleep(0.1) - self.control_relay(self.DOOR_LOWER_2, 'open') - return False - else: - # 需要增大角度 - 打开DO2 - print(f"角度 {current_angle:.2f}° 过小,调整至 {target_angle}°,打开出砼门") - self.control_relay(self.DOOR_LOWER_2, 'open') - time.sleep(0.1) - self.control_relay(self.DOOR_LOWER_2, 'close') - return False - def alignment_check_loop(self): """ 模具车对齐检查循环 @@ -814,6 +837,7 @@ class FeedingControlSystem: """ 视觉控制主循环 """ + while self._running and self.visual_control_enabled: try: current_frame = self.capture_current_frame() @@ -825,31 +849,65 @@ class FeedingControlSystem: # 检测是否溢料 overflow = self.detect_overflow_from_image(current_frame) - # 获取当前角度(需要临时文件) - temp_path = "temp_angle_image.jpg" - cv2.imwrite(temp_path, current_frame) - current_angle = self.get_current_door_angle(temp_path) - if os.path.exists(temp_path): - os.remove(temp_path) + # 获取当前角度 + current_angle = self.get_current_door_angle(image=current_frame) if current_angle is None: print("无法获取当前角度,跳过本次调整") time.sleep(self.visual_check_interval) continue - print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}") + print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}") - # 根据溢料状态和角度决定调整策略 - if overflow and current_angle > self.angle_threshold: - self.adjust_door_angle(current_angle, self.target_angle) - elif not overflow and current_angle < self.target_angle: - if current_angle < self.max_angle - self.angle_tolerance: - target = min(current_angle + 10, self.max_angle) - self.adjust_door_angle(current_angle, target) - elif overflow and current_angle <= self.angle_threshold: - print("溢料但角度合理,无需调整") - else: - print("角度状态正常,无需调整") + # 状态机控制逻辑 + if self.angle_control_mode == "normal": + # 正常模式 + if overflow and current_angle > self.angle_threshold: + # 检测到堆料且角度过大,进入角度减小模式 + print("检测到堆料且角度过大,关闭出砼门开始减小角度") + self.control_relay(self.DOOR_LOWER_2, 'close') + self.angle_control_mode = "reducing" + else: + # 保持正常开门 + self.control_relay(self.DOOR_LOWER_2, 'open') + + elif self.angle_control_mode == "reducing": + # 角度减小模式 + if current_angle <= self.target_angle + self.angle_tolerance: + # 角度已达到目标范围 + if overflow: + # 仍有堆料,进入维持模式 + print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式") + self.angle_control_mode = "maintaining" + self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门 + else: + # 无堆料,恢复正常模式 + print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "recovery" + + elif self.angle_control_mode == "maintaining": + # 维持模式 - 使用脉冲控制 + if not overflow: + # 堆料已消除,恢复正常模式 + print("堆料已消除,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "recovery" + else: + # 继续维持角度控制 + self.pulse_control_door_for_maintaining() + + elif self.angle_control_mode == "recovery":#打开夹爪的过程中又堆料了 + # 恢复模式 - 逐步打开门 + if overflow: + # 又出现堆料,回到角度减小模式 + print("恢复过程中又检测到堆料,回到角度减小模式") + self.angle_control_mode = "maintaining" + else: + # 堆料已消除,恢复正常模式 + print("堆料已消除,恢复正常模式") + self.control_relay(self.DOOR_LOWER_2, 'open') + self.angle_control_mode = "normal" self.last_angle = current_angle time.sleep(self.visual_check_interval) @@ -858,6 +916,19 @@ class FeedingControlSystem: print(f"视觉控制循环错误: {e}") time.sleep(self.visual_check_interval) + def pulse_control_door_for_maintaining(self): + """ + 用于维持模式的脉冲控制 + 保持角度在目标范围内 + """ + print("执行维持脉冲控制") + # 关门1秒 + self.control_relay(self.DOOR_LOWER_2, 'close') + time.sleep(1.0) + # 开门1秒 + self.control_relay(self.DOOR_LOWER_2, 'open') + time.sleep(1.0) + def start_visual_control(self): """ 启动视觉控制线程 @@ -963,10 +1034,11 @@ if __name__ == "__main__": system.start_alignment_check() try: - # 运行一段时间 - time.sleep(300) # 运行5分钟 - + while True: + time.sleep(1) except KeyboardInterrupt: - print("程序被中断") + print("收到停止信号") + except Exception as e: + print(f"系统错误: {e}") finally: system.stop() diff --git a/src/vision/anger_caculate.py b/src/vision/anger_caculate.py index e299d83..2a82dd3 100644 --- a/src/vision/anger_caculate.py +++ b/src/vision/anger_caculate.py @@ -3,12 +3,13 @@ import os import numpy as np from ultralytics import YOLO -def predict_obb_best_angle(model=None, model_path=None, image_path=None, save_path=None): +def predict_obb_best_angle(model=None, model_path=None, image=None, image_path=None, save_path=None): """ 输入: model: 预加载的YOLO模型实例(可选) model_path: YOLO 权重路径(当model为None时使用) - image_path: 图片路径 + image: 图像数组(numpy array) + image_path: 图片路径(当image为None时使用) save_path: 可选,保存带标注图像 输出: angle_deg: 置信度最高两个框的主方向夹角(度),如果检测少于两个目标返回 None @@ -16,19 +17,22 @@ def predict_obb_best_angle(model=None, model_path=None, image_path=None, save_pa """ # 1. 使用预加载的模型或加载新模型 if model is not None: - # 使用预加载的模型 loaded_model = model elif model_path is not None: - # 加载模型 loaded_model = YOLO(model_path) else: raise ValueError("必须提供model或model_path参数") - # 2. 读取图像 - img = cv2.imread(image_path) - if img is None: - print(f"无法读取图像: {image_path}") - return None, None + # 2. 读取图像(优先使用传入的图像数组) + if image is not None: + img = image + elif image_path is not None: + img = cv2.imread(image_path) + if img is None: + print(f"无法读取图像: {image_path}") + return None, None + else: + raise ValueError("必须提供image或image_path参数") # 3. 推理 OBB results = loaded_model(img, save=False, imgsz=640, conf=0.5, mode='obb')