import threading import time import logging from snap7.util import * class CacheManager: """PLC数据缓存管理器""" def __init__(self, config, plc_manager, app=None): """ 初始化缓存管理器 Args: config: 配置对象 plc_manager: PLC管理器实例 app: 主应用程序引用(用于配置重载) """ self.plc_manager = plc_manager self.config = config self.app = app self.cache = {} self.refresh_interval = 1 # 1秒刷新一次 self.running = False self.lock = threading.Lock() self.thread = None self.last_update = {} # 区域级最后更新时间 self.plc_last_connected = {} # PLC级最后连接时间 self.plc_connection_status = {} # PLC连接状态 self.logger = logging.getLogger("CacheManager") self.init_cache() def init_cache(self): """初始化缓存结构""" for plc in self.config["plcs"]: plc_name = plc["name"] self.cache[plc_name] = {} self.last_update[plc_name] = {} self.plc_last_connected[plc_name] = 0 # 初始化为0(未连接) self.plc_connection_status[plc_name] = "never_connected" for area in plc["areas"]: name = area["name"] # 确保初始状态为断开 self.cache[plc_name][name] = { "data": bytearray(area["size"]), "db_number": area["db_number"], "offset": area["offset"], "size": area["size"], "type": area["type"], "structure": area.get("structure", []), "status": "disconnected" # 初始状态为断开 } self.last_update[plc_name][name] = 0 def refresh_cache(self): """后台线程:定期刷新缓存""" while self.running: start_time = time.time() try: for plc in self.config["plcs"]: plc_name = plc["name"] refresh_interval = plc.get("refresh_interval", 0.5) client = self.plc_manager.get_plc(plc_name) # 检查PLC连接状态 plc_connected = client.connected # 更新PLC连接状态 with self.lock: if plc_connected: self.plc_last_connected[plc_name] = time.time() self.plc_connection_status[plc_name] = "connected" else: if self.plc_last_connected[plc_name] == 0: self.plc_connection_status[plc_name] = "never_connected" else: self.plc_connection_status[plc_name] = "disconnected" # 刷新所有可读区域 for area in plc["areas"]: if area["type"] in ["read", "read_write"]: name = area["name"] try: data = client.read_db(area["db_number"], area["offset"], area["size"]) # 更新区域状态基于PLC连接状态和读取结果 with self.lock: if plc_connected and data and len(data) == area["size"]: self.cache[plc_name][name]["data"] = bytearray(data) self.cache[plc_name][name]["status"] = "connected" self.last_update[plc_name][name] = time.time() else: self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name] # 如果之前有数据,保留旧数据但标记状态 if self.last_update[plc_name][name] > 0: self.logger.info(f"PLC {plc_name} area {name} disconnected but keeping last valid data") except Exception as e: with self.lock: self.cache[plc_name][name]["status"] = self.plc_connection_status[plc_name] self.logger.warning(f"Error updating status for {plc_name}/{name}: {e}") """计算刷新一个PLC的时间""" # 计算实际执行时间 execution_time = time.time() - start_time #计算需要睡眠的时间,确保总等于refresh_time sleep_time = max(0, refresh_interval - execution_time) time.sleep(sleep_time) print(f"plc_name: {plc_name}," f"Cache refresh completed.Execution time: {execution_time:.3f}s," f"Sleep time: {sleep_time:.3f}s," f"Total interval: {execution_time + sleep_time:.3f}s") # 记录实际刷新间隔 self.logger.debug(f"plc_name: {plc_name}," f"Cache refresh completed.Execution time: {execution_time:.3f}s," f"Sleep time: {sleep_time:.3f}s," f"Total interval: {execution_time + sleep_time:.3f}s") time.sleep(self.refresh_interval) except Exception as e: self.logger.error(f"Error in refresh_cache: {e}") time.sleep(self.refresh_interval) def start(self): """启动缓存刷新线程""" if self.running: return self.running = True self.thread = threading.Thread( target=self.refresh_cache, name="CacheRefreshThread", daemon=True ) self.thread.start() self.logger.info("Cache manager started") def stop(self): """停止缓存刷新线程""" if not self.running: return self.running = False if self.thread: # 等待线程结束,但设置超时防止卡死 self.thread.join(timeout=2.0) if self.thread.is_alive(): self.logger.warning("Cache refresh thread did not terminate gracefully") self.thread = None self.logger.info("Cache manager stopped") def get_plc_connection_status(self, plc_name): """获取PLC连接状态""" with self.lock: return self.plc_connection_status.get(plc_name, "unknown") def get_last_update_time(self, plc_name, area_name): """获取区域数据最后更新时间""" with self.lock: return self.last_update.get(plc_name, {}).get(area_name, 0) def get_summary(self): """获取缓存摘要信息""" summary = {} with self.lock: for plc_name, areas in self.cache.items(): summary[plc_name] = {} for area_name, area in areas.items(): last_update = self.last_update[plc_name][area_name] plc_status = self.plc_connection_status.get(plc_name, "unknown") # 区域状态应与PLC连接状态一致,除非有有效数据 area_status = area["status"] if plc_status == "never_connected": area_status = "never_connected" elif plc_status == "disconnected" and self.last_update[plc_name][area_name] == 0: area_status = "disconnected" summary[plc_name][area_name] = { "status": area_status, "plc_connection_status": plc_status, "last_update": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", "size": area["size"], "type": area["type"] } return summary def get_area_status(self, plc_name, area_name): """获取区域状态""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return {"status": "not_found", "message": "PLC or area not found"} plc_status = self.plc_connection_status.get(plc_name, "unknown") last_update = self.last_update.get(plc_name, {}).get(area_name, 0) # 区域状态应与PLC连接状态一致,除非有有效数据 area_status = area["status"] if plc_status == "never_connected": area_status = "never_connected" elif plc_status == "disconnected" and last_update == 0: area_status = "disconnected" return { "status": area_status, "plc_connection_status": plc_status, "last_update": last_update, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never", "size": area["size"], "type": area["type"] } def read_area(self, plc_name, area_name, offset, length): """单个区域读取""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return None, "Area not found", "unknown", 0 if offset + length > area["size"]: return None, "Offset out of bounds", "unknown", 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return None, f"PLC not connected (status: {plc_status})", plc_status, 0 try: data = client.read_db(area["db_number"], area["offset"] + offset, length) # 验证数据有效性 if data and len(data) == length: # 更新缓存中的这部分数据 for i in range(length): area["data"][offset + i] = data[i] update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected" return data, None, plc_status, update_time else: area["status"] = plc_status return None, "Invalid data returned", plc_status, 0 except Exception as e: area["status"] = plc_status self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") return None, f"Read failed: {str(e)}", plc_status, 0 def read_area_bool(self, plc_name, area_name, offset, length): """单个区域读取""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return None, "Area not found", "unknown", 0 if offset + length > area["size"]: return None, "Offset out of bounds", "unknown", 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return None, f"PLC not connected (status: {plc_status})", plc_status, 0 try: data = client.read_db_bool(area["db_number"], area["offset"] + offset, length) # 验证数据有效性 if all(isinstance(val, bool) for val in data.values()): # 按字典键顺序更新多个值 for i, val in data.items(): area["data"][offset + i] = val # 确保offset+i不越界 # area["data"][offset] = data.values update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected" return data, None, plc_status, update_time else: area["status"] = plc_status return None, "Invalid data returned", plc_status, 0 except Exception as e: area["status"] = plc_status self.logger.error(f"Read failed for {plc_name}/{area_name}: {e}") return None, f"Read failed: {str(e)}", plc_status, 0 def write_area(self, plc_name, area_name, offset, data): """单个区域写入""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return False, "Area not found", "unknown", 0 if area["type"] not in ["write", "read_write"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Area is read-only", plc_status, 0 if offset + len(data) > area["size"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Offset out of bounds", plc_status, 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return False, f"PLC not connected (status: {plc_status})", plc_status, 0 try: success = client.write_db(area["db_number"], area["offset"] + offset, data) if success: # 更新缓存中的这部分数据 for i in range(len(data)): area["data"][offset + i] = data[i] update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected (last write)" return True, None, plc_status, update_time else: area["status"] = plc_status return False, "Write failed", plc_status, 0 except Exception as e: area["status"] = plc_status self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") return False, f"Write failed: {str(e)}", plc_status, 0 def batch_write_area(self, plc_name, area_name, offset, data): """单个区域写入""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return False, "Area not found", "unknown", 0 if area["type"] not in ["write", "read_write"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Area is read-only", plc_status, 0 if offset + len(data) > area["size"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Offset out of bounds", plc_status, 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return False, f"PLC not connected (status: {plc_status})", plc_status, 0 try: for i, byte in enumerate(data): byte_data = bytes([byte]) current_offset = offset + (i * 2) byte_value = byte_data[0] value = bytearray(2) if isinstance(byte_value, int): set_int(value, 0, byte_value) data = value success = client.batch_write_db(area["db_number"], current_offset, data) if success: # 更新缓存中的这部分数据 for j in range(len(data)): area["data"][offset + j] = data[j] update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected (last write)" else: area["status"] = plc_status return False, "Write failed", plc_status, 0 return True, None, plc_status, update_time except Exception as e: area["status"] = plc_status self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") return False, f"Write failed: {str(e)}", plc_status, 0 def batch_write_bool_area(self, plc_name, area_name, offset, data): """单个区域写入""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return False, "Area not found", "unknown", 0 if area["type"] not in ["write", "read_write"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Area is read-only", plc_status, 0 if offset + len(data) > area["size"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Offset out of bounds", plc_status, 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return False, f"PLC not connected (status: {plc_status})", plc_status, 0 try: value = bytearray(offset + 1) for bit, bit_value in enumerate(data): set_bool(value, offset, bit, bit_value) data = value success = client.batch_write_db_bool(area["db_number"], offset, data) if success: # 更新缓存中的这部分数据 for j in range(len(data)): area["data"][offset + j] = data[j] update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected (last write)" else: area["status"] = plc_status return False, "Write failed", plc_status, 0 return True, None, plc_status, update_time except Exception as e: area["status"] = plc_status self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") return False, f"Write failed: {str(e)}", plc_status, 0 def write_area_bool(self, plc_name, area_name, offset, data): """单个区域写入""" with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return False, "Area not found", "unknown", 0 if area["type"] not in ["write", "read_write"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Area is read-only", plc_status, 0 if offset + len(data) > area["size"]: plc_status = self.plc_connection_status.get(plc_name, "unknown") return False, "Offset out of bounds", plc_status, 0 client = self.plc_manager.get_plc(plc_name) plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": return False, f"PLC not connected (status: {plc_status})", plc_status, 0 try: success = client.write_db_bool(area["db_number"], area["offset"] + offset, data) if success: # 更新缓存中的这部分数据 for i in range(len(data)): area["data"][offset + i] = data[i] update_time = time.time() self.last_update[plc_name][area_name] = update_time area["status"] = "connected (last write)" return True, None, plc_status, update_time else: area["status"] = plc_status return False, "Write failed", plc_status, 0 except Exception as e: area["status"] = plc_status self.logger.error(f"Write failed for {plc_name}/{area_name}: {e}") return False, f"Write failed: {str(e)}", plc_status, 0 def batch_read(self, requests): """批量读取""" results = [] for req in requests: plc_name = req["plc_name"] area_name = req["area_name"] offset = req.get("offset", 0) length = req.get("length", None) # 获取PLC连接状态 plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": 0, "last_update_formatted": "N/A", "message": f"PLC not connected (status: {plc_status})" }) continue area = self.cache.get(plc_name, {}).get(area_name) if not area: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": 0, "last_update_formatted": "N/A", "message": "Area not found" }) continue # 如果未指定length,读取整个区域 if length is None: length = area["size"] - offset data, error, _, update_time = self.read_area(plc_name, area_name, offset, length) if error: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", "message": error }) else: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "success", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), "offset": offset, "length": length, "data": list(data) }) return results def batch_write(self, requests): """批量写入""" results = [] for req in requests: plc_name = req["plc_name"] area_name = req["area_name"] offset = req["offset"] data = bytes(req["data"]) # 获取PLC连接状态 plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": 0, "last_update_formatted": "N/A", "message": f"PLC not connected (status: {plc_status})", "offset": offset }) continue success, error, _, update_time = self.batch_write_area(plc_name, area_name, offset, data) if error: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", "message": error, "offset": offset }) else: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "success", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), "offset": offset, "length": len(data) }) return results def batch_write_bool(self, requests): """批量写入""" results = [] for req in requests: plc_name = req["plc_name"] area_name = req["area_name"] offset = req["offset"] data = bytes(req["data"]) # 获取PLC连接状态 plc_status = self.plc_connection_status.get(plc_name, "unknown") # 如果PLC未连接,直接返回错误 if plc_status != "connected": results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": 0, "last_update_formatted": "N/A", "message": f"PLC not connected (status: {plc_status})", "offset": offset }) continue success, error, _, update_time = self.batch_write_bool_area(plc_name, area_name, offset, data) if error: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "error", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)) if update_time > 0 else "Never", "message": error, "offset": offset }) else: results.append({ "plc_name": plc_name, "area_name": area_name, "status": "success", "plc_connection_status": plc_status, "last_update": update_time, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(update_time)), "offset": offset, "length": len(data) }) return results def get_parsed_data(self, plc_name, area_name): """获取解析后的数据""" from data_parser import parse_data with self.lock: area = self.cache.get(plc_name, {}).get(area_name) if not area: return {"error": "Area not found"} plc_status = self.plc_connection_status.get(plc_name, "unknown") last_update = self.last_update.get(plc_name, {}).get(area_name, 0) # 区域状态应与PLC连接状态一致,除非有有效数据 area_status = area["status"] if plc_status == "never_connected": area_status = "never_connected" elif plc_status == "disconnected" and last_update == 0: area_status = "disconnected" structure = area.get("structure", []) if structure: parsed = parse_data(area["data"], structure) parsed["status"] = area_status parsed["plc_connection_status"] = plc_status parsed["last_update"] = last_update parsed["last_update_formatted"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never" return parsed else: return { "raw_data": list(area["data"]), "status": area_status, "plc_connection_status": plc_status, "last_update": last_update, "last_update_formatted": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(last_update)) if last_update > 0 else "Never" }