增强稳定性,使用线程实时读取PLC中db_100中的数据并存储起来,read_generic从存储的文件中读取

This commit is contained in:
您çšpengqi
2025-09-26 19:49:56 +08:00
parent 83b374ffea
commit 485bbcc3f0
5 changed files with 592 additions and 101 deletions

View File

@ -1,3 +1,5 @@
from copyreg import dispatch_table
import snap7
import logging
from threading import Lock
@ -31,6 +33,11 @@ class Snap7Client:
self.retry_count = 0
self.logger = logging.getLogger(f"Snap7Client[{ip}]")
# ---------------
# 新增
# ---------------
self.db100_cache_file = "db100_latest_data.log"
def is_valid_connection(self):
"""检查连接是否真正有效"""
try:
@ -244,89 +251,89 @@ class Snap7Client:
self.connected = False
return False
def read_generic(self, db_number, offset, data_type, count=1):
"""
通用读取接口,支持多种数据类型
Args:
db_number: DB块编号
offset: 起始偏移量字节或位对于bool类型
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
count: 要读取的数据个数
Returns:
解析后的数据单个值或值列表失败返回None
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None
try:
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 计算需要读取的字节数
last_bit = bit_offset + count - 1
last_byte = last_bit // 8
total_bytes = last_byte - byte_offset + 1
# 读取原始字节数据
data = self.read_db(db_number, byte_offset, total_bytes)
if data is None:
return None
# 解析bool值
result = []
for i in range(count):
current_bit = bit_offset + i
byte_idx = current_bit // 8
bit_idx = current_bit % 8
result.append(bool(data[byte_idx] & (1 << bit_idx)))
return result[0] if count == 1 else result
elif data_type == 'byte':
data = self.read_db(db_number, offset, count)
if data is None:
return None
return [data[i] for i in range(count)] if count > 1 else data[0]
elif data_type in ['int', 'word']:
total_bytes = 2 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'int':
result.append(get_int(data, i * 2))
else: # word
result.append(get_word(data, i * 2))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
total_bytes = 4 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'dint':
result.append(get_dint(data, i * 4))
elif data_type == 'dword':
result.append(get_dword(data, i * 4))
else: # real
result.append(get_real(data, i * 4))
return result[0] if count == 1 else result
else:
self.logger.error(f"Unsupported data type: {data_type}")
return None
except Exception as e:
self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
return None
# def read_generic(self, db_number, offset, data_type, count=1):
# """
# 通用读取接口,支持多种数据类型
# Args:
# db_number: DB块编号
# offset: 起始偏移量字节或位对于bool类型
# data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
# count: 要读取的数据个数
# Returns:
# 解析后的数据单个值或值列表失败返回None
# """
# if not self.connected and not self.connect():
# self.logger.warning(f"Read failed: not connected to {self.ip}")
# return None
#
# try:
# if data_type == 'bool':
# # 对于booloffset是位偏移
# byte_offset = offset // 8
# bit_offset = offset % 8
# # 计算需要读取的字节数
# last_bit = bit_offset + count - 1
# last_byte = last_bit // 8
# total_bytes = last_byte - byte_offset + 1
#
# # 读取原始字节数据
# data = self.read_db(db_number, byte_offset, total_bytes)
# if data is None:
# return None
#
# # 解析bool值
# result = []
# for i in range(count):
# current_bit = bit_offset + i
# byte_idx = current_bit // 8
# bit_idx = current_bit % 8
# result.append(bool(data[byte_idx] & (1 << bit_idx)))
#
# return result[0] if count == 1 else result
#
# elif data_type == 'byte':
# data = self.read_db(db_number, offset, count)
# if data is None:
# return None
# return [data[i] for i in range(count)] if count > 1 else data[0]
#
# elif data_type in ['int', 'word']:
# total_bytes = 2 * count
# data = self.read_db(db_number, offset, total_bytes)
# if data is None:
# return None
#
# result = []
# for i in range(count):
# if data_type == 'int':
# result.append(get_int(data, i * 2))
# else: # word
# result.append(get_word(data, i * 2))
# return result[0] if count == 1 else result
#
# elif data_type in ['dint', 'dword', 'real']:
# total_bytes = 4 * count
# data = self.read_db(db_number, offset, total_bytes)
# if data is None:
# return None
#
# result = []
# for i in range(count):
# if data_type == 'dint':
# result.append(get_dint(data, i * 4))
# elif data_type == 'dword':
# result.append(get_dword(data, i * 4))
# else: # real
# result.append(get_real(data, i * 4))
# return result[0] if count == 1 else result
#
# else:
# self.logger.error(f"Unsupported data type: {data_type}")
# return None
#
# except Exception as e:
# self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
# return None
def write_generic(self, db_number, offset, data_type, value):
"""
@ -445,4 +452,249 @@ class Snap7Client:
except Exception as e:
self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}")
return False
return False
# -----------------
# 新增
# -----------------
def cache_large_data_block(self, db_number, offset, size=6000):
"""
一次性读取指定大小的数据并缓存
参数:
db_number: DB块编号
start_offset: 起始偏移量
size: 要读取的字节数默认6000
返回值:
成功返回True失败返回False
"""
try:
self.data_cache = self.read_db(db_number, offset, size)
if self.data_cache is not None:
self.cache_db_number = db_number
self.cache_offset = offset
self.logger.info(f"Successfully cached {size} bytes from DB{db_number} starting at offset {offset}")
return True
else:
self.logger.error(f"Failed to cache {size} bytes from DB{db_number} starting at offset {offset}")
return False
except Exception as e:
self.logger.error(f"Error caching data: {e}")
return False
def read_db100_from_file(self):
"""
从缓存文件中解析DB100的原始字节数据
返回: bytearray成功/ None失败
"""
try:
# 1. 读取缓存文件内容
with open(self.db100_cache_file, "r", encoding="utf-8") as f:
content = f.read()
# 2. 定位"原始字节数据:"行匹配DB100ReaderThread的文件格式
data_line = None
for line in content.splitlines():
if "原始字节数据:" in line:
data_line = line.strip()
break
if not data_line:
self.logger.error(f"❌ DB100缓存文件格式错误未找到'原始字节数据'")
return None
# 3. 提取字节列表字符串(如"[16,0,0,...]"
list_start = data_line.find("[")
list_end = data_line.find("]") + 1 # 包含闭合括号
if list_start == -1 or list_end == 0:
self.logger.error(f"❌ DB100缓存文件格式错误未找到有效字节列表")
return None
byte_list_str = data_line[list_start:list_end]
# 4. 解析字符串为整数列表再转成bytearray
byte_list = ast.literal_eval(byte_list_str)
# 验证列表有效性必须是0-255的整数
if not isinstance(byte_list, list) or not all(
isinstance(b, int) and 0 <= b <= 255 for b in byte_list
):
self.logger.error(f"❌ DB100缓存文件数据无效字节列表包含非整数或超出范围值")
return None
# 5. 验证数据长度至少满足DB100的6000字节
if len(byte_list) < 6000:
self.logger.warning(f"⚠️ DB100缓存文件数据不完整{len(byte_list)}字节期望6000字节")
self.logger.debug(f"✅ 从缓存文件读取DB100数据{len(byte_list)}字节)")
return bytearray(byte_list)
except FileNotFoundError:
self.logger.warning(f"⚠️ DB100缓存文件不存在{self.db100_cache_file}")
return None
except ast.literal_eval.Error as e:
self.logger.error(f"❌ 解析DB100字节列表失败: {e}")
return None
except Exception as e:
self.logger.error(f"❌ 读取DB100缓存文件异常: {e}", exc_info=True)
return None
def read_generic(self, db_number, offset, data_type, count=1):
"""
通用读取接口(改进):
- DB100优先从缓存文件读取 → 再内存缓存 → 最后PLC
- 其他DB块保持原逻辑内存缓存→PLC
"""
# 1、处理DB100优先从缓存文件读取
if db_number == 100:
print(f"从缓存文件中读取{db_number}的数据")
db100_raw = self.read_db100_from_file()
if db100_raw is not None:
try:
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 计算需要读取的字节数
last_bit = bit_offset + count - 1
last_byte = last_bit // 8
# 检查数据长度是否足够
if last_byte >= len(db100_raw):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要字节{last_byte},实际{len(db100_raw)}字节")
else:
result = []
for i in range(count):
curr_bit = bit_offset + i
curr_byte = curr_bit // 8
curr_bit_in_byte = curr_bit % 8
result.append(bool(db100_raw[curr_byte] & (1 << curr_bit_in_byte)))
return result[0] if count == 1 else result
elif data_type == 'byte':
if offset + count > len(db100_raw):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{count}字节,实际{len(db100_raw)}字节")
else:
data = db100_raw[offset:offset + count]
return [b for b in data] if count > 1 else data[0]
elif data_type in ['int', 'word']:
byte_per_data = 2
total_bytes = byte_per_data * count
if offset + total_bytes > len(db100_raw):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{total_bytes}字节,实际{len(db100_raw)}字节")
else:
result = []
for i in range(count):
start = offset + i * byte_per_data
slice_data = db100_raw[start:start + byte_per_data]
result.append(get_int(slice_data, 0) if data_type == 'int' else get_word(slice_data, 0))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
byte_per_data = 4
total_bytes = byte_per_data * count
if offset + total_bytes > len(db100_raw):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{total_bytes}字节,实际{len(db100_raw)}字节")
else:
result = []
for i in range(count):
start = offset + i * byte_per_data
slice_data = db100_raw[start:start + byte_per_data]
if data_type == 'dint':
result.append(get_dint(slice_data, 0))
elif data_type == 'dword':
result.append(get_dword(slice_data, 0))
else: # real
result.append(get_real(slice_data, 0))
return result[0] if count == 1 else result
# 1.5 不支持的数据类型
else:
self.logger.error(f"❌ 不支持的数据类型:{data_type}")
return None
except Exception as e:
self.logger.error(f"❌ 解析DB100缓存文件数据异常类型{data_type},偏移:{offset}: {e}",
exc_info=True)
# 文件读取失败fallback到原逻辑内存缓存→PLC
self.logger.debug(f"⚠️ DB100缓存文件读取失败 fallback到内存缓存/PLC")
# 缓存读取失败或未启用缓存直接从PLC读取
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None
try:
print(f"从PLC中读取{db_number}的数据")
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 计算需要读取的字节数
last_bit = bit_offset + count - 1
last_byte = last_bit // 8
total_bytes = last_byte - byte_offset + 1
# 读取原始字节数据
data = self.read_db(db_number, byte_offset, total_bytes)
if data is None:
return None
# 解析bool值
result = []
for i in range(count):
curr_bit = bit_offset + i
byte_idx = curr_bit // 8
bit_idx = curr_bit % 8
result.append(bool(data[byte_idx] & (1 << bit_idx)))
return result[0] if count == 1 else result
elif data_type == 'byte':
data = self.read_db(db_number, offset, count)
if data is None:
return None
return [data[i] for i in range(count)] if count > 1 else data[0]
elif data_type in ['int', 'word']:
total_bytes = 2 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'int':
result.append(get_int(data, i * 2))
else: # word
result.append(get_word(data, i * 2))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
total_bytes = 4 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'dint':
result.append(get_dint(data, i * 4))
elif data_type == 'dword':
result.append(get_dword(data, i * 4))
else: # real
result.append(get_real(data, i * 4))
return result[0] if count == 1 else result
else:
self.logger.error(f"Unsupported data type: {data_type}")
return None
except Exception as e:
self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
return None