"""REST API server exposing cached PLC data and gateway configuration."""

import hmac
import json
import logging
import threading
import time
from functools import wraps

from flask import Flask, jsonify, request, render_template_string, Response

from config_manager import ConfigManager

_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# CSS class per PLC connection state; any other state maps to
# "plc-never-connected".
_PLC_CSS_CLASSES = {
    "connected": "plc-connected",
    "disconnected": "plc-disconnected",
}

# Area status -> (CSS class, display label). A label of None means the raw
# status string is shown. Unknown states use ("status-disconnected", None).
_AREA_STATUS_VIEW = {
    "connected": ("status-connected", None),
    "never_connected": ("status-never-connected", "Never connected"),
    "disconnected": ("status-disconnected", "Disconnected"),
}

# NOTE(review): the page templates in this file reached review as bare text,
# with their HTML tags and <...> placeholders missing. The status page below
# was rebuilt with minimal markup because its table header/row text had been
# displaced outside the rendering loop; confirm the tag structure and class
# attributes against the version-controlled original.
_INDEX_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<title>PLC Gateway Status</title>
</head>
<body>
<h1>PLC Gateway Status</h1>
<p>Gateway running since: {{ start_time }}</p>
{% for plc in plcs %}
<h2 class="{{ plc.css_class }}">PLC: {{ plc.name }} (Status: {{ plc.status }})</h2>
<table>
<tr>
<th>Area Name</th>
<th>Type</th>
<th>Size (bytes)</th>
<th>Status</th>
<th>PLC Connection</th>
<th>Last Update</th>
</tr>
{% for area in plc.areas %}
<tr>
<td>{{ area.name }}</td>
<td>{{ area.type }}</td>
<td>{{ area.size }}</td>
<td class="{{ area.css_class }}">{{ area.status_text }}</td>
<td>{{ area.plc_connection_status }}</td>
<td>{{ area.last_update }}</td>
</tr>
{% endfor %}
</table>
{% endfor %}
<h2>API Endpoints</h2>
<ul>
<li>Single Read: GET /api/read/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;<br>
Example: /api/read/PLC1/DB100_Read/10/4</li>
<li>Single Write: POST /api/write/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;<br>
Body: Raw binary data<br>
Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data</li>
<li>Single Read_Bool: GET /api/read_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;<br>
Example: /api/read_bool/PLC1/DB100_Read/0/2</li>
<li>Single Write_Bool: POST /api/write_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;<br>
Body: Raw binary data<br>
Example: POST /api/write_bool/PLC1/DB100_Write/0</li>
<li>Batch Read: POST /api/batch_read<br>
Body: JSON array of read requests<br>
Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}]</li>
<li>Batch Write: POST /api/batch_write<br>
Body: JSON array of write requests<br>
Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}]</li>
<li>Configuration: GET/POST /api/config<br>
Manage gateway configuration</li>
</ul>
</body>
</html>
"""

# NOTE(review): kept exactly as the text that reached review. It references
# none of the context variables passed by config_page (config_json, username,
# password), so the markup that consumed them appears to be missing; restore
# it from version control rather than guessing.
_CONFIG_TEMPLATE = """ PLC Gateway Configuration

PLC Gateway Configuration

Edit the configuration JSON below. Be careful with the syntax.

Configuration Guide

PLC Configuration:

Data Area Configuration:

Example:

{
  "plcs": [
    {
      "name": "PLC1",
      "ip": "192.168.0.10",
      "rack": 0,
      "slot": 1,
      "areas": [
        {
          "name": "DB100_Read",
          "type": "read",
          "db_number": 100,
          "offset": 0,
          "size": 4000,
          "structure": [
            {"name": "temperature", "type": "real", "offset": 0},
            {"name": "pressure", "type": "int", "offset": 4}
          ]
        }
      ]
    }
  ]
}
"""

# NOTE(review): text kept as received (markup missing, see above). Only the
# URL placeholders, which had been reduced to empty path segments such as
# "/api/read////", were restored as HTML entities.
_API_DOC_TEMPLATE = """ PLC Gateway API Documentation

PLC Gateway API Documentation

Status API

System Status

GET /api/status

获取系统状态信息,包括启动时间、PLC数量和缓存大小。

响应示例

