读正常,多字节写入异常

This commit is contained in:
2025-08-13 23:55:46 +08:00
parent e17bed9a6c
commit d3a38291a2
12 changed files with 700 additions and 159 deletions

View File

@ -2,7 +2,7 @@
"plcs": [ "plcs": [
{ {
"name": "PLC1", "name": "PLC1",
"ip": "192.168.0.100", "ip": "192.168.0.1",
"rack": 0, "rack": 0,
"slot": 1, "slot": 1,
"areas": [ "areas": [
@ -11,7 +11,7 @@
"type": "read", "type": "read",
"db_number": 100, "db_number": 100,
"offset": 0, "offset": 0,
"size": 4000, "size": 5000,
"structure": [ "structure": [
{ {
"name": "temperature", "name": "temperature",
@ -35,7 +35,7 @@
"name": "DB100_Write", "name": "DB100_Write",
"type": "write", "type": "write",
"db_number": 100, "db_number": 100,
"offset": 4000, "offset": 0,
"size": 5000 "size": 5000
}, },
{ {
@ -43,22 +43,7 @@
"type": "read_write", "type": "read_write",
"db_number": 202, "db_number": 202,
"offset": 0, "offset": 0,
"size": 2000 "size": 816
}
]
},
{
"name": "PLC2",
"ip": "192.168.0.101",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000
} }
] ]
} }

View File

@ -2,7 +2,7 @@
"plcs": [ "plcs": [
{ {
"name": "PLC1", "name": "PLC1",
"ip": "192.168.0.100", "ip": "192.168.0.1",
"rack": 0, "rack": 0,
"slot": 1, "slot": 1,
"areas": [ "areas": [
@ -11,7 +11,7 @@
"type": "read", "type": "read",
"db_number": 100, "db_number": 100,
"offset": 0, "offset": 0,
"size": 4000, "size": 5000,
"structure": [ "structure": [
{ {
"name": "temperature", "name": "temperature",
@ -43,22 +43,7 @@
"type": "read_write", "type": "read_write",
"db_number": 202, "db_number": 202,
"offset": 0, "offset": 0,
"size": 2000 "size": 816
}
]
},
{
"name": "PLC2",
"ip": "192.168.0.101",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000
} }
] ]
} }

View File

@ -176,6 +176,17 @@ class APIServer:
Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data
</div> </div>
<div class="api-endpoint">
<strong>Single Read_Bool:</strong> GET /api/read_bool/<plc_name>/<area_name>/<offset>/<length><br>
Example: /api/read_bool/PLC1/DB100_Read/0/2
</div>
<div class="api-endpoint">
<strong>Single Write_Bool:</strong> POST /api/write_bool/<plc_name>/<area_name>/<offset><br>
Body: Raw binary data<br>
Example: POST /api/write_bool/PLC1/DB100_Write/0
</div>
<div class="api-endpoint"> <div class="api-endpoint">
<strong>Batch Read:</strong> POST /api/batch_read<br> <strong>Batch Read:</strong> POST /api/batch_read<br>
Body: JSON array of read requests<br> Body: JSON array of read requests<br>
@ -765,6 +776,100 @@ class APIServer:
</div> </div>
</div> </div>
<div class="endpoint">
<h3>Single Read Bool</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/read_bool/<plc_name>/<area_name>/<offset>/<length></code>
</div>
<p>从指定区域读取数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Read</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
<tr>
<td>length</td>
<td>读取长度(字节)</td>
</tr>
</table>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 2,
"data": [0:False, 1:False],
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
</div>
</div>
<div class="endpoint">
<h3>Single Write Bool</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/write_bool/<plc_name>/<area_name>/<offset></code>
</div>
<p>向指定区域写入数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Write</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
</table>
<h4>请求体</h4>
<p>{0:True}</p>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"length": 1,
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30"
}
</div>
</div>
<div class="endpoint"> <div class="endpoint">
<h3>Batch Read</h3> <h3>Batch Read</h3>
<div> <div>
@ -1137,6 +1242,33 @@ class APIServer:
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
}) })
# 单个读取BOOL类型接口
@self.app.route("/api/read_bool/<plc_name>/<area_name>/<int:offset>/<int:length>", methods=["GET"], endpoint="single_read_bool")
def single_read_bool(plc_name, area_name, offset, length):
"""从指定区域读取数据"""
data, error, plc_status, update_time = self.cache_manager.read_area_bool(plc_name, area_name, offset, length)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": length,
"data": [data], # list(data)
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个写入接口 # 单个写入接口
@self.app.route("/api/write/<plc_name>/<area_name>/<int:offset>", methods=["POST"], endpoint="single_write") @self.app.route("/api/write/<plc_name>/<area_name>/<int:offset>", methods=["POST"], endpoint="single_write")
def single_write(plc_name, area_name, offset): def single_write(plc_name, area_name, offset):
@ -1174,6 +1306,44 @@ class APIServer:
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
}) })
# 单个写入BOOL类型接口
@self.app.route("/api/write_bool/<plc_name>/<area_name>/<int:offset>", methods=["POST"], endpoint="single_write_bool")
def single_write_bool(plc_name, area_name, offset):
"""向指定区域写入数据"""
data = request.data
if not data:
# 如果没有提供数据,返回错误
return jsonify({
"status": "error",
"message": "No data provided",
"plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
success, error, plc_status, update_time = self.cache_manager.write_area_bool(plc_name, area_name, offset, data)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": 1,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 批量读取接口 # 批量读取接口
@self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read") @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
def batch_read(): def batch_read():
@ -1189,7 +1359,7 @@ class APIServer:
"last_update_formatted": "N/A" "last_update_formatted": "N/A"
}), 400 }), 400
requests = request.json requests = request.get_json()
if not isinstance(requests, list): if not isinstance(requests, list):
return jsonify({ return jsonify({
"status": "error", "status": "error",
@ -1198,11 +1368,11 @@ class APIServer:
"last_update": 0, "last_update": 0,
"last_update_formatted": "N/A" "last_update_formatted": "N/A"
}), 400 }), 400
# 添加详细日志 # 添加详细日志
self.logger.info(f"Received batch read request: {json.dumps(requests)}") self.logger.info(f"Received batch read request: {json.dumps(requests)}")
results = self.cache_manager.batch_read(requests) results = self.cache_manager.batch_read(requests)
return jsonify(results) return jsonify(results)
except Exception as e: except Exception as e:
self.logger.error(f"Batch read error: {str(e)}", exc_info=True) self.logger.error(f"Batch read error: {str(e)}", exc_info=True)
@ -1252,6 +1422,43 @@ class APIServer:
"last_update_formatted": "N/A" "last_update_formatted": "N/A"
}), 500 }), 500
@self.app.route("/api/batch_write_bool", methods=["POST"], endpoint="batch_write_bool")
def batch_write_bool():
"""批量写入多个区域的数据"""
try:
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.json
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
self.logger.info(f"Received batch write request: {json.dumps(requests)}")
results = self.cache_manager.batch_write_bool(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch write error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 区域状态检查 # 区域状态检查
@self.app.route("/api/status/<plc_name>/<area_name>", methods=["GET"], endpoint="area_status") @self.app.route("/api/status/<plc_name>/<area_name>", methods=["GET"], endpoint="area_status")
def area_status(plc_name, area_name): def area_status(plc_name, area_name):

View File

@ -1,6 +1,7 @@
import threading import threading
import time import time
import logging import logging
from snap7.util import *
class CacheManager: class CacheManager:
"""PLC数据缓存管理器""" """PLC数据缓存管理器"""
@ -228,6 +229,45 @@ class CacheManager:
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0 return None, f"Read failed: {str(e)}", plc_status, 0
def read_area_bool(self, plc_name, area_name, offset, length):
"""单个区域读取"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return None, "Area not found", "unknown", 0
if offset + length > area["size"]:
return None, "Offset out of bounds", "unknown", 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return None, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
data = client.read_db_bool(area["db_number"], area["offset"] + offset, length)
# 验证数据有效性
if all(isinstance(val, bool) for val in data.values()):
# 按字典键顺序更新多个值
for i, val in data.items():
area["data"][offset + i] = val # 确保offset+i不越界
# area["data"][offset] = data.values
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected"
return data, None, plc_status, update_time
else:
area["status"] = plc_status
return None, "Invalid data returned", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0
def write_area(self, plc_name, area_name, offset, data): def write_area(self, plc_name, area_name, offset, data):
"""单个区域写入""" """单个区域写入"""
with self.lock: with self.lock:
@ -251,6 +291,7 @@ class CacheManager:
return False, f"PLC not connected (status: {plc_status})", plc_status, 0 return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try: try:
print(data)
success = client.write_db(area["db_number"], area["offset"] + offset, data) success = client.write_db(area["db_number"], area["offset"] + offset, data)
if success: if success:
# 更新缓存中的这部分数据 # 更新缓存中的这部分数据
@ -269,124 +310,314 @@ class CacheManager:
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0 return False, f"Write failed: {str(e)}", plc_status, 0
def batch_write_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
print("data:", data)
for i, byte in enumerate(data):
print("i,byte:", i, byte)
byte_data = bytes([byte])
current_offset = offset + (i * 2)
byte_value = byte_data[0]
value = bytearray(2)
if isinstance(byte_value, int):
set_int(value, 0, byte_value)
data = value
print(area["db_number"], current_offset, data)
success = client.batch_write_db(area["db_number"], current_offset, data)
if success:
# 更新缓存中的这部分数据
for j in range(len(data)):
area["data"][offset + j] = data[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
return True, None, plc_status, update_time
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_write_bool_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
value = bytearray(offset + 1)
for bit, bit_value in enumerate(data):
print("i,byte:", bit, bit_value)
set_bool(value, offset, bit, bit_value)
data = value
print(area["db_number"], offset, data)
success = client.batch_write_db_bool(area["db_number"], offset, data)
if success:
# 更新缓存中的这部分数据
for j in range(len(data)):
area["data"][offset + j] = data[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
return True, None, plc_status, update_time
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def write_area_bool(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
success = client.write_db_bool(area["db_number"], area["offset"] + offset, data)
if success:
# 更新缓存中的这部分数据
for i in range(len(data)):
area["data"][offset + i] = data[i]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
return True, None, plc_status, update_time
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_read(self, requests): def batch_read(self, requests):
"""批量读取""" """批量读取"""
results = [] results = []
with self.lock:
for req in requests:
plc_name = req["plc_name"]
area_name = req["area_name"]
offset = req.get("offset", 0)
length = req.get("length", None)
# 获取PLC连接状态 for req in requests:
plc_status = self.plc_connection_status.get(plc_name, "unknown") plc_name = req["plc_name"]
area_name = req["area_name"]
offset = req.get("offset", 0)
length = req.get("length", None)
# 如果PLC连接,直接返回错误 # 获取PLC连接状态
if plc_status != "connected": plc_status = self.plc_connection_status.get(plc_name, "unknown")
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": f"PLC not connected (status: {plc_status})"
})
continue
area = self.cache.get(plc_name, {}).get(area_name) # 如果PLC未连接直接返回错误
if not area: if plc_status != "connected":
results.append({ results.append({
"plc_name": plc_name, "plc_name": plc_name,
"area_name": area_name, "area_name": area_name,
"status": "error", "status": "error",
"plc_connection_status": plc_status, "plc_connection_status": plc_status,
"last_update": 0, "last_update": 0,
"last_update_formatted": "N/A", "last_update_formatted": "N/A",
"message": "Area not found" "message": f"PLC not connected (status: {plc_status})"
}) })
continue continue
# 如果未指定length读取整个区域 area = self.cache.get(plc_name, {}).get(area_name)
if length is None: if not area:
length = area["size"] - offset results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": "Area not found"
})
continue
data, error, _, update_time = self.read_area(plc_name, area_name, offset, length) # 如果未指定length读取整个区域
if error: if length is None:
results.append({ length = area["size"] - offset
"plc_name": plc_name,
"area_name": area_name, data, error, _, update_time = self.read_area(plc_name, area_name, offset, length)
"status": "error", if error:
"plc_connection_status": plc_status, results.append({
"last_update": update_time, "plc_name": plc_name,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", "area_name": area_name,
"message": error "status": "error",
}) "plc_connection_status": plc_status,
else: "last_update": update_time,
results.append({ "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"plc_name": plc_name, "message": error
"area_name": area_name, })
"status": "success", else:
"plc_connection_status": plc_status, results.append({
"last_update": update_time, "plc_name": plc_name,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), "area_name": area_name,
"offset": offset, "status": "success",
"length": length, "plc_connection_status": plc_status,
"data": list(data) "last_update": update_time,
}) "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": length,
"data": list(data)
})
return results return results
def batch_write(self, requests): def batch_write(self, requests):
"""批量写入""" """批量写入"""
results = [] results = []
with self.lock: for req in requests:
for req in requests: plc_name = req["plc_name"]
plc_name = req["plc_name"] area_name = req["area_name"]
area_name = req["area_name"] offset = req["offset"]
offset = req["offset"] data = bytes(req["data"])
data = bytes(req["data"])
# 获取PLC连接状态 # 获取PLC连接状态
plc_status = self.plc_connection_status.get(plc_name, "unknown") plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误 # 如果PLC未连接直接返回错误
if plc_status != "connected": if plc_status != "connected":
results.append({ results.append({
"plc_name": plc_name, "plc_name": plc_name,
"area_name": area_name, "area_name": area_name,
"status": "error", "status": "error",
"plc_connection_status": plc_status, "plc_connection_status": plc_status,
"last_update": 0, "last_update": 0,
"last_update_formatted": "N/A", "last_update_formatted": "N/A",
"message": f"PLC not connected (status: {plc_status})", "message": f"PLC not connected (status: {plc_status})",
"offset": offset "offset": offset
}) })
continue continue
success, error, _, update_time = self.batch_write_area(plc_name, area_name, offset, data)
if error:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
else:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "success",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": len(data)
})
return results
success, error, _, update_time = self.write_area(plc_name, area_name, offset, data) def batch_write_bool(self, requests):
if error: """批量写入"""
results.append({ results = []
"plc_name": plc_name, for req in requests:
"area_name": area_name, plc_name = req["plc_name"]
"status": "error", area_name = req["area_name"]
"plc_connection_status": plc_status, offset = req["offset"]
"last_update": update_time, data = bytes(req["data"])
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error, # 获取PLC连接状态
"offset": offset plc_status = self.plc_connection_status.get(plc_name, "unknown")
})
else: # 如果PLC未连接直接返回错误
results.append({ if plc_status != "connected":
"plc_name": plc_name, results.append({
"area_name": area_name, "plc_name": plc_name,
"status": "success", "area_name": area_name,
"plc_connection_status": plc_status, "status": "error",
"last_update": update_time, "plc_connection_status": plc_status,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), "last_update": 0,
"offset": offset, "last_update_formatted": "N/A",
"length": len(data) "message": f"PLC not connected (status: {plc_status})",
}) "offset": offset
})
continue
success, error, _, update_time = self.batch_write_bool_area(plc_name, area_name, offset, data)
if error:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
else:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "success",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": len(data)
})
return results return results
def get_parsed_data(self, plc_name, area_name): def get_parsed_data(self, plc_name, area_name):

View File

@ -2,6 +2,8 @@ import snap7
import logging import logging
from threading import Lock from threading import Lock
import time import time
from snap7.util import *
import ast
class Snap7Client: class Snap7Client:
"""Snap7客户端处理与PLC的通信""" """Snap7客户端处理与PLC的通信"""
@ -34,8 +36,8 @@ class Snap7Client:
try: try:
# 尝试读取PLC的运行状态 # 尝试读取PLC的运行状态
cpu_state = self.client.get_cpu_state() cpu_state = self.client.get_cpu_state()
return cpu_state in [snap7.snap7types.cpu_statuses['S7CpuStatusRun'], print("当前 CPU 状态:", cpu_state)
snap7.snap7types.cpu_statuses['S7CpuStatusStop']] return cpu_state in ['S7CpuStatusRun', 'S7CpuStatusStop']
except: except:
return False return False
@ -51,6 +53,7 @@ class Snap7Client:
self.last_connect_attempt = current_time self.last_connect_attempt = current_time
try: try:
self.client.connect(self.ip, self.rack, self.slot) self.client.connect(self.ip, self.rack, self.slot)
# 验证连接是否真正有效 # 验证连接是否真正有效
if self.client.get_connected() and self.is_valid_connection(): if self.client.get_connected() and self.is_valid_connection():
self.connected = True self.connected = True
@ -88,7 +91,7 @@ class Snap7Client:
return None # 返回None而不是零填充数据 return None # 返回None而不是零填充数据
try: try:
with self.lock: with self.lock: # 进入锁,其他线程需等待
data = self.client.db_read(db_number, offset, size) data = self.client.db_read(db_number, offset, size)
# 验证返回数据的有效性 # 验证返回数据的有效性
if data is None or len(data) != size: if data is None or len(data) != size:
@ -101,6 +104,40 @@ class Snap7Client:
self.connected = False self.connected = False
return None return None
def read_db_bool(self, db_number, offset, bit_length):
"""
从 DB 块中读取一个字节,并提取其中的多个 BOOL 位
Args:
db_number (int): DB块编号
offset (int): 要读取的字节偏移地址
bit_length: 第几位如1表示第1位
Returns:
result:返回位值
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None # 返回None而不是零填充数据
try:
with self.lock:
data = self.client.db_read(db_number, offset, 1)
result = {}
for bit in range(bit_length):
result[bit] = bool(data[0] & (1 << bit))
if result is None or len(result) != bit_length:
self.connected = False
self.logger.error(f"Read DB{db_number} returned invalid data size (expected {bit_length}, got {len(result) if data else 0})")
return None
return result
except Exception as e:
self.logger.error(f"Read DB{db_number} error: {e}")
self.connected = False
return None
def write_db(self, db_number, offset, data): def write_db(self, db_number, offset, data):
""" """
向DB块写入数据 向DB块写入数据
@ -108,7 +145,41 @@ class Snap7Client:
Args: Args:
db_number: DB编号 db_number: DB编号
offset: 起始偏移量 offset: 起始偏移量
要写入的数据 data: 要写入的数据
Returns:
bool: 是否写入成功
"""
values = int(data)
print("values:", values)
value = bytearray(0)
if isinstance(values, int):
set_int(value, offset, values)
data = value
print(data)
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
Returns: Returns:
bool: 是否写入成功 bool: 是否写入成功
@ -126,3 +197,65 @@ class Snap7Client:
self.logger.error(f"Write DB{db_number} error: {e}") self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False self.connected = False
return False return False
def write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
data = data.decode('utf-8')
# 将字符串安全转换为字典
data_dict = ast.literal_eval(data) # 输出: {0: True}
# value = bytearray(1)
value = bytearray(offset+1)
for bit, val in data_dict.items():
set_bool(value, offset, bit, val)
data = value
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False