将从缓存文件中读取改成从内存数组中进行读取

This commit is contained in:
您的pengqi
2025-10-10 09:49:30 +08:00
parent 5233d6ceaa
commit d3964811e9
5 changed files with 147 additions and 149 deletions

View File

@ -645,7 +645,7 @@ class CacheManager:
plc_status = self.plc_connection_status.get(plc_name, "unknown") plc_status = self.plc_connection_status.get(plc_name, "unknown")
return None, "Area is read-only", plc_status, 0 return None, "Area is read-only", plc_status, 0
cache_file = f"{plc_name}_area_{area_name}.log" array_name = f"{plc_name}_{area_name}"
# 计算实际DB偏移 # 计算实际DB偏移
db_offset = area["offset"] + offset db_offset = area["offset"] + offset
@ -673,7 +673,7 @@ class CacheManager:
try: try:
# 使用Snap7Client的read_generic方法 # 使用Snap7Client的read_generic方法
result = client.read_generic(area["db_number"], db_offset, data_type, cache_file, count) result = client.read_generic(area["db_number"], db_offset, data_type, array_name, count)
if result is None: if result is None:
return None, "Read failed", plc_status, 0 return None, "Read failed", plc_status, 0

View File

@ -1,7 +1,6 @@
import logging import logging
import time import time
import threading import threading
from config_loader import load_config
from plc_manager import PLCManager from plc_manager import PLCManager
from cache_manager import CacheManager from cache_manager import CacheManager
from api_server import APIServer from api_server import APIServer
@ -41,7 +40,7 @@ class GatewayApp:
# 重新初始化PLC连接 # 重新初始化PLC连接
if self.plc_manager: if self.plc_manager:
self.logger.info("Reinitializing PLC connections...") self.logger.info("Reinitializing PLC connections...")
self.plc_manager = PLCManager(config["plcs"]) self.plc_manager = PLCManager(config["plcs"], self.reader_threads)
self.plc_manager.connect_all() self.plc_manager.connect_all()
# 重新初始化缓存 # 重新初始化缓存
@ -92,7 +91,7 @@ class GatewayApp:
plc_name=plc_config["name"], plc_name=plc_config["name"],
area_config=area_config, area_config=area_config,
update_interval=0.03, update_interval=0.03,
output_file_prefix="area_" max_array_size=10000
) )
read_thread.start() read_thread.start()
# 存入线程列表,便于后续停止 # 存入线程列表,便于后续停止

View File

