Files
gateway_plc/gateway/db100_reader.py

226 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/26 09:40
# @Author : reenrr
# @File : db100_reader.py
'''
import threading
import time
import logging
from datetime import datetime
from snap7_client import Snap7Client # 导入实际的Snap7Client类
class DB100ReaderThread(threading.Thread):
def __init__(self, plc_client, update_interval=0.03, data_size=6000, output_file="db100_latest_data.log"):
"""
初始化DB100数据读取线程仅保留最新数据支持直接提取
参数:
plc_config: PLC连接配置字典包含ip, rack, slot
update_interval: 更新间隔(秒)默认0.03秒(30ms)
data_size: 读取的数据大小默认6000字节
output_file: 最新数据存储文件路径覆盖写入仅存1条记录
"""
super().__init__(name="DB100ReaderThread", daemon=True) # 设为守护线程,主程序退出时自动停止
# 1. PLC连接配置与客户端初始化
self.plc_client = plc_client
# 2. 线程与数据核心配置
self.update_interval = update_interval
self.data_size = data_size
self.output_file = output_file
# 3. 线程状态与数据缓存(新增:内存缓存最新数据,避免频繁读文件)
self.running = False
self._latest_data = None # 内存缓存最新数据(格式:(timestamp, data_info, raw_bytes)
self._data_lock = threading.Lock() # 线程锁:确保数据读写安全
# 4. 日志配置
self.logger = logging.getLogger("DB100Reader")
def start(self):
"""启动线程先验证PLC连接"""
self.running = True
super().start()
self.logger.info("✅ DB100数据读取线程已启动PLC连接成功")
self.logger.info(f"🔧 配置:更新间隔{self.update_interval*1000}ms每次读取{self.data_size}字节")
def stop(self):
"""停止线程(优雅清理)"""
self.running = False
if self.is_alive():
self.join(timeout=1)
if self.is_alive():
self.logger.warning("⚠️ 线程未正常退出,强制终止")
self.logger.info("🛑 DB100数据读取线程已停止PLC连接已清理")
def get_latest_data(self):
"""
新增:获取最新数据的接口(线程安全)
返回:
字典格式:{
"timestamp": "2025-09-26 10:13:50.606", # 毫秒级时间戳
"data_info": {
"db_name": "DB100",
"offset_range": "0-5999",
"actual_length": 6000 # 实际读取字节数
},
"raw_bytes": bytearray(b'\x10\x00\x00...') # 原始字节数据(可直接用于后续提取)
}
若未获取到数据返回None
"""
with self._data_lock: # 加锁确保数据一致性(避免读取时数据正在更新)
if self._latest_data is None:
return None
# 解构内存缓存的最新数据,返回结构化字典
timestamp, data_info, raw_bytes = self._latest_data
return {
"timestamp": timestamp,
"data_info": data_info,
"raw_bytes": raw_bytes.copy() # 返回副本,避免原始数据被外部修改
}
def run(self):
"""线程主循环读取DB100→更新内存缓存→覆盖写入文件"""
self.logger.debug("📌 线程主循环已启动")
while self.running:
cycle_start = time.time()
try:
# 步骤1读取DB100数据
cache_success = self.plc_client.cache_large_data_block(
db_number=100,
offset=0,
size=self.data_size
)
# 步骤2更新内存缓存和文件仅保留最新数据
if cache_success and self.plc_client.data_cache is not None:
raw_data = self.plc_client.data_cache # 原始字节数据bytearray
data_len = len(raw_data)
# 生成毫秒级时间戳和数据信息
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
data_info = {
"db_name": "DB100",
"offset_range": f"0-{self.data_size - 1}",
"actual_length": data_len
}
# 线程安全更新内存缓存
with self._data_lock:
self._latest_data = (timestamp, data_info, raw_data.copy())
# 覆盖写入文件(删除历史记录,仅存最新一条)
self._write_latest_data_to_file(timestamp, data_info, raw_data)
else:
self.logger.warning("⚠️ DB100数据缓存失败跳过本次更新")
# 步骤3精确控制更新间隔
cycle_elapsed = time.time() - cycle_start
sleep_time = max(0, self.update_interval - cycle_elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
except Exception as e:
self.logger.error(f"🔴 循环读取DB100出错: {str(e)}", exc_info=True)
time.sleep(self.update_interval)
def _write_latest_data_to_file(self, timestamp, data_info, raw_data):
"""
修改:覆盖写入文件(仅保留最新数据,删除历史记录)
参数:
timestamp: 毫秒级时间戳
data_info: 数据基本信息字典
raw_data: 原始字节数据bytearray
"""
try:
# 处理原始数据显示(转为列表格式,便于查看)
data_list = list(raw_data)
data_str = f"{data_list} (共{len(raw_data)}字节)"
# 覆盖模式写入("w"模式:清空文件后写入最新数据)
with open(self.output_file, "w", encoding="utf-8") as f:
f.write(f"[{timestamp}] 📝 DB100最新数据\n")
f.write(f" - 数据基本信息:{data_info['db_name']}(偏移{data_info['offset_range']}),实际长度{data_info['actual_length']}字节\n")
f.write(f" - 原始字节数据:{data_str}\n")
f.write("=" * 120 + "\n")
self.logger.debug(f"📤 最新DB100数据已覆盖写入文件{self.output_file}")
except Exception as e:
self.logger.error(f"🔴 写入最新DB100数据到文件出错: {str(e)}", exc_info=True)
# -------------------------- 程序入口(含使用示例) --------------------------
if __name__ == "__main__":
# 1. 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("db100_reader_runtime.log", encoding="utf-8", mode="a")
]
)
# 2. PLC连接配置需根据实际PLC修改
PLC_CONFIG = {
"ip": "192.168.1.0", # 替换为实际PLC IP
"rack": 0, # S7-1200默认0S7-300默认0
"slot": 1 # S7-1200默认1S7-300默认2
}
# 3. 启动线程并示例“提取最新数据”
try:
db_reader = DB100ReaderThread(
plc_config=PLC_CONFIG,
update_interval=0.03,
data_size=6000,
output_file="db100_latest_data.log" # 仅存最新数据的文件
)
db_reader.start()
logging.info("🎉 程序已启动正在实时读取DB100数据...(按 Ctrl+C 停止)")
logging.info("📌 示例每1秒提取一次最新数据实际使用时可按需调用get_latest_data()")
# 示例每隔1秒提取一次最新数据模拟后续业务逻辑
while True:
latest_data = db_reader.get_latest_data()
if latest_data:
logging.info(f"\n【最新数据提取结果】")
logging.info(f"时间戳:{latest_data['timestamp']}")
logging.info(f"数据长度:{latest_data['data_info']['actual_length']}字节")
logging.info(f"前10字节原始数据{latest_data['raw_bytes'][:10]}") # 示例提取前10字节
else:
logging.info("⚠️ 暂未获取到DB100最新数据")
time.sleep(1)
read_result = db_reader.plc_client.read_generic(
db_number=100,
offset=0,
data_type="bool",
count=1
)
# 打印read_generic()的读取结果
if read_result is not None:
logging.info(f"\n【read_generic()读取结果】")
logging.info(f"读取配置DB100偏移10int类型1个数据")
logging.info(f"解析后的值:{read_result}")
else:
logging.info("⚠️ read_generic()读取失败(无数据或解析错误)")
time.sleep(1) # 每隔1秒读取一次
except KeyboardInterrupt:
logging.info("\n⚠️ 收到用户停止指令,正在清理资源...")
if 'db_reader' in locals():
db_reader.stop()
logging.info("👋 程序已正常退出")
except Exception as e:
logging.critical(f"💥 程序初始化失败: {str(e)}", exc_info=True)
exit(1)