调整多字节写入接口,解决逻辑混淆的代码

This commit is contained in:
2025-08-14 10:10:38 +08:00
parent d3a38291a2
commit a9981fc84d
15 changed files with 754 additions and 413 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

12
.idea/gateway.iml generated Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gateway.iml" filepath="$PROJECT_DIR$/.idea/gateway.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -3,13 +3,14 @@ import time
import logging
from snap7.util import *
class CacheManager:
"""PLC数据缓存管理器"""
def __init__(self, config, plc_manager, app=None):
"""
初始化缓存管理器
Args:
config: 配置对象
plc_manager: PLC管理器实例
@ -28,7 +29,7 @@ class CacheManager:
self.plc_connection_status = {} # PLC连接状态
self.logger = logging.getLogger("CacheManager")
self.init_cache()
def init_cache(self):
"""初始化缓存结构"""
for plc in self.config["plcs"]:
@ -37,7 +38,7 @@ class CacheManager:
self.last_update[plc_name] = {}
self.plc_last_connected[plc_name] = 0 # 初始化为0未连接
self.plc_connection_status[plc_name] = "never_connected"
for area in plc["areas"]:
name = area["name"]
# 确保初始状态为断开
@ -51,7 +52,7 @@ class CacheManager:
"status": "disconnected" # 初始状态为断开
}
self.last_update[plc_name][name] = 0
def refresh_cache(self):
"""后台线程:定期刷新缓存"""
while self.running:
@ -59,10 +60,10 @@ class CacheManager:
for plc in self.config["plcs"]:
plc_name = plc["name"]
client = self.plc_manager.get_plc(plc_name)
# 检查PLC连接状态
plc_connected = client.connected
# 更新PLC连接状态
with self.lock:
if plc_connected:
@ -73,14 +74,14 @@ class CacheManager:
self.plc_connection_status[plc_name] = "never_connected"
else:
self.plc_connection_status[plc_name] = "disconnected"
# 刷新所有可读区域
for area in plc["areas"]:
if area["type"] in ["read", "read_write"]:
name = area["name"]
try:
data = client.read_db(area["db_number"], area["offset"], area["size"])
# 更新区域状态基于PLC连接状态和读取结果
with self.lock:
if plc_connected and data and len(data) == area["size"]:
@ -91,22 +92,23 @@ class CacheManager:
self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name]
# 如果之前有数据,保留旧数据但标记状态
if self.last_update[plc_name][name] > 0:
self.logger.info(f"PLC {plc_name} area {name} disconnected but keeping last valid data")
self.logger.info(
f"PLC {plc_name} area {name} disconnected but keeping last valid data")
except Exception as e:
with self.lock:
self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name]
self.logger.warning(f"Error updating status for {plc_name}/{name}: {e}")
time.sleep(self.refresh_interval)
except Exception as e:
self.logger.error(f"Error in refresh_cache: {e}")
time.sleep(self.refresh_interval)
def start(self):
"""启动缓存刷新线程"""
if self.running:
return
self.running = True
self.thread = threading.Thread(
target=self.refresh_cache,
@ -115,12 +117,12 @@ class CacheManager:
)
self.thread.start()
self.logger.info("Cache manager started")
def stop(self):
"""停止缓存刷新线程"""
if not self.running:
return
self.running = False
if self.thread:
# 等待线程结束,但设置超时防止卡死
@ -129,17 +131,17 @@ class CacheManager:
self.logger.warning("Cache refresh thread did not terminate gracefully")
self.thread = None
self.logger.info("Cache manager stopped")
def get_plc_connection_status(self, plc_name):
"""获取PLC连接状态"""
with self.lock:
return self.plc_connection_status.get(plc_name, "unknown")
def get_last_update_time(self, plc_name, area_name):
"""获取区域数据最后更新时间"""
with self.lock:
return self.last_update.get(plc_name, {}).get(area_name, 0)
def get_summary(self):
"""获取缓存摘要信息"""
summary = {}
@ -149,49 +151,51 @@ class CacheManager:
for area_name, area in areas.items():
last_update = self.last_update[plc_name][area_name]
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and self.last_update[plc_name][area_name] == 0:
area_status = "disconnected"
summary[plc_name][area_name] = {
"status": area_status,
"plc_connection_status": plc_status,
"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never",
"last_update": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
return summary
def get_area_status(self, plc_name, area_name):
"""获取区域状态"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return {"status": "not_found", "message": "PLC or area not found"}
plc_status = self.plc_connection_status.get(plc_name, "unknown")
last_update = self.last_update.get(plc_name, {}).get(area_name, 0)
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and last_update == 0:
area_status = "disconnected"
return {
"status": area_status,
"plc_connection_status": plc_status,
"last_update": last_update,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never",
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
def read_area(self, plc_name, area_name, offset, length):
"""单个区域读取"""
with self.lock:
@ -201,14 +205,14 @@ class CacheManager:
if offset + length > area["size"]:
return None, "Offset out of bounds", "unknown", 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return None, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
data = client.read_db(area["db_number"], area["offset"] + offset, length)
# 验证数据有效性
@ -219,7 +223,7 @@ class CacheManager:
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected"
return data, None, plc_status, update_time
else:
area["status"] = plc_status
@ -267,29 +271,29 @@ class CacheManager:
area["status"] = plc_status
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0
def write_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
print(data)
success = client.write_db(area["db_number"], area["offset"] + offset, data)
@ -300,7 +304,7 @@ class CacheManager:
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
return True, None, plc_status, update_time
else:
area["status"] = plc_status
@ -450,7 +454,7 @@ class CacheManager:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_read(self, requests):
"""批量读取"""
results = []
@ -502,7 +506,8 @@ class CacheManager:
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never",
"message": error
})
else:
@ -552,7 +557,8 @@ class CacheManager:
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
@ -603,7 +609,8 @@ class CacheManager:
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
@ -619,33 +626,34 @@ class CacheManager:
"length": len(data)
})
return results
def get_parsed_data(self, plc_name, area_name):
"""获取解析后的数据"""
from data_parser import parse_data
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return {"error": "Area not found"}
plc_status = self.plc_connection_status.get(plc_name, "unknown")
last_update = self.last_update.get(plc_name, {}).get(area_name, 0)
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and last_update == 0:
area_status = "disconnected"
structure = area.get("structure", [])
if structure:
parsed = parse_data(area["data"], structure)
parsed["status"] = area_status
parsed["plc_connection_status"] = plc_status
parsed["last_update"] = last_update
parsed["last_update_formatted"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never"
parsed["last_update_formatted"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(
last_update)) if last_update > 0 else "Never"
return parsed
else:
return {
@ -653,5 +661,6 @@ class CacheManager:
"status": area_status,
"plc_connection_status": plc_status,
"last_update": last_update,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never"
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(last_update)) if last_update > 0 else "Never"
}

