Files
ElecScalesMeasur/RS485.py

110 lines
3.5 KiB
Python
Raw Permalink Normal View History

2025-02-18 11:28:24 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 9:59
# @Author : hjw
# @File : RS485.py
'''
import time
import threading
import serial
from collections import deque
import logging
# 配置日志系统
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('controller.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class RS485Reader:
def __init__(self, config):
self.port = config['serial']['port']
self.baudrate = config['serial']['baudrate']
self.ser = None
self.current_weight = 0
self.running = True
self.lock = threading.Lock()
self.buffer = deque(maxlen=10) # 数据缓冲
self._connect()
def _connect(self):
try:
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=8,
parity='N',
stopbits=1,
timeout=0.1
)
logger.info(f"成功连接串口设备 {self.port}")
except Exception as e:
logger.error(f"串口连接失败: {str(e)}")
raise
def read_weight(self):
2025-09-30 14:57:14 +08:00
cmd = bytes.fromhex('010300000002c40b') # 从地址 01 的设备读取 2 个保持寄存器,从地址 0 开始
2025-02-18 11:28:24 +08:00
try:
self.ser.write(cmd)
2025-09-30 14:57:14 +08:00
response = self.ser.read(9) # 然后读取设备返回的 9 字节响应
2025-02-18 11:28:24 +08:00
if len(response) == 9 and response[:3] == bytes.fromhex("010304"):
weight = int.from_bytes(response[3:5], 'big')
with self.lock:
self.buffer.append(weight)
2025-09-30 14:57:14 +08:00
self.current_weight = sum(self.buffer) // len(self.buffer) # 平滑滤波
2025-02-18 11:28:24 +08:00
return True
return False
except Exception as e:
logger.error(f"读取重量失败: {str(e)}")
self._reconnect()
return False
def tare(self): # 去皮方法
2025-09-30 14:57:14 +08:00
cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 发送命令给设备地址 1将寄存器地址 0x0064 设置为 0x0001执行“去皮”指令。
2025-02-18 11:28:24 +08:00
try:
if self.read_weight(): # 确保有最新数据
self.ser.write(cmd)
response = self.ser.read(8)
if len(response) == 8 and response[:3] == bytes.fromhex("010600"):
self.buffer.clear() # 清空缓冲
self.current_weight = 0 # 重置当前重量
self.read_weight() # 确保有最新数据
return True
else:
self.buffer.clear() # 清空缓冲
return False
return False
except Exception as e:
logger.error(f"去皮操作失败: {str(e)}")
self._reconnect()
return False
def _reconnect(self):
max_retries = 3
for i in range(max_retries):
try:
if self.ser:
self.ser.close()
self._connect()
return True
except Exception as e:
logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败")
time.sleep(1)
logger.error("串口重连失败")
return False
def stop(self):
self.running = False
if self.ser and self.ser.is_open:
self.ser.close()