#!/usr/bin/env python3 """ 简单的OPC UA服务器示例 用于工业自动化数据通信 """ from opcua import Server, ua import time import random import threading from datetime import datetime from core.system import SystemState class SimpleOPCUAServer: def __init__(self, state, endpoint="opc.tcp://0.0.0.0:4840/zjsh_feed/server/", name="Feed_Server"): """ 初始化OPC UA服务器 Args: endpoint: 服务器端点地址 name: 服务器名称 """ self.server = Server() self.server.set_endpoint(endpoint) self.server.set_server_name(name) self.state = state # 设置服务器命名空间 self.namespace = self.server.register_namespace("Feed_Control_System") # 获取对象节点 self.objects = self.server.get_objects_node() # 创建自定义对象 self.create_object_structure() # 运行标志 self.running = False def create_object_structure(self): """创建OPC UA对象结构""" # 创建上料斗对象 self.upper = self.objects.add_object(self.namespace, "upper") self.lower=self.objects.add_object(self.namespace, "lower") self.sys=self.objects.add_object(self.namespace, "sys") # 创建变量 self.create_variables() def create_variables(self): """创建OPC UA变量""" # 上料斗重量变量 self.upper_weight = self.upper.add_variable(self.namespace, "upper_weight", 0.0) self.lower_weight = self.lower.add_variable(self.namespace, "lower_weight", 0.0) # 设置变量为可写 self.upper_weight.set_writable() self.lower_weight.set_writable() def setup_state_listeners(self): """设置状态监听器 - 事件驱动更新""" if hasattr(self.state, 'state_updated'): self.state.state_updated.connect(self.on_state_changed) print("状态监听器已设置 - 事件驱动模式") def on_state_changed(self, property_name, value): """状态变化时的回调函数""" try: # 根据属性名更新对应的OPC UA变量 if property_name == "upper_weight": self.upper_weight.set_value(value) elif property_name == "lower_weight": self.lower_weight.set_value(value) # 可以在这里添加更多状态映射 print(f"状态更新: {property_name} = {value}") except Exception as e: print(f"状态更新错误: {e}") def start(self): """启动服务器""" try: self.server.start() self.running = True print(f"OPC UA服务器启动成功!") print(f"服务器端点: opc.tcp://0.0.0.0:4840/freeopcua/server/") print(f"命名空间: {self.namespace}") # 初始化当前值 if self.state: self.upper_weight.set_value(self.state._upper_weight) self.lower_weight.set_value(self.state._lower_weight) print("已同步初始状态值") # 设置状态监听器 - 关键步骤! self.setup_state_listeners() # # 只有在没有状态系统时才使用模拟线程 # if not self.state: # print("使用模拟数据模式") # self.simulation_thread = threading.Thread(target=self.simulate_data) # self.simulation_thread.daemon = True # self.simulation_thread.start() except Exception as e: print(f"启动服务器失败: {e}") def stop(self): """停止服务器""" self.running = False if hasattr(self, 'simulation_thread'): self.simulation_thread.join(timeout=2) self.server.stop() print("OPC UA服务器已停止") # 断开状态监听器 if hasattr(self.state, 'state_updated'): try: self.state.state_updated.disconnect(self.on_state_changed) except: pass def simulate_data(self): """模拟数据更新""" while self.running: try: # 更新变量值 self.upper_weight.set_value(self.state.upper_weight) self.lower_weight.set_value(self.state.lower_weight) # 模拟延迟 time.sleep(1) except Exception as e: print(f"数据更新错误: {e}") continue def main(): """主函数""" # 创建系统状态实例 state = SystemState() # 创建并启动服务器 server = SimpleOPCUAServer( state=state, endpoint="opc.tcp://0.0.0.0:4840/freeopcua/server/", name="工业自动化 OPC UA 服务器" ) try: server.start() print("服务器正在运行,按 Ctrl+C 停止...") # 保持服务器运行 while True: time.sleep(1) except KeyboardInterrupt: print("\n正在停止服务器...") server.stop() except Exception as e: print(f"服务器运行错误: {e}") server.stop() if __name__ == "__main__": main()