Files
gateway_plc/gateway/main.py

164 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import time
import threading
from plc_manager import PLCManager
from cache_manager import CacheManager
from api_server import APIServer
from config_manager import ConfigManager
from plc_data_reader import PLCDataReaderThread
class GatewayApp:
    """Main application class of the PLC gateway.

    Owns the configuration manager, PLC manager, cache manager, API server
    and the per-area reader threads, and rebuilds them whenever a
    configuration reload is requested through :meth:`request_reload`.
    """

    # Settings passed unchanged to every PLCDataReaderThread.
    # NOTE(review): the interval is presumably in seconds - confirm against
    # PLCDataReaderThread.
    READER_UPDATE_INTERVAL = 0.03
    READER_MAX_ARRAY_SIZE = 10000
    # Upper bound (seconds) on the total time spent waiting for stopped
    # reader threads to exit.
    READER_JOIN_TIMEOUT = 2.0

    def __init__(self, config_path="../config/config.json"):
        """Create the application and load the initial configuration.

        Args:
            config_path: Path of the JSON configuration file; it is handed
                to ConfigManager and, later, to APIServer.
        """
        self.config_path = config_path
        self.config_manager = ConfigManager(config_path)
        self.plc_manager = None
        self.cache_manager = None
        self.api_server = None
        self.reload_flag = False
        self.reload_lock = threading.Lock()
        self.logger = logging.getLogger("GatewayApp")
        # Registry of (area_name, thread) pairs so the readers can be stopped
        # on reload. The same list object is also handed to PLCManager, so it
        # must only ever be mutated in place, never rebound.
        self.reader_threads = []
        # Load the initial configuration. A failure is logged by
        # load_configuration(); the return value is not checked here.
        self.load_configuration()

    def load_configuration(self):
        """Load the configuration and (re)initialize all components.

        Returns:
            bool: True when the configuration was loaded and the components
            were rebuilt; False when the configuration could not be loaded,
            in which case the running components are left untouched.
        """
        # Load first: a broken configuration file must not tear down the
        # reader threads that still serve the previous configuration.
        if not self.config_manager.load_config():
            self.logger.error("Failed to load initial configuration")
            return False
        config = self.config_manager.get_config()

        # The new configuration is usable, so the old readers can go.
        self.stop_all_reader_threads()

        # (Re)initialize the PLC connections.
        # NOTE(review): the previous PLCManager is replaced without an
        # explicit disconnect - confirm whether it needs a teardown call.
        if self.plc_manager:
            self.logger.info("Reinitializing PLC connections...")
        self.plc_manager = PLCManager(config["plcs"], self.reader_threads)
        self.plc_manager.connect_all()

        # (Re)initialize the cache.
        if self.cache_manager:
            self.logger.info("Stopping existing cache manager...")
            self.cache_manager.stop()
        self.logger.info("Initializing cache manager...")
        self.cache_manager = CacheManager(config, self.plc_manager, app=self)
        self.cache_manager.start()

        # The API server is started once and survives reloads.
        # NOTE(review): after a reload it still holds the cache manager it
        # was constructed with - confirm whether it must be re-pointed.
        if self.api_server:
            self.logger.info("API server already running")
        else:
            self.logger.info("Starting API server...")
            self.api_server = APIServer(self.cache_manager, self.config_path)
            self.api_server.start()

        # Start one reader thread per readable area.
        self.start_all_reader_threads(config)
        self.logger.info("Configuration loaded successfully")
        return True

    def start_all_reader_threads(self, config):
        """Start a reader thread for every readable area of every PLC.

        Walks the PLC clients held by the PLC manager, looks up the matching
        entry in ``config["plcs"]`` and starts a PLCDataReaderThread for each
        area whose type is ``"read"`` or ``"read_write"``. Every started
        thread is registered in ``self.reader_threads``.

        Args:
            config: Configuration mapping with a ``"plcs"`` list whose
                entries carry ``"name"`` and, optionally, ``"areas"``.
        """
        for plc_name, plc_client in self.plc_manager.plcs.items():
            plc_config = next(
                (p for p in config["plcs"] if p["name"] == plc_name), None
            )
            if not plc_config or "areas" not in plc_config:
                self.logger.warning(f"PLC {plc_name} 无areas配置跳过启动读取线程")
                continue

            for area_config in plc_config["areas"]:
                area_name = area_config["name"]
                area_type = area_config["type"]
                # Only areas that are read from need a reader thread.
                if area_type not in ("read", "read_write"):
                    self.logger.debug(
                        f"PLC {plc_name} 区域{area_name}({area_type})无需启动读取线程"
                    )
                    continue
                try:
                    read_thread = PLCDataReaderThread(
                        plc_client=plc_client,
                        plc_name=plc_config["name"],
                        area_config=area_config,
                        update_interval=self.READER_UPDATE_INTERVAL,
                        max_array_size=self.READER_MAX_ARRAY_SIZE,
                    )
                    read_thread.start()
                    # Register the thread so it can be stopped later.
                    self.reader_threads.append((area_name, read_thread))
                    # .get(): a missing db_number must not turn the log line
                    # of an already running thread into a "failed" report.
                    self.logger.info(
                        f"✅ 启动区域读取线程: PLC {plc_name} 区域{area_name} "
                        f"DB{area_config.get('db_number')}"
                    )
                except Exception as e:
                    self.logger.error(
                        f"❌ 启动区域{area_name}线程失败: {str(e)}", exc_info=True
                    )

    def stop_all_reader_threads(self):
        """Stop every registered reader thread and empty the registry.

        All live threads are signalled first and then joined against one
        shared deadline of ``READER_JOIN_TIMEOUT`` seconds, so old readers
        have normally exited before new ones are started. A thread that
        fails to stop is logged and does not prevent the others from being
        stopped.
        """
        if not self.reader_threads:
            return
        self.logger.info("Stopping all reader threads...")

        # Pass 1: signal every live thread so they wind down in parallel.
        stopping = []
        for area_name, thread in self.reader_threads:
            if not thread.is_alive():
                continue
            try:
                thread.stop()
            except Exception:
                self.logger.exception(
                    f"Failed to stop reader thread for area {area_name}"
                )
                continue
            stopping.append((area_name, thread))

        # Pass 2: wait for them, bounded by a single overall deadline.
        deadline = time.monotonic() + self.READER_JOIN_TIMEOUT
        current = threading.current_thread()
        for area_name, thread in stopping:
            if thread is not current:  # joining oneself raises RuntimeError
                thread.join(max(0.0, deadline - time.monotonic()))
            if thread.is_alive():
                self.logger.warning(
                    f"Reader thread for area {area_name} did not exit in time"
                )
            else:
                self.logger.debug(f"Stopped reader thread for area {area_name}")

        # Clear in place: PLCManager was given this very list object.
        self.reader_threads.clear()

    def check_for_reload(self):
        """Poll the reload flag once per second and reload when it is set.

        Never returns; intended to run in a background thread. The flag is
        read and cleared under ``reload_lock``, but the reload itself runs
        outside the lock so that :meth:`request_reload` never blocks for the
        duration of a reload. A failing reload is logged and polling goes on.
        """
        while True:
            with self.reload_lock:
                reload_requested = self.reload_flag
                self.reload_flag = False
            if reload_requested:
                try:
                    self.load_configuration()
                except Exception:
                    # Keep the reload thread alive for later requests.
                    self.logger.exception("Configuration reload failed")
            time.sleep(1)

    def request_reload(self):
        """Ask the reload thread to reload the configuration on its next poll."""
        with self.reload_lock:
            self.reload_flag = True
        self.logger.info("Configuration reload requested")

    def run(self):
        """Run the gateway until interrupted with Ctrl+C.

        Configures root logging, starts the configuration reload thread and
        then idles in the calling thread. On exit the reader threads and the
        cache manager are stopped.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger.info("Starting PLC Gateway...")

        # Background thread that applies requested configuration reloads.
        reload_thread = threading.Thread(
            target=self.check_for_reload,
            name="ConfigReloadThread",
            daemon=True
        )
        reload_thread.start()

        try:
            # Keep the main thread alive.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Shutting down gracefully...")
        finally:
            # Stop the readers too, not only the cache manager.
            self.stop_all_reader_threads()
            if self.cache_manager:
                self.cache_manager.stop()
            self.logger.info("Shutdown complete")
def main():
    """Configure logging, build the gateway application and run it."""
    # Configure logging before constructing the app: GatewayApp.__init__
    # already loads the configuration and logs at INFO level, and those
    # records would be dropped while no handler is installed. The later
    # logging.basicConfig() call in GatewayApp.run() is then a no-op because
    # the root logger already has a handler.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s'
    )
    app = GatewayApp()
    app.run()


# Run the gateway only when this file is executed as a script.
if __name__ == "__main__":
    main()