Files
zjsh_weight_system_ui/opcua_client.py

268 lines
11 KiB
Python
Raw Permalink Normal View History

2025-11-18 11:22:04 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/11/17 18:51
# @Author : reenrr
# @File : opcua_client_test1111.py
'''
from PySide6.QtCore import QTimer, Slot, Signal, QObject, QThread, Qt
from opcua import Client, ua
from datetime import datetime
# -----------
# 参数配置
# -----------
OPC_SERVER_URL = "opc.tcp://localhost:4841/zjsh_feed/server/"
2025-11-18 11:22:04 +08:00
RECONNECT_INTERVAL = 2000 # 重连间隔(毫秒)
DATA_READ_INTERVAL = 2000 # 数据读取间隔(毫秒,与服务端更新频率一致)
class OpcuaClient(QObject):
# 客户端的信号接口
weight_updated = Signal(dict) # 重量更新信号(传递 {"hopper_up_weight": 值, "hopper_down_weight": 值}
connection_status_changed = Signal(bool) # 连接状态更新信号True=连接False=断开)
upper_weight_error = Signal(bool) # 上料斗重量异常信号True=异常False=正常)
down_weight_error = Signal(bool) # 下料斗重量异常信号True=异常False=正常)
def __init__(self, parent=None):
super().__init__(parent)
self.is_running = False # 系统运行状态标记
self.latest_weight_data = {"hopper_up_weight": 0, "hopper_down_weight": 0} # 缓存最新重量数据
# ---------------
# OPC UA 客户端核心配置
# ---------------
self.opc_client = None # OPC UA 客户端实例(延迟初始化)
self.opc_server_url = OPC_SERVER_URL
self.upper_weight_node = None # 上料斗重量节点对象
self.lower_weight_node = None # 下料斗重量节点对象
self.is_opc_connected = False # OPC UA 连接状态标记
self.has_connected_once = False # 至少连接成功一次标记
self.reconnect_count = 0 # 重连次数计数器
# 重连定时器(连接断开时触发)
self.reconnect_timer = QTimer(self)
self.reconnect_timer.setInterval(RECONNECT_INTERVAL)
self.reconnect_timer.timeout.connect(self.reconnect_to_server)
# 数据读取定时器(连接成功后触发,定时读取重量数据)
self.data_read_timer = QTimer(self)
self.data_read_timer.setInterval(DATA_READ_INTERVAL)
self.data_read_timer.timeout.connect(self._read_weight_data)
# 初始化 OPC UA 客户端
self._init_opc_client()
# ---------------------
# OPC UA 初始化相关函数
# ---------------------
def _init_opc_client(self):
"""初始化 OPC UA 客户端实例"""
try:
self.opc_client = Client(self.opc_server_url)
print("OPC UA 客户端初始化成功")
except Exception as e:
print(f"OPC UA 客户端初始化失败:{e}")
self.opc_client = None
def start_connect(self):
"""启动连接(供外部调用,主动发起连接请求)"""
self._connect_to_server()
# ---------------------
# 连接管理相关函数
# ---------------------
def reconnect_to_server(self):
"""重连执行函数(与原 TCP 客户端一致)"""
if not self.is_opc_connected:
self.reconnect_count += 1
# 日志优化每10次重连提示一次避免日志刷屏
if self.reconnect_count % 10 == 0:
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
else:
print(f"{self.reconnect_count}次重连请求:{self.opc_server_url}")
self._connect_to_server()
def _connect_to_server(self):
"""主动发起 OPC UA 连接(核心连接逻辑)"""
if self.is_opc_connected or not self.opc_client:
return
try:
# 1. 连接到 OPC UA 服务器
self.opc_client.connect()
print(f"OPC UA 连接成功:{self.opc_server_url}")
# 2. 获取重量节点对象(仅连接成功时获取一次)
objects = self.opc_client.get_objects_node() # 根节点
self.upper_weight_node = objects.get_child(["2:upper", "2:upper_weight"])
objects = self.opc_client.get_objects_node() # 根节点
self.lower_weight_node = objects.get_child(["2:lower", "2:lower_weight"])
2025-11-18 11:22:04 +08:00
# 3. 验证节点是否有效
self._verify_nodes()
# 4. 更新连接状态
self.is_opc_connected = True
self.has_connected_once = True
self.reconnect_count = 0
self.is_running = True
# 5. 发送信号(连接成功+错误状态重置)
self.connection_status_changed.emit(True)
self.upper_weight_error.emit(False)
self.down_weight_error.emit(False)
# 6. 启动数据读取定时器,停止重连定时器
self.data_read_timer.start()
self.reconnect_timer.stop()
# 7. 立即读取一次初始数据
self._read_weight_data()
except Exception as e:
print(f"OPC UA 连接失败:{e}")
self._handle_connect_failed()
def _reconnect_to_server(self):
"""重连执行函数"""
if self.is_opc_connected:
return
self.reconnect_count += 1
# 日志优化每10次重连提示一次避免日志刷屏
if self.reconnect_count % 10 == 0:
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
else:
print(f"{self.reconnect_count}次重连请求:{self.opc_server_url}")
self._connect_to_server()
def _disconnect_from_server(self):
"""主动断开 OPC UA 连接"""
if self.opc_client and self.is_opc_connected:
try:
self.opc_client.disconnect()
print(f"OPC UA 连接断开:{self.opc_server_url}")
except Exception as e:
print(f"OPC UA 断开连接失败:{e}")
# 更新状态
self.is_opc_connected = False
self.is_running = False
self.upper_weight_node = None
self.lower_weight_node = None
# 停止数据读取定时器
self.data_read_timer.stop()
def _handle_connect_failed(self):
"""处理连接失败逻辑"""
self.is_opc_connected = False
self.is_running = False
# 发送连接断开信号
self.connection_status_changed.emit(False)
# 发送重量清零信号
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
# 标记重量异常
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
# 启动重连定时器(首次连接失败或重连失败时)
if not self.reconnect_timer.isActive():
print(f"将在{RECONNECT_INTERVAL / 1000}秒后启动重连...")
self.reconnect_timer.start()
def _verify_nodes(self):
"""验证重量节点是否有效避免节点ID错误"""
try:
# 读取节点的基本信息,验证节点存在
upper_node_id = self.upper_weight_node.nodeid
lower_node_id = self.lower_weight_node.nodeid
print(f"节点验证成功:上料斗节点={upper_node_id}, 下料斗节点={lower_node_id}")
except Exception as e:
raise RuntimeError(f"节点验证失败节点ID可能错误{e}")
# ---------------------
# 数据读取与处理相关函数
# ---------------------
@Slot()
def _read_weight_data(self):
"""定时读取重量数据(核心业务逻辑)"""
if not self.is_opc_connected or not self.upper_weight_node or not self.lower_weight_node:
return
try:
# 读取 OPC UA 节点的重量值
upper_weight = self.upper_weight_node.get_value()
lower_weight = self.lower_weight_node.get_value()
# 数据有效性校验(过滤异常值)
upper_weight = round(float(upper_weight), 2) if self._is_valid_weight(upper_weight) else "error"
lower_weight = round(float(lower_weight), 2) if self._is_valid_weight(lower_weight) else "error"
# 构建重量数据字典(与原 TCP 客户端格式一致)
weight_data = {
"hopper_up_weight": upper_weight,
"hopper_down_weight": lower_weight
}
self.latest_weight_data = weight_data
# 错误状态判断(完全复用原 TCP 客户端逻辑)
if upper_weight == "error" and lower_weight == "error":
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
elif upper_weight == "error":
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": lower_weight})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(False)
elif lower_weight == "error":
self.weight_updated.emit({"hopper_up_weight": upper_weight, "hopper_down_weight": ""})
self.upper_weight_error.emit(False)
self.down_weight_error.emit(True)
else:
self.weight_updated.emit(weight_data)
self.upper_weight_error.emit(False)
self.down_weight_error.emit(False)
print(f"OPC UA 数据读取成功:上料斗={upper_weight} kg, 下料斗={lower_weight} kg")
except Exception as e:
print(f"OPC UA 数据读取失败:{e}")
# 读取失败时标记为异常
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
# 读取失败可能是连接异常,触发重连
self._disconnect_from_server()
if not self.reconnect_timer.isActive():
self.reconnect_timer.start()
def _is_valid_weight(self, weight_value):
"""验证重量值是否有效"""
try:
weight = float(weight_value)
# 结合业务场景重量值应大于0模拟传感器有效范围
return weight >= 0
except (ValueError, TypeError):
return False
# ---------------------
# 外部接口函数(与原 TCP 客户端一致)
# ---------------------
def stop(self):
"""停止 OPC UA 客户端(断开连接、停止定时器)"""
self.reconnect_timer.stop()
self.data_read_timer.stop()
self._disconnect_from_server()
self.is_running = False
print("OPC UA 客户端已停止")
def get_current_time(self):
"""获取格式化的当前时间(保持原接口)"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')