diff --git a/gateway/snap7_client.py b/gateway/snap7_client.py index c6a9421..8740e7e 100644 --- a/gateway/snap7_client.py +++ b/gateway/snap7_client.py @@ -251,90 +251,6 @@ class Snap7Client: self.connected = False return False - # def read_generic(self, db_number, offset, data_type, count=1): - # """ - # 通用读取接口,支持多种数据类型 - # Args: - # db_number: DB块编号 - # offset: 起始偏移量(字节或位,对于bool类型) - # data_type: 数据类型 ('bool', 'byte', 'int', 'word', 'real', 'dint', 'dword') - # count: 要读取的数据个数 - # Returns: - # 解析后的数据(单个值或值列表),失败返回None - # """ - # if not self.connected and not self.connect(): - # self.logger.warning(f"Read failed: not connected to {self.ip}") - # return None - # - # try: - # if data_type == 'bool': - # # 对于bool,offset是位偏移 - # byte_offset = offset // 8 - # bit_offset = offset % 8 - # # 计算需要读取的字节数 - # last_bit = bit_offset + count - 1 - # last_byte = last_bit // 8 - # total_bytes = last_byte - byte_offset + 1 - # - # # 读取原始字节数据 - # data = self.read_db(db_number, byte_offset, total_bytes) - # if data is None: - # return None - # - # # 解析bool值 - # result = [] - # for i in range(count): - # current_bit = bit_offset + i - # byte_idx = current_bit // 8 - # bit_idx = current_bit % 8 - # result.append(bool(data[byte_idx] & (1 << bit_idx))) - # - # return result[0] if count == 1 else result - # - # elif data_type == 'byte': - # data = self.read_db(db_number, offset, count) - # if data is None: - # return None - # return [data[i] for i in range(count)] if count > 1 else data[0] - # - # elif data_type in ['int', 'word']: - # total_bytes = 2 * count - # data = self.read_db(db_number, offset, total_bytes) - # if data is None: - # return None - # - # result = [] - # for i in range(count): - # if data_type == 'int': - # result.append(get_int(data, i * 2)) - # else: # word - # result.append(get_word(data, i * 2)) - # return result[0] if count == 1 else result - # - # elif data_type in ['dint', 'dword', 'real']: - # total_bytes = 4 * count - # data = self.read_db(db_number, offset, total_bytes) - # if data is None: - # return None - # - # result = [] - # for i in range(count): - # if data_type == 'dint': - # result.append(get_dint(data, i * 4)) - # elif data_type == 'dword': - # result.append(get_dword(data, i * 4)) - # else: # real - # result.append(get_real(data, i * 4)) - # return result[0] if count == 1 else result - # - # else: - # self.logger.error(f"Unsupported data type: {data_type}") - # return None - # - # except Exception as e: - # self.logger.error(f"Error reading {data_type} from DB{db_number} offset {offset}: {e}") - # return None - def write_generic(self, db_number, offset, data_type, value): """ 通用写入接口,支持多种数据类型 @@ -351,104 +267,105 @@ class Snap7Client: return False try: - if data_type == 'bool': - # 对于bool,offset是位偏移 - byte_offset = offset // 8 - bit_offset = offset % 8 + with self.lock: + if data_type == 'bool': + # 对于bool,offset是位偏移 + byte_offset = offset // 8 + bit_offset = offset % 8 - # 读取当前字节 - current_byte = self.read_db(db_number, byte_offset, 1) - if current_byte is None: - return False + # 读取当前字节 + current_byte = self.read_db(db_number, byte_offset, 1) + if current_byte is None: + return False - # 修改特定位 - if isinstance(value, list): - # 多个bool值 - for i, val in enumerate(value): - current_bit = bit_offset + i - byte_idx = current_bit // 8 - bit_idx = current_bit % 8 + # 修改特定位 + if isinstance(value, list): + # 多个bool值 + for i, val in enumerate(value): + current_bit = bit_offset + i + byte_idx = current_bit // 8 + bit_idx = current_bit % 8 - if val: - current_byte[0] |= (1 << bit_idx) - else: - current_byte[0] &= ~(1 << bit_idx) - else: - # 单个bool值 - if value: - current_byte[0] |= (1 << bit_offset) + if val: + current_byte[0] |= (1 << bit_idx) + else: + current_byte[0] &= ~(1 << bit_idx) else: - current_byte[0] &= ~(1 << bit_offset) + # 单个bool值 + if value: + current_byte[0] |= (1 << bit_offset) + else: + current_byte[0] &= ~(1 << bit_offset) - # 写回修改后的字节 - return self.write_db_bool(db_number, byte_offset, current_byte) + # 写回修改后的字节 + return self.write_db_bool(db_number, byte_offset, current_byte) - elif data_type == 'byte': - if isinstance(value, list): - # 批量写入 - for i, val in enumerate(value): - if val < 0 or val > 255: - self.logger.error(f"Byte value out of range: {val}") + elif data_type == 'byte': + if isinstance(value, list): + # 批量写入 + for i, val in enumerate(value): + if val < 0 or val > 255: + self.logger.error(f"Byte value out of range: {val}") + return False + if not self.write_db(db_number, offset + i, bytes([val])): + return False + return True + else: + # 单个字节 + if value < 0 or value > 255: + self.logger.error(f"Byte value out of range: {value}") return False - if not self.write_db(db_number, offset + i, bytes([val])): + return self.write_db(db_number, offset, bytes([value])) + + elif data_type in ['int', 'word']: + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + # 确保int值在范围内 + if data_type == 'int' and (val < -32768 or val > 32767): + self.logger.error(f"Int value out of range: {val}") + return False + elif data_type == 'word' and (val < 0 or val > 65535): + self.logger.error(f"Word value out of range: {val}") + return False + + data = bytearray(2) + if data_type == 'int': + set_int(data, 0, val) + else: + set_word(data, 0, val) + + if not self.write_db(db_number, offset + i * 2, data): return False return True + + elif data_type in ['dint', 'dword', 'real']: + if not isinstance(value, list): + value = [value] + + for i, val in enumerate(value): + data = bytearray(4) + if data_type == 'dint': + if val < -2147483648 or val > 2147483647: + self.logger.error(f"DInt value out of range: {val}") + return False + set_dint(data, 0, val) + elif data_type == 'dword': + if val < 0 or val > 4294967295: + self.logger.error(f"DWord value out of range: {val}") + return False + set_dword(data, 0, val) + else: # real + set_real(data, 0, float(val)) + + if not self.write_db(db_number, offset + i * 4, data): + return False + return True + else: - # 单个字节 - if value < 0 or value > 255: - self.logger.error(f"Byte value out of range: {value}") - return False - return self.write_db(db_number, offset, bytes([value])) - - elif data_type in ['int', 'word']: - if not isinstance(value, list): - value = [value] - - for i, val in enumerate(value): - # 确保int值在范围内 - if data_type == 'int' and (val < -32768 or val > 32767): - self.logger.error(f"Int value out of range: {val}") - return False - elif data_type == 'word' and (val < 0 or val > 65535): - self.logger.error(f"Word value out of range: {val}") - return False - - data = bytearray(2) - if data_type == 'int': - set_int(data, 0, val) - else: - set_word(data, 0, val) - - if not self.write_db(db_number, offset + i * 2, data): - return False - return True - - elif data_type in ['dint', 'dword', 'real']: - if not isinstance(value, list): - value = [value] - - for i, val in enumerate(value): - data = bytearray(4) - if data_type == 'dint': - if val < -2147483648 or val > 2147483647: - self.logger.error(f"DInt value out of range: {val}") - return False - set_dint(data, 0, val) - elif data_type == 'dword': - if val < 0 or val > 4294967295: - self.logger.error(f"DWord value out of range: {val}") - return False - set_dword(data, 0, val) - else: # real - set_real(data, 0, float(val)) - - if not self.write_db(db_number, offset + i * 4, data): - return False - return True - - else: - self.logger.error(f"Unsupported data type: {data_type}") - return False + self.logger.error(f"Unsupported data type: {data_type}") + return False except Exception as e: self.logger.error(f"Error writing {data_type} to DB{db_number} offset {offset}: {e}") @@ -544,7 +461,6 @@ class Snap7Client: if db_number == 100: print(f"从缓存文件中读取{db_number}的数据") db100_raw = self.read_db100_from_file() - if db100_raw is not None: try: if data_type == 'bool': @@ -658,6 +574,7 @@ class Snap7Client: return [data[i] for i in range(count)] if count > 1 else data[0] elif data_type in ['int', 'word']: + total_bytes = 2 * count data = self.read_db(db_number, offset, total_bytes) if data is None: @@ -687,7 +604,6 @@ class Snap7Client: result.append(get_real(data, i * 4)) return result[0] if count == 1 else result - else: self.logger.error(f"Unsupported data type: {data_type}") return None