Files
gateway_plc/gateway/api_server_html.py

446 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hmac
import json
import logging
import threading
import time
from functools import wraps

from flask import Flask, jsonify, request, render_template_string, Response, render_template

from config_manager import ConfigManager
class APIServer:
    """REST API server exposing PLC data access and configuration management.

    The server wraps a Flask application. HTML pages (status, configuration
    editor, API documentation) are rendered from templates, and JSON endpoints
    delegate every read/write to the cache manager passed to the constructor.

    NOTE(review): only the configuration routes are protected by HTTP Basic
    authentication. The read/write/batch endpoints are reachable without
    credentials, exactly as in the original implementation; confirm this is
    intended before exposing the server on an untrusted network.
    """

    # strftime pattern used for every human-readable timestamp in responses.
    TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, cache_manager, config_path="config/config.json"):
        """Create the Flask app and register all routes.

        Args:
            cache_manager: Cache manager instance that serves PLC data.
            config_path: Path of the JSON configuration file.
        """
        self.cache_manager = cache_manager
        self.config_manager = ConfigManager(config_path)
        self.app = Flask(__name__)
        self.logger = logging.getLogger("APIServer")
        self.auth_enabled = True  # set to False to disable authentication
        # SECURITY NOTE: default credentials are hard-coded; a real deployment
        # should load them from secure storage instead.
        self.username = "admin"
        self.password = "admin123"
        self.start_time = time.strftime(self.TIME_FORMAT)
        self.server_thread = None  # populated by start()
        # Routes are registered once, at construction time.
        self.setup_routes()

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------
    def check_auth(self, username, password):
        """Return True when the supplied credentials match the configured ones.

        The comparison is done in constant time on UTF-8 encoded bytes so the
        response time does not reveal how many leading characters matched.
        ``None`` or non-string credentials never match.
        """
        if not isinstance(username, str) or not isinstance(password, str):
            return False
        # Evaluate both comparisons unconditionally (no short-circuit) so the
        # timing is the same whichever field is wrong.
        user_ok = hmac.compare_digest(
            username.encode("utf-8"), str(self.username).encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            password.encode("utf-8"), str(self.password).encode("utf-8")
        )
        return user_ok and password_ok

    def authenticate(self):
        """Build the 401 response that asks the client for Basic credentials."""
        return Response(
            "Unauthorized",
            401,
            {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'},
        )

    def requires_auth(self, f):
        """Decorator: protect a route with HTTP Basic authentication.

        ``functools.wraps`` keeps the metadata of the wrapped view function.
        When ``self.auth_enabled`` is false the view is called directly.
        """
        @wraps(f)
        def decorated(*args, **kwargs):
            if self.auth_enabled:
                auth = request.authorization
                if not auth or not self.check_auth(auth.username, auth.password):
                    return self.authenticate()
            return f(*args, **kwargs)

        return decorated

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @classmethod
    def _format_timestamp(cls, timestamp):
        """Format an epoch timestamp in local time, or "Never" if unset (<= 0)."""
        if not timestamp or timestamp <= 0:
            return "Never"
        return time.strftime(cls.TIME_FORMAT, time.localtime(timestamp))

    @staticmethod
    def _lookup(mapping, *keys, default=0):
        """Follow ``keys`` through nested mappings; return ``default`` if one is missing."""
        try:
            for key in keys:
                mapping = mapping[key]
        except KeyError:
            return default
        return mapping

    def _area_error(self, plc_name, area_name, message, plc_status, update_time):
        """Build the HTTP 400 JSON response for a failed single-area operation."""
        return jsonify({
            "status": "error",
            "plc_name": plc_name,
            "area_name": area_name,
            "message": message,
            "plc_connection_status": plc_status,
            "last_update": update_time,
            "last_update_formatted": self._format_timestamp(update_time),
        }), 400

    def _area_success(self, plc_name, area_name, offset, length, plc_status,
                      update_time, **extra):
        """Build the JSON response for a successful single-area operation.

        ``extra`` carries operation-specific fields (e.g. ``data`` for reads);
        they are placed between ``length`` and the connection status.
        """
        payload = {
            "status": "success",
            "plc_name": plc_name,
            "area_name": area_name,
            "offset": offset,
            "length": length,
        }
        payload.update(extra)
        payload.update({
            "plc_connection_status": plc_status,
            "last_update": update_time,
            "last_update_formatted": self._format_timestamp(update_time),
        })
        return jsonify(payload)

    def _no_data_error(self, plc_name):
        """Build the HTTP 400 JSON response for a write request with an empty body."""
        return jsonify({
            "status": "error",
            "message": "No data provided",
            "plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
            "last_update": 0,
            "last_update_formatted": "N/A",
        }), 400

    @staticmethod
    def _batch_error(message, status_code):
        """Build the JSON error response shared by the batch endpoints."""
        return jsonify({
            "status": "error",
            "message": message,
            "plc_connection_status": "unknown",
            "last_update": 0,
            "last_update_formatted": "N/A",
        }), status_code

    def _handle_batch(self, label, operation):
        """Validate a batch request body and run ``operation`` on it.

        Args:
            label: "read" or "write"; only used in log messages.
            operation: Callable that receives the decoded JSON list and
                returns a JSON-serialisable result.
        """
        try:
            if not request.is_json:
                return self._batch_error(
                    "Request must be JSON (Content-Type: application/json)", 400
                )
            # silent=True: a malformed body yields None and is rejected below
            # with 400 instead of surfacing as an internal error (500).
            requests = request.get_json(silent=True)
            if not isinstance(requests, list):
                return self._batch_error("Request must be a JSON array", 400)
            self.logger.info(
                "Received batch %s request: %s", label, json.dumps(requests)
            )
            return jsonify(operation(requests))
        except Exception as exc:  # top-level boundary: always answer with JSON
            self.logger.error("Batch %s error: %s", label, exc, exc_info=True)
            return self._batch_error(f"Internal server error: {exc}", 500)

    def get_summary(self):
        """Return a per-PLC, per-area summary of the cache.

        Returns:
            dict: ``{plc_name: {area_name: {...}}}`` where each area entry
            holds ``status``, ``plc_connection_status``, ``last_update``
            (formatted, or "Never"), ``size`` and ``type``.
        """
        manager = self.cache_manager
        summary = {}
        for plc_name, areas in manager.cache.items():
            plc_status = manager.plc_connection_status.get(plc_name, "unknown")
            plc_summary = {}
            for area_name, area in areas.items():
                # An area without a recorded update is reported as "Never"
                # rather than failing the whole summary.
                last_update = self._lookup(manager.last_update, plc_name, area_name)
                plc_summary[area_name] = {
                    "status": area["status"],
                    "plc_connection_status": plc_status,
                    "last_update": self._format_timestamp(last_update),
                    "size": area["size"],
                    "type": area["type"],
                }
            summary[plc_name] = plc_summary
        return summary

    # ------------------------------------------------------------------
    # Routes
    # ------------------------------------------------------------------
    def setup_routes(self):
        """Register all pages and API endpoints on ``self.app``."""

        # ===========================
        # Main page - status summary
        # ===========================
        @self.app.route("/", endpoint="index")
        def index():
            """Render the status overview page."""
            return render_template(
                "status.html",
                start_time=self.start_time,
                summary=self.get_summary(),
                plc_statuses=self.cache_manager.plc_connection_status,
            )

        # ===========================
        # System status API
        # ===========================
        @self.app.route("/api/status", endpoint="system_status")
        def system_status():
            """Return overall system status as JSON."""
            manager = self.cache_manager
            plc_statuses = {
                plc_name: {
                    "status": status,
                    "last_connected": self._format_timestamp(
                        self._lookup(manager.plc_last_connected, plc_name)
                    ),
                }
                for plc_name, status in manager.plc_connection_status.items()
            }
            return jsonify({
                "status": "running",
                "start_time": self.start_time,
                "plc_count": len(self.config_manager.get_config().get("plcs", [])),
                "cache_size": sum(
                    len(area["data"])
                    for plc in manager.cache.values()
                    for area in plc.values()
                ),
                "plc_statuses": plc_statuses,
            })

        # ===========================
        # Configuration management
        # ===========================
        @self.app.route("/config", endpoint="config_page")
        @self.requires_auth
        def config_page():
            """Render the configuration editor page."""
            config_json = json.dumps(self.config_manager.get_config(), indent=2)
            # NOTE(review): the credentials are handed to the template,
            # presumably so the page can call the protected API; confirm the
            # template does not expose them more widely than necessary.
            return render_template(
                'config.html',
                config_json=config_json,
                username=self.username,
                password=self.password,
            )

        @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config")
        @self.requires_auth
        def validate_config():
            """Validate the posted configuration without saving it."""
            try:
                is_valid, error = self.config_manager.validate_config(request.json)
            except Exception as exc:
                return jsonify({"valid": False, "message": str(exc)}), 400
            if is_valid:
                return jsonify({"valid": True})
            return jsonify({"valid": False, "message": error}), 400

        @self.app.route("/api/config", methods=["GET"], endpoint="get_config")
        @self.requires_auth
        def get_config():
            """Return the current configuration as JSON."""
            return jsonify(self.config_manager.get_config())

        @self.app.route("/api/config", methods=["POST"], endpoint="save_config")
        @self.requires_auth
        def save_config():
            """Save the posted configuration; optionally request a reload.

            The query parameter ``reload=true`` asks the main application to
            reload the configuration after a successful save.
            """
            try:
                # silent=True: a missing/malformed body is a client error and
                # must be answered with 400, not reported as a server fault.
                config = request.get_json(silent=True)
                if config is None:
                    return jsonify({
                        "success": False,
                        "message": "Invalid or missing JSON request body"
                    }), 400
                reload_requested = request.args.get('reload', 'false').lower() == 'true'
                success, message = self.config_manager.save_config(config)
                if not success:
                    return jsonify({
                        "success": False,
                        "message": message
                    }), 400
                if not reload_requested:
                    return jsonify({
                        "success": True,
                        "message": "Configuration saved successfully (restart to apply changes)"
                    })
                # Notify the main application, when the cache manager has one.
                main_app = getattr(self.cache_manager, 'app', None)
                if main_app:
                    main_app.request_reload()
                return jsonify({
                    "success": True,
                    "message": "Configuration saved and reload requested"
                })
            except Exception as exc:
                return jsonify({
                    "success": False,
                    "message": f"Error saving config: {str(exc)}"
                }), 500

        # ===========================
        # API documentation page
        # ===========================
        @self.app.route("/api/doc", endpoint="api_doc")
        def api_doc():
            """Render the API documentation page."""
            return render_template('api_doc.html')

        # ===========================
        # Data access API
        # ===========================
        @self.app.route("/api/read/<plc_name>/<area_name>/<int:offset>/<int:length>", methods=["GET"],
                        endpoint="single_read")
        def single_read(plc_name, area_name, offset, length):
            """Read ``length`` items at ``offset`` from one area."""
            data, error, plc_status, update_time = self.cache_manager.read_area(
                plc_name, area_name, offset, length
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, length, plc_status, update_time,
                data=list(data),
            )

        @self.app.route("/api/read_bool/<plc_name>/<area_name>/<int:offset>/<int:length>", methods=["GET"],
                        endpoint="single_read_bool")
        def single_read_bool(plc_name, area_name, offset, length):
            """Read boolean data from one area."""
            data, error, plc_status, update_time = self.cache_manager.read_area_bool(
                plc_name, area_name, offset, length
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            # The result is wrapped in a one-element list, as in the original.
            return self._area_success(
                plc_name, area_name, offset, length, plc_status, update_time,
                data=[data],
            )

        @self.app.route("/api/write/<plc_name>/<area_name>/<int:offset>", methods=["POST"], endpoint="single_write")
        def single_write(plc_name, area_name, offset):
            """Write the raw request body to one area at ``offset``."""
            data = request.data
            if not data:
                return self._no_data_error(plc_name)
            _, error, plc_status, update_time = self.cache_manager.write_area(
                plc_name, area_name, offset, data
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, len(data), plc_status, update_time
            )

        @self.app.route("/api/write_bool/<plc_name>/<area_name>/<int:offset>", methods=["POST"],
                        endpoint="single_write_bool")
        def single_write_bool(plc_name, area_name, offset):
            """Write boolean data from the raw request body to one area."""
            data = request.data
            if not data:
                return self._no_data_error(plc_name)
            _, error, plc_status, update_time = self.cache_manager.write_area_bool(
                plc_name, area_name, offset, data
            )
            if error:
                return self._area_error(plc_name, area_name, error, plc_status, update_time)
            return self._area_success(
                plc_name, area_name, offset, 1, plc_status, update_time
            )

        @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
        def batch_read():
            """Read several areas in one request (body: JSON array)."""
            return self._handle_batch(
                "read", lambda items: self.cache_manager.batch_read(items)
            )

        @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write")
        def batch_write():
            """Write several areas in one request (body: JSON array)."""
            return self._handle_batch(
                "write", lambda items: self.cache_manager.batch_write(items)
            )

        @self.app.route("/api/status/<plc_name>/<area_name>", methods=["GET"], endpoint="area_status")
        def area_status(plc_name, area_name):
            """Return the status of one area."""
            return jsonify(self.cache_manager.get_area_status(plc_name, area_name))

        @self.app.route("/api/data/<plc_name>/<area_name>", methods=["GET"], endpoint="get_parsed_data")
        def get_parsed_data(plc_name, area_name):
            """Return the parsed data of one area."""
            return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name))

    def start(self, host="0.0.0.0", port=5000):
        """Start the Flask server in a daemon thread.

        Args:
            host: Interface to bind to (defaults to all interfaces).
            port: TCP port to listen on.
        """
        self.server_thread = threading.Thread(
            target=self.app.run,
            kwargs={
                "host": host,
                "port": port,
                "threaded": True,
                "use_reloader": False,  # the reloader must not run in production
            },
            daemon=True,
            name="APIServerThread",
        )
        self.server_thread.start()
        self.logger.info("API server started at http://%s:%s", host, port)