#!/usr/bin/env python3 """ OPC UA 客户端测试脚本 用于连接和测试 OPC UA 服务器 """ from opcua import Client import time import sys class OPCUAClientTest: def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"): """ 初始化 OPC UA 客户端 Args: server_url: 服务器URL地址 """ self.client = Client(server_url) self.connected = False def connect(self): """连接到服务器""" try: self.client.connect() self.connected = True print(f"成功连接到 OPC UA 服务器: {self.client.server_url}") return True except Exception as e: print(f"连接服务器失败: {e}") return False def disconnect(self): """断开连接""" if self.connected: self.client.disconnect() self.connected = False print("已断开与 OPC UA 服务器的连接") def browse_nodes(self): """浏览服务器节点结构""" if not self.connected: print("请先连接到服务器") return try: # 获取根节点 root = self.client.get_root_node() print(f"根节点: {root}") # 获取对象节点 objects = self.client.get_objects_node() print(f"对象节点: {objects}") # 浏览Objects下的所有子节点 print("\n=== Objects 节点下的子节点 ===") for child in objects.get_children(): browse_name = child.get_browse_name() print(f" 节点: {browse_name} (nodeId: {child.nodeid})") # 如果是上料斗或下料斗对象,继续浏览它们的子节点 if "upper" in str(browse_name).lower() or "lower" in str(browse_name).lower(): try: for sub_child in child.get_children(): sub_browse_name = sub_child.get_browse_name() print(f" └─ {sub_browse_name} (nodeId: {sub_child.nodeid})") except: pass # 尝试获取设备对象 print("\n=== 尝试获取设备对象 ===") try: upper_device = objects.get_child("2:upper") print(f"上料斗对象: {upper_device}") except Exception as e: print(f"获取上料斗对象失败: {e}") # 尝试其他可能的路径 try: upper_device = objects.get_child(["2:upper"]) print(f"上料斗对象(列表方式): {upper_device}") except Exception as e2: print(f" 也无法通过列表方式获取: {e2}") try: lower_device = objects.get_child("2:lower") print(f"下料斗对象: {lower_device}") except Exception as e: print(f"获取下料斗对象失败: {e}") except Exception as e: print(f"浏览节点时出错: {e}") def get_node_path(self, obj_path: str) -> str: """ 获取节点路径 - 尝试多种格式 Args: obj_path: 对象名称(如 "upper", "upper_weight") Returns: str: 节点路径,如果找不到返回 None """ if not self.connected: return None try: objects = self.client.get_objects_node() # 尝试多种节点路径格式 path_formats = [ f"2:{obj_path}", f"2:upper/2:{obj_path}", f"2:lower/2:{obj_path}", f"ns=2;{obj_path}", obj_path ] for path in path_formats: try: node = objects.get_child(path) print(f" 找到节点: {path} -> {node}") return path except: continue return None except Exception as e: print(f"查找节点路径时出错: {e}") return None def write_data(self, node_path: str, value, data_type: str = "auto") -> bool: """ 向OPC UA节点写入数据 Args: node_path: 节点路径(如 "2:upper/2:upper_weight") value: 要写入的值 data_type: 数据类型("int", "float", "bool", "string", "auto") Returns: bool: 写入成功返回True,失败返回False """ if not self.connected: print("请先连接到服务器") return False try: # 获取对象节点 objects = self.client.get_objects_node() # 尝试多种方式获取节点 node = None error_msg = None # 方式1: 直接使用路径 try: node = objects.get_child(node_path) except Exception as e: error_msg = e # 方式2: 分解路径 if node is None and "/" in node_path: try: parts = node_path.split("/") node = objects for part in parts: node = node.get_child(part) except: pass # 方式3: 尝试用数字索引 if node is None: try: node = objects.get_child([node_path]) except: pass if node is None: print(f"写入数据失败 {node_path}: 找不到节点") print(f" 详细错误: {error_msg}") print(f" 提示: 请先运行 browse_nodes() 方法查看可用的节点路径") return False # 根据数据类型转换值 if data_type == "int": value = int(value) elif data_type == "float": value = float(value) elif data_type == "bool": value = bool(value) elif data_type == "string": value = str(value) # "auto" 模式下自动推断类型 # 写入数据 node.set_value(value) # 获取节点名称用于显示 try: node_name = node.get_browse_name() except: node_name = node_path print(f"✓ 成功写入 {node_name}: {value}") return True except Exception as e: print(f"✗ 写入数据失败 {node_path}: {e}") return False def write_weights_directly(self, upper_value, lower_value) -> bool: """ 直接写入上下料斗重量(自动检测节点路径) Args: upper_value: 上料斗重量值 lower_value: 下料斗重量值 Returns: bool: 写入成功返回True,失败返回False """ if not self.connected: print("请先连接到服务器") return False success = True try: objects = self.client.get_objects_node() # 查找上料斗重量节点 upper_weight_node = None lower_weight_node = None # 遍历Objects下的所有节点 for child in objects.get_children(): browse_name = str(child.get_browse_name()) print(browse_name) if "upper" in browse_name.lower(): upper_weight_node = child print(f"找到上料斗重量节点: {browse_name}") break for child in objects.get_children(): browse_name = str(child.get_browse_name()) if "lower" in browse_name.lower(): lower_weight_node = child print(f"找到下料斗重量节点: {browse_name}") break # 写入上料斗重量 if upper_weight_node: try: upper_weight_node.set_value(upper_value) print(f"✓ 成功写入上料斗重量: {upper_value}") except Exception as e: print(f"✗ 写入上料斗重量失败: {e}") success = False else: print("✗ 未找到上料斗重量节点") success = False # 写入下料斗重量 if lower_weight_node: try: lower_weight_node.set_value(lower_value) print(f"✓ 成功写入下料斗重量: {lower_value}") except Exception as e: print(f"✗ 写入下料斗重量失败: {e}") success = False else: print("✗ 未找到下料斗重量节点") success = False return success except Exception as e: print(f"写入重量数据时出错: {e}") return False def read_object_properties(self, upper_device, lower_device): """读取重量数值(需要外部传入device对象)""" try: # 读取重量 upper_weight = upper_device.get_child("2:upper_weight").get_value() lower_weight = lower_device.get_child("2:lower_weight").get_value() print(f"上料斗重量: {upper_weight}") print(f"下料斗重量: {lower_weight}") except Exception as e: print(f"读取数据时出错: {e}") def read_weights(self) -> tuple: """ 直接读取上料斗和下料斗重量(无需先获取device对象) Returns: tuple: (上料斗重量, 下料斗重量),读取失败返回 (None, None) """ if not self.connected: print("请先连接到服务器") return None, None try: # 直接获取节点并读取数据 objects = self.client.get_objects_node() # 使用列表格式访问节点(freeopcua推荐的方式) upper_weight = objects.get_child(["2:upper", "2:upper_weight"]).get_value() lower_weight = objects.get_child(["2:lower", "2:lower_weight"]).get_value() print(f"上料斗重量: {upper_weight}") print(f"下料斗重量: {lower_weight}") return upper_weight, lower_weight except Exception as e: print(f"读取重量数据时出错: {e}") return None, None def write_multiple_values(self, values_dict: dict) -> dict: """ 批量写入多个节点 Args: values_dict: 字典,key为节点路径,value为要写入的值 Returns: dict: 写入结果,key为节点路径,value为成功/失败状态 """ results = {} for node_path, value in values_dict.items(): results[node_path] = self.write_data(node_path, value) return results def write_test_data(self): """测试写入各种类型的数据""" if not self.connected: print("请先连接到服务器") return print("\n=== 测试写入数据 ===") # 测试写入目标重量 self.write_data("2:upper/2:target_weight", 150.5, "float") # 测试写入开关量 self.write_data("2:upper/2:valve_on", True, "bool") # 测试写入整数 self.write_data("2:upper/2:cycle_count", 10, "int") # 测试批量写入 values = { "2:upper/2:target_weight": 200.0, "2:lower/2:target_weight": 100.0, } results = self.write_multiple_values(values) print("\n批量写入结果:") for path, success in results.items(): status = "✓ 成功" if success else "✗ 失败" print(f" {path}: {status}") def monitor_data(self, duration=30): """监控数据变化""" if not self.connected: print("请先连接到服务器") return print(f"\n开始监控数据变化,持续 {duration} 秒...") try: # 获取传感器节点 objects = self.client.get_objects_node() upper_device = objects.get_child("2:upper") lower_device = objects.get_child("2:lower") start_time = time.time() while time.time() - start_time < duration: print(f"\n--- {time.strftime('%H:%M:%S')} ---") self.read_object_properties(upper_device, lower_device) time.sleep(5) # 每5秒读取一次 except KeyboardInterrupt: print("\n监控被用户中断") except Exception as e: print(f"监控数据时出错: {e}") def main(): """主函数""" # 创建客户端 client = OPCUAClientTest("opc.tcp://localhost:4840/zjsh_feed/server/") try: # 连接到服务器 if not client.connect(): return # 浏览节点结构(首先发现实际节点结构) print("\n" + "="*60) print("步骤1: 浏览服务器节点结构") print("="*60) client.browse_nodes() # 尝试使用新方法写入数据 print("\n" + "="*60) print("步骤2: 使用动态节点查找方法写入数据") print("="*60) # 方法1: 使用write_weights_directly自动查找节点 print("\n尝试方法1: write_weights_directly (自动查找节点)") # client.write_data("2:upper/2:upper_weight", 180, "int") # client.write_data("2:lower/2:lower_weight", 120, "int") values = { "2:upper/2:upper_weight": 200, "2:lower/2:lower_weight": 100, } client.write_multiple_values(values) # success1 = client.write_weights_directly(150, 120) time.sleep(2) # if not success1: # # 方法2: 尝试可能的替代路径 # print("\n尝试方法2: 尝试其他节点路径格式") # # 列出可能的节点路径格式 # possible_paths = [ # "2:upper_weight", # "2:lower_weight", # "2:upper/upper_weight", # "2:lower/lower_weight", # "ns=2;upper_weight", # "ns=2;lower_weight" # ] # for path in possible_paths: # print(f" 尝试写入: {path}") # client.write_data(path, 150.5, "float") # time.sleep(0.5) print("\n" + "="*60) print("步骤3: 读取验证数据") print("="*60) upper, lower = client.read_weights() print(f"读取结果 - 上料斗: {upper}, 下料斗: {lower}") except KeyboardInterrupt: print("\n客户端被用户中断") except Exception as e: print(f"客户端运行错误: {e}") import traceback traceback.print_exc() finally: # 断开连接 client.disconnect() if __name__ == "__main__": if len(sys.argv) > 1: # 支持自定义服务器地址 server_url = sys.argv[1] client = OPCUAClientTest(server_url) else: client = OPCUAClientTest() try: main() except Exception as e: print(f"客户端运行错误: {e}") sys.exit(1)