@ -10,12 +10,9 @@
import threading import threading
import time import time
import logging import logging
from datetime import datetime
from snap7.util import get_real, get_int, get_bool, get_word, get_dint # 导入snap7解析工具
class PLCDataReaderThread(threading.Thread): class PLCDataReaderThread(threading.Thread):
def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, output_file_prefix="area_"): def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, max_array_size=10000):
""" """
初始化PLC数据读取线程配置驱动支持多区域 初始化PLC数据读取线程配置驱动支持多区域
参数: 参数:
@ -23,13 +20,13 @@ class PLCDataReaderThread(threading.Thread):
area_config: 单个区域的配置来自config.json的plcs[].areas area_config: 单个区域的配置来自config.json的plcs[].areas
示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]} 示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]}
update_interval: 读取间隔默认30ms update_interval: 读取间隔默认30ms
output_file_prefix: 输出文件前缀,最终文件名为“前缀+区域名.log” max_array_size: 自定义数组的最大存储条数超过后自动删除最早数据默认10000条
""" """
# 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read" # 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read"
thread_name = f"{plc_name}_Reader_{area_config['name']}" thread_name = f"{plc_name}_{area_config['name']}"
super().__init__(name=thread_name, daemon=True) super().__init__(name=thread_name, daemon=True)
# 1. 核心依赖PLC客户端+区域配置) # 核心依赖PLC客户端+区域配置)
self.plc_client = plc_client self.plc_client = plc_client
self.plc_name = plc_name self.plc_name = plc_name
self.area_config = area_config # 动态区域配置不再硬编码DB100 self.area_config = area_config # 动态区域配置不再硬编码DB100
@ -39,16 +36,21 @@ class PLCDataReaderThread(threading.Thread):
self.size = area_config["size"] self.size = area_config["size"]
self.area_type = area_config["type"] # 区分read/read_write/write self.area_type = area_config["type"] # 区分read/read_write/write
# 2. 线程与输出配置 # 自定义内存数据配置
self.update_interval = update_interval self.custom_array_name = f"{self.plc_name}_{self.area_name}"
self.output_file = f"{self.plc_name}_{output_file_prefix}{self.area_name}.log" # 输出文件名}{output_file_prefix}DB{self.db_number}.log" # 每个区域独立文件 self.max_array_size = max_array_size
# 动态创建自定义名称的内存数组
setattr(self, self.custom_array_name, [])
# 3. 数据缓存(新增结构化数据存储) # 线程与输出配置
self.update_interval = update_interval
# 数据缓存(新增结构化数据存储)
self.running = False self.running = False
self._latest_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data) self._latest_byte_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data)
self._data_lock = threading.Lock() # 线程安全锁 self._data_lock = threading.Lock() # 线程安全锁
# 4. 日志 # 日志
self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}") self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}")
def start(self): def start(self):
@ -58,10 +60,16 @@ class PLCDataReaderThread(threading.Thread):
self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)") self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)")
return return
# 验证自定义数组是否创建成功
if not hasattr(self, self.custom_array_name):
self.logger.error(f"❌ 自定义数组创建失败!数组名:{self.custom_array_name}")
return
self.running = True self.running = True
super().start() super().start()
self.logger.info(f"✅ 线程启动成功DB{self.db_number}{self.area_type}") self.logger.info(f"✅ 线程启动成功DB{self.db_number}{self.area_type}")
self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms读取{self.size}字节,输出{self.output_file}") self.logger.info(f"🔧 内存配置:自定义数组名={self.custom_array_name},最大存储条数={self.max_array_size}")
self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms读取{self.size}字节,存储到{self.custom_array_name}数组中")
def stop(self): def stop(self):
"""停止线程(优雅清理)""" """停止线程(优雅清理)"""
@ -70,9 +78,12 @@ class PLCDataReaderThread(threading.Thread):
self.join(timeout=2.0) self.join(timeout=2.0)
if self.is_alive(): if self.is_alive():
self.logger.warning("⚠️ 线程未正常退出,强制终止") self.logger.warning("⚠️ 线程未正常退出,强制终止")
# 停止时输出数组统计信息
custom_array = getattr(self, self.custom_array_name, [])
self.logger.info(f"🛑 线程已停止DB{self.db_number}") self.logger.info(f"🛑 线程已停止DB{self.db_number}")
self.logger.info(f"📊 内存数组统计:数组名={self.custom_array_name},总存储条数={len(custom_array)}")
def get_latest_data(self): def get_latest_byte_data(self):
""" """
线程安全获取最新数据(返回原始字节+解析后的结构化数据) 线程安全获取最新数据(返回原始字节+解析后的结构化数据)
返回示例: 返回示例:
@ -84,20 +95,34 @@ class PLCDataReaderThread(threading.Thread):
} }
""" """
with self._data_lock: with self._data_lock:
if self._latest_data is None: if self._latest_byte_data is None:
self.logger.debug("⚠️ 无最新数据缓存") self.logger.debug(f"⚠️ 自定义数组{self.custom_array_name}无最新数据")
return None return None
# 返回字节数据的副本,避免外部修改
return self._latest_byte_data.copy()
timestamp, data_info, raw_bytes, parsed_data = self._latest_data def get_custom_array(self, max_records=None):
return { """
"timestamp": timestamp, 线程安全获取自定义数组数据
"data_info": data_info.copy(), 参数:
"raw_bytes": raw_bytes.copy() max_records: 可选,指定返回的最大记录数
} 返回:
字节数据列表
"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
return custom_array[-1:] if custom_array else [] # 仅返回最新一条记录注意返回的是切片中元素仍为原bytearray引用非深拷贝
def clear_custom_array(self):
"""清空自定义数组"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
custom_array.clear()
self.logger.info(f"🧹 已清空自定义数组:{self.custom_array_name}")
def run(self): def run(self):
"""线程主循环读PLC→解析数据→更新缓存→写文件""" """线程主循环读PLC→解析数据→更新缓存→写文件"""
self.logger.debug(f"📌 主循环启动DB{self.db_number}") self.logger.debug(f"📌 主循环启动DB{self.db_number},数组名:{self.custom_array_name}")
while self.running: while self.running:
cycle_start = time.time() cycle_start = time.time()
try: try:
@ -111,24 +136,18 @@ class PLCDataReaderThread(threading.Thread):
# 步骤2处理读取结果缓存+解析+写文件) # 步骤2处理读取结果缓存+解析+写文件)
if cache_success and self.plc_client.data_cache is not None: if cache_success and self.plc_client.data_cache is not None:
raw_data = self.plc_client.data_cache # 原始字节 raw_data = self.plc_client.data_cache # 原始字节
data_len = len(raw_data)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 构造数据基本信息
data_info = {
"area_name": self.area_name,
"db_number": self.db_number,
"offset_range": f"0-{self.size - 1}",
"actual_length": data_len,
"area_type": self.area_type
}
# 步骤3线程安全更新内存缓存 # 步骤3线程安全更新内存缓存
with self._data_lock: with self._data_lock:
self._latest_data = (timestamp, data_info, raw_data.copy()) self._latest_byte_data = raw_data
# 步骤4写入文件含原始字节+解析后数据 # 获取自定义数组并添加新的字节数据
self._write_latest_data_to_file(timestamp, data_info, raw_data) custom_array = getattr(self, self.custom_array_name)
custom_array.clear() # 清空旧数据
custom_array.append(raw_data)
# 日志显示当前数组状态仅1条
self.logger.debug(f"✅ 已更新最新PLC记录数组{self.custom_array_name}当前记录数1")
else: else:
self.logger.warning(f"⚠️ 数据读取失败DB{self.db_number}),跳过本次更新") self.logger.warning(f"⚠️ 数据读取失败DB{self.db_number}),跳过本次更新")
@ -142,24 +161,3 @@ class PLCDataReaderThread(threading.Thread):
except Exception as e: except Exception as e:
self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True) self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True)
time.sleep(self.update_interval) time.sleep(self.update_interval)
def _write_latest_data_to_file(self, timestamp, data_info, raw_data):
"""
写入文件:含原始字节+解析后的结构化数据(每个区域独立文件)
"""
try:
# 处理原始字节为列表(便于查看)
data_list = list(raw_data) # 只显示前50字节避免文件过大
data_str = f"{data_list} (共{len(raw_data)}字节)"
# 覆盖写入文件
with open(self.output_file, "w", encoding="utf-8") as f:
f.write(f"[{timestamp}] 📝 {self.area_name} 最新数据\n")
f.write(
f" - 区域信息DB{data_info['db_number']}{data_info['offset_range']}),类型{data_info['area_type']}\n")
f.write(f" - 原始字节数据:{data_str}\n")
f.write("=" * 120 + "\n")
self.logger.debug(f"📤 最新DB{self.db_number}数据已覆盖写入文件:{self.output_file}")
except Exception as e:
self.logger.error(f"🔴 写入DB{self.db_number}数据到文件出错: {str(e)}", exc_info=True)

