Files
gateway_plc/gateway/snap7_client.py
2025-09-29 14:41:04 +08:00

627 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from copyreg import dispatch_table
import snap7
import logging
from threading import Lock
import time
from snap7.util import *
import ast
class Snap7Client:
"""Snap7客户端处理与PLC的通信"""
def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1):
"""
初始化Snap7客户端
Args:
ip: PLC IP地址
rack: Rack编号
slot: Slot编号
max_retries: 最大重试次数
retry_base_delay: 基础重试延迟(秒)
"""
self.ip = ip
self.rack = rack
self.slot = slot
self.client = snap7.client.Client()
self.lock = Lock()
self.connected = False
self.max_retries = max_retries
self.retry_base_delay = retry_base_delay
self.last_connect_attempt = 0
self.retry_count = 0
self.logger = logging.getLogger(f"Snap7Client[{ip}]")
def is_valid_connection(self):
"""检查连接是否真正有效"""
try:
# 尝试读取PLC的运行状态
cpu_state = self.client.get_cpu_state()
print("当前 CPU 状态:", cpu_state)
return cpu_state in ['S7CpuStatusRun', 'S7CpuStatusStop']
except:
return False
def connect(self):
"""建立与PLC的连接并验证"""
current_time = time.time()
# 指数退避重试
if self.retry_count > 0:
delay = min(self.retry_base_delay * (2 ** (self.retry_count - 1)), 30)
if current_time - self.last_connect_attempt < delay:
return False # 未到重试时间
self.last_connect_attempt = current_time
try:
with self.lock: # 进入锁,其他线程需等待
self.client.connect(self.ip, self.rack, self.slot)
# 验证连接是否真正有效
if self.client.get_connected() and self.is_valid_connection():
self.connected = True
self.retry_count = 0 # 重置重试计数
self.logger.info(f"Successfully connected to PLC {self.ip}")
return True
else:
self.connected = False
self.logger.warning(f"Connection to {self.ip} established but PLC is not responding")
try:
self.client.disconnect()
except:
pass
return False
except Exception as e:
self.retry_count = min(self.retry_count + 1, self.max_retries)
self.logger.error(f"Connection to {self.ip} failed (attempt {self.retry_count}/{self.max_retries}): {e}")
self.connected = False
return False
def read_db(self, db_number, offset, size):
"""
从DB块读取数据
Args:
db_number: DB编号
offset: 起始偏移量
size: 读取字节数
Returns:
bytearray: 读取的数据如果失败返回None
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None # 返回None而不是零填充数据
try:
with self.lock: # 进入锁,其他线程需等待
data = self.client.db_read(db_number, offset, size)
# 验证返回数据的有效性
if data is None or len(data) != size:
self.connected = False
self.logger.error(f"Read DB{db_number} returned invalid data size (expected {size}, got {len(data) if data else 0})")
return None
return data
except Exception as e:
self.logger.error(f"Read DB{db_number} error: {e}")
self.connected = False
return None
def read_db_bool(self, db_number, offset, bit_length):
"""
从 DB 块中读取一个字节,并提取其中的多个 BOOL 位
Args:
db_number (int): DB块编号
offset (int): 要读取的字节偏移地址
bit_length: 第几位如1表示第1位
Returns:
result:返回位值
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None # 返回None而不是零填充数据
try:
with self.lock:
data = self.client.db_read(db_number, offset, 1)
result = {}
for bit in range(bit_length):
result[bit] = bool(data[0] & (1 << bit))
if result is None or len(result) != bit_length:
self.connected = False
self.logger.error(f"Read DB{db_number} returned invalid data size (expected {bit_length}, got {len(result) if data else 0})")
return None
return result
except Exception as e:
self.logger.error(f"Read DB{db_number} error: {e}")
self.connected = False
return None
def write_db(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def write_generic(self, db_number, offset, data_type, value):
"""
通用写入接口,支持多种数据类型
Args:
db_number: DB块编号
offset: 起始偏移量字节或位对于bool类型
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
value: 要写入的值(可以是单个值或列表)
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 读取当前字节
current_byte = self.read_db(db_number, byte_offset, 1)
if current_byte is None:
return False
# 修改特定位
if isinstance(value, list):
# 多个bool值
for i, val in enumerate(value):
current_bit = bit_offset + i
byte_idx = current_bit // 8
bit_idx = current_bit % 8
if val:
current_byte[0] |= (1 << bit_idx)
else:
current_byte[0] &= ~(1 << bit_idx)
else:
# 单个bool值
if value:
current_byte[0] |= (1 << bit_offset)
else:
current_byte[0] &= ~(1 << bit_offset)
# 写回修改后的字节
return self.write_db_bool(db_number, byte_offset, current_byte)
elif data_type == 'byte':
if isinstance(value, list):
# 批量写入
for i, val in enumerate(value):
if val < 0 or val > 255:
self.logger.error(f"Byte value out of range: {val}")
return False
if not self.write_db(db_number, offset + i, bytes([val])):
return False
return True
else:
# 单个字节
if value < 0 or value > 255:
self.logger.error(f"Byte value out of range: {value}")
return False
return self.write_db(db_number, offset, bytes([value]))
elif data_type in ['int', 'word']:
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
# 确保int值在范围内
if data_type == 'int' and (val < -32768 or val > 32767):
self.logger.error(f"Int value out of range: {val}")
return False
elif data_type == 'word' and (val < 0 or val > 65535):
self.logger.error(f"Word value out of range: {val}")
return False
data = bytearray(2)
if data_type == 'int':
set_int(data, 0, val)
else:
set_word(data, 0, val)
if not self.write_db(db_number, offset + i * 2, data):
return False
return True
elif data_type in ['dint', 'dword', 'real']:
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
data = bytearray(4)
if data_type == 'dint':
if val < -2147483648 or val > 2147483647:
self.logger.error(f"DInt value out of range: {val}")
return False
set_dint(data, 0, val)
elif data_type == 'dword':
if val < 0 or val > 4294967295:
self.logger.error(f"DWord value out of range: {val}")
return False
set_dword(data, 0, val)
else: # real
set_real(data, 0, float(val))
if not self.write_db(db_number, offset + i * 4, data):
return False
return True
else:
self.logger.error(f"Unsupported data type: {data_type}")
return False
except Exception as e:
self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}")
return False
# -----------------
# 新增
# -----------------
def cache_large_data_block(self, db_number, offset, size=6000):
"""
一次性读取指定大小的数据并缓存
参数:
db_number: DB块编号
start_offset: 起始偏移量
size: 要读取的字节数默认6000
返回值:
成功返回True失败返回False
"""
try:
self.data_cache = self.read_db(db_number, offset, size)
if self.data_cache is not None:
self.cache_db_number = db_number
self.cache_offset = offset
self.logger.info(f"Successfully cached {size} bytes from DB{db_number} starting at offset {offset}")
return True
else:
self.logger.error(f"Failed to cache {size} bytes from DB{db_number} starting at offset {offset}")
return False
except Exception as e:
self.logger.error(f"Error caching data: {e}")
return False
def read_db_from_file_cache(self, db_number, offset, required_size, cache_file):
"""
通用文件缓存读取从指定文件读取DB块缓存返回需要的字节片段
参数:
db_number: DB块编号
offset: 需要读取的起始偏移(字节)
required_size: 需要读取的字节数
cache_file: 缓存文件名(动态生成)
返回值:
bytearray | None: 所需的字节片段失败返回None
"""
try:
# 1. 读取缓存文件内容
with open(cache_file, "r", encoding="utf-8") as f:
content = f.read()
# 2. 定位"原始字节数据:"行匹配DB100ReaderThread的文件格式
db_match = False
data_line = None
for line in content.splitlines():
if f"DB{db_number}" in line and "区域信息" in line:
db_match = True
if db_match and "原始字节数据:" in line:
data_line = line.strip()
break
if not db_match or not data_line:
self.logger.error(f"文件缓存中无DB{db_number}的有效数据(文件:{cache_file}")
return None
# 3. 提取字节列表字符串(如"[16,0,0,...]"
list_start = data_line.find("[")
list_end = data_line.find("]") + 1 # 包含闭合括号
if list_start == -1 or list_end == 0:
self.logger.error(f"❌ DB{db_number}缓存文件格式错误:未找到有效字节列表")
return None
byte_list_str = data_line[list_start:list_end]
# 4. 解析字符串为整数列表再转成bytearray
byte_list = ast.literal_eval(byte_list_str)
# 验证列表有效性必须是0-255的整数
if not isinstance(byte_list, list) or not all(
isinstance(b, int) and 0 <= b <= 255 for b in byte_list
):
self.logger.error(f"❌ DB{db_number}缓存文件数据无效:字节列表包含非整数或超出范围值")
return None
# 5. 验证数据长度至少满足DB100的6000字节
if len(byte_list) < required_size:
self.logger.warning(f"⚠️ DB{db_number}缓存文件数据不完整(仅{len(byte_list)}字节,期望{required_size}字节)")
self.logger.debug(f"✅ 从缓存文件读取DB{db_number}数据({len(byte_list)}字节)")
return bytearray(byte_list)
except FileNotFoundError:
self.logger.warning(f"⚠️ DB{db_number}缓存文件不存在:{cache_file}")
return None
except ast.literal_eval.Error as e:
self.logger.error(f"❌ 解析DB{db_number}字节列表失败: {e}")
return None
except Exception as e:
self.logger.error(f"❌ 读取DB{db_number}缓存文件异常: {e}", exc_info=True)
return None
def read_generic(self, db_number, offset, data_type, cache_file, count=1):
"""
通用读取接口(改进):
- DB100优先从缓存文件读取 → 再内存缓存 → 最后PLC
- 其他DB块保持原逻辑内存缓存→PLC
"""
# 1、处理DB数据块优先从缓存文件读取
raw_data = None
if raw_data is None:
cache_file = cache_file
raw_data = self.read_db_from_file_cache(db_number, offset, count, cache_file)
if raw_data is not None:
print(f"从缓存文件中读取DB{db_number}的数据")
self.logger.debug(f"从文件缓存读取DB{db_number}(文件:{cache_file}")
try:
if data_type == 'bool':
# 对于booloffset是位偏移
start_byte = offset // 8
start_bit_in_byte = offset % 8
# 计算需要读取的字节数
end_bit = offset + count - 1
end_byte = end_bit // 8
# 检查数据长度是否足够
if end_byte >= len(raw_data):
self.logger.warning(
f"⚠️ DB{db_number}缓存文件数据不足:需要字节{end_byte},实际{len(raw_data)}字节")
else:
result = [] # 用于存储解析出的bool值
for i in range(count):
current_global_bit = offset + i
current_byte = current_global_bit // 8
current_bit = current_global_bit % 8
result.append(bool(raw_data[current_byte] & (1 << current_bit)))
return result[0] if count == 1 else result
elif data_type == 'byte':
if offset + count > len(raw_data):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{count}字节,实际{len(raw_data)}字节")
else:
data = raw_data[offset:offset + count]
return [b for b in data] if count > 1 else data[0]
elif data_type in ['int', 'word']:
byte_per_data = 2
total_bytes = byte_per_data * count
if offset + total_bytes > len(raw_data):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{total_bytes}字节,实际{len(raw_data)}字节")
else:
result = []
for i in range(count):
start = offset + i * byte_per_data
slice_data = raw_data[start:start + byte_per_data]
result.append(get_int(slice_data, 0) if data_type == 'int' else get_word(slice_data, 0))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
byte_per_data = 4
total_bytes = byte_per_data * count
if offset + total_bytes > len(raw_data):
self.logger.warning(
f"⚠️ DB100缓存文件数据不足需要偏移{offset}+{total_bytes}字节,实际{len(raw_data)}字节")
else:
result = []
for i in range(count):
start = offset + i * byte_per_data
slice_data = raw_data[start:start + byte_per_data]
if data_type == 'dint':
result.append(get_dint(slice_data, 0))
elif data_type == 'dword':
result.append(get_dword(slice_data, 0))
else: # real
result.append(get_real(slice_data, 0))
return result[0] if count == 1 else result
# 1.5 不支持的数据类型
else:
self.logger.error(f"❌ 不支持的数据类型:{data_type}")
return None
except Exception as e:
self.logger.error(f"❌ 解析DB{db_number}缓存文件数据异常(类型:{data_type},偏移:{offset}: {e}",
exc_info=True)
# 文件读取失败fallback到原逻辑内存缓存→PLC
self.logger.debug(f"⚠️ DB{db_number}缓存文件读取失败, fallback到内存缓存/PLC")
# 缓存读取失败或未启用缓存直接从PLC读取
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None
try:
print(f"从PLC中读取DB{db_number}的数据")
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 计算需要读取的字节数
last_bit = bit_offset + count - 1
last_byte = last_bit // 8
total_bytes = last_byte - byte_offset + 1
# 读取原始字节数据
data = self.read_db(db_number, byte_offset, total_bytes)
if data is None:
return None
# 解析bool值
result = []
for i in range(count):
curr_bit = bit_offset + i
byte_idx = curr_bit // 8
bit_idx = curr_bit % 8
result.append(bool(data[byte_idx] & (1 << bit_idx)))
return result[0] if count == 1 else result
elif data_type == 'byte':
data = self.read_db(db_number, offset, count)
if data is None:
return None
return [data[i] for i in range(count)] if count > 1 else data[0]
elif data_type in ['int', 'word']:
total_bytes = 2 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'int':
result.append(get_int(data, i * 2))
else: # word
result.append(get_word(data, i * 2))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
total_bytes = 4 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'dint':
result.append(get_dint(data, i * 4))
elif data_type == 'dword':
result.append(get_dword(data, i * 4))
else: # real
result.append(get_real(data, i * 4))
return result[0] if count == 1 else result
else:
self.logger.error(f"Unsupported data type: {data_type}")
return None
except Exception as e:
self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
return None