V3.0 增加通用读写接口,待修复稳定性问题

This commit is contained in:
2025-08-24 23:45:24 +08:00
commit e43c5176bf
36 changed files with 4915 additions and 0 deletions

3
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# 默认忽略的文件
/shelf/
/workspace.xml

12
.idea/gateway_V3_20250818.iml generated Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="gateway" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyRedundantParenthesesInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true" />
</profile>
</component>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="gateway" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/gateway_V3_20250818.iml" filepath="$PROJECT_DIR$/.idea/gateway_V3_20250818.iml" />
</modules>
</component>
</project>

52
config/config.json Normal file
View File

@ -0,0 +1,52 @@
{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.1",
"rack": 0,
"slot": 1,
"refresh_interval": 0.5,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 6000,
"structure": [
{
"name": "temperature",
"type": "real",
"offset": 0
},
{
"name": "pressure",
"type": "int",
"offset": 4
},
{
"name": "status",
"type": "bool",
"offset": 6,
"bit": 0
}
]
},
{
"name": "DB100_Write",
"type": "write",
"db_number": 100,
"offset": 0,
"size": 6000
},
{
"name": "DB202_Params",
"type": "read_write",
"db_number": 202,
"offset": 0,
"size": 816
}
]
}
]
}

52
config/config.json.bak Normal file
View File

@ -0,0 +1,52 @@
{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.1",
"rack": 0,
"slot": 1,
"refresh_interval": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 5000,
"structure": [
{
"name": "temperature",
"type": "real",
"offset": 0
},
{
"name": "pressure",
"type": "int",
"offset": 4
},
{
"name": "status",
"type": "bool",
"offset": 6,
"bit": 0
}
]
},
{
"name": "DB100_Write",
"type": "write",
"db_number": 100,
"offset": 0,
"size": 5000
},
{
"name": "DB202_Params",
"type": "read_write",
"db_number": 202,
"offset": 0,
"size": 816
}
]
}
]
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

1602
gateway/api_server.py Normal file

File diff suppressed because it is too large Load Diff

446
gateway/api_server_html.py Normal file
View File

