import logging
import threading
import time

from plc_manager import PLCManager
from cache_manager import CacheManager
from api_server import APIServer
from config_manager import ConfigManager
from plc_data_reader import PLCDataReaderThread

_LOG_FORMAT = "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s"


def _configure_logging():
    """Install the root logging handler used by the gateway.

    ``logging.basicConfig`` does nothing when the root logger already has
    handlers, so calling this more than once is harmless.
    """
    logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)


class GatewayApp:
    """Main application class of the PLC gateway.

    Owns the configuration manager, the PLC manager, the cache manager, the
    API server and one reader thread per readable PLC area, and rebuilds them
    when a configuration reload is requested.
    """

    # Values passed to every PLCDataReaderThread. Kept as class attributes so
    # they can be tuned on a subclass or instance without editing the method.
    # NOTE(review): the unit of the interval is defined by PLCDataReaderThread
    # (presumably seconds) - confirm there.
    READER_UPDATE_INTERVAL = 0.03
    READER_MAX_ARRAY_SIZE = 10000

    def __init__(self, config_path="../config/config.json"):
        """Create the application and load the initial configuration.

        Args:
            config_path: Path of the JSON configuration file; it is handed to
                ``ConfigManager`` and later to ``APIServer``.
        """
        self.config_path = config_path
        self.config_manager = ConfigManager(config_path)
        self.plc_manager = None
        self.cache_manager = None
        self.api_server = None
        self.reload_flag = False
        self.reload_lock = threading.Lock()
        self.logger = logging.getLogger("GatewayApp")
        # Every started reader thread as an (area_name, thread) tuple, kept so
        # the threads can be stopped when the configuration is reloaded.
        self.reader_threads = []
        # Load the initial configuration. The result is not checked here; on
        # failure the component attributes simply stay None.
        self.load_configuration()

    def load_configuration(self):
        """Load the configuration and (re)initialise all components.

        Returns:
            True when the configuration was loaded and the components were
            rebuilt, False when ``ConfigManager.load_config()`` reported
            failure. On failure nothing that is currently running is touched.
        """
        # Load first: if the new configuration cannot be read, the reader
        # threads that are already running must keep running.
        if not self.config_manager.load_config():
            self.logger.error("Failed to load initial configuration")
            return False

        config = self.config_manager.get_config()

        # Stop every reader thread before the PLC connections are replaced.
        self.stop_all_reader_threads()

        # (Re)initialise the PLC connections.
        # NOTE(review): the previous PLCManager is dropped without an explicit
        # disconnect; check whether PLCManager offers one.
        if self.plc_manager:
            self.logger.info("Reinitializing PLC connections...")
        self.plc_manager = PLCManager(config["plcs"], self.reader_threads)
        self.plc_manager.connect_all()

        # (Re)initialise the cache.
        if self.cache_manager:
            self.logger.info("Stopping existing cache manager...")
            self.cache_manager.stop()
        self.logger.info("Initializing cache manager...")
        self.cache_manager = CacheManager(config, self.plc_manager, app=self)
        self.cache_manager.start()

        # The API server is started once and survives reloads.
        # NOTE(review): after a reload it was constructed with the previous
        # cache manager instance - confirm APIServer picks up the new one.
        if self.api_server:
            self.logger.info("API server already running")
        else:
            self.logger.info("Starting API server...")
            self.api_server = APIServer(self.cache_manager, self.config_path)
            self.api_server.start()

        # Start one reader thread per readable area.
        self.start_all_reader_threads(config)

        self.logger.info("Configuration loaded successfully")
        return True

    def start_all_reader_threads(self, config):
        """Start a reader thread for every ``read`` / ``read_write`` area.

        Walks ``self.plc_manager.plcs``, looks up the matching entry in
        ``config["plcs"]`` by name and starts a ``PLCDataReaderThread`` for
        each of its areas whose type requires reading. A failure for one area
        is logged and does not prevent the remaining areas from starting.

        Args:
            config: The loaded configuration; must contain a ``"plcs"`` list
                whose entries carry a ``"name"`` and optionally ``"areas"``.
        """
        for plc_name, plc_client in self.plc_manager.plcs.items():
            plc_config = next(
                (p for p in config["plcs"] if p["name"] == plc_name), None
            )
            if not plc_config or "areas" not in plc_config:
                self.logger.warning(f"PLC {plc_name} 无areas配置,跳过启动读取线程")
                continue

            for area_config in plc_config["areas"]:
                try:
                    area_name = area_config["name"]
                    area_type = area_config["type"]
                except KeyError as e:
                    # A malformed area must not abort the other areas/PLCs.
                    self.logger.error(
                        f"PLC {plc_name} area configuration is missing key {e}; "
                        "skipping this area"
                    )
                    continue

                # Only areas that are read need a reader thread.
                if area_type not in ("read", "read_write"):
                    self.logger.debug(
                        f"PLC {plc_name} 区域{area_name}({area_type})无需启动读取线程"
                    )
                    continue

                try:
                    read_thread = PLCDataReaderThread(
                        plc_client=plc_client,
                        plc_name=plc_name,
                        area_config=area_config,
                        update_interval=self.READER_UPDATE_INTERVAL,
                        max_array_size=self.READER_MAX_ARRAY_SIZE,
                    )
                    read_thread.start()
                except Exception as e:
                    self.logger.error(
                        f"❌ 启动区域{area_name}线程失败: {str(e)}", exc_info=True
                    )
                else:
                    # Remember the thread so it can be stopped later.
                    self.reader_threads.append((area_name, read_thread))
                    # .get(): a missing db_number must not be reported as a
                    # start failure once the thread is already running.
                    self.logger.info(
                        f"✅ 启动区域读取线程:PLC {plc_name} → {area_name}"
                        f"(DB{area_config.get('db_number')})"
                    )

    def stop_all_reader_threads(self):
        """Stop every reader thread that was started and forget them.

        A failure while stopping one thread is logged and the remaining
        threads are still stopped.
        """
        if not self.reader_threads:
            return

        self.logger.info("Stopping all reader threads...")
        for area_name, thread in self.reader_threads:
            if not thread.is_alive():
                continue
            try:
                thread.stop()
            except Exception as e:
                self.logger.error(
                    f"Failed to stop reader thread for area {area_name}: {e}",
                    exc_info=True,
                )
            else:
                self.logger.debug(f"Stopped reader thread for area {area_name}")

        # Clear in place instead of rebinding: the same list object was
        # handed to PLCManager in load_configuration().
        self.reader_threads.clear()

    def check_for_reload(self):
        """Poll the reload flag once per second and reload when it is set.

        Runs forever; intended as the target of a daemon thread.
        """
        while True:
            # Hold the lock only to read and clear the flag, so callers of
            # request_reload() are never blocked for the length of a reload.
            with self.reload_lock:
                reload_requested = self.reload_flag
                self.reload_flag = False

            if reload_requested:
                try:
                    self.load_configuration()
                except Exception:
                    # Keep the polling thread alive; otherwise one failed
                    # reload would silently disable all later reloads.
                    self.logger.exception("Configuration reload failed")

            time.sleep(1)

    def request_reload(self):
        """Ask the reload thread to reload the configuration."""
        with self.reload_lock:
            self.reload_flag = True
        self.logger.info("Configuration reload requested")

    def run(self):
        """Run the gateway until interrupted with Ctrl+C.

        Starts the configuration-reload polling thread, then idles in the
        calling thread. On exit the reader threads and the cache manager are
        stopped.
        """
        _configure_logging()
        self.logger.info("Starting PLC Gateway...")

        # Start the thread that watches for reload requests.
        reload_thread = threading.Thread(
            target=self.check_for_reload,
            name="ConfigReloadThread",
            daemon=True,
        )
        reload_thread.start()

        try:
            # Keep the main thread alive.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Shutting down gracefully...")
        finally:
            self.stop_all_reader_threads()
            if self.cache_manager:
                self.cache_manager.stop()
            self.logger.info("Shutdown complete")


def main():
    """Entry point: configure logging, build the gateway and run it."""
    # Configure logging before GatewayApp() so that the INFO messages emitted
    # while the initial configuration is loaded are not dropped.
    _configure_logging()
    app = GatewayApp()
    app.run()


if __name__ == "__main__":
    main()