#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 9:59
# @Author : hjw
# @File : RS485.py
'''
import time
import threading
import serial
from collections import deque
import logging

# Configure the logging system: records go both to 'controller.log' and to
# the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('controller.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Request: device address 01, function 03 (read holding registers), start
# address 0, quantity 2, followed by the CRC bytes.
_READ_WEIGHT_CMD = bytes.fromhex('010300000002c40b')
# Expected start of the 9-byte reply: address 01, function 03, byte count 04.
_READ_WEIGHT_HEADER = bytes.fromhex('010304')
_READ_WEIGHT_REPLY_LEN = 9

# Request: device address 01, function 06 (write single register), register
# 0x0064 set to 0x0001 -- this triggers the "tare" action on the device.
_TARE_CMD = bytes.fromhex('01060064000109d5')
# Expected start of the 8-byte reply to the tare request.
_TARE_HEADER = bytes.fromhex('010600')
_TARE_REPLY_LEN = 8


def _modbus_crc16(data):
    """Return the Modbus RTU CRC-16 of *data* as an integer."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def _frame_crc_ok(frame):
    """Return True when the last two bytes of *frame* are its valid CRC-16.

    The CRC is transmitted low byte first, which is how the command frames
    in this module are built as well.
    """
    if len(frame) < 3:
        return False
    return _modbus_crc16(frame[:-2]) == int.from_bytes(frame[-2:], 'little')


class RS485Reader:
    """Read a weight value from a serial device and keep a smoothed reading.

    ``current_weight`` holds the integer mean of the most recent raw samples
    (at most 10).  ``lock`` guards ``buffer`` and ``current_weight``.
    """

    def __init__(self, config):
        """Store the serial settings and open the port.

        Args:
            config: mapping that provides ``config['serial']['port']`` and
                ``config['serial']['baudrate']``.

        Raises:
            Exception: whatever opening the port raised (re-raised by
                ``_connect`` after logging).
        """
        self.port = config['serial']['port']
        self.baudrate = config['serial']['baudrate']
        self.ser = None
        self.current_weight = 0
        # Cleared by stop(); nothing else in this class reads it.
        self.running = True
        self.lock = threading.Lock()
        self.buffer = deque(maxlen=10)  # sliding window of raw samples
        self._connect()

    def _connect(self):
        """Open the serial port (8 data bits, no parity, 1 stop bit, 0.1 s timeout).

        Raises:
            Exception: the error raised while opening the port, after it has
                been logged.
        """
        try:
            self.ser = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=8,
                parity='N',
                stopbits=1,
                timeout=0.1
            )
            logger.info(f"成功连接串口设备 {self.port}")
        except Exception as e:
            logger.error(f"串口连接失败: {str(e)}")
            raise

    def read_weight(self):
        """Request one sample and fold it into the smoothed weight.

        Returns:
            bool: True when a valid reply was received and ``current_weight``
            was updated; False on a short, malformed or corrupted reply, or
            when the exchange raised (a reconnect is attempted in that case).
        """
        try:
            # Drop leftovers from an earlier exchange (e.g. a reply that
            # arrived after the read timeout) so they are not parsed as the
            # answer to this request.
            self.ser.reset_input_buffer()
            self.ser.write(_READ_WEIGHT_CMD)
            response = self.ser.read(_READ_WEIGHT_REPLY_LEN)
            if (len(response) == _READ_WEIGHT_REPLY_LEN
                    and response[:3] == _READ_WEIGHT_HEADER
                    and _frame_crc_ok(response)):
                # NOTE(review): only the first of the two registers is used,
                # and it is decoded as unsigned -- confirm against the
                # device's register map.
                weight = int.from_bytes(response[3:5], 'big')
                with self.lock:
                    self.buffer.append(weight)
                    # Moving-average smoothing over the buffered samples.
                    self.current_weight = sum(self.buffer) // len(self.buffer)
                return True
            return False
        except Exception as e:
            logger.error(f"读取重量失败: {str(e)}")
            self._reconnect()
            return False

    def tare(self):
        """Send the tare command and reset the smoothed reading.

        A fresh sample is read first; if that fails nothing is sent.

        Returns:
            bool: True when the device acknowledged the command; False when
            the preliminary read failed, the acknowledgement was invalid, or
            the exchange raised (a reconnect is attempted in that case).
        """
        try:
            if not self.read_weight():  # make sure the link is delivering data
                return False

            self.ser.reset_input_buffer()
            self.ser.write(_TARE_CMD)
            response = self.ser.read(_TARE_REPLY_LEN)
            acknowledged = (
                len(response) == _TARE_REPLY_LEN
                and response[:3] == _TARE_HEADER
                and _frame_crc_ok(response)
            )

            # Take the same lock read_weight() uses; it is released before
            # read_weight() is called again because the lock is not reentrant.
            with self.lock:
                self.buffer.clear()  # discard pre-tare samples
                if acknowledged:
                    self.current_weight = 0

            if acknowledged:
                self.read_weight()  # refresh with a post-tare sample
            return acknowledged
        except Exception as e:
            logger.error(f"去皮操作失败: {str(e)}")
            self._reconnect()
            return False

    def _reconnect(self):
        """Close and reopen the port, trying up to three times.

        Returns:
            bool: True as soon as one attempt succeeds, False when all fail.
        """
        max_retries = 3
        for i in range(max_retries):
            try:
                if self.ser:
                    self.ser.close()
                self._connect()
                return True
            except Exception:
                logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败")
                # Pause between attempts only; waiting after the last one
                # would just delay reporting the failure.
                if i + 1 < max_retries:
                    time.sleep(1)
        logger.error("串口重连失败")
        return False

    def stop(self):
        """Clear the ``running`` flag and close the port if it is open."""
        self.running = False
        if self.ser and self.ser.is_open:
            self.ser.close()