Files
zjsh_weight_system_ui/opcua_client.py

267 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/11/17 18:51
# @Author : reenrr
# @File : opcua_client_test1111.py
'''
from PySide6.QtCore import QTimer, Slot, Signal, QObject, QThread, Qt
from opcua import Client, ua
from datetime import datetime
# -----------
# 参数配置
# -----------
OPC_SERVER_URL = "opc.tcp://localhost:4840/zjsh_feed/server/"
UPPER_WEIGHT_NODE_ID = "ns=2;s=upper_weight"
LOWER_WEIGHT_NODE_ID = "ns=2;s=lower_weight"
RECONNECT_INTERVAL = 2000 # 重连间隔(毫秒)
DATA_READ_INTERVAL = 2000 # 数据读取间隔(毫秒,与服务端更新频率一致)
class OpcuaClient(QObject):
# 客户端的信号接口
weight_updated = Signal(dict) # 重量更新信号(传递 {"hopper_up_weight": 值, "hopper_down_weight": 值}
connection_status_changed = Signal(bool) # 连接状态更新信号True=连接False=断开)
upper_weight_error = Signal(bool) # 上料斗重量异常信号True=异常False=正常)
down_weight_error = Signal(bool) # 下料斗重量异常信号True=异常False=正常)
def __init__(self, parent=None):
super().__init__(parent)
self.is_running = False # 系统运行状态标记
self.latest_weight_data = {"hopper_up_weight": 0, "hopper_down_weight": 0} # 缓存最新重量数据
# ---------------
# OPC UA 客户端核心配置
# ---------------
self.opc_client = None # OPC UA 客户端实例(延迟初始化)
self.opc_server_url = OPC_SERVER_URL
self.upper_weight_node = None # 上料斗重量节点对象
self.lower_weight_node = None # 下料斗重量节点对象
self.is_opc_connected = False # OPC UA 连接状态标记
self.has_connected_once = False # 至少连接成功一次标记
self.reconnect_count = 0 # 重连次数计数器
# 重连定时器(连接断开时触发)
self.reconnect_timer = QTimer(self)
self.reconnect_timer.setInterval(RECONNECT_INTERVAL)
self.reconnect_timer.timeout.connect(self.reconnect_to_server)
# 数据读取定时器(连接成功后触发,定时读取重量数据)
self.data_read_timer = QTimer(self)
self.data_read_timer.setInterval(DATA_READ_INTERVAL)
self.data_read_timer.timeout.connect(self._read_weight_data)
# 初始化 OPC UA 客户端
self._init_opc_client()
# ---------------------
# OPC UA 初始化相关函数
# ---------------------
def _init_opc_client(self):
"""初始化 OPC UA 客户端实例"""
try:
self.opc_client = Client(self.opc_server_url)
print("OPC UA 客户端初始化成功")
except Exception as e:
print(f"OPC UA 客户端初始化失败:{e}")
self.opc_client = None
def start_connect(self):
"""启动连接(供外部调用,主动发起连接请求)"""
self._connect_to_server()
# ---------------------
# 连接管理相关函数
# ---------------------
def reconnect_to_server(self):
"""重连执行函数(与原 TCP 客户端一致)"""
if not self.is_opc_connected:
self.reconnect_count += 1
# 日志优化每10次重连提示一次避免日志刷屏
if self.reconnect_count % 10 == 0:
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
else:
print(f"{self.reconnect_count}次重连请求:{self.opc_server_url}")
self._connect_to_server()
def _connect_to_server(self):
"""主动发起 OPC UA 连接(核心连接逻辑)"""
if self.is_opc_connected or not self.opc_client:
return
try:
# 1. 连接到 OPC UA 服务器
self.opc_client.connect()
print(f"OPC UA 连接成功:{self.opc_server_url}")
# 2. 获取重量节点对象(仅连接成功时获取一次)
self.upper_weight_node = self.opc_client.get_node(UPPER_WEIGHT_NODE_ID)
self.lower_weight_node = self.opc_client.get_node(LOWER_WEIGHT_NODE_ID)
# 3. 验证节点是否有效
self._verify_nodes()
# 4. 更新连接状态
self.is_opc_connected = True
self.has_connected_once = True
self.reconnect_count = 0
self.is_running = True
# 5. 发送信号(连接成功+错误状态重置)
self.connection_status_changed.emit(True)
self.upper_weight_error.emit(False)
self.down_weight_error.emit(False)
# 6. 启动数据读取定时器,停止重连定时器
self.data_read_timer.start()
self.reconnect_timer.stop()
# 7. 立即读取一次初始数据
self._read_weight_data()
except Exception as e:
print(f"OPC UA 连接失败:{e}")
self._handle_connect_failed()
def _reconnect_to_server(self):
"""重连执行函数"""
if self.is_opc_connected:
return
self.reconnect_count += 1
# 日志优化每10次重连提示一次避免日志刷屏
if self.reconnect_count % 10 == 0:
print(f"已连续重连{self.reconnect_count}次,仍未连接服务端,请检查服务端状态...")
else:
print(f"{self.reconnect_count}次重连请求:{self.opc_server_url}")
self._connect_to_server()
def _disconnect_from_server(self):
"""主动断开 OPC UA 连接"""
if self.opc_client and self.is_opc_connected:
try:
self.opc_client.disconnect()
print(f"OPC UA 连接断开:{self.opc_server_url}")
except Exception as e:
print(f"OPC UA 断开连接失败:{e}")
# 更新状态
self.is_opc_connected = False
self.is_running = False
self.upper_weight_node = None
self.lower_weight_node = None
# 停止数据读取定时器
self.data_read_timer.stop()
def _handle_connect_failed(self):
"""处理连接失败逻辑"""
self.is_opc_connected = False
self.is_running = False
# 发送连接断开信号
self.connection_status_changed.emit(False)
# 发送重量清零信号
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
# 标记重量异常
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
# 启动重连定时器(首次连接失败或重连失败时)
if not self.reconnect_timer.isActive():
print(f"将在{RECONNECT_INTERVAL / 1000}秒后启动重连...")
self.reconnect_timer.start()
def _verify_nodes(self):
"""验证重量节点是否有效避免节点ID错误"""
try:
# 读取节点的基本信息,验证节点存在
upper_node_id = self.upper_weight_node.nodeid
lower_node_id = self.lower_weight_node.nodeid
print(f"节点验证成功:上料斗节点={upper_node_id}, 下料斗节点={lower_node_id}")
except Exception as e:
raise RuntimeError(f"节点验证失败节点ID可能错误{e}")
# ---------------------
# 数据读取与处理相关函数
# ---------------------
@Slot()
def _read_weight_data(self):
"""定时读取重量数据(核心业务逻辑)"""
if not self.is_opc_connected or not self.upper_weight_node or not self.lower_weight_node:
return
try:
# 读取 OPC UA 节点的重量值
upper_weight = self.upper_weight_node.get_value()
lower_weight = self.lower_weight_node.get_value()
# 数据有效性校验(过滤异常值)
upper_weight = round(float(upper_weight), 2) if self._is_valid_weight(upper_weight) else "error"
lower_weight = round(float(lower_weight), 2) if self._is_valid_weight(lower_weight) else "error"
# 构建重量数据字典(与原 TCP 客户端格式一致)
weight_data = {
"hopper_up_weight": upper_weight,
"hopper_down_weight": lower_weight
}
self.latest_weight_data = weight_data
# 错误状态判断(完全复用原 TCP 客户端逻辑)
if upper_weight == "error" and lower_weight == "error":
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
elif upper_weight == "error":
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": lower_weight})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(False)
elif lower_weight == "error":
self.weight_updated.emit({"hopper_up_weight": upper_weight, "hopper_down_weight": ""})
self.upper_weight_error.emit(False)
self.down_weight_error.emit(True)
else:
self.weight_updated.emit(weight_data)
self.upper_weight_error.emit(False)
self.down_weight_error.emit(False)
print(f"OPC UA 数据读取成功:上料斗={upper_weight} kg, 下料斗={lower_weight} kg")
except Exception as e:
print(f"OPC UA 数据读取失败:{e}")
# 读取失败时标记为异常
self.weight_updated.emit({"hopper_up_weight": "", "hopper_down_weight": ""})
self.upper_weight_error.emit(True)
self.down_weight_error.emit(True)
# 读取失败可能是连接异常,触发重连
self._disconnect_from_server()
if not self.reconnect_timer.isActive():
self.reconnect_timer.start()
def _is_valid_weight(self, weight_value):
"""验证重量值是否有效"""
try:
weight = float(weight_value)
# 结合业务场景重量值应大于0模拟传感器有效范围
return weight >= 0
except (ValueError, TypeError):
return False
# ---------------------
# 外部接口函数(与原 TCP 客户端一致)
# ---------------------
def stop(self):
"""停止 OPC UA 客户端(断开连接、停止定时器)"""
self.reconnect_timer.stop()
self.data_read_timer.stop()
self._disconnect_from_server()
self.is_running = False
print("OPC UA 客户端已停止")
def get_current_time(self):
"""获取格式化的当前时间(保持原接口)"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')