将读写功能修改为read_genetic,write_genetic并且添加.exe文件
This commit is contained in:
@ -7,11 +7,11 @@ import ast
|
||||
|
||||
class Snap7Client:
|
||||
"""Snap7客户端,处理与PLC的通信"""
|
||||
|
||||
|
||||
def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1):
|
||||
"""
|
||||
初始化Snap7客户端
|
||||
|
||||
|
||||
Args:
|
||||
ip: PLC IP地址
|
||||
rack: Rack编号
|
||||
@ -77,19 +77,19 @@ class Snap7Client:
|
||||
def read_db(self, db_number, offset, size):
|
||||
"""
|
||||
从DB块读取数据
|
||||
|
||||
|
||||
Args:
|
||||
db_number: DB编号
|
||||
offset: 起始偏移量
|
||||
size: 读取字节数
|
||||
|
||||
|
||||
Returns:
|
||||
bytearray: 读取的数据,如果失败返回None
|
||||
"""
|
||||
if not self.connected and not self.connect():
|
||||
self.logger.warning(f"Read failed: not connected to {self.ip}")
|
||||
return None # 返回None而不是零填充数据
|
||||
|
||||
|
||||
try:
|
||||
with self.lock: # 进入锁,其他线程需等待
|
||||
data = self.client.db_read(db_number, offset, size)
|
||||
@ -155,16 +155,10 @@ class Snap7Client:
|
||||
return False
|
||||
|
||||
try:
|
||||
with self.lock:
|
||||
values = int(data)
|
||||
value = bytearray(0)
|
||||
if isinstance(values, int):
|
||||
set_int(value, offset, values)
|
||||
data = value
|
||||
|
||||
self.client.db_write(db_number, offset, data)
|
||||
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
|
||||
return True
|
||||
self.client.db_write(db_number, offset, data)
|
||||
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Write DB{db_number} error: {e}")
|
||||
self.connected = False
|
||||
@ -214,13 +208,6 @@ class Snap7Client:
|
||||
|
||||
try:
|
||||
with self.lock:
|
||||
data = data.decode('utf-8')
|
||||
# 将字符串安全转换为字典
|
||||
data_dict = ast.literal_eval(data) # 输出: {0: True}
|
||||
value = bytearray(offset + 1)
|
||||
for bit, val in data_dict.items():
|
||||
set_bool(value, offset, bit, val)
|
||||
data = value
|
||||
|
||||
self.client.db_write(db_number, offset, data)
|
||||
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
|
||||
@ -255,4 +242,207 @@ class Snap7Client:
|
||||
except Exception as e:
|
||||
self.logger.error(f"Write DB{db_number} error: {e}")
|
||||
self.connected = False
|
||||
return False
|
||||
|
||||
def read_generic(self, db_number, offset, data_type, count=1):
|
||||
"""
|
||||
通用读取接口,支持多种数据类型
|
||||
Args:
|
||||
db_number: DB块编号
|
||||
offset: 起始偏移量(字节或位,对于bool类型)
|
||||
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
|
||||
count: 要读取的数据个数
|
||||
Returns:
|
||||
解析后的数据(单个值或值列表),失败返回None
|
||||
"""
|
||||
if not self.connected and not self.connect():
|
||||
self.logger.warning(f"Read failed: not connected to {self.ip}")
|
||||
return None
|
||||
|
||||
try:
|
||||
if data_type == 'bool':
|
||||
# 对于bool,offset是位偏移
|
||||
byte_offset = offset // 8
|
||||
bit_offset = offset % 8
|
||||
# 计算需要读取的字节数
|
||||
last_bit = bit_offset + count - 1
|
||||
last_byte = last_bit // 8
|
||||
total_bytes = last_byte - byte_offset + 1
|
||||
|
||||
# 读取原始字节数据
|
||||
data = self.read_db(db_number, byte_offset, total_bytes)
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
# 解析bool值
|
||||
result = []
|
||||
for i in range(count):
|
||||
current_bit = bit_offset + i
|
||||
byte_idx = current_bit // 8
|
||||
bit_idx = current_bit % 8
|
||||
result.append(bool(data[byte_idx] & (1 << bit_idx)))
|
||||
|
||||
return result[0] if count == 1 else result
|
||||
|
||||
elif data_type == 'byte':
|
||||
data = self.read_db(db_number, offset, count)
|
||||
if data is None:
|
||||
return None
|
||||
return [data[i] for i in range(count)] if count > 1 else data[0]
|
||||
|
||||
elif data_type in ['int', 'word']:
|
||||
total_bytes = 2 * count
|
||||
data = self.read_db(db_number, offset, total_bytes)
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
result = []
|
||||
for i in range(count):
|
||||
if data_type == 'int':
|
||||
result.append(get_int(data, i * 2))
|
||||
else: # word
|
||||
result.append(get_word(data, i * 2))
|
||||
return result[0] if count == 1 else result
|
||||
|
||||
elif data_type in ['dint', 'dword', 'real']:
|
||||
total_bytes = 4 * count
|
||||
data = self.read_db(db_number, offset, total_bytes)
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
result = []
|
||||
for i in range(count):
|
||||
if data_type == 'dint':
|
||||
result.append(get_dint(data, i * 4))
|
||||
elif data_type == 'dword':
|
||||
result.append(get_dword(data, i * 4))
|
||||
else: # real
|
||||
result.append(get_real(data, i * 4))
|
||||
return result[0] if count == 1 else result
|
||||
|
||||
else:
|
||||
self.logger.error(f"Unsupported data type: {data_type}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
|
||||
return None
|
||||
|
||||
def write_generic(self, db_number, offset, data_type, value):
|
||||
"""
|
||||
通用写入接口,支持多种数据类型
|
||||
Args:
|
||||
db_number: DB块编号
|
||||
offset: 起始偏移量(字节或位,对于bool类型)
|
||||
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
|
||||
value: 要写入的值(可以是单个值或列表)
|
||||
Returns:
|
||||
bool: 是否写入成功
|
||||
"""
|
||||
if not self.connected and not self.connect():
|
||||
self.logger.warning(f"Write failed: not connected to {self.ip}")
|
||||
return False
|
||||
|
||||
try:
|
||||
if data_type == 'bool':
|
||||
# 对于bool,offset是位偏移
|
||||
byte_offset = offset // 8
|
||||
bit_offset = offset % 8
|
||||
|
||||
# 读取当前字节
|
||||
current_byte = self.read_db(db_number, byte_offset, 1)
|
||||
if current_byte is None:
|
||||
return False
|
||||
|
||||
# 修改特定位
|
||||
if isinstance(value, list):
|
||||
# 多个bool值
|
||||
for i, val in enumerate(value):
|
||||
current_bit = bit_offset + i
|
||||
byte_idx = current_bit // 8
|
||||
bit_idx = current_bit % 8
|
||||
|
||||
if val:
|
||||
current_byte[0] |= (1 << bit_idx)
|
||||
else:
|
||||
current_byte[0] &= ~(1 << bit_idx)
|
||||
else:
|
||||
# 单个bool值
|
||||
if value:
|
||||
current_byte[0] |= (1 << bit_offset)
|
||||
else:
|
||||
current_byte[0] &= ~(1 << bit_offset)
|
||||
|
||||
# 写回修改后的字节
|
||||
return self.write_db_bool(db_number, byte_offset, current_byte)
|
||||
|
||||
elif data_type == 'byte':
|
||||
if isinstance(value, list):
|
||||
# 批量写入
|
||||
for i, val in enumerate(value):
|
||||
if val < 0 or val > 255:
|
||||
self.logger.error(f"Byte value out of range: {val}")
|
||||
return False
|
||||
if not self.write_db(db_number, offset + i, bytes([val])):
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
# 单个字节
|
||||
if value < 0 or value > 255:
|
||||
self.logger.error(f"Byte value out of range: {value}")
|
||||
return False
|
||||
return self.write_db(db_number, offset, bytes([value]))
|
||||
|
||||
elif data_type in ['int', 'word']:
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
|
||||
for i, val in enumerate(value):
|
||||
# 确保int值在范围内
|
||||
if data_type == 'int' and (val < -32768 or val > 32767):
|
||||
self.logger.error(f"Int value out of range: {val}")
|
||||
return False
|
||||
elif data_type == 'word' and (val < 0 or val > 65535):
|
||||
self.logger.error(f"Word value out of range: {val}")
|
||||
return False
|
||||
|
||||
data = bytearray(2)
|
||||
if data_type == 'int':
|
||||
set_int(data, 0, val)
|
||||
else:
|
||||
set_word(data, 0, val)
|
||||
|
||||
if not self.write_db(db_number, offset + i * 2, data):
|
||||
return False
|
||||
return True
|
||||
|
||||
elif data_type in ['dint', 'dword', 'real']:
|
||||
if not isinstance(value, list):
|
||||
value = [value]
|
||||
|
||||
for i, val in enumerate(value):
|
||||
data = bytearray(4)
|
||||
if data_type == 'dint':
|
||||
if val < -2147483648 or val > 2147483647:
|
||||
self.logger.error(f"DInt value out of range: {val}")
|
||||
return False
|
||||
set_dint(data, 0, val)
|
||||
elif data_type == 'dword':
|
||||
if val < 0 or val > 4294967295:
|
||||
self.logger.error(f"DWord value out of range: {val}")
|
||||
return False
|
||||
set_dword(data, 0, val)
|
||||
else: # real
|
||||
set_real(data, 0, float(val))
|
||||
|
||||
if not self.write_db(db_number, offset + i * 4, data):
|
||||
return False
|
||||
return True
|
||||
|
||||
else:
|
||||
self.logger.error(f"Unsupported data type: {data_type}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}")
|
||||
return False
|
||||
Reference in New Issue
Block a user