import ast
import logging
import struct
import time
from threading import Lock

import snap7
from snap7.util import *  # noqa: F401,F403 - kept so existing re-exports stay available
from snap7.util import set_bool, set_int


class Snap7Client:
    """Snap7 client that handles communication with a single PLC.

    All DB reads and writes are serialized through ``self.lock``. Connection
    attempts are rate-limited with exponential backoff driven by
    ``retry_count`` and ``last_connect_attempt``.
    """

    # Upper bound, in seconds, for the exponential reconnect backoff.
    MAX_RETRY_DELAY = 30

    def __init__(self, ip, rack, slot, max_retries=5, retry_base_delay=1):
        """Initialize the client without opening a connection.

        Args:
            ip: IP address of the PLC.
            rack: Rack number.
            slot: Slot number.
            max_retries: Cap applied to ``retry_count`` (and therefore to the
                backoff exponent).
            retry_base_delay: Base reconnect delay in seconds.
        """
        self.ip = ip
        self.rack = rack
        self.slot = slot
        self.client = snap7.client.Client()
        self.lock = Lock()
        self.connected = False
        self.max_retries = max_retries
        self.retry_base_delay = retry_base_delay
        self.last_connect_attempt = 0
        self.retry_count = 0
        self.logger = logging.getLogger(f"Snap7Client[{ip}]")

    def is_valid_connection(self):
        """Check that the PLC actually answers by querying its CPU state.

        Returns:
            bool: True if the reported CPU state is ``S7CpuStatusRun`` or
            ``S7CpuStatusStop``; False for any other state or if the query
            raises.
        """
        try:
            cpu_state = self.client.get_cpu_state()
        except Exception as e:
            self.logger.debug(f"CPU state query on {self.ip} failed: {e}")
            return False
        self.logger.debug(f"Current CPU state: {cpu_state}")
        return cpu_state in ("S7CpuStatusRun", "S7CpuStatusStop")

    def _record_connect_failure(self):
        """Count one failed connection attempt, capped at ``max_retries``."""
        self.retry_count = min(self.retry_count + 1, self.max_retries)

    def connect(self):
        """Connect to the PLC and verify that it responds.

        While ``retry_count`` is non-zero, calls made before the backoff
        delay (``retry_base_delay * 2 ** (retry_count - 1)``, capped at
        ``MAX_RETRY_DELAY``) has elapsed return False without touching the
        network.

        Returns:
            bool: True if the connection was established and validated.
        """
        current_time = time.time()

        # Exponential backoff: skip the attempt if it is too early to retry.
        if self.retry_count > 0:
            delay = min(
                self.retry_base_delay * (2 ** (self.retry_count - 1)),
                self.MAX_RETRY_DELAY,
            )
            if current_time - self.last_connect_attempt < delay:
                return False

        self.last_connect_attempt = current_time

        try:
            self.client.connect(self.ip, self.rack, self.slot)
            alive = self.client.get_connected() and self.is_valid_connection()
        except Exception as e:
            self._record_connect_failure()
            self.logger.error(
                f"Connection to {self.ip} failed "
                f"(attempt {self.retry_count}/{self.max_retries}): {e}"
            )
            self.connected = False
            return False

        if alive:
            self.connected = True
            self.retry_count = 0  # a validated connection resets the backoff
            self.logger.info(f"Successfully connected to PLC {self.ip}")
            return True

        # The socket opened but the PLC did not validate. Treat it as a
        # failed attempt so the backoff applies instead of reconnecting on
        # every subsequent read/write call.
        self.connected = False
        self._record_connect_failure()
        self.logger.warning(
            f"Connection to {self.ip} established but PLC is not responding"
        )
        try:
            self.client.disconnect()
        except Exception as e:
            # Best-effort cleanup; the connection is already considered dead.
            self.logger.debug(f"Disconnect from {self.ip} failed: {e}")
        return False

    def _ensure_connected(self, action):
        """Return True if connected, trying to (re)connect when necessary.

        Args:
            action: Operation name used in the warning message
                (``"Read"`` or ``"Write"``).
        """
        if self.connected or self.connect():
            return True
        self.logger.warning(f"{action} failed: not connected to {self.ip}")
        return False

    def _write_bytes(self, db_number, offset, data):
        """Write raw bytes to a DB under the lock; shared by all write methods.

        Any exception marks the client as disconnected and yields False.
        """
        if not self._ensure_connected("Write"):
            return False
        try:
            with self.lock:
                self.client.db_write(db_number, offset, data)
            self.logger.debug(
                f"Wrote {len(data)} bytes to DB{db_number} offset {offset}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Write DB{db_number} error: {e}")
            self.connected = False
            return False

    def read_db(self, db_number, offset, size):
        """Read ``size`` bytes from a DB.

        Args:
            db_number: DB number.
            offset: Start byte offset.
            size: Number of bytes to read.

        Returns:
            bytearray: The data read, or None on failure (never zero-filled
            placeholder data).
        """
        if not self._ensure_connected("Read"):
            return None

        try:
            with self.lock:
                data = self.client.db_read(db_number, offset, size)
        except Exception as e:
            self.logger.error(f"Read DB{db_number} error: {e}")
            self.connected = False
            return None

        # A short or missing payload is treated as a broken connection.
        if data is None or len(data) != size:
            self.connected = False
            self.logger.error(
                f"Read DB{db_number} returned invalid data size "
                f"(expected {size}, got {len(data) if data else 0})"
            )
            return None
        return data

    def read_db_bool(self, db_number, offset, bit_length):
        """Read one byte from a DB and decode its lowest ``bit_length`` bits.

        Args:
            db_number (int): DB number.
            offset (int): Byte offset of the byte to read.
            bit_length (int): Number of bits to decode, starting at bit 0.
                Only one byte is read, so indices of 8 and above are always
                reported as False.

        Returns:
            dict: ``{bit_index: bool}`` for bits ``0 .. bit_length - 1``, or
            None on failure or if ``bit_length`` is not a non-negative int.
        """
        if not isinstance(bit_length, int) or bit_length < 0:
            # A bad argument is a caller error, not a connection problem.
            self.logger.error(
                f"Read DB{db_number} rejected: invalid bit_length {bit_length!r}"
            )
            return None

        if not self._ensure_connected("Read"):
            return None

        try:
            with self.lock:
                data = self.client.db_read(db_number, offset, 1)
        except Exception as e:
            self.logger.error(f"Read DB{db_number} error: {e}")
            self.connected = False
            return None

        # Validate before indexing so an empty reply cannot raise IndexError.
        if data is None or len(data) != 1:
            self.connected = False
            self.logger.error(
                f"Read DB{db_number} returned invalid data size "
                f"(expected 1, got {len(data) if data else 0})"
            )
            return None

        byte = data[0]
        return {bit: bool(byte & (1 << bit)) for bit in range(bit_length)}

    def write_db(self, db_number, offset, data):
        """Write one 16-bit signed integer to a DB.

        Args:
            db_number: DB number.
            offset: Byte offset at which the two bytes are written.
            data: Value to write; anything accepted by ``int()``.

        Returns:
            bool: True on success. False if the value cannot be converted or
            encoded, or if the write fails.
        """
        try:
            value = int(data)
            payload = bytearray(2)
            # Encode at index 0 of a correctly sized buffer; `offset` is the
            # DB address, not an index into the payload.
            set_int(payload, 0, value)
        except (TypeError, ValueError, OverflowError, struct.error) as e:
            self.logger.error(
                f"Write DB{db_number} rejected: cannot encode {data!r} as INT: {e}"
            )
            return False

        self.logger.debug(f"Encoded INT {value} as {bytes(payload).hex()}")
        return self._write_bytes(db_number, offset, payload)

    def batch_write_db(self, db_number, offset, data):
        """Write an already-encoded byte buffer to a DB.

        Args:
            db_number: DB number.
            offset: Start byte offset.
            data: Bytes to write.

        Returns:
            bool: True if the write succeeded.
        """
        return self._write_bytes(db_number, offset, data)

    @staticmethod
    def _parse_bit_values(data):
        """Normalize ``write_db_bool`` input to a ``{bit_index: bool}`` dict.

        Accepts a dict, or a str / UTF-8 bytes holding a dict literal such as
        ``{0: True}``. The literal is parsed with ``ast.literal_eval``, which
        evaluates literals only.

        Raises:
            ValueError: If the input is not a mapping of bit indices 0-7 to
                boolean values (UnicodeDecodeError is a ValueError subclass).
            SyntaxError, TypeError: Propagated from ``ast.literal_eval``.
        """
        if isinstance(data, (bytes, bytearray)):
            data = data.decode("utf-8")
        if isinstance(data, str):
            data = ast.literal_eval(data)
        if not isinstance(data, dict):
            raise ValueError("expected a mapping of bit index to bool")

        bit_values = {}
        for bit, val in data.items():
            if not isinstance(bit, int) or not 0 <= bit <= 7:
                raise ValueError(f"invalid bit index {bit!r}")
            if val not in (0, 1):  # matches True/False/0/1 only
                raise ValueError(f"invalid bool value {val!r} for bit {bit}")
            bit_values[bit] = bool(val)
        return bit_values

    def write_db_bool(self, db_number, offset, data):
        """Set or clear individual bits of one DB byte.

        The byte at ``offset`` is read, the requested bits are changed and
        the byte is written back, all while holding the lock. Bits that are
        not named in ``data`` keep their current value, and only that single
        byte is written.

        Args:
            db_number: DB number.
            offset: Byte offset of the byte holding the bits.
            data: ``{bit_index: bool}`` mapping, given as a dict or as a
                str / UTF-8 bytes dict literal (e.g. ``b"{0: True}"``).

        Returns:
            bool: True on success. False if ``data`` is malformed or the
            read or write fails.
        """
        try:
            bit_values = self._parse_bit_values(data)
        except (ValueError, SyntaxError, TypeError) as e:
            self.logger.error(
                f"Write DB{db_number} rejected: invalid bool data {data!r}: {e}"
            )
            return False

        if not self._ensure_connected("Write"):
            return False

        try:
            with self.lock:
                current = self.client.db_read(db_number, offset, 1)
                if current is None or len(current) != 1:
                    raise ValueError(
                        f"expected 1 byte, got {len(current) if current else 0}"
                    )
                payload = bytearray(current)
                for bit, val in bit_values.items():
                    set_bool(payload, 0, bit, val)
                self.client.db_write(db_number, offset, payload)
            self.logger.debug(
                f"Wrote {len(payload)} bytes to DB{db_number} offset {offset}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Write DB{db_number} error: {e}")
            self.connected = False
            return False

    def batch_write_db_bool(self, db_number, offset, data):
        """Write an already-encoded buffer of packed BOOL bytes to a DB.

        Args:
            db_number: DB number.
            offset: Start byte offset.
            data: Bytes to write; written as-is, so every bit in the covered
                bytes is overwritten.

        Returns:
            bool: True if the write succeeded.
        """
        return self._write_bytes(db_number, offset, data)