#!/usr/bin/env python3 from opcua import Client, ua import time from datetime import datetime import configparser from threading import Thread,RLock import logging import queue logging.getLogger("opcua").setLevel(logging.WARNING) logging.getLogger("opcua.client.ua_client").setLevel(logging.WARNING) logging.getLogger("opcua.uaprotocol").setLevel(logging.WARNING) #控制程序opcua 需要需要在opc_config中配置_f结尾的key才会收到通知 # Opcua回调处理器 class SubscriptionHandler: def __init__(self,parent): self.node_id_to_name = {} #设置派单模式(1自动派单 2手动派单0未知) # self.pd_set_mode=0 # #设置方量{ID:派单表ID,Volumn:修改后方量} # self.pd_set_volumn='' # #派单通知数据{ErpID:ID:派单表ID,Flag:状态,ErrorMsg:失败异常信息 } # self.pd_notify='' # 添加队列和锁 self.notification_queue = queue.Queue() self.processing_thread = Thread(target=self._process_notifications, daemon=True,name="opcua_recv_thread") self.lock = RLock() # 可重入锁 self.parent=parent self.notify_callback=self.parent.notify_callback def start_accept(self): self.processing_thread.start() def datachange_notification(self, node, val, data): try: node_id = node.nodeid.to_string() var_name = self.node_id_to_name.get(node_id) print(f"node_id: {node_id}, var_name: {var_name}, val: {val}") self.notification_queue.put_nowait((var_name, val)) # setattr(self, var_name, val) # if self.notify_callback: # self.notify_callback(var_name,val) except Exception as e: err_msg = f"opcua解析值变化事件失败: {e}" print(err_msg) def _process_notifications(self): """后台线程处理通知(串行执行)""" while self.parent.is_running: try: var_name, val = self.notification_queue.get(timeout=1) # 使用锁保护共享资源 with self.lock: self.notify_callback(var_name,val) self.notification_queue.task_done() except queue.Empty: continue except Exception as e: print(f"处理通知失败: {e}") class OpcuaClientFeed(Thread): def __init__(self,notify_callback=None, parent=None): super().__init__(parent) self.name="opcua_feed_client" self.target_var_paths = [] #订阅配置文件中的参数 self.subscription_name=["pd_notify","pd_notify_finish","pd_set_mode","pd_set_volume"] # self.server_url = "" self.read_opc_config() # 读取配置文件 self.client = Client(self.server_url) # 初始化opc客户端 self.notify_callback=notify_callback self.connected = False self.subscription = None self.monitored_items = [] self.is_running = True # 线程运行标志位 self.node_id_mapping = {} # node_id 和 可读变量名的映射表 self.is_reconnect_tip_sent = False # 重连失败提示是否已发送 self.handler = SubscriptionHandler(self) def start_accept(self): self.handler.start_accept() def stop_run(self): """停止线程+断开连接""" self.is_running = False self.disconnect() # self.wait() print("opcua客户端线程已退出") def connect(self): """连接到OPC服务器""" try: self.client.connect() self.connected = True msg = f"成功连接到OPCUA服务器: {self.server_url}" print(msg) self.is_reconnect_tip_sent = False return True except Exception as e: self.connected = False err_msg = f"连接OPCUA服务器失败: {e}" print(err_msg) if not self.is_reconnect_tip_sent: print(err_msg) # 标记为已发送,后续不重复在UI上显示 self.is_reconnect_tip_sent = True return False def disconnect(self): """断开连接""" self.connected = False try: if self.monitored_items: for item in self.monitored_items: try: self.subscription.unsubscribe(item) except Exception: pass self.monitored_items.clear() if self.subscription: try: self.subscription.delete() except Exception: pass self.subscription = None if self.client: try: self.client.disconnect() except Exception: pass self.node_id_mapping.clear() if hasattr(self, 'handler') and self.handler: self.handler.node_id_to_name = {} except Exception as e: print(f"opcua断开连接异常: {e}") def build_node_id_mapping(self): """根据object_name+var_name路径获取nodeid,建立映射表""" if not self.connected: return False try: objects_node = self.client.get_objects_node() self.handler.node_id_to_name = self.node_id_mapping for var_name, path_list in self.target_var_paths: target_node = objects_node.get_child(path_list) node_id = target_node.nodeid.to_string() self.node_id_mapping[node_id] = var_name print("nodeid映射表构建成功") return True except Exception as e: err_msg = f"构建{var_name}映射表失败: {e}" print(err_msg) return False def create_multi_subscription(self, interval=None): """订阅多个变量(基于映射表的nodeid)""" if not self.connected: return if not self.node_id_mapping: return try: interval = int(interval) if interval else self.sub_interval self.subscription = self.client.create_subscription(interval, self.handler) for node_id, var_name in self.node_id_mapping.items(): if var_name in self.subscription_name: var_node = self.client.get_node(node_id) monitored_item = self.subscription.subscribe_data_change(var_node) self.monitored_items.append(monitored_item) print(f"已订阅变量: {var_name} (nodeid: {node_id})") except Exception as e: err_msg = f"创建批量订阅失败: {e}" print(err_msg) def read_opc_config(self, cfg_path = "config/opc_config.ini"): """读取OPC配置文件, 初始化所有参数和节点列表""" try: cfg = configparser.ConfigParser() cfg.read(cfg_path, encoding="utf-8") # 1. 读取服务器基础配置 self.server_url = cfg.get("OPC_SERVER_CONFIG", "server_url") self.heartbeat_interval = cfg.getint("OPC_SERVER_CONFIG", "heartbeat_interval") self.reconnect_interval = cfg.getint("OPC_SERVER_CONFIG", "reconnect_interval") self.sub_interval = cfg.getint("OPC_SERVER_CONFIG", "sub_interval") # 2. 读取OPC节点配置 node_section = cfg["OPC_NODE_LIST"] for readable_name, node_path_str in node_section.items(): node_path_list = node_path_str.split(",") self.target_var_paths.append( (readable_name, node_path_list) ) print("target_var_paths", self.target_var_paths) except Exception as e: print(f"读取配置文件失败: {e},使用默认配置启动!") self.server_url = "opc.tcp://localhost:4840/zjsh_feed/server/" self.heartbeat_interval = 4 self.reconnect_interval = 2 self.sub_interval = 500 # 参数合法性检验 self.heartbeat_interval = self.heartbeat_interval if isinstance(self.heartbeat_interval, int) and self.heartbeat_interval >=1 else 4 self.reconnect_interval = self.reconnect_interval if isinstance(self.reconnect_interval, int) and self.reconnect_interval >=1 else 2 self.sub_interval = self.sub_interval if isinstance(self.sub_interval, int) and self.sub_interval >=100 else 500 def write_value_by_name(self, var_readable_name, value): """ 根据变量可读名称写入值(主要用于修改方量, 方量的类型为 Double类型) :param var_readable_name: 变量可读名称(如"upper_weight") :param value: 要写入的值 """ max_wait = 30 # 最大等待30秒 wait_count = 0 while (not self.node_id_mapping and wait_count < max_wait): time.sleep(0.5) wait_count += 1 if wait_count % 2 == 0: # 每2秒打印一次 print(f"等待OPC连接中...({wait_count/2}秒)") if not self.connected: print(f"{var_readable_name}写入失败: OPC服务未连接") return target_node_id = None for node_id, name in self.node_id_mapping.items(): if name == var_readable_name: target_node_id = node_id break if not target_node_id: return try: target_node = self.client.get_node(target_node_id) target_node.set_value(value) except Exception as e: err_msg = f"opcua写入值失败: {e}" print(err_msg) # ===== 心跳检测函数 ===== def _heartbeat_check(self): """心跳检测: 判断opc服务是否存活""" try: self.client.get_node("i=2258").get_value() return True except Exception as e: err_msg = f"心跳检测失败, OPCUA服务已断开 {e}" print(err_msg) return False # ===== 掉线重连函数 ===== def _auto_reconnect(self): """掉线后自动重连+重建映射+恢复订阅""" print("OPC服务掉线, 开始自动重连...") try: self.disconnect() except Exception as e: print(f"_auto_reconnect: 断开旧连接时出现异常: {e}") while self.is_running: print(f"重试连接OPC服务器: {self.server_url}") if self.connect(): self.build_node_id_mapping() self.create_multi_subscription() print("OPCUA服务器重连成功, 所有订阅已恢复正常") break time.sleep(self.reconnect_interval) def _init_connect_with_retry(self): """连接opc服务器""" print("OPC客户端初始化, 开始连接服务器...") while self.is_running: if self.connect(): self.build_node_id_mapping() self.create_multi_subscription() break print(f"连接OPCUA服务器失败, {self.reconnect_interval}秒后重试...") time.sleep(self.reconnect_interval) def run(self) -> None: """opcua客户端线程主函数""" # 连接opc服务器 self._init_connect_with_retry() while self.is_running: if self.connected: if not self._heartbeat_check(): self.connected = False self._auto_reconnect() else: self._auto_reconnect() time.sleep(self.heartbeat_interval) if __name__ == "__main__": opcua_client = OpcuaClientFeed() opcua_client.start() opcua_client.write_value_by_name("pd_set_mode", 1) while True: time.sleep(1)