268 lines
11 KiB
Python
268 lines
11 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
'''
|
||
# @Time : 2025/11/17 18:51
|
||
# @Author : reenrr
|
||
# @File : opcua_client_test1111.py
|
||
'''
|
||
from PySide6.QtCore import QTimer, Slot, Signal, QObject, QThread, Qt
|
||
from opcua import Client, ua
|
||
from datetime import datetime
|
||
|
||
# -----------
|
||
# 参数配置
|
||
# -----------
|
||
OPC_SERVER_URL = "opc.tcp://localhost:4841/zjsh_feed/server/"
|
||
|
||
RECONNECT_INTERVAL = 2000 # 重连间隔(毫秒)
|
||
DATA_READ_INTERVAL = 2000 # 数据读取间隔(毫秒,与服务端更新频率一致)
|
||
|
||
class OpcuaClient(QObject):
|
||
# 客户端的信号接口
|
||
weight_updated = Signal(dict) # 重量更新信号(传递 {"hopper_up_weight": 值, "hopper_down_weight": 值})
|
||
connection_status_changed = Signal(bool) # 连接状态更新信号(True=连接,False=断开)
|
||
upper_weight_error = Signal(bool) # 上料斗重量异常信号(True=异常,False=正常)
|
||
down_weight_error = Signal(bool) # 下料斗重量异常信号(True=异常,False=正常)
|
||
|
||
def __init__(self, parent=None):
|
||
super().__init__(parent)
|
||
|
||
self.is_running = False # 系统运行状态标记
|
||
self.latest_weight_data = {"hopper_up_weight": 0, "hopper_down_weight": 0} # 缓存最新重量数据
|
||
|
||
# ---------------
|
||
# OPC UA 客户端核心配置
|
||
# ---------------
|
||
self.opc_client = None # OPC UA 客户端实例(延迟初始化)
|
||
self.opc_server_url = OPC_SERVER_URL
|
||
self.upper_weight_node = None # 上料斗重量节点对象
|
||
self.lower_weight_node = None # 下料斗重量节点对象
|
||
|
||
self.is_opc_connected = False # OPC UA 连接状态标记
|
||
self.has_connected_once = False # 至少连接成功一次标记
|
||
self.reconnect_count = 0 # 重连次数计数器
|
||
|
||
# 重连定时器(连接断开时触发)
|
||
self.reconnect_timer = QTimer(self)
|
||
self.reconnect_timer.setInterval(RECONNECT_INTERVAL)
|
||
self.reconnect_timer.timeout.connect(self.reconnect_to_server)
|
||
|
||
# 数据读取定时器(连接成功后触发,定时读取重量数据)
|
||
self.data_read_timer = QTimer(self)
|
||
self.data_read_timer.setInterval(DATA_READ_INTERVAL)
|
||
self.data_read_timer.timeout.connect(self._read_weight_data)
|
||
|
||
# 初始化 OPC UA 客户端
|
||
self._init_opc_client()
|
||
|
||
# ---------------------
|
||
# OPC UA 初始化相关函数
|
||
# ---------------------
|
||
def _init_opc_client(self):
|
||
"""初始化 OPC UA 客户端实例"""
|
||
try:
|
||
self.opc_client = Client(self.opc_server_url)
|
||
print("OPC UA 客户端初始化成功")
|
||
except Exception as e:
|
||
print(f"OPC UA 客户端初始化失败:{e}")
|
||
self.opc_client = None
|
||
|
||
def start_connect(self):
|
||
"""启动连接(供外部调用,主动发起连接请求)"""
|
||
self._connect_to_server()
|
||
|
||
# ---------------------
|
||
# 连接管理相关函数
|
||
# ---------------------
|
||
def reconnect_to_server(self):
|
||
"""重连执行函数(与原 TCP 客户端一致)"""
|
||
if not self.is_opc_connected:
|
||
self.reconnect_count += 1
|
||
# 日志优化:每10次重连提示一次,避免日志刷屏
|
||
if self.reconnect_count % 10 == 0:
|
||
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
|
||
else:
|
||
print(f"第{self.reconnect_count}次重连请求:{self.opc_server_url}")
|
||
self._connect_to_server()
|
||
|
||
def _connect_to_server(self):
|
||
"""主动发起 OPC UA 连接(核心连接逻辑)"""
|
||
if self.is_opc_connected or not self.opc_client:
|
||
return
|
||
|
||
try:
|
||
# 1. 连接到 OPC UA 服务器
|
||
self.opc_client.connect()
|
||
print(f"OPC UA 连接成功:{self.opc_server_url}")
|
||
|
||
# 2. 获取重量节点对象(仅连接成功时获取一次)
|
||
objects = self.opc_client.get_objects_node() # 根节点
|
||
self.upper_weight_node = objects.get_child(["2:upper", "2:upper_weight"])
|
||
|
||
objects = self.opc_client.get_objects_node() # 根节点
|
||
self.lower_weight_node = objects.get_child(["2:lower", "2:lower_weight"])
|
||
|
||
# 3. 验证节点是否有效
|
||
self._verify_nodes()
|
||
|
||
# 4. 更新连接状态
|
||
self.is_opc_connected = True
|
||
self.has_connected_once = True
|
||
self.reconnect_count = 0
|
||
self.is_running = True
|
||
|
||
# 5. 发送信号(连接成功+错误状态重置)
|
||
self.connection_status_changed.emit(True)
|
||
self.upper_weight_error.emit(False)
|
||
self.down_weight_error.emit(False)
|
||
|
||
# 6. 启动数据读取定时器,停止重连定时器
|
||
self.data_read_timer.start()
|
||
self.reconnect_timer.stop()
|
||
|
||
# 7. 立即读取一次初始数据
|
||
self._read_weight_data()
|
||
|
||
except Exception as e:
|
||
print(f"OPC UA 连接失败:{e}")
|
||
self._handle_connect_failed()
|
||
|
||
def _reconnect_to_server(self):
|
||
"""重连执行函数"""
|
||
if self.is_opc_connected:
|
||
return
|
||
|
||
self.reconnect_count += 1
|
||
# 日志优化:每10次重连提示一次,避免日志刷屏
|
||
if self.reconnect_count % 10 == 0:
|
||
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
|
||
else:
|
||
print(f"第{self.reconnect_count}次重连请求:{self.opc_server_url}")
|
||
|
||
self._connect_to_server()
|
||
|
||
def _disconnect_from_server(self):
|
||
"""主动断开 OPC UA 连接"""
|
||
if self.opc_client and self.is_opc_connected:
|
||
try:
|
||
self.opc_client.disconnect()
|
||
print(f"OPC UA 连接断开:{self.opc_server_url}")
|
||
except Exception as e:
|
||
print(f"OPC UA 断开连接失败:{e}")
|
||
|
||
# 更新状态
|
||
self.is_opc_connected = False
|
||
self.is_running = False
|
||
self.upper_weight_node = None
|
||
self.lower_weight_node = None
|
||
|
||
# 停止数据读取定时器
|
||
self.data_read_timer.stop()
|
||
|
||
def _handle_connect_failed(self):
|
||
"""处理连接失败逻辑"""
|
||
self.is_opc_connected = False
|
||
self.is_running = False
|
||
|
||
# 发送连接断开信号
|
||
self.connection_status_changed.emit(False)
|
||
# 发送重量清零信号
|
||
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
|
||
# 标记重量异常
|
||
self.upper_weight_error.emit(True)
|
||
self.down_weight_error.emit(True)
|
||
|
||
# 启动重连定时器(首次连接失败或重连失败时)
|
||
if not self.reconnect_timer.isActive():
|
||
print(f"将在{RECONNECT_INTERVAL / 1000}秒后启动重连...")
|
||
self.reconnect_timer.start()
|
||
|
||
def _verify_nodes(self):
|
||
"""验证重量节点是否有效(避免节点ID错误)"""
|
||
try:
|
||
# 读取节点的基本信息,验证节点存在
|
||
upper_node_id = self.upper_weight_node.nodeid
|
||
lower_node_id = self.lower_weight_node.nodeid
|
||
print(f"节点验证成功:上料斗节点={upper_node_id}, 下料斗节点={lower_node_id}")
|
||
except Exception as e:
|
||
raise RuntimeError(f"节点验证失败(节点ID可能错误):{e}")
|
||
|
||
# ---------------------
|
||
# 数据读取与处理相关函数
|
||
# ---------------------
|
||
@Slot()
|
||
def _read_weight_data(self):
|
||
"""定时读取重量数据(核心业务逻辑)"""
|
||
if not self.is_opc_connected or not self.upper_weight_node or not self.lower_weight_node:
|
||
return
|
||
|
||
try:
|
||
# 读取 OPC UA 节点的重量值
|
||
upper_weight = self.upper_weight_node.get_value()
|
||
lower_weight = self.lower_weight_node.get_value()
|
||
|
||
# 数据有效性校验(过滤异常值)
|
||
upper_weight = round(float(upper_weight), 2) if self._is_valid_weight(upper_weight) else "error"
|
||
lower_weight = round(float(lower_weight), 2) if self._is_valid_weight(lower_weight) else "error"
|
||
|
||
# 构建重量数据字典(与原 TCP 客户端格式一致)
|
||
weight_data = {
|
||
"hopper_up_weight": upper_weight,
|
||
"hopper_down_weight": lower_weight
|
||
}
|
||
self.latest_weight_data = weight_data
|
||
|
||
# 错误状态判断(完全复用原 TCP 客户端逻辑)
|
||
if upper_weight == "error" and lower_weight == "error":
|
||
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
|
||
self.upper_weight_error.emit(True)
|
||
self.down_weight_error.emit(True)
|
||
elif upper_weight == "error":
|
||
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": lower_weight})
|
||
self.upper_weight_error.emit(True)
|
||
self.down_weight_error.emit(False)
|
||
elif lower_weight == "error":
|
||
self.weight_updated.emit({"hopper_up_weight": upper_weight, "hopper_down_weight": ""})
|
||
self.upper_weight_error.emit(False)
|
||
self.down_weight_error.emit(True)
|
||
else:
|
||
self.weight_updated.emit(weight_data)
|
||
self.upper_weight_error.emit(False)
|
||
self.down_weight_error.emit(False)
|
||
|
||
print(f"OPC UA 数据读取成功:上料斗={upper_weight} kg, 下料斗={lower_weight} kg")
|
||
|
||
except Exception as e:
|
||
print(f"OPC UA 数据读取失败:{e}")
|
||
# 读取失败时标记为异常
|
||
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
|
||
self.upper_weight_error.emit(True)
|
||
self.down_weight_error.emit(True)
|
||
# 读取失败可能是连接异常,触发重连
|
||
self._disconnect_from_server()
|
||
if not self.reconnect_timer.isActive():
|
||
self.reconnect_timer.start()
|
||
|
||
def _is_valid_weight(self, weight_value):
|
||
"""验证重量值是否有效"""
|
||
try:
|
||
weight = float(weight_value)
|
||
# 结合业务场景:重量值应大于0(模拟传感器有效范围)
|
||
return weight >= 0
|
||
except (ValueError, TypeError):
|
||
return False
|
||
|
||
# ---------------------
|
||
# 外部接口函数(与原 TCP 客户端一致)
|
||
# ---------------------
|
||
def stop(self):
|
||
"""停止 OPC UA 客户端(断开连接、停止定时器)"""
|
||
self.reconnect_timer.stop()
|
||
self.data_read_timer.stop()
|
||
self._disconnect_from_server()
|
||
self.is_running = False
|
||
print("OPC UA 客户端已停止")
|
||
|
||
def get_current_time(self):
|
||
"""获取格式化的当前时间(保持原接口)"""
|
||
return datetime.now().strftime('%Y-%m-%d %H:%M:%S') |