From 29c459e0f2277ff7efc0df4df2cdc434828ae90c Mon Sep 17 00:00:00 2001 From: yanganjie Date: Sun, 11 Jan 2026 18:24:08 +0800 Subject: [PATCH] =?UTF-8?q?add(=E8=AE=BE=E5=A4=87=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E3=80=81=E8=AF=BB=E5=8F=96=E7=AE=A1=E7=89=87=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E3=80=81opcua=E5=AE=A2=E6=88=B7=E7=AB=AF)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/constant_config_manager.py | 132 +++++++++++++++++++++ config/constant_config.ini | 18 +++ config/monitor_config.ini | 22 ++++ service/artifact_query_thread.py | 27 +++++ service/device_monitor_thread.py | 148 ++++++++++++++++++++++++ service/opcua_ui_client.py | 185 ++++++++++++++++++++++++++++++ 6 files changed, 532 insertions(+) create mode 100644 common/constant_config_manager.py create mode 100644 config/constant_config.ini create mode 100644 config/monitor_config.ini create mode 100644 service/artifact_query_thread.py create mode 100644 service/device_monitor_thread.py create mode 100644 service/opcua_ui_client.py diff --git a/common/constant_config_manager.py b/common/constant_config_manager.py new file mode 100644 index 0000000..5f546a0 --- /dev/null +++ b/common/constant_config_manager.py @@ -0,0 +1,132 @@ +# common/config_manager.py +import os +import configparser +from PySide6.QtCore import QTimer, QMutex, Signal, QObject + +class ConfigManager(QObject): + _instance = None + _mutex = QMutex() + _initialized = False # 是否已完成初始化 + + # 方量调节控制器的参数变化 (新的最小值、新的最大值、新的初始值) + adjuster_params_changed = Signal(float, float, float) + + # 消息清理间隔变化(新时间间隔 单位:秒) + msg_clean_interval_changed = Signal(int) + + def __new__(cls): + cls._mutex.lock() + try: + if not cls._instance: + cls._instance = super().__new__(cls) + finally: + cls._mutex.unlock() + return cls._instance + + def __init__(self): + # __init__ 只执行一次 + if self._initialized: + return + super().__init__() + # 执行初始化逻辑 + self._init_config() + self._start_refresh_timer() + self._initialized = True + + def _init_config(self): + """初始化配置(加载默认值或配置文件)""" + self.config_path = "config/constant_config.ini" + + # 默认配置 + # 系统状态消息 和 预警消息相关的默认配置 + self.MSG_KEEP_DAYS = 14 # 消息保留天数 单位: 天 + self.CLEAN_INTERVAL = 7 * 24 * 3600 # 清理周期(秒) 单位: 秒 + + # 方量调节控件(ValueAdjuster)默认值 + self.ADJUSTER_MIN = 0.0 # 最小值 + self.ADJUSTER_MAX = 99.0 # 最大值 + self.ADJUSTER_INITIAL = 2.5 # 初始值 + + # 首次加载配置文件 + self._load_config() + + def _load_config(self): + """从配置文件加载参数(覆盖默认值)""" + if os.path.exists(self.config_path): + config = configparser.ConfigParser() + config.read(self.config_path, encoding='utf-8') + + # 消息相关的参数 + # 读取消息保留天数 + self.MSG_KEEP_DAYS = config.getint( + "message_clean", + "keep_days", + fallback=self.MSG_KEEP_DAYS + ) + # 读取清理周期(天→秒) + self.CLEAN_INTERVAL = config.getint( + "message_clean", + "interval_days", + fallback=self.CLEAN_INTERVAL // (24 * 3600) + ) * 24 * 3600 + self.msg_clean_interval_changed.emit(self.CLEAN_INTERVAL) + + + # 方量调节控件(ValueAdjuster)参数 + # 读取最小值(fallback为当前默认值) + min_value = config.getfloat( + "adjuster_value", + "min", + fallback=self.ADJUSTER_MIN + ) + # 读取最大值 + max_value = config.getfloat( + "adjuster_value", + "max", + fallback=self.ADJUSTER_MAX + ) + # 读取初始值 + initial_value = config.getfloat( + "adjuster_value", + "initial", + fallback=self.ADJUSTER_INITIAL + ) + # 验证配置的值的有效性 + if min_value <= max_value and min_value <= initial_value <= max_value: + self.ADJUSTER_MIN = min_value + self.ADJUSTER_MAX = max_value + self.ADJUSTER_INITIAL = initial_value + self.adjuster_params_changed.emit(self.ADJUSTER_MIN, self.ADJUSTER_MAX, self.ADJUSTER_INITIAL) + + def _start_refresh_timer(self): + """启动定时刷新, 每5分钟刷新一次配置""" + self.refresh_timer = QTimer() + self.refresh_timer.timeout.connect(self._load_config) # 定时重新加载 + self.refresh_timer.start(5 * 60 * 1000) # 5分钟(毫秒) + # 测试 30秒刷新一次 + # self.refresh_timer.start(30 * 1000) + + def manual_refresh(self): + """手动刷新配置: 供立即生效使用""" + # print("手动刷新配置...") + self._load_config() + + # 获取消息保存天数 + def get_msg_keep_days(self): + return self.MSG_KEEP_DAYS + + # 获取消息清理间隔 + def get_clean_interval(self): + return self.CLEAN_INTERVAL + + # 获取方量调节器的最小值 + def get_adjuster_min(self): + return self.ADJUSTER_MIN + + # 获取方量调节器的最大值 + def get_adjuster_max(self): + return self.ADJUSTER_MAX + + # 获取方量调节器的初始值 + def get_adjuster_initial(self): + return self.ADJUSTER_INITIAL \ No newline at end of file diff --git a/config/constant_config.ini b/config/constant_config.ini new file mode 100644 index 0000000..d356d88 --- /dev/null +++ b/config/constant_config.ini @@ -0,0 +1,18 @@ +# 界面所需的常量配置 +[message_clean] +# 消息保留天数 +keep_days = 14 +# 消息清理间隔(天) +interval_days = 10 + +[adjuster_value] +# 一位小数 +# 方量调节控件最小值 +min = 0.0 +# 方量调节控件最大值 +max = 99.0 +# 方量调节控件初始值 +initial = 2.5 + + + diff --git a/config/monitor_config.ini b/config/monitor_config.ini new file mode 100644 index 0000000..b5ce263 --- /dev/null +++ b/config/monitor_config.ini @@ -0,0 +1,22 @@ +# 系统诊断检测的设备 +# 上料斗变送器 +[上料斗] +ip = 192.168.250.63 + +# 下料斗变送器 +[下料斗] +ip = 192.168.250.66 + +# 振捣室 rfid读卡器 +[rfid1] +ip = 192.168.250.67 + +# 振捣室外 rfid读卡器 +[rfid2] +ip = 192.168.250.77 + +[PLC] +ip = 192.168.250.233 + +[本地] +ip = 127.0.0.1 diff --git a/service/artifact_query_thread.py b/service/artifact_query_thread.py new file mode 100644 index 0000000..f9a40d5 --- /dev/null +++ b/service/artifact_query_thread.py @@ -0,0 +1,27 @@ +from PySide6.QtWidgets import QWidget +from PySide6.QtCore import QThread, Signal # 只需要导入QThread和Signal即可 +from typing import List +from busisness.blls import ArtifactBll +from busisness.models import ArtifactInfoModel + +class ArtifactInfoQueryThread(QThread): + # 定义信号:子线程查询完成 + query_finished = Signal(List[ArtifactInfoModel]) + # 定义信号:发送错误信息 + query_error = Signal(str) + + def __init__(self): + super().__init__() + + def run(self): + try: + artifact_dal = ArtifactBll() + artifacts = artifact_dal.get_artifact_task() + + if artifacts: + # 查询完成,发射信号 + self.query_finished.emit(artifacts) + else: + raise ValueError("未查询到有效数据") + except Exception as e: + self.query_error.emit(f"更新管片任务数据失败: {str(e)}") \ No newline at end of file diff --git a/service/device_monitor_thread.py b/service/device_monitor_thread.py new file mode 100644 index 0000000..f42ba46 --- /dev/null +++ b/service/device_monitor_thread.py @@ -0,0 +1,148 @@ +import configparser +from typing import Dict, Union +from icmplib import ping, ICMPLibError +from PySide6.QtCore import QThread, Signal + +class DeviceMonitorThread(QThread): + connect_success = Signal(str, int) # 成功信号:设备名称(str) + 延迟毫秒数(int) + connect_failed = Signal(str) # 失败信号:设备名称(str) + state_result = Signal(str, int) # 全局状态(str: normal/warning/error) + 异常设备数量(int) + check_finished = Signal() # 本轮检测结束 + + def __init__(self, config_path = "config/monitor_config.ini", parent=None): + super().__init__(parent) + # 初始化你的原有配置参数,完全不变 + self.config_path = config_path + self.ping_timeout_ms = 2000 + self.ping_count = 2 + self.check_interval = 10 # 默认检测间隔10秒 + self.warning_delay = 30 # 默认的警告的网络延迟(单位: ms) + self.is_stop = False # 线程停止标志位 + self.force_check = False # 立即检测标志 + + def _ping_device(self, ip: str, timeout_ms: int = 2000) -> Union[int, None]: + """设备连接状态检测""" + try: + response = ping( + ip, + count=self.ping_count, + timeout=timeout_ms / 1000, + privileged=False, + interval=0.1 + ) + if response.is_alive: + return int(response.avg_rtt) + return None + except ICMPLibError as e: + print(f"IP[{ip}] ICMP异常: {str(e)}") + return None + except Exception as e: + print(f"IP[{ip}] 未知异常: {str(e)}") + return None + + def _read_device_config(self) -> Dict[str, str]: + """读取ini配置文件""" + device_dict = {} + config = configparser.ConfigParser() + try: + config.read(self.config_path, encoding="utf-8") + for section in config.sections(): + if "ip" in config[section]: + device_ip = config[section]["ip"].strip() + device_dict[section] = device_ip + except FileNotFoundError: + print(f"配置文件[{self.config_path}]不存在,请检查路径!") + except Exception as e: + print(f"读取配置文件失败: {str(e)}") + return device_dict + + def run(self) -> None: + """网络设备检测""" + print("✅ 设备检测线程已启动") + self.is_stop = False # 重置停止标志位 + while True: + # 线程退出 + if self.is_stop: + break + + # 批量检测所有设备 + device_dict = self._read_device_config() + check_result = {} # 所有设备的检测结果 + if not device_dict: + print("设备检测线程: 未读取到任何设备配置!") + else: + for device_name, device_ip in device_dict.items(): + if self.is_stop: + break + delay_ms = self._ping_device(device_ip, self.ping_timeout_ms) + if delay_ms is not None: + self.connect_success.emit(device_name, delay_ms) # 发送成功信号 + else: + self.connect_failed.emit(device_name) # 发送失败信号 + check_result[device_name] = delay_ms + + # 本轮检测完成 + self.check_finished.emit() + + # 设备状态统计 + self._calc_device_state(check_result) + + + # 等待指定间隔后,继续下一次检测 + # self.msleep(self.check_interval * 1000) + sleep_total_ms = self.check_interval * 1000 + sleep_slice_ms = 200 # 每次休眠200ms + sleep_count = int(sleep_total_ms / sleep_slice_ms) + for _ in range(sleep_count): + if self.is_stop: # 每次休眠后检测退出标志 + break + if self.force_check: + self.force_check = False + break + self.msleep(sleep_slice_ms) + + # ============ 设备状态统计 ============ + def _calc_device_state(self, check_result:dict): + offline_count = 0 # 离线设备数 + delay_warn_count = 0# 延迟超30ms的设备数 + # 遍历结果统计 + for delay in check_result.values(): + if delay is None: + offline_count += 1 + elif delay >= self.warning_delay: + delay_warn_count += 1 + # 按优先级判定全局状态 + if offline_count > 0: + # 设备异常 → error + 离线(异常)数量 + self.state_result.emit("error", offline_count) + elif delay_warn_count > 0: + # 全部在线但有延迟超标 → warning + 延迟超标数量 + self.state_result.emit("warning", delay_warn_count) + else: + # 全部在线且延迟都<30ms → normal + 0 + self.state_result.emit("normal", 0) + + # ============ 修改检测间隔 ============ + def set_check_interval(self, interval: int): + """ + 修改检测时间间隔 + :param interval: 间隔秒数(int) + """ + if isinstance(interval, int) and interval >= 3: + self.check_interval = interval + + # ============ 修改警告延迟 ============ + def set_warning_delay(self, delay:int): + if isinstance(delay, int) and delay >= 1: + self.warning_delay = delay + + # ============ 停止检测线程============ + def stop_thread(self): + self.is_stop = True + self.wait() + print("设备检测线程已退出") + + # ============ 立即检测============ + def force_immediate_check(self): + """马上执行新一轮设备检测""" + self.force_check = True diff --git a/service/opcua_ui_client.py b/service/opcua_ui_client.py new file mode 100644 index 0000000..b12e4d3 --- /dev/null +++ b/service/opcua_ui_client.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +from PySide6.QtCore import QObject, Signal +from opcua import Client, ua +import time +from datetime import datetime + +class OpcuaUiSignal(QObject): + # 定义值变化信号:参数为(node_id, var_name, new_value) + value_changed = Signal(str, str, object) + +# Opcua回调处理器 +class SubscriptionHandler: + def __init__(self, opc_signal:OpcuaUiSignal): + # 初始化nodeid→变量名映射表 + self.node_id_to_name = {} + self.opc_signal = opc_signal + + def datachange_notification(self, node, val, data): + """ + python-opcua标准的回调函数 + :param node: 变化的节点对象 + :param val: 节点新值 + :param data: 附加数据(包含时间戳等) + """ + try: + # 1. 解析时间戳 + # time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + # try: + # utc_time = data.monitored_item.Value.SourceTimestamp + # beijing_time = utc_time + datetime.timedelta(hours=8) + # time_str = beijing_time.strftime('%Y-%m-%d %H:%M:%S') + # except: + # pass + + # 2. 获取nodeid并解析变量名 + node_id = node.nodeid.to_string() + # 从映射表获取可读变量名 + var_name = self.node_id_to_name.get(node_id) + + # 3. 打印变化通知 + # print(f"\n 节点值发生变化!") + # print(f" 节点ID: {node_id}") + # print(f" 变量名称: {var_name}") + # print(f" 时间: {time_str}") + # print(f" 新值: {val}") + + self.opc_signal.value_changed.emit(node_id, var_name, val) + + except Exception as e: + print(f"解析值变化事件失败: {e}") + import traceback + traceback.print_exc() + +class OpcuaUiClient: + def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"): + """初始化 OPC UA 客户端""" + self.client = Client(server_url) + + self.connected = False + self.subscription = None + self.monitored_items = [] + + # 创建Qt信号对象(用于跨线程传递数据) + self.opc_signal = OpcuaUiSignal() + self.handler = SubscriptionHandler(self.opc_signal) # 回调处理器 + # 定义需要监控的变量路径(object_name + var_name) + # 格式:(变量可读名称, [object路径, 变量路径]) + self.target_var_paths = [ + ("upper_weight", ["2:upper", "2:upper_weight"]), + ("lower_weight", ["2:lower", "2:lower_weight"]) + ] + self.node_id_mapping = {} # 存储nodeid→变量名的映射表 + + def connect(self): + """连接到服务器""" + try: + self.client.connect() + self.connected = True + print(f"成功连接到 OPC UA 服务器: {self.client.server_url}") + return True + except Exception as e: + print(f"连接服务器失败: {e}") + return False + + def disconnect(self): + """断开连接(包含取消订阅)""" + if self.subscription: + try: + self.subscription.delete() + print("已取消节点订阅") + except: + pass + if self.connected: + self.client.disconnect() + self.connected = False + print("已断开与 OPC UA 服务器的连接") + + def build_node_id_mapping(self): + """ + 根据object_name+var_name路径获取nodeid,建立映射表 + """ + if not self.connected: + print("请先连接到服务器") + return False + + try: + print("\n 开始构建nodeid映射表...") + objects_node = self.client.get_objects_node() # 获取根Objects节点 + + for var_name, path_list in self.target_var_paths: + # 根据层级路径找到目标节点 + target_node = objects_node.get_child(path_list) + # 提取nodeid字符串 + node_id = target_node.nodeid.to_string() + # 存入映射表 + self.node_id_mapping[node_id] = var_name + print(f"映射成功: {node_id} → {var_name}") + + # 将映射表传给回调处理器 + self.handler.node_id_to_name = self.node_id_mapping + return True + except Exception as e: + print(f"构建映射表失败: {e}") + import traceback + traceback.print_exc() + return False + + def create_multi_subscription(self, interval=500): + """订阅多个变量(基于映射表的nodeid)""" + if not self.connected: + print("请先连接到服务器") + return + + # 先构建映射表,失败则直接返回 + if not self.node_id_mapping: + if not self.build_node_id_mapping(): + return + + try: + interval = int(interval) + # 1. 创建订阅 + self.subscription = self.client.create_subscription(interval, self.handler) + print(f"\n订阅创建成功(间隔:{interval}ms)") + + # 2. 遍历映射表,为每个nodeid创建监控项 + for node_id, var_name in self.node_id_mapping.items(): + var_node = self.client.get_node(node_id) + monitored_item = self.subscription.subscribe_data_change(var_node) + self.monitored_items.append(monitored_item) + print(f"已订阅变量: {var_name} (nodeid: {node_id})") + + except Exception as e: + print(f"创建批量订阅失败: {e}") + import traceback + traceback.print_exc() + + def write_value_by_name(self, var_readable_name, value): + """ + 根据变量可读名称写入值(主要用于修改方量, 类型为 Double类型) + :param var_readable_name: 变量可读名称(如"upper_weight") + :param value: 要写入的值 + """ + if not self.connected: + print("请先连接到服务器") + return + + # 反向查找:通过变量名找nodeid + target_node_id = None + for node_id, name in self.node_id_mapping.items(): + if name == var_readable_name: + target_node_id = node_id + break + + if not target_node_id: + print(f"未找到变量名 {var_readable_name} 对应的nodeid") + return + + try: + target_node = self.client.get_node(target_node_id) + # 明确指定值类型为Double,避免类型错误 + variant = ua.Variant(float(value), ua.VariantType.Double) + target_node.set_value(variant) + except Exception as e: + print(f"写入值失败: {e}") +