Files
ElecScalesMeasur/RS485.py
2025-09-30 14:57:14 +08:00

110 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 9:59
# @Author : hjw
# @File : RS485.py
'''
import time
import threading
import serial
from collections import deque
import logging
# Configure the logging system: every record at INFO level or above is sent
# both to the file 'controller.log' and to a stream handler.
_log_handlers = [
    logging.FileHandler('controller.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
    level=logging.INFO,
)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
class RS485Reader:
    """Read weight values from a scale over a serial (RS-485) link.

    The device is addressed with Modbus-RTU style frames: a "read holding
    registers" request (function 0x03) fetches the weight and a "write single
    register" request (function 0x06) triggers the tare operation.

    Attributes:
        port: Serial port name, taken from ``config['serial']['port']``.
        baudrate: Baud rate, taken from ``config['serial']['baudrate']``.
        ser: The open ``serial.Serial`` instance (``None`` until connected).
        current_weight: Integer average of the most recent readings.
        running: Flag set to ``True`` on construction and ``False`` by
            :meth:`stop`; this class itself never reads it.
        lock: Guards ``buffer`` and ``current_weight``.
        buffer: The last (at most 10) raw readings used for smoothing.

    NOTE(review): ``lock`` protects only the smoothing state. The serial
    request/reply transactions themselves are not serialised, so calling
    ``read_weight`` and ``tare`` concurrently from different threads can
    still interleave traffic on the bus -- confirm how callers use this.
    """

    # Request: device 0x01, function 0x03, start register 0x0000, 2 registers,
    # followed by the CRC-16 trailer.
    _READ_WEIGHT_CMD = bytes.fromhex('010300000002c40b')
    # Expected start of the reply: device 0x01, function 0x03, 4 data bytes.
    _READ_WEIGHT_HEADER = bytes.fromhex('010304')
    _READ_WEIGHT_LEN = 9

    # Request: device 0x01, function 0x06, register 0x0064 <- 0x0001 ("tare"),
    # followed by the CRC-16 trailer.
    _TARE_CMD = bytes.fromhex('01060064000109d5')
    _TARE_HEADER = bytes.fromhex('010600')
    _TARE_LEN = 8

    def __init__(self, config):
        """Store the serial settings and open the port immediately.

        Args:
            config: Mapping providing ``config['serial']['port']`` and
                ``config['serial']['baudrate']``.

        Raises:
            Exception: Whatever ``serial.Serial`` raises when the port cannot
                be opened (re-raised by :meth:`_connect`).
        """
        self.port = config['serial']['port']
        self.baudrate = config['serial']['baudrate']
        self.ser = None
        self.current_weight = 0
        self.running = True
        self.lock = threading.Lock()
        self.buffer = deque(maxlen=10)  # sliding window of raw readings
        self._connect()

    def _connect(self):
        """Open the serial port (8 data bits, no parity, 1 stop bit, 0.1 s timeout).

        Raises:
            Exception: Re-raises any error from ``serial.Serial`` after
                logging it.
        """
        try:
            self.ser = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=8,
                parity='N',
                stopbits=1,
                timeout=0.1
            )
            logger.info(f"成功连接串口设备 {self.port}")
        except Exception as e:
            logger.error(f"串口连接失败: {str(e)}")
            raise

    @staticmethod
    def _crc16(payload):
        """Return the Modbus CRC-16 of *payload* as two bytes, low byte first."""
        crc = 0xFFFF
        for byte in payload:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc.to_bytes(2, 'little')

    @classmethod
    def _frame_ok(cls, frame, length, header):
        """Return True if *frame* has the given length, header and a valid CRC."""
        return (
            len(frame) == length
            and frame[:len(header)] == header
            and frame[-2:] == cls._crc16(frame[:-2])
        )

    def _transact(self, request, reply_len):
        """Send *request* and return up to *reply_len* bytes of the reply."""
        # Drop leftovers from a late or partial earlier reply; otherwise every
        # following fixed-size read would be misaligned.
        self.ser.reset_input_buffer()
        self.ser.write(request)
        return self.ser.read(reply_len)

    def read_weight(self):
        """Poll the device once and fold the reading into the moving average.

        Returns:
            True if a well-formed reply (length, header and CRC) was received
            and ``current_weight`` was updated; False otherwise. On an
            exception the error is logged, a reconnect is attempted and False
            is returned.

        NOTE(review): only the first of the two requested registers (reply
        bytes 3-4) is used, decoded as an unsigned big-endian integer --
        confirm against the device manual that the second register and a
        sign bit can be ignored.
        """
        try:
            response = self._transact(self._READ_WEIGHT_CMD, self._READ_WEIGHT_LEN)
            if not self._frame_ok(response, self._READ_WEIGHT_LEN,
                                  self._READ_WEIGHT_HEADER):
                return False
            weight = int.from_bytes(response[3:5], 'big')
            with self.lock:
                self.buffer.append(weight)
                # Smoothing filter: integer mean of the buffered readings.
                self.current_weight = sum(self.buffer) // len(self.buffer)
            return True
        except Exception as e:
            logger.error(f"读取重量失败: {str(e)}")
            self._reconnect()
            return False

    def tare(self):
        """Send the tare command and reset the smoothing state.

        A fresh reading is taken first; if that fails nothing is sent. After
        the tare request the reading buffer is always cleared. When the
        device acknowledges, ``current_weight`` is reset to 0 and one new
        reading is taken.

        Returns:
            True if the device acknowledged the tare command, False
            otherwise. On an exception the error is logged, a reconnect is
            attempted and False is returned.
        """
        try:
            if not self.read_weight():  # make sure the link is delivering data
                return False
            response = self._transact(self._TARE_CMD, self._TARE_LEN)
            acknowledged = self._frame_ok(response, self._TARE_LEN,
                                          self._TARE_HEADER)
            # Same lock as read_weight() so a concurrent reading cannot
            # interleave with the reset.
            with self.lock:
                self.buffer.clear()
                if acknowledged:
                    self.current_weight = 0
            # The lock is released before read_weight(), which takes it
            # itself (threading.Lock is not re-entrant).
            if acknowledged:
                self.read_weight()
            return acknowledged
        except Exception as e:
            logger.error(f"去皮操作失败: {str(e)}")
            self._reconnect()
            return False

    def _reconnect(self):
        """Close and reopen the port, trying up to three times.

        Returns:
            True as soon as one attempt succeeds, False if all attempts fail.
        """
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                if self.ser:
                    self.ser.close()
                self._connect()
                return True
            except Exception:
                logger.warning(f"重连尝试 {attempt}/{max_retries} 失败")
                # No point in waiting after the last attempt.
                if attempt < max_retries:
                    time.sleep(1)
        logger.error("串口重连失败")
        return False

    def stop(self):
        """Clear the ``running`` flag and close the serial port if it is open."""
        self.running = False
        if self.ser and self.ser.is_open:
            self.ser.close()