"""REST API server exposing PLC data access and configuration management."""

import hmac
import json
import logging
import threading
import time
from functools import wraps

from flask import Flask, jsonify, request, render_template_string, Response, render_template

from config_manager import ConfigManager


class APIServer:
    """REST API server providing PLC data access and configuration management.

    All HTTP routes are registered on an internal Flask application during
    construction; call :meth:`start` to serve them from a background thread.
    """

    # Format used for every human-readable timestamp returned by the API.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, cache_manager, config_path="config/config.json"):
        """Initialize the API server and register its routes.

        Args:
            cache_manager: Cache manager instance used for all PLC reads,
                writes and status lookups.
            config_path: Path of the configuration file handed to
                ``ConfigManager``.
        """
        self.cache_manager = cache_manager
        self.config_manager = ConfigManager(config_path)
        self.app = Flask(__name__)
        self.logger = logging.getLogger("APIServer")
        self.auth_enabled = True  # authentication can be switched off via this flag
        # NOTE(review): hard-coded credentials; a real deployment should load
        # these from secure storage instead.
        self.username = "admin"
        self.password = "admin123"
        self.start_time = time.strftime(self._TIME_FORMAT)
        self.server_thread = None  # set by start()

        # Routes are registered once, at construction time.
        self.setup_routes()

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def check_auth(self, username, password):
        """Return True when the supplied credentials match the configured ones.

        The comparison uses ``hmac.compare_digest`` so that its duration does
        not depend on how many leading characters match.

        Args:
            username: User name taken from the request.
            password: Password taken from the request.

        Returns:
            bool: True if both values match, False otherwise (including when
            either value is missing or not a string).
        """
        if not isinstance(username, str) or not isinstance(password, str):
            return False
        # Evaluate both comparisons unconditionally (no short-circuit) so a
        # wrong user name and a wrong password take the same code path.
        user_ok = hmac.compare_digest(
            username.encode("utf-8"), self.username.encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            password.encode("utf-8"), self.password.encode("utf-8")
        )
        return user_ok and password_ok

    def authenticate(self):
        """Build the 401 response that asks the client for Basic credentials."""
        return Response(
            "Unauthorized",
            401,
            {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'},
        )

    def requires_auth(self, f):
        """Decorator enforcing HTTP Basic authentication on a view.

        The check is skipped entirely while ``self.auth_enabled`` is False.
        ``functools.wraps`` keeps the wrapped view's metadata intact.

        Args:
            f: View function to protect.

        Returns:
            The wrapped view function.
        """
        @wraps(f)
        def decorated(*args, **kwargs):
            if not self.auth_enabled:
                return f(*args, **kwargs)
            auth = request.authorization
            if not auth or not self.check_auth(auth.username, auth.password):
                return self.authenticate()
            return f(*args, **kwargs)

        return decorated

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------
    @classmethod
    def _format_timestamp(cls, timestamp):
        """Format an epoch timestamp in local time, or "Never" if unset.

        A missing or non-positive timestamp is rendered as ``"Never"`` rather
        than as a date in 1970.
        """
        if not timestamp or timestamp <= 0:
            return "Never"
        return time.strftime(cls._TIME_FORMAT, time.localtime(timestamp))

    @staticmethod
    def _request_error(message, status_code, plc_status="unknown"):
        """Build an error response for a request that never reached the cache."""
        return jsonify({
            "status": "error",
            "message": message,
            "plc_connection_status": plc_status,
            "last_update": 0,
            "last_update_formatted": "N/A",
        }), status_code

    def _area_error(self, plc_name, area_name, message, plc_status, update_time):
        """Build the 400 response for a failed single-area read or write."""
        return jsonify({
            "status": "error",
            "plc_name": plc_name,
            "area_name": area_name,
            "message": message,
            "plc_connection_status": plc_status,
            "last_update": update_time,
            "last_update_formatted": self._format_timestamp(update_time),
        }), 400

    def _area_success(self, plc_name, area_name, offset, length, plc_status,
                      update_time, data=None):
        """Build the response for a successful single-area read or write.

        ``data`` is included in the payload only when it is not None (reads
        return data, writes do not).
        """
        payload = {
            "status": "success",
            "plc_name": plc_name,
            "area_name": area_name,
            "offset": offset,
            "length": length,
        }
        if data is not None:
            payload["data"] = data
        payload["plc_connection_status"] = plc_status
        payload["last_update"] = update_time
        payload["last_update_formatted"] = self._format_timestamp(update_time)
        return jsonify(payload)

    def _handle_batch(self, operation, label):
        """Validate a batch request body and dispatch it to the cache manager.

        Args:
            operation: Callable receiving the decoded JSON list and returning
                a JSON-serializable result.
            label: "read" or "write"; used only in log messages.

        Returns:
            A Flask response: the operation's result, a 400 for a malformed
            request, or a 500 if anything raises.
        """
        try:
            # The body must be declared as JSON ...
            if not request.is_json:
                return self._request_error(
                    "Request must be JSON (Content-Type: application/json)", 400
                )
            items = request.json
            # ... and must decode to a list of individual requests.
            if not isinstance(items, list):
                return self._request_error("Request must be a JSON array", 400)

            self.logger.info(
                "Received batch %s request: %s", label, json.dumps(items)
            )
            return jsonify(operation(items))
        except Exception as e:  # top-level boundary: log and report as 500
            self.logger.error("Batch %s error: %s", label, e, exc_info=True)
            return self._request_error(f"Internal server error: {str(e)}", 500)

    # ------------------------------------------------------------------
    # Cache summary
    # ------------------------------------------------------------------
    def get_summary(self):
        """Collect a per-PLC, per-area summary of the cache.

        Returns:
            dict: ``{plc_name: {area_name: {...}}}`` where each entry carries
            the area's ``status``, ``size`` and ``type``, the PLC connection
            status, and the formatted time of the last update.
        """
        summary = {}
        for plc_name, areas in self.cache_manager.cache.items():
            plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
            plc_summary = {}
            for area_name, area in areas.items():
                last_update = self.cache_manager.last_update[plc_name][area_name]
                plc_summary[area_name] = {
                    "status": area["status"],
                    "plc_connection_status": plc_status,
                    "last_update": self._format_timestamp(last_update),
                    "size": area["size"],
                    "type": area["type"],
                }
            summary[plc_name] = plc_summary
        return summary

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------
    def setup_routes(self):
        """Register every HTTP route on the Flask application."""

        # ===========================
        # Main page - status summary
        # ===========================
        @self.app.route("/", endpoint="index")
        def index():
            """Render the status overview page."""
            summary = self.get_summary()
            return render_template(
                "status.html",  # template file name
                start_time=self.start_time,
                summary=summary,
                plc_statuses=self.cache_manager.plc_connection_status,
            )

        # ===========================
        # System status API
        # ===========================
        @self.app.route("/api/status", endpoint="system_status")
        def system_status():
            """Return overall system status information as JSON."""
            plc_statuses = {
                plc_name: {
                    "status": status,
                    "last_connected": self._format_timestamp(
                        self.cache_manager.plc_last_connected[plc_name]
                    ),
                }
                for plc_name, status in self.cache_manager.plc_connection_status.items()
            }
            cache_size = sum(
                len(area["data"])
                for plc in self.cache_manager.cache.values()
                for area in plc.values()
            )
            return jsonify({
                "status": "running",
                "start_time": self.start_time,
                "plc_count": len(self.config_manager.get_config().get("plcs", [])),
                "cache_size": cache_size,
                "plc_statuses": plc_statuses,
            })

        # ===========================
        # Configuration management routes
        # ===========================
        @self.app.route("/config", endpoint="config_page")
        @self.requires_auth
        def config_page():
            """Render the configuration editing page."""
            config = self.config_manager.get_config()
            config_json = json.dumps(config, indent=2)
            # NOTE(review): the plain-text password is handed to the template;
            # kept because the template is not visible here, but this should
            # be revisited.
            return render_template(
                'config.html',
                config_json=config_json,
                username=self.username,
                password=self.password,
            )

        # Configuration validation API
        @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config")
        @self.requires_auth
        def validate_config():
            """Validate the posted configuration without saving it."""
            try:
                config = request.json
                is_valid, error = self.config_manager.validate_config(config)
                if is_valid:
                    return jsonify({"valid": True})
                return jsonify({"valid": False, "message": error}), 400
            except Exception as e:
                return jsonify({"valid": False, "message": str(e)}), 400

        # Configuration retrieval API
        @self.app.route("/api/config", methods=["GET"], endpoint="get_config")
        @self.requires_auth
        def get_config():
            """Return the current configuration as JSON."""
            return jsonify(self.config_manager.get_config())

        # Configuration save API
        @self.app.route("/api/config", methods=["POST"], endpoint="save_config")
        @self.requires_auth
        def save_config():
            """Save the posted configuration; ``?reload=true`` requests a reload."""
            try:
                config = request.json
                should_reload = request.args.get('reload', 'false').lower() == 'true'
                success, message = self.config_manager.save_config(config)
                if not success:
                    return jsonify({
                        "success": False,
                        "message": message
                    }), 400

                if should_reload:
                    # Notify the main application, when one is attached to the
                    # cache manager, that it should reload the configuration.
                    main_app = getattr(self.cache_manager, 'app', None)
                    if main_app:
                        main_app.request_reload()
                    return jsonify({
                        "success": True,
                        "message": "Configuration saved and reload requested"
                    })

                return jsonify({
                    "success": True,
                    "message": "Configuration saved successfully (restart to apply changes)"
                })
            except Exception as e:
                return jsonify({
                    "success": False,
                    "message": f"Error saving config: {str(e)}"
                }), 500

        # ===========================
        # API documentation page
        # ===========================
        @self.app.route("/api/doc", endpoint="api_doc")
        def api_doc():
            """Render the API documentation page."""
            return render_template('api_doc.html')

        # ===========================
        # Data access API
        # ===========================
        # The view functions below take their arguments from the URL, so each
        # rule must declare the matching variable parts.

        # Single read
        @self.app.route(
            "/api/read/<plc_name>/<area_name>/<int:offset>/<int:length>",
            methods=["GET"],
            endpoint="single_read",
        )
        def single_read(plc_name, area_name, offset, length):
            """Read ``length`` items starting at ``offset`` from an area."""
            data, error, plc_status, update_time = self.cache_manager.read_area(
                plc_name, area_name, offset, length
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, length, plc_status, update_time,
                data=list(data),
            )

        # Single BOOL read
        # NOTE(review): offset is assumed to be an integer like the byte-level
        # routes; confirm against cache_manager.read_area_bool.
        @self.app.route(
            "/api/read_bool/<plc_name>/<area_name>/<int:offset>/<int:length>",
            methods=["GET"],
            endpoint="single_read_bool",
        )
        def single_read_bool(plc_name, area_name, offset, length):
            """Read a BOOL value from an area."""
            data, error, plc_status, update_time = self.cache_manager.read_area_bool(
                plc_name, area_name, offset, length
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            # The value is wrapped in a one-element list rather than iterated.
            return self._area_success(
                plc_name, area_name, offset, length, plc_status, update_time,
                data=[data],
            )

        # Single write
        @self.app.route(
            "/api/write/<plc_name>/<area_name>/<int:offset>",
            methods=["POST"],
            endpoint="single_write",
        )
        def single_write(plc_name, area_name, offset):
            """Write the raw request body to an area at ``offset``."""
            data = request.data
            if not data:
                # Nothing was posted: reject before touching the cache.
                return self._request_error(
                    "No data provided",
                    400,
                    plc_status=self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
                )

            success, error, plc_status, update_time = self.cache_manager.write_area(
                plc_name, area_name, offset, data
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, len(data), plc_status, update_time
            )

        # Single BOOL write
        # NOTE(review): offset is assumed to be an integer like the byte-level
        # routes; confirm against cache_manager.write_area_bool.
        @self.app.route(
            "/api/write_bool/<plc_name>/<area_name>/<int:offset>",
            methods=["POST"],
            endpoint="single_write_bool",
        )
        def single_write_bool(plc_name, area_name, offset):
            """Write a BOOL value (raw request body) to an area at ``offset``."""
            data = request.data
            if not data:
                # Nothing was posted: reject before touching the cache.
                return self._request_error(
                    "No data provided",
                    400,
                    plc_status=self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
                )

            success, error, plc_status, update_time = self.cache_manager.write_area_bool(
                plc_name, area_name, offset, data
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, 1, plc_status, update_time
            )

        # Batch read
        @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
        def batch_read():
            """Read several areas in one request (body: JSON array)."""
            return self._handle_batch(self.cache_manager.batch_read, "read")

        # Batch write
        @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write")
        def batch_write():
            """Write several areas in one request (body: JSON array)."""
            return self._handle_batch(self.cache_manager.batch_write, "write")

        # Area status check
        @self.app.route(
            "/api/status/<plc_name>/<area_name>",
            methods=["GET"],
            endpoint="area_status",
        )
        def area_status(plc_name, area_name):
            """Return the status of a single area as JSON."""
            status = self.cache_manager.get_area_status(plc_name, area_name)
            return jsonify(status)

        # Parsed data
        @self.app.route(
            "/api/data/<plc_name>/<area_name>",
            methods=["GET"],
            endpoint="get_parsed_data",
        )
        def get_parsed_data(plc_name, area_name):
            """Return the parsed data of a single area as JSON."""
            return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name))

    # ------------------------------------------------------------------
    # Server lifecycle
    # ------------------------------------------------------------------
    def start(self, host="0.0.0.0", port=5000):
        """Start the API server in a daemon thread.

        Args:
            host: Interface to bind to. Defaults to all interfaces.
            port: TCP port to listen on. Defaults to 5000.
        """
        self.server_thread = threading.Thread(
            target=self.app.run,
            kwargs={
                "host": host,
                "port": port,
                "threaded": True,
                # The reloader is disabled explicitly because the server runs
                # in a background thread.
                "use_reloader": False,
            },
            daemon=True,
            name="APIServerThread",
        )
        self.server_thread.start()
        self.logger.info("API server started at http://%s:%s", host, port)