View File

@ -3,7 +3,8 @@ import logging
from threading import Lock
import time
from snap7.util import *
import ast
import struct
import json
class Snap7Client:
"""Snap7客户端处理与PLC的通信"""
@ -140,23 +141,24 @@ class Snap7Client:
def write_db(self, db_number, offset, data):
"""
向DB块写入数据
向DB块写入原始字节数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
data: 字节序列 (bytes, bytearray 或 list of int)
Returns:
bool: 是否写入成功
"""
values = int(data)
print("values:", values)
value = bytearray(0)
if isinstance(values, int):
set_int(value, offset, values)
data = value
print(data)
# 确保data是字节序列
if isinstance(data, list):
data = bytes(data)
elif isinstance(data, int):
data = bytes([data])
elif not isinstance(data, (bytes, bytearray)):
self.logger.error("Data must be bytes, bytearray or list of integers")
return False
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
@ -172,90 +174,204 @@ class Snap7Client:
self.connected = False
return False
def batch_write_db(self, db_number, offset, data):
def write_bit(self, db_number, byte_offset, bit_offset, value):
"""
DB块写入数据
写入DB块中的单个位
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
byte_offset: 字节偏移量
bit_offset: 位偏移量 (0-7)
value: 布尔值 (True/False)
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
# 验证位偏移量
if bit_offset < 0 or bit_offset > 7:
raise ValueError("Bit offset must be between 0 and 7")
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
# 读取包含目标位的字节
byte_data = self.read_db(db_number, byte_offset, 1)
if not byte_data or len(byte_data) != 1:
self.logger.error(f"Failed to read byte at offset {byte_offset} for bit write")
def write_db_bool(self, db_number, offset, data):
# def batch_write_db(self, db_number, offset, data):
# """
# 向DB块写入数据
#
# Args:
# db_number: DB编号
# offset: 起始偏移量
# data: 要写入的数据
#
# Returns:
# bool: 是否写入成功
# """
# if not self.connected and not self.connect():
# self.logger.warning(f"Write failed: not connected to {self.ip}")
# return False
#
# try:
# with self.lock:
# self.client.db_write(db_number, offset, data)
# self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
# return True
# except Exception as e:
# self.logger.error(f"Write DB{db_number} error: {e}")
# self.connected = False
# return False
# def write_db_bool(self, db_number, offset, data):
# """
# 向DB块写入数据
#
# Args:
# db_number: DB编号
# offset: 起始偏移量
# data: 要写入的bool类型数据
#
# Returns:
# bool: 是否写入成功
# """
# data = data.decode('utf-8')
# # 将字符串安全转换为字典
# data_dict = ast.literal_eval(data) # 输出: {0: True}
# # value = bytearray(1)
# value = bytearray(offset+1)
# for bit, val in data_dict.items():
# set_bool(value, offset, bit, val)
# data = value
# if not self.connected and not self.connect():
# self.logger.warning(f"Write failed: not connected to {self.ip}")
# return False
#
# try:
# with self.lock:
# print(db_number, offset, data)
# self.client.db_write(db_number, offset, data)
# self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
# return True
# except Exception as e:
# self.logger.error(f"Write DB{db_number} error: {e}")
# self.connected = False
# return False
def write_data(self, db_number, offset, data_type, value):
"""
向DB块写入数据
以指定数据类型向DB块写入单个数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
data_type: 数据类型 ('bool', 'byte', 'int', 'dint', 'real', 'word', 'dword')
value: 要写入的值
Returns:
bool: 是否写入成功
"""
data = data.decode('utf-8')
# 将字符串安全转换为字典
data_dict = ast.literal_eval(data) # 输出: {0: True}
# value = bytearray(1)
value = bytearray(offset+1)
for bit, val in data_dict.items():
set_bool(value, offset, bit, val)
data = value
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
# 将高级数据类型转换为字节序列
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
if data_type == "bool":
# bool类型需要特殊处理位写入
byte_offset = offset // 8
bit_offset = offset % 8
return self.write_bit(db_number, byte_offset, bit_offset, value)
elif data_type == "byte":
# 单字节
return self.write_db(db_number, offset, [int(value) & 0xFF])
elif data_type == "int":
# 有符号16位整数 (2字节)
packed = struct.pack(">h", int(value))
return self.write_db(db_number, offset, packed)
elif data_type == "dint":
# 有符号32位整数 (4字节)
packed = struct.pack(">l", int(value))
return self.write_db(db_number, offset, packed)
elif data_type == "real":
# 32位浮点数 (4字节)
packed = struct.pack(">f", float(value))
return self.write_db(db_number, offset, packed)
elif data_type == "word":
# 无符号16位整数 (2字节)
packed = struct.pack(">H", int(value) & 0xFFFF)
return self.write_db(db_number, offset, packed)
elif data_type == "dword":
# 无符号32位整数 (4字节)
packed = struct.pack(">I", int(value) & 0xFFFFFFFF)
return self.write_db(db_number, offset, packed)
else:
self.logger.error(f"Unsupported data type: {data_type}")
return False
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
self.logger.error(f"Error converting {data_type} value {value}: {e}")
return False
def batch_write_db_bool(self, db_number, offset, data):
def write_array(self, db_number, offset, data_type, values):
"""
向DB块写入数据
批量写入相同类型的数据数组
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
data_type: 数据类型 ('bool', 'byte', 'int', 'dint', 'real', 'word', 'dword')
values: 要写入的值列表
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
# 检查是否是支持的数据类型
if data_type not in ["bool", "byte", "int", "dint", "real", "word", "dword"]:
self.logger.error(f"Unsupported data type for array write: {data_type}")
return False
# 对于bool类型需要特殊处理
if data_type == "bool":
# 逐个写入bool值
for i, value in enumerate(values):
if not self.write_data(db_number, offset + i, "bool", value):
return False
return True
# 计算总字节数
if data_type in ["int", "word"]:
item_size = 2
elif data_type in ["dint", "real", "dword"]:
item_size = 4
else: # byte
item_size = 1
total_size = item_size * len(values)
# 打包所有数据
packed_data = b''
for value in values:
if data_type == "byte":
packed_data += struct.pack("B", int(value) & 0xFF)
elif data_type == "int":
packed_data += struct.pack(">h", int(value))
elif data_type == "dint":
packed_data += struct.pack(">l", int(value))
elif data_type == "real":
packed_data += struct.pack(">f", float(value))
elif data_type == "word":
packed_data += struct.pack(">H", int(value) & 0xFFFF)
elif data_type == "dword":
packed_data += struct.pack(">I", int(value) & 0xFFFFFFFF)
# 一次性写入所有数据
return self.write_db(db_number, offset, packed_data)
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
self.logger.error(f"Error writing {data_type} array: {e}")
return False