#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/11/17 18:51 # @Author : reenrr # @File : opcua_client_test1111.py ''' from PySide6.QtCore import QTimer, Slot, Signal, QObject, QThread, Qt from opcua import Client, ua from datetime import datetime # ----------- # 参数配置 # ----------- OPC_SERVER_URL = "opc.tcp://localhost:4841/zjsh_feed/server/" RECONNECT_INTERVAL = 2000 # 重连间隔(毫秒) DATA_READ_INTERVAL = 2000 # 数据读取间隔(毫秒,与服务端更新频率一致) class OpcuaClient(QObject): # 客户端的信号接口 weight_updated = Signal(dict) # 重量更新信号(传递 {"hopper_up_weight": 值, "hopper_down_weight": 值}) connection_status_changed = Signal(bool) # 连接状态更新信号(True=连接,False=断开) upper_weight_error = Signal(bool) # 上料斗重量异常信号(True=异常,False=正常) down_weight_error = Signal(bool) # 下料斗重量异常信号(True=异常,False=正常) def __init__(self, parent=None): super().__init__(parent) self.is_running = False # 系统运行状态标记 self.latest_weight_data = {"hopper_up_weight": 0, "hopper_down_weight": 0} # 缓存最新重量数据 # --------------- # OPC UA 客户端核心配置 # --------------- self.opc_client = None # OPC UA 客户端实例(延迟初始化) self.opc_server_url = OPC_SERVER_URL self.upper_weight_node = None # 上料斗重量节点对象 self.lower_weight_node = None # 下料斗重量节点对象 self.is_opc_connected = False # OPC UA 连接状态标记 self.has_connected_once = False # 至少连接成功一次标记 self.reconnect_count = 0 # 重连次数计数器 # 重连定时器(连接断开时触发) self.reconnect_timer = QTimer(self) self.reconnect_timer.setInterval(RECONNECT_INTERVAL) self.reconnect_timer.timeout.connect(self.reconnect_to_server) # 数据读取定时器(连接成功后触发,定时读取重量数据) self.data_read_timer = QTimer(self) self.data_read_timer.setInterval(DATA_READ_INTERVAL) self.data_read_timer.timeout.connect(self._read_weight_data) # 初始化 OPC UA 客户端 self._init_opc_client() # --------------------- # OPC UA 初始化相关函数 # --------------------- def _init_opc_client(self): """初始化 OPC UA 客户端实例""" try: self.opc_client = Client(self.opc_server_url) print("OPC UA 客户端初始化成功") except Exception as e: print(f"OPC UA 客户端初始化失败:{e}") self.opc_client = None def start_connect(self): """启动连接(供外部调用,主动发起连接请求)""" self._connect_to_server() # --------------------- # 连接管理相关函数 # --------------------- def reconnect_to_server(self): """重连执行函数(与原 TCP 客户端一致)""" if not self.is_opc_connected: self.reconnect_count += 1 # 日志优化:每10次重连提示一次,避免日志刷屏 if self.reconnect_count % 10 == 0: print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...") else: print(f"第{self.reconnect_count}次重连请求:{self.opc_server_url}") self._connect_to_server() def _connect_to_server(self): """主动发起 OPC UA 连接(核心连接逻辑)""" if self.is_opc_connected or not self.opc_client: return try: # 1. 连接到 OPC UA 服务器 self.opc_client.connect() print(f"OPC UA 连接成功:{self.opc_server_url}") # 2. 获取重量节点对象(仅连接成功时获取一次) objects = self.opc_client.get_objects_node() # 根节点 self.upper_weight_node = objects.get_child(["2:upper", "2:upper_weight"]) objects = self.opc_client.get_objects_node() # 根节点 self.lower_weight_node = objects.get_child(["2:lower", "2:lower_weight"]) # 3. 验证节点是否有效 self._verify_nodes() # 4. 更新连接状态 self.is_opc_connected = True self.has_connected_once = True self.reconnect_count = 0 self.is_running = True # 5. 发送信号(连接成功+错误状态重置) self.connection_status_changed.emit(True) self.upper_weight_error.emit(False) self.down_weight_error.emit(False) # 6. 启动数据读取定时器,停止重连定时器 self.data_read_timer.start() self.reconnect_timer.stop() # 7. 立即读取一次初始数据 self._read_weight_data() except Exception as e: print(f"OPC UA 连接失败:{e}") self._handle_connect_failed() def _reconnect_to_server(self): """重连执行函数""" if self.is_opc_connected: return self.reconnect_count += 1 # 日志优化:每10次重连提示一次,避免日志刷屏 if self.reconnect_count % 10 == 0: print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...") else: print(f"第{self.reconnect_count}次重连请求:{self.opc_server_url}") self._connect_to_server() def _disconnect_from_server(self): """主动断开 OPC UA 连接""" if self.opc_client and self.is_opc_connected: try: self.opc_client.disconnect() print(f"OPC UA 连接断开:{self.opc_server_url}") except Exception as e: print(f"OPC UA 断开连接失败:{e}") # 更新状态 self.is_opc_connected = False self.is_running = False self.upper_weight_node = None self.lower_weight_node = None # 停止数据读取定时器 self.data_read_timer.stop() def _handle_connect_failed(self): """处理连接失败逻辑""" self.is_opc_connected = False self.is_running = False # 发送连接断开信号 self.connection_status_changed.emit(False) # 发送重量清零信号 self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""}) # 标记重量异常 self.upper_weight_error.emit(True) self.down_weight_error.emit(True) # 启动重连定时器(首次连接失败或重连失败时) if not self.reconnect_timer.isActive(): print(f"将在{RECONNECT_INTERVAL / 1000}秒后启动重连...") self.reconnect_timer.start() def _verify_nodes(self): """验证重量节点是否有效(避免节点ID错误)""" try: # 读取节点的基本信息,验证节点存在 upper_node_id = self.upper_weight_node.nodeid lower_node_id = self.lower_weight_node.nodeid print(f"节点验证成功:上料斗节点={upper_node_id}, 下料斗节点={lower_node_id}") except Exception as e: raise RuntimeError(f"节点验证失败(节点ID可能错误):{e}") # --------------------- # 数据读取与处理相关函数 # --------------------- @Slot() def _read_weight_data(self): """定时读取重量数据(核心业务逻辑)""" if not self.is_opc_connected or not self.upper_weight_node or not self.lower_weight_node: return try: # 读取 OPC UA 节点的重量值 upper_weight = self.upper_weight_node.get_value() lower_weight = self.lower_weight_node.get_value() # 数据有效性校验(过滤异常值) upper_weight = round(float(upper_weight), 2) if self._is_valid_weight(upper_weight) else "error" lower_weight = round(float(lower_weight), 2) if self._is_valid_weight(lower_weight) else "error" # 构建重量数据字典(与原 TCP 客户端格式一致) weight_data = { "hopper_up_weight": upper_weight, "hopper_down_weight": lower_weight } self.latest_weight_data = weight_data # 错误状态判断(完全复用原 TCP 客户端逻辑) if upper_weight == "error" and lower_weight == "error": self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""}) self.upper_weight_error.emit(True) self.down_weight_error.emit(True) elif upper_weight == "error": self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": lower_weight}) self.upper_weight_error.emit(True) self.down_weight_error.emit(False) elif lower_weight == "error": self.weight_updated.emit({"hopper_up_weight": upper_weight, "hopper_down_weight": ""}) self.upper_weight_error.emit(False) self.down_weight_error.emit(True) else: self.weight_updated.emit(weight_data) self.upper_weight_error.emit(False) self.down_weight_error.emit(False) print(f"OPC UA 数据读取成功:上料斗={upper_weight} kg, 下料斗={lower_weight} kg") except Exception as e: print(f"OPC UA 数据读取失败:{e}") # 读取失败时标记为异常 self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""}) self.upper_weight_error.emit(True) self.down_weight_error.emit(True) # 读取失败可能是连接异常,触发重连 self._disconnect_from_server() if not self.reconnect_timer.isActive(): self.reconnect_timer.start() def _is_valid_weight(self, weight_value): """验证重量值是否有效""" try: weight = float(weight_value) # 结合业务场景:重量值应大于0(模拟传感器有效范围) return weight >= 0 except (ValueError, TypeError): return False # --------------------- # 外部接口函数(与原 TCP 客户端一致) # --------------------- def stop(self): """停止 OPC UA 客户端(断开连接、停止定时器)""" self.reconnect_timer.stop() self.data_read_timer.stop() self._disconnect_from_server() self.is_running = False print("OPC UA 客户端已停止") def get_current_time(self): """获取格式化的当前时间(保持原接口)""" return datetime.now().strftime('%Y-%m-%d %H:%M:%S')