@ -0,0 +1,446 @@
from flask import Flask, jsonify, request, render_template_string, Response, render_template
import threading
import time
import json
from functools import wraps
from config_manager import ConfigManager
import logging
class APIServer:
"""REST API服务器提供PLC数据访问和配置管理功能"""
def __init__(self, cache_manager, config_path="config/config.json"):
"""
初始化API服务器
Args:
cache_manager: 缓存管理器实例
config_path: 配置文件路径
"""
self.cache_manager = cache_manager
self.config_manager = ConfigManager(config_path)
self.app = Flask(__name__)
self.logger = logging.getLogger("APIServer")
self.auth_enabled = True # 可通过配置关闭认证
self.username = "admin"
self.password = "admin123" # 实际应用中应从安全存储获取
self.start_time = time.strftime("%Y-%m-%d %H:%M:%S")
# 在初始化方法中调用 setup_routes
self.setup_routes()
def check_auth(self, username, password):
"""验证用户名和密码"""
return username == self.username and password == self.password
def authenticate(self):
"""发送401响应要求认证"""
return Response(
"Unauthorized",
401,
{"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'}
)
def requires_auth(self, f):
"""装饰器:需要认证的路由,保留函数元数据"""
@wraps(f)
def decorated(*args, **kwargs):
if not self.auth_enabled:
return f(*args, **kwargs)
auth = request.authorization
if not auth or not self.check_auth(auth.username, auth.password):
return self.authenticate()
return f(*args, **kwargs)
return decorated
def get_summary(self):
"""获取缓存摘要信息"""
summary = {}
for plc_name, areas in self.cache_manager.cache.items():
summary[plc_name] = {}
for area_name, area in areas.items():
last_update = self.cache_manager.last_update[plc_name][area_name]
plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
summary[plc_name][area_name] = {
"status": area["status"],
"plc_connection_status": plc_status,
"last_update": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
return summary
def setup_routes(self):
"""设置所有API路由"""
# ===========================
# 主页面 - 状态摘要
# ===========================
@self.app.route("/", endpoint="index")
def index():
summary = self.get_summary()
return render_template(
"status.html", # 模板文件名
start_time=self.start_time,
summary=summary,
plc_statuses=self.cache_manager.plc_connection_status
)
# ===========================
# 系统状态API
# ===========================
@self.app.route("/api/status", endpoint="system_status")
def system_status():
"""获取系统状态信息"""
plc_statuses = {}
for plc_name in self.cache_manager.plc_connection_status:
plc_statuses[plc_name] = {
"status": self.cache_manager.plc_connection_status[plc_name],
"last_connected": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(self.cache_manager.plc_last_connected[plc_name]))
if self.cache_manager.plc_last_connected[plc_name] > 0 else "Never"
}
return jsonify({
"status": "running",
"start_time": self.start_time,
"plc_count": len(self.config_manager.get_config().get("plcs", [])),
"cache_size": sum(
len(area["data"]) for plc in self.cache_manager.cache.values() for area in plc.values()),
"plc_statuses": plc_statuses
})
# ===========================
# 配置管理相关路由
# ===========================
@self.app.route("/config", endpoint="config_page")
@self.requires_auth
def config_page():
"""配置编辑页面"""
config = self.config_manager.get_config()
config_json = json.dumps(config, indent=2)
return render_template(
'config.html',
config_json=config_json,
username=self.username,
password=self.password
)
# 配置验证API
@self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config")
@self.requires_auth
def validate_config():
"""验证配置是否有效"""
try:
config = request.json
is_valid, error = self.config_manager.validate_config(config)
if is_valid:
return jsonify({"valid": True})
else:
return jsonify({"valid": False, "message": error}), 400
except Exception as e:
return jsonify({"valid": False, "message": str(e)}), 400
# 配置获取API
@self.app.route("/api/config", methods=["GET"], endpoint="get_config")
@self.requires_auth
def get_config():
"""获取当前配置"""
return jsonify(self.config_manager.get_config())
# 配置保存API
@self.app.route("/api/config", methods=["POST"], endpoint="save_config")
@self.requires_auth
def save_config():
"""保存配置"""
try:
config = request.json
reload = request.args.get('reload', 'false').lower() == 'true'
success, message = self.config_manager.save_config(config)
if success:
if reload:
# 通知主应用程序重载配置
if hasattr(self.cache_manager, 'app') and self.cache_manager.app:
self.cache_manager.app.request_reload()
return jsonify({
"success": True,
"message": "Configuration saved and reload requested"
})
else:
return jsonify({
"success": True,
"message": "Configuration saved successfully (restart to apply changes)"
})
else:
return jsonify({
"success": False,
"message": message
}), 400
except Exception as e:
return jsonify({
"success": False,
"message": f"Error saving config: {str(e)}"
}), 500
# ===========================
# 新增 API 文档接口
# ===========================
@self.app.route("/api/doc", endpoint="api_doc")
def api_doc():
"""API文档页面"""
return render_template('api_doc.html')
# ===========================
# 数据访问API
# ===========================
# 单个读取接口
@self.app.route("/api/read/<plc_name>/<area_name>/<int:offset>/<int:length>", methods=["GET"],
endpoint="single_read")
def single_read(plc_name, area_name, offset, length):
"""从指定区域读取数据"""
data, error, plc_status, update_time = self.cache_manager.read_area(plc_name, area_name, offset, length)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": length,
"data": list(data),
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个读取BOOL类型接口
@self.app.route("/api/read_bool/<plc_name>/<area_name>/<int:offset>/<int:length>", methods=["GET"],
endpoint="single_read_bool")
def single_read_bool(plc_name, area_name, offset, length):
"""从指定区域读取数据"""
data, error, plc_status, update_time = self.cache_manager.read_area_bool(plc_name, area_name, offset,
length)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": length,
"data": [data], # list(data)
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个写入接口
@self.app.route("/api/write/<plc_name>/<area_name>/<int:offset>", methods=["POST"], endpoint="single_write")
def single_write(plc_name, area_name, offset):
"""向指定区域写入数据"""
data = request.data
if not data:
# 如果没有提供数据,返回错误
return jsonify({
"status": "error",
"message": "No data provided",
"plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
success, error, plc_status, update_time = self.cache_manager.write_area(plc_name, area_name, offset, data)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": len(data),
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个写入BOOL类型接口
@self.app.route("/api/write_bool/<plc_name>/<area_name>/<int:offset>", methods=["POST"],
endpoint="single_write_bool")
def single_write_bool(plc_name, area_name, offset):
"""向指定区域写入数据"""
data = request.data
if not data:
# 如果没有提供数据,返回错误
return jsonify({
"status": "error",
"message": "No data provided",
"plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
success, error, plc_status, update_time = self.cache_manager.write_area_bool(plc_name, area_name, offset,
data)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": 1,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 批量读取接口
@self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
def batch_read():
"""批量读取多个区域的数据"""
try:
# 确保是JSON请求
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.json
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
# 添加详细日志
self.logger.info(f"Received batch read request: {json.dumps(requests)}")
results = self.cache_manager.batch_read(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch read error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 批量写入接口
@self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write")
def batch_write():
"""批量写入多个区域的数据"""
try:
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.json
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
self.logger.info(f"Received batch write request: {json.dumps(requests)}")
results = self.cache_manager.batch_write(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch write error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 区域状态检查
@self.app.route("/api/status/<plc_name>/<area_name>", methods=["GET"], endpoint="area_status")
def area_status(plc_name, area_name):
"""获取区域状态"""
status = self.cache_manager.get_area_status(plc_name, area_name)
return jsonify(status)
# 获取解析后的数据
@self.app.route("/api/data/<plc_name>/<area_name>", methods=["GET"], endpoint="get_parsed_data")
def get_parsed_data(plc_name, area_name):
"""获取解析后的数据"""
return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name))
def start(self):
"""启动API服务器"""
self.server_thread = threading.Thread(
target=self.app.run,
kwargs={
"host": "0.0.0.0",
"port": 5000,
"threaded": True,
"use_reloader": False # 避免在生产环境中使用重载器
},
daemon=True,
name="APIServerThread"
)
self.server_thread.start()
self.logger.info("API server started at http://0.0.0.0:5000")

877
gateway/cache_manager.py Normal file
View File

@ -0,0 +1,877 @@
import threading
import time
import logging
from snap7.util import *
import struct
class CacheManager:
"""PLC数据缓存管理器"""
def __init__(self, config, plc_manager, app=None):
"""
初始化缓存管理器
Args:
config: 配置对象
plc_manager: PLC管理器实例
app: 主应用程序引用(用于配置重载)
"""
self.plc_manager = plc_manager
self.config = config
self.app = app
self.cache = {}
self.refresh_interval = 1 # 1秒刷新一次
self.running = False
self.lock = threading.Lock()
self.thread = None
self.last_update = {} # 区域级最后更新时间
self.plc_last_connected = {} # PLC级最后连接时间
self.plc_connection_status = {} # PLC连接状态
self.logger = logging.getLogger("CacheManager")
self.init_cache()
def init_cache(self):
"""初始化缓存结构"""
for plc in self.config["plcs"]:
plc_name = plc["name"]
self.cache[plc_name] = {}
self.last_update[plc_name] = {}
self.plc_last_connected[plc_name] = 0 # 初始化为0未连接
self.plc_connection_status[plc_name] = "never_connected"
for area in plc["areas"]:
name = area["name"]
# 确保初始状态为断开
self.cache[plc_name][name] = {
"data": bytearray(area["size"]),
"db_number": area["db_number"],
"offset": area["offset"],
"size": area["size"],
"type": area["type"],
"structure": area.get("structure", []),
"status": "disconnected" # 初始状态为断开
}
self.last_update[plc_name][name] = 0
def refresh_cache(self):
"""后台线程:定期刷新缓存"""
while self.running:
start_time = time.time()
try:
for plc in self.config["plcs"]:
plc_name = plc["name"]
refresh_interval = plc.get("refresh_interval", 0.5)
client = self.plc_manager.get_plc(plc_name)
# 检查PLC连接状态
plc_connected = client.connected
# 更新PLC连接状态
with self.lock:
if plc_connected:
self.plc_last_connected[plc_name] = time.time()
self.plc_connection_status[plc_name] = "connected"
else:
if self.plc_last_connected[plc_name] == 0:
self.plc_connection_status[plc_name] = "never_connected"
else:
self.plc_connection_status[plc_name] = "disconnected"
# 刷新所有可读区域
for area in plc["areas"]:
if area["type"] in ["read", "read_write"]:
name = area["name"]
try:
data = client.read_db(area["db_number"], area["offset"], area["size"])
# 更新区域状态基于PLC连接状态和读取结果
with self.lock:
if plc_connected and data and len(data) == area["size"]:
self.cache[plc_name][name]["data"] = bytearray(data)
self.cache[plc_name][name]["status"] = "connected"
self.last_update[plc_name][name] = time.time()
else:
self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name]
# 如果之前有数据,保留旧数据但标记状态
if self.last_update[plc_name][name] > 0:
self.logger.info(f"PLC {plc_name} area {name} disconnected but keeping last valid data")
except Exception as e:
with self.lock:
self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name]
self.logger.warning(f"Error updating status for {plc_name}/{name}: {e}")
"""计算刷新一个PLC的时间"""
# 计算实际执行时间
execution_time = time.time() - start_time
#计算需要睡眠的时间确保总等于refresh_time
sleep_time = max(0, refresh_interval - execution_time)
time.sleep(sleep_time)
# 记录实际刷新间隔
self.logger.debug(f"plc_name: {plc_name},"
f"Cache refresh completed.Execution time: {execution_time:.3f}s,"
f"Sleep time: {sleep_time:.3f}s,"
f"Total interval: {execution_time + sleep_time:.3f}s")
time.sleep(self.refresh_interval)
except Exception as e:
self.logger.error(f"Error in refresh_cache: {e}")
time.sleep(self.refresh_interval)
def start(self):
"""启动缓存刷新线程"""
if self.running:
return
self.running = True
self.thread = threading.Thread(
target=self.refresh_cache,
name="CacheRefreshThread",
daemon=True
)
self.thread.start()
self.logger.info("Cache manager started")
def stop(self):
"""停止缓存刷新线程"""
if not self.running:
return
self.running = False
if self.thread:
# 等待线程结束,但设置超时防止卡死
self.thread.join(timeout=2.0)
if self.thread.is_alive():
self.logger.warning("Cache refresh thread did not terminate gracefully")
self.thread = None
self.logger.info("Cache manager stopped")
def get_plc_connection_status(self, plc_name):
"""获取PLC连接状态"""
with self.lock:
return self.plc_connection_status.get(plc_name, "unknown")
def get_last_update_time(self, plc_name, area_name):
"""获取区域数据最后更新时间"""
with self.lock:
return self.last_update.get(plc_name, {}).get(area_name, 0)
def get_summary(self):
"""获取缓存摘要信息"""
summary = {}
with self.lock:
for plc_name, areas in self.cache.items():
summary[plc_name] = {}
for area_name, area in areas.items():
last_update = self.last_update[plc_name][area_name]
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and self.last_update[plc_name][area_name] == 0:
area_status = "disconnected"
summary[plc_name][area_name] = {
"status": area_status,
"plc_connection_status": plc_status,
"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
return summary
def get_area_status(self, plc_name, area_name):
"""获取区域状态"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return {"status": "not_found", "message": "PLC or area not found"}
plc_status = self.plc_connection_status.get(plc_name, "unknown")
last_update = self.last_update.get(plc_name, {}).get(area_name, 0)
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and last_update == 0:
area_status = "disconnected"
return {
"status": area_status,
"plc_connection_status": plc_status,
"last_update": last_update,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
def read_area(self, plc_name, area_name, offset, length):
"""单个区域读取"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
print("read area :",area)
if not area:
return None, "Area not found", "unknown", 0
if offset + length > area["size"]:
return None, "Offset out of bounds", "unknown", 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return None, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
data = client.read_db(area["db_number"], area["offset"] + offset, length)
# 验证数据有效性
if data and len(data) == length:
# 更新缓存中的这部分数据
for i in range(length):
area["data"][offset + i] = data[i]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected"
return data, None, plc_status, update_time
else:
area["status"] = plc_status
return None, "Invalid data returned", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0
def read_area_bool(self, plc_name, area_name, offset, length):
"""单个区域读取"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return None, "Area not found", "unknown", 0
if offset + length > area["size"]:
return None, "Offset out of bounds", "unknown", 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return None, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
data = client.read_db_bool(area["db_number"], area["offset"] + offset, length)
# 验证数据有效性
if all(isinstance(val, bool) for val in data.values()):
# 按字典键顺序更新多个值
for i, val in data.items():
area["data"][offset + i] = val # 确保offset+i不越界
# area["data"][offset] = data.values
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected"
return data, None, plc_status, update_time
else:
area["status"] = plc_status
return None, "Invalid data returned", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0
def write_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
success = client.write_db(area["db_number"], area["offset"] + offset, data)
if success:
# 更新缓存中的这部分数据
for i in range(len(data)):
area["data"][offset + i] = data[i]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
return True, None, plc_status, update_time
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_write_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
for i, byte in enumerate(data):
byte_data = bytes([byte])
current_offset = offset + (i * 2)
byte_value = byte_data[0]
value = bytearray(2)
if isinstance(byte_value, int):
set_int(value, 0, byte_value)
data = value
success = client.batch_write_db(area["db_number"], current_offset, data)
if success:
# 更新缓存中的这部分数据
for j in range(len(data)):
area["data"][offset + j] = data[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
return True, None, plc_status, update_time
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_write_bool_area(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
value = bytearray(offset + 1)
for bit, bit_value in enumerate(data):
set_bool(value, offset, bit, bit_value)
data = value
success = client.batch_write_db_bool(area["db_number"], offset, data)
if success:
# 更新缓存中的这部分数据
for j in range(len(data)):
area["data"][offset + j] = data[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
return True, None, plc_status, update_time
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def write_area_bool(self, plc_name, area_name, offset, data):
"""单个区域写入"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
if offset + len(data) > area["size"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
success = client.write_db_bool(area["db_number"], area["offset"] + offset, data)
if success:
# 更新缓存中的这部分数据
for i in range(len(data)):
area["data"][offset + i] = data[i]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
return True, None, plc_status, update_time
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def batch_read(self, requests):
"""批量读取"""
results = []
for req in requests:
plc_name = req["plc_name"]
area_name = req["area_name"]
offset = req.get("offset", 0)
length = req.get("length", None)
# 获取PLC连接状态
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": f"PLC not connected (status: {plc_status})"
})
continue
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": "Area not found"
})
continue
# 如果未指定length读取整个区域
if length is None:
length = area["size"] - offset
data, error, _, update_time = self.read_area(plc_name, area_name, offset, length)
if error:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error
})
else:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "success",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": length,
"data": list(data)
})
return results
def batch_write(self, requests):
"""批量写入"""
results = []
for req in requests:
plc_name = req["plc_name"]
area_name = req["area_name"]
offset = req["offset"]
data = bytes(req["data"])
# 获取PLC连接状态
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": f"PLC not connected (status: {plc_status})",
"offset": offset
})
continue
success, error, _, update_time = self.batch_write_area(plc_name, area_name, offset, data)
if error:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
else:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "success",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": len(data)
})
return results
def batch_write_bool(self, requests):
"""批量写入"""
results = []
for req in requests:
plc_name = req["plc_name"]
area_name = req["area_name"]
offset = req["offset"]
data = bytes(req["data"])
# 获取PLC连接状态
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": 0,
"last_update_formatted": "N/A",
"message": f"PLC not connected (status: {plc_status})",
"offset": offset
})
continue
success, error, _, update_time = self.batch_write_bool_area(plc_name, area_name, offset, data)
if error:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "error",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never",
"message": error,
"offset": offset
})
else:
results.append({
"plc_name": plc_name,
"area_name": area_name,
"status": "success",
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)),
"offset": offset,
"length": len(data)
})
return results
def read_generic(self, plc_name, area_name, offset, data_type, count=1):
"""通用读取接口"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
print("area:",area)
if not area:
return None, "Area not found", "unknown", 0
if area["type"] not in ["read", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return None, "Area is read-only", plc_status, 0
# 计算实际DB偏移
db_offset = area["offset"] + offset
# 确保在区域内
if data_type == 'bool':
required_size = (offset + count + 7) // 8
elif data_type in ['int', 'word']:
required_size = 2 * count
elif data_type in ['dint', 'dword', 'real']:
required_size = 4 * count
else: # byte
required_size = count
if db_offset + required_size > area["size"] or db_offset < 0:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return None, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return None, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
# 使用Snap7Client的read_generic方法
result = client.read_generic(area["db_number"], db_offset, data_type, count)
if result is None:
return None, "Read failed", plc_status, 0
# 对于bool类型需要特殊处理缓存
if data_type == 'bool':
for i in range(count):
byte_offset = offset // 8 + i // 8
bit_offset = (offset % 8) + (i % 8)
if bit_offset >= 8:
byte_offset += 1
bit_offset -= 8
# 读取当前字节值
current_byte = area["data"][byte_offset]
if result[i]:
# 设置位为1
new_byte = current_byte | (1 << bit_offset)
else:
# 设置位为0
new_byte = current_byte & ~(1 << bit_offset)
area["data"][byte_offset] = new_byte
else:
# 对于其他类型,直接更新缓存
if not isinstance(result, list):
result = [result]
if data_type == 'byte':
item_size = 1
elif data_type in ['int', 'word']:
item_size = 2
else: # dint, dword, real
item_size = 4
for i, val in enumerate(result):
item_offset = offset + i * item_size
if data_type == 'byte':
area["data"][item_offset] = val & 0xFF
elif data_type in ['int', 'word']:
# 2字节数据
packed = struct.pack(">h" if data_type == "int" else ">H", val)
for j in range(2):
area["data"][item_offset + j] = packed[j]
elif data_type in ['dint', 'dword', 'real']:
# 4字节数据
packed = struct.pack(
">l" if data_type == "dint" else
">I" if data_type == "dword" else
">f",
val
)
for j in range(4):
area["data"][item_offset + j] = packed[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected"
return result, None, plc_status, update_time
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}")
return None, f"Read failed: {str(e)}", plc_status, 0
def write_generic(self, plc_name, area_name, offset, data_type, value):
"""通用写入接口"""
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return False, "Area not found", "unknown", 0
if area["type"] not in ["write", "read_write"]:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Area is read-only", plc_status, 0
# 计算实际DB偏移
db_offset = area["offset"] + offset
# 确保在区域内
if data_type == 'bool':
# 确定存储这些布尔值需要多少字节。
required_size = (offset + (len(value) if isinstance(value, list) else 1) + 7) // 8
elif data_type in ['int', 'word']:
required_size = 2 * (len(value) if isinstance(value, list) else 1)
elif data_type in ['dint', 'dword', 'real']:
required_size = 4 * (len(value) if isinstance(value, list) else 1)
else: # byte
required_size = len(value) if isinstance(value, list) else 1
if db_offset + required_size > area["size"] or db_offset < 0:
plc_status = self.plc_connection_status.get(plc_name, "unknown")
return False, "Offset out of bounds", plc_status, 0
client = self.plc_manager.get_plc(plc_name)
plc_status = self.plc_connection_status.get(plc_name, "unknown")
# 如果PLC未连接直接返回错误
if plc_status != "connected":
return False, f"PLC not connected (status: {plc_status})", plc_status, 0
try:
# 使用Snap7Client的write_generic方法
success = client.write_generic(area["db_number"], db_offset, data_type, value)
if success:
# 根据数据类型更新缓存
if data_type == 'bool':
# 处理bool写入
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
byte_offset = offset // 8 + i // 8
bit_offset = (offset % 8) + (i % 8)
if bit_offset >= 8:
byte_offset += 1
bit_offset -= 8
# 读取当前字节值
current_byte = area["data"][byte_offset]
if val:
# 设置位为1
new_byte = current_byte | (1 << bit_offset)
else:
# 设置位为0
new_byte = current_byte & ~(1 << bit_offset)
area["data"][byte_offset] = new_byte
elif data_type == 'byte':
# 处理byte写入
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
area["data"][offset + i] = val & 0xFF
elif data_type in ['int', 'word']:
# 处理int/word写入
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
# 2字节数据
packed = struct.pack(">h" if data_type == "int" else ">H", val)
for j in range(2):
area["data"][offset + i * 2 + j] = packed[j]
elif data_type in ['dint', 'dword', 'real']:
# 处理dint/dword/real写入
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
# 4字节数据
packed = struct.pack(
">l" if data_type == "dint" else
">I" if data_type == "dword" else
">f",
val
)
for j in range(4):
area["data"][offset + i * 4 + j] = packed[j]
update_time = time.time()
self.last_update[plc_name][area_name] = update_time
area["status"] = "connected (last write)"
return True, None, plc_status, update_time
else:
area["status"] = plc_status
return False, "Write failed", plc_status, 0
except Exception as e:
area["status"] = plc_status
self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}")
return False, f"Write failed: {str(e)}", plc_status, 0
def get_parsed_data(self, plc_name, area_name):
"""获取解析后的数据"""
from data_parser import parse_data
with self.lock:
area = self.cache.get(plc_name, {}).get(area_name)
if not area:
return {"error": "Area not found"}
plc_status = self.plc_connection_status.get(plc_name, "unknown")
last_update = self.last_update.get(plc_name, {}).get(area_name, 0)
# 区域状态应与PLC连接状态一致除非有有效数据
area_status = area["status"]
if plc_status == "never_connected":
area_status = "never_connected"
elif plc_status == "disconnected" and last_update == 0:
area_status = "disconnected"
structure = area.get("structure", [])
if structure:
parsed = parse_data(area["data"], structure)
parsed["status"] = area_status
parsed["plc_connection_status"] = plc_status
parsed["last_update"] = last_update
parsed["last_update_formatted"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never"
return parsed
else:
return {
"raw_data": list(area["data"]),
"status": area_status,
"plc_connection_status": plc_status,
"last_update": last_update,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never"
}

10
gateway/config_loader.py Normal file
View File

@ -0,0 +1,10 @@
import json
import os
def load_config(config_path="config/config.json"):
"""加载配置文件"""
if not os.path.exists(config_path):
raise FileNotFoundError(f"Configuration file not found: {config_path}")
with open(config_path, "r") as f:
return json.load(f)

87
gateway/config_manager.py Normal file
View File

@ -0,0 +1,87 @@
import json
import os
import logging
from config_validator import ConfigValidator
class ConfigManager:
"""配置文件管理器"""
def __init__(self, config_path="../config/config.json"):
self.config_path = config_path
self.config = None
self.load_config()
def load_config(self):
"""加载配置文件"""
try:
if not os.path.exists(self.config_path):
# 尝试从备份恢复
backup_path = self.config_path + ".bak"
if os.path.exists(backup_path):
logging.warning(f"Main config not found, using backup: {backup_path}")
with open(backup_path, 'r') as src, open(self.config_path, 'w') as dst:
config_data = src.read()
dst.write(config_data)
else:
raise FileNotFoundError(f"Configuration file not found: {self.config_path}")
with open(self.config_path, 'r') as f:
self.config = json.load(f)
# 验证配置
is_valid, error = ConfigValidator.validate_config(self.config)
if not is_valid:
logging.error(f"Invalid configuration: {error}")
# 尝试从备份恢复
backup_path = self.config_path + ".bak"
if os.path.exists(backup_path):
logging.warning("Attempting to load from backup configuration")
with open(backup_path, 'r') as f:
self.config = json.load(f)
is_valid, error = ConfigValidator.validate_config(self.config)
if not is_valid:
raise ValueError(f"Backup config also invalid: {error}")
else:
raise ValueError(f"Invalid configuration: {error}")
return True, None
except Exception as e:
logging.error(f"Failed to load config: {e}")
self.config = {"plcs": []}
return False, str(e)
def get_config(self):
"""获取当前配置"""
return self.config
def validate_config(self, config):
"""验证配置是否有效"""
return ConfigValidator.validate_config(config)
def save_config(self, new_config):
"""保存配置文件"""
try:
# 验证配置
is_valid, error = self.validate_config(new_config)
if not is_valid:
return False, f"Invalid configuration: {error}"
# 备份旧配置
backup_path = self.config_path + ".bak"
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as src, open(backup_path, 'w') as dst:
dst.write(src.read())
# 保存新配置
with open(self.config_path, 'w') as f:
json.dump(new_config, f, indent=2)
self.config = new_config
return True, None
except Exception as e:
logging.error(f"Failed to save config: {e}")
return False, str(e)
def reload_config(self):
"""重新加载配置"""
return self.load_config()

View File

@ -0,0 +1,91 @@
import json
from jsonschema import validate, Draft7Validator, FormatChecker
class ConfigValidator:
"""配置文件验证器"""
SCHEMA = {
"type": "object",
"properties": {
"plcs": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "minLength": 1},
"ip": {"type": "string", "format": "ipv4"},
"rack": {"type": "integer", "minimum": 0},
"slot": {"type": "integer", "minimum": 0},
"refresh_interval": {
"type": "number",
"minimum": 0.01,
"default": 0.5
},
"areas": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"name": {"type": "string", "minLength": 1},
"type": {"type": "string", "enum": ["read", "write", "read_write"]},
"db_number": {"type": "integer", "minimum": 1},
"offset": {"type": "integer", "minimum": 0},
"size": {"type": "integer", "minimum": 1},
"structure": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"type": {"type": "string", "enum": ["bool", "byte", "int", "dint", "real", "word", "dword"]},
"offset": {"type": "integer", "minimum": 0},
"bit": {"type": "integer", "minimum": 0, "maximum": 7} # 修复了这里
},
"required": ["name", "type", "offset"]
}
}
},
"required": ["name", "type", "db_number", "offset", "size"]
}
}
},
"required": ["name", "ip", "rack", "slot", "areas"]
}
}
},
"required": ["plcs"]
}
"""
@staticmethod:静态方法装饰器使validate_config不依赖于类的实例可以通过类直接调用
如ConfigValidator.validate_config(config)
"""
@staticmethod
def validate_config(config):
"""使用JSONSchema验证配置是否符合规范"""
try:
# 添加IPv4格式验证
validator = Draft7Validator(
ConfigValidator.SCHEMA,
format_checker=FormatChecker(["ipv4"])
)
validator.validate(config)
return True, None
except Exception as e:
return False, str(e)
@staticmethod
def is_valid_ip(ip):
"""验证IP地址格式"""
parts = ip.split('.')
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
num = int(part)
if num < 0 or num > 255:
return False
return True

55
gateway/data_parser.py Normal file
View File

@ -0,0 +1,55 @@
from struct import unpack
import time
def parse_data(data, structure):
"""解析结构化数据"""
result = {"raw_data": list(data)}
if not structure:
return result
result["parsed"] = {}
for field in structure:
offset = field["offset"]
name = field["name"]
data_type = field["type"]
try:
if data_type == "int":
if offset + 2 > len(data):
raise ValueError("Offset out of bounds")
val = unpack(">h", data[offset:offset+2])[0]
elif data_type == "dint":
if offset + 4 > len(data):
raise ValueError("Offset out of bounds")
val = unpack(">l", data[offset:offset+4])[0]
elif data_type == "real":
if offset + 4 > len(data):
raise ValueError("Offset out of bounds")
val = unpack(">f", data[offset:offset+4])[0]
elif data_type == "bool":
bit = field.get("bit", 0)
if offset >= len(data):
raise ValueError("Offset out of bounds")
byte = data[offset]
val = bool((byte >> bit) & 1)
elif data_type == "byte":
if offset >= len(data):
raise ValueError("Offset out of bounds")
val = data[offset]
elif data_type == "word":
if offset + 2 > len(data):
raise ValueError("Offset out of bounds")
val = (data[offset] << 8) | data[offset + 1]
elif data_type == "dword":
if offset + 4 > len(data):
raise ValueError("Offset out of bounds")
val = (data[offset] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | data[offset+3]
else:
val = f"Unknown type: {data_type}"
result["parsed"][name] = val
except Exception as e:
result["parsed"][name] = f"Error: {str(e)}"
return result

109
gateway/main.py Normal file
View File

@ -0,0 +1,109 @@
import logging
import time
import threading
from config_loader import load_config
from plc_manager import PLCManager
from cache_manager import CacheManager
from api_server import APIServer
from config_manager import ConfigManager
class GatewayApp:
"""PLC网关应用程序主类"""
def __init__(self, config_path="../config/config.json"):
self.config_path = config_path
self.config_manager = ConfigManager(config_path)
self.plc_manager = None
self.cache_manager = None
self.api_server = None
self.reload_flag = False
self.reload_lock = threading.Lock()
self.logger = logging.getLogger("GatewayApp")
# 加载初始配置
self.load_configuration()
def load_configuration(self):
"""加载配置并初始化组件"""
# 加载配置
if not self.config_manager.load_config():
self.logger.error("Failed to load initial configuration")
return False
config = self.config_manager.get_config()
# 重新初始化PLC连接
if self.plc_manager:
self.logger.info("Reinitializing PLC connections...")
self.plc_manager = PLCManager(config["plcs"])
self.plc_manager.connect_all()
# 重新初始化缓存
if self.cache_manager:
self.logger.info("Stopping existing cache manager...")
self.cache_manager.stop()
self.logger.info("Initializing cache manager...")
self.cache_manager = CacheManager(config, self.plc_manager, app=self)
self.cache_manager.start()
# 重新初始化API服务器
if self.api_server:
self.logger.info("API server already running")
else:
self.logger.info("Starting API server...")
self.api_server = APIServer(self.cache_manager, self.config_path)
self.api_server.start()
self.logger.info("Configuration loaded successfully")
return True
def check_for_reload(self):
"""检查是否需要重载配置"""
while True:
with self.reload_lock:
if self.reload_flag:
self.reload_flag = False
self.load_configuration()
time.sleep(1)
def request_reload(self):
"""请求重载配置"""
with self.reload_lock:
self.reload_flag = True
self.logger.info("Configuration reload requested")
def run(self):
"""运行主程序"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger.info("Starting PLC Gateway...")
# 启动配置重载检查线程
reload_thread = threading.Thread(
target=self.check_for_reload,
name="ConfigReloadThread",
daemon=True
)
reload_thread.start()
try:
# 保持主程序运行
while True:
time.sleep(1)
except KeyboardInterrupt:
self.logger.info("Shutting down gracefully...")
finally:
if self.cache_manager:
self.cache_manager.stop()
self.logger.info("Shutdown complete")
def main():
app = GatewayApp()
app.run()
if __name__ == "__main__":
main()

