diff --git a/gateway/cache_manager.py b/gateway/cache_manager.py index 0219903..2751eeb 100644 --- a/gateway/cache_manager.py +++ b/gateway/cache_manager.py @@ -645,7 +645,7 @@ class CacheManager: plc_status = self.plc_connection_status.get(plc_name, "unknown") return None, "Area is read-only", plc_status, 0 - cache_file = f"{plc_name}_area_{area_name}.log" + array_name = f"{plc_name}_{area_name}" # 计算实际DB偏移 db_offset = area["offset"] + offset @@ -673,7 +673,7 @@ class CacheManager: try: # 使用Snap7Client的read_generic方法 - result = client.read_generic(area["db_number"], db_offset, data_type, cache_file, count) + result = client.read_generic(area["db_number"], db_offset, data_type, array_name, count) if result is None: return None, "Read failed", plc_status, 0 diff --git a/gateway/main.py b/gateway/main.py index a7f2ea8..15169cf 100644 --- a/gateway/main.py +++ b/gateway/main.py @@ -1,7 +1,6 @@ import logging import time import threading -from config_loader import load_config from plc_manager import PLCManager from cache_manager import CacheManager from api_server import APIServer @@ -41,7 +40,7 @@ class GatewayApp: # 重新初始化PLC连接 if self.plc_manager: self.logger.info("Reinitializing PLC connections...") - self.plc_manager = PLCManager(config["plcs"]) + self.plc_manager = PLCManager(config["plcs"], self.reader_threads) self.plc_manager.connect_all() # 重新初始化缓存 @@ -92,7 +91,7 @@ class GatewayApp: plc_name=plc_config["name"], area_config=area_config, update_interval=0.03, - output_file_prefix="area_" + max_array_size=10000 ) read_thread.start() # 存入线程列表,便于后续停止 diff --git a/gateway/plc_data_reader.py b/gateway/plc_data_reader.py index 3aaa1d5..ae12a03 100644 --- a/gateway/plc_data_reader.py +++ b/gateway/plc_data_reader.py @@ -10,12 +10,9 @@ import threading import time import logging -from datetime import datetime -from snap7.util import get_real, get_int, get_bool, get_word, get_dint # 导入snap7解析工具 - class PLCDataReaderThread(threading.Thread): - def __init__(self, 
plc_client, plc_name, area_config, update_interval=0.03, output_file_prefix="area_"): + def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, max_array_size=10000): """ 初始化PLC数据读取线程(配置驱动,支持多区域) 参数: @@ -23,13 +20,13 @@ class PLCDataReaderThread(threading.Thread): area_config: 单个区域的配置(来自config.json的plcs[].areas) 示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]} update_interval: 读取间隔(秒),默认30ms - output_file_prefix: 输出文件前缀,最终文件名为“前缀+区域名.log” + max_array_size: 自定义数组的最大存储条数,超过后自动删除最早数据,默认10000条 """ # 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read") - thread_name = f"{plc_name}_Reader_{area_config['name']}" + thread_name = f"{plc_name}_{area_config['name']}" super().__init__(name=thread_name, daemon=True) - # 1. 核心依赖(PLC客户端+区域配置) + # 核心依赖(PLC客户端+区域配置) self.plc_client = plc_client self.plc_name = plc_name self.area_config = area_config # 动态区域配置,不再硬编码DB100 @@ -39,16 +36,21 @@ class PLCDataReaderThread(threading.Thread): self.size = area_config["size"] self.area_type = area_config["type"] # 区分read/read_write/write - # 2. 线程与输出配置 - self.update_interval = update_interval - self.output_file = f"{self.plc_name}_{output_file_prefix}{self.area_name}.log" # 输出文件名}{output_file_prefix}DB{self.db_number}.log" # 每个区域独立文件 + # 自定义内存数据配置 + self.custom_array_name = f"{self.plc_name}_{self.area_name}" + self.max_array_size = max_array_size + # 动态创建自定义名称的内存数组 + setattr(self, self.custom_array_name, []) - # 3. 数据缓存(新增结构化数据存储) + # 线程与输出配置 + self.update_interval = update_interval + + # 数据缓存(新增结构化数据存储) self.running = False - self._latest_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data) + self._latest_byte_data = None # 最新一次读取的原始字节数据(bytearray) self._data_lock = threading.Lock() # 线程安全锁 - # 4. 
日志 + # 日志 self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}") def start(self): @@ -58,10 +60,16 @@ class PLCDataReaderThread(threading.Thread): self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)") return + # 验证自定义数组是否创建成功 + if not hasattr(self, self.custom_array_name): + self.logger.error(f"❌ 自定义数组创建失败!数组名:{self.custom_array_name}") + return + self.running = True super().start() self.logger.info(f"✅ 线程启动成功(DB{self.db_number},{self.area_type})") - self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms,读取{self.size}字节,输出{self.output_file}") + self.logger.info(f"🔧 内存配置:自定义数组名={self.custom_array_name},最大存储条数={self.max_array_size}") + self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms,读取{self.size}字节,存储到{self.custom_array_name}数组中") def stop(self): """停止线程(优雅清理)""" @@ -70,9 +78,12 @@ class PLCDataReaderThread(threading.Thread): self.join(timeout=2.0) if self.is_alive(): self.logger.warning("⚠️ 线程未正常退出,强制终止") + # 停止时输出数组统计信息 + custom_array = getattr(self, self.custom_array_name, []) self.logger.info(f"🛑 线程已停止(DB{self.db_number})") + self.logger.info(f"📊 内存数组统计:数组名={self.custom_array_name},总存储条数={len(custom_array)}") - def get_latest_data(self): + def get_latest_byte_data(self): """ 线程安全获取最新数据(返回原始字节+解析后的结构化数据) 返回示例: @@ -84,20 +95,34 @@ class PLCDataReaderThread(threading.Thread): } """ with self._data_lock: - if self._latest_data is None: - self.logger.debug("⚠️ 无最新数据缓存") + if self._latest_byte_data is None: + self.logger.debug(f"⚠️ 自定义数组{self.custom_array_name}无最新数据") return None + # 返回字节数据的副本,避免外部修改 + return self._latest_byte_data.copy() - timestamp, data_info, raw_bytes, parsed_data = self._latest_data - return { - "timestamp": timestamp, - "data_info": data_info.copy(), - "raw_bytes": raw_bytes.copy() - } + def get_custom_array(self, max_records=None): + """ + 线程安全获取自定义数组数据 + 参数: + max_records: 可选,指定返回的最大记录数 + 返回: + 字节数据列表 + """ + with self._data_lock: + custom_array = getattr(self, self.custom_array_name, []) + return custom_array[-1:] 
if custom_array else [] # 仅返回最新一条记录(非副本) + + def clear_custom_array(self): + """清空自定义数组""" + with self._data_lock: + custom_array = getattr(self, self.custom_array_name, []) + custom_array.clear() + self.logger.info(f"🧹 已清空自定义数组:{self.custom_array_name}") def run(self): """线程主循环:读PLC→解析数据→更新缓存→写文件""" - self.logger.debug(f"📌 主循环启动(DB{self.db_number})") + self.logger.debug(f"📌 主循环启动(DB{self.db_number},数组名:{self.custom_array_name})") while self.running: cycle_start = time.time() try: @@ -111,24 +136,18 @@ class PLCDataReaderThread(threading.Thread): # 步骤2:处理读取结果(缓存+解析+写文件) if cache_success and self.plc_client.data_cache is not None: raw_data = self.plc_client.data_cache # 原始字节 - data_len = len(raw_data) - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - # 构造数据基本信息 - data_info = { - "area_name": self.area_name, - "db_number": self.db_number, - "offset_range": f"0-{self.size - 1}", - "actual_length": data_len, - "area_type": self.area_type - } # 步骤3:线程安全更新内存缓存 with self._data_lock: - self._latest_data = (timestamp, data_info, raw_data.copy()) + self._latest_byte_data = raw_data - # 步骤4:写入文件(含原始字节+解析后数据) - self._write_latest_data_to_file(timestamp, data_info, raw_data) + # 获取自定义数组并添加新的字节数据 + custom_array = getattr(self, self.custom_array_name) + custom_array.clear() # 清空旧数据 + custom_array.append(raw_data) + + # 日志显示当前数组状态(仅1条) + self.logger.debug(f"✅ 已更新最新PLC记录,数组{self.custom_array_name}当前记录数:1") else: self.logger.warning(f"⚠️ 数据读取失败(DB{self.db_number}),跳过本次更新") @@ -142,24 +161,3 @@ class PLCDataReaderThread(threading.Thread): except Exception as e: self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True) time.sleep(self.update_interval) - - def _write_latest_data_to_file(self, timestamp, data_info, raw_data): - """ - 写入文件:含原始字节+解析后的结构化数据(每个区域独立文件) - """ - try: - # 处理原始字节为列表(便于查看) - data_list = list(raw_data) # 只显示前50字节,避免文件过大 - data_str = f"{data_list} (共{len(raw_data)}字节)" - - # 覆盖写入文件 - with open(self.output_file, "w", encoding="utf-8") as f: - 
f.write(f"[{timestamp}] 📝 {self.area_name} 最新数据\n") - f.write( - f" - 区域信息:DB{data_info['db_number']}({data_info['offset_range']}),类型{data_info['area_type']}\n") - f.write(f" - 原始字节数据:{data_str}\n") - f.write("=" * 120 + "\n") - - self.logger.debug(f"📤 最新DB{self.db_number}数据已覆盖写入文件:{self.output_file}") - except Exception as e: - self.logger.error(f"🔴 写入DB{self.db_number}数据到文件出错: {str(e)}", exc_info=True) \ No newline at end of file diff --git a/gateway/plc_manager.py b/gateway/plc_manager.py index 038b1e8..206d57c 100644 --- a/gateway/plc_manager.py +++ b/gateway/plc_manager.py @@ -1,13 +1,14 @@ from snap7_client import Snap7Client import logging + class PLCManager: """PLC连接管理器,管理多个PLC连接""" - - def __init__(self, plcs_config): + + def __init__(self, plcs_config, reader_threads): """ 初始化PLC管理器 - + Args: plcs_config: PLC配置列表 """ @@ -17,7 +18,8 @@ class PLCManager: self.plcs[name] = Snap7Client( plc_config["ip"], plc_config["rack"], - plc_config["slot"] + plc_config["slot"], + reader_threads ) self.logger = logging.getLogger("PLCManager") @@ -29,7 +31,7 @@ class PLCManager: """连接所有配置的PLC""" for name, client in self.plcs.items(): client.connect() - + def get_connection_status(self): """获取所有PLC的连接状态""" status = {} diff --git a/gateway/snap7_client.py b/gateway/snap7_client.py index 34fbe78..771e4ff 100644 --- a/gateway/snap7_client.py +++ b/gateway/snap7_client.py @@ -1,16 +1,14 @@ -from copyreg import dispatch_table import snap7 import logging from threading import Lock import time from snap7.util import * -import ast class Snap7Client: """Snap7客户端,处理与PLC的通信""" - def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1): + def __init__(self, ip, rack, slot, reader_threads, max_retries=5, retry_base_delay=1): """ 初始化Snap7客户端 @@ -31,6 +29,7 @@ class Snap7Client: self.retry_base_delay = retry_base_delay self.last_connect_attempt = 0 self.retry_count = 0 + self.reader_threads = reader_threads self.logger = logging.getLogger(f"Snap7Client[{ip}]") def 
is_valid_connection(self): @@ -392,71 +391,74 @@ class Snap7Client: self.logger.error(f"Error caching data: {e}") return False - def read_db_from_file_cache(self, db_number, offset, required_size, cache_file): + def read_db_from_memory_cache(self, db_number, offset, required_size, array_name, reader_threads): """ - 通用文件缓存读取:从指定文件读取DB块缓存,返回需要的字节片段 - 参数: - db_number: DB块编号 - offset: 需要读取的起始偏移(字节) - required_size: 需要读取的字节数 - cache_file: 缓存文件名(动态生成) + 通用内存数组读取:从PLCDataReaderThread实例的内存数组中,提取DB块的目标字节片段 + 参数说明: + - 已删除无用的 cache_file 参数(原文件缓存逻辑已替换为内存数组) 返回值: bytearray | None: 所需的字节片段,失败返回None """ try: - # 1. 读取缓存文件内容 - with open(cache_file, "r", encoding="utf-8") as f: - content = f.read() - - # 2. 定位"原始字节数据:"行(匹配DB100ReaderThread的文件格式) - db_match = False - data_line = None - for line in content.splitlines(): - if f"DB{db_number}" in line and "区域信息" in line: - db_match = True - if db_match and "原始字节数据:" in line: - data_line = line.strip() + # -------------------------- 关键修改1:正确遍历元组并匹配线程 -------------------------- + target_thread = None + # 遍历 reader_threads:每个元素是 (area_name_str, PLCDataReaderThread实例) 的元组 + for area_name_str, thread_instance in reader_threads: + # 匹配条件:线程实例的 custom_array_name == 传入的 array_name(数组名在_thread实例上) + if thread_instance.custom_array_name == array_name: + target_thread = thread_instance # 拿到真正的线程实例 break - if not db_match or not data_line: - self.logger.error(f"文件缓存中无DB{db_number}的有效数据(文件:{cache_file})") + + # -------------------------- 关键修改2:校验线程实例是否找到 -------------------------- + if target_thread is None: + # 若未找到,打印所有已存在的线程数组名,便于调试 + existing_arrays = [t.custom_array_name for _, t in reader_threads] + self.logger.error( + f"❌ 未找到数组{array_name}对应的线程实例!" + f"已存在的数组名:{existing_arrays},请检查数组名是否拼写正确" + ) return None - # 3. 
提取字节列表字符串(如"[16,0,0,...]") - list_start = data_line.find("[") - list_end = data_line.find("]") + 1 # 包含闭合括号 - - if list_start == -1 or list_end == 0: - self.logger.error(f"❌ DB{db_number}缓存文件格式错误:未找到有效字节列表") - return None - byte_list_str = data_line[list_start:list_end] - - # 4. 解析字符串为整数列表,再转成bytearray - byte_list = ast.literal_eval(byte_list_str) - # 验证列表有效性(必须是0-255的整数) - if not isinstance(byte_list, list) or not all( - isinstance(b, int) and 0 <= b <= 255 for b in byte_list - ): - self.logger.error(f"❌ DB{db_number}缓存文件数据无效:字节列表包含非整数或超出范围值") + # -------------------------- 后续逻辑不变(但需确认线程实例有效) -------------------------- + # 校验线程实例上是否存在数组(array_name 就是线程的 custom_array_name) + if not hasattr(target_thread, array_name): + self.logger.error(f"❌ 线程实例{target_thread.name}上不存在数组{array_name}(线程初始化失败)") return None - # 5. 验证数据长度(至少满足DB100的6000字节) - if len(byte_list) < required_size: - self.logger.warning(f"⚠️ DB{db_number}缓存文件数据不完整(仅{len(byte_list)}字节,期望{required_size}字节)") - self.logger.debug(f"✅ 从缓存文件读取DB{db_number}数据({len(byte_list)}字节)") + # 线程安全获取数组数据(调用线程自带方法,内部已加锁) + custom_array = target_thread.get_custom_array() + if not custom_array: + self.logger.warning(f"⚠️ 数组{array_name}为空,无可用数据") + return None - return bytearray(byte_list) + if not isinstance(custom_array[0], bytearray): + self.logger.error( + f"❌ 数组{array_name}的元素类型错误:" + f"期望bytearray,实际{type(custom_array).__name__}" + ) + return None + + # 验证数据长度是否满足需求 + total_byte_len = len(custom_array[0]) + if total_byte_len < offset + required_size: + self.logger.warning( + f"⚠️ DB{db_number}内存数据不完整:" + f"数组{array_name}最新数据共{total_byte_len}字节," + f"需从偏移{offset}读取{required_size}字节(需{offset + required_size}字节)" + ) + available_size = total_byte_len - offset + if available_size <= 0: + self.logger.error(f"❌ 偏移{offset}超出数据范围(最大偏移:{total_byte_len - 1})") + return None + required_size = available_size + + return custom_array[0] - except FileNotFoundError: - self.logger.warning(f"⚠️ DB{db_number}缓存文件不存在:{cache_file}") - return 
None - except ast.literal_eval.Error as e: - self.logger.error(f"❌ 解析DB{db_number}字节列表失败: {e}") - return None except Exception as e: - self.logger.error(f"❌ 读取DB{db_number}缓存文件异常: {e}", exc_info=True) + self.logger.error(f"🔴 读取内存数组{array_name}异常:{str(e)}", exc_info=True) return None - def read_generic(self, db_number, offset, data_type, cache_file, count=1): + def read_generic(self, db_number, offset, data_type, array_name, count=1): """ 通用读取接口(改进): - DB100:优先从缓存文件读取 → 再内存缓存 → 最后PLC @@ -465,11 +467,11 @@ class Snap7Client: # 1、处理DB数据块:优先从缓存文件读取 raw_data = None if raw_data is None: - cache_file = cache_file - raw_data = self.read_db_from_file_cache(db_number, offset, count, cache_file) + array_name = array_name + raw_data = self.read_db_from_memory_cache(db_number, offset, count, array_name, self.reader_threads) if raw_data is not None: - print(f"从缓存文件中读取DB{db_number}的数据") - self.logger.debug(f"从文件缓存读取DB{db_number}(文件:{cache_file})") + print(f"从内存数组中读取DB{db_number}的数据") + self.logger.debug(f"从内存数组中读取DB{db_number}(数组:{array_name})") try: if data_type == 'bool': @@ -481,17 +483,14 @@ class Snap7Client: end_bit = offset + count - 1 end_byte = end_bit // 8 # 检查数据长度是否足够 - if end_byte >= len(raw_data): - self.logger.warning( - f"⚠️ DB{db_number}缓存文件数据不足:需要字节{end_byte},实际{len(raw_data)}字节") - else: - result = [] # 用于存储解析出的bool值 - for i in range(count): - current_global_bit = offset + i - current_byte = current_global_bit // 8 - current_bit = current_global_bit % 8 - result.append(bool(raw_data[current_byte] & (1 << current_bit))) - return result[0] if count == 1 else result + + result = [] # 用于存储解析出的bool值 + for i in range(count): + current_global_bit = offset + i + current_byte = current_global_bit // 8 + current_bit = current_global_bit % 8 + result.append(bool(raw_data[current_byte] & (1 << current_bit))) + return result[0] if count == 1 else result elif data_type == 'byte': if offset + count > len(raw_data): @@ -502,18 +501,18 @@ class Snap7Client: return [b for b in 
data] if count > 1 else data[0] elif data_type in ['int', 'word']: - byte_per_data = 2 - total_bytes = byte_per_data * count - if offset + total_bytes > len(raw_data): - self.logger.warning( - f"⚠️ DB100缓存文件数据不足:需要偏移{offset}+{total_bytes}字节,实际{len(raw_data)}字节") - else: - result = [] - for i in range(count): - start = offset + i * byte_per_data - slice_data = raw_data[start:start + byte_per_data] - result.append(get_int(slice_data, 0) if data_type == 'int' else get_word(slice_data, 0)) - return result[0] if count == 1 else result + total_bytes = 2 * count + data = self.read_db(db_number, offset, total_bytes) + if data is None: + return None + + result = [] + for i in range(count): + if data_type == 'int': + result.append(get_int(data, i * 2)) + else: # word + result.append(get_word(data, i * 2)) + return result[0] if count == 1 else result elif data_type in ['dint', 'dword', 'real']: byte_per_data = 4