Files
gateway_plc/gateway/plc_data_reader.py
2025-09-29 14:41:04 +08:00

165 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/28 13:36
# @Author : reenrr
# @File : plc_data_reader.py
# @Description : 通用PLC数据读取线程按配置动态处理read/read_write类型的区域
# 支持结构化数据解析real/int/bool等
'''
import threading
import time
import logging
from datetime import datetime
from snap7.util import get_real, get_int, get_bool, get_word, get_dint # 导入snap7解析工具
class PLCDataReaderThread(threading.Thread):
def __init__(self, plc_client, plc_name, area_config, update_interval=0.03, output_file_prefix="area_"):
"""
初始化PLC数据读取线程配置驱动支持多区域
参数:
plc_client: 已连接的Snap7Client实例来自PLCManager
area_config: 单个区域的配置来自config.json的plcs[].areas
示例:{"name":"DB100_Read", "type":"read", "db_number":100, "offset":0, "size":6000, "structure":[...]}
update_interval: 读取间隔默认30ms
output_file_prefix: 输出文件前缀,最终文件名为“前缀+区域名.log”
"""
# 线程名包含区域名,便于日志区分(如"PLCDataReader_DB100_Read"
thread_name = f"{plc_name}_Reader_{area_config['name']}"
super().__init__(name=thread_name, daemon=True)
# 1. 核心依赖PLC客户端+区域配置)
self.plc_client = plc_client
self.plc_name = plc_name
self.area_config = area_config # 动态区域配置不再硬编码DB100
self.area_name = area_config["name"]
self.db_number = area_config["db_number"]
self.offset = area_config["offset"]
self.size = area_config["size"]
self.area_type = area_config["type"] # 区分read/read_write/write
# 2. 线程与输出配置
self.update_interval = update_interval
self.output_file = f"{self.plc_name}_{output_file_prefix}{self.area_name}.log" # 输出文件名}{output_file_prefix}DB{self.db_number}.log" # 每个区域独立文件
# 3. 数据缓存(新增结构化数据存储)
self.running = False
self._latest_data = None # 格式:(timestamp, data_info, raw_bytes, parsed_data)
self._data_lock = threading.Lock() # 线程安全锁
# 4. 日志
self.logger = logging.getLogger(f"PLCDataReader.{self.area_name}")
def start(self):
"""启动线程验证PLC连接+读写类型适配)"""
# 仅处理需要读的区域read/read_writewrite类型不启动
if self.area_type not in ["read", "read_write"]:
self.logger.warning(f"跳过启动:区域类型为{self.area_type}(无需循环读取)")
return
self.running = True
super().start()
self.logger.info(f"✅ 线程启动成功DB{self.db_number}{self.area_type}")
self.logger.info(f"🔧 配置:间隔{self.update_interval * 1000}ms读取{self.size}字节,输出{self.output_file}")
def stop(self):
"""停止线程(优雅清理)"""
self.running = False
if self.is_alive():
self.join(timeout=2.0)
if self.is_alive():
self.logger.warning("⚠️ 线程未正常退出,强制终止")
self.logger.info(f"🛑 线程已停止DB{self.db_number}")
def get_latest_data(self):
"""
线程安全获取最新数据(返回原始字节+解析后的结构化数据)
返回示例:
{
"timestamp": "2025-09-28 10:00:00.123",
"data_info": {"area_name":"DB100_Read", "db_number":100, "offset_range":"0-5999", "actual_length":6000},
"raw_bytes": bytearray(b'\x00\x10...'),
"parsed_data": {"temperature":25.5, "pressure":100, "status":True} # 解析后的字段
}
"""
with self._data_lock:
if self._latest_data is None:
self.logger.debug("⚠️ 无最新数据缓存")
return None
timestamp, data_info, raw_bytes, parsed_data = self._latest_data
return {
"timestamp": timestamp,
"data_info": data_info.copy(),
"raw_bytes": raw_bytes.copy()
}
def run(self):
"""线程主循环读PLC→解析数据→更新缓存→写文件"""
self.logger.debug(f"📌 主循环启动DB{self.db_number}")
while self.running:
cycle_start = time.time()
try:
# 步骤1读取PLC区域数据调用Snap7Client的缓存方法
cache_success = self.plc_client.cache_large_data_block(
db_number=self.db_number,
offset=self.offset,
size=self.size
)
# 步骤2处理读取结果缓存+解析+写文件)
if cache_success and self.plc_client.data_cache is not None:
raw_data = self.plc_client.data_cache # 原始字节
data_len = len(raw_data)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 构造数据基本信息
data_info = {
"area_name": self.area_name,
"db_number": self.db_number,
"offset_range": f"0-{self.size - 1}",
"actual_length": data_len,
"area_type": self.area_type
}
# 步骤3线程安全更新内存缓存
with self._data_lock:
self._latest_data = (timestamp, data_info, raw_data.copy())
# 步骤4写入文件含原始字节+解析后数据)
self._write_latest_data_to_file(timestamp, data_info, raw_data)
else:
self.logger.warning(f"⚠️ 数据读取失败DB{self.db_number}),跳过本次更新")
# 步骤6精确控制读取间隔
cycle_elapsed = time.time() - cycle_start
sleep_time = max(0, self.update_interval - cycle_elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
self.logger.error(f"🔴 循环读取出错: {str(e)}", exc_info=True)
time.sleep(self.update_interval)
def _write_latest_data_to_file(self, timestamp, data_info, raw_data):
"""
写入文件:含原始字节+解析后的结构化数据(每个区域独立文件)
"""
try:
# 处理原始字节为列表(便于查看)
data_list = list(raw_data) # 只显示前50字节避免文件过大
data_str = f"{data_list} (共{len(raw_data)}字节)"
# 覆盖写入文件
with open(self.output_file, "w", encoding="utf-8") as f:
f.write(f"[{timestamp}] 📝 {self.area_name} 最新数据\n")
f.write(
f" - 区域信息DB{data_info['db_number']}{data_info['offset_range']}),类型{data_info['area_type']}\n")
f.write(f" - 原始字节数据:{data_str}\n")
f.write("=" * 120 + "\n")
self.logger.debug(f"📤 最新DB{self.db_number}数据已覆盖写入文件:{self.output_file}")
except Exception as e:
self.logger.error(f"🔴 写入DB{self.db_number}数据到文件出错: {str(e)}", exc_info=True)