42
gateway/plc_manager.py Normal file
View File

@ -0,0 +1,42 @@
from snap7_client import Snap7Client
import logging
class PLCManager:
"""PLC连接管理器管理多个PLC连接"""
def __init__(self, plcs_config):
"""
初始化PLC管理器
Args:
plcs_config: PLC配置列表
"""
self.plcs = {}
for plc_config in plcs_config:
name = plc_config["name"]
self.plcs[name] = Snap7Client(
plc_config["ip"],
plc_config["rack"],
plc_config["slot"]
)
self.logger = logging.getLogger("PLCManager")
def get_plc(self, name):
"""获取指定名称的PLC客户端"""
return self.plcs.get(name)
def connect_all(self):
"""连接所有配置的PLC"""
for name, client in self.plcs.items():
client.connect()
def get_connection_status(self):
"""获取所有PLC的连接状态"""
status = {}
for name, client in self.plcs.items():
status[name] = {
"ip": client.ip,
"connected": client.connected,
"retry_count": client.retry_count
}
return status

448
gateway/snap7_client.py Normal file
View File

@ -0,0 +1,448 @@
import snap7
import logging
from threading import Lock
import time
from snap7.util import *
import ast
class Snap7Client:
"""Snap7客户端处理与PLC的通信"""
def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1):
"""
初始化Snap7客户端
Args:
ip: PLC IP地址
rack: Rack编号
slot: Slot编号
max_retries: 最大重试次数
retry_base_delay: 基础重试延迟(秒)
"""
self.ip = ip
self.rack = rack
self.slot = slot
self.client = snap7.client.Client()
self.lock = Lock()
self.connected = False
self.max_retries = max_retries
self.retry_base_delay = retry_base_delay
self.last_connect_attempt = 0
self.retry_count = 0
self.logger = logging.getLogger(f"Snap7Client[{ip}]")
def is_valid_connection(self):
"""检查连接是否真正有效"""
try:
# 尝试读取PLC的运行状态
cpu_state = self.client.get_cpu_state()
print("当前 CPU 状态:", cpu_state)
return cpu_state in ['S7CpuStatusRun', 'S7CpuStatusStop']
except:
return False
def connect(self):
"""建立与PLC的连接并验证"""
current_time = time.time()
# 指数退避重试
if self.retry_count > 0:
delay = min(self.retry_base_delay * (2 ** (self.retry_count - 1)), 30)
if current_time - self.last_connect_attempt < delay:
return False # 未到重试时间
self.last_connect_attempt = current_time
try:
self.client.connect(self.ip, self.rack, self.slot)
# 验证连接是否真正有效
if self.client.get_connected() and self.is_valid_connection():
self.connected = True
self.retry_count = 0 # 重置重试计数
self.logger.info(f"Successfully connected to PLC {self.ip}")
return True
else:
self.connected = False
self.logger.warning(f"Connection to {self.ip} established but PLC is not responding")
try:
self.client.disconnect()
except:
pass
return False
except Exception as e:
self.retry_count = min(self.retry_count + 1, self.max_retries)
self.logger.error(f"Connection to {self.ip} failed (attempt {self.retry_count}/{self.max_retries}): {e}")
self.connected = False
return False
def read_db(self, db_number, offset, size):
"""
从DB块读取数据
Args:
db_number: DB编号
offset: 起始偏移量
size: 读取字节数
Returns:
bytearray: 读取的数据如果失败返回None
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None # 返回None而不是零填充数据
try:
with self.lock: # 进入锁,其他线程需等待
data = self.client.db_read(db_number, offset, size)
# 验证返回数据的有效性
if data is None or len(data) != size:
self.connected = False
self.logger.error(f"Read DB{db_number} returned invalid data size (expected {size}, got {len(data) if data else 0})")
return None
return data
except Exception as e:
self.logger.error(f"Read DB{db_number} error: {e}")
self.connected = False
return None
def read_db_bool(self, db_number, offset, bit_length):
"""
从 DB 块中读取一个字节,并提取其中的多个 BOOL 位
Args:
db_number (int): DB块编号
offset (int): 要读取的字节偏移地址
bit_length: 第几位如1表示第1位
Returns:
result:返回位值
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None # 返回None而不是零填充数据
try:
with self.lock:
data = self.client.db_read(db_number, offset, 1)
result = {}
for bit in range(bit_length):
result[bit] = bool(data[0] & (1 << bit))
if result is None or len(result) != bit_length:
self.connected = False
self.logger.error(f"Read DB{db_number} returned invalid data size (expected {bit_length}, got {len(result) if data else 0})")
return None
return result
except Exception as e:
self.logger.error(f"Read DB{db_number} error: {e}")
self.connected = False
return None
def write_db(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def batch_write_db_bool(self, db_number, offset, data):
"""
向DB块写入数据
Args:
db_number: DB编号
offset: 起始偏移量
data: 要写入的bool类型数据
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
with self.lock:
print(db_number, offset, data)
self.client.db_write(db_number, offset, data)
self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}")
return True
except Exception as e:
self.logger.error(f"Write DB{db_number} error: {e}")
self.connected = False
return False
def read_generic(self, db_number, offset, data_type, count=1):
"""
通用读取接口,支持多种数据类型
Args:
db_number: DB块编号
offset: 起始偏移量字节或位对于bool类型
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
count: 要读取的数据个数
Returns:
解析后的数据单个值或值列表失败返回None
"""
if not self.connected and not self.connect():
self.logger.warning(f"Read failed: not connected to {self.ip}")
return None
try:
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 计算需要读取的字节数
last_bit = bit_offset + count - 1
last_byte = last_bit // 8
total_bytes = last_byte - byte_offset + 1
# 读取原始字节数据
data = self.read_db(db_number, byte_offset, total_bytes)
if data is None:
return None
# 解析bool值
result = []
for i in range(count):
current_bit = bit_offset + i
byte_idx = current_bit // 8
bit_idx = current_bit % 8
result.append(bool(data[byte_idx] & (1 << bit_idx)))
return result[0] if count == 1 else result
elif data_type == 'byte':
data = self.read_db(db_number, offset, count)
if data is None:
return None
return [data[i] for i in range(count)] if count > 1 else data[0]
elif data_type in ['int', 'word']:
total_bytes = 2 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'int':
result.append(get_int(data, i * 2))
else: # word
result.append(get_word(data, i * 2))
return result[0] if count == 1 else result
elif data_type in ['dint', 'dword', 'real']:
total_bytes = 4 * count
data = self.read_db(db_number, offset, total_bytes)
if data is None:
return None
result = []
for i in range(count):
if data_type == 'dint':
result.append(get_dint(data, i * 4))
elif data_type == 'dword':
result.append(get_dword(data, i * 4))
else: # real
result.append(get_real(data, i * 4))
return result[0] if count == 1 else result
else:
self.logger.error(f"Unsupported data type: {data_type}")
return None
except Exception as e:
self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}")
return None
def write_generic(self, db_number, offset, data_type, value):
"""
通用写入接口,支持多种数据类型
Args:
db_number: DB块编号
offset: 起始偏移量字节或位对于bool类型
data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword')
value: 要写入的值(可以是单个值或列表)
Returns:
bool: 是否写入成功
"""
if not self.connected and not self.connect():
self.logger.warning(f"Write failed: not connected to {self.ip}")
return False
try:
if data_type == 'bool':
# 对于booloffset是位偏移
byte_offset = offset // 8
bit_offset = offset % 8
# 读取当前字节
current_byte = self.read_db(db_number, byte_offset, 1)
if current_byte is None:
return False
# 修改特定位
if isinstance(value, list):
# 多个bool值
for i, val in enumerate(value):
current_bit = bit_offset + i
byte_idx = current_bit // 8
bit_idx = current_bit % 8
if val:
current_byte[0] |= (1 << bit_idx)
else:
current_byte[0] &= ~(1 << bit_idx)
else:
# 单个bool值
if value:
current_byte[0] |= (1 << bit_offset)
else:
current_byte[0] &= ~(1 << bit_offset)
# 写回修改后的字节
return self.write_db_bool(db_number, byte_offset, current_byte)
elif data_type == 'byte':
if isinstance(value, list):
# 批量写入
for i, val in enumerate(value):
if val < 0 or val > 255:
self.logger.error(f"Byte value out of range: {val}")
return False
if not self.write_db(db_number, offset + i, bytes([val])):
return False
return True
else:
# 单个字节
if value < 0 or value > 255:
self.logger.error(f"Byte value out of range: {value}")
return False
return self.write_db(db_number, offset, bytes([value]))
elif data_type in ['int', 'word']:
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
# 确保int值在范围内
if data_type == 'int' and (val < -32768 or val > 32767):
self.logger.error(f"Int value out of range: {val}")
return False
elif data_type == 'word' and (val < 0 or val > 65535):
self.logger.error(f"Word value out of range: {val}")
return False
data = bytearray(2)
if data_type == 'int':
set_int(data, 0, val)
else:
set_word(data, 0, val)
if not self.write_db(db_number, offset + i * 2, data):
return False
return True
elif data_type in ['dint', 'dword', 'real']:
if not isinstance(value, list):
value = [value]
for i, val in enumerate(value):
data = bytearray(4)
if data_type == 'dint':
if val < -2147483648 or val > 2147483647:
self.logger.error(f"DInt value out of range: {val}")
return False
set_dint(data, 0, val)
elif data_type == 'dword':
if val < 0 or val > 4294967295:
self.logger.error(f"DWord value out of range: {val}")
return False
set_dword(data, 0, val)
else: # real
set_real(data, 0, float(val))
if not self.write_db(db_number, offset + i * 4, data):
return False
return True
else:
self.logger.error(f"Unsupported data type: {data_type}")
return False
except Exception as e:
self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}")
return False

View File

@ -0,0 +1,670 @@
<html>
<head>
<title>PLC Gateway API Documentation</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h1, h2, h3 { color: #2c3e50; }
.endpoint {
background-color: #f8f9fa;
border-left: 4px solid #3498db;
padding: 15px;
margin: 20px 0;
border-radius: 4px;
}
.method {
display: inline-block;
padding: 3px 8px;
border-radius: 4px;
color: white;
font-weight: bold;
margin-right: 10px;
}
.method-get { background-color: #27ae60; }
.method-post { background-color: #3498db; }
.method-put { background-color: #f39c12; }
.method-delete { background-color: #e74c3c; }
table {
border-collapse: collapse;
width: 100%;
margin: 15px 0;
}
th, td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
code {
background-color: #f5f5f5;
padding: 2px 4px;
border-radius: 3px;
font-family: monospace;
}
.example {
background-color: #f9f9f9;
padding: 10px;
border-radius: 4px;
margin: 10px 0;
font-family: monospace;
}
.nav {
background-color: #e9f7fe;
padding: 10px;
border-radius: 4px;
margin-bottom: 20px;
}
.nav a {
margin-right: 15px;
text-decoration: none;
color: #3498db;
}
</style>
</head>
<body>
<h1>PLC Gateway API Documentation</h1>
<div class="nav">
<a href="#status-api">Status API</a> |
<a href="#data-api">Data API</a> |
<a href="#config-api">Configuration API</a>
</div>
<h2 id="status-api">Status API</h2>
<div class="endpoint">
<h3>System Status</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/status</code>
</div>
<p>获取系统状态信息包括启动时间、PLC数量和缓存大小。</p>
<h4>响应示例</h4>
<div class="example">
{
"status": "running",
"start_time": "2023-10-30 14:30:22",
"plc_count": 2,
"cache_size": 11000,
"plc_statuses": {
"PLC1": {
"status": "connected",
"last_connected": "2023-10-30 14:35:10"
},
"PLC2": {
"status": "disconnected",
"last_connected": "Never"
}
}
}
</div>
</div>
<div class="endpoint">
<h3>Area Status</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/status/<plc_name>/<area_name></code>
</div>
<p>获取指定PLC区域的状态信息。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Read</td>
</tr>
</table>
<h4>响应示例</h4>
<div class="example">
{
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01",
"size": 4000,
"type": "read"
}
</div>
</div>
<h2 id="data-api">Data API</h2>
<div class="endpoint">
<h3>Single Read</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/read/<plc_name>/<area_name>/<offset>/<length></code>
</div>
<p>从指定区域读取数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Read</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
<tr>
<td>length</td>
<td>读取长度(字节)</td>
</tr>
</table>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 4,
"data": [0, 0, 123, 45],
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
</div>
</div>
<div class="endpoint">
<h3>Single Write</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/write/<plc_name>/<area_name>/<offset></code>
</div>
<p>向指定区域写入数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Write</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
</table>
<h4>请求体</h4>
<p>原始二进制数据</p>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"length": 4,
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30"
}
</div>
</div>
<div class="endpoint">
<h3>Single Read Bool</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/read_bool/<plc_name>/<area_name>/<offset>/<length></code>
</div>
<p>从指定区域读取数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Read</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
<tr>
<td>length</td>
<td>读取长度(字节)</td>
</tr>
</table>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 2,
"data": [0:False, 1:False],
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
</div>
</div>
<div class="endpoint">
<h3>Single Write Bool</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/write_bool/<plc_name>/<area_name>/<offset></code>
</div>
<p>向指定区域写入数据。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Write</td>
</tr>
<tr>
<td>offset</td>
<td>起始偏移量(字节)</td>
</tr>
</table>
<h4>请求体</h4>
<p>{0:True}</p>
<h4>响应示例</h4>
<div class="example">
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"length": 1,
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30"
}
</div>
</div>
<div class="endpoint">
<h3>Batch Read</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/batch_read</code>
</div>
<p>批量读取多个区域的数据。</p>
<h4>请求体</h4>
<table>
<tr>
<th>字段</th>
<th>类型</th>
<th>必需</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>string</td>
<td></td>
<td>PLC名称与配置中一致</td>
</tr>
<tr>
<td>area_name</td>
<td>string</td>
<td></td>
<td>区域名称(与配置中一致)</td>
</tr>
<tr>
<td>offset</td>
<td>number</td>
<td></td>
<td>起始偏移量字节默认为0</td>
</tr>
<tr>
<td>length</td>
<td>number</td>
<td></td>
<td>读取长度(字节),不提供则读取整个区域</td>
</tr>
</table>
<h4>请求示例</h4>
<div class="example">
[
{
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 4
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"offset": 10,
"length": 2
}
]
</div>
<h4>响应示例</h4>
<div class="example">
[
{
"plc_name": "PLC1",
"area_name": "DB100_Read",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01",
"offset": 0,
"length": 4,
"data": [0, 0, 123, 45]
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754322.123,
"last_update_formatted": "2023-10-30 14:12:02",
"offset": 10,
"length": 2,
"data": [255, 0]
}
]
</div>
</div>
<div class="endpoint">
<h3>Batch Write</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/batch_write</code>
</div>
<p>批量写入多个区域的数据。</p>
<h4>请求体</h4>
<table>
<tr>
<th>字段</th>
<th>类型</th>
<th>必需</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>string</td>
<td></td>
<td>PLC名称与配置中一致</td>
</tr>
<tr>
<td>area_name</td>
<td>string</td>
<td></td>
<td>区域名称(与配置中一致)</td>
</tr>
<tr>
<td>offset</td>
<td>number</td>
<td></td>
<td>起始偏移量(字节)</td>
</tr>
<tr>
<td>data</td>
<td>array</td>
<td></td>
<td>要写入的数据(字节数组)</td>
</tr>
</table>
<h4>请求示例</h4>
<div class="example">
[
{
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"data": [1, 2, 3, 4]
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"offset": 10,
"data": [255, 0]
}
]
</div>
<h4>响应示例</h4>
<div class="example">
[
{
"plc_name": "PLC1",
"area_name": "DB100_Write",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30",
"offset": 0,
"length": 4
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754351.234,
"last_update_formatted": "2023-10-30 14:12:31",
"offset": 10,
"length": 2
}
]
</div>
</div>
<div class="endpoint">
<h3>Parsed Data</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/data/<plc_name>/<area_name></code>
</div>
<p>获取解析后的数据(如果配置了结构)。</p>
<h4>路径参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>plc_name</td>
<td>PLC名称如PLC1</td>
</tr>
<tr>
<td>area_name</td>
<td>区域名称如DB100_Read</td>
</tr>
</table>
<h4>响应示例(配置了解析结构)</h4>
<div class="example">
{
"parsed": {
"temperature": 25.5,
"pressure": 100,
"status": true
},
"raw_data": [0, 0, 128, 65, 0, 100],
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
</div>
<h4>响应示例(未配置解析结构)</h4>
<div class="example">
{
"raw_data": [0, 0, 128, 65, 0, 100],
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
</div>
</div>
<h2 id="config-api">Configuration API</h2>
<div class="endpoint">
<h3>Get Configuration</h3>
<div>
<span class="method method-get">GET</span>
<code>/api/config</code>
</div>
<p>获取当前配置。</p>
<h4>认证要求</h4>
<p>需要Basic Auth认证</p>
<h4>响应示例</h4>
<div class="example">
{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.10",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000,
"structure": [
{"name": "temperature", "type": "real", "offset": 0},
{"name": "pressure", "type": "int", "offset": 4}
]
}
]
}
]
}
</div>
</div>
<div class="endpoint">
<h3>Validate Configuration</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/config/validate</code>
</div>
<p>验证配置是否有效。</p>
<h4>认证要求</h4>
<p>需要Basic Auth认证</p>
<h4>请求体</h4>
<p>要验证的配置JSON</p>
<h4>响应示例(有效)</h4>
<div class="example">
{
"valid": true
}
</div>
<h4>响应示例(无效)</h4>
<div class="example">
{
"valid": false,
"message": "Invalid configuration: 'ip' is a required property"
}
</div>
</div>
<div class="endpoint">
<h3>Save Configuration</h3>
<div>
<span class="method method-post">POST</span>
<code>/api/config</code>
</div>
<p>保存配置。</p>
<h4>查询参数</h4>
<table>
<tr>
<th>参数</th>
<th>描述</th>
</tr>
<tr>
<td>reload</td>
<td>是否立即重载配置true/false</td>
</tr>
</table>
<h4>认证要求</h4>
<p>需要Basic Auth认证</p>
<h4>请求体</h4>
<p>要保存的配置JSON</p>
<h4>响应示例</h4>
<div class="example">
{
"success": true,
"message": "Configuration saved and reload requested"
}
</div>
</div>
<div class="footer" style="margin-top: 40px; padding-top: 10px; border-top: 1px solid #ddd; color: #777;">
<p>PLC Gateway v1.0 | <a href="/">Back to Status Page</a></p>
</div>
</body>
</html>

View File

@ -0,0 +1,209 @@
<!DOCTYPE html>
<html>
<head>
<title>PLC Gateway Configuration</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h1 { color: #2c3e50; }
.config-container {
display: flex;
flex-direction: column;
max-width: 1200px;
}
textarea {
width: 100%;
height: 500px;
font-family: monospace;
padding: 10px;
margin-bottom: 10px;
border: 1px solid #ddd;
border-radius: 4px;
}
.button-group {
display: flex;
gap: 10px;
}
button {
padding: 8px 16px;
background-color: #3498db;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background-color: #2980b9;
}
.status-message {
padding: 10px;
margin-top: 10px;
border-radius: 4px;
display: none;
}
.success { background-color: #d4edda; color: #155724; }
.error { background-color: #f8d7da; color: #721c24; }
.info { background-color: #d1ecf1; color: #0c5460; }
.config-help {
margin-top: 20px;
padding: 15px;
background-color: #f8f9fa;
border-left: 4px solid #3498db;
}
.config-help h3 {
margin-top: 0;
}
.doc-link {
margin-top: 10px;
padding: 10px;
background-color: #e9f7fe;
border-radius: 4px;
}
</style>
</head>
<body>
<h1>PLC Gateway Configuration</h1>
<div class="doc-link">
<p><a href="/api/doc">View API documentation</a> - Learn how to use the API</p>
</div>
<div class="config-container">
<p>Edit the configuration JSON below. Be careful with the syntax.</p>
<form id="configForm">
<textarea id="configEditor" name="config">{{ config_json }}</textarea>
<div class="button-group">
<button type="button" onclick="validateConfig()">Validate</button>
<button type="button" onclick="saveConfig(false)">Save</button>
<button type="button" onclick="saveConfig(true)">Save & Reload</button>
</div>
</form>
<div id="statusMessage" class="status-message"></div>
<div class="config-help">
<h3>Configuration Guide</h3>
<p><strong>PLC Configuration:</strong></p>
<ul>
<li><code>name</code>: Unique name for the PLC</li>
<li><code>ip</code>: IP address of the PLC</li>
<li><code>rack</code>: Rack number (usually 0)</li>
<li><code>slot</code>: Slot number (usually 1 for S7-1200)</li>
</ul>
<p><strong>Data Area Configuration:</strong></p>
<ul>
<li><code>name</code>: Name of the data area</li>
<li><code>type</code>: <code>read</code>, <code>write</code>, or <code>read_write</code></li>
<li><code>db_number</code>: DB number (e.g., 100 for DB100)</li>
<li><code>offset</code>: Starting byte offset</li>
<li><code>size</code>: Size in bytes</li>
<li><code>structure</code> (optional): Define how to parse the data</li>
</ul>
<p><strong>Example:</strong></p>
<pre>{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.10",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000,
"structure": [
{"name": "temperature", "type": "real", "offset": 0},
{"name": "pressure", "type": "int", "offset": 4}
]
}
]
}
]
}</pre>
</div>
</div>
<script>
function showStatus(message, type) {
const statusDiv = document.getElementById('statusMessage');
statusDiv.textContent = message;
statusDiv.className = 'status-message ' + type;
statusDiv.style.display = 'block';
}
function validateConfig() {
try {
const config = JSON.parse(document.getElementById('configEditor').value);
fetch('/api/config/validate', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Basic ' + btoa('{{ username }}:{{ password }}')
},
body: JSON.stringify(config)
})
.then(response => {
if (!response.ok) {
return response.json().then(err => { throw new Error(err.message || 'Validation failed'); });
}
return response.json();
})
.then(data => {
if (data.valid) {
showStatus('Configuration is valid!', 'success');
} else {
showStatus('Validation error: ' + data.message, 'error');
}
})
.catch(error => {
showStatus('Validation error: ' + error.message, 'error');
});
} catch (e) {
showStatus('JSON error: ' + e.message, 'error');
}
}
function saveConfig(reload) {
try {
const config = JSON.parse(document.getElementById('configEditor').value);
const url = reload ? '/api/config?reload=true' : '/api/config';
fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Basic ' + btoa('{{ username }}:{{ password }}')
},
body: JSON.stringify(config)
})
.then(response => {
if (!response.ok) {
return response.json().then(err => { throw new Error(err.message || 'Save failed'); });
}
return response.json();
})
.then(data => {
if (data.success) {
const msg = reload ?
'Configuration saved and reloaded successfully!' :
'Configuration saved successfully. Restart to apply changes.';
showStatus(msg, 'success');
} else {
showStatus('Save error: ' + data.message, 'error');
}
})
.catch(error => {
showStatus('Error: ' + error.message, 'error');
});
} catch (e) {
showStatus('JSON error: ' + e.message, 'error');
}
}
</script>
</body>
</html>

View File

@ -0,0 +1,126 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>PLC Gateway Status</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
h1 { color: #2c3e50; }
table { border-collapse: collapse; width: 100%; margin-top: 20px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.status-connected { color: green; font-weight: bold; }
.status-disconnected { color: red; }
.status-never-connected { color: orange; }
.plc-connected { background-color: #d4edda; }
.plc-disconnected { background-color: #f8d7da; }
.plc-never-connected { background-color: #fff3cd; }
.api-section { margin-top: 30px; }
.api-endpoint { background-color: #f9f9f9; padding: 10px; margin: 5px 0; border-radius: 4px; }
.config-link { margin-top: 20px; padding: 10px; background-color: #e9f7fe; border-radius: 4px; }
.footer { margin-top: 40px; padding-top: 10px; border-top: 1px solid #ddd; color: #777; }
.doc-link { margin-top: 10px; padding: 10px; background-color: #e9f7fe; border-radius: 4px; }
</style>
</head>
<body>
<h1>PLC Gateway Status</h1>
<p>Gateway running since: {{ start_time }}</p>
{% for plc_name, areas in summary.items() %}
{% set plc_status = plc_statuses.get(plc_name, "unknown") %}
{% set plc_class = {
'connected': 'plc-connected',
'disconnected': 'plc-disconnected'
}.get(plc_status, 'plc-never-connected') %}
<h2 class="{{plc_class}}">PLC:{{plc_name}} (Status: {{plc_status}})</h2>
<table>
<tr>
<th>Area Name</th>
<th>Type</th>
<th>Size (bytes)</th>
<th>Status</th>
<th>PLC Connection</th>
<th>Last Update</th>
</tr>
{% for area_name, area in areas.items() %}
{% set status_class = {
'connected': 'status-connected',
'disconnected': 'status-disconnected',
'never_connected': 'status-never-connected'
}.get(area.status, 'status-disconnected') %}
{% set status_text = {
'connected': 'Connected',
'disconnected': 'Disconnected',
'never_connected': 'Never connected'
}.get(area.status, area.status) %}
<tr>
<td>{{area_name}}</td>
<td>{{area['type']}}</td>
<td>{{area['size']}}</td>
<td class="{{status_class}}">{{status_text}}</td>
<td>{{area['plc_connection_status']}}</td>
<td>{{area['last_update']}}</td>
</tr>
{% endfor %}
</table>
{% endfor %}
<div class="api-section">
<h2>API Endpoints</h2>
<div class="api-endpoint">
<strong>Single Read:</strong> GET /api/read/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;<br>
Example: /api/read/PLC1/DB100_Read/10/4
</div>
<div class="api-endpoint">
<strong>Single Write:</strong> POST /api/write/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;<br>
Body: Raw binary data<br>
Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data
</div>
<div class="api-endpoint">
<strong>Single Read_Bool:</strong> GET /api/read_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;<br>
Example: /api/read_bool/PLC1/DB100_Read/0/2
</div>
<div class="api-endpoint">
<strong>Single Write_Bool:</strong> POST /api/write_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;<br>
Body: Raw binary data<br>
Example: POST /api/write_bool/PLC1/DB100_Write/0
</div>
<div class="api-endpoint">
<strong>Batch Read:</strong> POST /api/batch_read<br>
Body: JSON array of read requests<br>
Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}]
</div>
<div class="api-endpoint">
<strong>Batch Write:</strong> POST /api/batch_write<br>
Body: JSON array of write requests<br>
Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}]
</div>
<div class="api-endpoint">
<strong>Configuration:</strong> GET/POST /api/config<br>
Manage gateway configuration
</div>
</div>
<div class="doc-link">
<h2>API Documentation</h2>
<p><a href="/api/doc">View detailed API documentation</a> - Full description of all API endpoints</p>
</div>
<div class="config-link">
<h2>Configuration</h2>
<p><a href="/config">Edit configuration</a> - Modify PLC connections and data areas</p>
</div>
<div class="footer">
<p>PLC Gateway v1.0 | <a href="/api/status">System Status</a></p>
</div>
</body>
</html>