commit e43c5176bf439bade34327931e5de851ea11ddb4 Author: pengqi Date: Sun Aug 24 23:45:24 2025 +0800 V3.0 增加通用读写接口,待修复稳定性问题 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/gateway_V3_20250818.iml b/.idea/gateway_V3_20250818.iml new file mode 100644 index 0000000..026865d --- /dev/null +++ b/.idea/gateway_V3_20250818.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..f6df581 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..fca8e89 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..b96f153 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..ef0ff59 --- /dev/null +++ b/config/config.json @@ -0,0 +1,52 @@ +{ + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.1", + "rack": 0, + "slot": 1, + "refresh_interval": 0.5, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 6000, + "structure": [ + { + "name": "temperature", + "type": "real", + "offset": 0 + }, + { + "name": "pressure", + "type": "int", + "offset": 4 + }, + { + "name": "status", + "type": "bool", + "offset": 6, + "bit": 0 + } + ] + }, + { + "name": "DB100_Write", + "type": "write", + "db_number": 100, + "offset": 0, + "size": 6000 + }, + { + "name": "DB202_Params", + "type": "read_write", + "db_number": 202, + "offset": 0, + "size": 816 + } + ] + } + ] +} \ No newline at end of file diff --git a/config/config.json.bak b/config/config.json.bak new file mode 100644 index 0000000..630d729 --- /dev/null +++ b/config/config.json.bak @@ -0,0 +1,52 @@ +{ + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.1", + "rack": 0, + "slot": 1, + "refresh_interval": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 5000, + "structure": [ + { + "name": "temperature", + "type": "real", + "offset": 0 + }, + { + "name": "pressure", + "type": "int", + "offset": 4 + }, + { + "name": "status", + "type": "bool", + "offset": 6, + "bit": 0 + } + ] + }, + { + "name": "DB100_Write", + "type": "write", + "db_number": 100, + "offset": 0, + "size": 5000 + }, + { + "name": "DB202_Params", + "type": "read_write", + "db_number": 202, + "offset": 0, + "size": 816 + } + ] + } + ] +} \ No newline at end of file diff --git a/gateway/__pycache__/api_server.cpython-310.pyc b/gateway/__pycache__/api_server.cpython-310.pyc new file mode 100644 index 0000000..e7e66e7 Binary files /dev/null and b/gateway/__pycache__/api_server.cpython-310.pyc differ diff --git a/gateway/__pycache__/api_server.cpython-313.pyc b/gateway/__pycache__/api_server.cpython-313.pyc new file mode 100644 index 0000000..fbb54fd Binary files /dev/null and b/gateway/__pycache__/api_server.cpython-313.pyc differ diff --git a/gateway/__pycache__/api_server_html.cpython-313.pyc b/gateway/__pycache__/api_server_html.cpython-313.pyc new file mode 100644 index 0000000..345f0c3 Binary files /dev/null and b/gateway/__pycache__/api_server_html.cpython-313.pyc differ diff --git a/gateway/__pycache__/cache_manager.cpython-310.pyc b/gateway/__pycache__/cache_manager.cpython-310.pyc new file mode 100644 index 0000000..0f9813a Binary files /dev/null and b/gateway/__pycache__/cache_manager.cpython-310.pyc differ diff --git a/gateway/__pycache__/cache_manager.cpython-313.pyc b/gateway/__pycache__/cache_manager.cpython-313.pyc new file mode 100644 index 0000000..0d95641 Binary files /dev/null and b/gateway/__pycache__/cache_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_loader.cpython-310.pyc b/gateway/__pycache__/config_loader.cpython-310.pyc new file mode 100644 index 0000000..7ff5a86 Binary files /dev/null and b/gateway/__pycache__/config_loader.cpython-310.pyc differ diff --git a/gateway/__pycache__/config_loader.cpython-313.pyc b/gateway/__pycache__/config_loader.cpython-313.pyc new file mode 100644 index 0000000..e2d28f5 Binary files /dev/null and b/gateway/__pycache__/config_loader.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_manager.cpython-310.pyc b/gateway/__pycache__/config_manager.cpython-310.pyc new file mode 100644 index 0000000..5b594a5 Binary files /dev/null and b/gateway/__pycache__/config_manager.cpython-310.pyc differ diff --git a/gateway/__pycache__/config_manager.cpython-313.pyc b/gateway/__pycache__/config_manager.cpython-313.pyc new file mode 100644 index 0000000..14ab16d Binary files /dev/null and b/gateway/__pycache__/config_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_validator.cpython-310.pyc b/gateway/__pycache__/config_validator.cpython-310.pyc new file mode 100644 index 0000000..abc2361 Binary files /dev/null and b/gateway/__pycache__/config_validator.cpython-310.pyc differ diff --git a/gateway/__pycache__/config_validator.cpython-313.pyc b/gateway/__pycache__/config_validator.cpython-313.pyc new file mode 100644 index 0000000..3346fa3 Binary files /dev/null and b/gateway/__pycache__/config_validator.cpython-313.pyc differ diff --git a/gateway/__pycache__/plc_manager.cpython-310.pyc b/gateway/__pycache__/plc_manager.cpython-310.pyc new file mode 100644 index 0000000..9c294bc Binary files /dev/null and b/gateway/__pycache__/plc_manager.cpython-310.pyc differ diff --git a/gateway/__pycache__/plc_manager.cpython-313.pyc b/gateway/__pycache__/plc_manager.cpython-313.pyc new file mode 100644 index 0000000..ecf9db7 Binary files /dev/null and b/gateway/__pycache__/plc_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/snap7_client.cpython-310.pyc b/gateway/__pycache__/snap7_client.cpython-310.pyc new file mode 100644 index 0000000..a58dd58 Binary files /dev/null and b/gateway/__pycache__/snap7_client.cpython-310.pyc differ diff --git a/gateway/__pycache__/snap7_client.cpython-313.pyc b/gateway/__pycache__/snap7_client.cpython-313.pyc new file mode 100644 index 0000000..b8090c5 Binary files /dev/null and b/gateway/__pycache__/snap7_client.cpython-313.pyc differ diff --git a/gateway/api_server.py b/gateway/api_server.py new file mode 100644 index 0000000..43d8bb6 --- /dev/null +++ b/gateway/api_server.py @@ -0,0 +1,1602 @@ +from flask import Flask, jsonify, request, render_template_string, Response +import threading +import time +import json +from functools import wraps +from config_manager import ConfigManager +import logging + +class APIServer: + """REST API服务器,提供PLC数据访问和配置管理功能""" + + def __init__(self, cache_manager, config_path="./config/config.json"): + """ + 初始化API服务器 + + Args: + cache_manager: 缓存管理器实例 + config_path: 配置文件路径 + """ + self.cache_manager = cache_manager + self.config_manager = ConfigManager(config_path) + self.app = Flask(__name__) + self.logger = logging.getLogger("APIServer") + self.auth_enabled = True # 可通过配置关闭认证 + self.username = "admin" + self.password = "admin123" # 实际应用中应从安全存储获取 + self.start_time = time.strftime("%Y-%m-%d %H:%M:%S") + + # 在初始化方法中调用 setup_routes + self.setup_routes() + + def check_auth(self, username, password): + """验证用户名和密码""" + return username == self.username and password == self.password + + def authenticate(self): + """发送401响应要求认证""" + return Response( + "Unauthorized", + 401, + {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'} + ) + + def requires_auth(self, f): + """装饰器:需要认证的路由,保留函数元数据""" + @wraps(f) + def decorated(*args, **kwargs): + if not self.auth_enabled: + return f(*args, **kwargs) + + auth = request.authorization + if not auth or not self.check_auth(auth.username, auth.password): + return self.authenticate() + return f(*args, **kwargs) + return decorated + + def get_summary(self): + """获取缓存摘要信息""" + summary = {} + for plc_name, areas in self.cache_manager.cache.items(): + summary[plc_name] = {} + for area_name, area in areas.items(): + last_update = self.cache_manager.last_update[plc_name][area_name] + plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown") + + summary[plc_name][area_name] = { + "status": area["status"], + "plc_connection_status": plc_status, + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + return summary + + def setup_routes(self): + """设置所有API路由""" + + # =========================== + # 主页面 - 状态摘要 + # =========================== + @self.app.route("/", endpoint="index") + def index(): + summary = self.get_summary() + html = """ + + + PLC Gateway Status + + + +

PLC Gateway Status

+

Gateway running since: {{ start_time }}

+ """ + + for plc_name, areas in summary.items(): + plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown") + plc_class = "" + if plc_status == "connected": + plc_class = "plc-connected" + elif plc_status == "disconnected": + plc_class = "plc-disconnected" + else: + plc_class = "plc-never-connected" + + html += f'

PLC: {plc_name} (Status: {plc_status})

' + html += """ + + + + + + + + + + """ + + for area_name, area in areas.items(): + status_class = "" + status_text = area["status"] + + if area["status"] == "connected": + status_class = "status-connected" + elif area["status"] == "never_connected": + status_class = "status-never-connected" + status_text = "Never connected" + elif area["status"] == "disconnected": + status_class = "status-disconnected" + status_text = "Disconnected" + else: + status_class = "status-disconnected" + + html += f""" + + + + + + + + + """ + + html += "
Area NameTypeSize (bytes)StatusPLC ConnectionLast Update
{area_name}{area['type']}{area['size']}{status_text}{area['plc_connection_status']}{area['last_update']}
" + + # 添加API文档部分 + html += """ +
+

API Endpoints