View File

@ -1,10 +1,11 @@
from snap7_client import Snap7Client from snap7_client import Snap7Client
import logging import logging
class PLCManager: class PLCManager:
"""PLC连接管理器管理多个PLC连接""" """PLC连接管理器管理多个PLC连接"""
def __init__(self, plcs_config): def __init__(self, plcs_config, reader_threads):
""" """
初始化PLC管理器 初始化PLC管理器
@ -17,7 +18,8 @@ class PLCManager:
self.plcs[name] = Snap7Client( self.plcs[name] = Snap7Client(
plc_config["ip"], plc_config["ip"],
plc_config["rack"], plc_config["rack"],
plc_config["slot"] plc_config["slot"],
reader_threads
) )
self.logger = logging.getLogger("PLCManager") self.logger = logging.getLogger("PLCManager")

View File

@ -1,16 +1,14 @@
from copyreg import dispatch_table
import snap7 import snap7
import logging import logging
from threading import Lock from threading import Lock
import time import time
from snap7.util import * from snap7.util import *
import ast
class Snap7Client: class Snap7Client:
"""Snap7客户端处理与PLC的通信""" """Snap7客户端处理与PLC的通信"""
def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1): def __init__(self, ip, rack, slot, reader_threads, max_retries=5, retry_base_delay=1):
""" """
初始化Snap7客户端 初始化Snap7客户端
@ -31,6 +29,7 @@ class Snap7Client:
self.retry_base_delay = retry_base_delay self.retry_base_delay = retry_base_delay
self.last_connect_attempt = 0 self.last_connect_attempt = 0
self.retry_count = 0 self.retry_count = 0
self.reader_threads = reader_threads
self.logger = logging.getLogger(f"Snap7Client[{ip}]") self.logger = logging.getLogger(f"Snap7Client[{ip}]")
def is_valid_connection(self): def is_valid_connection(self):
@ -392,71 +391,74 @@ class Snap7Client:
self.logger.error(f"Error caching data: {e}") self.logger.error(f"Error caching data: {e}")
return False return False
def read_db_from_file_cache(self, db_number, offset, required_size, cache_file): def read_db_from_memory_cache(self, db_number, offset, required_size, array_name, reader_threads):
""" """
通用文件缓存读取从指定文件读取DB块缓存返回需要的字节片段 通用内存数组读取从PLCDataReaderThread实例的内存数组中提取DB块的目标字节片段
参数: 参数说明:
db_number: DB块编号 - 已删除无用的 cache_file 参数(原文件缓存逻辑已替换为内存数组)
offset: 需要读取的起始偏移(字节)
required_size: 需要读取的字节数
cache_file: 缓存文件名(动态生成)
返回值: 返回值:
bytearray | None: 所需的字节片段失败返回None bytearray | None: 所需的字节片段失败返回None
""" """
try: try:
# 1. 读取缓存文件内容 # -------------------------- 关键修改1正确遍历元组并匹配线程 --------------------------
with open(cache_file, "r", encoding="utf-8") as f: target_thread = None
content = f.read() # 遍历 reader_threads每个元素是 (area_name_str, PLCDataReaderThread实例) 的元组
for area_name_str, thread_instance in reader_threads:
# 2. 定位"原始字节数据:"行匹配DB100ReaderThread的文件格式 # 匹配条件:线程实例的 custom_array_name == 传入的 array_name数组名在_thread实例上
db_match = False if thread_instance.custom_array_name == array_name:
data_line = None target_thread = thread_instance # 拿到真正的线程实例
for line in content.splitlines():
if f"DB{db_number}" in line and "区域信息" in line:
db_match = True
if db_match and "原始字节数据:" in line:
data_line = line.strip()
break break
if not db_match or not data_line:
self.logger.error(f"文件缓存中无DB{db_number}的有效数据(文件:{cache_file}") # -------------------------- 关键修改2校验线程实例是否找到 --------------------------
if target_thread is None:
# 若未找到,打印所有已存在的线程数组名,便于调试
existing_arrays = [t.custom_array_name for _, t in reader_threads]
self.logger.error(
f"❌ 未找到数组{array_name}对应的线程实例!"
f"已存在的数组名:{existing_arrays},请检查数组名是否拼写正确"
)
return None return None
# 3. 提取字节列表字符串(如"[16,0,0,...]" # -------------------------- 后续逻辑不变(但需确认线程实例有效) --------------------------
list_start = data_line.find("[") # 校验线程实例上是否存在数组array_name 就是线程的 custom_array_name
list_end = data_line.find("]") + 1 # 包含闭合括号 if not hasattr(target_thread, array_name):
self.logger.error(f"❌ 线程实例{target_thread.name}上不存在数组{array_name}(线程初始化失败)")
if list_start == -1 or list_end == 0:
self.logger.error(f"❌ DB{db_number}缓存文件格式错误:未找到有效字节列表")
return None
byte_list_str = data_line[list_start:list_end]
# 4. 解析字符串为整数列表再转成bytearray
byte_list = ast.literal_eval(byte_list_str)
# 验证列表有效性必须是0-255的整数
if not isinstance(byte_list, list) or not all(
isinstance(b, int) and 0 <= b <= 255 for b in byte_list
):
self.logger.error(f"❌ DB{db_number}缓存文件数据无效:字节列表包含非整数或超出范围值")
return None return None
# 5. 验证数据长度至少满足DB100的6000字节 # 线程安全获取数组数据(调用线程自带方法,内部已加锁
if len(byte_list) < required_size: custom_array = target_thread.get_custom_array()
self.logger.warning(f"⚠️ DB{db_number}缓存文件数据不完整(仅{len(byte_list)}字节,期望{required_size}字节)") if not custom_array:
self.logger.debug(f"✅ 从缓存文件读取DB{db_number}数据({len(byte_list)}字节)") self.logger.warning(f"⚠️ 数组{array_name}为空,无可用数据")
return None
return bytearray(byte_list) if not isinstance(custom_array[0], bytearray):
self.logger.error(
f"❌ 数组{array_name}的元素类型错误:"
f"期望bytearray实际{type(custom_array).__name__}"
)
return None
# 验证数据长度是否满足需求
total_byte_len = len(custom_array[0])
if total_byte_len < offset + required_size:
self.logger.warning(
f"⚠️ DB{db_number}内存数据不完整:"
f"数组{array_name}最新数据共{total_byte_len}字节,"
f"需从偏移{offset}读取{required_size}字节(需{offset + required_size}字节)"
)
available_size = total_byte_len - offset
if available_size <= 0:
self.logger.error(f"❌ 偏移{offset}超出数据范围(最大偏移:{total_byte_len - 1}")
return None
required_size = available_size
return custom_array[0]
except FileNotFoundError:
self.logger.warning(f"⚠️ DB{db_number}缓存文件不存在:{cache_file}")
return None
except ast.literal_eval.Error as e:
self.logger.error(f"❌ 解析DB{db_number}字节列表失败: {e}")
return None
except Exception as e: except Exception as e:
self.logger.error(f" 读取DB{db_number}缓存文件异常: {e}", exc_info=True) self.logger.error(f"🔴 读取内存数组{array_name}异常:{str(e)}", exc_info=True)
return None return None
def read_generic(self, db_number, offset, data_type, cache_file, count=1): def read_generic(self, db_number, offset, data_type, array_name, count=1):
""" """
通用读取接口(改进): 通用读取接口(改进):
- DB100优先从缓存文件读取 → 再内存缓存 → 最后PLC - DB100优先从内存数组读取 → 最后PLC
@ -465,11 +467,11 @@ class Snap7Client:
# 1、处理DB数据块优先从缓存文件读取 # 1、处理DB数据块优先从缓存文件读取
raw_data = None raw_data = None
if raw_data is None: if raw_data is None:
cache_file = cache_file array_name = array_name
raw_data = self.read_db_from_file_cache(db_number, offset, count, cache_file) raw_data = self.read_db_from_memory_cache(db_number, offset, count, array_name, self.reader_threads)
if raw_data is not None: if raw_data is not None:
print(f"缓存文件中读取DB{db_number}的数据") print(f"内存数组中读取DB{db_number}的数据")
self.logger.debug(f"文件缓存读取DB{db_number}文件{cache_file}") self.logger.debug(f"内存数组中读取DB{db_number}数组{array_name}")
try: try:
if data_type == 'bool': if data_type == 'bool':
@ -481,17 +483,14 @@ class Snap7Client:
end_bit = offset + count - 1 end_bit = offset + count - 1
end_byte = end_bit // 8 end_byte = end_bit // 8
# 检查数据长度是否足够 # 检查数据长度是否足够
if end_byte >= len(raw_data):
self.logger.warning( result = [] # 用于存储解析出的bool值
f"⚠️ DB{db_number}缓存文件数据不足:需要字节{end_byte},实际{len(raw_data)}字节") for i in range(count):
else: current_global_bit = offset + i
result = [] # 用于存储解析出的bool值 current_byte = current_global_bit // 8
for i in range(count): current_bit = current_global_bit % 8
current_global_bit = offset + i result.append(bool(raw_data[current_byte] & (1 << current_bit)))
current_byte = current_global_bit // 8 return result[0] if count == 1 else result
current_bit = current_global_bit % 8
result.append(bool(raw_data[current_byte] & (1 << current_bit)))
return result[0] if count == 1 else result
elif data_type == 'byte': elif data_type == 'byte':
if offset + count > len(raw_data): if offset + count > len(raw_data):
@ -502,18 +501,18 @@ class Snap7Client:
return [b for b in data] if count > 1 else data[0] return [b for b in data] if count > 1 else data[0]
elif data_type in ['int', 'word']: elif data_type in ['int', 'word']:
byte_per_data = 2 total_bytes = 2 * count
total_bytes = byte_per_data * count data = self.read_db(db_number, offset, total_bytes)
if offset + total_bytes > len(raw_data): if data is None:
self.logger.warning( return None
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{total_bytes}字节,实际{len(raw_data)}字节")
else: result = []
result = [] for i in range(count):
for i in range(count): if data_type == 'int':
start = offset + i * byte_per_data result.append(get_int(data, i * 2))
slice_data = raw_data[start:start + byte_per_data] else: # word
result.append(get_int(slice_data, 0) if data_type == 'int' else get_word(slice_data, 0)) result.append(get_word(data, i * 2))
return result[0] if count == 1 else result return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']: elif data_type in ['dint', 'dword', 'real']:
byte_per_data = 4 byte_per_data = 4