diff --git a/gateway/db100_reader.py b/gateway/db100_reader.py deleted file mode 100644 index 8d615cd..0000000 --- a/gateway/db100_reader.py +++ /dev/null @@ -1,226 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2025/9/26 09:40 -# @Author : reenrr -# @File : db100_reader.py -''' -import threading -import time -import logging -from datetime import datetime -from snap7_client import Snap7Client # 导入实际的Snap7Client类 - - -class DB100ReaderThread(threading.Thread): - def __init__(self, plc_client, update_interval=0.03, data_size=6000, output_file="db100_latest_data.log"): - """ - 初始化DB100数据读取线程(仅保留最新数据,支持直接提取) - 参数: - plc_config: PLC连接配置字典(包含ip, rack, slot) - update_interval: 更新间隔(秒),默认0.03秒(30ms) - data_size: 读取的数据大小,默认6000字节 - output_file: 最新数据存储文件路径(覆盖写入,仅存1条记录) - """ - super().__init__(name="DB100ReaderThread", daemon=True) # 设为守护线程,主程序退出时自动停止 - # 1. PLC连接配置与客户端初始化 - self.plc_client = plc_client - # 2. 线程与数据核心配置 - self.update_interval = update_interval - self.data_size = data_size - self.output_file = output_file - # 3. 线程状态与数据缓存(新增:内存缓存最新数据,避免频繁读文件) - self.running = False - self._latest_data = None # 内存缓存最新数据(格式:(timestamp, data_info, raw_bytes)) - self._data_lock = threading.Lock() # 线程锁:确保数据读写安全 - # 4. 日志配置 - self.logger = logging.getLogger("DB100Reader") - - - def start(self): - """启动线程(先验证PLC连接)""" - self.running = True - super().start() - self.logger.info("✅ DB100数据读取线程已启动(PLC连接成功)") - self.logger.info(f"🔧 配置:更新间隔{self.update_interval*1000}ms,每次读取{self.data_size}字节") - - def stop(self): - """停止线程(优雅清理)""" - self.running = False - if self.is_alive(): - self.join(timeout=1) - if self.is_alive(): - self.logger.warning("⚠️ 线程未正常退出,强制终止") - self.logger.info("🛑 DB100数据读取线程已停止(PLC连接已清理)") - - - def get_latest_data(self): - """ - 新增:获取最新数据的接口(线程安全) - 返回: - 字典格式:{ - "timestamp": "2025-09-26 10:13:50.606", # 毫秒级时间戳 - "data_info": { - "db_name": "DB100", - "offset_range": "0-5999", - "actual_length": 6000 # 实际读取字节数 - }, - "raw_bytes": bytearray(b'\x10\x00\x00...') # 原始字节数据(可直接用于后续提取) - } - 若未获取到数据,返回None - """ - with self._data_lock: # 加锁确保数据一致性(避免读取时数据正在更新) - if self._latest_data is None: - return None - # 解构内存缓存的最新数据,返回结构化字典 - timestamp, data_info, raw_bytes = self._latest_data - return { - "timestamp": timestamp, - "data_info": data_info, - "raw_bytes": raw_bytes.copy() # 返回副本,避免原始数据被外部修改 - } - - - def run(self): - """线程主循环:读取DB100→更新内存缓存→覆盖写入文件""" - self.logger.debug("📌 线程主循环已启动") - while self.running: - cycle_start = time.time() - - try: - # 步骤1:读取DB100数据 - cache_success = self.plc_client.cache_large_data_block( - db_number=100, - offset=0, - size=self.data_size - ) - - # 步骤2:更新内存缓存和文件(仅保留最新数据) - if cache_success and self.plc_client.data_cache is not None: - raw_data = self.plc_client.data_cache # 原始字节数据(bytearray) - data_len = len(raw_data) - # 生成毫秒级时间戳和数据信息 - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - data_info = { - "db_name": "DB100", - "offset_range": f"0-{self.data_size - 1}", - "actual_length": data_len - } - - # 线程安全更新内存缓存 - with self._data_lock: - self._latest_data = (timestamp, data_info, raw_data.copy()) - - # 覆盖写入文件(删除历史记录,仅存最新一条) - self._write_latest_data_to_file(timestamp, data_info, raw_data) - - else: - self.logger.warning("⚠️ DB100数据缓存失败,跳过本次更新") - - # 步骤3:精确控制更新间隔 - cycle_elapsed = time.time() - cycle_start - sleep_time = max(0, self.update_interval - cycle_elapsed) - if sleep_time > 0: - time.sleep(sleep_time) - - except Exception as e: - self.logger.error(f"🔴 循环读取DB100出错: {str(e)}", exc_info=True) - time.sleep(self.update_interval) - - - def _write_latest_data_to_file(self, timestamp, data_info, raw_data): - """ - 修改:覆盖写入文件(仅保留最新数据,删除历史记录) - 参数: - timestamp: 毫秒级时间戳 - data_info: 数据基本信息字典 - raw_data: 原始字节数据(bytearray) - """ - try: - # 处理原始数据显示(转为列表格式,便于查看) - data_list = list(raw_data) - data_str = f"{data_list} (共{len(raw_data)}字节)" - - # 覆盖模式写入("w"模式:清空文件后写入最新数据) - with open(self.output_file, "w", encoding="utf-8") as f: - f.write(f"[{timestamp}] 📝 DB100最新数据\n") - f.write(f" - 数据基本信息:{data_info['db_name']}(偏移{data_info['offset_range']}),实际长度{data_info['actual_length']}字节\n") - f.write(f" - 原始字节数据:{data_str}\n") - f.write("=" * 120 + "\n") - - self.logger.debug(f"📤 最新DB100数据已覆盖写入文件({self.output_file})") - - except Exception as e: - self.logger.error(f"🔴 写入最新DB100数据到文件出错: {str(e)}", exc_info=True) - - -# -------------------------- 程序入口(含使用示例) -------------------------- -if __name__ == "__main__": - # 1. 配置日志 - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - handlers=[ - logging.StreamHandler(), - logging.FileHandler("db100_reader_runtime.log", encoding="utf-8", mode="a") - ] - ) - - # 2. PLC连接配置(需根据实际PLC修改) - PLC_CONFIG = { - "ip": "192.168.1.0", # 替换为实际PLC IP - "rack": 0, # S7-1200默认0,S7-300默认0 - "slot": 1 # S7-1200默认1,S7-300默认2 - } - - # 3. 启动线程并示例“提取最新数据” - try: - db_reader = DB100ReaderThread( - plc_config=PLC_CONFIG, - update_interval=0.03, - data_size=6000, - output_file="db100_latest_data.log" # 仅存最新数据的文件 - ) - db_reader.start() - - logging.info("🎉 程序已启动,正在实时读取DB100数据...(按 Ctrl+C 停止)") - logging.info("📌 示例:每1秒提取一次最新数据(实际使用时可按需调用get_latest_data())") - - # 示例:每隔1秒提取一次最新数据(模拟后续业务逻辑) - while True: - latest_data = db_reader.get_latest_data() - if latest_data: - logging.info(f"\n【最新数据提取结果】") - logging.info(f"时间戳:{latest_data['timestamp']}") - logging.info(f"数据长度:{latest_data['data_info']['actual_length']}字节") - logging.info(f"前10字节原始数据:{latest_data['raw_bytes'][:10]}") # 示例提取前10字节 - else: - logging.info("⚠️ 暂未获取到DB100最新数据") - time.sleep(1) - - read_result = db_reader.plc_client.read_generic( - db_number=100, - offset=0, - data_type="bool", - count=1 - ) - - # 打印read_generic()的读取结果 - if read_result is not None: - logging.info(f"\n【read_generic()读取结果】") - logging.info(f"读取配置:DB100,偏移10,int类型,1个数据") - logging.info(f"解析后的值:{read_result}") - else: - logging.info("⚠️ read_generic()读取失败(无数据或解析错误)") - - time.sleep(1) # 每隔1秒读取一次 - - except KeyboardInterrupt: - logging.info("\n⚠️ 收到用户停止指令,正在清理资源...") - if 'db_reader' in locals(): - db_reader.stop() - logging.info("👋 程序已正常退出") - - except Exception as e: - logging.critical(f"💥 程序初始化失败: {str(e)}", exc_info=True) - exit(1) \ No newline at end of file