#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/9/28 13:36 # @Author : reenrr # @File : plc_data_reader.py # @Description : 通用PLC数据读取线程:按配置动态处理read/read_write类型的区域 # 支持结构化数据解析(real/int/bool等) ''' import threading import time import logging class PLCDataReaderThread(threading.Thread): def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, max_array_size=10000): """ 初始化PLC数据读取线程(配置驱动,支持多区域) 参数: plc_client: 已连接的Snap7Client实例(来自PLCManager) area_config: 单个区域的配置(来自config.json的plcs[].areas) 示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]} update_interval: 读取间隔(秒),默认30ms max_array_size: 自定义数组的最大存储条数,超过后自动删除最早数据,默认1000条 """ # 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read") thread_name = f"{plc_name}_{area_config['name']}" super().__init__(name=thread_name, daemon=True) # 核心依赖(PLC客户端+区域配置) self.plc_client = plc_client self.plc_name = plc_name self.area_config = area_config # 动态区域配置,不再硬编码DB100 self.area_name = area_config["name"] self.db_number = area_config["db_number"] self.offset = area_config["offset"] self.size = area_config["size"] self.area_type = area_config["type"] # 区分read/read_write/write # 自定义内存数据配置 self.custom_array_name = f"{self.plc_name}_{self.area_name}" self.max_array_size = max_array_size # 动态创建自定义名称的内存数组 setattr(self, self.custom_array_name, []) # 线程与输出配置 self.update_interval = update_interval # 数据缓存(新增结构化数据存储) self.running = False self._latest_byte_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data) self._data_lock = threading.Lock() # 线程安全锁 # 日志 self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}") def start(self): """启动线程(验证PLC连接+读写类型适配)""" # 仅处理需要读的区域(read/read_write),write类型不启动 if self.area_type not in ["read", "read_write"]: self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)") return # 验证自定义数组是否创建成功 if not hasattr(self, self.custom_array_name): self.logger.error(f"❌ 自定义数组创建失败!数组名:{self.custom_array_name}") return self.running = True super().start() self.logger.info(f"✅ 线程启动成功(DB{self.db_number},{self.area_type})") self.logger.info(f"🔧 内存配置:自定义数组名={self.custom_array_name},最大存储条数={self.max_array_size}") self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms,读取{self.size}字节,存储到{self.custom_array_name}数组中") def stop(self): """停止线程(优雅清理)""" self.running = False if self.is_alive(): self.join(timeout=2.0) if self.is_alive(): self.logger.warning("⚠️ 线程未正常退出,强制终止") # 停止时输出数组统计信息 custom_array = getattr(self, self.custom_array_name, []) self.logger.info(f"🛑 线程已停止(DB{self.db_number})") self.logger.info(f"📊 内存数组统计:数组名={self.custom_array_name},总存储条数={len(custom_array)}") def get_latest_byte_data(self): """ 线程安全获取最新数据(返回原始字节+解析后的结构化数据) 返回示例: { "timestamp": "2025-09-28 10:00:00.123", "data_info": {"area_name":"DB100_Read", "db_number":100, "offset_range":"0-5999", "actual_length":6000}, "raw_bytes": bytearray(b'\x00\x10...'), "parsed_data": {"temperature":25.5, "pressure":100, "status":True} # 解析后的字段 } """ with self._data_lock: if self._latest_byte_data is None: self.logger.debug(f"⚠️ 自定义数组{self.custom_array_name}无最新数据") return None # 返回字节数据的副本,避免外部修改 return self._latest_byte_data.copy() def get_custom_array(self, max_records=None): """ 线程安全获取自定义数组数据 参数: max_records: 可选,指定返回的最大记录数 返回: 字节数据列表 """ with self._data_lock: custom_array = getattr(self, self.custom_array_name, []) return custom_array[-1:] if custom_array else [] # 返回所有记录的副本 def clear_custom_array(self): """清空自定义数组""" with self._data_lock: custom_array = getattr(self, self.custom_array_name, []) custom_array.clear() self.logger.info(f"🧹 已清空自定义数组:{self.custom_array_name}") def run(self): """线程主循环:读PLC→解析数据→更新缓存→写文件""" self.logger.debug(f"📌 主循环启动(DB{self.db_number},数组名:{self.custom_array_name})") while self.running: cycle_start = time.time() try: # 步骤1:读取PLC区域数据(调用Snap7Client的缓存方法) cache_success = self.plc_client.cache_large_data_block( db_number=self.db_number, offset=self.offset, size=self.size ) # 步骤2:处理读取结果(缓存+解析+写文件) if cache_success and self.plc_client.data_cache is not None: raw_data = self.plc_client.data_cache # 原始字节 # 步骤3:线程安全更新内存缓存 with self._data_lock: self._latest_byte_data = raw_data # 获取自定义数组并添加新的字节数据 custom_array = getattr(self, self.custom_array_name) custom_array.clear() # 清空旧数据 custom_array.append(raw_data) # 日志显示当前数组状态(仅1条) self.logger.debug(f"✅ 已更新最新PLC记录,数组{self.custom_array_name}当前记录数:1") else: self.logger.warning(f"⚠️ 数据读取失败(DB{self.db_number}),跳过本次更新") # 步骤6:精确控制读取间隔 cycle_elapsed = time.time() - cycle_start sleep_time = max(0, self.update_interval - cycle_elapsed) if sleep_time > 0: time.sleep(sleep_time) except Exception as e: self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True) time.sleep(self.update_interval)