Files
gateway_plc/gateway/snap7_client.py

626 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import snap7
import logging
from threading import Lock
import time
from snap7.util import *
class Snap7Client:
    """Client wrapper that handles communication with one PLC via python-snap7.

    Every call into the underlying snap7 client made by the read/write/connect
    methods is wrapped in ``self.lock``. Failures are reported through return
    values (``None`` / ``False``) and the logger rather than by raising, and a
    failed read or write marks the client as disconnected so that the next
    call attempts to reconnect.
    """

    # Size in bytes of each non-bool data type understood by the generic API.
    _TYPE_WIDTHS = {'byte': 1, 'int': 2, 'word': 2, 'dint': 4, 'dword': 4, 'real': 4}
    # Inclusive (min, max, label) limits of the integer types; 'real' is not
    # range-checked.
    _TYPE_LIMITS = {
        'byte': (0, 255, 'Byte'),
        'int': (-32768, 32767, 'Int'),
        'word': (0, 65535, 'Word'),
        'dint': (-2147483648, 2147483647, 'DInt'),
        'dword': (0, 4294967295, 'DWord'),
    }
    # Upper bound, in seconds, of the exponential reconnect back-off.
    _MAX_RETRY_DELAY = 30

    def __init__(self, ip, rack, slot, reader_threads, max_retries=5, retry_base_delay=1):
        """Initialise the client; no connection is opened here.

        Args:
            ip: PLC IP address.
            rack: Rack number.
            slot: Slot number.
            reader_threads: Iterable of ``(area_name, reader_thread)`` pairs
                that read_generic() consults as an in-memory cache.
            max_retries: Cap for the retry counter used by the back-off.
            retry_base_delay: Base reconnect delay in seconds.
        """
        self.ip = ip
        self.rack = rack
        self.slot = slot
        self.client = snap7.client.Client()
        self.lock = Lock()
        self.connected = False
        self.max_retries = max_retries
        self.retry_base_delay = retry_base_delay
        self.last_connect_attempt = 0
        self.retry_count = 0
        self.reader_threads = reader_threads
        self.logger = logging.getLogger(f"Snap7Client[{ip}]")
        # Filled in by cache_large_data_block(); defined up front so reading
        # them before the first caching call cannot raise AttributeError.
        self.data_cache = None
        self.cache_db_number = None
        self.cache_offset = None

    def is_valid_connection(self):
        """Check that the PLC really answers by querying its CPU state.

        Returns:
            bool: True when the reported state is run or stop, False when the
            query fails or reports anything else.
        """
        try:
            cpu_state = self.client.get_cpu_state()
        except Exception as e:
            self.logger.debug(f"CPU state query on {self.ip} failed: {e}")
            return False
        self.logger.debug(f"当前 CPU 状态: {cpu_state}")
        return cpu_state in ('S7CpuStatusRun', 'S7CpuStatusStop')

    def connect(self):
        """Connect to the PLC and verify that the link is usable.

        After a failed attempt, further attempts are refused until
        ``retry_base_delay * 2 ** (retry_count - 1)`` seconds (capped at 30)
        have passed since the previous attempt.

        Returns:
            bool: True when connected and validated; False when the attempt
            failed or was skipped because the back-off delay has not elapsed.
        """
        current_time = time.time()
        # Exponential back-off between attempts.
        if self.retry_count > 0:
            delay = min(self.retry_base_delay * (2 ** (self.retry_count - 1)),
                        self._MAX_RETRY_DELAY)
            if current_time - self.last_connect_attempt < delay:
                return False  # too early to retry
        self.last_connect_attempt = current_time
        try:
            with self.lock:  # other threads wait while the socket is set up
                self.client.connect(self.ip, self.rack, self.slot)
                if self.client.get_connected() and self.is_valid_connection():
                    self.connected = True
                    self.retry_count = 0  # success resets the back-off
                    self.logger.info(f"Successfully connected to PLC {self.ip}")
                    return True
                self.connected = False
                self.logger.warning(f"Connection to {self.ip} established but PLC is not responding")
                try:
                    self.client.disconnect()
                except Exception as e:
                    # Best-effort cleanup: the link is already considered dead.
                    self.logger.debug(f"Disconnect from {self.ip} failed: {e}")
                return False
        except Exception as e:
            self.retry_count = min(self.retry_count + 1, self.max_retries)
            self.logger.error(f"Connection to {self.ip} failed (attempt {self.retry_count}/{self.max_retries}): {e}")
            self.connected = False
            return False

    def read_db(self, db_number, offset, size):
        """Read raw bytes from a data block.

        Args:
            db_number: DB number.
            offset: Start offset in bytes.
            size: Number of bytes to read.

        Returns:
            The data returned by the snap7 client, or None when the client is
            not connected, the read fails, or the returned length is not
            ``size`` (None is returned instead of zero-filled data).
        """
        if not self.connected and not self.connect():
            self.logger.warning(f"Read failed: not connected to {self.ip}")
            return None
        try:
            with self.lock:
                data = self.client.db_read(db_number, offset, size)
            # Validate the reply before handing it to the caller.
            if data is None or len(data) != size:
                self.connected = False
                self.logger.error(f"Read DB{db_number} returned invalid data size (expected {size}, got {len(data) if data else 0})")
                return None
            return data
        except Exception as e:
            self.logger.error(f"Read DB{db_number} error: {e}")
            self.connected = False
            return None

    def read_db_bool(self, db_number, offset, bit_length):
        """Read the first ``bit_length`` BOOL bits starting at a byte offset.

        Args:
            db_number (int): DB number.
            offset (int): Byte offset of the first bit.
            bit_length (int): Number of bits to extract. Values above 8
                continue into the following bytes.

        Returns:
            dict | None: Mapping ``bit index -> bool`` (bit 0 is the least
            significant bit of the first byte), or None on failure.
        """
        try:
            if bit_length < 0:
                self.logger.error(f"Read DB{db_number} error: invalid bit length {bit_length}")
                return None
            # Always read at least one byte, as a zero-length request is
            # never useful and the connection check should still run.
            n_bytes = max(1, (bit_length + 7) // 8)
            data = self.read_db(db_number, offset, n_bytes)
            if data is None:
                return None
            return {bit: bool(data[bit // 8] & (1 << (bit % 8)))
                    for bit in range(bit_length)}
        except Exception as e:
            self.logger.error(f"Read DB{db_number} error: {e}")
            return None

    def write_db(self, db_number, offset, data):
        """Write raw bytes to a data block.

        Args:
            db_number: DB number.
            offset: Start offset in bytes.
            data: Bytes to write.

        Returns:
            bool: True when the write succeeded.
        """
        if not self.connected and not self.connect():
            self.logger.warning(f"Write failed: not connected to {self.ip}")
            return False
        try:
            with self.lock:
                self.client.db_write(db_number, offset, data)
            self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
            return True
        except Exception as e:
            self.logger.error(f"Write DB{db_number} error: {e}")
            self.connected = False
            return False

    def batch_write_db(self, db_number, offset, data):
        """Write raw bytes to a data block; same contract as write_db().

        Returns:
            bool: True when the write succeeded.
        """
        return self.write_db(db_number, offset, data)

    def write_db_bool(self, db_number, offset, data):
        """Write byte(s) carrying BOOL bits; same contract as write_db().

        Returns:
            bool: True when the write succeeded.
        """
        return self.write_db(db_number, offset, data)

    def batch_write_db_bool(self, db_number, offset, data):
        """Write byte(s) carrying BOOL bits; same contract as write_db().

        Returns:
            bool: True when the write succeeded.
        """
        return self.write_db(db_number, offset, data)

    def _write_bools(self, db_number, offset, values):
        """Read-modify-write the bits starting at bit offset ``offset``."""
        if not values:
            return True  # nothing to change
        byte_offset, first_bit = divmod(offset, 8)
        # Cover every byte touched by the bit range, not just the first one.
        n_bytes = (first_bit + len(values) - 1) // 8 + 1
        current = self.read_db(db_number, byte_offset, n_bytes)
        if current is None:
            return False
        for i, val in enumerate(values):
            byte_idx, bit_idx = divmod(first_bit + i, 8)
            if val:
                current[byte_idx] |= (1 << bit_idx)
            else:
                current[byte_idx] &= ~(1 << bit_idx)
        # NOTE: read_db() and write_db() take the lock separately, so this
        # read-modify-write sequence is not atomic.
        return self.write_db_bool(db_number, byte_offset, current)

    def _encode_values(self, data_type, values):
        """Range-check ``values`` and pack them into one bytearray (None if invalid)."""
        limits = self._TYPE_LIMITS.get(data_type)
        if limits is not None:
            low, high, label = limits
            # Validate everything first so an invalid element cannot leave a
            # partially written sequence behind.
            for val in values:
                if val < low or val > high:
                    self.logger.error(f"{label} value out of range: {val}")
                    return None
        width = self._TYPE_WIDTHS[data_type]
        payload = bytearray(width * len(values))
        for i, val in enumerate(values):
            pos = i * width
            if data_type == 'byte':
                payload[pos] = val
            elif data_type == 'int':
                set_int(payload, pos, val)
            elif data_type == 'word':
                set_word(payload, pos, val)
            elif data_type == 'dint':
                set_dint(payload, pos, val)
            elif data_type == 'dword':
                set_dword(payload, pos, val)
            else:  # real
                set_real(payload, pos, float(val))
        return payload

    def write_generic(self, db_number, offset, data_type, value):
        """Write one value or a sequence of values of the given type.

        Args:
            db_number: DB number.
            offset: Start offset in bytes, or in bits when ``data_type`` is
                'bool'.
            data_type: One of 'bool', 'byte', 'int', 'word', 'real', 'dint',
                'dword'.
            value: A single value, or a list/tuple of values written to
                consecutive positions.

        Returns:
            bool: True when the write succeeded. Out-of-range values,
            unsupported types and communication errors all yield False; no
            data is written when any value fails validation.
        """
        if not self.connected and not self.connect():
            self.logger.warning(f"Write failed: not connected to {self.ip}")
            return False
        try:
            values = list(value) if isinstance(value, (list, tuple)) else [value]
            if data_type == 'bool':
                return self._write_bools(db_number, offset, values)
            if data_type not in self._TYPE_WIDTHS:
                self.logger.error(f"Unsupported data type: {data_type}")
                return False
            payload = self._encode_values(data_type, values)
            if payload is None:
                return False
            if not payload:
                return True  # empty sequence: nothing to write
            return self.write_db(db_number, offset, payload)
        except Exception as e:
            self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}")
            return False

    # -----------------
    # Added later
    # -----------------
    def cache_large_data_block(self, db_number, offset, size=6000):
        """Read ``size`` bytes in one request and keep them in ``self.data_cache``.

        Args:
            db_number: DB number.
            offset: Start offset in bytes.
            size: Number of bytes to read (default 6000).

        Returns:
            bool: True on success. On failure ``self.data_cache`` is None.
        """
        try:
            self.data_cache = self.read_db(db_number, offset, size)
            if self.data_cache is None:
                self.logger.error(f"Failed to cache {size} bytes from DB{db_number} starting at offset {offset}")
                return False
            self.cache_db_number = db_number
            self.cache_offset = offset
            self.logger.info(f"Successfully cached {size} bytes from DB{db_number} starting at offset {offset}")
            return True
        except Exception as e:
            self.logger.error(f"Error caching data: {e}")
            return False

    def read_db_from_memory_cache(self, db_number, offset, required_size, array_name, reader_threads):
        """Fetch the in-memory buffer that a reader thread keeps for ``array_name``.

        The WHOLE buffer (first element of the thread's array) is returned,
        not a slice; callers index it with absolute byte offsets. ``offset``
        and ``required_size`` are only used to validate the buffer length.

        Args:
            db_number: DB number (used in log messages only).
            offset: Byte offset the caller intends to read from.
            required_size: Number of bytes the caller intends to read.
            array_name: Value matched against each thread's
                ``custom_array_name``.
            reader_threads: Iterable of ``(area_name, thread_instance)`` pairs.

        Returns:
            bytearray | None: The buffer, or None when no thread matches, the
            array is empty or of the wrong type, ``offset`` lies beyond the
            buffer, or any error occurs. A buffer that is shorter than
            ``offset + required_size`` but still contains ``offset`` is
            returned with a warning.
        """
        try:
            target_thread = next(
                (thread for _, thread in reader_threads
                 if thread.custom_array_name == array_name),
                None,
            )
            if target_thread is None:
                # List the known array names to make typos easy to spot.
                existing_arrays = [t.custom_array_name for _, t in reader_threads]
                self.logger.error(
                    f"❌ 未找到数组{array_name}对应的线程实例!"
                    f"已存在的数组名:{existing_arrays},请检查数组名是否拼写正确"
                )
                return None
            # The thread is expected to carry an attribute named after the array.
            if not hasattr(target_thread, array_name):
                self.logger.error(f"❌ 线程实例{target_thread.name}上不存在数组{array_name}(线程初始化失败)")
                return None
            custom_array = target_thread.get_custom_array()
            if not custom_array:
                self.logger.warning(f"⚠️ 数组{array_name}为空,无可用数据")
                return None
            buffer = custom_array[0]
            if not isinstance(buffer, bytearray):
                self.logger.error(
                    f"❌ 数组{array_name}的元素类型错误:"
                    f"期望bytearray实际{type(buffer).__name__}"
                )
                return None
            total_byte_len = len(buffer)
            if total_byte_len < offset + required_size:
                self.logger.warning(
                    f"⚠️ DB{db_number}内存数据不完整:"
                    f"数组{array_name}最新数据共{total_byte_len}字节,"
                    f"需从偏移{offset}读取{required_size}字节(需{offset + required_size}字节)"
                )
                if total_byte_len - offset <= 0:
                    self.logger.error(f"❌ 偏移{offset}超出数据范围(最大偏移:{total_byte_len - 1}")
                    return None
            return buffer
        except Exception as e:
            self.logger.error(f"🔴 读取内存数组{array_name}异常:{str(e)}", exc_info=True)
            return None

    def _byte_span(self, offset, data_type, count):
        """Return ``(byte_offset, n_bytes, first_bit)`` for a request, or None if the type is unknown."""
        if data_type == 'bool':
            # For bool the offset is expressed in bits.
            byte_offset, first_bit = divmod(offset, 8)
            n_bytes = (first_bit + count - 1) // 8 + 1
            return byte_offset, n_bytes, first_bit
        width = self._TYPE_WIDTHS.get(data_type)
        if width is None:
            return None
        return offset, width * count, 0

    @staticmethod
    def _decode_values(data, data_type, count, first_bit=0):
        """Decode ``count`` values from ``data``, which starts at the request's first byte."""
        if data_type == 'bool':
            return [bool(data[(first_bit + i) // 8] & (1 << ((first_bit + i) % 8)))
                    for i in range(count)]
        if data_type == 'byte':
            return [data[i] for i in range(count)]
        if data_type == 'int':
            return [get_int(data, i * 2) for i in range(count)]
        if data_type == 'word':
            return [get_word(data, i * 2) for i in range(count)]
        if data_type == 'dint':
            return [get_dint(data, i * 4) for i in range(count)]
        if data_type == 'dword':
            return [get_dword(data, i * 4) for i in range(count)]
        return [get_real(data, i * 4) for i in range(count)]  # real

    def read_generic(self, db_number, offset, data_type, array_name, count=1):
        """Read typed value(s), preferring the in-memory cache over the PLC.

        The buffer kept by the reader thread whose ``custom_array_name``
        equals ``array_name`` is tried first; when it is missing, too short or
        cannot be decoded, the data is read from the PLC instead.

        Args:
            db_number: DB number.
            offset: Start offset in bytes, or in bits when ``data_type`` is
                'bool'.
            data_type: One of 'bool', 'byte', 'int', 'word', 'real', 'dint',
                'dword'.
            array_name: Name of the cached array to look up.
            count: Number of consecutive values to read.

        Returns:
            A single value when ``count == 1``, otherwise a list of values;
            None when the type is unsupported or the read fails.
        """
        try:
            span = self._byte_span(offset, data_type, count)
        except Exception as e:
            self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
            return None
        if span is None:
            self.logger.error(f"Unsupported data type: {data_type}")
            return None
        byte_offset, n_bytes, first_bit = span

        # 1. Try the in-memory array maintained by the reader threads. The
        #    request is expressed in bytes so the length check is meaningful
        #    for every type, including bit-addressed bools.
        raw_data = self.read_db_from_memory_cache(
            db_number, byte_offset, n_bytes, array_name, self.reader_threads)
        if raw_data is not None:
            self.logger.debug(f"从内存数组中读取DB{db_number}(数组:{array_name}")
            if byte_offset + n_bytes > len(raw_data):
                self.logger.warning(
                    f"⚠️ DB{db_number}缓存数据不足需要偏移{byte_offset}+{n_bytes}字节,实际{len(raw_data)}字节")
            else:
                try:
                    window = raw_data[byte_offset:byte_offset + n_bytes]
                    values = self._decode_values(window, data_type, count, first_bit)
                    return values[0] if count == 1 else values
                except Exception as e:
                    self.logger.error(
                        f"❌ 解析DB{db_number}缓存文件数据异常(类型:{data_type},偏移:{offset}: {e}",
                        exc_info=True)
            self.logger.debug(f"⚠️ DB{db_number}缓存文件读取失败, fallback到内存缓存/PLC")

        # 2. Cache unavailable or unusable: read directly from the PLC.
        if not self.connected and not self.connect():
            self.logger.warning(f"Read failed: not connected to {self.ip}")
            return None
        self.logger.debug(f"从PLC中读取DB{db_number}的数据")
        data = self.read_db(db_number, byte_offset, n_bytes)
        if data is None:
            return None
        try:
            values = self._decode_values(data, data_type, count, first_bit)
        except Exception as e:
            self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
            return None
        return values[0] if count == 1 else values