#!/usr/bin/env python3 """ OPC UA 客户端测试脚本 用于连接和测试 OPC UA 服务器 """ from opcua import Client import time import sys class OPCUAClientTest: def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"): """ 初始化 OPC UA 客户端 Args: server_url: 服务器URL地址 """ self.client = Client(server_url) self.connected = False def connect(self): """连接到服务器""" try: self.client.connect() self.connected = True print(f"成功连接到 OPC UA 服务器: {self.client.server_url}") return True except Exception as e: print(f"连接服务器失败: {e}") return False def disconnect(self): """断开连接""" if self.connected: self.client.disconnect() self.connected = False print("已断开与 OPC UA 服务器的连接") def browse_nodes(self): """浏览服务器节点结构""" if not self.connected: print("请先连接到服务器") return try: # 获取根节点 root = self.client.get_root_node() print(f"根节点: {root}") # 获取对象节点 objects = self.client.get_objects_node() print(f"对象节点: {objects}") upper_device = objects.get_child("2:upper") print(f"\n上料斗对象: {upper_device}") lower_device = objects.get_child("2:lower") print(f"下料斗对象: {lower_device}") print("\n=== 当前对象属性===") self.read_object_properties(upper_device, lower_device) except Exception as e: print(f"浏览节点时出错: {e}") def read_object_properties(self, upper_device, lower_device): """读取重量数值""" try: # 读取重量 upper_weight = upper_device.get_child("2:upper_weight").get_value() lower_weight = lower_device.get_child("2:lower_weight").get_value() print(f"上料斗重量: {upper_weight}") print(f"下料斗重量: {lower_weight}") except Exception as e: print(f"读取数据时出错: {e}") def monitor_data(self, duration=30): """监控数据变化""" if not self.connected: print("请先连接到服务器") return print(f"\n开始监控数据变化,持续 {duration} 秒...") try: # 获取传感器节点 objects = self.client.get_objects_node() upper_device = objects.get_child("2:upper") lower_device = objects.get_child("2:lower") start_time = time.time() while time.time() - start_time < duration: print(f"\n--- {time.strftime('%H:%M:%S')} ---") self.read_sensor_values(upper_device, lower_device) time.sleep(5) # 每5秒读取一次 except KeyboardInterrupt: print("\n监控被用户中断") except Exception as e: print(f"监控数据时出错: {e}") def main(): """主函数""" # 创建客户端 client = OPCUAClientTest("opc.tcp://localhost:4840/zjsh_feed/server/") try: # 连接到服务器 if not client.connect(): return # 浏览节点结构 client.browse_nodes() # 监控数据变化 client.monitor_data(duration=30) # 测试写入数据 # client.write_test_data() # 继续监控 print("\n继续监控数据...") client.monitor_data(duration=15) except KeyboardInterrupt: print("\n客户端被用户中断") finally: # 断开连接 client.disconnect() if __name__ == "__main__": if len(sys.argv) > 1: # 支持自定义服务器地址 server_url = sys.argv[1] client = OPCUAClientTest(server_url) else: client = OPCUAClientTest() try: main() except Exception as e: print(f"客户端运行错误: {e}") sys.exit(1)