+ +
+ Single Read: GET /api/read////
+ Example: /api/read/PLC1/DB100_Read/10/4 +
+ +
+ Single Write: POST /api/write///
+ Body: Raw binary data
+ Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data +
+ +
+ Single Read_Bool: GET /api/read_bool////
+ Example: /api/read_bool/PLC1/DB100_Read/0/2 +
+ +
+ Single Write_Bool: POST /api/write_bool///
+ Body: Raw binary data
+ Example: POST /api/write_bool/PLC1/DB100_Write/0 +
+ +
+ Batch Read: POST /api/batch_read
+ Body: JSON array of read requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}] +
+ +
+ Batch Write: POST /api/batch_write
+ Body: JSON array of write requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}] +
+ +
+ Configuration: GET/POST /api/config
+ Manage gateway configuration +
+
+ + + + + + + """ + + html += """ + + + """ + return render_template_string(html, start_time=self.start_time) + + # =========================== + # 系统状态API + # =========================== + @self.app.route("/api/status", endpoint="system_status") + def system_status(): + """获取系统状态信息""" + plc_statuses = {} + for plc_name in self.cache_manager.plc_connection_status: + plc_statuses[plc_name] = { + "status": self.cache_manager.plc_connection_status[plc_name], + "last_connected": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.cache_manager.plc_last_connected[plc_name])) + if self.cache_manager.plc_last_connected[plc_name] > 0 else "Never" + } + + return jsonify({ + "status": "running", + "start_time": self.start_time, + "plc_count": len(self.config_manager.get_config().get("plcs", [])), + "cache_size": sum(len(area["data"]) for plc in self.cache_manager.cache.values() for area in plc.values()), + "plc_statuses": plc_statuses + }) + + # =========================== + # 配置管理相关路由 + # =========================== + @self.app.route("/config", endpoint="config_page") + @self.requires_auth + def config_page(): + """配置编辑页面""" + config = self.config_manager.get_config() + config_json = json.dumps(config, indent=2) + + html = """ + + + PLC Gateway Configuration + + + +

PLC Gateway Configuration

+ + + +
+

Edit the configuration JSON below. Be careful with the syntax.

+ +
+ +
+ + + +
+
+ +
+ +
+

Configuration Guide

+

PLC Configuration:

+
    +
  • name: Unique name for the PLC
  • +
  • ip: IP address of the PLC
  • +
  • rack: Rack number (usually 0)
  • +
  • slot: Slot number (usually 1 for S7-1200)
  • +
+ +

Data Area Configuration:

+
    +
  • name: Name of the data area
  • +
  • type: read, write, or read_write
  • +
  • db_number: DB number (e.g., 100 for DB100)
  • +
  • offset: Starting byte offset
  • +
  • size: Size in bytes
  • +
  • structure (optional): Define how to parse the data
  • +
+ +

Example:

+
{
+  "plcs": [
+    {
+      "name": "PLC1",
+      "ip": "192.168.0.10",
+      "rack": 0,
+      "slot": 1,
+      "areas": [
+        {
+          "name": "DB100_Read",
+          "type": "read",
+          "db_number": 100,
+          "offset": 0,
+          "size": 4000,
+          "structure": [
+            {"name": "temperature", "type": "real", "offset": 0},
+            {"name": "pressure", "type": "int", "offset": 4}
+          ]
+        }
+      ]
+    }
+  ]
+}
+
+
+ + + + + """ + return render_template_string( + html, + config_json=config_json, + username=self.username, + password=self.password + ) + + # 配置验证API + @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config") + @self.requires_auth + def validate_config(): + """验证配置是否有效""" + try: + config = request.json + is_valid, error = self.config_manager.validate_config(config) + if is_valid: + return jsonify({"valid": True}) + else: + return jsonify({"valid": False, "message": error}), 400 + except Exception as e: + return jsonify({"valid": False, "message": str(e)}), 400 + + # 配置获取API + @self.app.route("/api/config", methods=["GET"], endpoint="get_config") + @self.requires_auth + def get_config(): + """获取当前配置""" + return jsonify(self.config_manager.get_config()) + + # 配置保存API + @self.app.route("/api/config", methods=["POST"], endpoint="save_config") + @self.requires_auth + def save_config(): + """保存配置""" + try: + config = request.json + reload = request.args.get('reload', 'false').lower() == 'true' + + success, message = self.config_manager.save_config(config) + if success: + if reload: + # 通知主应用程序重载配置 + if hasattr(self.cache_manager, 'app') and self.cache_manager.app: + self.cache_manager.app.request_reload() + return jsonify({ + "success": True, + "message": "Configuration saved and reload requested" + }) + else: + return jsonify({ + "success": True, + "message": "Configuration saved successfully (restart to apply changes)" + }) + else: + return jsonify({ + "success": False, + "message": message + }), 400 + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error saving config: {str(e)}" + }), 500 + + # =========================== + # 新增 API 文档接口 + # =========================== + @self.app.route("/api/doc", endpoint="api_doc") + def api_doc(): + """API文档页面""" + html = """ + + + PLC Gateway API Documentation + + + +

PLC Gateway API Documentation

+ + + +

Status API

+ +
+

System Status

+
+ GET + /api/status +
+

获取系统状态信息,包括启动时间、PLC数量和缓存大小。

+ +

响应示例

+
+ { + "status": "running", + "start_time": "2023-10-30 14:30:22", + "plc_count": 2, + "cache_size": 11000, + "plc_statuses": { + "PLC1": { + "status": "connected", + "last_connected": "2023-10-30 14:35:10" + }, + "PLC2": { + "status": "disconnected", + "last_connected": "Never" + } + } + } +
+
+ +
+

Area Status

+
+ GET + /api/status// +
+

获取指定PLC区域的状态信息。

+ +

路径参数

+ + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
+ +

响应示例

+
+ { + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01", + "size": 4000, + "type": "read" + } +
+
+ +

Data API

+ +
+

Single Read

+
+ GET + /api/read//// +
+

从指定区域读取数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
offset起始偏移量(字节)
length读取长度(字节)
+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 4, + "data": [0, 0, 123, 45], + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +
+

Single Write

+
+ POST + /api/write/// +
+

向指定区域写入数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Write)
offset起始偏移量(字节)
+ +

请求体

+

原始二进制数据

+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "length": 4, + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30" + } +
+
+ +
+

Single Read Bool

+
+ GET + /api/read_bool//// +
+

从指定区域读取数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
offset起始偏移量(字节)
length读取长度(字节)
+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 2, + "data": [0:False, 1:False], + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +
+

Single Write Bool

+
+ POST + /api/write_bool/// +
+

向指定区域写入数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Write)
offset起始偏移量(字节)
+ +

请求体

+

{0:True}

+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "length": 1, + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30" + } +
+
+ +
+

Batch Read

+
+ POST + /api/batch_read +
+

批量读取多个区域的数据。

+ +

请求体

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
字段类型必需描述
plc_namestringPLC名称(与配置中一致)
area_namestring区域名称(与配置中一致)
offsetnumber起始偏移量(字节),默认为0
lengthnumber读取长度(字节),不提供则读取整个区域
+ +

请求示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 4 + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "offset": 10, + "length": 2 + } + ] +
+ +

响应示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Read", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01", + "offset": 0, + "length": 4, + "data": [0, 0, 123, 45] + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754322.123, + "last_update_formatted": "2023-10-30 14:12:02", + "offset": 10, + "length": 2, + "data": [255, 0] + } + ] +
+
+ +
+

Batch Write

+
+ POST + /api/batch_write +
+

批量写入多个区域的数据。

+ +

请求体

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
字段类型必需描述
plc_namestringPLC名称(与配置中一致)
area_namestring区域名称(与配置中一致)
offsetnumber起始偏移量(字节)
dataarray要写入的数据(字节数组)
+ +

请求示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "data": [1, 2, 3, 4] + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "offset": 10, + "data": [255, 0] + } + ] +
+ +

响应示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Write", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30", + "offset": 0, + "length": 4 + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754351.234, + "last_update_formatted": "2023-10-30 14:12:31", + "offset": 10, + "length": 2 + } + ] +
+
+ +
+

Parsed Data

+
+ GET + /api/data// +
+

获取解析后的数据(如果配置了结构)。

+ +

路径参数

+ + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
+ +

响应示例(配置了解析结构)

