Files
Feeding_control_system/vision/visual_callback.py

901 lines
41 KiB
Python
Raw Permalink Normal View History

2025-12-12 18:00:14 +08:00
from cv2.gapi import ov
from config.settings import app_set_config
from hardware.relay import RelayController
from hardware.transmitter import TransmitterController
import time
import threading
from datetime import datetime
2025-12-28 17:20:02 +08:00
import logging
2025-12-12 18:00:14 +08:00
from hardware.upper_plc import OmronFinsPollingService
2025-12-28 17:20:02 +08:00
from vision.muju_cls.muju_utils import run_stable_classification_loop
2025-12-12 18:00:14 +08:00
class VisualCallback:
# 类变量,用于存储实例引用,实现单例检测
_instance = None
_lock = threading.Lock()
def __new__(cls):
"""检测实例是否存在,实现单例模式"""
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化视觉回调处理器"""
# 避免重复初始化
if hasattr(self, '_initialized') and self._initialized:
return
self.relay_controller = RelayController()
self.transmitter_controller = TransmitterController(self.relay_controller)
# 线程安全的参数传递
self._new_data_available = threading.Event()
self._is_processing = threading.Lock()
self._stop_event = threading.Event()
# 添加下料斗门控制锁,防止两个线程同时控制
self._door_control_lock = threading.Lock()
# 记录当前控制门的线程名称,用于调试
self._current_controlling_thread = None
2025-12-28 17:20:02 +08:00
#是否启动后的第一个模具
self._is_first_module=True
self.init_val()
# self._setup_logging_2()
#F块完成重量的70%控制夹脚F块多于这个比例就没有记录了注意
self._max_f_angle_ratio=0.7
#完成多少,调整角度比例 多于0.8就没记录了(注意)
self._max_angle_radio=0.8
#完成多少,忽略未浇筑满
self._max_ignore_radio=0.5
2025-12-12 18:00:14 +08:00
2025-12-28 17:20:02 +08:00
# self.plc_data=5
self.plc_service = OmronFinsPollingService("192.168.250.233")
self.plc_service.register_data_callback(self.on_plc_update)
# self.plc_service.register_status_callback(self.on_status_change)
self.plc_service.start_polling(interval=2.0)
# 创建并启动单个持续运行的线程
2025-12-12 18:00:14 +08:00
self.callback_thread = threading.Thread(
target=self._run_thread_loop,
daemon=True
)
self.callback_thread.start()
self.feed_thread = threading.Thread(
target=self._run_feed,
daemon=True
)
self.feed_thread.start()
2025-12-28 17:20:02 +08:00
"""启动系统监控"""
self.monitor_thread = threading.Thread(
target=self._monitor_loop,
daemon=True,
name='monitor'
)
self.monitor_thread.start()
def init_val(self):
#初始化值
"""初始化视觉回调处理器"""
self.angle_mode = "normal"
self.overflow = False
self.is_start_visual=False
2025-12-12 18:00:14 +08:00
2025-12-28 17:20:02 +08:00
# 线程安全的参数传递
self._current_angle = None
self._overflow_detected = None
# 新增标志位指示safe_control_lower_close是否正在执行
self._is_safe_closing = False
self._is_feed_start=True
#未浇筑满时间,用于确定是否进入未浇筑满
2025-12-12 18:00:14 +08:00
self._before_finish_time=None
2025-12-28 17:20:02 +08:00
#进入未浇筑满状态标志位
2025-12-12 18:00:14 +08:00
self._is_before_finish=False
2025-12-28 17:20:02 +08:00
#是否浇筑满标志位
2025-12-12 18:00:14 +08:00
self._is_finish=False
2025-12-28 17:20:02 +08:00
#浇筑完成比例(重量)
self._is_finish_ratio=0
#下料阶段,汇总时用枚举
self._is_feed_stage=0
#振动相关参数
self._last_arch_one_weight=0
self._last_arch_two_weight=0
self._last_arch_three_weight=0
self._last_arch_four_weight=0
self._last_arch_five_weight=0
self._last_arch_time=0
#是否为F块
self._is_small_f=None
#采集数据用,下料重量=之前下的重量+最后一阶段(下-->模具车)重量差
#记录最后一次下料斗到模具车前的重量
self._finish_weight=0
#记录最后一次下料斗到车初始重量
self._inital_finish_lweight=0
2025-12-12 18:00:14 +08:00
# 初始化控制间隔和堆料状态跟踪属性
self._last_overflow_state = False
self._last_control_time = 0
self._initialized = True
self.plc_data=None
2025-12-28 17:20:02 +08:00
2025-12-12 18:00:14 +08:00
def angle_visual_callback(self, current_angle, overflow_detected):
"""
视觉控制主逻辑供外部推送数据
使用单个持续运行的线程通过参数设置传递数据
如果线程正在处理数据则丢弃此次推送
"""
#print(f"{datetime.now().strftime('%H:%M:%S.%f')[:-3]} 收到推送数据")
# 尝试获取处理锁,若失败则说明正在处理,丢弃数据
if not self._is_processing.acquire(blocking=False):
print("回调线程仍在执行,丢弃此次推送数据")
return
try:
# 更新参数
if overflow_detected is not None:
#print(f"{datetime.now().strftime('%H:%M:%S.%f')[:-3]} 收到溢料:{overflow_detected}")
self._overflow_detected = overflow_detected
if current_angle is not None:
#print(f"{datetime.now().strftime('%H:%M:%S.%f')[:-3]} 收到角度:{current_angle}")
self._current_angle = current_angle
# 通知线程有新数据可用
self._new_data_available.set()
finally:
# 释放处理锁
self._is_processing.release()
2025-12-28 17:20:02 +08:00
def _monitor_loop(self):
"""监控循环"""
while not self._is_finish:
try:
current_time = time.time()
# 检查下料斗破拱(只有在下料过程中才检查)
if self._is_feed_stage==1: # 下料斗--》模具车
_arch_weight = self.transmitter_controller.read_data(2)
if _arch_weight is not None and _arch_weight>0:
# 检查重量变化是否过慢
if (abs(_arch_weight - self._last_arch_one_weight) < 200) and \
(current_time - self._last_arch_time) >= 2:
print('---------------------第一阶段振动5秒-----------------')
self.relay_controller.control_arch_lower_open_sync(5)
self._last_arch_one_weight = _arch_weight
elif self._is_feed_stage==2: #上料斗到下料斗,料多,谨慎振动
_arch_weight = self.transmitter_controller.read_data(1)
if _arch_weight is not None:
# 检查重量变化是否过慢
# if (abs(_arch_weight - self._last_arch_two_weight) < 50) and \
# (current_time - self._last_arch_time) >= 2:
# self.relay_controller.control_arch_upper_open_sync(3)
self._last_arch_two_weight = _arch_weight
elif self._is_feed_stage==3: #第二次下料斗-》模具车
_arch_weight = self.transmitter_controller.read_data(2)
if _arch_weight is not None:
# 检查重量变化是否过慢
if (abs(_arch_weight - self._last_arch_three_weight) < 100) and \
(current_time - self._last_arch_time) >= 2:
print('---------------------第三阶段振动5秒-----------------')
self.relay_controller.control_arch_lower_open_sync(3)
self._last_arch_three_weight = _arch_weight
elif self._is_feed_stage==4: #上料斗--》下料斗
_arch_weight = self.transmitter_controller.read_data(1)
if _arch_weight is not None:
# 检查重量变化是否过慢
# if (abs(_arch_weight - self._last_arch_four_weight) < 200) and \
# (current_time - self._last_arch_time) > 2:
# self.relay_controller.control_arch_upper_open_sync(5)
self._last_arch_four_weight = _arch_weight
elif self._is_feed_stage==5: #下料斗->模具车
_arch_weight = self.transmitter_controller.read_data(2)
if _arch_weight is not None:
_min_arch_weight=20
if self._is_finish_ratio<self._max_angle_radio:
_min_arch_weight=50
if (abs(_arch_weight - self._last_arch_five_weight) < _min_arch_weight) and \
(current_time - self._last_arch_time) >= 2:
print('---------------------第五阶段振动5秒-----------------')
self.relay_controller.control_arch_lower_open_sync(5)
self._last_arch_five_weight = _arch_weight
# 更新最后读取时间
self._last_arch_time = current_time
time.sleep(2)
except Exception as e:
print(f"监控线程错误: {e}")
2025-12-12 18:00:14 +08:00
def _run_thread_loop(self):
"""
2025-12-28 17:20:02 +08:00
接受视觉回调数据
2025-12-12 18:00:14 +08:00
线程主循环持续运行
等待新数据然后调用处理方法
"""
while not self._stop_event.is_set():
# 等待新数据可用
self._new_data_available.wait()
# 重置事件
self._new_data_available.clear()
# 获取当前参数(使用临时变量避免被其他线程修改)
current_angle = self._current_angle
overflow_detected = self._overflow_detected
self._is_feed_start=True
if self.is_start_visual:
# 处理数据
self._process_angle_callback(current_angle, overflow_detected)
time.sleep(0.1)
def _run_feed(self):
while True:
print("------------已启动----------------")
if self._is_feed_start:
2025-12-28 17:20:02 +08:00
# if self.plc_data==5:
#_is_finish_ratio完成 比例,根据重量过滤一下
if self._overflow_detected=='未堆料':
if self._is_first_module:
print('------------进入第一块111111-------------')
self._is_first_module=False
self.run_feed_all()
break
elif self._is_finish and self._is_finish_ratio>=0.7:
print('-----------进入连续块111111-----------')
# self.init_val()
# self.run_feed_all()
# else:
# print("-----------上料斗未就位----------------")
# print("---------3--上料斗未就位----------------")
2025-12-12 18:00:14 +08:00
2025-12-28 17:20:02 +08:00
time.sleep(2)
2025-12-12 18:00:14 +08:00
def safe_control_lower_close(self,duration=3):
"""线程安全的下料斗关闭方法"""
thread_name = threading.current_thread().name
print(f"[{thread_name}] 尝试关闭下料斗...")
# 设置标志位,指示正在执行安全关闭操作
self._is_safe_closing = True
try:
with self._door_control_lock:
self._current_controlling_thread = thread_name
print(f"[{thread_name}] 获得下料斗控制权,执行关闭操作")
self.relay_controller.control(self.relay_controller.DOOR_LOWER_OPEN, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'open')
time.sleep(duration)
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'close')
self._current_controlling_thread = None
print(f"[{thread_name}] 释放下料斗控制权")
finally:
# 无论成功失败,都要重置标志位
self._is_safe_closing = False
2025-12-28 17:20:02 +08:00
def close_lower_door_visual(self):
"""关闭下料斗门"""
self.is_start_visual=False
time.sleep(0.5)
self.safe_control_lower_close()
def _visual_close(self):
self.is_start_visual=False
self._is_finish=True
self._is_feed_stage=0
print(f'--------进入关闭-----------')
self.safe_control_lower_close(3)
print(f'--------关闭完成-----------')
#记录重量
_current_weight=self.transmitter_controller.read_data(2)
if _current_weight is not None:
self._finish_weight= self._finish_weight+(self._inital_finish_lweight-_current_weight)
with open('weight.txt', 'a') as f:
timestamp = datetime.now().strftime("%H:%M:%S")
f.write(f"{timestamp} - {self._finish_weight}\n")
def run_feed_all(self):
"""
全流程下料包括判断模具类型
"""
_is_f= run_stable_classification_loop()
print(f'------------已判断出模具类型: {_is_f}-------------')
if _is_f is not None:
if _is_f=='模具车1':
self._is_small_f=True
print('-------------F块模具--------------')
print('-------------F块模具--------------')
print('-------------F块模具--------------')
self.run_feed_f()
elif _is_f=='模具车2':
self._is_small_f=False
self.run_feed()
print('-------------其他模具---------------')
if self._is_small_f is None:
print('-----------未判断出模具类型--------------')
return
def run_feed_f(self):
"""第一阶段下料:下料斗向模具车下料(低速)"""
print("--------------------开始下料F块--------------------")
# loc_relay=self.relay_controller
loc_mitter=self.transmitter_controller
max_weight_none=5
cur_weight_none=0
initial_lower_weight=loc_mitter.read_data(2)
if initial_lower_weight is None:
print("-----f上料斗重量异常-----")
return
first_finish_weight=0
self._finish_weight=first_finish_weight
self._inital_finish_lweight=initial_lower_weight
need_total_weight=0.54*2416
if initial_lower_weight>100:
if not self._is_finish:
self.is_start_visual=True
initial_lower_weight=loc_mitter.read_data(2)
if initial_lower_weight is None:
print("-----f上料斗重量异常2-----")
return
self._is_feed_stage=5
while not self._is_finish:
current_weight = loc_mitter.read_data(2)
if current_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
#如果重量连续5次为None认为下料斗未就位跳出循环
print('------------f下到模具车,下料斗重量异常----------------')
print('------------f下到模具车,下料斗重量异常----------------')
self.close_lower_door_visual()
return
#视觉处理关闭,异常的话重量没有生效
continue
cur_weight_none=0
first_finish_weight=initial_lower_weight-current_weight
self._is_finish_ratio=(first_finish_weight)/need_total_weight
print(f'------------已下料比例: {self._is_finish_ratio}-------------')
if self._is_finish_ratio>self._max_f_angle_ratio:
#关5秒
#大于0.7后不再检测了,直接交给视觉控制夹脚
# print(f'------------已下料比例: {self._is_finish_ratio}-------------')
break
# print(f'------------已下料: {first_finish_weight+second_finish_weight}kg-------------')
time.sleep(1)
# initial_lower_weight=_current_lower_weight
print(f'------------已下料F: {first_finish_weight}kg-------------')
print(f'------------已下料F: {first_finish_weight}kg-------------')
print(f'------------已完成-------------')
2025-12-12 18:00:14 +08:00
def run_feed(self):
"""第一阶段下料:下料斗向模具车下料(低速)"""
2025-12-28 17:20:02 +08:00
print("--------------------开始下料(普通块)--------------------")
2025-12-12 18:00:14 +08:00
loc_relay=self.relay_controller
loc_mitter=self.transmitter_controller
2025-12-28 17:20:02 +08:00
max_weight_none=5
cur_weight_none=0
2025-12-12 18:00:14 +08:00
initial_lower_weight=loc_mitter.read_data(2)
2025-12-28 17:20:02 +08:00
# initial_upper_weight=loc_mitter.read_data(1)
if initial_lower_weight is None:
print("---------------下料斗重量异常----------------")
return
2025-12-12 18:00:14 +08:00
first_finish_weight=0
2025-12-28 17:20:02 +08:00
need_total_weight=1.91*2416
# start_time=None
2025-12-12 18:00:14 +08:00
self.is_start_visual=True
if initial_lower_weight>100:
#下料斗的料全部下完
2025-12-28 17:20:02 +08:00
self._is_feed_stage=1
while not self._is_finish:
2025-12-12 18:00:14 +08:00
current_weight = loc_mitter.read_data(2)
2025-12-28 17:20:02 +08:00
if current_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
print("-----------下料斗重量异常(第一次下到模具车)--------------")
self.close_lower_door_visual()
return
continue
cur_weight_none=0
if current_weight<250 and current_weight>0:
self.close_lower_door_visual()
break
2025-12-12 18:00:14 +08:00
time.sleep(1)
2025-12-28 17:20:02 +08:00
_current_lower_weight=loc_mitter.read_data(2)
if _current_lower_weight is None:
print("-------下料斗重量异常---------")
return
first_finish_weight=initial_lower_weight-_current_lower_weight
2025-12-12 18:00:14 +08:00
# initial_lower_weight=_current_lower_weight
2025-12-28 17:20:02 +08:00
print(f'------------已下料(第一次): {first_finish_weight}kg-------------')
print(f'------------已下料(第一次): {first_finish_weight}kg-------------')
self._is_feed_stage=0
while self.plc_data!=5:
print('------------上料斗未就位----------------')
print('------------上料斗未就位----------------')
time.sleep(1)
2025-12-12 18:00:14 +08:00
if self.plc_data==5:
print(f'------------上料斗向下料斗转移(留3000KG-------------')
#打开上料斗出砼门开5就开三分之一下
2025-12-28 17:20:02 +08:00
2025-12-12 18:00:14 +08:00
loc_relay.control_upper_open_sync(6)
2025-12-28 17:20:02 +08:00
self._is_feed_stage=2
2025-12-12 18:00:14 +08:00
loc_time_count=1
upper_open_time=time.time()
2025-12-28 17:20:02 +08:00
2025-12-12 18:00:14 +08:00
while not self._is_finish:
current_upper_weight = loc_mitter.read_data(1)
2025-12-28 17:20:02 +08:00
if current_upper_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
#如果重量连续5次为None认为上料斗未就位跳出循环
print('------------第一次上到下,上料斗重量异常----------------')
print('------------第一次上到下,上料斗重量异常----------------')
loc_relay.control_upper_close_sync(5+loc_time_count)
return
continue
cur_weight_none=0
if current_upper_weight<3000 and current_upper_weight>0:
2025-12-12 18:00:14 +08:00
#关5秒,loc_time_count多关一秒
2025-12-28 17:20:02 +08:00
loc_relay.control_upper_close_sync(5+loc_time_count)
2025-12-12 18:00:14 +08:00
break
else:
if time.time()-upper_open_time>5:
2025-12-28 17:20:02 +08:00
if loc_time_count<6:
upper_open_time=time.time()
loc_relay.control_upper_open_sync(0.8)
loc_time_count=loc_time_count+0.8
2025-12-12 18:00:14 +08:00
else:
time.sleep(0.5)
2025-12-28 17:20:02 +08:00
else:
loc_relay.control_upper_close_sync(6+loc_time_count)
2025-12-12 18:00:14 +08:00
self.is_start_visual=True
initial_lower_weight=loc_mitter.read_data(2)
2025-12-28 17:20:02 +08:00
if initial_lower_weight is None:
print("-------下料斗重量异常(第二次下料到模具车)---------")
return
self._is_feed_stage=3
2025-12-12 18:00:14 +08:00
while not self._is_finish:
current_weight = loc_mitter.read_data(2)
2025-12-28 17:20:02 +08:00
if current_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
print("-------下料斗重量异常(第二次下料到模具车)---------")
self.close_lower_door_visual()
return
continue
cur_weight_none=0
# second_finish_weight=initial_lower_weight-current_weight
if current_weight<250:
self.close_lower_door_visual()
break
# print(f'------------已下料: {first_finish_weight+second_finish_weight}kg-------------')
2025-12-12 18:00:14 +08:00
time.sleep(1)
2025-12-28 17:20:02 +08:00
_current_lower_weight=loc_mitter.read_data(2)
if _current_lower_weight is None:
print("-------下料斗重量异常(第二次下到模)---------")
return
first_finish_weight=first_finish_weight+initial_lower_weight-_current_lower_weight
print(f'------------已下料(第二次): {first_finish_weight}kg-------------')
print(f'------------已下料(第二次): {first_finish_weight}kg-------------')
self._is_feed_stage=0
2025-12-12 18:00:14 +08:00
if self.plc_data==5:
#第二次上料斗向下料斗转移
2025-12-28 17:20:02 +08:00
loc_relay.control_upper_open_sync(11)
2025-12-12 18:00:14 +08:00
loc_time_count=1
upper_open_time=time.time()
#第二次到下料斗还需要的量
#loc_left_need_weight=need_total_weight-first_finish_weight
2025-12-28 17:20:02 +08:00
# initial_upper_weight=loc_mitter.read_data(1)
# start_time=None
self._is_feed_stage=4
2025-12-12 18:00:14 +08:00
while not self._is_finish:
# print(f'------------上料斗向下料斗转移22222-------------')
current_upper_weight = loc_mitter.read_data(1)
2025-12-28 17:20:02 +08:00
if current_upper_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
#如果重量连续5次为None认为上料斗未就位跳出循环
print('------------第二次上到下,上料斗重量异常----------------')
print('------------第二次上到下,上料斗重量异常----------------')
loc_relay.control_upper_close_sync(15)
2025-12-12 18:00:14 +08:00
break
2025-12-28 17:20:02 +08:00
continue
cur_weight_none=0
if current_upper_weight<400 and current_upper_weight>0:
loc_relay.control_arch_upper_open()
loc_relay.control_upper_open_sync(5)
# start_time=None
#5秒后关闭
loc_relay.control_upper_close_after()#control_upper_close_sync(8+loc_time_count)
break
2025-12-12 18:00:14 +08:00
else:
2025-12-28 17:20:02 +08:00
if time.time()-upper_open_time>2:
# if loc_time_count<6:
2025-12-12 18:00:14 +08:00
upper_open_time=time.time()
2025-12-28 17:20:02 +08:00
loc_relay.control_upper_open_sync(1)
loc_time_count=loc_time_count+1
2025-12-12 18:00:14 +08:00
else:
time.sleep(0.5)
2025-12-28 17:20:02 +08:00
else:
loc_relay.control_upper_close_sync(15)
2025-12-12 18:00:14 +08:00
# time.sleep(0.4)
2025-12-28 17:20:02 +08:00
#第三次下料斗转移到模具车
2025-12-12 18:00:14 +08:00
if not self._is_finish:
self.is_start_visual=True
2025-12-28 17:20:02 +08:00
initial_lower_weight=loc_mitter.read_data(2)
self._finish_weight=first_finish_weight
self._inital_finish_lweight=initial_lower_weight
if initial_lower_weight is None:
print("-------下料斗重量异常(第三次下到模具车)---------")
return
self._is_feed_stage=5
while not self._is_finish:
current_weight = loc_mitter.read_data(2)
if current_weight is None:
cur_weight_none+=1
if cur_weight_none>max_weight_none:
#重量异常退出
print('------------第三次下到模具车,下料斗重量异常----------------')
self.close_lower_door_visual()
return
continue
cur_weight_none=0
second_finish_weight=initial_lower_weight-current_weight
self._is_finish_ratio=(second_finish_weight+first_finish_weight)/need_total_weight
print(f'------------已下料比例: {self._is_finish_ratio}-------------')
if self._is_finish_ratio>self._max_angle_radio:
#关5秒
# print(f'------------已下料比例: {self._is_finish_ratio}-------------')
break
# print(f'------------已下料: {first_finish_weight+second_finish_weight}kg-------------')
time.sleep(1)
2025-12-12 18:00:14 +08:00
# _current_lower_weight=loc_mitter.read_data(2)
# first_finish_weight=first_finish_weight+initial_lower_weight-_current_lower_weight
# print(f'------------已下料: {first_finish_weight}kg-------------')
# print(f'------------已下料: {first_finish_weight}kg-------------')
print(f'------------已完成-------------')
def _process_angle_callback(self, current_angle, overflow_detected):
"""
实时精细控制 - 基于PID思想无固定间隔
"""
try:
# 记录控制时间戳(用于微分计算,而非限制)
current_time = time.time()
# 确保所有PID相关属性都被正确初始化
if not hasattr(self, '_last_control_time'):
self._last_control_time = current_time
if not hasattr(self, '_last_error'):
self._last_error = 0
if not hasattr(self, '_error_integral'):
self._error_integral = 0
# print(f"{self.angle_mode}")
self.overflow = overflow_detected in ["大堆料", "小堆料"]
if current_angle is None:
return
print(f"{datetime.now().strftime('%H:%M:%S.%f')[:-3]} 角度11: {current_angle:.2f}°,{overflow_detected}")
if overflow_detected == "未浇筑满" or self._is_before_finish:
if self._before_finish_time is None:
self._before_finish_time=current_time
self.safe_control_lower_close(3)
print('-----------------关闭--------------------')
# time.sleep(3)
else:
if overflow_detected=='浇筑满':
2025-12-28 17:20:02 +08:00
self._visual_close()
return
# print(f'--------已关闭已关闭-----------')
2025-12-12 18:00:14 +08:00
elif overflow_detected=="大堆料":
2025-12-28 17:20:02 +08:00
print(f'--------未浇筑满,大堆料-----------')
self._pulse_control('open',0.3)
time.sleep(0.3)
self._pulse_control('close',0.4)
time.sleep(1)
self._is_before_finish=True
2025-12-12 18:00:14 +08:00
else:
2025-12-28 17:20:02 +08:00
# self._pulse_control('open',0.5)
# time.sleep(0.3)
# self._pulse_control('close',0.6)
self._pulse_control('open',0.6)
2025-12-12 18:00:14 +08:00
time.sleep(0.3)
2025-12-28 17:20:02 +08:00
self._pulse_control('close',0.7)
time.sleep(1)
2025-12-12 18:00:14 +08:00
self._is_before_finish=True
2025-12-28 17:20:02 +08:00
if self._is_finish_ratio<=self._max_ignore_radio:
#如果重量未达到最大忽略角度,需要跳出
self._is_before_finish=False
2025-12-12 18:00:14 +08:00
return
elif overflow_detected == "浇筑满":
2025-12-28 17:20:02 +08:00
self._visual_close()
2025-12-12 18:00:14 +08:00
return
else:
self._before_finish_time=None
2025-12-28 17:20:02 +08:00
if self._is_finish_ratio>=self._max_angle_radio or (self._is_finish_ratio>self._max_f_angle_ratio and self._is_small_f):
if overflow_detected == "大堆料":
TARGET_ANGLE = 5.0 # 大堆料时控制在15度左右
elif overflow_detected == "小堆料":
TARGET_ANGLE = 15.0 # 小堆料时控制在35度左右
else:
TARGET_ANGLE = 25.0 # 未溢料时开到最大56度
2025-12-12 18:00:14 +08:00
else:
2025-12-28 17:20:02 +08:00
if self._is_feed_stage==1 or self._is_feed_stage==3:
#根据溢料状态动态调整目标角度
if overflow_detected == "大堆料":
TARGET_ANGLE = 15.0 # 大堆料时控制在15度左右
elif overflow_detected == "小堆料":
TARGET_ANGLE = 45.0 # 小堆料时控制在35度左右
else:
TARGET_ANGLE = 55.0 # 未溢料时开到最大56度
else:
#根据溢料状态动态调整目标角度
if overflow_detected == "大堆料":
TARGET_ANGLE = 15.0 # 大堆料时控制在15度左右
elif overflow_detected == "小堆料":
TARGET_ANGLE = 25.0 # 小堆料时控制在35度左右
else:
TARGET_ANGLE = 45.0 # 未溢料时开到最大56度
2025-12-12 18:00:14 +08:00
# 确保目标角度在硬件范围内5-56度
TARGET_ANGLE = max(5.0, min(56.0, TARGET_ANGLE))
# PID控制参数
2025-12-28 17:20:02 +08:00
KP = 0.2 # 比例系数
2025-12-12 18:00:14 +08:00
KI = 0 # 积分系数
KD = 0 # 微分系数
# KP = 0.15 # 比例系数
# KI = 0.008 # 积分系数
# KD = 0.08 # 微分系数
# if TARGET_ANGLE <= 25.0:
# KP, KI, KD = 0.18, 0.008, 0.08 # 小角度,强控制
# elif TARGET_ANGLE <= 40.0:
# KP, KI, KD = 0.15, 0.01, 0.06 # 中角度
# else:
# KP, KI, KD = 0.12, 0.012, 0.04 # 大角度,温和控制
# 计算误差
error = current_angle - TARGET_ANGLE
dt = current_time - self._last_control_time
# 积分项(抗饱和)
self._error_integral += error * dt
self._error_integral = max(min(self._error_integral, 50), -50) # 积分限幅
# 微分项
error_derivative = (error - self._last_error) / dt if dt > 0 else 0
# PID输出
pid_output = (KP * error + KI * self._error_integral + KD * error_derivative)
# print(f"📊 PID计算: 误差={error:.2f}°, 积分={self._error_integral:.2f}, "
#f"微分={error_derivative:.2f}, 输出={pid_output:.2f}")
2025-12-12 18:00:14 +08:00
# 更新历史值
self._last_error = error
self._last_control_time = current_time
# 状态机 + PID控制
if self.angle_mode == "normal":
self._normal_mode_advanced(current_angle, pid_output,TARGET_ANGLE)
elif self.angle_mode == "reducing":
self._reducing_mode_advanced(current_angle, pid_output, TARGET_ANGLE)
elif self.angle_mode == "maintaining":
self._maintaining_mode_advanced(current_angle, pid_output, TARGET_ANGLE)
except Exception as e:
print(f"处理视觉回调时发生异常: {e}")
def _normal_mode_advanced(self, current_angle, pid_output,target_angle):
"""高级正常模式控制"""
if self.overflow:
self.angle_mode = "reducing"
print("检测到溢料,切换到减小模式")
return
# 🎯 修复1: 添加强制控制机制
# 基于PID输出的智能控制
control_threshold = 2 # 从2.0减小到0.5,提高灵敏度
if abs(pid_output) > control_threshold:
if pid_output > 0:
# 需要减小角度(关门)
pulse_time = min(0.3, pid_output * 0.1)
self._pulse_control("close", pulse_time)
print(f"正常模式: 角度偏高{pid_output:.1f},关门{pulse_time:.2f}")
else:
# 需要增大角度(开门)
pulse_time = min(0.3, abs(pid_output) * 0.1)
self._pulse_control("open", pulse_time)
print(f"正常模式: 角度偏低{abs(pid_output):.1f},开门{pulse_time:.2f}")
else:
# 在死区内,保持静止
error = current_angle - target_angle
abs_error = abs(error)
# 强制控制如果误差超过5度强制控制
if abs_error > 5:
if error > 0: # 当前角度 > 目标角度,需要关门
pulse_time=0.1 # 根据误差计算脉冲时间
self._pulse_control("close", pulse_time)
print(f"强制关门: 误差{abs_error:.1f}°过大,脉冲{pulse_time:.3f}s")
2025-12-12 18:00:14 +08:00
else: # 当前角度 < 目标角度,需要开门
pulse_time =0.1
self._pulse_control("open", pulse_time)
print(f"强制开门: 误差{abs_error:.1f}°过大,脉冲{pulse_time:.3f}s")
2025-12-12 18:00:14 +08:00
return
else:
self._stop_door()
print(f"正常模式: 角度在目标范围内,保持静止")
def _reducing_mode_advanced(self, current_angle, pid_output, target_angle):
"""高级减小模式控制"""
if not self.overflow:
if current_angle <= target_angle + 5.0:
self.angle_mode = "normal"
print("溢料消除且角度合适,返回正常模式")
else:
# 缓慢恢复
self._pulse_control("close", 0.1)
return
# 有溢料,积极减小角度
if current_angle > target_angle:
# 使用PID输出计算控制量
pulse_time = min(0.5, max(0.1, pid_output * 0.15))
self._pulse_control("close", pulse_time)
print(f"减小模式: 积极关门{pulse_time:.2f}PID输出:{pid_output:.1f}")
else:
self.angle_mode = "maintaining"
print("角度已达标,进入维持模式")
def _maintaining_mode_advanced(self, current_angle, pid_output, target_angle):
"""高级维持模式控制"""
if not self.overflow:
self.angle_mode = "normal"
print("溢料消除,返回正常模式")
return
# 精确维持控制
dead_zone = 1.5 # 更小的死区
if abs(pid_output) > dead_zone:
pulse_time = min(0.2, abs(pid_output) * 0.05) # 更精细的控制
if pid_output > 0:
self._pulse_control("close", pulse_time)
print(f"维持模式: 微调关门{pulse_time:.2f}")
else:
self._pulse_control("open", pulse_time)
print(f"维持模式: 微调开门{pulse_time:.2f}")
else:
self._stop_door()
print("维持模式: 角度精确控制中")
def _pulse_control(self, action, duration):
"""统一的脉冲控制方法"""
# 检查是否正在执行safe_control_lower_close如果是则跳过relay操作
if self._is_safe_closing:
thread_name = threading.current_thread().name
print(f"[{thread_name}] safe_control_lower_close正在执行跳过脉冲控制 {action}")
return
if duration <= 0:
return
thread_name = threading.current_thread().name
print(f"[{thread_name}] 尝试脉冲控制 {action},时长 {duration:.2f}秒...")
with self._door_control_lock:
self._current_controlling_thread = thread_name
print(f"[{thread_name}] 获得下料斗控制权,执行脉冲控制")
if action == "open":
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_OPEN, 'open')
time.sleep(duration)
self.relay_controller.control(self.relay_controller.DOOR_LOWER_OPEN, 'close')
print(f"[{thread_name}] 开门脉冲: {duration:.2f}")
else: # close
self.relay_controller.control(self.relay_controller.DOOR_LOWER_OPEN, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'open')
time.sleep(duration)
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'close')
print(f"[{thread_name}] 关门脉冲: {duration:.2f}")
self._current_controlling_thread = None
print(f"[{thread_name}] 释放下料斗控制权")
def _stop_door(self):
"""停止门运动"""
# 检查是否正在执行safe_control_lower_close如果是则跳过relay操作
if self._is_safe_closing:
thread_name = threading.current_thread().name
print(f"[{thread_name}] safe_control_lower_close正在执行跳过停止门运动操作")
return
thread_name = threading.current_thread().name
print(f"[{thread_name}] 尝试停止门运动...")
with self._door_control_lock:
self._current_controlling_thread = thread_name
print(f"[{thread_name}] 获得下料斗控制权,执行停止操作")
self.relay_controller.control(self.relay_controller.DOOR_LOWER_OPEN, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_CLOSE, 'close')
self._current_controlling_thread = None
print(f"[{thread_name}] 释放下料斗控制权")
def _open_door(self, duration=0.5):
"""打开门"""
self._pulse_control("open", 0.3)
def _close_door(self, duration=0.5):
"""关闭门"""
self._pulse_control("close", 1)
def on_plc_update(self,data: int, binary: str):
#4即将振捣室5振捣室 64即将搅拌楼 66到达搅拌楼
print(f"[数据回调] 数值: 0x{data:02X} | 十进制: {data:3d} | 二进制: {binary}")
self.plc_data=data
@classmethod
def instance_exists(cls):
"""检测实例是否存在"""
return cls._instance is not None
def shutdown(self):
"""关闭线程,清理资源"""
# 设置停止事件
self._stop_event.set()
# 唤醒线程以便它能检测到停止事件
self._new_data_available.set()
if self.plc_service:
self.plc_service.stop_polling()
# 等待线程结束
if self.callback_thread.is_alive():
self.callback_thread.join(timeout=1.0)
2025-12-28 17:20:02 +08:00
if self.feed_thread.is_alive():
self.feed_thread.join(timeout=1.0)
if self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=1.0)
2025-12-12 18:00:14 +08:00
def __del__(self):
"""析构函数,确保线程安全关闭"""
self.shutdown()
# 创建默认实例
# visual_callback_instance = VisualCallback()
# 兼容层,保持原来的函数调用方式可用
# def angle_visual_callback(current_angle, overflow_detected):
# """
# 兼容旧版本的函数调用方式
# 将调用转发到默认实例的angle_visual_callback方法
# """
# visual_callback_instance.angle_visual_callback(current_angle, overflow_detected)