commit 45deb5131dc01dc932af6c39b4713b7637f0cfdc Author: chuyiwen Date: Sat Jul 26 02:46:46 2025 +0800 基本框架完成 diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..fb847c4 --- /dev/null +++ b/config/config.json @@ -0,0 +1,66 @@ +{ + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.100", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000, + "structure": [ + { + "name": "temperature", + "type": "real", + "offset": 0 + }, + { + "name": "pressure", + "type": "int", + "offset": 4 + }, + { + "name": "status", + "type": "bool", + "offset": 6, + "bit": 0 + } + ] + }, + { + "name": "DB100_Write", + "type": "write", + "db_number": 100, + "offset": 4000, + "size": 5000 + }, + { + "name": "DB202_Params", + "type": "read_write", + "db_number": 202, + "offset": 0, + "size": 2000 + } + ] + }, + { + "name": "PLC2", + "ip": "192.168.0.101", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000 + } + ] + } + ] +} \ No newline at end of file diff --git a/config/config.json.bak b/config/config.json.bak new file mode 100644 index 0000000..fb847c4 --- /dev/null +++ b/config/config.json.bak @@ -0,0 +1,66 @@ +{ + "plcs": [ + { + "name": "PLC1", + "ip": "192.168.0.100", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000, + "structure": [ + { + "name": "temperature", + "type": "real", + "offset": 0 + }, + { + "name": "pressure", + "type": "int", + "offset": 4 + }, + { + "name": "status", + "type": "bool", + "offset": 6, + "bit": 0 + } + ] + }, + { + "name": "DB100_Write", + "type": "write", + "db_number": 100, + "offset": 4000, + "size": 5000 + }, + { + "name": "DB202_Params", + "type": "read_write", + "db_number": 202, + "offset": 0, + "size": 2000 + } + ] + }, + { + "name": "PLC2", + "ip": "192.168.0.101", + "rack": 0, + "slot": 1, + "areas": [ + { + "name": "DB100_Read", + "type": "read", + "db_number": 100, + "offset": 0, + "size": 4000 + } + ] + } + ] +} \ No newline at end of file diff --git a/gateway/__pycache__/api_server.cpython-313.pyc b/gateway/__pycache__/api_server.cpython-313.pyc new file mode 100644 index 0000000..b99853a Binary files /dev/null and b/gateway/__pycache__/api_server.cpython-313.pyc differ diff --git a/gateway/__pycache__/cache_manager.cpython-313.pyc b/gateway/__pycache__/cache_manager.cpython-313.pyc new file mode 100644 index 0000000..0378d6a Binary files /dev/null and b/gateway/__pycache__/cache_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_loader.cpython-313.pyc b/gateway/__pycache__/config_loader.cpython-313.pyc new file mode 100644 index 0000000..ce1d239 Binary files /dev/null and b/gateway/__pycache__/config_loader.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_manager.cpython-313.pyc b/gateway/__pycache__/config_manager.cpython-313.pyc new file mode 100644 index 0000000..3ea7506 Binary files /dev/null and b/gateway/__pycache__/config_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/config_validator.cpython-313.pyc b/gateway/__pycache__/config_validator.cpython-313.pyc new file mode 100644 index 0000000..ad4da92 Binary files /dev/null and b/gateway/__pycache__/config_validator.cpython-313.pyc differ diff --git a/gateway/__pycache__/plc_manager.cpython-313.pyc b/gateway/__pycache__/plc_manager.cpython-313.pyc new file mode 100644 index 0000000..6d004f5 Binary files /dev/null and b/gateway/__pycache__/plc_manager.cpython-313.pyc differ diff --git a/gateway/__pycache__/snap7_client.cpython-313.pyc b/gateway/__pycache__/snap7_client.cpython-313.pyc new file mode 100644 index 0000000..22e2dcd Binary files /dev/null and b/gateway/__pycache__/snap7_client.cpython-313.pyc differ diff --git a/gateway/api_server.py b/gateway/api_server.py new file mode 100644 index 0000000..61689b9 --- /dev/null +++ b/gateway/api_server.py @@ -0,0 +1,565 @@ +from flask import Flask, jsonify, request, render_template_string, Response +import threading +import time +import json +import logging +from functools import wraps +from config_manager import ConfigManager + +class APIServer: + """REST API服务器,提供PLC数据访问和配置管理功能""" + + def __init__(self, cache_manager, config_path="../config/config.json"): + """ + 初始化API服务器 + + Args: + cache_manager: 缓存管理器实例 + config_path: 配置文件路径 + """ + self.cache_manager = cache_manager + self.config_manager = ConfigManager(config_path) + self.app = Flask(__name__) + self.setup_routes() + self.auth_enabled = True # 可通过配置关闭认证 + self.username = "admin" + self.password = "admin123" # 实际应用中应从安全存储获取 + self.start_time = time.strftime("%Y-%m-%d %H:%M:%S") + self.logger = logging.getLogger("APIServer") + + def check_auth(self, username, password): + """验证用户名和密码""" + return username == self.username and password == self.password + + def authenticate(self): + """发送401响应要求认证""" + return Response( + "Unauthorized", + 401, + {"WWW-Authenticate": 'Basic realm="PLC Gateway Configuration"'} + ) + + def requires_auth(self, f): + """装饰器:需要认证的路由,保留函数元数据""" + @wraps(f) + def decorated(*args, **kwargs): + if not self.auth_enabled: + return f(*args, **kwargs) + + auth = request.authorization + if not auth or not self.check_auth(auth.username, auth.password): + return self.authenticate() + return f(*args, **kwargs) + return decorated + + def get_summary(self): + """获取缓存摘要信息""" + summary = {} + for plc_name, areas in self.cache_manager.cache.items(): + summary[plc_name] = {} + for area_name, area in areas.items(): + last_update = self.cache_manager.last_update[plc_name][area_name] + status = area["status"] + + summary[plc_name][area_name] = { + "status": status, + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + return summary + + def setup_routes(self): + """设置所有API路由""" + + # =========================== + # 主页面 - 状态摘要 + # =========================== + @self.app.route("/", endpoint="index") + def index(): + summary = self.get_summary() + html = """ + + + PLC Gateway Status + + + +

PLC Gateway Status

+

Gateway running since: {{ start_time }}

+ """ + + for plc_name, areas in summary.items(): + html += f"

PLC: {plc_name}

" + html += """ + + + + + + + + + """ + + for area_name, area in areas.items(): + status_class = "" + if area["status"] == "connected": + status_class = "status-connected" + elif area["status"] == "disconnected": + status_class = "status-disconnected" + else: + status_class = "status-error" + + html += f""" + + + + + + + + """ + + html += "
Area NameTypeSize (bytes)StatusLast Update
{area_name}{area['type']}{area['size']}{area['status']}{area['last_update']}
" + + # 添加API文档部分 + html += """ +
+

API Endpoints

+ +
+ Single Read: GET /api/read////
+ Example: /api/read/PLC1/DB100_Read/10/4 +
+ +
+ Single Write: POST /api/write///
+ Body: Raw binary data
+ Example: POST /api/write/PLC1/DB100_Write/10 with 4 bytes of data +
+ +
+ Batch Read: POST /api/batch_read
+ Body: JSON array of read requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Read", "offset":0, "length":4}] +
+ +
+ Batch Write: POST /api/batch_write
+ Body: JSON array of write requests
+ Example: [{"plc_name":"PLC1", "area_name":"DB100_Write", "offset":0, "data":[1,2,3,4]}] +
+ +
+ Configuration: GET/POST /api/config
+ Manage gateway configuration +
+
+ + + + + """ + + html += """ + + + """ + return render_template_string(html, start_time=self.start_time) + + # =========================== + # 系统状态API + # =========================== + @self.app.route("/api/status", endpoint="system_status") + def system_status(): + """获取系统状态信息""" + return jsonify({ + "status": "running", + "start_time": self.start_time, + "plc_count": len(self.config_manager.get_config().get("plcs", [])), + "cache_size": sum(len(area["data"]) for plc in self.cache_manager.cache.values() for area in plc.values()) + }) + + # =========================== + # 配置管理相关路由 + # =========================== + @self.app.route("/config", endpoint="config_page") + @self.requires_auth + def config_page(): + """配置编辑页面""" + config = self.config_manager.get_config() + config_json = json.dumps(config, indent=2) + + html = """ + + + PLC Gateway Configuration + + + +

PLC Gateway Configuration

+
+

Edit the configuration JSON below. Be careful with the syntax.

+ +
+ +
+ + + +
+
+ +
+ +
+

Configuration Guide

+

PLC Configuration:

+
    +
  • name: Unique name for the PLC
  • +
  • ip: IP address of the PLC
  • +
  • rack: Rack number (usually 0)
  • +
  • slot: Slot number (usually 1 for S7-1200)
  • +
+ +

Data Area Configuration:

+
    +
  • name: Name of the data area
  • +
  • type: read, write, or read_write
  • +
  • db_number: DB number (e.g., 100 for DB100)
  • +
  • offset: Starting byte offset
  • +
  • size: Size in bytes
  • +
  • structure (optional): Define how to parse the data
  • +
+ +

Example:

+
{
+  "plcs": [
+    {
+      "name": "PLC1",
+      "ip": "192.168.0.10",
+      "rack": 0,
+      "slot": 1,
+      "areas": [
+        {
+          "name": "DB100_Read",
+          "type": "read",
+          "db_number": 100,
+          "offset": 0,
+          "size": 4000,
+          "structure": [
+            {"name": "temperature", "type": "real", "offset": 0},
+            {"name": "pressure", "type": "int", "offset": 4}
+          ]
+        }
+      ]
+    }
+  ]
+}
+
+
+ + + + + """ + return render_template_string( + html, + config_json=config_json, + username=self.username, + password=self.password + ) + + # 配置验证API + @self.app.route("/api/config/validate", methods=["POST"], endpoint="validate_config") + @self.requires_auth + def validate_config(): + """验证配置是否有效""" + try: + config = request.json + is_valid, error = self.config_manager.validate_config(config) + if is_valid: + return jsonify({"valid": True}) + else: + return jsonify({"valid": False, "message": error}), 400 + except Exception as e: + return jsonify({"valid": False, "message": str(e)}), 400 + + # 配置获取API + @self.app.route("/api/config", methods=["GET"], endpoint="get_config") + @self.requires_auth + def get_config(): + """获取当前配置""" + return jsonify(self.config_manager.get_config()) + + # 配置保存API + @self.app.route("/api/config", methods=["POST"], endpoint="save_config") + @self.requires_auth + def save_config(): + """保存配置""" + try: + config = request.json + reload = request.args.get('reload', 'false').lower() == 'true' + + success, message = self.config_manager.save_config(config) + if success: + if reload: + # 通知主应用程序重载配置 + if hasattr(self.cache_manager, 'app') and self.cache_manager.app: + self.cache_manager.app.request_reload() + return jsonify({ + "success": True, + "message": "Configuration saved and reload requested" + }) + else: + return jsonify({ + "success": True, + "message": "Configuration saved successfully (restart to apply changes)" + }) + else: + return jsonify({ + "success": False, + "message": message + }), 400 + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error saving config: {str(e)}" + }), 500 + + # =========================== + # 数据访问API + # =========================== + # 单个读取接口 + @self.app.route("/api/read////", methods=["GET"], endpoint="single_read") + def single_read(plc_name, area_name, offset, length): + """从指定区域读取数据""" + data, error = self.cache_manager.read_area(plc_name, area_name, offset, length) + if error: + return jsonify({"status": "error", "message": error}), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": length, + "data": list(data) + }) + + # 单个写入接口 + @self.app.route("/api/write///", methods=["POST"], endpoint="single_write") + def single_write(plc_name, area_name, offset): + """向指定区域写入数据""" + data = request.data + if not data: + return jsonify({"status": "error", "message": "No data provided"}), 400 + + success, error = self.cache_manager.write_area(plc_name, area_name, offset, data) + if error: + return jsonify({"status": "error", "message": error}), 400 + return jsonify({ + "status": "success", + "plc_name": plc_name, + "area_name": area_name, + "offset": offset, + "length": len(data) + }) + + # 批量读取接口 + @self.app.route("/api/batch_read", methods=["POST"], endpoint="batch_read") + def batch_read(): + """批量读取多个区域的数据""" + try: + requests = request.json + if not isinstance(requests, list): + return jsonify({"status": "error", "message": "Request must be a JSON array"}), 400 + results = self.cache_manager.batch_read(requests) + return jsonify(results) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 400 + + # 批量写入接口 + @self.app.route("/api/batch_write", methods=["POST"], endpoint="batch_write") + def batch_write(): + """批量写入多个区域的数据""" + try: + requests = request.json + if not isinstance(requests, list): + return jsonify({"status": "error", "message": "Request must be a JSON array"}), 400 + results = self.cache_manager.batch_write(requests) + return jsonify(results) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 400 + + # 区域状态检查 + @self.app.route("/api/status//", methods=["GET"], endpoint="area_status") + def area_status(plc_name, area_name): + """获取区域状态""" + return jsonify(self.cache_manager.get_area_status(plc_name, area_name)) + + # 获取解析后的数据 + @self.app.route("/api/data//", methods=["GET"], endpoint="get_parsed_data") + def get_parsed_data(plc_name, area_name): + """获取解析后的数据""" + return jsonify(self.cache_manager.get_parsed_data(plc_name, area_name)) + + def start(self): + """启动API服务器""" + self.server_thread = threading.Thread( + target=self.app.run, + kwargs={ + "host": "0.0.0.0", + "port": 5000, + "threaded": True, + "use_reloader": False # 避免在生产环境中使用重载器 + }, + daemon=True, + name="APIServerThread" + ) + self.server_thread.start() + self.logger.info("API server started at http://0.0.0.0:5000") \ No newline at end of file diff --git a/gateway/cache_manager.py b/gateway/cache_manager.py new file mode 100644 index 0000000..32f8fff --- /dev/null +++ b/gateway/cache_manager.py @@ -0,0 +1,277 @@ +import threading +import time +import logging + +class CacheManager: + """PLC数据缓存管理器""" + + def __init__(self, config, plc_manager, app=None): + """ + 初始化缓存管理器 + + Args: + config: 配置对象 + plc_manager: PLC管理器实例 + app: 主应用程序引用(用于配置重载) + """ + self.plc_manager = plc_manager + self.config = config + self.app = app + self.cache = {} + self.refresh_interval = 1 # 1秒刷新一次 + self.running = False + self.lock = threading.Lock() + self.thread = None + self.last_update = {} + self.logger = logging.getLogger("CacheManager") + self.init_cache() + + def init_cache(self): + """初始化缓存结构""" + for plc in self.config["plcs"]: + plc_name = plc["name"] + self.cache[plc_name] = {} + self.last_update[plc_name] = {} + + for area in plc["areas"]: + name = area["name"] + # 确保初始状态为断开 + self.cache[plc_name][name] = { + "data": bytearray(area["size"]), + "db_number": area["db_number"], + "offset": area["offset"], + "size": area["size"], + "type": area["type"], + "structure": area.get("structure", []), + "status": "disconnected" # 初始状态为断开 + } + self.last_update[plc_name][name] = 0 + + def refresh_cache(self): + """后台线程:定期刷新缓存""" + while self.running: + try: + for plc in self.config["plcs"]: + plc_name = plc["name"] + client = self.plc_manager.get_plc(plc_name) + + for area in plc["areas"]: + if area["type"] in ["read", "read_write"]: + name = area["name"] + try: + data = client.read_db(area["db_number"], area["offset"], area["size"]) + with self.lock: + self.cache[plc_name][name]["data"] = bytearray(data) + self.cache[plc_name][name]["status"] = "connected" + self.last_update[plc_name][name] = time.time() + except Exception as e: + with self.lock: + self.cache[plc_name][name]["status"] = "disconnected" + self.logger.warning(f"PLC {plc_name} area {name} disconnected: {e}") + + time.sleep(self.refresh_interval) + except Exception as e: + self.logger.error(f"Error in refresh_cache: {e}") + time.sleep(self.refresh_interval) + + def start(self): + """启动缓存刷新线程""" + if self.running: + return + + self.running = True + self.thread = threading.Thread( + target=self.refresh_cache, + name="CacheRefreshThread", + daemon=True + ) + self.thread.start() + self.logger.info("Cache manager started") + + def stop(self): + """停止缓存刷新线程""" + if not self.running: + return + + self.running = False + if self.thread: + # 等待线程结束,但设置超时防止卡死 + self.thread.join(timeout=2.0) + if self.thread.is_alive(): + self.logger.warning("Cache refresh thread did not terminate gracefully") + self.thread = None + self.logger.info("Cache manager stopped") + + def get_summary(self): + """获取缓存摘要信息""" + summary = {} + with self.lock: + for plc_name, areas in self.cache.items(): + summary[plc_name] = {} + for area_name, area in areas.items(): + last_update = self.last_update[plc_name][area_name] + summary[plc_name][area_name] = { + "status": area["status"], + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", + "size": area["size"], + "type": area["type"] + } + return summary + + def get_area_status(self, plc_name, area_name): + """获取区域状态""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return {"status": "not_found", "message": "PLC or area not found"} + return { + "status": area["status"], + "last_update": self.last_update[plc_name][area_name], + "size": area["size"], + "type": area["type"] + } + + def read_area(self, plc_name, area_name, offset, length): + """单个区域读取""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return None, "Area not found" + + if offset + length > area["size"]: + return None, "Offset out of bounds" + + client = self.plc_manager.get_plc(plc_name) + try: + data = client.read_db(area["db_number"], area["offset"] + offset, length) + # 更新缓存中的这部分数据 + for i in range(length): + area["data"][offset + i] = data[i] + self.last_update[plc_name][area_name] = time.time() + area["status"] = "connected" + return data, None + except Exception as e: + area["status"] = "disconnected" + self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") + return None, f"Read failed: {str(e)}" + + def write_area(self, plc_name, area_name, offset, data): + """单个区域写入""" + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return False, "Area not found" + + if area["type"] not in ["write", "read_write"]: + return False, "Area is read-only" + + if offset + len(data) > area["size"]: + return False, "Offset out of bounds" + + client = self.plc_manager.get_plc(plc_name) + try: + success = client.write_db(area["db_number"], area["offset"] + offset, data) + if success: + # 更新缓存中的这部分数据 + for i in range(len(data)): + area["data"][offset + i] = data[i] + self.last_update[plc_name][area_name] = time.time() + area["status"] = "connected (last write)" + return True, None + else: + area["status"] = "disconnected" + return False, "Write failed" + except Exception as e: + area["status"] = "disconnected" + self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") + return False, f"Write failed: {str(e)}" + + def batch_read(self, requests): + """批量读取""" + results = [] + for req in requests: + plc_name = req["plc_name"] + area_name = req["area_name"] + offset = req.get("offset", 0) + length = req.get("length", None) + + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "message": "Area not found" + }) + continue + + # 如果未指定length,读取整个区域 + if length is None: + length = area["size"] - offset + + data, error = self.read_area(plc_name, area_name, offset, length) + if error: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "message": error + }) + else: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "success", + "offset": offset, + "length": length, + "data": list(data) + }) + return results + + def batch_write(self, requests): + """批量写入""" + results = [] + for req in requests: + plc_name = req["plc_name"] + area_name = req["area_name"] + offset = req["offset"] + data = bytes(req["data"]) + + success, error = self.write_area(plc_name, area_name, offset, data) + if error: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "error", + "message": error, + "offset": offset + }) + else: + results.append({ + "plc_name": plc_name, + "area_name": area_name, + "status": "success", + "offset": offset, + "length": len(data) + }) + return results + + def get_parsed_data(self, plc_name, area_name): + """获取解析后的数据""" + from data_parser import parse_data + + with self.lock: + area = self.cache.get(plc_name, {}).get(area_name) + if not area: + return {"error": "Area not found"} + + structure = area.get("structure", []) + if structure: + return parse_data(area["data"], structure) + else: + return { + "raw_data": list(area["data"]), + "status": area["status"], + "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.last_update[plc_name][area_name])) + if self.last_update[plc_name][area_name] > 0 else "Never" + } \ No newline at end of file diff --git a/gateway/config_loader.py b/gateway/config_loader.py new file mode 100644 index 0000000..772dfd3 --- /dev/null +++ b/gateway/config_loader.py @@ -0,0 +1,10 @@ +import json +import os + +def load_config(config_path="config/config.json"): + """加载配置文件""" + if not os.path.exists(config_path): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + with open(config_path, "r") as f: + return json.load(f) \ No newline at end of file diff --git a/gateway/config_manager.py b/gateway/config_manager.py new file mode 100644 index 0000000..b01e50b --- /dev/null +++ b/gateway/config_manager.py @@ -0,0 +1,87 @@ +import json +import os +import logging +from config_validator import ConfigValidator + +class ConfigManager: + """配置文件管理器""" + + def __init__(self, config_path="../config/config.json"): + self.config_path = config_path + self.config = None + self.load_config() + + def load_config(self): + """加载配置文件""" + try: + if not os.path.exists(self.config_path): + # 尝试从备份恢复 + backup_path = self.config_path + ".bak" + if os.path.exists(backup_path): + logging.warning(f"Main config not found, using backup: {backup_path}") + with open(backup_path, 'r') as src, open(self.config_path, 'w') as dst: + config_data = src.read() + dst.write(config_data) + else: + raise FileNotFoundError(f"Configuration file not found: {self.config_path}") + + with open(self.config_path, 'r') as f: + self.config = json.load(f) + + # 验证配置 + is_valid, error = ConfigValidator.validate_config(self.config) + if not is_valid: + logging.error(f"Invalid configuration: {error}") + # 尝试从备份恢复 + backup_path = self.config_path + ".bak" + if os.path.exists(backup_path): + logging.warning("Attempting to load from backup configuration") + with open(backup_path, 'r') as f: + self.config = json.load(f) + is_valid, error = ConfigValidator.validate_config(self.config) + if not is_valid: + raise ValueError(f"Backup config also invalid: {error}") + else: + raise ValueError(f"Invalid configuration: {error}") + + return True, None + except Exception as e: + logging.error(f"Failed to load config: {e}") + self.config = {"plcs": []} + return False, str(e) + + def get_config(self): + """获取当前配置""" + return self.config + + def validate_config(self, config): + """验证配置是否有效""" + return ConfigValidator.validate_config(config) + + def save_config(self, new_config): + """保存配置文件""" + try: + # 验证配置 + is_valid, error = self.validate_config(new_config) + if not is_valid: + return False, f"Invalid configuration: {error}" + + # 备份旧配置 + backup_path = self.config_path + ".bak" + if os.path.exists(self.config_path): + with open(self.config_path, 'r') as src, open(backup_path, 'w') as dst: + dst.write(src.read()) + + # 保存新配置 + with open(self.config_path, 'w') as f: + json.dump(new_config, f, indent=2) + + self.config = new_config + return True, None + except Exception as e: + logging.error(f"Failed to save config: {e}") + return False, str(e) + + def reload_config(self): + """重新加载配置""" + return self.load_config() \ No newline at end of file diff --git a/gateway/config_validator.py b/gateway/config_validator.py new file mode 100644 index 0000000..487edf5 --- /dev/null +++ b/gateway/config_validator.py @@ -0,0 +1,82 @@ +import json +from jsonschema import validate, Draft7Validator, FormatChecker + +class ConfigValidator: + """配置文件验证器""" + + SCHEMA = { + "type": "object", + "properties": { + "plcs": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "ip": {"type": "string", "format": "ipv4"}, + "rack": {"type": "integer", "minimum": 0}, + "slot": {"type": "integer", "minimum": 0}, + "areas": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string", "minLength": 1}, + "type": {"type": "string", "enum": ["read", "write", "read_write"]}, + "db_number": {"type": "integer", "minimum": 1}, + "offset": {"type": "integer", "minimum": 0}, + "size": {"type": "integer", "minimum": 1}, + "structure": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "type": {"type": "string", "enum": ["bool", "byte", "int", "dint", "real", "word", "dword"]}, + "offset": {"type": "integer", "minimum": 0}, + "bit": {"type": "integer", "minimum": 0, "maximum": 7} # 修复了这里 + }, + "required": ["name", "type", "offset"] + } + } + }, + "required": ["name", "type", "db_number", "offset", "size"] + } + } + }, + "required": ["name", "ip", "rack", "slot", "areas"] + } + } + }, + "required": ["plcs"] + } + + @staticmethod + def validate_config(config): + """验证配置是否符合规范""" + try: + # 添加IPv4格式验证 + validator = Draft7Validator( + ConfigValidator.SCHEMA, + format_checker=FormatChecker(["ipv4"]) + ) + validator.validate(config) + return True, None + except Exception as e: + return False, str(e) + + @staticmethod + def is_valid_ip(ip): + """验证IP地址格式""" + parts = ip.split('.') + if len(parts) != 4: + return False + for part in parts: + if not part.isdigit(): + return False + num = int(part) + if num < 0 or num > 255: + return False + return True \ No newline at end of file diff --git a/gateway/data_parser.py b/gateway/data_parser.py new file mode 100644 index 0000000..38890b3 --- /dev/null +++ b/gateway/data_parser.py @@ -0,0 +1,54 @@ +from struct import unpack + +def parse_data(data, structure): + """解析结构化数据""" + result = {"raw_data": list(data)} + + if not structure: + return result + + result["parsed"] = {} + for field in structure: + offset = field["offset"] + name = field["name"] + data_type = field["type"] + + try: + if data_type == "int": + if offset + 2 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">h", data[offset:offset+2])[0] + elif data_type == "dint": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">l", data[offset:offset+4])[0] + elif data_type == "real": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = unpack(">f", data[offset:offset+4])[0] + elif data_type == "bool": + bit = field.get("bit", 0) + if offset >= len(data): + raise ValueError("Offset out of bounds") + byte = data[offset] + val = bool((byte >> bit) & 1) + elif data_type == "byte": + if offset >= len(data): + raise ValueError("Offset out of bounds") + val = data[offset] + elif data_type == "word": + if offset + 2 > len(data): + raise ValueError("Offset out of bounds") + val = (data[offset] << 8) | data[offset + 1] + elif data_type == "dword": + if offset + 4 > len(data): + raise ValueError("Offset out of bounds") + val = (data[offset] << 24) | (data[offset+1] << 16) | (data[offset+2] << 8) | data[offset+3] + else: + val = f"Unknown type: {data_type}" + + result["parsed"][name] = val + except Exception as e: + result["parsed"][name] = f"Error: {str(e)}" + + return result \ No newline at end of file diff --git a/gateway/main.py b/gateway/main.py new file mode 100644 index 0000000..0295fac --- /dev/null +++ b/gateway/main.py @@ -0,0 +1,109 @@ +import logging +import time +import threading +from config_loader import load_config +from plc_manager import PLCManager +from cache_manager import CacheManager +from api_server import APIServer +from config_manager import ConfigManager + +class GatewayApp: + """PLC网关应用程序主类""" + + def __init__(self, config_path="../config/config.json"): + self.config_path = config_path + self.config_manager = ConfigManager(config_path) + self.plc_manager = None + self.cache_manager = None + self.api_server = None + self.reload_flag = False + self.reload_lock = threading.Lock() + self.logger = logging.getLogger("GatewayApp") + + # 加载初始配置 + self.load_configuration() + + def load_configuration(self): + """加载配置并初始化组件""" + # 加载配置 + if not self.config_manager.load_config(): + self.logger.error("Failed to load initial configuration") + return False + + config = self.config_manager.get_config() + + # 重新初始化PLC连接 + if self.plc_manager: + self.logger.info("Reinitializing PLC connections...") + self.plc_manager = PLCManager(config["plcs"]) + self.plc_manager.connect_all() + + # 重新初始化缓存 + if self.cache_manager: + self.logger.info("Stopping existing cache manager...") + self.cache_manager.stop() + + self.logger.info("Initializing cache manager...") + self.cache_manager = CacheManager(config, self.plc_manager, app=self) + self.cache_manager.start() + + # 重新初始化API服务器 + if self.api_server: + self.logger.info("API server already running") + else: + self.logger.info("Starting API server...") + self.api_server = APIServer(self.cache_manager, self.config_path) + self.api_server.start() + + self.logger.info("Configuration loaded successfully") + return True + + def check_for_reload(self): + """检查是否需要重载配置""" + while True: + with self.reload_lock: + if self.reload_flag: + self.reload_flag = False + self.load_configuration() + time.sleep(1) + + def request_reload(self): + """请求重载配置""" + with self.reload_lock: + self.reload_flag = True + self.logger.info("Configuration reload requested") + + def run(self): + """运行主程序""" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s' + ) + + self.logger.info("Starting PLC Gateway...") + + # 启动配置重载检查线程 + reload_thread = threading.Thread( + target=self.check_for_reload, + name="ConfigReloadThread", + daemon=True + ) + reload_thread.start() + + try: + # 保持主程序运行 + while True: + time.sleep(1) + except KeyboardInterrupt: + self.logger.info("Shutting down gracefully...") + finally: + if self.cache_manager: + self.cache_manager.stop() + self.logger.info("Shutdown complete") + +def main(): + app = GatewayApp() + app.run() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/gateway/plc_manager.py b/gateway/plc_manager.py new file mode 100644 index 0000000..038b1e8 --- /dev/null +++ b/gateway/plc_manager.py @@ -0,0 +1,42 @@ +from snap7_client import Snap7Client +import logging + +class PLCManager: + """PLC连接管理器,管理多个PLC连接""" + + def __init__(self, plcs_config): + """ + 初始化PLC管理器 + + Args: + plcs_config: PLC配置列表 + """ + self.plcs = {} + for plc_config in plcs_config: + name = plc_config["name"] + self.plcs[name] = Snap7Client( + plc_config["ip"], + plc_config["rack"], + plc_config["slot"] + ) + self.logger = logging.getLogger("PLCManager") + + def get_plc(self, name): + """获取指定名称的PLC客户端""" + return self.plcs.get(name) + + def connect_all(self): + """连接所有配置的PLC""" + for name, client in self.plcs.items(): + client.connect() + + def get_connection_status(self): + """获取所有PLC的连接状态""" + status = {} + for name, client in self.plcs.items(): + status[name] = { + "ip": client.ip, + "connected": client.connected, + "retry_count": client.retry_count + } + return status \ No newline at end of file diff --git a/gateway/snap7_client.py b/gateway/snap7_client.py new file mode 100644 index 0000000..94c5218 --- /dev/null +++ b/gateway/snap7_client.py @@ -0,0 +1,108 @@ +import snap7 +import logging +from threading import Lock +import time + +class Snap7Client: + """Snap7客户端,处理与PLC的通信""" + + def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1): + """ + 初始化Snap7客户端 + + Args: + ip: PLC IP地址 + rack: Rack编号 + slot: Slot编号 + max_retries: 最大重试次数 + retry_base_delay: 基础重试延迟(秒) + """ + self.ip = ip + self.rack = rack + self.slot = slot + self.client = snap7.client.Client() + self.lock = Lock() + self.connected = False + self.max_retries = max_retries + self.retry_base_delay = retry_base_delay + self.last_connect_attempt = 0 + self.retry_count = 0 + self.logger = logging.getLogger(f"Snap7Client[{ip}]") + + def connect(self): + """建立与PLC的连接""" + current_time = time.time() + # 指数退避重试 + if self.retry_count > 0: + delay = min(self.retry_base_delay * (2 ** (self.retry_count - 1)), 30) + if current_time - self.last_connect_attempt < delay: + return False # 未到重试时间 + + self.last_connect_attempt = current_time + try: + self.client.connect(self.ip, self.rack, self.slot) + if self.client.get_connected(): + self.connected = True + self.retry_count = 0 # 重置重试计数 + self.logger.info(f"Connected to PLC {self.ip}") + return True + else: + self.connected = False + self.logger.warning(f"Connection to {self.ip} established but not verified") + return False + except Exception as e: + self.retry_count = min(self.retry_count + 1, self.max_retries) + self.logger.error(f"Connection to {self.ip} failed (attempt {self.retry_count}/{self.max_retries}): {e}") + self.connected = False + return False + + def read_db(self, db_number, offset, size): + """ + 从DB块读取数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + size: 读取字节数 + + Returns: + bytearray: 读取的数据 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Read failed: not connected to {self.ip}") + return b'\x00' * size + + try: + with self.lock: + data = self.client.db_read(db_number, offset, size) + return data + except Exception as e: + self.logger.error(f"Read DB{db_number} error: {e}") + self.connected = False + return b'\x00' * size + + def write_db(self, db_number, offset, data): + """ + 向DB块写入数据 + + Args: + db_number: DB编号 + offset: 起始偏移量 + data: 要写入的数据 + + Returns: + bool: 是否写入成功 + """ + if not self.connected and not self.connect(): + self.logger.warning(f"Write failed: not connected to {self.ip}") + return False + + try: + with self.lock: + self.client.db_write(db_number, offset, data) + self.logger.debug(f"Wrote {len(data)} bytes to DB{db_number} offset {offset}") + return True + except Exception as e: + self.logger.error(f"Write DB{db_number} error: {e}") + self.connected = False + return False \ No newline at end of file