Files
Feeding_control_system/opc/opcua_client_test.py

158 lines
4.6 KiB
Python
Raw Normal View History

2025-11-17 00:05:40 +08:00
#!/usr/bin/env python3
"""
OPC UA 客户端测试脚本
用于连接和测试 OPC UA 服务器
"""
from opcua import Client
import time
import sys
class OPCUAClientTest:
def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"):
"""
初始化 OPC UA 客户端
Args:
server_url: 服务器URL地址
"""
self.client = Client(server_url)
self.connected = False
def connect(self):
"""连接到服务器"""
try:
self.client.connect()
self.connected = True
print(f"成功连接到 OPC UA 服务器: {self.client.server_url}")
return True
except Exception as e:
print(f"连接服务器失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.connected:
self.client.disconnect()
self.connected = False
print("已断开与 OPC UA 服务器的连接")
def browse_nodes(self):
"""浏览服务器节点结构"""
if not self.connected:
print("请先连接到服务器")
return
try:
# 获取根节点
root = self.client.get_root_node()
print(f"根节点: {root}")
# 获取对象节点
objects = self.client.get_objects_node()
print(f"对象节点: {objects}")
# 浏览 IndustrialDevice 节点
upper_device = objects.get_child("2:upper")
print(f"\n工业设备节点: {upper_device}")
# 获取传感器节点
lower_device = objects.get_child("2:lower")
print(f"传感器节点: {lower_device}")
print(f"温度传感器: {upper_device}")
print(f"压力传感器: {lower_device}")
# 获取变量值
print("\n=== 当前传感器数据 ===")
self.read_sensor_values(upper_device, lower_device)
except Exception as e:
print(f"浏览节点时出错: {e}")
def read_sensor_values(self, upper_device, lower_device):
"""读取传感器数值"""
try:
# 读取温度
temp_value = upper_device.get_child("2:upper_weight").get_value()
temp_unit = upper_device.get_child("2:lower_weight").get_value()
print(f"温度: {temp_value} {temp_unit}")
except Exception as e:
print(f"读取传感器数据时出错: {e}")
def monitor_data(self, duration=30):
"""监控数据变化"""
if not self.connected:
print("请先连接到服务器")
return
print(f"\n开始监控数据变化,持续 {duration} 秒...")
try:
# 获取传感器节点
objects = self.client.get_objects_node()
upper_device = objects.get_child("2:upper")
lower_device = objects.get_child("2:lower")
start_time = time.time()
while time.time() - start_time < duration:
print(f"\n--- {time.strftime('%H:%M:%S')} ---")
self.read_sensor_values(upper_device, lower_device)
time.sleep(5) # 每5秒读取一次
except KeyboardInterrupt:
print("\n监控被用户中断")
except Exception as e:
print(f"监控数据时出错: {e}")
def main():
"""主函数"""
# 创建客户端
client = OPCUAClientTest("opc.tcp://localhost:4840/zjsh_feed/server/")
try:
# 连接到服务器
if not client.connect():
return
# 浏览节点结构
client.browse_nodes()
# 监控数据变化
client.monitor_data(duration=30)
# 测试写入数据
# client.write_test_data()
# 继续监控
print("\n继续监控数据...")
client.monitor_data(duration=15)
except KeyboardInterrupt:
print("\n客户端被用户中断")
finally:
# 断开连接
client.disconnect()
if __name__ == "__main__":
if len(sys.argv) > 1:
# 支持自定义服务器地址
server_url = sys.argv[1]
client = OPCUAClientTest(server_url)
else:
client = OPCUAClientTest()
try:
main()
except Exception as e:
print(f"客户端运行错误: {e}")
sys.exit(1)