From 485bbcc3f0f76452f9a6e655569a0aac9bd36391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=A8=C3=A7=C2=9Apengqi?= <您çpengqi@xj-robot.com> Date: Fri, 26 Sep 2025 19:49:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E7=A8=B3=E5=AE=9A=E6=80=A7?= =?UTF-8?q?=EF=BC=8C=E4=BD=BF=E7=94=A8=E7=BA=BF=E7=A8=8B=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E8=AF=BB=E5=8F=96PLC=E4=B8=ADdb=5F100=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=B9=B6=E5=AD=98=E5=82=A8=E8=B5=B7=E6=9D=A5?= =?UTF-8?q?=EF=BC=8Cread=5Fgeneric=E4=BB=8E=E5=AD=98=E5=82=A8=E7=9A=84?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=AD=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gateway/api_server.py | 1 - gateway/cache_manager.py | 1 - gateway/db100_reader.py | 226 +++++++++++++++++++++ gateway/main.py | 45 +++-- gateway/snap7_client.py | 420 +++++++++++++++++++++++++++++++-------- 5 files changed, 592 insertions(+), 101 deletions(-) create mode 100644 gateway/db100_reader.py diff --git a/gateway/api_server.py b/gateway/api_server.py index 43d8bb6..3584613 100644 --- a/gateway/api_server.py +++ b/gateway/api_server.py @@ -1427,7 +1427,6 @@ class APIServer: endpoint="read_generic") def read_generic(plc_name, area_name, offset, data_type): """通用读取接口""" - print("Enter Read generic") # 检查请求参数 count = request.args.get('count', 1, type=int) if count < 1: diff --git a/gateway/cache_manager.py b/gateway/cache_manager.py index fd6621d..f44076a 100644 --- a/gateway/cache_manager.py +++ b/gateway/cache_manager.py @@ -636,7 +636,6 @@ class CacheManager: """通用读取接口""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) - print("area:",area) if not area: return None, "Area not found", "unknown", 0 diff --git a/gateway/db100_reader.py b/gateway/db100_reader.py new file mode 100644 index 0000000..8d615cd --- /dev/null +++ b/gateway/db100_reader.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/26 09:40 +# @Author : reenrr +# @File : db100_reader.py +''' +import threading +import time +import logging +from datetime import datetime +from snap7_client import Snap7Client # 导入实际的Snap7Client类 + + +class DB100ReaderThread(threading.Thread): + def __init__(self, plc_client, update_interval=0.03, data_size=6000, output_file="db100_latest_data.log"): + """ + 初始化DB100数据读取线程(仅保留最新数据,支持直接提取) + 参数: + plc_config: PLC连接配置字典(包含ip, rack, slot) + update_interval: 更新间隔(秒),默认0.03秒(30ms) + data_size: 读取的数据大小,默认6000字节 + output_file: 最新数据存储文件路径(覆盖写入,仅存1条记录) + """ + super().__init__(name="DB100ReaderThread", daemon=True) # 设为守护线程,主程序退出时自动停止 + # 1. PLC连接配置与客户端初始化 + self.plc_client = plc_client + # 2. 线程与数据核心配置 + self.update_interval = update_interval + self.data_size = data_size + self.output_file = output_file + # 3. 线程状态与数据缓存(新增:内存缓存最新数据,避免频繁读文件) + self.running = False + self._latest_data = None # 内存缓存最新数据(格式:(timestamp, data_info, raw_bytes)) + self._data_lock = threading.Lock() # 线程锁:确保数据读写安全 + # 4. 日志配置 + self.logger = logging.getLogger("DB100Reader") + + + def start(self): + """启动线程(先验证PLC连接)""" + self.running = True + super().start() + self.logger.info("✅ DB100数据读取线程已启动(PLC连接成功)") + self.logger.info(f"🔧 配置:更新间隔{self.update_interval*1000}ms,每次读取{self.data_size}字节") + + def stop(self): + """停止线程(优雅清理)""" + self.running = False + if self.is_alive(): + self.join(timeout=1) + if self.is_alive(): + self.logger.warning("⚠️ 线程未正常退出,强制终止") + self.logger.info("🛑 DB100数据读取线程已停止(PLC连接已清理)") + + + def get_latest_data(self): + """ + 新增:获取最新数据的接口(线程安全) + 返回: + 字典格式:{ + "timestamp": "2025-09-26 10:13:50.606", # 毫秒级时间戳 + "data_info": { + "db_name": "DB100", + "offset_range": "0-5999", + "actual_length": 6000 # 实际读取字节数 + }, + "raw_bytes": bytearray(b'\x10\x00\x00...') # 原始字节数据(可直接用于后续提取) + } + 若未获取到数据,返回None + """ + with self._data_lock: # 加锁确保数据一致性(避免读取时数据正在更新) + if self._latest_data is None: + return None + # 解构内存缓存的最新数据,返回结构化字典 + timestamp, data_info, raw_bytes = self._latest_data + return { + "timestamp": timestamp, + "data_info": data_info, + "raw_bytes": raw_bytes.copy() # 返回副本,避免原始数据被外部修改 + } + + + def run(self): + """线程主循环:读取DB100→更新内存缓存→覆盖写入文件""" + self.logger.debug("📌 线程主循环已启动") + while self.running: + cycle_start = time.time() + + try: + # 步骤1:读取DB100数据 + cache_success = self.plc_client.cache_large_data_block( + db_number=100, + offset=0, + size=self.data_size + ) + + # 步骤2:更新内存缓存和文件(仅保留最新数据) + if cache_success and self.plc_client.data_cache is not None: + raw_data = self.plc_client.data_cache # 原始字节数据(bytearray) + data_len = len(raw_data) + # 生成毫秒级时间戳和数据信息 + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + data_info = { + "db_name": "DB100", + "offset_range": f"0-{self.data_size - 1}", + "actual_length": data_len + } + + # 线程安全更新内存缓存 + with self._data_lock: + self._latest_data = (timestamp, data_info, raw_data.copy()) + + # 覆盖写入文件(删除历史记录,仅存最新一条) + self._write_latest_data_to_file(timestamp, data_info, raw_data) + + else: + self.logger.warning("⚠️ DB100数据缓存失败,跳过本次更新") + + # 步骤3:精确控制更新间隔 + cycle_elapsed = time.time() - cycle_start + sleep_time = max(0, self.update_interval - cycle_elapsed) + if sleep_time > 0: + time.sleep(sleep_time) + + except Exception as e: + self.logger.error(f"🔴 循环读取DB100出错: {str(e)}", exc_info=True) + time.sleep(self.update_interval) + + + def _write_latest_data_to_file(self, timestamp, data_info, raw_data): + """ + 修改:覆盖写入文件(仅保留最新数据,删除历史记录) + 参数: + timestamp: 毫秒级时间戳 + data_info: 数据基本信息字典 + raw_data: 原始字节数据(bytearray) + """ + try: + # 处理原始数据显示(转为列表格式,便于查看) + data_list = list(raw_data) + data_str = f"{data_list} (共{len(raw_data)}字节)" + + # 覆盖模式写入("w"模式:清空文件后写入最新数据) + with open(self.output_file, "w", encoding="utf-8") as f: + f.write(f"[{timestamp}] 📝 DB100最新数据\n") + f.write(f" - 数据基本信息:{data_info['db_name']}(偏移{data_info['offset_range']}),实际长度{data_info['actual_length']}字节\n") + f.write(f" - 原始字节数据:{data_str}\n") + f.write("=" * 120 + "\n") + + self.logger.debug(f"📤 最新DB100数据已覆盖写入文件({self.output_file})") + + except Exception as e: + self.logger.error(f"🔴 写入最新DB100数据到文件出错: {str(e)}", exc_info=True) + + +# -------------------------- 程序入口(含使用示例) -------------------------- +if __name__ == "__main__": + # 1. 配置日志 + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("db100_reader_runtime.log", encoding="utf-8", mode="a") + ] + ) + + # 2. PLC连接配置(需根据实际PLC修改) + PLC_CONFIG = { + "ip": "192.168.1.0", # 替换为实际PLC IP + "rack": 0, # S7-1200默认0,S7-300默认0 + "slot": 1 # S7-1200默认1,S7-300默认2 + } + + # 3. 启动线程并示例“提取最新数据” + try: + db_reader = DB100ReaderThread( + plc_config=PLC_CONFIG, + update_interval=0.03, + data_size=6000, + output_file="db100_latest_data.log" # 仅存最新数据的文件 + ) + db_reader.start() + + logging.info("🎉 程序已启动,正在实时读取DB100数据...(按 Ctrl+C 停止)") + logging.info("📌 示例:每1秒提取一次最新数据(实际使用时可按需调用get_latest_data())") + + # 示例:每隔1秒提取一次最新数据(模拟后续业务逻辑) + while True: + latest_data = db_reader.get_latest_data() + if latest_data: + logging.info(f"\n【最新数据提取结果】") + logging.info(f"时间戳:{latest_data['timestamp']}") + logging.info(f"数据长度:{latest_data['data_info']['actual_length']}字节") + logging.info(f"前10字节原始数据:{latest_data['raw_bytes'][:10]}") # 示例提取前10字节 + else: + logging.info("⚠️ 暂未获取到DB100最新数据") + time.sleep(1) + + read_result = db_reader.plc_client.read_generic( + db_number=100, + offset=0, + data_type="bool", + count=1 + ) + + # 打印read_generic()的读取结果 + if read_result is not None: + logging.info(f"\n【read_generic()读取结果】") + logging.info(f"读取配置:DB100,偏移10,int类型,1个数据") + logging.info(f"解析后的值:{read_result}") + else: + logging.info("⚠️ read_generic()读取失败(无数据或解析错误)") + + time.sleep(1) # 每隔1秒读取一次 + + except KeyboardInterrupt: + logging.info("\n⚠️ 收到用户停止指令,正在清理资源...") + if 'db_reader' in locals(): + db_reader.stop() + logging.info("👋 程序已正常退出") + + except Exception as e: + logging.critical(f"💥 程序初始化失败: {str(e)}", exc_info=True) + exit(1) \ No newline at end of file diff --git a/gateway/main.py b/gateway/main.py index 0295fac..9dee6fe 100644 --- a/gateway/main.py +++ b/gateway/main.py @@ -6,10 +6,12 @@ from plc_manager import PLCManager from cache_manager import CacheManager from api_server import APIServer from config_manager import ConfigManager +from db100_reader import DB100ReaderThread + class GatewayApp: """PLC网关应用程序主类""" - + def __init__(self, config_path="../config/config.json"): self.config_path = config_path self.config_manager = ConfigManager(config_path) @@ -18,35 +20,38 @@ class GatewayApp: self.api_server = None self.reload_flag = False self.reload_lock = threading.Lock() + # DB100ReaderThread线程相关初始化 + self.db100_reader_thread = None + self.logger = logging.getLogger("GatewayApp") - + # 加载初始配置 self.load_configuration() - + def load_configuration(self): """加载配置并初始化组件""" # 加载配置 if not self.config_manager.load_config(): self.logger.error("Failed to load initial configuration") return False - + config = self.config_manager.get_config() - + # 重新初始化PLC连接 if self.plc_manager: self.logger.info("Reinitializing PLC connections...") self.plc_manager = PLCManager(config["plcs"]) self.plc_manager.connect_all() - + # 重新初始化缓存 if self.cache_manager: self.logger.info("Stopping existing cache manager...") self.cache_manager.stop() - + self.logger.info("Initializing cache manager...") self.cache_manager = CacheManager(config, self.plc_manager, app=self) self.cache_manager.start() - + # 重新初始化API服务器 if self.api_server: self.logger.info("API server already running") @@ -54,10 +59,18 @@ class GatewayApp: self.logger.info("Starting API server...") self.api_server = APIServer(self.cache_manager, self.config_path) self.api_server.start() - + + # 重新初始化DB100ReaderThread线程 + for plc in config["plcs"]: + plc_name = plc["name"] + client = self.plc_manager.get_plc(plc_name) + self.logger.info("Starting db100_reader_thread...") + self.db100_reader_thread = DB100ReaderThread(client, output_file="db100_latest_data.log") + self.db100_reader_thread.start() + self.logger.info("Configuration loaded successfully") return True - + def check_for_reload(self): """检查是否需要重载配置""" while True: @@ -66,22 +79,22 @@ class GatewayApp: self.reload_flag = False self.load_configuration() time.sleep(1) - + def request_reload(self): """请求重载配置""" with self.reload_lock: self.reload_flag = True self.logger.info("Configuration reload requested") - + def run(self): """运行主程序""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s' ) - + self.logger.info("Starting PLC Gateway...") - + # 启动配置重载检查线程 reload_thread = threading.Thread( target=self.check_for_reload, @@ -89,7 +102,7 @@ class GatewayApp: daemon=True ) reload_thread.start() - + try: # 保持主程序运行 while True: @@ -101,9 +114,11 @@ class GatewayApp: self.cache_manager.stop() self.logger.info("Shutdown complete") + def main(): app = GatewayApp() app.run() + if __name__ == "__main__": main() \ No newline at end of file diff --git a/gateway/snap7_client.py b/gateway/snap7_client.py index e972dd0..c6a9421 100644 --- a/gateway/snap7_client.py +++ b/gateway/snap7_client.py @@ -1,3 +1,5 @@ +from copyreg import dispatch_table + import snap7 import logging from threading import Lock @@ -31,6 +33,11 @@ class Snap7Client: self.retry_count = 0 self.logger = logging.getLogger(f"Snap7Client[{ip}]") + # --------------- + # 新增 + # --------------- + self.db100_cache_file = "db100_latest_data.log" + def is_valid_connection(self): """检查连接是否真正有效""" try: @@ -244,89 +251,89 @@ class Snap7Client: self.connected = False return False - def read_generic(self, db_number, offset, data_type, count=1): - """ - 通用读取接口,支持多种数据类型 - Args: - db_number: DB块编号 - offset: 起始偏移量(字节或位,对于bool类型) - data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword') - count: 要读取的数据个数 - Returns: - 解析后的数据(单个值或值列表),失败返回None - """ - if not self.connected and not self.connect(): - self.logger.warning(f"Read failed: not connected to {self.ip}") - return None - - try: - if data_type == 'bool': - # 对于bool,offset是位偏移 - byte_offset = offset // 8 - bit_offset = offset % 8 - # 计算需要读取的字节数 - last_bit = bit_offset + count - 1 - last_byte = last_bit // 8 - total_bytes = last_byte - byte_offset + 1 - - # 读取原始字节数据 - data = self.read_db(db_number, byte_offset, total_bytes) - if data is None: - return None - - # 解析bool值 - result = [] - for i in range(count): - current_bit = bit_offset + i - byte_idx = current_bit // 8 - bit_idx = current_bit % 8 - result.append(bool(data[byte_idx] & (1 << bit_idx))) - - return result[0] if count == 1 else result - - elif data_type == 'byte': - data = self.read_db(db_number, offset, count) - if data is None: - return None - return [data[i] for i in range(count)] if count > 1 else data[0] - - elif data_type in ['int', 'word']: - total_bytes = 2 * count - data = self.read_db(db_number, offset, total_bytes) - if data is None: - return None - - result = [] - for i in range(count): - if data_type == 'int': - result.append(get_int(data, i * 2)) - else: # word - result.append(get_word(data, i * 2)) - return result[0] if count == 1 else result - - elif data_type in ['dint', 'dword', 'real']: - total_bytes = 4 * count - data = self.read_db(db_number, offset, total_bytes) - if data is None: - return None - - result = [] - for i in range(count): - if data_type == 'dint': - result.append(get_dint(data, i * 4)) - elif data_type == 'dword': - result.append(get_dword(data, i * 4)) - else: # real - result.append(get_real(data, i * 4)) - return result[0] if count == 1 else result - - else: - self.logger.error(f"Unsupported data type: {data_type}") - return None - - except Exception as e: - self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}") - return None + # def read_generic(self, db_number, offset, data_type, count=1): + # """ + # 通用读取接口,支持多种数据类型 + # Args: + # db_number: DB块编号 + # offset: 起始偏移量(字节或位,对于bool类型) + # data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword') + # count: 要读取的数据个数 + # Returns: + # 解析后的数据(单个值或值列表),失败返回None + # """ + # if not self.connected and not self.connect(): + # self.logger.warning(f"Read failed: not connected to {self.ip}") + # return None + # + # try: + # if data_type == 'bool': + # # 对于bool,offset是位偏移 + # byte_offset = offset // 8 + # bit_offset = offset % 8 + # # 计算需要读取的字节数 + # last_bit = bit_offset + count - 1 + # last_byte = last_bit // 8 + # total_bytes = last_byte - byte_offset + 1 + # + # # 读取原始字节数据 + # data = self.read_db(db_number, byte_offset, total_bytes) + # if data is None: + # return None + # + # # 解析bool值 + # result = [] + # for i in range(count): + # current_bit = bit_offset + i + # byte_idx = current_bit // 8 + # bit_idx = current_bit % 8 + # result.append(bool(data[byte_idx] & (1 << bit_idx))) + # + # return result[0] if count == 1 else result + # + # elif data_type == 'byte': + # data = self.read_db(db_number, offset, count) + # if data is None: + # return None + # return [data[i] for i in range(count)] if count > 1 else data[0] + # + # elif data_type in ['int', 'word']: + # total_bytes = 2 * count + # data = self.read_db(db_number, offset, total_bytes) + # if data is None: + # return None + # + # result = [] + # for i in range(count): + # if data_type == 'int': + # result.append(get_int(data, i * 2)) + # else: # word + # result.append(get_word(data, i * 2)) + # return result[0] if count == 1 else result + # + # elif data_type in ['dint', 'dword', 'real']: + # total_bytes = 4 * count + # data = self.read_db(db_number, offset, total_bytes) + # if data is None: + # return None + # + # result = [] + # for i in range(count): + # if data_type == 'dint': + # result.append(get_dint(data, i * 4)) + # elif data_type == 'dword': + # result.append(get_dword(data, i * 4)) + # else: # real + # result.append(get_real(data, i * 4)) + # return result[0] if count == 1 else result + # + # else: + # self.logger.error(f"Unsupported data type: {data_type}") + # return None + # + # except Exception as e: + # self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}") + # return None def write_generic(self, db_number, offset, data_type, value): """ @@ -445,4 +452,249 @@ class Snap7Client: except Exception as e: self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}") - return False \ No newline at end of file + return False + + # ----------------- + # 新增 + # ----------------- + def cache_large_data_block(self, db_number, offset, size=6000): + """ + 一次性读取指定大小的数据并缓存 + 参数: + db_number: DB块编号 + start_offset: 起始偏移量 + size: 要读取的字节数,默认6000 + 返回值: + 成功返回True,失败返回False + """ + try: + self.data_cache = self.read_db(db_number, offset, size) + if self.data_cache is not None: + self.cache_db_number = db_number + self.cache_offset = offset + self.logger.info(f"Successfully cached {size} bytes from DB{db_number} starting at offset {offset}") + return True + else: + self.logger.error(f"Failed to cache {size} bytes from DB{db_number} starting at offset {offset}") + return False + except Exception as e: + self.logger.error(f"Error caching data: {e}") + return False + + def read_db100_from_file(self): + """ + 从缓存文件中解析DB100的原始字节数据 + 返回: bytearray(成功)/ None(失败) + """ + try: + # 1. 读取缓存文件内容 + with open(self.db100_cache_file, "r", encoding="utf-8") as f: + content = f.read() + + # 2. 定位"原始字节数据:"行(匹配DB100ReaderThread的文件格式) + data_line = None + for line in content.splitlines(): + if "原始字节数据:" in line: + data_line = line.strip() + break + if not data_line: + self.logger.error(f"❌ DB100缓存文件格式错误:未找到'原始字节数据'行") + return None + + # 3. 提取字节列表字符串(如"[16,0,0,...]") + list_start = data_line.find("[") + list_end = data_line.find("]") + 1 # 包含闭合括号 + if list_start == -1 or list_end == 0: + self.logger.error(f"❌ DB100缓存文件格式错误:未找到有效字节列表") + return None + byte_list_str = data_line[list_start:list_end] + + # 4. 解析字符串为整数列表,再转成bytearray + byte_list = ast.literal_eval(byte_list_str) + # 验证列表有效性(必须是0-255的整数) + if not isinstance(byte_list, list) or not all( + isinstance(b, int) and 0 <= b <= 255 for b in byte_list + ): + self.logger.error(f"❌ DB100缓存文件数据无效:字节列表包含非整数或超出范围值") + return None + + # 5. 验证数据长度(至少满足DB100的6000字节) + if len(byte_list) < 6000: + self.logger.warning(f"⚠️ DB100缓存文件数据不完整(仅{len(byte_list)}字节,期望6000字节)") + self.logger.debug(f"✅ 从缓存文件读取DB100数据({len(byte_list)}字节)") + return bytearray(byte_list) + + except FileNotFoundError: + self.logger.warning(f"⚠️ DB100缓存文件不存在:{self.db100_cache_file}") + return None + except ast.literal_eval.Error as e: + self.logger.error(f"❌ 解析DB100字节列表失败: {e}") + return None + except Exception as e: + self.logger.error(f"❌ 读取DB100缓存文件异常: {e}", exc_info=True) + return None + + def read_generic(self, db_number, offset, data_type, count=1): + """ + 通用读取接口(改进): + - DB100:优先从缓存文件读取 → 再内存缓存 → 最后PLC + - 其他DB块:保持原逻辑(内存缓存→PLC) + """ + # 1、处理DB100:优先从缓存文件读取 + if db_number == 100: + print(f"从缓存文件中读取{db_number}的数据") + db100_raw = self.read_db100_from_file() + + if db100_raw is not None: + try: + if data_type == 'bool': + # 对于bool,offset是位偏移 + byte_offset = offset // 8 + bit_offset = offset % 8 + # 计算需要读取的字节数 + last_bit = bit_offset + count - 1 + last_byte = last_bit // 8 + # 检查数据长度是否足够 + if last_byte >= len(db100_raw): + self.logger.warning( + f"⚠️ DB100缓存文件数据不足:需要字节{last_byte},实际{len(db100_raw)}字节") + else: + result = [] + for i in range(count): + curr_bit = bit_offset + i + curr_byte = curr_bit // 8 + curr_bit_in_byte = curr_bit % 8 + result.append(bool(db100_raw[curr_byte] & (1 << curr_bit_in_byte))) + return result[0] if count == 1 else result + + elif data_type == 'byte': + if offset + count > len(db100_raw): + self.logger.warning( + f"⚠️ DB100缓存文件数据不足:需要偏移{offset}+{count}字节,实际{len(db100_raw)}字节") + else: + data = db100_raw[offset:offset + count] + return [b for b in data] if count > 1 else data[0] + + elif data_type in ['int', 'word']: + byte_per_data = 2 + total_bytes = byte_per_data * count + if offset + total_bytes > len(db100_raw): + self.logger.warning( + f"⚠️ DB100缓存文件数据不足:需要偏移{offset}+{total_bytes}字节,实际{len(db100_raw)}字节") + else: + result = [] + for i in range(count): + start = offset + i * byte_per_data + slice_data = db100_raw[start:start + byte_per_data] + result.append(get_int(slice_data, 0) if data_type == 'int' else get_word(slice_data, 0)) + return result[0] if count == 1 else result + + elif data_type in ['dint', 'dword', 'real']: + byte_per_data = 4 + total_bytes = byte_per_data * count + if offset + total_bytes > len(db100_raw): + self.logger.warning( + f"⚠️ DB100缓存文件数据不足:需要偏移{offset}+{total_bytes}字节,实际{len(db100_raw)}字节") + else: + result = [] + for i in range(count): + start = offset + i * byte_per_data + slice_data = db100_raw[start:start + byte_per_data] + if data_type == 'dint': + result.append(get_dint(slice_data, 0)) + elif data_type == 'dword': + result.append(get_dword(slice_data, 0)) + else: # real + result.append(get_real(slice_data, 0)) + return result[0] if count == 1 else result + + # 1.5 不支持的数据类型 + else: + self.logger.error(f"❌ 不支持的数据类型:{data_type}") + return None + + except Exception as e: + self.logger.error(f"❌ 解析DB100缓存文件数据异常(类型:{data_type},偏移:{offset}): {e}", + exc_info=True) + + # 文件读取失败,fallback到原逻辑(内存缓存→PLC) + self.logger.debug(f"⚠️ DB100缓存文件读取失败, fallback到内存缓存/PLC") + + # 缓存读取失败或未启用缓存,直接从PLC读取 + if not self.connected and not self.connect(): + self.logger.warning(f"Read failed: not connected to {self.ip}") + return None + + try: + print(f"从PLC中读取{db_number}的数据") + if data_type == 'bool': + # 对于bool,offset是位偏移 + byte_offset = offset // 8 + bit_offset = offset % 8 + # 计算需要读取的字节数 + last_bit = bit_offset + count - 1 + last_byte = last_bit // 8 + total_bytes = last_byte - byte_offset + 1 + + # 读取原始字节数据 + data = self.read_db(db_number, byte_offset, total_bytes) + if data is None: + return None + + # 解析bool值 + result = [] + for i in range(count): + curr_bit = bit_offset + i + byte_idx = curr_bit // 8 + bit_idx = curr_bit % 8 + result.append(bool(data[byte_idx] & (1 << bit_idx))) + + return result[0] if count == 1 else result + + elif data_type == 'byte': + data = self.read_db(db_number, offset, count) + if data is None: + return None + return [data[i] for i in range(count)] if count > 1 else data[0] + + elif data_type in ['int', 'word']: + total_bytes = 2 * count + data = self.read_db(db_number, offset, total_bytes) + if data is None: + return None + + result = [] + for i in range(count): + if data_type == 'int': + result.append(get_int(data, i * 2)) + else: # word + result.append(get_word(data, i * 2)) + return result[0] if count == 1 else result + + elif data_type in ['dint', 'dword', 'real']: + total_bytes = 4 * count + data = self.read_db(db_number, offset, total_bytes) + if data is None: + return None + + result = [] + for i in range(count): + if data_type == 'dint': + result.append(get_dint(data, i * 4)) + elif data_type == 'dword': + result.append(get_dword(data, i * 4)) + else: # real + result.append(get_real(data, i * 4)) + return result[0] if count == 1 else result + + + else: + self.logger.error(f"Unsupported data type: {data_type}") + return None + + except Exception as e: + self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}") + return None + + +