Files
gateway_plc/gateway/plc_data_reader.py

164 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/28 13:36
# @Author : reenrr
# @File : plc_data_reader.py
# @Description : 通用PLC数据读取线程按配置动态处理read/read_write类型的区域
# 支持结构化数据解析real/int/bool等
'''
import threading
import time
import logging
class PLCDataReaderThread(threading.Thread):
def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, max_array_size=10000):
"""
初始化PLC数据读取线程配置驱动支持多区域
参数:
plc_client: 已连接的Snap7Client实例来自PLCManager
area_config: 单个区域的配置来自config.json的plcs[].areas
示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]}
update_interval: 读取间隔默认30ms
max_array_size: 自定义数组的最大存储条数超过后自动删除最早数据默认1000条
"""
# 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read"
thread_name = f"{plc_name}_{area_config['name']}"
super().__init__(name=thread_name, daemon=True)
# 核心依赖PLC客户端+区域配置)
self.plc_client = plc_client
self.plc_name = plc_name
self.area_config = area_config # 动态区域配置不再硬编码DB100
self.area_name = area_config["name"]
self.db_number = area_config["db_number"]
self.offset = area_config["offset"]
self.size = area_config["size"]
self.area_type = area_config["type"] # 区分read/read_write/write
# 自定义内存数据配置
self.custom_array_name = f"{self.plc_name}_{self.area_name}"
self.max_array_size = max_array_size
# 动态创建自定义名称的内存数组
setattr(self, self.custom_array_name, [])
# 线程与输出配置
self.update_interval = update_interval
# 数据缓存(新增结构化数据存储)
self.running = False
self._latest_byte_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data)
self._data_lock = threading.Lock() # 线程安全锁
# 日志
self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}")
def start(self):
"""启动线程验证PLC连接+读写类型适配)"""
# 仅处理需要读的区域read/read_writewrite类型不启动
if self.area_type not in ["read", "read_write"]:
self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)")
return
# 验证自定义数组是否创建成功
if not hasattr(self, self.custom_array_name):
self.logger.error(f"❌ 自定义数组创建失败!数组名:{self.custom_array_name}")
return
self.running = True
super().start()
self.logger.info(f"✅ 线程启动成功DB{self.db_number}{self.area_type}")
self.logger.info(f"🔧 内存配置:自定义数组名={self.custom_array_name},最大存储条数={self.max_array_size}")
self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms读取{self.size}字节,存储到{self.custom_array_name}数组中")
def stop(self):
"""停止线程(优雅清理)"""
self.running = False
if self.is_alive():
self.join(timeout=2.0)
if self.is_alive():
self.logger.warning("⚠️ 线程未正常退出,强制终止")
# 停止时输出数组统计信息
custom_array = getattr(self, self.custom_array_name, [])
self.logger.info(f"🛑 线程已停止DB{self.db_number}")
self.logger.info(f"📊 内存数组统计:数组名={self.custom_array_name},总存储条数={len(custom_array)}")
def get_latest_byte_data(self):
"""
线程安全获取最新数据(返回原始字节+解析后的结构化数据)
返回示例:
{
"timestamp": "2025-09-28 10:00:00.123",
"data_info": {"area_name":"DB100_Read", "db_number":100, "offset_range":"0-5999", "actual_length":6000},
"raw_bytes": bytearray(b'\x00\x10...'),
"parsed_data": {"temperature":25.5, "pressure":100, "status":True} # 解析后的字段
}
"""
with self._data_lock:
if self._latest_byte_data is None:
self.logger.debug(f"⚠️ 自定义数组{self.custom_array_name}无最新数据")
return None
# 返回字节数据的副本,避免外部修改
return self._latest_byte_data.copy()
def get_custom_array(self, max_records=None):
"""
线程安全获取自定义数组数据
参数:
max_records: 可选,指定返回的最大记录数
返回:
字节数据列表
"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
return custom_array[-1:] if custom_array else [] # 返回所有记录的副本
def clear_custom_array(self):
"""清空自定义数组"""
with self._data_lock:
custom_array = getattr(self, self.custom_array_name, [])
custom_array.clear()
self.logger.info(f"🧹 已清空自定义数组:{self.custom_array_name}")
def run(self):
"""线程主循环读PLC→解析数据→更新缓存→写文件"""
self.logger.debug(f"📌 主循环启动DB{self.db_number},数组名:{self.custom_array_name}")
while self.running:
cycle_start = time.time()
try:
# 步骤1读取PLC区域数据调用Snap7Client的缓存方法
cache_success = self.plc_client.cache_large_data_block(
db_number=self.db_number,
offset=self.offset,
size=self.size
)
# 步骤2处理读取结果缓存+解析+写文件)
if cache_success and self.plc_client.data_cache is not None:
raw_data = self.plc_client.data_cache # 原始字节
# 步骤3线程安全更新内存缓存
with self._data_lock:
self._latest_byte_data = raw_data
# 获取自定义数组并添加新的字节数据
custom_array = getattr(self, self.custom_array_name)
custom_array.clear() # 清空旧数据
custom_array.append(raw_data)
# 日志显示当前数组状态仅1条
self.logger.debug(f"✅ 已更新最新PLC记录数组{self.custom_array_name}当前记录数1")
else:
self.logger.warning(f"⚠️ 数据读取失败DB{self.db_number}),跳过本次更新")
# 步骤6精确控制读取间隔
cycle_elapsed = time.time() - cycle_start
sleep_time = max(0, self.update_interval - cycle_elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True)
time.sleep(self.update_interval)