from flask import Flask, jsonify, request, render_template_string, Response
import threading
import time
import json
from functools import wraps
from config_manager import ConfigManager
import logging
class APIServer:
"""REST API服务器,提供PLC数据访问和配置管理功能"""
def __init__(self, cache_manager, config_path="./config/config.json"):
"""
初始化API服务器
Args:
cache_manager: 缓存管理器实例
config_path: 配置文件路径
"""
self.cache_manager = cache_manager
self.config_manager = ConfigManager(config_path)
self.app = Flask(__name__)
self.logger = logging.getLogger("APIServer")
self.auth_enabled = True # 可通过配置关闭认证
self.username = "admin"
self.password = "admin123" # 实际应用中应从安全存储获取
self.start_time = time.strftime("%Y-%m-%d %H:%M:%S")
# 在初始化方法中调用 setup_routes
self.setup_routes()
def check_auth(self, username, password):
"""验证用户名和密码"""
return username == self.username and password == self.password
def authenticate(self):
"""发送401响应要求认证"""
return Response(
"Unauthorized",
401,
{"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'}
)
def requires_auth(self, f):
"""装饰器:需要认证的路由,保留函数元数据"""
@wraps(f)
def decorated(*args, **kwargs):
if not self.auth_enabled:
return f(*args, **kwargs)
auth = request.authorization
if not auth or not self.check_auth(auth.username, auth.password):
return self.authenticate()
return f(*args, **kwargs)
return decorated
def get_summary(self):
"""获取缓存摘要信息"""
summary = {}
for plc_name, areas in self.cache_manager.cache.items():
summary[plc_name] = {}
for area_name, area in areas.items():
last_update = self.cache_manager.last_update[plc_name][area_name]
plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
summary[plc_name][area_name] = {
"status": area["status"],
"plc_connection_status": plc_status,
"last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never",
"size": area["size"],
"type": area["type"]
}
return summary
def setup_routes(self):
"""设置所有API路由"""
# ===========================
# 主页面 - 状态摘要
# ===========================
@self.app.route("/", endpoint="index")
def index():
summary = self.get_summary()
html = """
PLC Gateway Status
PLC Gateway Status
Gateway running since: {{ start_time }}
"""
for plc_name, areas in summary.items():
plc_status = self.cache_manager.plc_connection_status.get(plc_name, "unknown")
plc_class = ""
if plc_status == "connected":
plc_class = "plc-connected"
elif plc_status == "disconnected":
plc_class = "plc-disconnected"
else:
plc_class = "plc-never-connected"
html += f'PLC: {plc_name} (Status: {plc_status})
'
html += """
| Area Name |
Type |
Size (bytes) |
Status |
PLC Connection |
Last Update |
"""
for area_name, area in areas.items():
status_class = ""
status_text = area["status"]
if area["status"] == "connected":
status_class = "status-connected"
elif area["status"] == "never_connected":
status_class = "status-never-connected"
status_text = "Never connected"
elif area["status"] == "disconnected":
status_class = "status-disconnected"
status_text = "Disconnected"
else:
status_class = "status-disconnected"
html += f"""
| {area_name} |
{area['type']} |
{area['size']} |
{status_text} |
{area['plc_connection_status']} |
{area['last_update']} |
"""
html += "
"
# 添加API文档部分
html += """
API Endpoints
Single Read: GET /api/read/
///
Example: /api/read/PLC1/DB100_Read/10/4
Single Write: POST /api/write/
//
Body: Raw binary data
Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data
Single Read_Bool: GET /api/read_bool/
///
Example: /api/read_bool/PLC1/DB100_Read/0/2
Single Write_Bool: POST /api/write_bool/
//
Body: Raw binary data
Example: POST /api/write_bool/PLC1/DB100_Write/0
Batch Read: POST /api/batch_read
Body: JSON array of read requests
Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}]
Batch Write: POST /api/batch_write
Body: JSON array of write requests
Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}]
Configuration: GET/POST /api/config
Manage gateway configuration
"""
html += """
"""
return render_template_string(html, start_time=self.start_time)
# ===========================
# 系统状态API
# ===========================
@self.app.route("/api/status", endpoint="system_status")
def system_status():
"""获取系统状态信息"""
plc_statuses = {}
for plc_name in self.cache_manager.plc_connection_status:
plc_statuses[plc_name] = {
"status": self.cache_manager.plc_connection_status[plc_name],
"last_connected": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.cache_manager.plc_last_connected[plc_name]))
if self.cache_manager.plc_last_connected[plc_name] > 0 else "Never"
}
return jsonify({
"status": "running",
"start_time": self.start_time,
"plc_count": len(self.config_manager.get_config().get("plcs", [])),
"cache_size": sum(len(area["data"]) for plc in self.cache_manager.cache.values() for area in plc.values()),
"plc_statuses": plc_statuses
})
# ===========================
# 配置管理相关路由
# ===========================
@self.app.route("/config", endpoint="config_page")
@self.requires_auth
def config_page():
"""配置编辑页面"""
config = self.config_manager.get_config()
config_json = json.dumps(config, indent=2)
html = """
PLC Gateway Configuration
PLC Gateway Configuration
Edit the configuration JSON below. Be careful with the syntax.
Configuration Guide
PLC Configuration:
name: Unique name for the PLC
ip: IP address of the PLC
rack: Rack number (usually 0)
slot: Slot number (usually 1 for S7-1200)
Data Area Configuration:
name: Name of the data area
type: read, write, or read_write
db_number: DB number (e.g., 100 for DB100)
offset: Starting byte offset
size: Size in bytes
structure (optional): Define how to parse the data
Example:
{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.10",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000,
"structure": [
{"name": "temperature", "type": "real", "offset": 0},
{"name": "pressure", "type": "int", "offset": 4}
]
}
]
}
]
}
"""
return render_template_string(
html,
config_json=config_json,
username=self.username,
password=self.password
)
# 配置验证API
@self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config")
@self.requires_auth
def validate_config():
"""验证配置是否有效"""
try:
config = request.json
is_valid, error = self.config_manager.validate_config(config)
if is_valid:
return jsonify({"valid": True})
else:
return jsonify({"valid": False, "message": error}), 400
except Exception as e:
return jsonify({"valid": False, "message": str(e)}), 400
# 配置获取API
@self.app.route("/api/config", methods=["GET"], endpoint="get_config")
@self.requires_auth
def get_config():
"""获取当前配置"""
return jsonify(self.config_manager.get_config())
# 配置保存API
@self.app.route("/api/config", methods=["POST"], endpoint="save_config")
@self.requires_auth
def save_config():
"""保存配置"""
try:
config = request.json
reload = request.args.get('reload', 'false').lower() == 'true'
success, message = self.config_manager.save_config(config)
if success:
if reload:
# 通知主应用程序重载配置
if hasattr(self.cache_manager, 'app') and self.cache_manager.app:
self.cache_manager.app.request_reload()
return jsonify({
"success": True,
"message": "Configuration saved and reload requested"
})
else:
return jsonify({
"success": True,
"message": "Configuration saved successfully (restart to apply changes)"
})
else:
return jsonify({
"success": False,
"message": message
}), 400
except Exception as e:
return jsonify({
"success": False,
"message": f"Error saving config: {str(e)}"
}), 500
# ===========================
# 新增 API 文档接口
# ===========================
@self.app.route("/api/doc", endpoint="api_doc")
def api_doc():
"""API文档页面"""
html = """
PLC Gateway API Documentation
PLC Gateway API Documentation
Status API
System Status
GET
/api/status
获取系统状态信息,包括启动时间、PLC数量和缓存大小。
响应示例
{
"status": "running",
"start_time": "2023-10-30 14:30:22",
"plc_count": 2,
"cache_size": 11000,
"plc_statuses": {
"PLC1": {
"status": "connected",
"last_connected": "2023-10-30 14:35:10"
},
"PLC2": {
"status": "disconnected",
"last_connected": "Never"
}
}
}
Area Status
获取指定PLC区域的状态信息。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Read) |
响应示例
{
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01",
"size": 4000,
"type": "read"
}
Data API
Single Read
从指定区域读取数据。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Read) |
| offset |
起始偏移量(字节) |
| length |
读取长度(字节) |
响应示例
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 4,
"data": [0, 0, 123, 45],
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
Single Write
向指定区域写入数据。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Write) |
| offset |
起始偏移量(字节) |
请求体
原始二进制数据
响应示例
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"length": 4,
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30"
}
Single Read Bool
从指定区域读取数据。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Read) |
| offset |
起始偏移量(字节) |
| length |
读取长度(字节) |
响应示例
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 2,
"data": [0:False, 1:False],
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
Single Write Bool
向指定区域写入数据。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Write) |
| offset |
起始偏移量(字节) |
请求体
{0:True}
响应示例
{
"status": "success",
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"length": 1,
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30"
}
Batch Read
POST
/api/batch_read
批量读取多个区域的数据。
请求体
| 字段 |
类型 |
必需 |
描述 |
| plc_name |
string |
是 |
PLC名称(与配置中一致) |
| area_name |
string |
是 |
区域名称(与配置中一致) |
| offset |
number |
否 |
起始偏移量(字节),默认为0 |
| length |
number |
否 |
读取长度(字节),不提供则读取整个区域 |
请求示例
[
{
"plc_name": "PLC1",
"area_name": "DB100_Read",
"offset": 0,
"length": 4
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"offset": 10,
"length": 2
}
]
响应示例
[
{
"plc_name": "PLC1",
"area_name": "DB100_Read",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01",
"offset": 0,
"length": 4,
"data": [0, 0, 123, 45]
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754322.123,
"last_update_formatted": "2023-10-30 14:12:02",
"offset": 10,
"length": 2,
"data": [255, 0]
}
]
Batch Write
POST
/api/batch_write
批量写入多个区域的数据。
请求体
| 字段 |
类型 |
必需 |
描述 |
| plc_name |
string |
是 |
PLC名称(与配置中一致) |
| area_name |
string |
是 |
区域名称(与配置中一致) |
| offset |
number |
是 |
起始偏移量(字节) |
| data |
array |
是 |
要写入的数据(字节数组) |
请求示例
[
{
"plc_name": "PLC1",
"area_name": "DB100_Write",
"offset": 0,
"data": [1, 2, 3, 4]
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"offset": 10,
"data": [255, 0]
}
]
响应示例
[
{
"plc_name": "PLC1",
"area_name": "DB100_Write",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754350.789,
"last_update_formatted": "2023-10-30 14:12:30",
"offset": 0,
"length": 4
},
{
"plc_name": "PLC1",
"area_name": "DB202_Params",
"status": "success",
"plc_connection_status": "connected",
"last_update": 1698754351.234,
"last_update_formatted": "2023-10-30 14:12:31",
"offset": 10,
"length": 2
}
]
Parsed Data
获取解析后的数据(如果配置了结构)。
路径参数
| 参数 |
描述 |
| plc_name |
PLC名称(如PLC1) |
| area_name |
区域名称(如DB100_Read) |
响应示例(配置了解析结构)
{
"parsed": {
"temperature": 25.5,
"pressure": 100,
"status": true
},
"raw_data": [0, 0, 128, 65, 0, 100],
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
响应示例(未配置解析结构)
{
"raw_data": [0, 0, 128, 65, 0, 100],
"status": "connected",
"plc_connection_status": "connected",
"last_update": 1698754321.456,
"last_update_formatted": "2023-10-30 14:12:01"
}
Configuration API
Get Configuration
GET
/api/config
获取当前配置。
认证要求
需要Basic Auth认证
响应示例
{
"plcs": [
{
"name": "PLC1",
"ip": "192.168.0.10",
"rack": 0,
"slot": 1,
"areas": [
{
"name": "DB100_Read",
"type": "read",
"db_number": 100,
"offset": 0,
"size": 4000,
"structure": [
{"name": "temperature", "type": "real", "offset": 0},
{"name": "pressure", "type": "int", "offset": 4}
]
}
]
}
]
}
Validate Configuration
POST
/api/config/validate
验证配置是否有效。
认证要求
需要Basic Auth认证
请求体
要验证的配置JSON
响应示例(有效)
{
"valid": true
}
响应示例(无效)
{
"valid": false,
"message": "Invalid configuration: 'ip' is a required property"
}
Save Configuration
POST
/api/config
保存配置。
查询参数
| 参数 |
描述 |
| reload |
是否立即重载配置(true/false) |
认证要求
需要Basic Auth认证
请求体
要保存的配置JSON
响应示例
{
"success": true,
"message": "Configuration saved and reload requested"
}
"""
return render_template_string(html)
# ===========================
# 数据访问API
# ===========================
# 单个读取接口
@self.app.route("/api/read////", methods=["GET"], endpoint="single_read")
def single_read(plc_name, area_name, offset, length):
"""从指定区域读取数据"""
data, error, plc_status, update_time = self.cache_manager.read_area(plc_name, area_name, offset, length)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": length,
"data": list(data),
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个读取BOOL类型接口
@self.app.route("/api/read_bool////", methods=["GET"], endpoint="single_read_bool")
def single_read_bool(plc_name, area_name, offset, length):
"""从指定区域读取数据"""
data, error, plc_status, update_time = self.cache_manager.read_area_bool(plc_name, area_name, offset, length)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": length,
"data": [data], # list(data)
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个写入接口
@self.app.route("/api/write///", methods=["POST"], endpoint="single_write")
def single_write(plc_name, area_name, offset):
"""向指定区域写入数据"""
data = request.data
if not data:
# 如果没有提供数据,返回错误
return jsonify({
"status": "error",
"message": "No data provided",
"plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
success, error, plc_status, update_time = self.cache_manager.write_area(plc_name, area_name, offset, data)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": len(data),
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 单个写入BOOL类型接口
@self.app.route("/api/write_bool///", methods=["POST"], endpoint="single_write_bool")
def single_write_bool(plc_name, area_name, offset):
"""向指定区域写入数据"""
data = request.data
if not data:
# 如果没有提供数据,返回错误
return jsonify({
"status": "error",
"message": "No data provided",
"plc_connection_status": self.cache_manager.plc_connection_status.get(plc_name, "unknown"),
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
success, error, plc_status, update_time = self.cache_manager.write_area_bool(plc_name, area_name, offset, data)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"length": 1,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 批量读取接口
@self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read")
def batch_read():
"""批量读取多个区域的数据"""
try:
# 确保是JSON请求
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.get_json()
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
# 添加详细日志
self.logger.info(f"Received batch read request: {json.dumps(requests)}")
results = self.cache_manager.batch_read(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch read error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 批量写入接口
@self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write")
def batch_write():
"""批量写入多个区域的数据"""
try:
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.json
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
self.logger.info(f"Received batch write request: {json.dumps(requests)}")
results = self.cache_manager.batch_write(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch write error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 通用读取接口
@self.app.route("/api/read_generic////", methods=["GET"],
endpoint="read_generic")
def read_generic(plc_name, area_name, offset, data_type):
"""通用读取接口"""
# 检查请求参数
count = request.args.get('count', 1, type=int)
if count < 1:
return jsonify({
"status": "error",
"message": "Count must be at least 1",
"plc_name": plc_name,
"area_name": area_name
}), 400
# 执行读取
result, error, plc_status, update_time = self.cache_manager.read_generic(
plc_name,
area_name,
offset,
data_type,
count
)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(
update_time)) if update_time > 0 else "Never"
}), 400
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"data_type": data_type,
"count": count,
"data": result,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
# 通用写入接口
@self.app.route("/api/write_generic////", methods=["POST"],
endpoint="write_generic")
def write_generic(plc_name, area_name, offset, data_type):
"""通用写入接口"""
# 检查请求数据
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_name": plc_name,
"area_name": area_name
}), 400
json_data = request.get_json()
if "value" not in json_data and "values" not in json_data:
return jsonify({
"status": "error",
"message": "Missing 'value' or 'values' field",
"plc_name": plc_name,
"area_name": area_name
}), 400
# 确定要写入的值
value = json_data.get("value", json_data.get("values"))
# 执行写入
success, error, plc_status, update_time = self.cache_manager.write_generic(
plc_name,
area_name,
offset,
data_type,
value
)
if error:
return jsonify({
"status": "error",
"plc_name": plc_name,
"area_name": area_name,
"message": error,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(
update_time)) if update_time > 0 else "Never"
}), 400
# 确定写入数量
count = 1
if isinstance(value, list):
count = len(value)
return jsonify({
"status": "success",
"plc_name": plc_name,
"area_name": area_name,
"offset": offset,
"data_type": data_type,
"count": count,
"plc_connection_status": plc_status,
"last_update": update_time,
"last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time))
})
@self.app.route("/api/batch_write_bool", methods=["POST"], endpoint="batch_write_bool")
def batch_write_bool():
"""批量写入多个区域的数据"""
try:
if not request.is_json:
return jsonify({
"status": "error",
"message": "Request must be JSON (Content-Type: application/json)",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
requests = request.json
if not isinstance(requests, list):
return jsonify({
"status": "error",
"message": "Request must be a JSON array",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 400
self.logger.info(f"Received batch write request: {json.dumps(requests)}")
results = self.cache_manager.batch_write_bool(requests)
return jsonify(results)
except Exception as e:
self.logger.error(f"Batch write error: {str(e)}", exc_info=True)
return jsonify({
"status": "error",
"message": f"Internal server error: {str(e)}",
"plc_connection_status": "unknown",
"last_update": 0,
"last_update_formatted": "N/A"
}), 500
# 区域状态检查
@self.app.route("/api/status//", methods=["GET"], endpoint="area_status")
def area_status(plc_name, area_name):
"""获取区域状态"""
status = self.cache_manager.get_area_status(plc_name, area_name)
return jsonify(status)
# 获取解析后的数据
@self.app.route("/api/data//", methods=["GET"], endpoint="get_parsed_data")
def get_parsed_data(plc_name, area_name):
"""获取解析后的数据"""
return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name))
def start(self):
"""启动API服务器"""
self.server_thread = threading.Thread(
target=self.app.run,
kwargs={
"host": "0.0.0.0",
"port": 5000,
"threaded": True,
"use_reloader": False # 避免在生产环境中使用重载器
},
daemon=True,
name="APIServerThread"
)
self.server_thread.start()
self.logger.info("API server started at http://0.0.0.0:5000")