Files
Feeding_control_system/opc/opcua_client_test.py

479 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
OPC UA 客户端测试脚本
用于连接和测试 OPC UA 服务器
"""
from opcua import Client
import time
import sys
class OPCUAClientTest:
def __init__(self, server_url="opc.tcp://localhost:4840/zjsh_feed/server/"):
"""
初始化 OPC UA 客户端
Args:
server_url: 服务器URL地址
"""
self.client = Client(server_url)
self.connected = False
def connect(self):
"""连接到服务器"""
try:
self.client.connect()
self.connected = True
print(f"成功连接到 OPC UA 服务器: {self.client.server_url}")
return True
except Exception as e:
print(f"连接服务器失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.connected:
self.client.disconnect()
self.connected = False
print("已断开与 OPC UA 服务器的连接")
def browse_nodes(self):
"""浏览服务器节点结构"""
if not self.connected:
print("请先连接到服务器")
return
try:
# 获取根节点
root = self.client.get_root_node()
print(f"根节点: {root}")
# 获取对象节点
objects = self.client.get_objects_node()
print(f"对象节点: {objects}")
# 浏览Objects下的所有子节点
print("\n=== Objects 节点下的子节点 ===")
for child in objects.get_children():
browse_name = child.get_browse_name()
print(f" 节点: {browse_name} (nodeId: {child.nodeid})")
# 如果是上料斗或下料斗对象,继续浏览它们的子节点
if "upper" in str(browse_name).lower() or "lower" in str(browse_name).lower():
try:
for sub_child in child.get_children():
sub_browse_name = sub_child.get_browse_name()
print(f" └─ {sub_browse_name} (nodeId: {sub_child.nodeid})")
except:
pass
# 尝试获取设备对象
print("\n=== 尝试获取设备对象 ===")
try:
upper_device = objects.get_child("2:upper")
print(f"上料斗对象: {upper_device}")
except Exception as e:
print(f"获取上料斗对象失败: {e}")
# 尝试其他可能的路径
try:
upper_device = objects.get_child(["2:upper"])
print(f"上料斗对象(列表方式): {upper_device}")
except Exception as e2:
print(f" 也无法通过列表方式获取: {e2}")
try:
lower_device = objects.get_child("2:lower")
print(f"下料斗对象: {lower_device}")
except Exception as e:
print(f"获取下料斗对象失败: {e}")
except Exception as e:
print(f"浏览节点时出错: {e}")
def get_node_path(self, obj_path: str) -> str:
"""
获取节点路径 - 尝试多种格式
Args:
obj_path: 对象名称(如 "upper", "upper_weight"
Returns:
str: 节点路径,如果找不到返回 None
"""
if not self.connected:
return None
try:
objects = self.client.get_objects_node()
# 尝试多种节点路径格式
path_formats = [
f"2:{obj_path}",
f"2:upper/2:{obj_path}",
f"2:lower/2:{obj_path}",
f"ns=2;{obj_path}",
obj_path
]
for path in path_formats:
try:
node = objects.get_child(path)
print(f" 找到节点: {path} -> {node}")
return path
except:
continue
return None
except Exception as e:
print(f"查找节点路径时出错: {e}")
return None
def write_data(self, node_path: str, value, data_type: str = "auto") -> bool:
"""
向OPC UA节点写入数据
Args:
node_path: 节点路径(如 "2:upper/2:upper_weight"
value: 要写入的值
data_type: 数据类型("int", "float", "bool", "string", "auto"
Returns:
bool: 写入成功返回True失败返回False
"""
if not self.connected:
print("请先连接到服务器")
return False
try:
# 获取对象节点
objects = self.client.get_objects_node()
# 尝试多种方式获取节点
node = None
error_msg = None
# 方式1: 直接使用路径
try:
node = objects.get_child(node_path)
except Exception as e:
error_msg = e
# 方式2: 分解路径
if node is None and "/" in node_path:
try:
parts = node_path.split("/")
node = objects
for part in parts:
node = node.get_child(part)
except:
pass
# 方式3: 尝试用数字索引
if node is None:
try:
node = objects.get_child([node_path])
except:
pass
if node is None:
print(f"写入数据失败 {node_path}: 找不到节点")
print(f" 详细错误: {error_msg}")
print(f" 提示: 请先运行 browse_nodes() 方法查看可用的节点路径")
return False
# 根据数据类型转换值
if data_type == "int":
value = int(value)
elif data_type == "float":
value = float(value)
elif data_type == "bool":
value = bool(value)
elif data_type == "string":
value = str(value)
# "auto" 模式下自动推断类型
# 写入数据
node.set_value(value)
# 获取节点名称用于显示
try:
node_name = node.get_browse_name()
except:
node_name = node_path
print(f"✓ 成功写入 {node_name}: {value}")
return True
except Exception as e:
print(f"✗ 写入数据失败 {node_path}: {e}")
return False
def write_weights_directly(self, upper_value, lower_value) -> bool:
"""
直接写入上下料斗重量(自动检测节点路径)
Args:
upper_value: 上料斗重量值
lower_value: 下料斗重量值
Returns:
bool: 写入成功返回True失败返回False
"""
if not self.connected:
print("请先连接到服务器")
return False
success = True
try:
objects = self.client.get_objects_node()
# 查找上料斗重量节点
upper_weight_node = None
lower_weight_node = None
# 遍历Objects下的所有节点
for child in objects.get_children():
browse_name = str(child.get_browse_name())
print(browse_name)
if "upper" in browse_name.lower():
upper_weight_node = child
print(f"找到上料斗重量节点: {browse_name}")
break
for child in objects.get_children():
browse_name = str(child.get_browse_name())
if "lower" in browse_name.lower():
lower_weight_node = child
print(f"找到下料斗重量节点: {browse_name}")
break
# 写入上料斗重量
if upper_weight_node:
try:
upper_weight_node.set_value(upper_value)
print(f"✓ 成功写入上料斗重量: {upper_value}")
except Exception as e:
print(f"✗ 写入上料斗重量失败: {e}")
success = False
else:
print("✗ 未找到上料斗重量节点")
success = False
# 写入下料斗重量
if lower_weight_node:
try:
lower_weight_node.set_value(lower_value)
print(f"✓ 成功写入下料斗重量: {lower_value}")
except Exception as e:
print(f"✗ 写入下料斗重量失败: {e}")
success = False
else:
print("✗ 未找到下料斗重量节点")
success = False
return success
except Exception as e:
print(f"写入重量数据时出错: {e}")
return False
def read_object_properties(self, upper_device, lower_device):
"""读取重量数值需要外部传入device对象"""
try:
# 读取重量
upper_weight = upper_device.get_child("2:upper_weight").get_value()
lower_weight = lower_device.get_child("2:lower_weight").get_value()
print(f"上料斗重量: {upper_weight}")
print(f"下料斗重量: {lower_weight}")
except Exception as e:
print(f"读取数据时出错: {e}")
def read_weights(self) -> tuple:
"""
直接读取上料斗和下料斗重量无需先获取device对象
Returns:
tuple: (上料斗重量, 下料斗重量),读取失败返回 (None, None)
"""
if not self.connected:
print("请先连接到服务器")
return None, None
try:
# 直接获取节点并读取数据
objects = self.client.get_objects_node()
# 使用列表格式访问节点freeopcua推荐的方式
upper_weight = objects.get_child(["2:upper", "2:upper_weight"]).get_value()
lower_weight = objects.get_child(["2:lower", "2:lower_weight"]).get_value()
print(f"上料斗重量: {upper_weight}")
print(f"下料斗重量: {lower_weight}")
return upper_weight, lower_weight
except Exception as e:
print(f"读取重量数据时出错: {e}")
return None, None
def write_multiple_values(self, values_dict: dict) -> dict:
"""
批量写入多个节点
Args:
values_dict: 字典key为节点路径value为要写入的值
Returns:
dict: 写入结果key为节点路径value为成功/失败状态
"""
results = {}
for node_path, value in values_dict.items():
results[node_path] = self.write_data(node_path, value)
return results
def write_test_data(self):
"""测试写入各种类型的数据"""
if not self.connected:
print("请先连接到服务器")
return
print("\n=== 测试写入数据 ===")
# 测试写入目标重量
self.write_data("2:upper/2:target_weight", 150.5, "float")
# 测试写入开关量
self.write_data("2:upper/2:valve_on", True, "bool")
# 测试写入整数
self.write_data("2:upper/2:cycle_count", 10, "int")
# 测试批量写入
values = {
"2:upper/2:target_weight": 200.0,
"2:lower/2:target_weight": 100.0,
}
results = self.write_multiple_values(values)
print("\n批量写入结果:")
for path, success in results.items():
status = "✓ 成功" if success else "✗ 失败"
print(f" {path}: {status}")
def monitor_data(self, duration=30):
"""监控数据变化"""
if not self.connected:
print("请先连接到服务器")
return
print(f"\n开始监控数据变化,持续 {duration} 秒...")
try:
# 获取传感器节点
objects = self.client.get_objects_node()
upper_device = objects.get_child("2:upper")
lower_device = objects.get_child("2:lower")
start_time = time.time()
while time.time() - start_time < duration:
print(f"\n--- {time.strftime('%H:%M:%S')} ---")
self.read_object_properties(upper_device, lower_device)
time.sleep(5) # 每5秒读取一次
except KeyboardInterrupt:
print("\n监控被用户中断")
except Exception as e:
print(f"监控数据时出错: {e}")
def main():
"""主函数"""
# 创建客户端
client = OPCUAClientTest("opc.tcp://localhost:4840/zjsh_feed/server/")
try:
# 连接到服务器
if not client.connect():
return
# 浏览节点结构(首先发现实际节点结构)
print("\n" + "="*60)
print("步骤1: 浏览服务器节点结构")
print("="*60)
client.browse_nodes()
# 尝试使用新方法写入数据
print("\n" + "="*60)
print("步骤2: 使用动态节点查找方法写入数据")
print("="*60)
# 方法1: 使用write_weights_directly自动查找节点
print("\n尝试方法1: write_weights_directly (自动查找节点)")
# client.write_data("2:upper/2:upper_weight", 180, "int")
# client.write_data("2:lower/2:lower_weight", 120, "int")
values = {
"2:upper/2:upper_weight": 200,
"2:lower/2:lower_weight": 100,
}
client.write_multiple_values(values)
# success1 = client.write_weights_directly(150, 120)
time.sleep(2)
# if not success1:
# # 方法2: 尝试可能的替代路径
# print("\n尝试方法2: 尝试其他节点路径格式")
# # 列出可能的节点路径格式
# possible_paths = [
# "2:upper_weight",
# "2:lower_weight",
# "2:upper/upper_weight",
# "2:lower/lower_weight",
# "ns=2;upper_weight",
# "ns=2;lower_weight"
# ]
# for path in possible_paths:
# print(f" 尝试写入: {path}")
# client.write_data(path, 150.5, "float")
# time.sleep(0.5)
print("\n" + "="*60)
print("步骤3: 读取验证数据")
print("="*60)
upper, lower = client.read_weights()
print(f"读取结果 - 上料斗: {upper}, 下料斗: {lower}")
except KeyboardInterrupt:
print("\n客户端被用户中断")
except Exception as e:
print(f"客户端运行错误: {e}")
import traceback
traceback.print_exc()
finally:
# 断开连接
client.disconnect()
if __name__ == "__main__":
if len(sys.argv) > 1:
# 支持自定义服务器地址
server_url = sys.argv[1]
client = OPCUAClientTest(server_url)
else:
client = OPCUAClientTest()
try:
main()
except Exception as e:
print(f"客户端运行错误: {e}")
sys.exit(1)