Files
gateway_plc/gateway/plc_data_reader.py

164 lines
7.4 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/28 13:36
# @Author : reenrr
# @File : plc_data_reader.py
# @Description : 通用PLC数据读取线程按配置动态处理read/read_write类型的区域
# 支持结构化数据解析real/int/bool等
'''
import threading
import time
import logging
class PLCDataReaderThread(threading.Thread):
def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, max_array_size=10000):
"""
初始化PLC数据读取线程配置驱动支持多区域
参数:
plc_client: 已连接的Snap7Client实例来自PLCManager
area_config: 单个区域的配置来自config.json的plcs[].areas
示例{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]}
update_interval: 读取间隔默认30ms
max_array_size: 自定义数组的最大存储条数超过后自动删除最早数据默认1000条
"""
# 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read"
thread_name = f"{plc_name}_{area_config['name']}"
super().__init__(name=thread_name, daemon=True)
# 核心依赖PLC客户端+区域配置)
self.plc_client = plc_client
2025-09-29 14:41:04 +08:00
self.plc_name = plc_name
self.area_config = area_config # 动态区域配置不再硬编码DB100
self.area_name = area_config["name"]
self.db_number = area_config["db_number"]
self.offset = area_config["offset"]
self.size = area_config["size"]
self.area_type = area_config["type"] # 区分read/read_write/write
# 自定义内存数据配置
self.custom_array_name = f"{self.plc_name}_{self.area_name}"
self.max_array_size = max_array_size
# 动态创建自定义名称的内存数组
setattr(self, self.custom_array_name, [])
# 线程与输出配置
self.update_interval = update_interval
# 数据缓存(新增结构化数据存储)
self.running = False
self._latest_byte_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data)
self._data_lock = threading.Lock() # 线程安全锁
# 日志
self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}")
def start(self):
"""启动线程验证PLC连接+读写类型适配)"""
# 仅处理需要读的区域read/read_writewrite类型不启动
if self.area_type not in ["read", "read_write"]:
self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)")
return
# 验证自定义数组是否创建成功
if not hasattr(self, self.custom_array_name):
self.logger.error(f"❌ 自定义数组创建失败!数组名:{self.custom_array_name}")
return
self.running = True
super().start()
self.logger.info(f"✅ 线程启动成功DB{self.db_number}{self.area_type}")
self.logger.info(f"🔧 内存配置:自定义数组名={self.custom_array_name},最大存储条数={self.max_array_size}")
self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms读取{self.size}字节,存储到{self.custom_array_name}数组中")
def stop(self):
"""停止线程(优雅清理)"""
self.running = False
if self.is_alive():
self.join(timeout=2.0)
if self.is_alive():
self.logger.warning("⚠️ 线程未正常退出,强制终止")
# 停止时输出数组统计信息
custom_array = getattr(self, self.custom_array_name, [])
self.logger.info(f"🛑 线程已停止DB{self.db_number}")
self.logger.info(f"📊 内存数组统计:数组名={self.custom_array_name},总存储条数={len(custom_array)}")
def get_latest_byte_data(self):
"""
线程安全获取最新数据返回原始字节+解析后的结构化数据
返回示例
{
"timestamp": "2025-09-28 10:00:00.123",
"data_info": {"area_name":"DB100_Read", "db_number":100, "offset_range":"0-5999", "actual_length":6000},
"raw_bytes": bytearray(b'\x00\x10...'),
"parsed_data": {"temperature":25.5, "pressure":100, "status":True} # 解析后的字段
}
"""
with self._data_lock:
if self._latest_byte_data is None:
self.logger.debug(f"⚠️ 自定义数组{self.custom_array_name}无最新数据")
return None
# 返回字节数据的副本,避免外部修改
return self._latest_byte_data.copy()
def get_custom_array(self, max_records=None):
"""
线程安全获取自定义数组数据
参数:
max_records: 可选指定返回的最大记录数
返回:
字节数据列表
"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
return custom_array[-1:] if custom_array else [] # 返回所有记录的副本
def clear_custom_array(self):
"""清空自定义数组"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
custom_array.clear()
self.logger.info(f"🧹 已清空自定义数组:{self.custom_array_name}")
def run(self):
"""线程主循环读PLC→解析数据→更新缓存→写文件"""
self.logger.debug(f"📌 主循环启动DB{self.db_number},数组名:{self.custom_array_name}")
while self.running:
cycle_start = time.time()
try:
# 步骤1读取PLC区域数据调用Snap7Client的缓存方法
cache_success = self.plc_client.cache_large_data_block(
db_number=self.db_number,
offset=self.offset,
size=self.size
)
# 步骤2处理读取结果缓存+解析+写文件)
if cache_success and self.plc_client.data_cache is not None:
raw_data = self.plc_client.data_cache # 原始字节
# 步骤3线程安全更新内存缓存
with self._data_lock:
self._latest_byte_data = raw_data
# 获取自定义数组并添加新的字节数据
custom_array = getattr(self, self.custom_array_name)
custom_array.clear() # 清空旧数据
custom_array.append(raw_data)
# 日志显示当前数组状态仅1条
self.logger.debug(f"✅ 已更新最新PLC记录数组{self.custom_array_name}当前记录数1")
else:
self.logger.warning(f"⚠️ 数据读取失败DB{self.db_number}),跳过本次更新")
# 步骤6精确控制读取间隔
cycle_elapsed = time.time() - cycle_start
sleep_time = max(0, self.update_interval - cycle_elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True)
time.sleep(self.update_interval)