添加其他数据块的读取线程,并且解决线程冲突问题。另外解决了之前bool类型读取的Bug

This commit is contained in:
您çšpengqi
2025-09-29 11:51:38 +08:00
parent 144def0e7d
commit 262e76c5db
3 changed files with 385 additions and 171 deletions

View File

@ -6,7 +6,7 @@ from plc_manager import PLCManager
from cache_manager import CacheManager
from api_server import APIServer
from config_manager import ConfigManager
from db100_reader import DB100ReaderThread
from plc_data_reader import PLCDataReaderThread
class GatewayApp:
@ -20,21 +20,22 @@ class GatewayApp:
self.api_server = None
self.reload_flag = False
self.reload_lock = threading.Lock()
# DB100ReaderThread线程相关初始化
self.db100_reader_thread = None
self.logger = logging.getLogger("GatewayApp")
# 存储所有读取线程(便于配置重载时停止)
self.reader_threads = []
# 加载初始配置
self.load_configuration()
def load_configuration(self):
"""加载配置并初始化组件"""
# 停止所有已启动的读取线程
self.stop_all_reader_threads()
# 加载配置
if not self.config_manager.load_config():
self.logger.error("Failed to load initial configuration")
return False
config = self.config_manager.get_config()
# 重新初始化PLC连接
@ -47,7 +48,6 @@ class GatewayApp:
if self.cache_manager:
self.logger.info("Stopping existing cache manager...")
self.cache_manager.stop()
self.logger.info("Initializing cache manager...")
self.cache_manager = CacheManager(config, self.plc_manager, app=self)
self.cache_manager.start()
@ -60,17 +60,57 @@ class GatewayApp:
self.api_server = APIServer(self.cache_manager, self.config_path)
self.api_server.start()
# 重新初始化DB100ReaderThread线程
for plc in config["plcs"]:
plc_name = plc["name"]
client = self.plc_manager.get_plc(plc_name)
self.logger.info("Starting db100_reader_thread...")
self.db100_reader_thread = DB100ReaderThread(client, output_file="db100_latest_data.log")
self.db100_reader_thread.start()
# 动态启动多区域读取线程
self.start_all_reader_threads(config)
self.logger.info("Configuration loaded successfully")
return True
def start_all_reader_threads(self, config):
"""
遍历PLC的areas,为每个read/read_write区域启动读取线程
"""
for plc_name, plc_client in self.plc_manager.plcs.items():
plc_config = next((p for p in config["plcs"] if p["name"] == plc_name), None)
if not plc_config or "areas" not in plc_config:
self.logger.warning(f"PLC {plc_name} 无areas配置跳过启动读取线程")
continue
# 遍历当前PLC的每个areas
for area_config in plc_config["areas"]:
area_name = area_config["name"]
area_type = area_config["type"]
# 仅处理需要读的区域
if area_type not in ["read", "read_write"]:
self.logger.debug(f"PLC {plc_name} 区域{area_name}{area_type})无需启动读取线程")
continue
try:
# 创建并启动线程
read_thread = PLCDataReaderThread(
plc_client=plc_client,
area_config=area_config,
update_interval=0.03,
output_file_prefix="plc_area_"
)
read_thread.start()
# 存入线程列表,便于后续停止
self.reader_threads.append((area_name, read_thread))
self.logger.info(f"✅ 启动区域读取线程PLC {plc_name}{area_name}DB{area_config['db_number']}")
except Exception as e:
self.logger.error(f"❌ 启动区域{area_name}线程失败: {str(e)}", exc_info=True)
def stop_all_reader_threads(self):
"""停止所有已启动的读取线程"""
if not self.reader_threads:
return
self.logger.info("Stopping all reader threads...")
for area_name, thread in self.reader_threads:
if thread.is_alive():
thread.stop()
self.logger.debug(f"Stopped reader thread for area {area_name}")
self.reader_threads.clear()
def check_for_reload(self):
"""检查是否需要重载配置"""
while True: