152 lines
4.4 KiB
Python
152 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OPC UA 客户端测试脚本
|
|
用于连接和测试 OPC UA 服务器
|
|
"""
|
|
|
|
from opcua import Client
|
|
import time
|
|
import sys
|
|
|
|
class OPCUAClientTest:
|
|
def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"):
|
|
"""
|
|
初始化 OPC UA 客户端
|
|
|
|
Args:
|
|
server_url: 服务器URL地址
|
|
"""
|
|
self.client = Client(server_url)
|
|
self.connected = False
|
|
|
|
def connect(self):
|
|
"""连接到服务器"""
|
|
try:
|
|
self.client.connect()
|
|
self.connected = True
|
|
print(f"成功连接到 OPC UA 服务器: {self.client.server_url}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"连接服务器失败: {e}")
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""断开连接"""
|
|
if self.connected:
|
|
self.client.disconnect()
|
|
self.connected = False
|
|
print("已断开与 OPC UA 服务器的连接")
|
|
|
|
def browse_nodes(self):
|
|
"""浏览服务器节点结构"""
|
|
if not self.connected:
|
|
print("请先连接到服务器")
|
|
return
|
|
|
|
try:
|
|
# 获取根节点
|
|
root = self.client.get_root_node()
|
|
print(f"根节点: {root}")
|
|
|
|
# 获取对象节点
|
|
objects = self.client.get_objects_node()
|
|
print(f"对象节点: {objects}")
|
|
|
|
upper_device = objects.get_child("2:upper")
|
|
print(f"\n上料斗对象: {upper_device}")
|
|
|
|
lower_device = objects.get_child("2:lower")
|
|
print(f"下料斗对象: {lower_device}")
|
|
|
|
print("\n=== 当前对象属性===")
|
|
self.read_object_properties(upper_device, lower_device)
|
|
|
|
except Exception as e:
|
|
print(f"浏览节点时出错: {e}")
|
|
|
|
def read_object_properties(self, upper_device, lower_device):
|
|
"""读取重量数值"""
|
|
try:
|
|
# 读取重量
|
|
upper_weight = upper_device.get_child("2:upper_weight").get_value()
|
|
lower_weight = lower_device.get_child("2:lower_weight").get_value()
|
|
print(f"上料斗重量: {upper_weight}")
|
|
print(f"下料斗重量: {lower_weight}")
|
|
|
|
|
|
except Exception as e:
|
|
print(f"读取数据时出错: {e}")
|
|
|
|
def monitor_data(self, duration=30):
|
|
"""监控数据变化"""
|
|
if not self.connected:
|
|
print("请先连接到服务器")
|
|
return
|
|
|
|
print(f"\n开始监控数据变化,持续 {duration} 秒...")
|
|
|
|
try:
|
|
# 获取传感器节点
|
|
objects = self.client.get_objects_node()
|
|
upper_device = objects.get_child("2:upper")
|
|
lower_device = objects.get_child("2:lower")
|
|
|
|
|
|
|
|
|
|
start_time = time.time()
|
|
while time.time() - start_time < duration:
|
|
print(f"\n--- {time.strftime('%H:%M:%S')} ---")
|
|
self.read_sensor_values(upper_device, lower_device)
|
|
time.sleep(5) # 每5秒读取一次
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n监控被用户中断")
|
|
except Exception as e:
|
|
print(f"监控数据时出错: {e}")
|
|
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
# 创建客户端
|
|
client = OPCUAClientTest("opc.tcp://localhost:4840/zjsh_feed/server/")
|
|
|
|
try:
|
|
# 连接到服务器
|
|
if not client.connect():
|
|
return
|
|
|
|
# 浏览节点结构
|
|
client.browse_nodes()
|
|
|
|
# 监控数据变化
|
|
client.monitor_data(duration=30)
|
|
|
|
# 测试写入数据
|
|
# client.write_test_data()
|
|
|
|
# 继续监控
|
|
print("\n继续监控数据...")
|
|
client.monitor_data(duration=15)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n客户端被用户中断")
|
|
finally:
|
|
# 断开连接
|
|
client.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) > 1:
|
|
# 支持自定义服务器地址
|
|
server_url = sys.argv[1]
|
|
client = OPCUAClientTest(server_url)
|
|
else:
|
|
client = OPCUAClientTest()
|
|
|
|
try:
|
|
main()
|
|
except Exception as e:
|
|
print(f"客户端运行错误: {e}")
|
|
sys.exit(1) |