{ "status": "running", "start_time": "2023-10-30 14:30:22", "plc_count": 2, "cache_size": 11000, "plc_statuses": { "PLC1": { "status": "connected", "last_connected": "2023-10-30 14:35:10" }, "PLC2": { "status": "disconnected", "last_connected": "Never" } } }

Area Status

GET /api/status/&lt;plc_name&gt;/&lt;area_name&gt;

获取指定PLC区域的状态信息。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Read)

响应示例

{ "status": "connected", "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01", "size": 4000, "type": "read" }

Data API

Single Read

GET /api/read/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;

从指定区域读取数据。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Read)
offset 起始偏移量(字节)
length 读取长度(字节)

响应示例

{ "status": "success", "plc_name": "PLC1", "area_name": "DB100_Read", "offset": 0, "length": 4, "data": [0, 0, 123, 45], "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01" }

Single Write

POST /api/write/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;

向指定区域写入数据。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Write)
offset 起始偏移量(字节)

请求体

原始二进制数据

响应示例

{ "status": "success", "plc_name": "PLC1", "area_name": "DB100_Write", "offset": 0, "length": 4, "plc_connection_status": "connected", "last_update": 1698754350.789, "last_update_formatted": "2023-10-30 14:12:30" }

Single Read Bool

GET /api/read_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;/&lt;length&gt;

从指定区域读取数据。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Read)
offset 起始偏移量(字节)
length 读取长度(字节)

响应示例

{ "status": "success", "plc_name": "PLC1", "area_name": "DB100_Read", "offset": 0, "length": 2, "data": [0:False, 1:False], "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01" }

Single Write Bool

POST /api/write_bool/&lt;plc_name&gt;/&lt;area_name&gt;/&lt;offset&gt;

向指定区域写入数据。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Write)
offset 起始偏移量(字节)

请求体

{0:True}

响应示例

{ "status": "success", "plc_name": "PLC1", "area_name": "DB100_Write", "offset": 0, "length": 1, "plc_connection_status": "connected", "last_update": 1698754350.789, "last_update_formatted": "2023-10-30 14:12:30" }

Batch Read

POST /api/batch_read

批量读取多个区域的数据。

请求体

字段 类型 必需 描述
plc_name string PLC名称(与配置中一致)
area_name string 区域名称(与配置中一致)
offset number 起始偏移量(字节),默认为0
length number 读取长度(字节),不提供则读取整个区域

请求示例

[ { "plc_name": "PLC1", "area_name": "DB100_Read", "offset": 0, "length": 4 }, { "plc_name": "PLC1", "area_name": "DB202_Params", "offset": 10, "length": 2 } ]

响应示例

[ { "plc_name": "PLC1", "area_name": "DB100_Read", "status": "success", "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01", "offset": 0, "length": 4, "data": [0, 0, 123, 45] }, { "plc_name": "PLC1", "area_name": "DB202_Params", "status": "success", "plc_connection_status": "connected", "last_update": 1698754322.123, "last_update_formatted": "2023-10-30 14:12:02", "offset": 10, "length": 2, "data": [255, 0] } ]

Batch Write

POST /api/batch_write

批量写入多个区域的数据。

请求体

字段 类型 必需 描述
plc_name string PLC名称(与配置中一致)
area_name string 区域名称(与配置中一致)
offset number 起始偏移量(字节)
data array 要写入的数据(字节数组)

请求示例

[ { "plc_name": "PLC1", "area_name": "DB100_Write", "offset": 0, "data": [1, 2, 3, 4] }, { "plc_name": "PLC1", "area_name": "DB202_Params", "offset": 10, "data": [255, 0] } ]

响应示例

[ { "plc_name": "PLC1", "area_name": "DB100_Write", "status": "success", "plc_connection_status": "connected", "last_update": 1698754350.789, "last_update_formatted": "2023-10-30 14:12:30", "offset": 0, "length": 4 }, { "plc_name": "PLC1", "area_name": "DB202_Params", "status": "success", "plc_connection_status": "connected", "last_update": 1698754351.234, "last_update_formatted": "2023-10-30 14:12:31", "offset": 10, "length": 2 } ]

Parsed Data

GET /api/data/&lt;plc_name&gt;/&lt;area_name&gt;

获取解析后的数据(如果配置了结构)。

路径参数

参数 描述
plc_name PLC名称(如PLC1)
area_name 区域名称(如DB100_Read)

响应示例(配置了解析结构)

