重构目录结构:调整项目布局
This commit is contained in:
0
config/__init__.py
Normal file
0
config/__init__.py
Normal file
BIN
config/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
config/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
config/__pycache__/settings.cpython-39.pyc
Normal file
BIN
config/__pycache__/settings.cpython-39.pyc
Normal file
Binary file not shown.
51
config/settings.py
Normal file
51
config/settings.py
Normal file
@ -0,0 +1,51 @@
|
||||
# config/settings.py
|
||||
import os
|
||||
|
||||
|
||||
class Settings:
|
||||
def __init__(self):
|
||||
# 项目根目录
|
||||
self.project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
# 网络继电器配置
|
||||
self.relay_host = '192.168.0.18'
|
||||
self.relay_port = 50000
|
||||
|
||||
# 摄像头配置
|
||||
self.camera_type = "ip"
|
||||
self.camera_ip = "192.168.1.51"
|
||||
self.camera_port = 554
|
||||
self.camera_username = "admin"
|
||||
self.camera_password = "XJ123456"
|
||||
self.camera_channel = 1
|
||||
|
||||
# 下料控制参数
|
||||
self.min_required_weight = 500 # 模具车最小需要重量(kg)
|
||||
self.target_vehicle_weight = 5000 # 目标模具车重量(kg)
|
||||
self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg)
|
||||
self.single_batch_weight = 2500 # 单次下料重量(kg)
|
||||
|
||||
# 角度控制参数
|
||||
self.target_angle = 20.0 # 目标角度
|
||||
self.min_angle = 10.0 # 最小角度
|
||||
self.max_angle = 80.0 # 最大角度
|
||||
self.angle_threshold = 60.0 # 角度阈值
|
||||
self.angle_tolerance = 5.0 # 角度容差
|
||||
|
||||
# 变频器配置
|
||||
self.inverter_max_frequency = 400.0 # 频率最大值
|
||||
self.frequencies = [220.0, 230.0, 240.0] # 下料阶段频率(Hz)
|
||||
|
||||
# 模型路径配置
|
||||
self.models_dir = os.path.join(self.project_root, 'vision', 'models')
|
||||
self.angle_model_path = os.path.join(self.models_dir, 'angle.pt')
|
||||
self.overflow_model_path = os.path.join(self.models_dir, 'overflow.pt')
|
||||
self.alignment_model_path = os.path.join(self.models_dir, 'alig.pt')
|
||||
|
||||
# ROI路径配置
|
||||
self.roi_file_path = os.path.join(self.project_root, 'vision', 'roi_coordinates', '1_rois.txt')
|
||||
|
||||
# 系统控制参数
|
||||
self.visual_check_interval = 1.0 # 视觉检查间隔(秒)
|
||||
self.alignment_check_interval = 0.5 # 对齐检查间隔(秒)
|
||||
self.max_error_count = 3 # 最大错误计数
|
||||
8
core/__init__.py
Normal file
8
core/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
"""
|
||||
核心模块
|
||||
包含系统核心控制逻辑和状态管理
|
||||
"""
|
||||
from .system import FeedingControlSystem
|
||||
from .state import SystemState
|
||||
|
||||
__all__ = ['FeedingControlSystem', 'SystemState']
|
||||
BIN
core/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
core/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
core/__pycache__/state.cpython-39.pyc
Normal file
BIN
core/__pycache__/state.cpython-39.pyc
Normal file
Binary file not shown.
BIN
core/__pycache__/system.cpython-39.pyc
Normal file
BIN
core/__pycache__/system.cpython-39.pyc
Normal file
Binary file not shown.
1114
core/feeding_system.py
Normal file
1114
core/feeding_system.py
Normal file
File diff suppressed because it is too large
Load Diff
27
core/state.py
Normal file
27
core/state.py
Normal file
@ -0,0 +1,27 @@
|
||||
# core/state.py
|
||||
class SystemState:
|
||||
def __init__(self):
|
||||
# 系统运行状态
|
||||
self.running = False
|
||||
|
||||
# 下料控制相关
|
||||
self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中)
|
||||
self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:第三阶段, 4:等待模具车对齐
|
||||
self.lower_feeding_cycle = 0 # 下料斗下料循环次数
|
||||
self.upper_feeding_count = 0 # 上料斗已下料次数
|
||||
|
||||
# 重量相关
|
||||
self.last_upper_weight = 0
|
||||
self.last_lower_weight = 0
|
||||
self.last_weight_time = 0
|
||||
|
||||
# 错误计数
|
||||
self.upper_weight_error_count = 0
|
||||
self.lower_weight_error_count = 0
|
||||
|
||||
# 视觉系统状态
|
||||
self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery
|
||||
self.overflow_detected = False # 堆料检测
|
||||
self.door_opening_large = False # 夹角
|
||||
self.vehicle_aligned = False # 模具车是否对齐
|
||||
self.last_angle = None # 上次检测角度
|
||||
166
core/system.py
Normal file
166
core/system.py
Normal file
@ -0,0 +1,166 @@
|
||||
# core/system.py
|
||||
import threading
|
||||
import time
|
||||
import cv2
|
||||
from config.settings import Settings
|
||||
from core.state import SystemState
|
||||
from hardware.relay import RelayController
|
||||
from hardware.inverter import InverterController
|
||||
from hardware.transmitter import TransmitterController
|
||||
from vision.camera import CameraController
|
||||
from vision.detector import VisionDetector
|
||||
from feeding.controller import FeedingController
|
||||
|
||||
|
||||
class FeedingControlSystem:
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self.state = SystemState()
|
||||
|
||||
# 初始化硬件控制器
|
||||
self.relay_controller = RelayController(
|
||||
host=settings.relay_host,
|
||||
port=settings.relay_port
|
||||
)
|
||||
|
||||
self.inverter_controller = InverterController(self.relay_controller)
|
||||
self.transmitter_controller = TransmitterController(self.relay_controller)
|
||||
|
||||
# 初始化视觉系统
|
||||
self.camera_controller = CameraController()
|
||||
self.vision_detector = VisionDetector(settings)
|
||||
|
||||
# 初始化下料控制器
|
||||
self.feeding_controller = FeedingController(
|
||||
self.relay_controller,
|
||||
self.inverter_controller,
|
||||
self.transmitter_controller,
|
||||
self.vision_detector,
|
||||
self.camera_controller,
|
||||
self.state,
|
||||
settings
|
||||
)
|
||||
|
||||
# 线程管理
|
||||
self.monitor_thread = None
|
||||
self.visual_control_thread = None
|
||||
self.alignment_check_thread = None
|
||||
|
||||
def initialize(self):
|
||||
"""初始化系统"""
|
||||
print("初始化控制系统...")
|
||||
|
||||
# 设置摄像头配置
|
||||
self.camera_controller.set_config(
|
||||
camera_type=self.settings.camera_type,
|
||||
ip=self.settings.camera_ip,
|
||||
port=self.settings.camera_port,
|
||||
username=self.settings.camera_username,
|
||||
password=self.settings.camera_password,
|
||||
channel=self.settings.camera_channel
|
||||
)
|
||||
|
||||
# 初始化摄像头
|
||||
if not self.camera_controller.setup_capture():
|
||||
raise Exception("摄像头初始化失败")
|
||||
|
||||
# 加载视觉模型
|
||||
if not self.vision_detector.load_models():
|
||||
raise Exception("视觉模型加载失败")
|
||||
|
||||
# 启动系统监控
|
||||
self.start_monitoring()
|
||||
|
||||
# 启动视觉控制
|
||||
self.start_visual_control()
|
||||
|
||||
# 启动对齐检查
|
||||
self.start_alignment_check()
|
||||
|
||||
print("控制系统初始化完成")
|
||||
|
||||
def start_monitoring(self):
|
||||
"""启动系统监控"""
|
||||
self.state.running = True
|
||||
self.monitor_thread = threading.Thread(
|
||||
target=self._monitor_loop,
|
||||
daemon=True
|
||||
)
|
||||
self.monitor_thread.start()
|
||||
|
||||
def _monitor_loop(self):
|
||||
"""监控循环"""
|
||||
while self.state.running:
|
||||
try:
|
||||
self.feeding_controller.check_upper_material_request()
|
||||
self.feeding_controller.check_arch_blocking()
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
print(f"监控线程错误: {e}")
|
||||
|
||||
def start_visual_control(self):
|
||||
"""启动视觉控制"""
|
||||
self.visual_control_thread = threading.Thread(
|
||||
target=self._visual_control_loop,
|
||||
daemon=True
|
||||
)
|
||||
self.visual_control_thread.start()
|
||||
|
||||
def _visual_control_loop(self):
|
||||
"""视觉控制循环"""
|
||||
while self.state.running:
|
||||
try:
|
||||
current_frame = self.camera_controller.capture_frame()
|
||||
if current_frame is not None:
|
||||
# 执行视觉控制逻辑
|
||||
self.feeding_controller.visual_control(current_frame)
|
||||
time.sleep(self.settings.visual_check_interval)
|
||||
except Exception as e:
|
||||
print(f"视觉控制循环错误: {e}")
|
||||
time.sleep(self.settings.visual_check_interval)
|
||||
|
||||
def start_alignment_check(self):
|
||||
"""启动对齐检查"""
|
||||
self.alignment_check_thread = threading.Thread(
|
||||
target=self._alignment_check_loop,
|
||||
daemon=True
|
||||
)
|
||||
self.alignment_check_thread.start()
|
||||
|
||||
def _alignment_check_loop(self):
|
||||
"""对齐检查循环"""
|
||||
while self.state.running:
|
||||
try:
|
||||
if self.state.lower_feeding_stage == 4: # 等待对齐阶段
|
||||
current_frame = self.camera_controller.capture_frame()
|
||||
if current_frame is not None:
|
||||
self.state.vehicle_aligned = self.vision_detector.detect_vehicle_alignment(current_frame)
|
||||
if self.state.vehicle_aligned:
|
||||
print("检测到模具车对齐")
|
||||
else:
|
||||
print("模具车未对齐")
|
||||
time.sleep(self.settings.alignment_check_interval)
|
||||
except Exception as e:
|
||||
print(f"对齐检查循环错误: {e}")
|
||||
time.sleep(self.settings.alignment_check_interval)
|
||||
|
||||
def start_lower_feeding(self):
|
||||
"""启动下料流程"""
|
||||
self.feeding_controller.start_feeding()
|
||||
|
||||
def stop(self):
|
||||
"""停止系统"""
|
||||
print("停止控制系统...")
|
||||
self.state.running = False
|
||||
|
||||
# 等待线程结束
|
||||
if self.monitor_thread:
|
||||
self.monitor_thread.join()
|
||||
if self.visual_control_thread:
|
||||
self.visual_control_thread.join()
|
||||
if self.alignment_check_thread:
|
||||
self.alignment_check_thread.join()
|
||||
|
||||
# 释放摄像头资源
|
||||
self.camera_controller.release()
|
||||
print("控制系统已停止")
|
||||
9
feeding/__init__.py
Normal file
9
feeding/__init__.py
Normal file
@ -0,0 +1,9 @@
|
||||
# feeding/__init__.py
|
||||
"""
|
||||
下料控制模块
|
||||
包含下料流程控制和管理
|
||||
"""
|
||||
from .process import FeedingProcess
|
||||
from .controller import FeedingController
|
||||
|
||||
__all__ = ['FeedingProcess', 'FeedingController']
|
||||
BIN
feeding/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
feeding/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
feeding/__pycache__/controller.cpython-39.pyc
Normal file
BIN
feeding/__pycache__/controller.cpython-39.pyc
Normal file
Binary file not shown.
BIN
feeding/__pycache__/process.cpython-39.pyc
Normal file
BIN
feeding/__pycache__/process.cpython-39.pyc
Normal file
Binary file not shown.
175
feeding/controller.py
Normal file
175
feeding/controller.py
Normal file
@ -0,0 +1,175 @@
|
||||
# feeding/controller.py
|
||||
import time
|
||||
from feeding.process import FeedingProcess
|
||||
|
||||
|
||||
class FeedingController:
|
||||
def __init__(self, relay_controller, inverter_controller,
|
||||
transmitter_controller, vision_detector,
|
||||
camera_controller, state, settings):
|
||||
self.relay_controller = relay_controller
|
||||
self.inverter_controller = inverter_controller
|
||||
self.transmitter_controller = transmitter_controller
|
||||
self.vision_detector = vision_detector
|
||||
self.camera_controller = camera_controller
|
||||
self.state = state
|
||||
self.settings = settings
|
||||
|
||||
# 初始化下料流程
|
||||
self.process = FeedingProcess(
|
||||
relay_controller, inverter_controller,
|
||||
transmitter_controller, vision_detector,
|
||||
camera_controller, state, settings
|
||||
)
|
||||
|
||||
def start_feeding(self):
|
||||
"""启动下料流程"""
|
||||
self.process.start_feeding()
|
||||
|
||||
def check_upper_material_request(self):
|
||||
"""检查是否需要要料"""
|
||||
current_weight = self.transmitter_controller.read_data(1)
|
||||
|
||||
if current_weight is None:
|
||||
self.state.upper_weight_error_count += 1
|
||||
print(f"上料斗重量读取失败,错误计数: {self.state.upper_weight_error_count}")
|
||||
if self.state.upper_weight_error_count >= self.settings.max_error_count:
|
||||
print("警告:上料斗传感器连续读取失败,请检查连接")
|
||||
return False
|
||||
|
||||
self.state.upper_weight_error_count = 0
|
||||
# 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量
|
||||
if current_weight < (self.settings.single_batch_weight + self.settings.min_required_weight):
|
||||
print("上料斗重量不足,通知搅拌楼要料")
|
||||
self.request_material_from_mixing_building() # 请求搅拌楼下料
|
||||
return True
|
||||
return False
|
||||
|
||||
def request_material_from_mixing_building(self):
|
||||
"""
|
||||
请求搅拌楼下料
|
||||
"""
|
||||
print("发送要料请求至搅拌楼...")
|
||||
self.process.return_upper_door_to_default()
|
||||
# 这里需要与同事对接具体的通信方式
|
||||
# 可能是Modbus写寄存器、TCP通信、HTTP请求等
|
||||
pass
|
||||
|
||||
def check_arch_blocking(self):
|
||||
"""检查是否需要破拱"""
|
||||
current_time = time.time()
|
||||
|
||||
# 检查下料斗破拱(只有在下料过程中才检查)
|
||||
if self.state.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查
|
||||
lower_weight = self.transmitter_controller.read_data(2)
|
||||
if lower_weight is not None:
|
||||
# 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒)
|
||||
if (abs(lower_weight - self.state.last_lower_weight) < 0.1) and \
|
||||
(current_time - self.state.last_weight_time) > 10:
|
||||
print("下料斗可能堵塞,启动破拱")
|
||||
self.relay_controller.control(self.relay_controller.BREAK_ARCH_LOWER, 'open')
|
||||
time.sleep(2)
|
||||
self.relay_controller.control(self.relay_controller.BREAK_ARCH_LOWER, 'close')
|
||||
|
||||
self.state.last_lower_weight = lower_weight
|
||||
|
||||
# 检查上料斗破拱(在上料斗向下料斗下料时检查)
|
||||
if (self.state.upper_door_position == 'over_lower' and
|
||||
self.state.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱
|
||||
upper_weight = self.transmitter_controller.read_data(1)
|
||||
if upper_weight is not None:
|
||||
# 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒)
|
||||
if (abs(upper_weight - self.state.last_upper_weight) < 0.1) and \
|
||||
(current_time - self.state.last_weight_time) > 10:
|
||||
print("上料斗可能堵塞,启动破拱")
|
||||
self.relay_controller.control(self.relay_controller.BREAK_ARCH_UPPER, 'open')
|
||||
time.sleep(2)
|
||||
self.relay_controller.control(self.relay_controller.BREAK_ARCH_UPPER, 'close')
|
||||
|
||||
self.state.last_upper_weight = upper_weight
|
||||
|
||||
# 更新最后读取时间
|
||||
if (self.transmitter_controller.read_data(1) is not None or
|
||||
self.transmitter_controller.read_data(2) is not None):
|
||||
self.state.last_weight_time = current_time
|
||||
|
||||
def visual_control(self, current_frame):
|
||||
"""
|
||||
视觉控制主逻辑
|
||||
"""
|
||||
# 检测是否溢料
|
||||
overflow = self.vision_detector.detect_overflow(current_frame)
|
||||
|
||||
# 获取当前角度
|
||||
current_angle = self.vision_detector.detect_angle(image=current_frame)
|
||||
|
||||
if current_angle is None:
|
||||
print("无法获取当前角度,跳过本次调整")
|
||||
return
|
||||
|
||||
print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.state.angle_control_mode}")
|
||||
|
||||
# 状态机控制逻辑
|
||||
if self.state.angle_control_mode == "normal":
|
||||
# 正常模式
|
||||
if overflow and current_angle > self.settings.angle_threshold:
|
||||
# 检测到堆料且角度过大,进入角度减小模式
|
||||
print("检测到堆料且角度过大,关闭出砼门开始减小角度")
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
|
||||
self.state.angle_control_mode = "reducing"
|
||||
else:
|
||||
# 保持正常开门
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
|
||||
elif self.state.angle_control_mode == "reducing":
|
||||
# 角度减小模式
|
||||
if current_angle <= self.settings.target_angle + self.settings.angle_tolerance:
|
||||
# 角度已达到目标范围
|
||||
if overflow:
|
||||
# 仍有堆料,进入维持模式
|
||||
print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式")
|
||||
self.state.angle_control_mode = "maintaining"
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open') # 先打开门
|
||||
else:
|
||||
# 无堆料,恢复正常模式
|
||||
print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式")
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
self.state.angle_control_mode = "normal"
|
||||
|
||||
elif self.state.angle_control_mode == "maintaining":
|
||||
# 维持模式 - 使用脉冲控制
|
||||
if not overflow:
|
||||
# 堆料已消除,恢复正常模式
|
||||
print("堆料已消除,恢复正常模式")
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
self.state.angle_control_mode = "normal"
|
||||
else:
|
||||
# 继续维持角度控制
|
||||
self.pulse_control_door_for_maintaining()
|
||||
|
||||
elif self.state.angle_control_mode == "recovery":
|
||||
# 恢复模式 - 逐步打开门
|
||||
if overflow:
|
||||
# 又出现堆料,回到角度减小模式
|
||||
print("恢复过程中又检测到堆料,回到角度减小模式")
|
||||
self.state.angle_control_mode = "maintaining"
|
||||
else:
|
||||
# 堆料已消除,恢复正常模式
|
||||
print("堆料已消除,恢复正常模式")
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
self.state.angle_control_mode = "normal"
|
||||
|
||||
self.state.last_angle = current_angle
|
||||
|
||||
def pulse_control_door_for_maintaining(self):
|
||||
"""
|
||||
用于维持模式的脉冲控制
|
||||
保持角度在目标范围内
|
||||
"""
|
||||
print("执行维持脉冲控制")
|
||||
# 关门1秒
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
|
||||
time.sleep(1.0)
|
||||
# 开门1秒
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
time.sleep(1.0)
|
||||
265
feeding/process.py
Normal file
265
feeding/process.py
Normal file
@ -0,0 +1,265 @@
|
||||
# feeding/process.py
|
||||
class FeedingProcess:
|
||||
def __init__(self, relay_controller, inverter_controller,
|
||||
transmitter_controller, vision_detector,
|
||||
camera_controller, state, settings):
|
||||
self.relay_controller = relay_controller
|
||||
self.inverter_controller = inverter_controller
|
||||
self.transmitter_controller = transmitter_controller
|
||||
self.vision_detector = vision_detector
|
||||
self.camera_controller = camera_controller
|
||||
self.state = state
|
||||
self.settings = settings
|
||||
|
||||
def start_feeding(self):
|
||||
"""开始分步下料"""
|
||||
if self.state.lower_feeding_stage != 0:
|
||||
print("下料已在进行中")
|
||||
return
|
||||
|
||||
print("开始分步下料过程")
|
||||
# 重置计数器
|
||||
self.state.lower_feeding_cycle = 0
|
||||
self.state.upper_feeding_count = 0
|
||||
|
||||
# 第一次上料
|
||||
self.transfer_material_from_upper_to_lower()
|
||||
|
||||
# 等待模具车对齐并开始第一轮下料
|
||||
self.state.lower_feeding_stage = 4
|
||||
self.wait_for_vehicle_alignment()
|
||||
|
||||
def transfer_material_from_upper_to_lower(self):
|
||||
"""上料斗向下料斗下料"""
|
||||
print(f"上料斗向下料斗下料 (第 {self.state.upper_feeding_count + 1} 次)")
|
||||
|
||||
# 记录下料前的重量
|
||||
initial_upper_weight = self.transmitter_controller.read_data(1)
|
||||
|
||||
# 如果无法读取重量,直接报错
|
||||
if initial_upper_weight is None:
|
||||
raise Exception("无法读取上料斗重量传感器数据,下料操作终止")
|
||||
|
||||
target_upper_weight = initial_upper_weight - self.settings.single_batch_weight
|
||||
target_upper_weight = max(target_upper_weight, 0) # 确保不低于0
|
||||
|
||||
print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg")
|
||||
|
||||
# 确保下料斗出砼门关闭
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
|
||||
# 打开上料斗出砼门
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'open')
|
||||
|
||||
# 等待物料流入下料斗,基于上料斗重量变化控制
|
||||
import time
|
||||
start_time = time.time()
|
||||
timeout = 30 # 30秒超时
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
current_upper_weight = self.transmitter_controller.read_data(1)
|
||||
|
||||
# 如果无法读取重量,继续尝试
|
||||
if current_upper_weight is None:
|
||||
print("无法读取上料斗重量,继续尝试...")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
print(f"上料斗当前重量: {current_upper_weight:.2f}kg")
|
||||
|
||||
# 如果达到目标重量,则关闭上料斗出砼门
|
||||
if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围
|
||||
print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg")
|
||||
break
|
||||
elif time.time() - start_time > 25: # 如果25秒后重量变化过小
|
||||
weight_change = initial_upper_weight - current_upper_weight
|
||||
if weight_change < 100: # 如果重量变化小于100kg
|
||||
print("重量变化过小,可能存在堵塞,交由监控系统处理...")
|
||||
break
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# 关闭上料斗出砼门
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
|
||||
|
||||
# 验证下料结果
|
||||
final_upper_weight = self.transmitter_controller.read_data(1)
|
||||
if final_upper_weight is not None:
|
||||
actual_transferred = initial_upper_weight - final_upper_weight
|
||||
print(f"实际下料重量: {actual_transferred:.2f}kg")
|
||||
|
||||
# 增加上料计数
|
||||
self.state.upper_feeding_count += 1
|
||||
print("上料斗下料完成")
|
||||
|
||||
def wait_for_vehicle_alignment(self):
|
||||
"""等待模具车对齐"""
|
||||
print("等待模具车对齐...")
|
||||
self.state.lower_feeding_stage = 4
|
||||
|
||||
import time
|
||||
while self.state.lower_feeding_stage == 4 and self.state.running:
|
||||
if self.state.vehicle_aligned:
|
||||
print("模具车已对齐,开始下料")
|
||||
self.state.lower_feeding_stage = 1
|
||||
self.feeding_stage_one()
|
||||
break
|
||||
time.sleep(self.settings.alignment_check_interval)
|
||||
|
||||
def feeding_stage_one(self):
|
||||
"""第一阶段下料:下料斗向模具车下料(低速)"""
|
||||
print("开始第一阶段下料:下料斗低速下料")
|
||||
self.inverter_controller.set_frequency(self.settings.frequencies[0])
|
||||
self.inverter_controller.control('start')
|
||||
|
||||
# 确保上料斗出砼门关闭
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
|
||||
# 打开下料斗出砼门
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
|
||||
import time
|
||||
start_time = time.time()
|
||||
initial_weight = self.transmitter_controller.read_data(2)
|
||||
if initial_weight is None:
|
||||
print("无法获取初始重量,取消下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
|
||||
target_weight = initial_weight + self.settings.single_batch_weight
|
||||
|
||||
while self.state.lower_feeding_stage == 1:
|
||||
current_weight = self.transmitter_controller.read_data(2)
|
||||
if current_weight is None:
|
||||
self.state.lower_weight_error_count += 1
|
||||
if self.state.lower_weight_error_count >= self.settings.max_error_count:
|
||||
print("下料斗传感器连续读取失败,停止下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
else:
|
||||
self.state.lower_weight_error_count = 0
|
||||
|
||||
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
|
||||
self.state.lower_feeding_stage = 2
|
||||
self.feeding_stage_two()
|
||||
break
|
||||
time.sleep(2)
|
||||
|
||||
def feeding_stage_two(self):
|
||||
"""第二阶段下料:下料斗向模具车下料(中速)"""
|
||||
print("开始第二阶段下料:下料斗中速下料")
|
||||
self.inverter_controller.set_frequency(self.settings.frequencies[1])
|
||||
|
||||
# 保持下料斗出砼门打开
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
# 确保上料斗出砼门关闭
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
|
||||
|
||||
import time
|
||||
start_time = time.time()
|
||||
initial_weight = self.transmitter_controller.read_data(2)
|
||||
if initial_weight is None:
|
||||
print("无法获取初始重量,取消下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
|
||||
target_weight = initial_weight + self.settings.single_batch_weight
|
||||
|
||||
while self.state.lower_feeding_stage == 2:
|
||||
current_weight = self.transmitter_controller.read_data(2)
|
||||
if current_weight is None:
|
||||
self.state.lower_weight_error_count += 1
|
||||
if self.state.lower_weight_error_count >= self.settings.max_error_count:
|
||||
print("下料斗传感器连续读取失败,停止下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
else:
|
||||
self.state.lower_weight_error_count = 0
|
||||
|
||||
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
|
||||
self.state.lower_feeding_stage = 3
|
||||
self.feeding_stage_three()
|
||||
break
|
||||
time.sleep(2)
|
||||
|
||||
def feeding_stage_three(self):
|
||||
"""第三阶段下料:下料斗向模具车下料(高速)"""
|
||||
print("开始第三阶段下料:下料斗高速下料")
|
||||
self.inverter_controller.set_frequency(self.settings.frequencies[2])
|
||||
|
||||
# 保持下料斗出砼门打开
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
|
||||
# 确保上料斗出砼门关闭
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
|
||||
|
||||
import time
|
||||
start_time = time.time()
|
||||
initial_weight = self.transmitter_controller.read_data(2)
|
||||
if initial_weight is None:
|
||||
print("无法获取初始重量,取消下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
|
||||
target_weight = initial_weight + self.settings.single_batch_weight
|
||||
|
||||
while self.state.lower_feeding_stage == 3:
|
||||
current_weight = self.transmitter_controller.read_data(2)
|
||||
if current_weight is None:
|
||||
self.state.lower_weight_error_count += 1
|
||||
if self.state.lower_weight_error_count >= self.settings.max_error_count:
|
||||
print("下料斗传感器连续读取失败,停止下料")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
else:
|
||||
self.state.lower_weight_error_count = 0
|
||||
|
||||
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
|
||||
self.state.lower_feeding_stage = 4
|
||||
self.finish_current_batch()
|
||||
break
|
||||
time.sleep(2)
|
||||
|
||||
def finish_current_batch(self):
|
||||
"""完成当前批次下料"""
|
||||
print("当前批次下料完成,关闭出砼门")
|
||||
self.inverter_controller.control('stop')
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
|
||||
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
|
||||
|
||||
# 增加三阶段下料轮次计数
|
||||
self.state.lower_feeding_cycle += 1
|
||||
|
||||
# 检查是否完成两轮三阶段下料(总共5吨)
|
||||
if self.state.lower_feeding_cycle >= 2:
|
||||
# 完成整个5吨下料任务
|
||||
print("完成两轮三阶段下料,5吨下料任务完成")
|
||||
self.finish_feeding_process()
|
||||
return
|
||||
|
||||
# 如果只完成一轮三阶段下料,进行第二次上料
|
||||
print("第一轮三阶段下料完成,准备第二次上料")
|
||||
# 上料斗第二次向下料斗下料
|
||||
try:
|
||||
self.transfer_material_from_upper_to_lower()
|
||||
except Exception as e:
|
||||
print(f"第二次上料失败: {e}")
|
||||
print("停止下料流程")
|
||||
self.finish_feeding_process() # 出现严重错误时结束整个流程
|
||||
return
|
||||
|
||||
# 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车)
|
||||
print("第二次上料完成,继续三阶段下料")
|
||||
self.state.lower_feeding_stage = 1 # 直接进入第一阶段下料
|
||||
self.feeding_stage_one() # 开始第二轮第一阶段下料
|
||||
|
||||
def finish_feeding_process(self):
|
||||
"""完成整个下料流程"""
|
||||
print("整个下料流程完成")
|
||||
self.state.lower_feeding_stage = 0
|
||||
self.state.lower_feeding_cycle = 0
|
||||
self.state.upper_feeding_count = 0
|
||||
self.return_upper_door_to_default()
|
||||
|
||||
def return_upper_door_to_default(self):
|
||||
"""上料斗回到默认位置(搅拌楼下接料位置)"""
|
||||
print("上料斗回到默认位置")
|
||||
self.relay_controller.control(self.relay_controller.DOOR_UPPER, 'close')
|
||||
self.state.upper_door_position = 'default'
|
||||
10
hardware/__init__.py
Normal file
10
hardware/__init__.py
Normal file
@ -0,0 +1,10 @@
|
||||
# hardware/__init__.py
|
||||
"""
|
||||
硬件控制模块
|
||||
包含所有硬件设备的控制接口
|
||||
"""
|
||||
from .relay import RelayController
|
||||
from .inverter import InverterController
|
||||
from .transmitter import TransmitterController
|
||||
|
||||
__all__ = ['RelayController', 'InverterController', 'TransmitterController']
|
||||
BIN
hardware/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
hardware/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
hardware/__pycache__/inverter.cpython-39.pyc
Normal file
BIN
hardware/__pycache__/inverter.cpython-39.pyc
Normal file
Binary file not shown.
BIN
hardware/__pycache__/relay.cpython-39.pyc
Normal file
BIN
hardware/__pycache__/relay.cpython-39.pyc
Normal file
Binary file not shown.
BIN
hardware/__pycache__/transmitter.cpython-39.pyc
Normal file
BIN
hardware/__pycache__/transmitter.cpython-39.pyc
Normal file
Binary file not shown.
86
hardware/inverter.py
Normal file
86
hardware/inverter.py
Normal file
@ -0,0 +1,86 @@
|
||||
# hardware/inverter.py
|
||||
from pymodbus.exceptions import ModbusException
|
||||
|
||||
|
||||
class InverterController:
|
||||
def __init__(self, relay_controller):
|
||||
self.relay_controller = relay_controller
|
||||
self.max_frequency = 400.0 # 频率最大值
|
||||
|
||||
# 变频器配置
|
||||
self.config = {
|
||||
'slave_id': 1,
|
||||
'frequency_register': 0x01, # 2001H
|
||||
'start_register': 0x00, # 2000H
|
||||
'stop_register': 0x00, # 2000H(用于停机)
|
||||
'start_command': 0x0013, # 正转点动运行
|
||||
'stop_command': 0x0001 # 停机
|
||||
}
|
||||
|
||||
def set_frequency(self, frequency):
|
||||
"""设置变频器频率"""
|
||||
try:
|
||||
if not self.relay_controller.modbus_client.connect():
|
||||
print("无法连接网络继电器Modbus服务")
|
||||
return False
|
||||
|
||||
# 使用最大频率变量计算百分比
|
||||
percentage = frequency / self.max_frequency # 得到 0~1 的比例
|
||||
value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数
|
||||
|
||||
# 限制范围
|
||||
value = max(-10000, min(10000, value))
|
||||
|
||||
result = self.relay_controller.modbus_client.write_register(
|
||||
self.config['frequency_register'],
|
||||
value,
|
||||
slave=self.config['slave_id']
|
||||
)
|
||||
|
||||
if isinstance(result, Exception):
|
||||
print(f"设置频率失败: {result}")
|
||||
return False
|
||||
|
||||
print(f"设置变频器频率为 {frequency}Hz")
|
||||
return True
|
||||
except ModbusException as e:
|
||||
print(f"变频器Modbus通信错误: {e}")
|
||||
return False
|
||||
finally:
|
||||
self.relay_controller.modbus_client.close()
|
||||
|
||||
def control(self, action):
|
||||
"""控制变频器启停"""
|
||||
try:
|
||||
if not self.relay_controller.modbus_client.connect():
|
||||
print("无法连接网络继电器Modbus服务")
|
||||
return False
|
||||
|
||||
if action == 'start':
|
||||
result = self.relay_controller.modbus_client.write_register(
|
||||
address=self.config['start_register'],
|
||||
value=self.config['start_command'],
|
||||
slave=self.config['slave_id']
|
||||
)
|
||||
print("启动变频器")
|
||||
elif action == 'stop':
|
||||
result = self.relay_controller.modbus_client.write_register(
|
||||
address=self.config['start_register'],
|
||||
value=self.config['stop_command'],
|
||||
slave=self.config['slave_id']
|
||||
)
|
||||
print("停止变频器")
|
||||
else:
|
||||
print(f"无效操作: {action}")
|
||||
return False
|
||||
|
||||
if isinstance(result, Exception):
|
||||
print(f"控制失败: {result}")
|
||||
return False
|
||||
|
||||
return True
|
||||
except ModbusException as e:
|
||||
print(f"变频器控制错误: {e}")
|
||||
return False
|
||||
finally:
|
||||
self.relay_controller.modbus_client.close()
|
||||
77
hardware/relay.py
Normal file
77
hardware/relay.py
Normal file
@ -0,0 +1,77 @@
|
||||
# hardware/relay.py
|
||||
import socket
|
||||
import binascii
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
from pymodbus.exceptions import ModbusException
|
||||
|
||||
|
||||
class RelayController:
|
||||
# 继电器映射
|
||||
DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动
|
||||
DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门
|
||||
DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门
|
||||
BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱
|
||||
BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱
|
||||
|
||||
def __init__(self, host='192.168.0.18', port=50000):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.modbus_client = ModbusTcpClient(host, port=port)
|
||||
|
||||
# 继电器命令(原始Socket)
|
||||
self.relay_commands = {
|
||||
self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
|
||||
self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
|
||||
self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
|
||||
self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
|
||||
self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}
|
||||
}
|
||||
|
||||
# 读取状态命令
|
||||
self.read_status_command = '000000000006010100000008'
|
||||
|
||||
# 设备位映射
|
||||
self.device_bit_map = {
|
||||
self.DOOR_UPPER: 0,
|
||||
self.DOOR_LOWER_1: 1,
|
||||
self.DOOR_LOWER_2: 2,
|
||||
self.BREAK_ARCH_UPPER: 3,
|
||||
self.BREAK_ARCH_LOWER: 4
|
||||
}
|
||||
|
||||
def send_command(self, command_hex):
|
||||
"""发送原始Socket命令"""
|
||||
try:
|
||||
byte_data = binascii.unhexlify(command_hex)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.connect((self.host, self.port))
|
||||
sock.send(byte_data)
|
||||
response = sock.recv(1024)
|
||||
print(f"收到继电器响应: {binascii.hexlify(response)}")
|
||||
return response
|
||||
except Exception as e:
|
||||
print(f"继电器通信错误: {e}")
|
||||
return None
|
||||
|
||||
def get_status(self):
|
||||
"""获取继电器状态"""
|
||||
response = self.send_command(self.read_status_command)
|
||||
status_dict = {}
|
||||
|
||||
if response and len(response) >= 10:
|
||||
status_byte = response[9]
|
||||
status_bin = f"{status_byte:08b}"[::-1]
|
||||
for key, bit_index in self.device_bit_map.items():
|
||||
status_dict[key] = status_bin[bit_index] == '1'
|
||||
else:
|
||||
print("读取继电器状态失败")
|
||||
|
||||
return status_dict
|
||||
|
||||
def control(self, device, action):
|
||||
"""控制继电器"""
|
||||
if device in self.relay_commands and action in self.relay_commands[device]:
|
||||
print(f"控制继电器 {device} {action}")
|
||||
self.send_command(self.relay_commands[device][action])
|
||||
else:
|
||||
print(f"无效设备或动作: {device}, {action}")
|
||||
69
hardware/transmitter.py
Normal file
69
hardware/transmitter.py
Normal file
@ -0,0 +1,69 @@
|
||||
# hardware/transmitter.py
|
||||
from pymodbus.exceptions import ModbusException
|
||||
|
||||
|
||||
class TransmitterController:
|
||||
def __init__(self, relay_controller):
|
||||
self.relay_controller = relay_controller
|
||||
|
||||
# 变送器配置
|
||||
self.config = {
|
||||
1: { # 上料斗
|
||||
'slave_id': 1,
|
||||
'weight_register': 0x01,
|
||||
'register_count': 2
|
||||
},
|
||||
2: { # 下料斗
|
||||
'slave_id': 2,
|
||||
'weight_register': 0x01,
|
||||
'register_count': 2
|
||||
}
|
||||
}
|
||||
|
||||
def read_data(self, transmitter_id):
|
||||
"""读取变送器数据"""
|
||||
try:
|
||||
if transmitter_id not in self.config:
|
||||
print(f"无效变送器ID: {transmitter_id}")
|
||||
return None
|
||||
|
||||
config = self.config[transmitter_id]
|
||||
|
||||
if not self.relay_controller.modbus_client.connect():
|
||||
print("无法连接网络继电器Modbus服务")
|
||||
return None
|
||||
|
||||
result = self.relay_controller.modbus_client.read_holding_registers(
|
||||
address=config['weight_register'],
|
||||
count=config['register_count'],
|
||||
slave=config['slave_id']
|
||||
)
|
||||
|
||||
if isinstance(result, Exception):
|
||||
print(f"读取变送器 {transmitter_id} 失败: {result}")
|
||||
return None
|
||||
|
||||
# 根据图片示例,正确解析数据
|
||||
if config['register_count'] == 2:
|
||||
# 获取原始字节数组
|
||||
raw_data = result.registers
|
||||
# 组合成32位整数
|
||||
weight = (raw_data[0] << 16) + raw_data[1]
|
||||
weight = weight / 1000.0 # 单位转换为千克
|
||||
elif config['register_count'] == 1:
|
||||
weight = float(result.registers[0])
|
||||
else:
|
||||
print(f"不支持的寄存器数量: {config['register_count']}")
|
||||
return None
|
||||
|
||||
print(f"变送器 {transmitter_id} 读取重量: {weight}kg")
|
||||
return weight
|
||||
|
||||
except ModbusException as e:
|
||||
print(f"Modbus通信错误: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"数据解析错误: {e}")
|
||||
return None
|
||||
finally:
|
||||
self.relay_controller.modbus_client.close()
|
||||
37
main.py
Normal file
37
main.py
Normal file
@ -0,0 +1,37 @@
|
||||
# main.py
|
||||
import time
|
||||
from config.settings import Settings
|
||||
from core.system import FeedingControlSystem
|
||||
|
||||
|
||||
def main():
|
||||
# 加载配置
|
||||
settings = Settings()
|
||||
|
||||
# 初始化系统
|
||||
system = FeedingControlSystem(settings)
|
||||
|
||||
try:
|
||||
# 系统初始化
|
||||
system.initialize()
|
||||
|
||||
print("系统准备就绪,5秒后开始下料...")
|
||||
time.sleep(5)
|
||||
|
||||
# 启动下料流程
|
||||
system.start_lower_feeding()
|
||||
|
||||
# 保持运行
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("收到停止信号")
|
||||
except Exception as e:
|
||||
print(f"系统错误: {e}")
|
||||
finally:
|
||||
system.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
44
readme
Normal file
44
readme
Normal file
@ -0,0 +1,44 @@
|
||||
1.visual_control_loop()执行逻辑:
|
||||
首先申明4个状态,normal,
|
||||
|
||||
1.视觉判断下料,搞定
|
||||
2.上料斗来回,搞定
|
||||
3.上料斗下料,搞定
|
||||
4.三阶段振捣,搞定
|
||||
5.检查是否需要要料
|
||||
6.分次下料,搞定
|
||||
7.模具车对齐后再下料,搞定
|
||||
8.继电器控制,搞定
|
||||
9.485转发,搞定
|
||||
10.震动频率,搞定
|
||||
11.控制变频器启停,搞定
|
||||
12。破拱
|
||||
|
||||
|
||||
|
||||
# 混凝土喂料控制系统
|
||||
|
||||
一个基于Python的工程化混凝土喂料控制系统,集成了设备控制、视觉检测和自动化下料功能。
|
||||
|
||||
## 功能特性
|
||||
|
||||
- 自动化三阶段下料控制
|
||||
- 基于重量传感器的精确控制
|
||||
- 视觉系统检测堆料和模具车对齐
|
||||
- 网络继电器设备控制
|
||||
- 变频器速度调节
|
||||
- 实时监控和破拱处理
|
||||
|
||||
## 系统架构
|
||||
|
||||
- `src/control/`: 控制逻辑模块
|
||||
- `src/devices/`: 设备驱动模块
|
||||
- `src/utils/`: 工具模块(配置、日志)
|
||||
- `src/vision/`: 视觉处理模块
|
||||
- `config/`: 配置文件
|
||||
- `tests/`: 测试代码
|
||||
|
||||
## 安装依赖
|
||||
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
tests/__pycache__/test_feeding_process.cpython-39.pyc
Normal file
BIN
tests/__pycache__/test_feeding_process.cpython-39.pyc
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
55
tests/test_feeding_process.py
Normal file
55
tests/test_feeding_process.py
Normal file
@ -0,0 +1,55 @@
|
||||
# tests/test_feeding_process.py
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
import os
|
||||
|
||||
# 添加src目录到Python路径
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
||||
|
||||
from src.control.feeding_controller import FeedingControlSystem
|
||||
|
||||
|
||||
class TestFeedingProcess(unittest.TestCase):
|
||||
|
||||
@patch('src.control.feeding_process.RelayController')
|
||||
@patch('src.control.feeding_process.InverterController')
|
||||
@patch('src.control.feeding_process.TransmitterController')
|
||||
def test_initialization(self, mock_transmitter, mock_inverter, mock_relay):
|
||||
"""测试初始化"""
|
||||
# 创建模拟对象
|
||||
mock_relay_instance = MagicMock()
|
||||
mock_relay.return_value = mock_relay_instance
|
||||
|
||||
mock_inverter_instance = MagicMock()
|
||||
mock_inverter.return_value = mock_inverter_instance
|
||||
|
||||
mock_transmitter_instance = MagicMock()
|
||||
mock_transmitter.return_value = mock_transmitter_instance
|
||||
|
||||
# 创建系统实例
|
||||
system = FeedingControlSystem()
|
||||
|
||||
# 验证初始化
|
||||
self.assertIsNotNone(system)
|
||||
self.assertFalse(system._running)
|
||||
|
||||
def test_set_feeding_parameters(self):
|
||||
"""测试设置下料参数"""
|
||||
with patch('src.control.feeding_process.RelayController'), \
|
||||
patch('src.control.feeding_process.InverterController'), \
|
||||
patch('src.control.feeding_process.TransmitterController'):
|
||||
system = FeedingControlSystem()
|
||||
system.set_feeding_parameters(
|
||||
target_vehicle_weight=3000,
|
||||
upper_buffer_weight=300,
|
||||
single_batch_weight=1500
|
||||
)
|
||||
|
||||
self.assertEqual(system.target_vehicle_weight, 3000)
|
||||
self.assertEqual(system.upper_buffer_weight, 300)
|
||||
self.assertEqual(system.single_batch_weight, 1500)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
18
vision/__init__.py
Normal file
18
vision/__init__.py
Normal file
@ -0,0 +1,18 @@
|
||||
# vision/__init__.py
|
||||
"""
|
||||
视觉处理模块
|
||||
包含摄像头控制和视觉检测功能
|
||||
"""
|
||||
from .camera import CameraController
|
||||
from .detector import VisionDetector
|
||||
from .angle_detector import get_current_door_angle
|
||||
from .overflow_detector import detect_overflow_from_image
|
||||
from .alignment_detector import detect_vehicle_alignment
|
||||
|
||||
__all__ = [
|
||||
'CameraController',
|
||||
'VisionDetector',
|
||||
'get_current_door_angle',
|
||||
'detect_overflow_from_image',
|
||||
'detect_vehicle_alignment'
|
||||
]
|
||||
BIN
vision/__pycache__/__init__.cpython-39.pyc
Normal file
BIN
vision/__pycache__/__init__.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/alignment_detector.cpython-39.pyc
Normal file
BIN
vision/__pycache__/alignment_detector.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/anger_caculate.cpython-39.pyc
Normal file
BIN
vision/__pycache__/anger_caculate.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/angle_detector.cpython-39.pyc
Normal file
BIN
vision/__pycache__/angle_detector.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/camera.cpython-39.pyc
Normal file
BIN
vision/__pycache__/camera.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/detector.cpython-39.pyc
Normal file
BIN
vision/__pycache__/detector.cpython-39.pyc
Normal file
Binary file not shown.
BIN
vision/__pycache__/overflow_detector.cpython-39.pyc
Normal file
BIN
vision/__pycache__/overflow_detector.cpython-39.pyc
Normal file
Binary file not shown.
Binary file not shown.
30
vision/alignment_detector.py
Normal file
30
vision/alignment_detector.py
Normal file
@ -0,0 +1,30 @@
|
||||
# vision/alignment_detector.py
|
||||
def detect_vehicle_alignment(image_array, alignment_model):
|
||||
"""
|
||||
通过图像检测模具车是否对齐
|
||||
"""
|
||||
try:
|
||||
# 检查模型是否已加载
|
||||
if alignment_model is None:
|
||||
print("对齐检测模型未加载")
|
||||
return False
|
||||
|
||||
if image_array is None:
|
||||
print("输入图像为空")
|
||||
return False
|
||||
|
||||
# 直接使用模型进行推理
|
||||
results = alignment_model(image_array)
|
||||
pared_probs = results[0].probs.data.cpu().numpy().flatten()
|
||||
|
||||
# 类别0: 未对齐, 类别1: 对齐
|
||||
class_id = int(pared_probs.argmax())
|
||||
confidence = float(pared_probs[class_id])
|
||||
|
||||
# 只有当对齐且置信度>95%时才认为对齐
|
||||
if class_id == 1 and confidence > 0.95:
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"对齐检测失败: {e}")
|
||||
return False
|
||||
88
vision/anger_caculate.py
Normal file
88
vision/anger_caculate.py
Normal file
@ -0,0 +1,88 @@
|
||||
import cv2
|
||||
import os
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
|
||||
def predict_obb_best_angle(model=None, model_path=None, image=None, image_path=None, save_path=None):
|
||||
"""
|
||||
输入:
|
||||
model: 预加载的YOLO模型实例(可选)
|
||||
model_path: YOLO 权重路径(当model为None时使用)
|
||||
image: 图像数组(numpy array)
|
||||
image_path: 图片路径(当image为None时使用)
|
||||
save_path: 可选,保存带标注图像
|
||||
输出:
|
||||
angle_deg: 置信度最高两个框的主方向夹角(度),如果检测少于两个目标返回 None
|
||||
annotated_img: 可视化图像
|
||||
"""
|
||||
# 1. 使用预加载的模型或加载新模型
|
||||
if model is not None:
|
||||
loaded_model = model
|
||||
elif model_path is not None:
|
||||
loaded_model = YOLO(model_path)
|
||||
else:
|
||||
raise ValueError("必须提供model或model_path参数")
|
||||
|
||||
# 2. 读取图像(优先使用传入的图像数组)
|
||||
if image is not None:
|
||||
img = image
|
||||
elif image_path is not None:
|
||||
img = cv2.imread(image_path)
|
||||
if img is None:
|
||||
print(f"无法读取图像: {image_path}")
|
||||
return None, None
|
||||
else:
|
||||
raise ValueError("必须提供image或image_path参数")
|
||||
|
||||
# 3. 推理 OBB
|
||||
results = loaded_model(img, save=False, imgsz=640, conf=0.5, mode='obb')
|
||||
result = results[0]
|
||||
|
||||
# 4. 可视化
|
||||
annotated_img = result.plot()
|
||||
if save_path:
|
||||
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
||||
cv2.imwrite(save_path, annotated_img)
|
||||
print(f"推理结果已保存至: {save_path}")
|
||||
|
||||
# 5. 提取旋转角度和置信度
|
||||
boxes = result.obb
|
||||
if boxes is None or len(boxes) < 2:
|
||||
print("检测到少于两个目标,无法计算夹角。")
|
||||
return None, annotated_img
|
||||
|
||||
box_info = []
|
||||
for box in boxes:
|
||||
conf = box.conf.cpu().numpy()[0]
|
||||
cx, cy, w, h, r_rad = box.xywhr.cpu().numpy()[0]
|
||||
direction = r_rad if w >= h else r_rad + np.pi/2
|
||||
direction = direction % np.pi
|
||||
box_info.append((conf, direction))
|
||||
|
||||
# 6. 取置信度最高两个框
|
||||
box_info = sorted(box_info, key=lambda x: x[0], reverse=True)
|
||||
dir1, dir2 = box_info[0][1], box_info[1][1]
|
||||
|
||||
# 7. 计算夹角(最小夹角,0~90°)
|
||||
diff = abs(dir1 - dir2)
|
||||
diff = min(diff, np.pi - diff)
|
||||
angle_deg = np.degrees(diff)
|
||||
|
||||
print(f"置信度最高两个框主方向夹角: {angle_deg:.2f}°")
|
||||
return angle_deg, annotated_img
|
||||
|
||||
|
||||
# ------------------- 测试 -------------------
|
||||
# if __name__ == "__main__":
|
||||
# weight_path = r'angle.pt'
|
||||
# image_path = r"./test_image/3.jpg"
|
||||
# save_path = "./inference_results/detected_3.jpg"
|
||||
#
|
||||
# #angle_deg, annotated_img = predict_obb_best_angle(weight_path, image_path, save_path)
|
||||
# angle_deg,_ = predict_obb_best_angle(model_path=weight_path, image_path=image_path, save_path=save_path)
|
||||
# annotated_img = None
|
||||
# print(angle_deg)
|
||||
# if annotated_img is not None:
|
||||
# cv2.imshow("YOLO OBB Prediction", annotated_img)
|
||||
# cv2.waitKey(0)
|
||||
# cv2.destroyAllWindows()
|
||||
27
vision/angle_detector.py
Normal file
27
vision/angle_detector.py
Normal file
@ -0,0 +1,27 @@
|
||||
# vision/angle_detector.py
|
||||
import sys
|
||||
import os
|
||||
from vision.anger_caculate import predict_obb_best_angle
|
||||
|
||||
# 添加项目根目录到Python路径
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
def get_current_door_angle(model=None, image=None, image_path=None):
|
||||
"""
|
||||
通过视觉系统获取当前出砼门角度
|
||||
:param model: 模型实例
|
||||
:param image: 图像数组(numpy array)
|
||||
:param image_path: 图片路径
|
||||
:return: 角度值(度)
|
||||
"""
|
||||
try:
|
||||
# 调用实际的角度检测函数
|
||||
angle_deg, _ = predict_obb_best_angle(
|
||||
model=model,
|
||||
image=image,
|
||||
image_path=image_path
|
||||
)
|
||||
return angle_deg
|
||||
except Exception as e:
|
||||
print(f"角度检测失败: {e}")
|
||||
return None
|
||||
67
vision/camera.py
Normal file
67
vision/camera.py
Normal file
@ -0,0 +1,67 @@
|
||||
# vision/camera.py
|
||||
import cv2
|
||||
|
||||
|
||||
class CameraController:
|
||||
def __init__(self):
|
||||
self.camera = None
|
||||
self.camera_type = "ip"
|
||||
self.camera_ip = "192.168.1.51"
|
||||
self.camera_port = 554
|
||||
self.camera_username = "admin"
|
||||
self.camera_password = "XJ123456"
|
||||
self.camera_channel = 1
|
||||
|
||||
def set_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1):
|
||||
"""
|
||||
设置摄像头配置
|
||||
"""
|
||||
self.camera_type = camera_type
|
||||
if ip:
|
||||
self.camera_ip = ip
|
||||
if port:
|
||||
self.camera_port = port
|
||||
if username:
|
||||
self.camera_username = username
|
||||
if password:
|
||||
self.camera_password = password
|
||||
self.camera_channel = channel
|
||||
|
||||
def setup_capture(self, camera_index=0):
|
||||
"""
|
||||
设置摄像头捕获
|
||||
"""
|
||||
try:
|
||||
rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01"
|
||||
self.camera = cv2.VideoCapture(rtsp_url)
|
||||
|
||||
if not self.camera.isOpened():
|
||||
print(f"无法打开网络摄像头: {rtsp_url}")
|
||||
return False
|
||||
print(f"网络摄像头初始化成功,地址: {rtsp_url}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"摄像头设置失败: {e}")
|
||||
return False
|
||||
|
||||
def capture_frame(self):
|
||||
"""捕获当前帧并返回numpy数组"""
|
||||
try:
|
||||
if self.camera is None:
|
||||
print("摄像头未初始化")
|
||||
return None
|
||||
|
||||
ret, frame = self.camera.read()
|
||||
if ret:
|
||||
return frame
|
||||
else:
|
||||
print("无法捕获图像帧")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"图像捕获失败: {e}")
|
||||
return None
|
||||
|
||||
def release(self):
|
||||
"""释放摄像头资源"""
|
||||
if self.camera is not None:
|
||||
self.camera.release()
|
||||
86
vision/detector.py
Normal file
86
vision/detector.py
Normal file
@ -0,0 +1,86 @@
|
||||
# vision/detector.py
|
||||
import os
|
||||
from ultralytics import YOLO
|
||||
from vision.angle_detector import get_current_door_angle
|
||||
from vision.overflow_detector import detect_overflow_from_image
|
||||
from vision.alignment_detector import detect_vehicle_alignment
|
||||
|
||||
|
||||
class VisionDetector:
|
||||
def __init__(self, settings):
|
||||
self.settings = settings
|
||||
|
||||
# 模型实例
|
||||
self.angle_model = None
|
||||
self.overflow_model = None
|
||||
self.alignment_model = None
|
||||
|
||||
def load_models(self):
|
||||
"""
|
||||
加载所有视觉检测模型
|
||||
"""
|
||||
success = True
|
||||
|
||||
# 加载夹角检测模型
|
||||
try:
|
||||
if not os.path.exists(self.settings.angle_model_path):
|
||||
print(f"夹角检测模型不存在: {self.settings.angle_model_path}")
|
||||
success = False
|
||||
else:
|
||||
# 注意:angle.pt模型通过predict_obb_best_angle函数使用,不需要预加载
|
||||
print(f"夹角检测模型路径: {self.settings.angle_model_path}")
|
||||
except Exception as e:
|
||||
print(f"检查夹角检测模型失败: {e}")
|
||||
success = False
|
||||
|
||||
# 加载堆料检测模型
|
||||
try:
|
||||
if not os.path.exists(self.settings.overflow_model_path):
|
||||
print(f"堆料检测模型不存在: {self.settings.overflow_model_path}")
|
||||
success = False
|
||||
else:
|
||||
self.overflow_model = YOLO(self.settings.overflow_model_path)
|
||||
print(f"成功加载堆料检测模型: {self.settings.overflow_model_path}")
|
||||
except Exception as e:
|
||||
print(f"加载堆料检测模型失败: {e}")
|
||||
success = False
|
||||
|
||||
# 加载对齐检测模型
|
||||
try:
|
||||
if not os.path.exists(self.settings.alignment_model_path):
|
||||
print(f"对齐检测模型不存在: {self.settings.alignment_model_path}")
|
||||
success = False
|
||||
else:
|
||||
self.alignment_model = YOLO(self.settings.alignment_model_path)
|
||||
print(f"成功加载对齐检测模型: {self.settings.alignment_model_path}")
|
||||
except Exception as e:
|
||||
print(f"加载对齐检测模型失败: {e}")
|
||||
success = False
|
||||
|
||||
return success
|
||||
|
||||
def detect_angle(self, image=None, image_path=None):
|
||||
"""
|
||||
通过视觉系统获取当前出砼门角度
|
||||
"""
|
||||
return get_current_door_angle(
|
||||
model=self.angle_model,
|
||||
image=image,
|
||||
image_path=image_path
|
||||
)
|
||||
|
||||
def detect_overflow(self, image_array):
|
||||
"""
|
||||
通过图像检测是否溢料
|
||||
"""
|
||||
return detect_overflow_from_image(
|
||||
image_array,
|
||||
self.overflow_model,
|
||||
self.settings.roi_file_path
|
||||
)
|
||||
|
||||
def detect_vehicle_alignment(self, image_array):
|
||||
"""
|
||||
通过图像检测模具车是否对齐
|
||||
"""
|
||||
return detect_vehicle_alignment(image_array, self.alignment_model)
|
||||
BIN
vision/models/alig.pt
Normal file
BIN
vision/models/alig.pt
Normal file
Binary file not shown.
BIN
vision/models/angle.pt
Normal file
BIN
vision/models/angle.pt
Normal file
Binary file not shown.
BIN
vision/models/overflow.pt
Normal file
BIN
vision/models/overflow.pt
Normal file
Binary file not shown.
47
vision/overflow_detector.py
Normal file
47
vision/overflow_detector.py
Normal file
@ -0,0 +1,47 @@
|
||||
# vision/overflow_detector.py
|
||||
import sys
|
||||
import os
|
||||
from vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize
|
||||
|
||||
# 添加项目根目录到Python路径
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
def detect_overflow_from_image(image_array, overflow_model, roi_file_path):
|
||||
"""
|
||||
通过图像检测是否溢料
|
||||
:param image_array: 图像数组
|
||||
:param overflow_model: 溢料检测模型
|
||||
:param roi_file_path: ROI文件路径
|
||||
:return: 是否检测到溢料 (True/False)
|
||||
"""
|
||||
try:
|
||||
# 检查模型是否已加载
|
||||
if overflow_model is None:
|
||||
print("堆料检测模型未加载")
|
||||
return False
|
||||
|
||||
# 加载ROI区域
|
||||
rois = load_global_rois(roi_file_path)
|
||||
|
||||
if not rois:
|
||||
print(f"没有有效的ROI配置: {roi_file_path}")
|
||||
return False
|
||||
|
||||
if image_array is None:
|
||||
print("输入图像为空")
|
||||
return False
|
||||
|
||||
# 裁剪和调整图像大小
|
||||
crops = crop_and_resize(image_array, rois, 640)
|
||||
|
||||
# 对每个ROI区域进行分类检测
|
||||
for roi_resized, _ in crops:
|
||||
final_class, _, _, _ = classify_image_weighted(roi_resized, overflow_model, threshold=0.4)
|
||||
if "大堆料" in final_class or "小堆料" in final_class:
|
||||
print(f"检测到溢料: {final_class}")
|
||||
return True
|
||||
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"溢料检测失败: {e}")
|
||||
return False
|
||||
106
vision/resize_main.py
Normal file
106
vision/resize_main.py
Normal file
@ -0,0 +1,106 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from ultralytics import YOLO
|
||||
import cv2
|
||||
|
||||
# ---------------------------
|
||||
# ROI 裁剪函数
|
||||
# ---------------------------
|
||||
def load_global_rois(txt_path):
|
||||
"""加载全局 ROI 坐标"""
|
||||
rois = []
|
||||
if not os.path.exists(txt_path):
|
||||
print(f"❌ ROI 文件不存在: {txt_path}")
|
||||
return rois
|
||||
with open(txt_path, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
x, y, w, h = map(int, line.split(','))
|
||||
rois.append((x, y, w, h))
|
||||
print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
|
||||
except Exception as e:
|
||||
print(f"⚠️ 无法解析 ROI 行: {line}, 错误: {e}")
|
||||
return rois
|
||||
|
||||
def crop_and_resize(img, rois, target_size=640):
|
||||
"""根据 ROI 裁剪并 resize"""
|
||||
crops = []
|
||||
for i, (x, y, w, h) in enumerate(rois):
|
||||
h_img, w_img = img.shape[:2]
|
||||
if x < 0 or y < 0 or x + w > w_img or y + h > h_img:
|
||||
print(f"⚠️ ROI 越界,跳过: {x},{y},{w},{h}")
|
||||
continue
|
||||
roi_img = img[y:y+h, x:x+w]
|
||||
roi_resized = cv2.resize(roi_img, (target_size, target_size), interpolation=cv2.INTER_AREA)
|
||||
crops.append((roi_resized, i))
|
||||
return crops
|
||||
|
||||
# ---------------------------
|
||||
# 分类函数
|
||||
# ---------------------------
|
||||
def classify_and_save_images(model_path, input_folder, output_root, roi_file, target_size=640):
|
||||
# 加载模型
|
||||
model = YOLO(model_path)
|
||||
|
||||
# 确保输出根目录存在
|
||||
output_root = Path(output_root)
|
||||
output_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 创建类别子文件夹 (class0 到 class4)
|
||||
class_dirs = []
|
||||
for i in range(5): # 假设有5个类别 (0-4)
|
||||
class_dir = output_root / f"class{i}"
|
||||
class_dir.mkdir(exist_ok=True)
|
||||
class_dirs.append(class_dir)
|
||||
|
||||
# 加载 ROI
|
||||
rois = load_global_rois(roi_file)
|
||||
if len(rois) == 0:
|
||||
print("❌ 没有有效 ROI,退出")
|
||||
return
|
||||
|
||||
# 遍历输入文件夹
|
||||
for img_path in Path(input_folder).glob("*.*"):
|
||||
if img_path.suffix.lower() not in ['.jpg', '.jpeg', '.png', '.bmp', '.tif']:
|
||||
continue
|
||||
|
||||
try:
|
||||
# 读取原图
|
||||
img = cv2.imread(str(img_path))
|
||||
if img is None:
|
||||
print(f"❌ 无法读取图像: {img_path}")
|
||||
continue
|
||||
|
||||
# 根据 ROI 裁剪
|
||||
crops = crop_and_resize(img, rois, target_size)
|
||||
|
||||
for roi_img, roi_idx in crops:
|
||||
# YOLO 推理
|
||||
results = model(roi_img)
|
||||
|
||||
pred = results[0].probs.data # 获取概率分布
|
||||
class_id = int(pred.argmax())
|
||||
|
||||
# 保存到对应类别文件夹
|
||||
suffix = f"_roi{roi_idx}" if len(crops) > 1 else ""
|
||||
dst_path = class_dirs[class_id] / f"{img_path.stem}{suffix}{img_path.suffix}"
|
||||
cv2.imwrite(dst_path, roi_img) # 保存裁剪后的 ROI 图像
|
||||
print(f"Processed {img_path.name}{suffix} -> Class {class_id}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing {img_path.name}: {str(e)}")
|
||||
|
||||
# ---------------------------
|
||||
# 主程序
|
||||
# ---------------------------
|
||||
if __name__ == "__main__":
|
||||
model_path = r"models/overflow.pt"
|
||||
input_folder = "/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/f6"
|
||||
output_root = "/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/class111"
|
||||
roi_file = "./roi_coordinates/1_rois.txt" # 训练时使用的 ROI 文件
|
||||
target_size = 640
|
||||
|
||||
classify_and_save_images(model_path, input_folder, output_root, roi_file, target_size)
|
||||
184
vision/resize_tuili_image_main.py
Normal file
184
vision/resize_tuili_image_main.py
Normal file
@ -0,0 +1,184 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import cv2
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
|
||||
# ---------------------------
|
||||
# 类别映射
|
||||
# ---------------------------
|
||||
CLASS_NAMES = {
|
||||
0: "未堆料",
|
||||
1: "小堆料",
|
||||
2: "大堆料",
|
||||
3: "未浇筑满",
|
||||
4: "浇筑满"
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# 加载 ROI 列表
|
||||
# ---------------------------
|
||||
def load_global_rois(txt_path):
|
||||
rois = []
|
||||
if not os.path.exists(txt_path):
|
||||
print(f"ROI 文件不存在: {txt_path}")
|
||||
return rois
|
||||
with open(txt_path, 'r') as f:
|
||||
for line in f:
|
||||
s = line.strip()
|
||||
if s:
|
||||
try:
|
||||
x, y, w, h = map(int, s.split(','))
|
||||
rois.append((x, y, w, h))
|
||||
except Exception as e:
|
||||
print(f"无法解析 ROI 行 '{s}': {e}")
|
||||
return rois
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# 裁剪并 resize ROI
|
||||
# ---------------------------
|
||||
def crop_and_resize(img, rois, target_size=640):
|
||||
crops = []
|
||||
h_img, w_img = img.shape[:2]
|
||||
for i, (x, y, w, h) in enumerate(rois):
|
||||
if x < 0 or y < 0 or x + w > w_img or y + h > h_img:
|
||||
continue
|
||||
roi = img[y:y + h, x:x + w]
|
||||
roi_resized = cv2.resize(roi, (target_size, target_size), interpolation=cv2.INTER_AREA)
|
||||
crops.append((roi_resized, i))
|
||||
return crops
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# class1/class2 加权判断
|
||||
# ---------------------------
|
||||
def weighted_small_large(pred_probs, threshold=0.4, w1=0.3, w2=0.7):
|
||||
p1 = float(pred_probs[1])
|
||||
p2 = float(pred_probs[2])
|
||||
total = p1 + p2
|
||||
if total > 0:
|
||||
score = (w1 * p1 + w2 * p2) / total
|
||||
else:
|
||||
score = 0.0
|
||||
final_class = "大堆料" if score >= threshold else "小堆料"
|
||||
return final_class, score, p1, p2
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# 单张图片推理函数
|
||||
# ---------------------------
|
||||
def classify_image_weighted(image, model, threshold=0.4):
|
||||
results = model(image)
|
||||
pred_probs = results[0].probs.data.cpu().numpy().flatten()
|
||||
class_id = int(pred_probs.argmax())
|
||||
confidence = float(pred_probs[class_id])
|
||||
class_name = CLASS_NAMES.get(class_id, f"未知类别({class_id})")
|
||||
|
||||
# class1/class2 使用加权得分
|
||||
if class_id in [1, 2]:
|
||||
final_class, score, p1, p2 = weighted_small_large(pred_probs, threshold=threshold)
|
||||
else:
|
||||
final_class = class_name
|
||||
score = confidence
|
||||
p1 = float(pred_probs[1])
|
||||
p2 = float(pred_probs[2])
|
||||
|
||||
return final_class, score, p1, p2
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# 实时视频流推理函数
|
||||
# ---------------------------
|
||||
def real_time_inference(rtsp_url, model_path, roi_file, target_size=640, threshold=0.4):
|
||||
"""
|
||||
从RTSP流实时推理
|
||||
:param rtsp_url: RTSP流URL
|
||||
:param model_path: 模型路径
|
||||
:param roi_file: ROI文件路径
|
||||
:param target_size: 目标尺寸
|
||||
:param threshold: 分类阈值
|
||||
"""
|
||||
# 加载模型
|
||||
model = YOLO(model_path)
|
||||
|
||||
# 加载ROI
|
||||
rois = load_global_rois(roi_file)
|
||||
if not rois:
|
||||
print("❌ 没有有效 ROI,退出")
|
||||
return
|
||||
|
||||
# 打开RTSP流
|
||||
cap = cv2.VideoCapture(rtsp_url)
|
||||
|
||||
if not cap.isOpened():
|
||||
print(f"❌ 无法打开视频流: {rtsp_url}")
|
||||
return
|
||||
|
||||
print(f"✅ 成功连接到视频流: {rtsp_url}")
|
||||
print("按 'q' 键退出,按 's' 键保存当前帧")
|
||||
|
||||
frame_count = 0
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
print("❌ 无法读取帧,可能连接已断开")
|
||||
break
|
||||
|
||||
frame_count += 1
|
||||
print(f"\n处理第 {frame_count} 帧")
|
||||
|
||||
try:
|
||||
# 裁剪并调整ROI
|
||||
crops = crop_and_resize(frame, rois, target_size)
|
||||
|
||||
for roi_resized, roi_idx in crops:
|
||||
final_class, score, p1, p2 = classify_image_weighted(roi_resized, model, threshold=threshold)
|
||||
|
||||
print(f"ROI {roi_idx} -> 类别: {final_class}, 加权分数: {score:.2f}, "
|
||||
f"class1 置信度: {p1:.2f}, class2 置信度: {p2:.2f}")
|
||||
|
||||
# 判断是否溢料
|
||||
if "大堆料" in final_class or "浇筑满" in final_class:
|
||||
print(f"🚨 检测到溢料: ROI {roi_idx} - {final_class}")
|
||||
|
||||
# 可视化(可选)
|
||||
cv2.imshow(f'ROI {roi_idx}', roi_resized)
|
||||
|
||||
# 显示原始帧
|
||||
cv2.imshow('Original Frame', frame)
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理帧时出错: {e}")
|
||||
continue
|
||||
|
||||
# 键盘控制
|
||||
key = cv2.waitKey(1) & 0xFF
|
||||
if key == ord('q'): # 按q退出
|
||||
break
|
||||
elif key == ord('s'): # 按s保存当前帧
|
||||
cv2.imwrite(f"frame_{frame_count}.jpg", frame)
|
||||
print(f"保存帧到 frame_{frame_count}.jpg")
|
||||
|
||||
# 清理资源
|
||||
cap.release()
|
||||
cv2.destroyAllWindows()
|
||||
print("✅ 视频流处理结束")
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# 主函数 - 实时推理示例
|
||||
# ---------------------------
|
||||
# if __name__ == "__main__":
|
||||
# # RTSP流URL
|
||||
# rtsp_url = "rtsp://admin:XJ123456@192.168.1.51:554/streaming/channels/101"
|
||||
#
|
||||
# # 配置参数
|
||||
# model_path = r"models/overflow.pt"
|
||||
# roi_file = r"./roi_coordinates/1_rois.txt"
|
||||
# target_size = 640
|
||||
# threshold = 0.4
|
||||
#
|
||||
# print("开始实时视频流推理...")
|
||||
# real_time_inference(rtsp_url, model_path, roi_file, target_size, threshold)
|
||||
1
vision/roi_coordinates/1_rois.txt
Normal file
1
vision/roi_coordinates/1_rois.txt
Normal file
@ -0,0 +1 @@
|
||||
859,810,696,328
|
||||
Reference in New Issue
Block a user