Files
Feeding_control_system/controller/main_controller.py

292 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PySide6.QtCore import QTimer, Signal, QObject, Qt
from PySide6.QtWidgets import QApplication # 用于获取主线程
import threading
from view.main_window import MainWindow
from .camera_controller import CameraController
from .bottom_control_controller import BottomControlController
from .hopper_controller import HopperController
from .hopper_controller import UpperHopperPosition
from common.constant_config_manager import ConfigManager
from service.msg_recorder import MessageRecorder
from service.opcua_ui_client import OpcuaUiClient
from service.artifact_query_thread import ArtifactInfoQueryThread # 管片任务查询
from service.pdrecord_query_thread import PDRecordQueryThread # 派单任务查询
from busisness.models import ArtifactInfoModel
from busisness.models import PDRecordModel
from typing import List
class MainController:
def __init__(self):
# 主界面
self.main_window = MainWindow()
self.msg_recorder = MessageRecorder()
self.msg_recorder.normal_record("开始自动智能浇筑系统") # 记录系统状态消息
# 初始化子界面和控制器
self._initSubViews()
self._initSubControllers()
# 加载配置管理器
self.config_manager = ConfigManager()
# 启动消息数据库(messages.db)定时清理任务
self.start_msg_database_clean_task()
self.MSG_CLEAN_INTERVAL = None
# opcua客户端
self.opc_client = OpcuaUiClient()
self.opc_client.start()
# 连接信号
self.__connectSignals()
def showMainWindow(self):
self.main_window.showFullScreen()
# self.main_window.show()
def _initSubControllers(self):
# 右侧视频显示控制模块
self.camera_controller = CameraController(
video_view=self.main_window.vibration_video
)
# 底部按钮控制模块
self.bottom_control_controller = BottomControlController(
bottom_control_widget=self.main_window.bottom_control_widget,
main_window=self.main_window
)
# 料斗控制模块(包括 夹爪开合、拱等按钮)
self.hopper_controller = HopperController(
hopper_view = self.main_window.hopper_widget,
conveyor_view = self.main_window.conveyor_system_widget
)
def _initSubViews(self):
pass
def __connectSignals(self):
self.main_window.about_to_close.connect(self.handleMainWindowClose) # 处理主界面关闭
self.config_manager.msg_clean_interval_changed.connect(self.onMsgDbCleanIntervalChanged) # 消息清理间隔改变
self.opc_client.opc_signal.value_changed.connect(self._onOpcValueChanged, Qt.QueuedConnection) # opcua服务器值改变
self.opc_client.opc_signal.opc_log.connect(self.msg_recorder.normal_record, Qt.QueuedConnection) # opcua客户端日志
# 主界面的计划表单自动派单控制
self.main_window.plan_table_widget.auto_dispatch_signal.connect(self.handlePlanTableAutoDispatch) # 计划表单的自动派单切换
# 主界面的计划表单中的计划方量修改控制
self.main_window.plan_volume_modified_signal.connect(self.handleVolumeModified) # 计划方量修改
def handleMainWindowClose(self):
"""主界面关闭"""
self.msg_recorder.normal_record("关闭自动智能浇筑系统")
# 停止系统底部控制器中的线程
if hasattr(self, 'bottom_control_controller'):
self.bottom_control_controller.stop_threads()
# 停止opc客户端
if hasattr(self, 'opc_client'):
self.opc_client.stop_run()
def handlePlanTableAutoDispatch(self, auto_status:bool):
"""处理计划表单的 自动派单和手动派单的切换
pd_mode: 1,自动派单 2,手动派单
"""
if auto_status: # 自动派单
self.opc_client.write_value_by_name("pd_mode", 1)
else: # 手动派单
self.opc_client.write_value_by_name("pd_mode", 2)
def handleVolumeModified(self, volume_json_str:str):
"""处理 修改方量 (计划表单中 和 派单详情中)"""
self.opc_client.write_value_by_name("pd_plan_volume", volume_json_str)
def start_msg_database_clean_task(self):
"""启动清理消息数据库(messages.db)中过期消息的定时任务"""
from PySide6.QtCore import QTimer, QDateTime, QDate, QTime
self.MSG_CLEAN_INTERVAL = self.config_manager.get_clean_interval() # 获取消息清理间隔 (单位: 秒)
def clean_task():
from common.msg_db_helper import DBHelper
DBHelper().clean_expired_messages(days_to_keep=self.config_manager.get_msg_keep_days())
def start_weekly_clean_at_midnight():
# 1. 计算当前时间到下一个凌晨0点的毫秒数
now = QDateTime.currentDateTime()
tomorrow = QDate.currentDate().addDays(1) # 明天
next_midnight = QDateTime(tomorrow, QTime(0, 0, 0)) # 明天00:00:00
msecs_to_midnight = now.msecsTo(next_midnight) # 距离下一个凌晨的毫秒数
# 2. 首次触发:到零点后执行一次清理
clean_timer = QTimer()
clean_timer.timeout.connect(clean_task)
single_shot_timer = QTimer()
single_shot_timer.setSingleShot(True)
def first_clean():
clean_task() # 执行首次清理
# 之后每隔CLEAN_INTERVAL触发一次消息清理
clean_timer.start(self.MSG_CLEAN_INTERVAL)
# 启动一次性定时器, 进行首次清理
single_shot_timer.timeout.connect(first_clean)
single_shot_timer.start(msecs_to_midnight)
return clean_timer, single_shot_timer
# 启动定时器,并保存引用
self.msg_db_clean_timer, self.single_shot_timer = start_weekly_clean_at_midnight()
def onMsgDbCleanIntervalChanged(self, new_interval):
"""当消息数据清理间隔变化时,更新定时器"""
if self.MSG_CLEAN_INTERVAL == new_interval:
return # 清理间隔未变
if hasattr(self, 'single_shot_timer') and self.single_shot_timer is not None:
self.single_shot_timer.stop()
self.single_shot_timer.deleteLater()
self.single_shot_timer = None
if hasattr(self, 'msg_db_clean_timer') and self.msg_db_clean_timer is not None:
self.msg_db_clean_timer.stop() # 停止周期性触发消息清理
self.msg_db_clean_timer.deleteLater()
self.msg_db_clean_timer = None
# 用新间隔重新启动清理任务
self.start_msg_database_clean_task()
def _onOpcValueChanged(self, node_id, var_name, new_value):
"""
OPCUA值变化时的UI更新函数
"""
try:
update_method = getattr(self, f"_update_{var_name}", None)
if update_method:
update_method(new_value)
except Exception as e:
print(f"_onOpcValueChanged: 界面更新失败: {e}")
import traceback
traceback.print_exc()
def onUpdateUiByArtifactInfo(self, artifact_list:List[ArtifactInfoModel]):
# 更新管片任务
self.main_window.update_segment_tasks(artifact_list)
# 将opc服务中的 segment_tasks的值复原为 0以便下次触发管片更新
self.opc_client.write_value_by_name("segment_tasks", 0)
def onUpdateUiByPDRecord(self, pdrecord_list:List[PDRecordModel]):
# 更新派单任务
self.main_window.update_dispatch_tasks(pdrecord_list)
# 将opc服务中的 dispatch_tasks的值复原为 0以便下次触发派单更新
self.opc_client.write_value_by_name("dispatch_tasks", 0)
# ======================== OPCUA值更新界面方法 ======================
def _update_upper_weight(self, val):
# 更新上料斗重量
self.hopper_controller.onUpdateUpperHopperWeight(val)
def _update_lower_weight(self, val):
# 更新下料斗重量
self.hopper_controller.onUpdateLowerHopperWeight(val)
def _update_upper_volume(self, val):
# 更新上料斗方量
self.hopper_controller.onUpdateUpperHopperVolume(val)
def _update_production_progress(self, val):
# 更新生产进度限制为100, 进度为去掉百分号之后的整数
progress = val
self.main_window.arc_progress.setProgress(progress)
self.main_window.production_progress.setProgress(progress)
def _update_lower_clamp_angle(self, val):
# 更新下料斗夹爪角度
self.hopper_controller.onUpdateLowerClampAngle(val)
def _update_upper_clamp_status(self, val):
# 更新上料斗夹爪状态 0表示关闭 1表示打开
self.hopper_controller.onUpdateUpperClampStatus(val)
def _update_upper_hopper_position(self, val):
# 更新上料斗位置 5表示料斗到位到达振捣室处 66表示在搅拌楼处
self.hopper_controller.onUpdateUpperHopperPosition(val)
if val == UpperHopperPosition.MIXING_TOWER.value:
self.main_window.mixer_widget.startBladeMix()
else:
self.main_window.mixer_widget.stopBladeMix()
def _update_segment_tasks(self, val):
if val: # 需要更新管片任务
"""更新左侧的管片任务"""
if hasattr(self, "segment_query_thread") and self.segment_query_thread.isRunning():
return
# 1. 管片信息查询线程
self.segment_query_thread = ArtifactInfoQueryThread()
# 2. 主线程更新管片任务UI
self.segment_query_thread.query_finished.connect(self.onUpdateUiByArtifactInfo)
# 3. 查询管片信息错误
self.segment_query_thread.query_error.connect(self.onQueryInfoError)
self.segment_query_thread.start()
def _update_dispatch_tasks(self, val):
if val: # 需要更新派单任务
"""更新右侧的派单任务"""
if hasattr(self, "dispatch_query_thread") and self.dispatch_query_thread.isRunning():
return
# 1. 派单信息查询线程
self.dispatch_query_thread = PDRecordQueryThread()
# 2. 主线程更新派单任务UI
self.dispatch_query_thread.query_finished.connect(self.onUpdateUiByPDRecord)
# 3. 查询派单信息错误
self.dispatch_query_thread.query_error.connect(self.onQueryInfoError)
self.dispatch_query_thread.start()
def onQueryInfoError(self, error_msg:str):
# 查询信息失败预警
self.msg_recorder.warning_record(error_msg)
def _update_vibration_frequency(self, val):
# 更新振捣频率
self.main_window.frequency_button_group.set_selected_frequency(val) # 频率选择按钮上显示的选中的频率
self.main_window.arc_progress.setFrequency(val) # 模具车上显示的振捣频率
def _update_mould_vibrate_status(self, vibrate_status:bool):
# 更新模具车上显示的振捣状态
# vibrate_status: False:未振捣、True:振捣中
if vibrate_status:
self.main_window.arc_progress.setState("振捣中")
else:
self.main_window.arc_progress.setState("未振捣")
self.opc_client.write_value_by_name("vibration_frequency", 0) # 将振捣频率设置为0
def _update_mould_finish_weight(self, finish_weight:int):
# 更新模具车中的下料重量
# finish_weight已下料重量
self.main_window.arc_progress.setWeight(finish_weight)
def _update_pd_mode(self, mode:int):
# 更新计划表单中的 派单模式(主界面下发状态的下面的切换开关),自动派单/手动派单
mode_mapping = {
1: "自动派单",
2: "手动派单",
0: None # 未知
}
mode_text = mode_mapping.get(mode)
if mode_text == "自动派单":
# 设置主界面下发状态的下面的切换开关
self.main_window.plan_table_widget.set_auto_dispatch(True) # 开启,自动派单
# 修改系统配置文件中的 派单状态为自动派单...
else:
self.main_window.plan_table_widget.set_auto_dispatch(False) # 关闭,手动派单
# 修改系统配置文件中的 派单状态为手动派单...