{ "parsed": { "temperature": 25.5, "pressure": 100, "status": true }, "raw_data": [0, 0, 128, 65, 0, 100], "status": "connected", "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01" }

响应示例(未配置解析结构)

{ "raw_data": [0, 0, 128, 65, 0, 100], "status": "connected", "plc_connection_status": "connected", "last_update": 1698754321.456, "last_update_formatted": "2023-10-30 14:12:01" }

Configuration API

Get Configuration

GET /api/config

获取当前配置。

认证要求

需要Basic Auth认证

响应示例

{ "plcs": [ { "name": "PLC1", "ip": "192.168.0.10", "rack": 0, "slot": 1, "areas": [ { "name": "DB100_Read", "type": "read", "db_number": 100, "offset": 0, "size": 4000, "structure": [ {"name": "temperature", "type": "real", "offset": 0}, {"name": "pressure", "type": "int", "offset": 4} ] } ] } ] }

Validate Configuration

POST /api/config/validate

验证配置是否有效。

认证要求

需要Basic Auth认证

请求体

要验证的配置JSON

响应示例(有效)

{ "valid": true }

响应示例(无效)

{ "valid": false, "message": "Invalid configuration: 'ip' is a required property" }

Save Configuration

POST /api/config

保存配置。

查询参数

参数 描述
reload 是否立即重载配置(true/false)

认证要求

需要Basic Auth认证

请求体

要保存的配置JSON

响应示例