+
+ { + "parsed": { + "temperature": 25.5, + "pressure": 100, + "status": true + }, + "raw_data": [0, 0, 128, 65, 0, 100], + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+ +

响应示例(未配置解析结构)

+
+ { + "raw_data": [0, 0, 128, 65, 0, 100], + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +

Configuration API

+ +
+

Get Configuration

+
+ GET + /api/config +
+

获取当前配置。

+ +

认证要求

+

需要Basic Auth认证

+ +

响应示例

+
+ { + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.10", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000, + "structure": [ + {"name": "temperature", "type": "real", "offset": 0}, + {"name": "pressure", "type": "int", "offset": 4} + ] + } + ] + } + ] + } +
+
+ +
+

Validate Configuration

+
+ POST + /api/config/validate +
+

验证配置是否有效。

+ +

认证要求

+

需要Basic Auth认证

+ +

请求体

+

要验证的配置JSON

+ +

响应示例(有效)

+
+ { + "valid": true + } +
+ +

响应示例(无效)

+
+ { + "valid": false, + "message": "Invalid configuration: 'ip' is a required property" + } +
+
+ +
+

Save Configuration

+
+ POST + /api/config +
+

保存配置。

+ +

查询参数

+ + + + + + + + + +
参数描述
reload是否立即重载配置(true/false)
+ +

认证要求

+

需要Basic Auth认证

+ +

请求体

+

要保存的配置JSON

+ +

响应示例

+
+ { + "success": true, + "message": "Configuration saved and reload requested" + } +
+
+ + + + + """ + return render_template_string(html) + + # =========================== + # 数据访问API + # =========================== + # 单个读取接口 + @self.app.route("/api/read////", methods=["GET"], endpoint="single_read") + def single_read(plc_name, area_name, offset, length): + """从指定区域读取数据""" + data, error, plc_status, update_time = self.cache_manager.read_area(plc_name, area_name, offset, length) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": length, + "data": list(data), + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个读取BOOL类型接口 + @self.app.route("/api/read_bool////", methods=["GET"], endpoint="single_read_bool") + def single_read_bool(plc_name, area_name, offset, length): + """从指定区域读取数据""" + data, error, plc_status, update_time = self.cache_manager.read_area_bool(plc_name, area_name, offset, length) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": length, + "data": [data], # list(data) + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个写入接口 + @self.app.route("/api/write///", methods=["POST"], endpoint="single_write") + def single_write(plc_name, area_name, offset): + """向指定区域写入数据""" + data = request.data + if not data: + # 如果没有提供数据,返回错误 + return jsonify({ + "status": "error", + "message": "No data provided", + "plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"), + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + success, error, plc_status, update_time = self.cache_manager.write_area(plc_name, area_name, offset, data) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": len(data), + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个写入BOOL类型接口 + @self.app.route("/api/write_bool///", methods=["POST"], endpoint="single_write_bool") + def single_write_bool(plc_name, area_name, offset): + """向指定区域写入数据""" + data = request.data + if not data: + # 如果没有提供数据,返回错误 + return jsonify({ + "status": "error", + "message": "No data provided", + "plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"), + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + success, error, plc_status, update_time = self.cache_manager.write_area_bool(plc_name, area_name, offset, data) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": 1, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 批量读取接口 + @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read") + def batch_read(): + """批量读取多个区域的数据""" + try: + # 确保是JSON请求 + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + requests = request.get_json() + if not isinstance(requests, list): + return jsonify({ + "status": "error", + "message": "Request must be a JSON array", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + # 添加详细日志 + self.logger.info(f"Received batch read request: {json.dumps(requests)}") + + results = self.cache_manager.batch_read(requests) + + return jsonify(results) + except Exception as e: + self.logger.error(f"Batch read error: {str(e)}", exc_info=True) + return jsonify({ + "status": "error", + "message": f"Internal server error: {str(e)}", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 500 + + # 批量写入接口 + @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write") + def batch_write(): + """批量写入多个区域的数据""" + try: + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + requests = request.json + if not isinstance(requests, list): + return jsonify({ + "status": "error", + "message": "Request must be a JSON array", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + self.logger.info(f"Received batch write request: {json.dumps(requests)}") + + results = self.cache_manager.batch_write(requests) + return jsonify(results) + except Exception as e: + self.logger.error(f"Batch write error: {str(e)}", exc_info=True) + return jsonify({ + "status": "error", + "message": f"Internal server error: {str(e)}", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 500 + + # 通用读取接口 + @self.app.route("/api/read_generic////", methods=["GET"], + endpoint="read_generic") + def read_generic(plc_name, area_name, offset, data_type): + """通用读取接口""" + print("Enter Read generic") + # 检查请求参数 + count = request.args.get('count', 1, type=int) + if count < 1: + return jsonify({ + "status": "error", + "message": "Count must be at least 1", + "plc_name": plc_name, + "area_name": area_name + }), 400 + + # 执行读取 + result, error, plc_status, update_time = self.cache_manager.read_generic( + plc_name, + area_name, + offset, + data_type, + count + ) + + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( + update_time)) if update_time > 0 else "Never" + }), 400 + + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "data_type": data_type, + "count": count, + "data": result, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 通用写入接口 + @self.app.route("/api/write_generic////", methods=["POST"], + endpoint="write_generic") + def write_generic(plc_name, area_name, offset, data_type): + """通用写入接口""" + # 检查请求数据 + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_name": plc_name, + "area_name": area_name + }), 400 + json_data = request.get_json() + if "value" not in json_data and "values" not in json_data: + return jsonify({ + "status": "error", + "message": "Missing 'value' or 'values' field", + "plc_name": plc_name, + "area_name": area_name + }), 400 + + # 确定要写入的值 + value = json_data.get("value", json_data.get("values")) + + # 执行写入 + success, error, plc_status, update_time = self.cache_manager.write_generic( + plc_name, + area_name, + offset, + data_type, + value + ) + + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( + update_time)) if update_time > 0 else "Never" + }), 400 + + # 确定写入数量 + count = 1 + if isinstance(value, list): + count = len(value) + + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "data_type": data_type, + "count": count, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + @self.app.route("/api/batch_write_bool", methods=["POST"], endpoint="batch_write_bool") + def batch_write_bool(): + """批量写入多个区域的数据""" + try: + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + requests = request.json + if not isinstance(requests, list): + return jsonify({ + "status": "error", + "message": "Request must be a JSON array", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + self.logger.info(f"Received batch write request: {json.dumps(requests)}") + + results = self.cache_manager.batch_write_bool(requests) + return jsonify(results) + except Exception as e: + self.logger.error(f"Batch write error: {str(e)}", exc_info=True) + return jsonify({ + "status": "error", + "message": f"Internal server error: {str(e)}", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 500 + + # 区域状态检查 + @self.app.route("/api/status//", methods=["GET"], endpoint="area_status") + def area_status(plc_name, area_name): + """获取区域状态""" + status = self.cache_manager.get_area_status(plc_name, area_name) + return jsonify(status) + + # 获取解析后的数据 + @self.app.route("/api/data//", methods=["GET"], endpoint="get_parsed_data") + def get_parsed_data(plc_name, area_name): + """获取解析后的数据""" + return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name)) + + def start(self): + """启动API服务器""" + self.server_thread = threading.Thread( + target=self.app.run, + kwargs={ + "host": "0.0.0.0", + "port": 5000, + "threaded": True, + "use_reloader": False # 避免在生产环境中使用重载器 + }, + daemon=True, + name="APIServerThread" + ) + self.server_thread.start() + self.logger.info("API server started at http://0.0.0.0:5000") \ No newline at end of file diff --git a/gateway/api_server_html.py b/gateway/api_server_html.py new file mode 100644 index 0000000..5ff10f1 --- /dev/null +++ b/gateway/api_server_html.py @@ -0,0 +1,446 @@ +from flask import Flask, jsonify, request, render_template_string, Response, render_template +import threading +import time +import json +from functools import wraps +from config_manager import ConfigManager +import logging + + +class APIServer: + """REST API服务器,提供PLC数据访问和配置管理功能""" + + def __init__(self, cache_manager, config_path="config/config.json"): + """ + 初始化API服务器 + + Args: + cache_manager: 缓存管理器实例 + config_path: 配置文件路径 + """ + self.cache_manager = cache_manager + self.config_manager = ConfigManager(config_path) + self.app = Flask(__name__) + self.logger = logging.getLogger("APIServer") + self.auth_enabled = True # 可通过配置关闭认证 + self.username = "admin" + self.password = "admin123" # 实际应用中应从安全存储获取 + self.start_time = time.strftime("%Y-%m-%d %H:%M:%S") + + # 在初始化方法中调用 setup_routes + self.setup_routes() + + def check_auth(self, username, password): + """验证用户名和密码""" + return username == self.username and password == self.password + + def authenticate(self): + """发送401响应要求认证""" + return Response( + "Unauthorized", + 401, + {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'} + ) + + def requires_auth(self, f): + """装饰器:需要认证的路由,保留函数元数据""" + + @wraps(f) + def decorated(*args, **kwargs): + if not self.auth_enabled: + return f(*args, **kwargs) + + auth = request.authorization + if not auth or not self.check_auth(auth.username, auth.password): + return self.authenticate() + return f(*args, **kwargs) + + return decorated + + def get_summary(self): + """获取缓存摘要信息""" + summary = {} + for plc_name, areas in self.cache_manager.cache.items(): + summary[plc_name] = {} + for area_name, area in areas.items(): + last_update = self.cache_manager.last_update[plc_name][area_name] + plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown") + + summary[plc_name][area_name] = { + "status": area["status"], + "plc_connection_status": plc_status, + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + return summary + + def setup_routes(self): + """设置所有API路由""" + + # =========================== + # 主页面 - 状态摘要 + # =========================== + @self.app.route("/", endpoint="index") + def index(): + summary = self.get_summary() + return render_template( + "status.html", # 模板文件名 + start_time=self.start_time, + summary=summary, + plc_statuses=self.cache_manager.plc_connection_status + ) + + # =========================== + # 系统状态API + # =========================== + @self.app.route("/api/status", endpoint="system_status") + def system_status(): + """获取系统状态信息""" + plc_statuses = {} + for plc_name in self.cache_manager.plc_connection_status: + plc_statuses[plc_name] = { + "status": self.cache_manager.plc_connection_status[plc_name], + "last_connected": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(self.cache_manager.plc_last_connected[plc_name])) + if self.cache_manager.plc_last_connected[plc_name] > 0 else "Never" + } + + return jsonify({ + "status": "running", + "start_time": self.start_time, + "plc_count": len(self.config_manager.get_config().get("plcs", [])), + "cache_size": sum( + len(area["data"]) for plc in self.cache_manager.cache.values() for area in plc.values()), + "plc_statuses": plc_statuses + }) + + # =========================== + # 配置管理相关路由 + # =========================== + @self.app.route("/config", endpoint="config_page") + @self.requires_auth + def config_page(): + """配置编辑页面""" + config = self.config_manager.get_config() + config_json = json.dumps(config, indent=2) + + return render_template( + 'config.html', + config_json=config_json, + username=self.username, + password=self.password + ) + + # 配置验证API + @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config") + @self.requires_auth + def validate_config(): + """验证配置是否有效""" + try: + config = request.json + is_valid, error = self.config_manager.validate_config(config) + if is_valid: + return jsonify({"valid": True}) + else: + return jsonify({"valid": False, "message": error}), 400 + except Exception as e: + return jsonify({"valid": False, "message": str(e)}), 400 + + # 配置获取API + @self.app.route("/api/config", methods=["GET"], endpoint="get_config") + @self.requires_auth + def get_config(): + """获取当前配置""" + return jsonify(self.config_manager.get_config()) + + # 配置保存API + @self.app.route("/api/config", methods=["POST"], endpoint="save_config") + @self.requires_auth + def save_config(): + """保存配置""" + try: + config = request.json + reload = request.args.get('reload', 'false').lower() == 'true' + + success, message = self.config_manager.save_config(config) + if success: + if reload: + # 通知主应用程序重载配置 + if hasattr(self.cache_manager, 'app') and self.cache_manager.app: + self.cache_manager.app.request_reload() + return jsonify({ + "success": True, + "message": "Configuration saved and reload requested" + }) + else: + return jsonify({ + "success": True, + "message": "Configuration saved successfully (restart to apply changes)" + }) + else: + return jsonify({ + "success": False, + "message": message + }), 400 + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error saving config: {str(e)}" + }), 500 + + # =========================== + # 新增 API 文档接口 + # =========================== + @self.app.route("/api/doc", endpoint="api_doc") + def api_doc(): + """API文档页面""" + return render_template('api_doc.html') + + # =========================== + # 数据访问API + # =========================== + # 单个读取接口 + @self.app.route("/api/read////", methods=["GET"], + endpoint="single_read") + def single_read(plc_name, area_name, offset, length): + """从指定区域读取数据""" + data, error, plc_status, update_time = self.cache_manager.read_area(plc_name, area_name, offset, length) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": length, + "data": list(data), + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个读取BOOL类型接口 + @self.app.route("/api/read_bool////", methods=["GET"], + endpoint="single_read_bool") + def single_read_bool(plc_name, area_name, offset, length): + """从指定区域读取数据""" + data, error, plc_status, update_time = self.cache_manager.read_area_bool(plc_name, area_name, offset, + length) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": length, + "data": [data], # list(data) + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个写入接口 + @self.app.route("/api/write///", methods=["POST"], endpoint="single_write") + def single_write(plc_name, area_name, offset): + """向指定区域写入数据""" + data = request.data + if not data: + # 如果没有提供数据,返回错误 + return jsonify({ + "status": "error", + "message": "No data provided", + "plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"), + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + success, error, plc_status, update_time = self.cache_manager.write_area(plc_name, area_name, offset, data) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": len(data), + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 单个写入BOOL类型接口 + @self.app.route("/api/write_bool///", methods=["POST"], + endpoint="single_write_bool") + def single_write_bool(plc_name, area_name, offset): + """向指定区域写入数据""" + data = request.data + if not data: + # 如果没有提供数据,返回错误 + return jsonify({ + "status": "error", + "message": "No data provided", + "plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"), + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + success, error, plc_status, update_time = self.cache_manager.write_area_bool(plc_name, area_name, offset, + data) + if error: + return jsonify({ + "status": "error", + "plc_name": plc_name, + "area_name": area_name, + "message": error, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(update_time)) if update_time > 0 else "Never" + }), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": 1, + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) + }) + + # 批量读取接口 + @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read") + def batch_read(): + """批量读取多个区域的数据""" + try: + # 确保是JSON请求 + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + requests = request.json + if not isinstance(requests, list): + return jsonify({ + "status": "error", + "message": "Request must be a JSON array", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + # 添加详细日志 + self.logger.info(f"Received batch read request: {json.dumps(requests)}") + + results = self.cache_manager.batch_read(requests) + return jsonify(results) + except Exception as e: + self.logger.error(f"Batch read error: {str(e)}", exc_info=True) + return jsonify({ + "status": "error", + "message": f"Internal server error: {str(e)}", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 500 + + # 批量写入接口 + @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write") + def batch_write(): + """批量写入多个区域的数据""" + try: + if not request.is_json: + return jsonify({ + "status": "error", + "message": "Request must be JSON (Content-Type: application/json)", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + requests = request.json + if not isinstance(requests, list): + return jsonify({ + "status": "error", + "message": "Request must be a JSON array", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 400 + + self.logger.info(f"Received batch write request: {json.dumps(requests)}") + + results = self.cache_manager.batch_write(requests) + return jsonify(results) + except Exception as e: + self.logger.error(f"Batch write error: {str(e)}", exc_info=True) + return jsonify({ + "status": "error", + "message": f"Internal server error: {str(e)}", + "plc_connection_status": "unknown", + "last_update": 0, + "last_update_formatted": "N/A" + }), 500 + + # 区域状态检查 + @self.app.route("/api/status//", methods=["GET"], endpoint="area_status") + def area_status(plc_name, area_name): + """获取区域状态""" + status = self.cache_manager.get_area_status(plc_name, area_name) + return jsonify(status) + + # 获取解析后的数据 + @self.app.route("/api/data//", methods=["GET"], endpoint="get_parsed_data") + def get_parsed_data(plc_name, area_name): + """获取解析后的数据""" + return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name)) + + def start(self): + """启动API服务器""" + self.server_thread = threading.Thread( + target=self.app.run, + kwargs={ + "host": "0.0.0.0", + "port": 5000, + "threaded": True, + "use_reloader": False # 避免在生产环境中使用重载器 + }, + daemon=True, + name="APIServerThread" + ) + self.server_thread.start() + self.logger.info("API server started at http://0.0.0.0:5000") \ No newline at end of file diff --git a/gateway/cache_manager.py b/gateway/cache_manager.py new file mode 100644 index 0000000..fd6621d --- /dev/null +++ b/gateway/cache_manager.py @@ -0,0 +1,877 @@ +import threading +import time +import logging +from snap7.util import * +import struct + +class CacheManager: + """PLC数据缓存管理器""" + + def __init__(self, config, plc_manager, app=None): + """ + 初始化缓存管理器 + + Args: + config: 配置对象 + plc_manager: PLC管理器实例 + app: 主应用程序引用(用于配置重载) + """ + self.plc_manager = plc_manager + self.config = config + self.app = app + self.cache = {} + self.refresh_interval = 1 # 1秒刷新一次 + self.running = False + self.lock = threading.Lock() + self.thread = None + self.last_update = {} # 区域级最后更新时间 + self.plc_last_connected = {} # PLC级最后连接时间 + self.plc_connection_status = {} # PLC连接状态 + self.logger = logging.getLogger("CacheManager") + self.init_cache() + + def init_cache(self): + """初始化缓存结构""" + for plc in self.config["plcs"]: + plc_name = plc["name"] + self.cache[plc_name] = {} + self.last_update[plc_name] = {} + self.plc_last_connected[plc_name] = 0 # 初始化为0(未连接) + self.plc_connection_status[plc_name] = "never_connected" + + for area in plc["areas"]: + name = area["name"] + # 确保初始状态为断开 + self.cache[plc_name][name] = { + "data": bytearray(area["size"]), + "db_number": area["db_number"], + "offset": area["offset"], + "size": area["size"], + "type": area["type"], + "structure": area.get("structure", []), + "status": "disconnected" # 初始状态为断开 + } + self.last_update[plc_name][name] = 0 + + def refresh_cache(self): + """后台线程:定期刷新缓存""" + while self.running: + start_time = time.time() + + try: + for plc in self.config["plcs"]: + plc_name = plc["name"] + refresh_interval = plc.get("refresh_interval", 0.5) + client = self.plc_manager.get_plc(plc_name) + + # 检查PLC连接状态 + plc_connected = client.connected + + # 更新PLC连接状态 + with self.lock: + if plc_connected: + self.plc_last_connected[plc_name] = time.time() + self.plc_connection_status[plc_name] = "connected" + else: + if self.plc_last_connected[plc_name] == 0: + self.plc_connection_status[plc_name] = "never_connected" + else: + self.plc_connection_status[plc_name] = "disconnected" + + # 刷新所有可读区域 + for area in plc["areas"]: + if area["type"] in ["read", "read_write"]: + name = area["name"] + try: + data = client.read_db(area["db_number"], area["offset"], area["size"]) + + # 更新区域状态基于PLC连接状态和读取结果 + with self.lock: + if plc_connected and data and len(data) == area["size"]: + self.cache[plc_name][name]["data"] = bytearray(data) + self.cache[plc_name][name]["status"] = "connected" + self.last_update[plc_name][name] = time.time() + else: + self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name] + # 如果之前有数据,保留旧数据但标记状态 + if self.last_update[plc_name][name] > 0: + self.logger.info(f"PLC {plc_name} area {name} disconnected but keeping last valid data") + except Exception as e: + with self.lock: + self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name] + self.logger.warning(f"Error updating status for {plc_name}/{name}: {e}") + + """计算刷新一个PLC的时间""" + # 计算实际执行时间 + execution_time = time.time() - start_time + #计算需要睡眠的时间,确保总等于refresh_time + sleep_time = max(0, refresh_interval - execution_time) + time.sleep(sleep_time) + + # 记录实际刷新间隔 + self.logger.debug(f"plc_name: {plc_name}," + f"Cache refresh completed.Execution time: {execution_time:.3f}s," + f"Sleep time: {sleep_time:.3f}s," + f"Total interval: {execution_time + sleep_time:.3f}s") + + time.sleep(self.refresh_interval) + except Exception as e: + self.logger.error(f"Error in refresh_cache: {e}") + time.sleep(self.refresh_interval) + + def start(self): + """启动缓存刷新线程""" + if self.running: + return + + self.running = True + self.thread = threading.Thread( + target=self.refresh_cache, + name="CacheRefreshThread", + daemon=True + ) + self.thread.start() + self.logger.info("Cache manager started") + + def stop(self): + """停止缓存刷新线程""" + if not self.running: + return + + self.running = False + if self.thread: + # 等待线程结束,但设置超时防止卡死 + self.thread.join(timeout=2.0) + if self.thread.is_alive(): + self.logger.warning("Cache refresh thread did not terminate gracefully") + self.thread = None + self.logger.info("Cache manager stopped") + + def get_plc_connection_status(self, plc_name): + """获取PLC连接状态""" + with self.lock: + return self.plc_connection_status.get(plc_name, "unknown") + + def get_last_update_time(self, plc_name, area_name): + """获取区域数据最后更新时间""" + with self.lock: + return self.last_update.get(plc_name, {}).get(area_name, 0) + + def get_summary(self): + """获取缓存摘要信息""" + summary = {} + with self.lock: + for plc_name, areas in self.cache.items(): + summary[plc_name] = {} + for area_name, area in areas.items(): + last_update = self.last_update[plc_name][area_name] + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 区域状态应与PLC连接状态一致,除非有有效数据 + area_status = area["status"] + if plc_status == "never_connected": + area_status = "never_connected" + elif plc_status == "disconnected" and self.last_update[plc_name][area_name] == 0: + area_status = "disconnected" + + summary[plc_name][area_name] = { + "status": area_status, + "plc_connection_status": plc_status, + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + return summary + + def get_area_status(self, plc_name, area_name): + """获取区域状态""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return {"status": "not_found", "message": "PLC or area not found"} + + plc_status = self.plc_connection_status.get(plc_name, "unknown") + last_update = self.last_update.get(plc_name, {}).get(area_name, 0) + + # 区域状态应与PLC连接状态一致,除非有有效数据 + area_status = area["status"] + if plc_status == "never_connected": + area_status = "never_connected" + elif plc_status == "disconnected" and last_update == 0: + area_status = "disconnected" + + return { + "status": area_status, + "plc_connection_status": plc_status, + "last_update": last_update, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + + def read_area(self, plc_name, area_name, offset, length): + """单个区域读取""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + print("read area :",area) + if not area: + return None, "Area not found", "unknown", 0 + + if offset + length > area["size"]: + return None, "Offset out of bounds", "unknown", 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return None, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + data = client.read_db(area["db_number"], area["offset"] + offset, length) + # 验证数据有效性 + if data and len(data) == length: + # 更新缓存中的这部分数据 + for i in range(length): + area["data"][offset + i] = data[i] + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected" + + return data, None, plc_status, update_time + else: + area["status"] = plc_status + return None, "Invalid data returned", plc_status, 0 + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") + return None, f"Read failed: {str(e)}", plc_status, 0 + + def read_area_bool(self, plc_name, area_name, offset, length): + """单个区域读取""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return None, "Area not found", "unknown", 0 + + if offset + length > area["size"]: + return None, "Offset out of bounds", "unknown", 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return None, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + data = client.read_db_bool(area["db_number"], area["offset"] + offset, length) + # 验证数据有效性 + if all(isinstance(val, bool) for val in data.values()): + # 按字典键顺序更新多个值 + for i, val in data.items(): + area["data"][offset + i] = val # 确保offset+i不越界 + + # area["data"][offset] = data.values + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected" + + return data, None, plc_status, update_time + else: + area["status"] = plc_status + return None, "Invalid data returned", plc_status, 0 + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") + return None, f"Read failed: {str(e)}", plc_status, 0 + + def write_area(self, plc_name, area_name, offset, data): + """单个区域写入""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found", "unknown", 0 + + if area["type"] not in ["write", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Area is read-only", plc_status, 0 + + if offset + len(data) > area["size"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return False, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + success = client.write_db(area["db_number"], area["offset"] + offset, data) + if success: + # 更新缓存中的这部分数据 + for i in range(len(data)): + area["data"][offset + i] = data[i] + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected (last write)" + + return True, None, plc_status, update_time + else: + area["status"] = plc_status + return False, "Write failed", plc_status, 0 + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}", plc_status, 0 + + def batch_write_area(self, plc_name, area_name, offset, data): + """单个区域写入""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found", "unknown", 0 + + if area["type"] not in ["write", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Area is read-only", plc_status, 0 + + if offset + len(data) > area["size"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return False, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + for i, byte in enumerate(data): + byte_data = bytes([byte]) + current_offset = offset + (i * 2) + byte_value = byte_data[0] + + value = bytearray(2) + if isinstance(byte_value, int): + set_int(value, 0, byte_value) + data = value + + success = client.batch_write_db(area["db_number"], current_offset, data) + if success: + # 更新缓存中的这部分数据 + for j in range(len(data)): + area["data"][offset + j] = data[j] + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected (last write)" + else: + area["status"] = plc_status + return False, "Write failed", plc_status, 0 + return True, None, plc_status, update_time + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}", plc_status, 0 + + def batch_write_bool_area(self, plc_name, area_name, offset, data): + """单个区域写入""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found", "unknown", 0 + + if area["type"] not in ["write", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Area is read-only", plc_status, 0 + + if offset + len(data) > area["size"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return False, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + value = bytearray(offset + 1) + for bit, bit_value in enumerate(data): + set_bool(value, offset, bit, bit_value) + data = value + + success = client.batch_write_db_bool(area["db_number"], offset, data) + if success: + # 更新缓存中的这部分数据 + for j in range(len(data)): + area["data"][offset + j] = data[j] + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected (last write)" + else: + area["status"] = plc_status + return False, "Write failed", plc_status, 0 + return True, None, plc_status, update_time + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}", plc_status, 0 + + def write_area_bool(self, plc_name, area_name, offset, data): + """单个区域写入""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found", "unknown", 0 + + if area["type"] not in ["write", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Area is read-only", plc_status, 0 + + if offset + len(data) > area["size"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return False, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + success = client.write_db_bool(area["db_number"], area["offset"] + offset, data) + if success: + # 更新缓存中的这部分数据 + for i in range(len(data)): + area["data"][offset + i] = data[i] + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected (last write)" + + return True, None, plc_status, update_time + else: + area["status"] = plc_status + return False, "Write failed", plc_status, 0 + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}", plc_status, 0 + + def batch_read(self, requests): + """批量读取""" + results = [] + + for req in requests: + plc_name = req["plc_name"] + area_name = req["area_name"] + offset = req.get("offset", 0) + length = req.get("length", None) + + # 获取PLC连接状态 + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": 0, + "last_update_formatted": "N/A", + "message": f"PLC not connected (status: {plc_status})" + }) + continue + + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": 0, + "last_update_formatted": "N/A", + "message": "Area not found" + }) + continue + + # 如果未指定length,读取整个区域 + if length is None: + length = area["size"] - offset + + data, error, _, update_time = self.read_area(plc_name, area_name, offset, length) + if error: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", + "message": error + }) + else: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "success", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), + "offset": offset, + "length": length, + "data": list(data) + }) + return results + + def batch_write(self, requests): + """批量写入""" + results = [] + for req in requests: + plc_name = req["plc_name"] + area_name = req["area_name"] + offset = req["offset"] + data = bytes(req["data"]) + + # 获取PLC连接状态 + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": 0, + "last_update_formatted": "N/A", + "message": f"PLC not connected (status: {plc_status})", + "offset": offset + }) + continue + success, error, _, update_time = self.batch_write_area(plc_name, area_name, offset, data) + if error: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", + "message": error, + "offset": offset + }) + else: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "success", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), + "offset": offset, + "length": len(data) + }) + return results + + def batch_write_bool(self, requests): + """批量写入""" + results = [] + for req in requests: + plc_name = req["plc_name"] + area_name = req["area_name"] + offset = req["offset"] + data = bytes(req["data"]) + + # 获取PLC连接状态 + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": 0, + "last_update_formatted": "N/A", + "message": f"PLC not connected (status: {plc_status})", + "offset": offset + }) + continue + + success, error, _, update_time = self.batch_write_bool_area(plc_name, area_name, offset, data) + if error: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", + "message": error, + "offset": offset + }) + else: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "success", + "plc_connection_status": plc_status, + "last_update": update_time, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), + "offset": offset, + "length": len(data) + }) + return results + + def read_generic(self, plc_name, area_name, offset, data_type, count=1): + """通用读取接口""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + print("area:",area) + if not area: + return None, "Area not found", "unknown", 0 + + if area["type"] not in ["read", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return None, "Area is read-only", plc_status, 0 + + # 计算实际DB偏移 + db_offset = area["offset"] + offset + + # 确保在区域内 + if data_type == 'bool': + required_size = (offset + count + 7) // 8 + elif data_type in ['int', 'word']: + required_size = 2 * count + elif data_type in ['dint', 'dword', 'real']: + required_size = 4 * count + else: # byte + required_size = count + + if db_offset + required_size > area["size"] or db_offset < 0: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return None, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return None, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + # 使用Snap7Client的read_generic方法 + result = client.read_generic(area["db_number"], db_offset, data_type, count) + if result is None: + return None, "Read failed", plc_status, 0 + + # 对于bool类型,需要特殊处理缓存 + if data_type == 'bool': + for i in range(count): + byte_offset = offset // 8 + i // 8 + bit_offset = (offset % 8) + (i % 8) + if bit_offset >= 8: + byte_offset += 1 + bit_offset -= 8 + + # 读取当前字节值 + current_byte = area["data"][byte_offset] + if result[i]: + # 设置位为1 + new_byte = current_byte | (1 << bit_offset) + else: + # 设置位为0 + new_byte = current_byte & ~(1 << bit_offset) + area["data"][byte_offset] = new_byte + else: + # 对于其他类型,直接更新缓存 + if not isinstance(result, list): + result = [result] + + if data_type == 'byte': + item_size = 1 + elif data_type in ['int', 'word']: + item_size = 2 + else: # dint, dword, real + item_size = 4 + + for i, val in enumerate(result): + item_offset = offset + i * item_size + if data_type == 'byte': + area["data"][item_offset] = val & 0xFF + elif data_type in ['int', 'word']: + # 2字节数据 + packed = struct.pack(">h" if data_type == "int" else ">H", val) + for j in range(2): + area["data"][item_offset + j] = packed[j] + elif data_type in ['dint', 'dword', 'real']: + # 4字节数据 + packed = struct.pack( + ">l" if data_type == "dint" else + ">I" if data_type == "dword" else + ">f", + val + ) + for j in range(4): + area["data"][item_offset + j] = packed[j] + + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected" + return result, None, plc_status, update_time + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") + return None, f"Read failed: {str(e)}", plc_status, 0 + + def write_generic(self, plc_name, area_name, offset, data_type, value): + """通用写入接口""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found", "unknown", 0 + + if area["type"] not in ["write", "read_write"]: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Area is read-only", plc_status, 0 + + # 计算实际DB偏移 + db_offset = area["offset"] + offset + + # 确保在区域内 + if data_type == 'bool': + # 确定存储这些布尔值需要多少字节。 + required_size = (offset + (len(value) if isinstance(value, list) else 1) + 7) // 8 + elif data_type in ['int', 'word']: + required_size = 2 * (len(value) if isinstance(value, list) else 1) + elif data_type in ['dint', 'dword', 'real']: + required_size = 4 * (len(value) if isinstance(value, list) else 1) + else: # byte + required_size = len(value) if isinstance(value, list) else 1 + + if db_offset + required_size > area["size"] or db_offset < 0: + plc_status = self.plc_connection_status.get(plc_name, "unknown") + return False, "Offset out of bounds", plc_status, 0 + + client = self.plc_manager.get_plc(plc_name) + plc_status = self.plc_connection_status.get(plc_name, "unknown") + + # 如果PLC未连接,直接返回错误 + if plc_status != "connected": + return False, f"PLC not connected (status: {plc_status})", plc_status, 0 + + try: + # 使用Snap7Client的write_generic方法 + success = client.write_generic(area["db_number"], db_offset, data_type, value) + if success: + # 根据数据类型更新缓存 + if data_type == 'bool': + # 处理bool写入 + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + byte_offset = offset // 8 + i // 8 + bit_offset = (offset % 8) + (i % 8) + if bit_offset >= 8: + byte_offset += 1 + bit_offset -= 8 + + # 读取当前字节值 + current_byte = area["data"][byte_offset] + if val: + # 设置位为1 + new_byte = current_byte | (1 << bit_offset) + else: + # 设置位为0 + new_byte = current_byte & ~(1 << bit_offset) + area["data"][byte_offset] = new_byte + elif data_type == 'byte': + # 处理byte写入 + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + area["data"][offset + i] = val & 0xFF + elif data_type in ['int', 'word']: + # 处理int/word写入 + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + # 2字节数据 + packed = struct.pack(">h" if data_type == "int" else ">H", val) + for j in range(2): + area["data"][offset + i * 2 + j] = packed[j] + elif data_type in ['dint', 'dword', 'real']: + # 处理dint/dword/real写入 + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + # 4字节数据 + packed = struct.pack( + ">l" if data_type == "dint" else + ">I" if data_type == "dword" else + ">f", + val + ) + for j in range(4): + area["data"][offset + i * 4 + j] = packed[j] + + update_time = time.time() + self.last_update[plc_name][area_name] = update_time + area["status"] = "connected (last write)" + return True, None, plc_status, update_time + else: + area["status"] = plc_status + return False, "Write failed", plc_status, 0 + except Exception as e: + area["status"] = plc_status + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}", plc_status, 0 + + def get_parsed_data(self, plc_name, area_name): + """获取解析后的数据""" + from data_parser import parse_data + + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return {"error": "Area not found"} + + plc_status = self.plc_connection_status.get(plc_name, "unknown") + last_update = self.last_update.get(plc_name, {}).get(area_name, 0) + + # 区域状态应与PLC连接状态一致,除非有有效数据 + area_status = area["status"] + if plc_status == "never_connected": + area_status = "never_connected" + elif plc_status == "disconnected" and last_update == 0: + area_status = "disconnected" + + structure = area.get("structure", []) + if structure: + parsed = parse_data(area["data"], structure) + parsed["status"] = area_status + parsed["plc_connection_status"] = plc_status + parsed["last_update"] = last_update + parsed["last_update_formatted"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never" + return parsed + else: + return { + "raw_data": list(area["data"]), + "status": area_status, + "plc_connection_status": plc_status, + "last_update": last_update, + "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never" + } \ No newline at end of file diff --git a/gateway/config_loader.py b/gateway/config_loader.py new file mode 100644 index 0000000..772dfd3 --- /dev/null +++ b/gateway/config_loader.py @@ -0,0 +1,10 @@ +import json +import os + +def load_config(config_path="config/config.json"): + """加载配置文件""" + if not os.path.exists(config_path): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + with open(config_path, "r") as f: + return json.load(f) \ No newline at end of file diff --git a/gateway/config_manager.py b/gateway/config_manager.py new file mode 100644 index 0000000..b01e50b --- /dev/null +++ b/gateway/config_manager.py @@ -0,0 +1,87 @@ +import json +import os +import logging +from config_validator import ConfigValidator + +class ConfigManager: + """配置文件管理器""" + + def __init__(self, config_path="../config/config.json"): + self.config_path = config_path + self.config = None + self.load_config() + + def load_config(self): + """加载配置文件""" + try: + if not os.path.exists(self.config_path): + # 尝试从备份恢复 + backup_path = self.config_path + ".bak" + if os.path.exists(backup_path): + logging.warning(f"Main config not found, using backup: {backup_path}") + with open(backup_path, 'r') as src, open(self.config_path, 'w') as dst: + config_data = src.read() + dst.write(config_data) + else: + raise FileNotFoundError(f"Configuration file not found: {self.config_path}") + + with open(self.config_path, 'r') as f: + self.config = json.load(f) + + # 验证配置 + is_valid, error = ConfigValidator.validate_config(self.config) + if not is_valid: + logging.error(f"Invalid configuration: {error}") + # 尝试从备份恢复 + backup_path = self.config_path + ".bak" + if os.path.exists(backup_path): + logging.warning("Attempting to load from backup configuration") + with open(backup_path, 'r') as f: + self.config = json.load(f) + is_valid, error = ConfigValidator.validate_config(self.config) + if not is_valid: + raise ValueError(f"Backup config also invalid: {error}") + else: + raise ValueError(f"Invalid configuration: {error}") + + return True, None + except Exception as e: + logging.error(f"Failed to load config: {e}") + self.config = {"plcs": []} + return False, str(e) + + def get_config(self): + """获取当前配置""" + return self.config + + def validate_config(self, config): + """验证配置是否有效""" + return ConfigValidator.validate_config(config) + + def save_config(self, new_config): + """保存配置文件""" + try: + # 验证配置 + is_valid, error = self.validate_config(new_config) + if not is_valid: + return False, f"Invalid configuration: {error}" + + # 备份旧配置 + backup_path = self.config_path + ".bak" + if os.path.exists(self.config_path): + with open(self.config_path, 'r') as src, open(backup_path, 'w') as dst: + dst.write(src.read()) + + # 保存新配置 + with open(self.config_path, 'w') as f: + json.dump(new_config, f, indent=2) + + self.config = new_config + return True, None + except Exception as e: + logging.error(f"Failed to save config: {e}") + return False, str(e) + + def reload_config(self): + """重新加载配置""" + return self.load_config() \ No newline at end of file diff --git a/gateway/config_validator.py b/gateway/config_validator.py new file mode 100644 index 0000000..f3b348b --- /dev/null +++ b/gateway/config_validator.py @@ -0,0 +1,91 @@ +import json +from jsonschema import validate, Draft7Validator, FormatChecker + +class ConfigValidator: + """配置文件验证器""" + + SCHEMA = { + "type": "object", + "properties": { + "plcs": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "ip": {"type": "string", "format": "ipv4"}, + "rack": {"type": "integer", "minimum": 0}, + "slot": {"type": "integer", "minimum": 0}, + "refresh_interval": { + "type": "number", + "minimum": 0.01, + "default": 0.5 + }, + "areas": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "type": {"type": "string", "enum": ["read", "write", "read_write"]}, + "db_number": {"type": "integer", "minimum": 1}, + "offset": {"type": "integer", "minimum": 0}, + "size": {"type": "integer", "minimum": 1}, + "structure": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "type": {"type": "string", "enum": ["bool", "byte", "int", "dint", "real", "word", "dword"]}, + "offset": {"type": "integer", "minimum": 0}, + "bit": {"type": "integer", "minimum": 0, "maximum": 7} # 修复了这里 + }, + "required": ["name", "type", "offset"] + } + } + }, + "required": ["name", "type", "db_number", "offset", "size"] + } + } + }, + "required": ["name", "ip", "rack", "slot", "areas"] + } + } + }, + "required": ["plcs"] + } + + """ + @staticmethod:静态方法装饰器,使validate_config不依赖于类的实例,可以通过类直接调用 + 如ConfigValidator.validate_config(config) + """ + @staticmethod + def validate_config(config): + """使用JSONSchema验证配置是否符合规范""" + try: + # 添加IPv4格式验证 + validator = Draft7Validator( + ConfigValidator.SCHEMA, + format_checker=FormatChecker(["ipv4"]) + ) + validator.validate(config) + return True, None + except Exception as e: + return False, str(e) + + @staticmethod + def is_valid_ip(ip): + """验证IP地址格式""" + parts = ip.split('.') + if len(parts) != 4: + return False + for part in parts: + if not part.isdigit(): + return False + num = int(part) + if num < 0 or num > 255: + return False + return True \ No newline at end of file diff --git a/gateway/data_parser.py b/gateway/data_parser.py new file mode 100644 index 0000000..0c4e236 --- /dev/null +++ b/gateway/data_parser.py @@ -0,0 +1,55 @@ +from struct import unpack +import time + +def parse_data(data, structure): + """解析结构化数据""" + result = {"raw_data": list(data)} + + if not structure: + return result + + result["parsed"] = {} + for field in structure: + offset = field["offset"] + name = field["name"] + data_type = field["type"] + + try: + if data_type == "int": + if offset + 2 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">h", data[offset:offset+2])[0] + elif data_type == "dint": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">l", data[offset:offset+4])[0] + elif data_type == "real": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">f", data[offset:offset+4])[0] + elif data_type == "bool": + bit = field.get("bit", 0) + if offset >= len(data): + raise ValueError("Offset out of bounds") + byte = data[offset] + val = bool((byte >> bit) & 1) + elif data_type == "byte": + if offset >= len(data): + raise ValueError("Offset out of bounds") + val = data[offset] + elif data_type == "word": + if offset + 2 > len(data): + raise ValueError("Offset out of bounds") + val = (data[offset] << 8) | data[offset + 1] + elif data_type == "dword": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = (data[offset] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | data[offset+3] + else: + val = f"Unknown type: {data_type}" + + result["parsed"][name] = val + except Exception as e: + result["parsed"][name] = f"Error: {str(e)}" + + return result \ No newline at end of file diff --git a/gateway/main.py b/gateway/main.py new file mode 100644 index 0000000..0295fac --- /dev/null +++ b/gateway/main.py @@ -0,0 +1,109 @@ +import logging +import time +import threading +from config_loader import load_config +from plc_manager import PLCManager +from cache_manager import CacheManager +from api_server import APIServer +from config_manager import ConfigManager + +class GatewayApp: + """PLC网关应用程序主类""" + + def __init__(self, config_path="../config/config.json"): + self.config_path = config_path + self.config_manager = ConfigManager(config_path) + self.plc_manager = None + self.cache_manager = None + self.api_server = None + self.reload_flag = False + self.reload_lock = threading.Lock() + self.logger = logging.getLogger("GatewayApp") + + # 加载初始配置 + self.load_configuration() + + def load_configuration(self): + """加载配置并初始化组件""" + # 加载配置 + if not self.config_manager.load_config(): + self.logger.error("Failed to load initial configuration") + return False + + config = self.config_manager.get_config() + + # 重新初始化PLC连接 + if self.plc_manager: + self.logger.info("Reinitializing PLC connections...") + self.plc_manager = PLCManager(config["plcs"]) + self.plc_manager.connect_all() + + # 重新初始化缓存 + if self.cache_manager: + self.logger.info("Stopping existing cache manager...") + self.cache_manager.stop() + + self.logger.info("Initializing cache manager...") + self.cache_manager = CacheManager(config, self.plc_manager, app=self) + self.cache_manager.start() + + # 重新初始化API服务器 + if self.api_server: + self.logger.info("API server already running") + else: + self.logger.info("Starting API server...") + self.api_server = APIServer(self.cache_manager, self.config_path) + self.api_server.start() + + self.logger.info("Configuration loaded successfully") + return True + + def check_for_reload(self): + """检查是否需要重载配置""" + while True: + with self.reload_lock: + if self.reload_flag: + self.reload_flag = False + self.load_configuration() + time.sleep(1) + + def request_reload(self): + """请求重载配置""" + with self.reload_lock: + self.reload_flag = True + self.logger.info("Configuration reload requested") + + def run(self): + """运行主程序""" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s' + ) + + self.logger.info("Starting PLC Gateway...") + + # 启动配置重载检查线程 + reload_thread = threading.Thread( + target=self.check_for_reload, + name="ConfigReloadThread", + daemon=True + ) + reload_thread.start() + + try: + # 保持主程序运行 + while True: + time.sleep(1) + except KeyboardInterrupt: + self.logger.info("Shutting down gracefully...") + finally: + if self.cache_manager: + self.cache_manager.stop() + self.logger.info("Shutdown complete") + +def main(): + app = GatewayApp() + app.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/gateway/plc_manager.py b/gateway/plc_manager.py new file mode 100644 index 0000000..038b1e8 --- /dev/null +++ b/gateway/plc_manager.py @@ -0,0 +1,42 @@ +from snap7_client import Snap7Client +import logging + +class PLCManager: + """PLC连接管理器,管理多个PLC连接""" + + def __init__(self, plcs_config): + """ + 初始化PLC管理器 + + Args: + plcs_config: PLC配置列表 + """ + self.plcs = {} + for plc_config in plcs_config: + name = plc_config["name"] + self.plcs[name] = Snap7Client( + plc_config["ip"], + plc_config["rack"], + plc_config["slot"] + ) + self.logger = logging.getLogger("PLCManager") + + def get_plc(self, name): + """获取指定名称的PLC客户端""" + return self.plcs.get(name) + + def connect_all(self): + """连接所有配置的PLC""" + for name, client in self.plcs.items(): + client.connect() + + def get_connection_status(self): + """获取所有PLC的连接状态""" + status = {} + for name, client in self.plcs.items(): + status[name] = { + "ip": client.ip, + "connected": client.connected, + "retry_count": client.retry_count + } + return status \ No newline at end of file diff --git a/gateway/snap7_client.py b/gateway/snap7_client.py new file mode 100644 index 0000000..e972dd0 --- /dev/null +++ b/gateway/snap7_client.py @@ -0,0 +1,448 @@ +import snap7 +import logging +from threading import Lock +import time +from snap7.util import * +import ast + +class Snap7Client: + """Snap7客户端,处理与PLC的通信""" + + def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1): + """ + 初始化Snap7客户端 + + Args: + ip: PLC IP地址 + rack: Rack编号 + slot: Slot编号 + max_retries: 最大重试次数 + retry_base_delay: 基础重试延迟(秒) + """ + self.ip = ip + self.rack = rack + self.slot = slot + self.client = snap7.client.Client() + self.lock = Lock() + self.connected = False + self.max_retries = max_retries + self.retry_base_delay = retry_base_delay + self.last_connect_attempt = 0 + self.retry_count = 0 + self.logger = logging.getLogger(f"Snap7Client[{ip}]") + + def is_valid_connection(self): + """检查连接是否真正有效""" + try: + # 尝试读取PLC的运行状态 + cpu_state = self.client.get_cpu_state() + print("当前 CPU 状态:", cpu_state) + return cpu_state in ['S7CpuStatusRun', 'S7CpuStatusStop'] + except: + return False + + def connect(self): + """建立与PLC的连接并验证""" + current_time = time.time() + # 指数退避重试 + if self.retry_count > 0: + delay = min(self.retry_base_delay * (2 ** (self.retry_count - 1)), 30) + if current_time - self.last_connect_attempt < delay: + return False # 未到重试时间 + + self.last_connect_attempt = current_time + try: + self.client.connect(self.ip, self.rack, self.slot) + + # 验证连接是否真正有效 + if self.client.get_connected() and self.is_valid_connection(): + self.connected = True + self.retry_count = 0 # 重置重试计数 + self.logger.info(f"Successfully connected to PLC {self.ip}") + return True + else: + self.connected = False + self.logger.warning(f"Connection to {self.ip} established but PLC is not responding") + try: + self.client.disconnect() + except: + pass + return False + except Exception as e: + self.retry_count = min(self.retry_count + 1, self.max_retries) + self.logger.error(f"Connection to {self.ip} failed (attempt {self.retry_count}/{self.max_retries}): {e}") + self.connected = False + return False + + def read_db(self, db_number, offset, size): + """ + 从DB块读取数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + size: 读取字节数 + + Returns: + bytearray: 读取的数据,如果失败返回None + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Read failed: not connected to {self.ip}") + return None # 返回None而不是零填充数据 + + try: + with self.lock: # 进入锁,其他线程需等待 + data = self.client.db_read(db_number, offset, size) + # 验证返回数据的有效性 + if data is None or len(data) != size: + self.connected = False + self.logger.error(f"Read DB{db_number} returned invalid data size (expected {size}, got {len(data) if data else 0})") + return None + return data + except Exception as e: + self.logger.error(f"Read DB{db_number} error: {e}") + self.connected = False + return None + + def read_db_bool(self, db_number, offset, bit_length): + """ + 从 DB 块中读取一个字节,并提取其中的多个 BOOL 位 + + Args: + db_number (int): DB块编号 + offset (int): 要读取的字节偏移地址 + bit_length: 第几位,如1,表示第1位 + + Returns: + result:返回位值 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Read failed: not connected to {self.ip}") + return None # 返回None而不是零填充数据 + + try: + with self.lock: + data = self.client.db_read(db_number, offset, 1) + result = {} + + for bit in range(bit_length): + result[bit] = bool(data[0] & (1 << bit)) + + if result is None or len(result) != bit_length: + self.connected = False + self.logger.error(f"Read DB{db_number} returned invalid data size (expected {bit_length}, got {len(result) if data else 0})") + return None + return result + except Exception as e: + self.logger.error(f"Read DB{db_number} error: {e}") + self.connected = False + return None + + def write_db(self, db_number, offset, data): + """ + 向DB块写入数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + data: 要写入的数据 + + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + + self.client.db_write(db_number, offset, data) + self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}") + return True + except Exception as e: + self.logger.error(f"Write DB{db_number} error: {e}") + self.connected = False + return False + + def batch_write_db(self, db_number, offset, data): + """ + 向DB块写入数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + data: 要写入的数据 + + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + with self.lock: + self.client.db_write(db_number, offset, data) + self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}") + return True + except Exception as e: + self.logger.error(f"Write DB{db_number} error: {e}") + self.connected = False + return False + + def write_db_bool(self, db_number, offset, data): + """ + 向DB块写入数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + data: 要写入的bool类型数据 + + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + with self.lock: + + self.client.db_write(db_number, offset, data) + self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}") + return True + except Exception as e: + self.logger.error(f"Write DB{db_number} error: {e}") + self.connected = False + return False + + def batch_write_db_bool(self, db_number, offset, data): + """ + 向DB块写入数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + data: 要写入的bool类型数据 + + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + with self.lock: + print(db_number, offset, data) + self.client.db_write(db_number, offset, data) + self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}") + return True + except Exception as e: + self.logger.error(f"Write DB{db_number} error: {e}") + self.connected = False + return False + + def read_generic(self, db_number, offset, data_type, count=1): + """ + 通用读取接口,支持多种数据类型 + Args: + db_number: DB块编号 + offset: 起始偏移量(字节或位,对于bool类型) + data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword') + count: 要读取的数据个数 + Returns: + 解析后的数据(单个值或值列表),失败返回None + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Read failed: not connected to {self.ip}") + return None + + try: + if data_type == 'bool': + # 对于bool,offset是位偏移 + byte_offset = offset // 8 + bit_offset = offset % 8 + # 计算需要读取的字节数 + last_bit = bit_offset + count - 1 + last_byte = last_bit // 8 + total_bytes = last_byte - byte_offset + 1 + + # 读取原始字节数据 + data = self.read_db(db_number, byte_offset, total_bytes) + if data is None: + return None + + # 解析bool值 + result = [] + for i in range(count): + current_bit = bit_offset + i + byte_idx = current_bit // 8 + bit_idx = current_bit % 8 + result.append(bool(data[byte_idx] & (1 << bit_idx))) + + return result[0] if count == 1 else result + + elif data_type == 'byte': + data = self.read_db(db_number, offset, count) + if data is None: + return None + return [data[i] for i in range(count)] if count > 1 else data[0] + + elif data_type in ['int', 'word']: + total_bytes = 2 * count + data = self.read_db(db_number, offset, total_bytes) + if data is None: + return None + + result = [] + for i in range(count): + if data_type == 'int': + result.append(get_int(data, i * 2)) + else: # word + result.append(get_word(data, i * 2)) + return result[0] if count == 1 else result + + elif data_type in ['dint', 'dword', 'real']: + total_bytes = 4 * count + data = self.read_db(db_number, offset, total_bytes) + if data is None: + return None + + result = [] + for i in range(count): + if data_type == 'dint': + result.append(get_dint(data, i * 4)) + elif data_type == 'dword': + result.append(get_dword(data, i * 4)) + else: # real + result.append(get_real(data, i * 4)) + return result[0] if count == 1 else result + + else: + self.logger.error(f"Unsupported data type: {data_type}") + return None + + except Exception as e: + self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}") + return None + + def write_generic(self, db_number, offset, data_type, value): + """ + 通用写入接口,支持多种数据类型 + Args: + db_number: DB块编号 + offset: 起始偏移量(字节或位,对于bool类型) + data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword') + value: 要写入的值(可以是单个值或列表) + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + if data_type == 'bool': + # 对于bool,offset是位偏移 + byte_offset = offset // 8 + bit_offset = offset % 8 + + # 读取当前字节 + current_byte = self.read_db(db_number, byte_offset, 1) + if current_byte is None: + return False + + # 修改特定位 + if isinstance(value, list): + # 多个bool值 + for i, val in enumerate(value): + current_bit = bit_offset + i + byte_idx = current_bit // 8 + bit_idx = current_bit % 8 + + if val: + current_byte[0] |= (1 << bit_idx) + else: + current_byte[0] &= ~(1 << bit_idx) + else: + # 单个bool值 + if value: + current_byte[0] |= (1 << bit_offset) + else: + current_byte[0] &= ~(1 << bit_offset) + + # 写回修改后的字节 + return self.write_db_bool(db_number, byte_offset, current_byte) + + elif data_type == 'byte': + if isinstance(value, list): + # 批量写入 + for i, val in enumerate(value): + if val < 0 or val > 255: + self.logger.error(f"Byte value out of range: {val}") + return False + if not self.write_db(db_number, offset + i, bytes([val])): + return False + return True + else: + # 单个字节 + if value < 0 or value > 255: + self.logger.error(f"Byte value out of range: {value}") + return False + return self.write_db(db_number, offset, bytes([value])) + + elif data_type in ['int', 'word']: + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + # 确保int值在范围内 + if data_type == 'int' and (val < -32768 or val > 32767): + self.logger.error(f"Int value out of range: {val}") + return False + elif data_type == 'word' and (val < 0 or val > 65535): + self.logger.error(f"Word value out of range: {val}") + return False + + data = bytearray(2) + if data_type == 'int': + set_int(data, 0, val) + else: + set_word(data, 0, val) + + if not self.write_db(db_number, offset + i * 2, data): + return False + return True + + elif data_type in ['dint', 'dword', 'real']: + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + data = bytearray(4) + if data_type == 'dint': + if val < -2147483648 or val > 2147483647: + self.logger.error(f"DInt value out of range: {val}") + return False + set_dint(data, 0, val) + elif data_type == 'dword': + if val < 0 or val > 4294967295: + self.logger.error(f"DWord value out of range: {val}") + return False + set_dword(data, 0, val) + else: # real + set_real(data, 0, float(val)) + + if not self.write_db(db_number, offset + i * 4, data): + return False + return True + + else: + self.logger.error(f"Unsupported data type: {data_type}") + return False + + except Exception as e: + self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}") + return False \ No newline at end of file diff --git a/gateway/templates/api_doc.html b/gateway/templates/api_doc.html new file mode 100644 index 0000000..e8adcde --- /dev/null +++ b/gateway/templates/api_doc.html @@ -0,0 +1,670 @@ + + + PLC Gateway API Documentation + + + +

PLC Gateway API Documentation

+ + + +

Status API

+ +
+

System Status

+
+ GET + /api/status +
+

获取系统状态信息,包括启动时间、PLC数量和缓存大小。

+ +

响应示例

+
+ { + "status": "running", + "start_time": "2023-10-30 14:30:22", + "plc_count": 2, + "cache_size": 11000, + "plc_statuses": { + "PLC1": { + "status": "connected", + "last_connected": "2023-10-30 14:35:10" + }, + "PLC2": { + "status": "disconnected", + "last_connected": "Never" + } + } + } +
+
+ +
+

Area Status

+
+ GET + /api/status// +
+

获取指定PLC区域的状态信息。

+ +

路径参数

+ + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
+ +

响应示例

+
+ { + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01", + "size": 4000, + "type": "read" + } +
+
+ +

Data API

+ +
+

Single Read

+
+ GET + /api/read//// +
+

从指定区域读取数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
offset起始偏移量(字节)
length读取长度(字节)
+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 4, + "data": [0, 0, 123, 45], + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +
+

Single Write

+
+ POST + /api/write/// +
+

向指定区域写入数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Write)
offset起始偏移量(字节)
+ +

请求体

+

原始二进制数据

+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "length": 4, + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30" + } +
+
+ +
+

Single Read Bool

+
+ GET + /api/read_bool//// +
+

从指定区域读取数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
offset起始偏移量(字节)
length读取长度(字节)
+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 2, + "data": [0:False, 1:False], + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +
+

Single Write Bool

+
+ POST + /api/write_bool/// +
+

向指定区域写入数据。

+ +

路径参数

+ + + + + + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Write)
offset起始偏移量(字节)
+ +

请求体

+

{0:True}

+ +

响应示例

+
+ { + "status": "success", + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "length": 1, + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30" + } +
+
+ +
+

Batch Read

+
+ POST + /api/batch_read +
+

批量读取多个区域的数据。

+ +

请求体

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
字段类型必需描述
plc_namestringPLC名称(与配置中一致)
area_namestring区域名称(与配置中一致)
offsetnumber起始偏移量(字节),默认为0
lengthnumber读取长度(字节),不提供则读取整个区域
+ +

请求示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Read", + "offset": 0, + "length": 4 + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "offset": 10, + "length": 2 + } + ] +
+ +

响应示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Read", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01", + "offset": 0, + "length": 4, + "data": [0, 0, 123, 45] + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754322.123, + "last_update_formatted": "2023-10-30 14:12:02", + "offset": 10, + "length": 2, + "data": [255, 0] + } + ] +
+
+ +
+

Batch Write

+
+ POST + /api/batch_write +
+

批量写入多个区域的数据。

+ +

请求体

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
字段类型必需描述
plc_namestringPLC名称(与配置中一致)
area_namestring区域名称(与配置中一致)
offsetnumber起始偏移量(字节)
dataarray要写入的数据(字节数组)
+ +

请求示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Write", + "offset": 0, + "data": [1, 2, 3, 4] + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "offset": 10, + "data": [255, 0] + } + ] +
+ +

响应示例

+
+ [ + { + "plc_name": "PLC1", + "area_name": "DB100_Write", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754350.789, + "last_update_formatted": "2023-10-30 14:12:30", + "offset": 0, + "length": 4 + }, + { + "plc_name": "PLC1", + "area_name": "DB202_Params", + "status": "success", + "plc_connection_status": "connected", + "last_update": 1698754351.234, + "last_update_formatted": "2023-10-30 14:12:31", + "offset": 10, + "length": 2 + } + ] +
+
+ +
+

Parsed Data

+
+ GET + /api/data// +
+

获取解析后的数据(如果配置了结构)。

+ +

路径参数

+ + + + + + + + + + + + + +
参数描述
plc_namePLC名称(如PLC1)
area_name区域名称(如DB100_Read)
+ +

响应示例(配置了解析结构)

+
+ { + "parsed": { + "temperature": 25.5, + "pressure": 100, + "status": true + }, + "raw_data": [0, 0, 128, 65, 0, 100], + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+ +

响应示例(未配置解析结构)

+
+ { + "raw_data": [0, 0, 128, 65, 0, 100], + "status": "connected", + "plc_connection_status": "connected", + "last_update": 1698754321.456, + "last_update_formatted": "2023-10-30 14:12:01" + } +
+
+ +

Configuration API

+ +
+

Get Configuration

+
+ GET + /api/config +
+

获取当前配置。

+ +

认证要求

+

需要Basic Auth认证

+ +

响应示例

+
+ { + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.10", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000, + "structure": [ + {"name": "temperature", "type": "real", "offset": 0}, + {"name": "pressure", "type": "int", "offset": 4} + ] + } + ] + } + ] + } +
+
+ +
+

Validate Configuration

+
+ POST + /api/config/validate +
+

验证配置是否有效。

+ +

认证要求

+

需要Basic Auth认证

+ +

请求体

+

要验证的配置JSON

+ +

响应示例(有效)

+
+ { + "valid": true + } +
+ +

响应示例(无效)

+
+ { + "valid": false, + "message": "Invalid configuration: 'ip' is a required property" + } +
+
+ +
+

Save Configuration

+
+ POST + /api/config +
+

保存配置。

+ +

查询参数

+ + + + + + + + + +
参数描述
reload是否立即重载配置(true/false)
+ +

认证要求

+

需要Basic Auth认证

+ +

请求体

+

要保存的配置JSON

+ +

响应示例

+
+ { + "success": true, + "message": "Configuration saved and reload requested" + } +
+
+ + + + \ No newline at end of file diff --git a/gateway/templates/config.html b/gateway/templates/config.html new file mode 100644 index 0000000..b45a010 --- /dev/null +++ b/gateway/templates/config.html @@ -0,0 +1,209 @@ + + + + PLC Gateway Configuration + + + +

PLC Gateway Configuration

+ + + +
+

Edit the configuration JSON below. Be careful with the syntax.

+ +
+ +
+ + + +
+
+ +
+ +
+

Configuration Guide

+

PLC Configuration:

+
    +
  • name: Unique name for the PLC
  • +
  • ip: IP address of the PLC
  • +
  • rack: Rack number (usually 0)
  • +
  • slot: Slot number (usually 1 for S7-1200)
  • +
+ +

Data Area Configuration:

+
    +
  • name: Name of the data area
  • +
  • type: read, write, or read_write
  • +
  • db_number: DB number (e.g., 100 for DB100)
  • +
  • offset: Starting byte offset
  • +
  • size: Size in bytes
  • +
  • structure (optional): Define how to parse the data
  • +
+ +

Example:

+
{
+  "plcs": [
+    {
+      "name": "PLC1",
+      "ip": "192.168.0.10",
+      "rack": 0,
+      "slot": 1,
+      "areas": [
+        {
+          "name": "DB100_Read",
+          "type": "read",
+          "db_number": 100,
+          "offset": 0,
+          "size": 4000,
+          "structure": [
+            {"name": "temperature", "type": "real", "offset": 0},
+            {"name": "pressure", "type": "int", "offset": 4}
+          ]
+        }
+      ]
+    }
+  ]
+}
+
+
+ + + + \ No newline at end of file diff --git a/gateway/templates/status.html b/gateway/templates/status.html new file mode 100644 index 0000000..e367e38 --- /dev/null +++ b/gateway/templates/status.html @@ -0,0 +1,126 @@ + + + + + PLC Gateway Status + + + +

PLC Gateway Status

+

Gateway running since: {{ start_time }}

+ + {% for plc_name, areas in summary.items() %} + {% set plc_status = plc_statuses.get(plc_name, "unknown") %} + {% set plc_class = { + 'connected': 'plc-connected', + 'disconnected': 'plc-disconnected' + }.get(plc_status, 'plc-never-connected') %} + +

PLC:{{plc_name}} (Status: {{plc_status}})

+ + + + + + + + + + {% for area_name, area in areas.items() %} + {% set status_class = { + 'connected': 'status-connected', + 'disconnected': 'status-disconnected', + 'never_connected': 'status-never-connected' + }.get(area.status, 'status-disconnected') %} + {% set status_text = { + 'connected': 'Connected', + 'disconnected': 'Disconnected', + 'never_connected': 'Never connected' + }.get(area.status, area.status) %} + + + + + + + + + {% endfor %} +
Area NameTypeSize (bytes)StatusPLC ConnectionLast Update
{{area_name}}{{area['type']}}{{area['size']}}{{status_text}}{{area['plc_connection_status']}}{{area['last_update']}}
+ {% endfor %} + +
+

API Endpoints

+ +
+ Single Read: GET /api/read/<plc_name>/<area_name>/<offset>/<length>
+ Example: /api/read/PLC1/DB100_Read/10/4 +
+ +
+ Single Write: POST /api/write/<plc_name>/<area_name>/<offset>
+ Body: Raw binary data
+ Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data +
+ +
+ Single Read_Bool: GET /api/read_bool/<plc_name>/<area_name>/<offset>/<length>
+ Example: /api/read_bool/PLC1/DB100_Read/0/2 +
+ +
+ Single Write_Bool: POST /api/write_bool/<plc_name>/<area_name>/<offset>
+ Body: Raw binary data
+ Example: POST /api/write_bool/PLC1/DB100_Write/0 +
+ +
+ Batch Read: POST /api/batch_read
+ Body: JSON array of read requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}] +
+ +
+ Batch Write: POST /api/batch_write
+ Body: JSON array of write requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}] +
+ +
+ Configuration: GET/POST /api/config
+ Manage gateway configuration +
+
+ + + + + + + + \ No newline at end of file