Files
Feeding_control_system/Fedding.py
2025-09-29 09:19:30 +08:00

1154 lines
45 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import binascii
import time
import threading
import struct
import cv2
import os
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
# 添加视觉模块路径
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), 'src', 'vision'))
# 导入视觉处理模块
from vision.anger_caculate import predict_obb_best_angle
from vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize, YOLO
class FeedingControlSystem:
def __init__(self, relay_host='192.168.0.18', relay_port=50000):
# 网络继电器配置
self.relay_host = relay_host
self.relay_port = relay_port
self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port)
# 继电器映射
self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动
self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门
self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门
self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱
self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱
# 继电器命令原始Socketmudbus TCP模式
self.relay_commands = {
self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}
}
# 读取状态命令
self.read_status_command = '000000000006010100000008'
# 设备位映射
self.device_bit_map = {
self.DOOR_UPPER: 0,
self.DOOR_LOWER_1: 1,
self.DOOR_LOWER_2: 2,
self.BREAK_ARCH_UPPER: 3,
self.BREAK_ARCH_LOWER: 4
}
# 变频器配置Modbus RTU 协议)
self.inverter_config = {
'slave_id': 1,
'frequency_register': 0x01, # 2001H
'start_register': 0x00, # 2000H
'stop_register': 0x00, # 2000H用于停机
'start_command': 0x0013, # 正转点动运行
'stop_command': 0x0001 # 停机
}
# 变送器配置Modbus RTU
self.transmitter_config = {
1: { # 上料斗
'slave_id': 1,
'weight_register': 0x01,
'register_count': 2
},#发出去的内容01 03 00 01 00 02
2: { # 下料斗
'slave_id': 2,
'weight_register': 0x01,
'register_count': 2
}#发出去的内容02 03 00 01 00 02
}
# 系统状态
self._running = False
self._monitor_thread = None
self._visual_control_thread = None
self._alignment_check_thread = None
# 下料控制相关
self.min_required_weight = 500 # 模具车最小需要重量(kg)
self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中)
self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:等待上料, 4:等待模具车对齐
self.lower_feeding_cycle = 0 # 下料斗下料循环次数 (1, 2, 3)
self.upper_feeding_count = 0 # 上料斗已下料次数 (0, 1, 2)
self.last_upper_weight = 0
self.last_lower_weight = 0
self.last_weight_time = time.time()
self.target_vehicle_weight = 5000 # 目标模具车重量(kg)
self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多
self.single_batch_weight = 2500 # 单次下料重量(kg)
#夹角状态
self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery
# 错误计数
self.upper_weight_error_count = 0
self.lower_weight_error_count = 0
self.max_error_count = 3
# 下料阶段频率Hz
self.inverter_max_frequency = 400.0#频率最大值
self.frequencies = [220.0, 230.0, 240.0]
# 视觉系统接口
self.overflow_detected = False # 堆料检测
self.door_opening_large = False # 夹角
self.vehicle_aligned = False # 模具车是否对齐
# 视觉控制参数
self.angle_threshold = 60.0 # 角度阈值,超过此值认为开口过大
self.target_angle = 20.0 # 目标角度
self.min_angle = 10.0 # 最小角度
self.max_angle = 80.0 # 最大角度
self.angle_tolerance = 5.0 # 角度容差
self.visual_control_enabled = True # 视觉控制使能
self.last_angle = None # 上次检测角度
self.visual_check_interval = 1.0 # 视觉检查间隔(秒)
self.alignment_check_interval = 0.5 # 对齐检查间隔(秒)
# 模型路径配置
self.angle_model_path = "vision/models/angle.pt"
self.overflow_model_path = "vision/models/overflow.pt"
self.alignment_model_path = "vision/models/alig.pt" # 模具车对齐检测模型
self.roi_file_path = "vision/roi_coordinates/1_rois.txt"
# 模型实例
self.angle_model = None # 夹角检测模型实例
self.overflow_model = None # 堆料检测模型实例
self.alignment_model = None # 对齐检测模型实例
# 摄像头相关配置
self.camera = None
self.camera_type = "ip"
self.camera_ip = "192.168.1.51"
self.camera_port = 554
self.camera_username = "admin"
self.camera_password = "XJ123456"
self.camera_channel = 1
# self.current_image_path = "current_frame.jpg"
def set_camera_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1):
"""
设置摄像头配置
:param ip: 网络摄像头IP地址
:param port: 网络摄像头端口
:param username: 网络摄像头用户名
:param password: 网络摄像头密码
:param channel: 摄像头通道号
"""
self.camera_type = camera_type
if ip:
self.camera_ip = ip
if port:
self.camera_port = port
if username:
self.camera_username = username
if password:
self.camera_password = password
self.camera_channel = channel
def set_angle_parameters(self, target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0):
"""
设置角度控制参数
"""
self.target_angle = target_angle
self.min_angle = min_angle
self.max_angle = max_angle
self.angle_threshold = threshold
def set_feeding_parameters(self, target_vehicle_weight=5000, upper_buffer_weight=500, single_batch_weight=2500):
"""
设置下料参数
:param target_vehicle_weight: 目标模具车重量(kg)
:param upper_buffer_weight: 上料斗缓冲重量(kg)
:param single_batch_weight: 单次下料重量(kg)
"""
self.target_vehicle_weight = target_vehicle_weight
self.upper_buffer_weight = upper_buffer_weight
self.single_batch_weight = single_batch_weight
def load_all_models(self):
"""
加载所有视觉检测模型
"""
success = True
# 加载夹角检测模型
try:
if not os.path.exists(self.angle_model_path):
print(f"夹角检测模型不存在: {self.angle_model_path}")
success = False
else:
# 注意angle.pt模型通过predict_obb_best_angle函数使用不需要预加载
print(f"夹角检测模型路径: {self.angle_model_path}")
except Exception as e:
print(f"检查夹角检测模型失败: {e}")
success = False
# 加载堆料检测模型
try:
if not os.path.exists(self.overflow_model_path):
print(f"堆料检测模型不存在: {self.overflow_model_path}")
success = False
else:
self.overflow_model = YOLO(self.overflow_model_path)
print(f"成功加载堆料检测模型: {self.overflow_model_path}")
except Exception as e:
print(f"加载堆料检测模型失败: {e}")
success = False
# 加载对齐检测模型
try:
if not os.path.exists(self.alignment_model_path):
print(f"对齐检测模型不存在: {self.alignment_model_path}")
success = False
else:
self.alignment_model = YOLO(self.alignment_model_path)
print(f"成功加载对齐检测模型: {self.alignment_model_path}")
except Exception as e:
print(f"加载对齐检测模型失败: {e}")
success = False
return success
def send_relay_command(self, command_hex):
"""发送原始Socket命令"""
try:
byte_data = binascii.unhexlify(command_hex)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((self.relay_host, self.relay_port))
sock.send(byte_data)
response = sock.recv(1024)
print(f"收到继电器响应: {binascii.hexlify(response)}")
return response
except Exception as e:
print(f"继电器通信错误: {e}")
return None
def get_relay_status(self):
"""获取继电器状态"""
response = self.send_relay_command(self.read_status_command)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9]
status_bin = f"{status_byte:08b}"[::-1]
for key, bit_index in self.device_bit_map.items():
status_dict[key] = status_bin[bit_index] == '1'
else:
print("读取继电器状态失败")
return status_dict
def control_relay(self, device, action):
"""控制继电器"""
if device in self.relay_commands and action in self.relay_commands[device]:
print(f"控制继电器 {device} {action}")
self.send_relay_command(self.relay_commands[device][action])
time.sleep(0.1)
else:
print(f"无效设备或动作: {device}, {action}")
def read_transmitter_data_via_relay(self, transmitter_id):
"""读取变送器数据Modbus TCP 转 RS485"""
try:
if transmitter_id not in self.transmitter_config:
print(f"无效变送器ID: {transmitter_id}")
return None
config = self.transmitter_config[transmitter_id]
if not self.relay_modbus_client.connect():
print("无法连接网络继电器Modbus服务")
return None
result = self.relay_modbus_client.read_holding_registers(
address=config['weight_register'],
count=config['register_count'],
slave=config['slave_id']#转发给哪台变送器
)
if isinstance(result, Exception):
print(f"读取变送器 {transmitter_id} 失败: {result}")
return None
# 根据图片示例,正确解析数据
if config['register_count'] == 2:#读两个寄存器
# 获取原始字节数组
raw_data = result.registers
# 组合成32位整数
weight = (raw_data[0] << 16) + raw_data[1]
weight = weight / 1000.0 # 单位转换为千克
elif config['register_count'] == 1:
weight = float(result.registers[0])
else:
print(f"不支持的寄存器数量: {config['register_count']}")
return None
print(f"变送器 {transmitter_id} 读取重量: {weight}kg")
return weight
except ModbusException as e:
print(f"Modbus通信错误: {e}")
return None
except Exception as e:
print(f"数据解析错误: {e}")
return None
finally:
self.relay_modbus_client.close()
def set_inverter_frequency_via_relay(self, frequency):
"""设置变频器频率"""
try:
if not self.relay_modbus_client.connect():
print("无法连接网络继电器Modbus服务")
return False
# 使用最大频率变量计算百分比
percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例
value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数
# 限制范围
value = max(-10000, min(10000, value))
result = self.relay_modbus_client.write_register(
self.inverter_config['frequency_register'],
value,
slave=self.inverter_config['slave_id']
)
if isinstance(result, Exception):
print(f"设置频率失败: {result}")
return False
print(f"设置变频器频率为 {frequency}Hz")
return True
except ModbusException as e:
print(f"变频器Modbus通信错误: {e}")
return False
finally:
self.relay_modbus_client.close()
def control_inverter_via_relay(self, action):
try:
if not self.relay_modbus_client.connect():
print("无法连接网络继电器Modbus服务")
return False
if action == 'start':
result = self.relay_modbus_client.write_register(
address=self.inverter_config['start_register'],
value=self.inverter_config['start_command'],
slave=self.inverter_config['slave_id']
)
print("启动变频器")
elif action == 'stop':
result = self.relay_modbus_client.write_register(
address=self.inverter_config['start_register'],
value=self.inverter_config['stop_command'],
slave=self.inverter_config['slave_id']
)
print("停止变频器")
else:
print(f"无效操作: {action}")
return False
if isinstance(result, Exception):
print(f"控制失败: {result}")
return False
return True
except ModbusException as e:
print(f"变频器控制错误: {e}")
return False
finally:
self.relay_modbus_client.close()
def check_upper_material_request(self):
"""检查是否需要要料"""
current_weight = self.read_transmitter_data_via_relay(1)
if current_weight is None:
self.upper_weight_error_count += 1
print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}")
if self.upper_weight_error_count >= self.max_error_count:
print("警告:上料斗传感器连续读取失败,请检查连接")
return False
self.upper_weight_error_count = 0
# 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量
if current_weight < (self.single_batch_weight + self.min_required_weight):
print("上料斗重量不足,通知搅拌楼要料")
self.request_material_from_mixing_building() # 请求搅拌楼下料
return True
return False
def request_material_from_mixing_building(self):
"""
请求搅拌楼下料(待完善)
TODO: 与同事对接通信协议
"""
print("发送要料请求至搅拌楼...")
self.return_upper_door_to_default()
# 这里需要与同事对接具体的通信方式
# 可能是Modbus写寄存器、TCP通信、HTTP请求等
pass
def wait_for_mixing_building_material(self):
"""
等待搅拌楼下料完成(待完善)
TODO: 与同事对接信号接收
"""
print("等待搅拌楼下料完成...")
# 这里需要与同事对接具体的信号接收方式
# 可能是Modbus读寄存器、TCP通信、HTTP请求等
# 模拟等待
time.sleep(5)
print("搅拌楼下料完成")
self.move_upper_door_over_lower()
return True
def move_upper_door_over_lower(self):
"""移动上料斗到下料斗上方"""
print("移动上料斗到下料斗上方")
self.control_relay(self.DOOR_UPPER, 'open')
self.upper_door_position = 'over_lower'
def return_upper_door(self):
"""返回上料斗到搅拌楼"""
print("上料斗返回搅拌楼")
self.control_relay(self.DOOR_UPPER, 'close')
self.upper_door_position = 'returning'
def return_upper_door_to_default(self):
"""上料斗回到默认位置(搅拌楼下接料位置)"""
print("上料斗回到默认位置")
self.control_relay(self.DOOR_UPPER, 'close')
self.upper_door_position = 'default'
def start_lower_feeding(self):
"""开始分步下料"""
if self.lower_feeding_stage != 0:
print("下料已在进行中")
return
# 检查关键设备是否可连接
if not self._check_device_connectivity():
print("关键设备连接失败,无法开始下料")
return
print("开始分步下料过程")
# 重置计数器
self.lower_feeding_cycle = 0 # 用于记录三阶段下料次数
self.upper_feeding_count = 0 # 用于记录上料次数
# 第一次上料总共需要上料2次
self.transfer_material_from_upper_to_lower()
# 等待模具车对齐并开始第一轮下料
self.lower_feeding_stage = 4 # 从等待模具车对齐开始
self.wait_for_vehicle_alignment()
def _check_device_connectivity(self):
"""检查关键设备连接状态"""
try:
# 检查网络继电器连接
test_response = self.send_relay_command(self.read_status_command)
if not test_response:
print("网络继电器连接失败")
return False
# 检查变频器连接
if not self.relay_modbus_client.connect():
print("无法连接到网络继电器Modbus服务")
return False
# 尝试读取变频器一个寄存器(测试连接)
test_result = self.relay_modbus_client.read_holding_registers(
address=0x00,
count=1,
slave=self.inverter_config['slave_id']
)
if isinstance(test_result, Exception):
print("变频器连接测试失败")
return False
# 检查下料斗变送器连接
test_weight = self.read_transmitter_data_via_relay(2)
if test_weight is None:
print("下料斗变送器连接失败")
return False
self.relay_modbus_client.close()
return True
except Exception as e:
print(f"设备连接检查失败: {e}")
return False
def transfer_material_from_upper_to_lower(self):
"""上料斗向下料斗下料(基于上料斗重量传感器控制)"""
print(f"上料斗向下料斗下料 (第 {self.upper_feeding_count + 1} 次)")
# 记录下料前的重量
initial_upper_weight = self.read_transmitter_data_via_relay(1)
# 如果无法读取重量,直接报错
if initial_upper_weight is None:
raise Exception("无法读取上料斗重量传感器数据,下料操作终止")
target_upper_weight = initial_upper_weight - self.single_batch_weight
target_upper_weight = max(target_upper_weight, 0) # 确保不低于0
print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg")
# 确保下料斗出砼门关闭
self.control_relay(self.DOOR_LOWER_2, 'close')
# 打开上料斗出砼门
self.control_relay(self.DOOR_LOWER_1, 'open')
# 等待物料流入下料斗,基于上料斗重量变化控制
start_time = time.time()
timeout = 30 # 30秒超时
while time.time() - start_time < timeout:
current_upper_weight = self.read_transmitter_data_via_relay(1)
# 如果无法读取重量,继续尝试
if current_upper_weight is None:
print("无法读取上料斗重量,继续尝试...")
time.sleep(1)
continue
print(f"上料斗当前重量: {current_upper_weight:.2f}kg")
# 如果达到目标重量,则关闭上料斗出砼门
if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围
print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg")
break
elif time.time() - start_time > 25: # 如果25秒后重量变化过小
weight_change = initial_upper_weight - current_upper_weight
if weight_change < 100: # 如果重量变化小于100kg
print("重量变化过小,可能存在堵塞,交由监控系统处理...")
# 不再在这里直接处理破拱,而是依靠监控系统处理
break
time.sleep(1)
# 关闭上料斗出砼门
self.control_relay(self.DOOR_LOWER_1, 'close')
# 验证下料结果
final_upper_weight = self.read_transmitter_data_via_relay(1)
if final_upper_weight is not None:
actual_transferred = initial_upper_weight - final_upper_weight
print(f"实际下料重量: {actual_transferred:.2f}kg")
# 增加上料计数
self.upper_feeding_count += 1
print("上料斗下料完成")
# 关闭上料斗出砼门
self.control_relay(self.DOOR_LOWER_1, 'close')
# 验证下料结果
final_upper_weight = self.read_transmitter_data_via_relay(1)
if final_upper_weight is not None:
actual_transferred = initial_upper_weight - final_upper_weight
print(f"实际下料重量: {actual_transferred:.2f}kg")
# 增加上料计数
self.upper_feeding_count += 1
print("上料斗下料完成")
def wait_for_vehicle_alignment(self):
"""等待模具车对齐"""
print("等待模具车对齐...")
self.lower_feeding_stage = 4
while self.lower_feeding_stage == 4 and self._running:
if self.vehicle_aligned:
print("模具车已对齐,开始下料")
self.lower_feeding_stage = 1
self.feeding_stage_one()
break
time.sleep(self.alignment_check_interval)
def feeding_stage_one(self):
"""第一阶段下料:下料斗向模具车下料(低速)"""
print("开始第一阶段下料:下料斗低速下料")
self.set_inverter_frequency_via_relay(self.frequencies[0])
self.control_inverter_via_relay('start')
# 确保上料斗出砼门关闭
self.control_relay(self.DOOR_LOWER_1, 'close')
# 打开下料斗出砼门
self.control_relay(self.DOOR_LOWER_2, 'open')
start_time = time.time()
initial_weight = self.read_transmitter_data_via_relay(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process() # 直接结束整个流程,而不是当前批次
return
target_weight = initial_weight + self.single_batch_weight
while self.lower_feeding_stage == 1:
current_weight = self.read_transmitter_data_via_relay(2)
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process() # 直接结束整个流程
return
else:
self.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 2
self.feeding_stage_two()
break
time.sleep(2)
def feeding_stage_two(self):
"""第二阶段下料:下料斗向模具车下料(中速)"""
print("开始第二阶段下料:下料斗中速下料")
self.set_inverter_frequency_via_relay(self.frequencies[1])
# 保持下料斗出砼门打开
self.control_relay(self.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.control_relay(self.DOOR_LOWER_1, 'close')
start_time = time.time()
initial_weight = self.read_transmitter_data_via_relay(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process() # 直接结束整个流程
return
target_weight = initial_weight + self.single_batch_weight
while self.lower_feeding_stage == 2:
current_weight = self.read_transmitter_data_via_relay(2)
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process() # 直接结束整个流程
return
else:
self.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 3
self.feeding_stage_three()
break
time.sleep(2)
def feeding_stage_three(self):
"""第三阶段下料:下料斗向模具车下料(高速)"""
print("开始第三阶段下料:下料斗高速下料")
self.set_inverter_frequency_via_relay(self.frequencies[2])
# 保持下料斗出砼门打开
self.control_relay(self.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.control_relay(self.DOOR_LOWER_1, 'close')
start_time = time.time()
initial_weight = self.read_transmitter_data_via_relay(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process() # 直接结束整个流程
return
target_weight = initial_weight + self.single_batch_weight
while self.lower_feeding_stage == 3:
current_weight = self.read_transmitter_data_via_relay(2)
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process() # 直接结束整个流程
return
else:
self.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 4
self.finish_current_batch()
break
time.sleep(2)
def finish_current_batch(self):
"""完成当前批次下料"""
print("当前批次下料完成,关闭出砼门")
self.control_inverter_via_relay('stop')
self.control_relay(self.DOOR_LOWER_1, 'close')
self.control_relay(self.DOOR_LOWER_2, 'close')
# 增加三阶段下料轮次计数
self.lower_feeding_cycle += 1
# 检查是否完成两轮三阶段下料总共5吨
if self.lower_feeding_cycle >= 2:
# 完成整个5吨下料任务
print("完成两轮三阶段下料5吨下料任务完成")
self.finish_feeding_process()
return
# 如果只完成一轮三阶段下料,进行第二次上料
print("第一轮三阶段下料完成,准备第二次上料")
# 上料斗第二次向下料斗下料
try:
self.transfer_material_from_upper_to_lower()
except Exception as e:
print(f"第二次上料失败: {e}")
print("停止下料流程")
self.finish_feeding_process() # 出现严重错误时结束整个流程
return
# 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车)
print("第二次上料完成,继续三阶段下料")
self.lower_feeding_stage = 1 # 直接进入第一阶段下料
self.feeding_stage_one() # 开始第二轮第一阶段下料
def finish_feeding_process(self):
"""完成整个下料流程"""
print("整个下料流程完成")
self.lower_feeding_stage = 0
self.lower_feeding_cycle = 0
self.upper_feeding_count = 0
self.return_upper_door_to_default()
def handle_overflow_control(self, overflow_detected, door_opening_large):
"""处理溢料控制"""
if overflow_detected and door_opening_large:
print("检测到溢料且出砼门开口较大,调小出砼门")
self.control_relay(self.DOOR_LOWER_1, 'close')
time.sleep(0.1)
self.control_relay(self.DOOR_LOWER_1, 'open')
time.sleep(0.1)
def is_lower_door_open(self):
"""检查出砼门是否打开"""
return self.lower_feeding_stage in [1, 2] # 只有在下料阶段才认为门是打开的
def check_arch_blocking(self):
"""检查是否需要破拱"""
current_time = time.time()
# 检查下料斗破拱(只有在下料过程中才检查)
if self.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查
lower_weight = self.read_transmitter_data_via_relay(2)
if lower_weight is not None:
# 检查重量变化是否过慢小于0.1kg变化且时间超过10秒
if (abs(lower_weight - self.last_lower_weight) < 0.1) and \
(current_time - self.last_weight_time) > 10:
print("下料斗可能堵塞,启动破拱")
self.control_relay(self.BREAK_ARCH_LOWER, 'open')
time.sleep(2)
self.control_relay(self.BREAK_ARCH_LOWER, 'close')
self.last_lower_weight = lower_weight
# 检查上料斗破拱(在上料斗向下料斗下料时检查)
if (self.upper_door_position == 'over_lower' and
self.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱
upper_weight = self.read_transmitter_data_via_relay(1)
if upper_weight is not None:
# 检查重量变化是否过慢小于0.1kg变化且时间超过10秒
if (abs(upper_weight - self.last_upper_weight) < 0.1) and \
(current_time - self.last_weight_time) > 10:
print("上料斗可能堵塞,启动破拱")
self.control_relay(self.BREAK_ARCH_UPPER, 'open')
time.sleep(2)
self.control_relay(self.BREAK_ARCH_UPPER, 'close')
self.last_upper_weight = upper_weight
# 更新最后读取时间
if (self.read_transmitter_data_via_relay(1) is not None or
self.read_transmitter_data_via_relay(2) is not None):
self.last_weight_time = current_time
def monitor_system(self):
"""监控系统状态"""
while self._running:
try:
self.check_upper_material_request()
self.check_arch_blocking()
time.sleep(1)
except Exception as e:
print(f"监控线程错误: {e}")
def setup_camera_capture(self, camera_index=0):
"""
设置摄像头捕获
:param camera_index: USB摄像头索引或IP摄像头配置
"""
try:
rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01"
self.camera = cv2.VideoCapture(rtsp_url)
if not self.camera.isOpened():
print(f"无法打开网络摄像头: {rtsp_url}")
return False
print(f"网络摄像头初始化成功,地址: {rtsp_url}")
return True
except Exception as e:
print(f"摄像头设置失败: {e}")
return False
def capture_current_frame(self):
"""捕获当前帧并返回numpy数组"""
try:
if self.camera is None:
print("摄像头未初始化")
return None
ret, frame = self.camera.read()
if ret:
return frame
else:
print("无法捕获图像帧")
return None
except Exception as e:
print(f"图像捕获失败: {e}")
return None
def detect_overflow_from_image(self, image_array):
"""通过图像检测是否溢料接受numpy数组"""
try:
# 检查模型是否已加载
if self.overflow_model is None:
print("堆料检测模型未加载")
return False
rois = load_global_rois(self.roi_file_path)
if not rois:
print("没有有效的ROI配置")
return False
if image_array is None:
print("输入图像为空")
return False
crops = crop_and_resize(image_array, rois, 640)
for roi_resized, _ in crops:
final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4)
if "大堆料" in final_class or "小堆料" in final_class:
return True
return False
except Exception as e:
print(f"溢料检测失败: {e}")
return False
def detect_vehicle_alignment(self, image_array):
"""通过图像检测模具车是否对齐接受numpy数组"""
try:
# 检查模型是否已加载
if self.alignment_model is None:
print("对齐检测模型未加载")
return False
if image_array is None:
print("输入图像为空")
return False
# 直接使用模型进行推理
results = self.alignment_model(image_array)
pared_probs = results[0].probs.data.cpu().numpy().flatten()
# 类别0: 未对齐, 类别1: 对齐
class_id = int(pared_probs.argmax())
confidence = float(pared_probs[class_id])
# 只有当对齐且置信度>95%时才认为对齐
if class_id == 1 and confidence > 0.95:
return True
return False
except Exception as e:
print(f"对齐检测失败: {e}")
return False
def get_current_door_angle(self, image=None, image_path=None):
"""
通过视觉系统获取当前出砼门角度
:param image: 图像数组numpy array
:param image_path: 图片路径
"""
try:
# 检查模型是否已加载
if self.angle_model is None:
print("夹角检测模型未加载")
return None
angle_deg, _ = predict_obb_best_angle(
model=self.angle_model, # 传递预加载的模型实例
image=image, # 传递图像数组
image_path=image_path # 或传递图像路径
)
return angle_deg
except Exception as e:
print(f"角度检测失败: {e}")
return None
def alignment_check_loop(self):
"""
模具车对齐检查循环
"""
while self._running:
try:
# 只在需要检查对齐时才检查
if self.lower_feeding_stage == 4:
current_frame = self.capture_current_frame()
if current_frame is not None:
self.vehicle_aligned = self.detect_vehicle_alignment(current_frame)
if self.vehicle_aligned:
print("检测到模具车对齐")
else:
print("模具车未对齐")
time.sleep(self.alignment_check_interval)
except Exception as e:
print(f"对齐检查循环错误: {e}")
time.sleep(self.alignment_check_interval)
def visual_control_loop(self):
"""
视觉控制主循环
"""
while self._running and self.visual_control_enabled:
try:
current_frame = self.capture_current_frame()
if current_frame is None:
print("无法获取当前图像,跳过本次调整")
time.sleep(self.visual_check_interval)
continue
# 检测是否溢料
overflow = self.detect_overflow_from_image(current_frame)
# 获取当前角度
current_angle = self.get_current_door_angle(image=current_frame)
if current_angle is None:
print("无法获取当前角度,跳过本次调整")
time.sleep(self.visual_check_interval)
continue
print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}")
# 状态机控制逻辑
if self.angle_control_mode == "normal":
# 正常模式
if overflow and current_angle > self.angle_threshold:
# 检测到堆料且角度过大,进入角度减小模式
print("检测到堆料且角度过大,关闭出砼门开始减小角度")
self.control_relay(self.DOOR_LOWER_2, 'close')
self.angle_control_mode = "reducing"
else:
# 保持正常开门
self.control_relay(self.DOOR_LOWER_2, 'open')
elif self.angle_control_mode == "reducing":
# 角度减小模式
if current_angle <= self.target_angle + self.angle_tolerance:
# 角度已达到目标范围
if overflow:
# 仍有堆料,进入维持模式
print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式")
self.angle_control_mode = "maintaining"
self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门
else:
# 无堆料,恢复正常模式
print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式")
self.control_relay(self.DOOR_LOWER_2, 'open')
self.angle_control_mode = "recovery"
elif self.angle_control_mode == "maintaining":
# 维持模式 - 使用脉冲控制
if not overflow:
# 堆料已消除,恢复正常模式
print("堆料已消除,恢复正常模式")
self.control_relay(self.DOOR_LOWER_2, 'open')
self.angle_control_mode = "recovery"
else:
# 继续维持角度控制
self.pulse_control_door_for_maintaining()
elif self.angle_control_mode == "recovery":#打开夹爪的过程中又堆料了
# 恢复模式 - 逐步打开门
if overflow:
# 又出现堆料,回到角度减小模式
print("恢复过程中又检测到堆料,回到角度减小模式")
self.angle_control_mode = "maintaining"
else:
# 堆料已消除,恢复正常模式
print("堆料已消除,恢复正常模式")
self.control_relay(self.DOOR_LOWER_2, 'open')
self.angle_control_mode = "normal"
self.last_angle = current_angle
time.sleep(self.visual_check_interval)
except Exception as e:
print(f"视觉控制循环错误: {e}")
time.sleep(self.visual_check_interval)
def pulse_control_door_for_maintaining(self):
"""
用于维持模式的脉冲控制
保持角度在目标范围内
"""
print("执行维持脉冲控制")
# 关门1秒
self.control_relay(self.DOOR_LOWER_2, 'close')
time.sleep(1.0)
# 开门1秒
self.control_relay(self.DOOR_LOWER_2, 'open')
time.sleep(1.0)
def start_visual_control(self):
"""
启动视觉控制线程
"""
if not self.visual_control_enabled:
print("视觉控制未启用")
return
print("启动视觉控制线程")
self._visual_control_thread = threading.Thread(
target=self.visual_control_loop,
daemon=True
)
self._visual_control_thread.start()
return self._visual_control_thread
def start_alignment_check(self):
"""
启动模具车对齐检查线程
"""
print("启动模具车对齐检查线程")
self._alignment_check_thread = threading.Thread(
target=self.alignment_check_loop,
daemon=True
)
self._alignment_check_thread.start()
return self._alignment_check_thread
def start(self):
"""启动系统"""
if self._running:
print("系统已在运行")
return
print("启动控制系统")
self._running = True
self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True)
self._monitor_thread.start()
def stop(self):
"""停止系统"""
if not self._running:
print("系统未在运行")
return
print("停止控制系统")
self._running = False
if self._monitor_thread is not None:
self._monitor_thread.join()
if self._visual_control_thread is not None:
self._visual_control_thread.join()
if self._alignment_check_thread is not None:
self._alignment_check_thread.join()
if self.camera is not None:
self.camera.release()
print("控制系统已停止")
# 使用示例
if __name__ == "__main__":
system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000)
# 设置角度控制参数
system.set_angle_parameters(
target_angle=20.0,
min_angle=10.0,
max_angle=80.0,
threshold=60.0
)
# 设置下料参数
system.set_feeding_parameters(
target_vehicle_weight=5000, # 5吨
upper_buffer_weight=500, # 0.5吨缓冲
single_batch_weight=2500 # 每次下2.5吨
)
# 设置摄像头配置
system.set_camera_config(
camera_type="ip",
ip="192.168.1.51",
port=554,
username="admin",
password="XJ123456",
channel=1
)
# 初始化摄像头
if not system.setup_camera_capture():
print("摄像头初始化失败")
exit(1)
# 加载所有模型
if not system.load_all_models():
print("模型加载失败")
exit(1)
# 启动系统监控
system.start()
# 启动视觉控制
system.start_visual_control()
# 启动对齐检查
system.start_alignment_check()
print("系统准备就绪5秒后开始下料...")
time.sleep(5)
system.start_lower_feeding() # 启动下料流程
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("收到停止信号")
except Exception as e:
print(f"系统错误: {e}")
finally:
system.stop()