{ "success": true, "message": "Configuration saved and reload requested" }
"""


def _format_timestamp(timestamp, missing="Never"):
    """Format epoch seconds as local time.

    Args:
        timestamp: Seconds since the epoch; may be 0 or None when unset.
        missing: Text returned when no positive timestamp is available.

    Returns:
        A "YYYY-MM-DD HH:MM:SS" string, or ``missing``.
    """
    if not timestamp or timestamp <= 0:
        return missing
    return time.strftime(_TIME_FORMAT, time.localtime(timestamp))


def _request_error(message, status_code, plc_status="unknown"):
    """Build the error response used when a request is rejected before any area is touched."""
    return jsonify({
        "status": "error",
        "message": message,
        "plc_connection_status": plc_status,
        "last_update": 0,
        "last_update_formatted": "N/A",
    }), status_code


def _param_error(plc_name, area_name, message):
    """Build the 400 response for an invalid parameter on a generic read/write."""
    return jsonify({
        "status": "error",
        "message": message,
        "plc_name": plc_name,
        "area_name": area_name,
    }), 400


def _area_error(plc_name, area_name, message, plc_status, update_time):
    """Build the 400 response for a failed cache operation on one area."""
    return jsonify({
        "status": "error",
        "plc_name": plc_name,
        "area_name": area_name,
        "message": message,
        "plc_connection_status": plc_status,
        "last_update": update_time,
        "last_update_formatted": _format_timestamp(update_time),
    }), 400


def _area_success(plc_name, area_name, plc_status, update_time, **fields):
    """Build the success response for one area; ``fields`` carries the per-endpoint keys."""
    payload = {
        "status": "success",
        "plc_name": plc_name,
        "area_name": area_name,
    }
    payload.update(fields)
    payload.update({
        "plc_connection_status": plc_status,
        "last_update": update_time,
        # Shows "Never" instead of a 1970 date when the area was never updated.
        "last_update_formatted": _format_timestamp(update_time),
    })
    return jsonify(payload)


class APIServer:
    """REST API server providing PLC data access and configuration management."""

    def __init__(self, cache_manager, config_path="./config/config.json"):
        """Initialise the API server and register all routes.

        Args:
            cache_manager: Cache manager instance the routes read from and
                write to.
            config_path: Path of the configuration file handed to
                ConfigManager.
        """
        self.cache_manager = cache_manager
        self.config_manager = ConfigManager(config_path)
        self.app = Flask(__name__)
        self.logger = logging.getLogger("APIServer")
        self.auth_enabled = True  # authentication can be switched off via configuration
        # NOTE(review): hard-coded credentials; a real deployment should load
        # them from secure storage.
        self.username = "admin"
        self.password = "admin123"
        self.start_time = time.strftime(_TIME_FORMAT)

        self.setup_routes()

    def check_auth(self, username, password):
        """Return True when the supplied credentials match the configured ones.

        Both comparisons use ``hmac.compare_digest`` and are always evaluated,
        so timing does not reveal which field mismatched or where.
        """
        if username is None or password is None:
            return False
        user_ok = hmac.compare_digest(
            str(username).encode("utf-8"), str(self.username).encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            str(password).encode("utf-8"), str(self.password).encode("utf-8")
        )
        return user_ok and password_ok

    def authenticate(self):
        """Return a 401 response asking the client for Basic credentials."""
        return Response(
            "Unauthorized",
            401,
            {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'},
        )

    def requires_auth(self, f):
        """Decorator: require Basic auth for a route, preserving function metadata."""
        @wraps(f)
        def decorated(*args, **kwargs):
            if not self.auth_enabled:
                return f(*args, **kwargs)
            auth = request.authorization
            if not auth or not self.check_auth(auth.username, auth.password):
                return self.authenticate()
            return f(*args, **kwargs)
        return decorated

    def get_summary(self):
        """Return a nested ``{plc_name: {area_name: info}}`` summary of the cache.

        Each ``info`` dict has the keys ``status``, ``plc_connection_status``,
        ``last_update`` (formatted, or "Never"), ``size`` and ``type``.
        """
        summary = {}
        for plc_name, areas in self.cache_manager.cache.items():
            plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
            # A PLC/area without a recorded timestamp is reported as "Never"
            # instead of raising KeyError.
            plc_updates = self.cache_manager.last_update.get(plc_name, {})
            summary[plc_name] = {
                area_name: {
                    "status": area["status"],
                    "plc_connection_status": plc_status,
                    "last_update": _format_timestamp(plc_updates.get(area_name, 0)),
                    "size": area["size"],
                    "type": area["type"],
                }
                for area_name, area in areas.items()
            }
        return summary

    def setup_routes(self):
        """Register every route on the Flask application."""
        self._register_page_routes()
        self._register_config_routes()
        self._register_data_routes()

    def _build_index_view(self):
        """Return the list of per-PLC dicts rendered by the status page."""
        plcs = []
        for plc_name, areas in self.get_summary().items():
            plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
            rows = []
            for area_name, area in areas.items():
                css_class, label = _AREA_STATUS_VIEW.get(
                    area["status"], ("status-disconnected", None)
                )
                rows.append({
                    "name": area_name,
                    "type": area["type"],
                    "size": area["size"],
                    "css_class": css_class,
                    "status_text": area["status"] if label is None else label,
                    "plc_connection_status": area["plc_connection_status"],
                    "last_update": area["last_update"],
                })
            plcs.append({
                "name": plc_name,
                "status": plc_status,
                "css_class": _PLC_CSS_CLASSES.get(plc_status, "plc-never-connected"),
                "areas": rows,
            })
        return plcs

    def _handle_batch(self, operation, label):
        """Validate a JSON-array request body and run a batch cache operation.

        Args:
            operation: Cache-manager callable that takes the request list.
            label: Word used in the log messages ("read" or "write").
        """
        try:
            if not request.is_json:
                return _request_error(
                    "Request must be JSON (Content-Type: application/json)", 400
                )
            # silent=True: a malformed body yields None and is rejected with
            # 400 below instead of surfacing as a 500.
            requests = request.get_json(silent=True)
            if not isinstance(requests, list):
                return _request_error("Request must be a JSON array", 400)

            self.logger.info("Received batch %s request: %s", label, json.dumps(requests))
            return jsonify(operation(requests))
        except Exception as e:
            self.logger.error("Batch %s error: %s", label, e, exc_info=True)
            return _request_error(f"Internal server error: {str(e)}", 500)

    def _register_page_routes(self):
        """Register the status page, the system-status API and the API documentation page."""

        @self.app.route("/", endpoint="index")
        def index():
            """Render the gateway status summary page."""
            # Values are passed as template context (autoescaped) rather than
            # concatenated into the template source.
            return render_template_string(
                _INDEX_TEMPLATE,
                start_time=self.start_time,
                plcs=self._build_index_view(),
            )

        @self.app.route("/api/status", endpoint="system_status")
        def system_status():
            """Return overall gateway status as JSON."""
            last_connected = self.cache_manager.plc_last_connected
            plc_statuses = {
                plc_name: {
                    "status": status,
                    "last_connected": _format_timestamp(last_connected.get(plc_name, 0)),
                }
                for plc_name, status in self.cache_manager.plc_connection_status.items()
            }
            cache_size = sum(
                len(area["data"])
                for plc in self.cache_manager.cache.values()
                for area in plc.values()
            )
            return jsonify({
                "status": "running",
                "start_time": self.start_time,
                "plc_count": len(self.config_manager.get_config().get("plcs", [])),
                "cache_size": cache_size,
                "plc_statuses": plc_statuses,
            })

        @self.app.route("/api/doc", endpoint="api_doc")
        def api_doc():
            """Render the API documentation page."""
            return render_template_string(_API_DOC_TEMPLATE)

    def _register_config_routes(self):
        """Register the authenticated configuration page and configuration APIs."""

        @self.app.route("/config", endpoint="config_page")
        @self.requires_auth
        def config_page():
            """Render the configuration editing page."""
            config_json = json.dumps(self.config_manager.get_config(), indent=2)
            # NOTE(review): the password is handed to the template as in the
            # original; confirm the page really needs it.
            return render_template_string(
                _CONFIG_TEMPLATE,
                config_json=config_json,
                username=self.username,
                password=self.password,
            )

        @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config")
        @self.requires_auth
        def validate_config():
            """Validate the posted configuration without saving it."""
            try:
                config = request.get_json(silent=True)
                if config is None:
                    return jsonify({
                        "valid": False,
                        "message": "Request body must be valid JSON",
                    }), 400
                is_valid, error = self.config_manager.validate_config(config)
                if is_valid:
                    return jsonify({"valid": True})
                return jsonify({"valid": False, "message": error}), 400
            except Exception as e:
                return jsonify({"valid": False, "message": str(e)}), 400

        @self.app.route("/api/config", methods=["GET"], endpoint="get_config")
        @self.requires_auth
        def get_config():
            """Return the current configuration."""
            return jsonify(self.config_manager.get_config())

        @self.app.route("/api/config", methods=["POST"], endpoint="save_config")
        @self.requires_auth
        def save_config():
            """Save the posted configuration; ``?reload=true`` also requests a reload."""
            try:
                config = request.get_json(silent=True)
                if config is None:
                    return jsonify({
                        "success": False,
                        "message": "Request body must be valid JSON",
                    }), 400
                reload_requested = request.args.get("reload", "false").lower() == "true"

                success, message = self.config_manager.save_config(config)
                if not success:
                    return jsonify({"success": False, "message": message}), 400

                if not reload_requested:
                    return jsonify({
                        "success": True,
                        "message": "Configuration saved successfully (restart to apply changes)",
                    })

                # Ask the main application to reload, when the cache manager
                # holds a reference to it.
                owner = getattr(self.cache_manager, "app", None)
                if owner:
                    owner.request_reload()
                return jsonify({
                    "success": True,
                    "message": "Configuration saved and reload requested",
                })
            except Exception as e:
                return jsonify({
                    "success": False,
                    "message": f"Error saving config: {str(e)}",
                }), 500

    def _register_data_routes(self):
        """Register the read/write, batch, area-status and parsed-data APIs.

        NOTE(review): the URL variables were missing from the route rules as
        received; they are restored from the view-function signatures, with
        ``int`` converters for offset/length because the documented responses
        show them as numbers. Confirm against the original rules.
        """
        cache = self.cache_manager

        @self.app.route(
            "/api/read/<plc_name>/<area_name>/<int:offset>/<int:length>",
            methods=["GET"],
            endpoint="single_read",
        )
        def single_read(plc_name, area_name, offset, length):
            """Read ``length`` bytes at ``offset`` from one area."""
            data, error, plc_status, update_time = cache.read_area(
                plc_name, area_name, offset, length
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, length=length, data=list(data),
            )

        @self.app.route(
            "/api/read_bool/<plc_name>/<area_name>/<int:offset>/<int:length>",
            methods=["GET"],
            endpoint="single_read_bool",
        )
        def single_read_bool(plc_name, area_name, offset, length):
            """Read boolean values from one area."""
            data, error, plc_status, update_time = cache.read_area_bool(
                plc_name, area_name, offset, length
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)
            # The bool result is wrapped in a one-element list, as before.
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, length=length, data=[data],
            )

        @self.app.route(
            "/api/write/<plc_name>/<area_name>/<int:offset>",
            methods=["POST"],
            endpoint="single_write",
        )
        def single_write(plc_name, area_name, offset):
            """Write the raw request body to one area at ``offset``."""
            data = request.data
            if not data:
                return _request_error(
                    "No data provided", 400,
                    plc_status=cache.plc_connection_status.get(plc_name, "unknown"),
                )
            _, error, plc_status, update_time = cache.write_area(
                plc_name, area_name, offset, data
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, length=len(data),
            )

        @self.app.route(
            "/api/write_bool/<plc_name>/<area_name>/<int:offset>",
            methods=["POST"],
            endpoint="single_write_bool",
        )
        def single_write_bool(plc_name, area_name, offset):
            """Write boolean data from the raw request body to one area."""
            data = request.data
            if not data:
                return _request_error(
                    "No data provided", 400,
                    plc_status=cache.plc_connection_status.get(plc_name, "unknown"),
                )
            _, error, plc_status, update_time = cache.write_area_bool(
                plc_name, area_name, offset, data
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, length=1,
            )

        @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
        def batch_read():
            """Read from several areas in one request."""
            return self._handle_batch(cache.batch_read, "read")

        @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write")
        def batch_write():
            """Write to several areas in one request."""
            return self._handle_batch(cache.batch_write, "write")

        @self.app.route("/api/batch_write_bool", methods=["POST"], endpoint="batch_write_bool")
        def batch_write_bool():
            """Write boolean values to several areas in one request."""
            # Logged with the same "write" wording as batch_write, as before.
            return self._handle_batch(cache.batch_write_bool, "write")

        @self.app.route(
            "/api/read_generic/<plc_name>/<area_name>/<int:offset>/<data_type>",
            methods=["GET"],
            endpoint="read_generic",
        )
        def read_generic(plc_name, area_name, offset, data_type):
            """Read ``count`` (query parameter, default 1) values of ``data_type``."""
            count = request.args.get("count", 1, type=int)
            if count < 1:
                return _param_error(plc_name, area_name, "Count must be at least 1")

            result, error, plc_status, update_time = cache.read_generic(
                plc_name, area_name, offset, data_type, count
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, data_type=data_type, count=count, data=result,
            )

        @self.app.route(
            "/api/write_generic/<plc_name>/<area_name>/<int:offset>/<data_type>",
            methods=["POST"],
            endpoint="write_generic",
        )
        def write_generic(plc_name, area_name, offset, data_type):
            """Write the JSON ``value`` (or ``values``) field as ``data_type``."""
            if not request.is_json:
                return _param_error(
                    plc_name, area_name,
                    "Request must be JSON (Content-Type: application/json)",
                )

            json_data = request.get_json(silent=True)
            # A body that is not a JSON object (list, string, null, malformed)
            # cannot carry the fields; reject it instead of failing on .get().
            if not isinstance(json_data, dict) or (
                "value" not in json_data and "values" not in json_data
            ):
                return _param_error(plc_name, area_name, "Missing 'value' or 'values' field")

            # "value" wins when both fields are present.
            value = json_data["value"] if "value" in json_data else json_data["values"]

            _, error, plc_status, update_time = cache.write_generic(
                plc_name, area_name, offset, data_type, value
            )
            if error:
                return _area_error(plc_name, area_name, error, plc_status, update_time)

            count = len(value) if isinstance(value, list) else 1
            return _area_success(
                plc_name, area_name, plc_status, update_time,
                offset=offset, data_type=data_type, count=count,
            )

        @self.app.route(
            "/api/status/<plc_name>/<area_name>", methods=["GET"], endpoint="area_status"
        )
        def area_status(plc_name, area_name):
            """Return the status of one area."""
            return jsonify(cache.get_area_status(plc_name, area_name))

        @self.app.route(
            "/api/data/<plc_name>/<area_name>", methods=["GET"], endpoint="get_parsed_data"
        )
        def get_parsed_data(plc_name, area_name):
            """Return the parsed data of one area."""
            return jsonify(cache.get_parsed_data(plc_name, area_name))

    def start(self, host="0.0.0.0", port=5000):
        """Start the API server in a daemon thread.

        Args:
            host: Interface to bind; defaults to all interfaces.
            port: TCP port to listen on.
        """
        self.server_thread = threading.Thread(
            target=self.app.run,
            kwargs={
                "host": host,
                "port": port,
                "threaded": True,
                "use_reloader": False,  # reloader stays off (not for production use)
            },
            daemon=True,
            name="APIServerThread",
        )
        self.server_thread.start()
        self.logger.info("API server started at http://%s:%s", host, port)