Files
ElecScalesMeasur/Controller.py
2025-02-18 11:28:24 +08:00

159 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 10:11
# @Author : hjw
# @File : controller.py
'''
import time
import threading
import logging
from Network import NetworkHandler
from ControlAlgor import PIDAlgorithm, FuzzyLogicAlgorithm
from Gpio import GPIOManager
from tool import parse_config
from RS485 import RS485Reader
# 配置日志系统
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('controller.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class Controller:
def __init__(self, config_path='./config.yaml'):
self.config = parse_config(config_path)
# 初始化组件
self.gpio = GPIOManager(self.config)
self.sensor = RS485Reader(self.config)
self.network = NetworkHandler(self.config)
self.algorithm = None
self.running = False
# 共享状态
self.current_weight = 0
self.target_weight = 0
self.system_status = 'idle'
self.status_lock = threading.Lock()
# 看门狗线程
self.watchdog = threading.Thread(target=self._watchdog, daemon=True)
def start(self):
self.running = True
threads = [
threading.Thread(target=self._weight_reading_loop),
threading.Thread(target=self._control_loop),
threading.Thread(target=self.network.start_server),
threading.Thread(target=self._control_vibrate),
self.watchdog
]
for t in threads:
t.start()
logger.info("系统启动完成")
def _weight_reading_loop(self):
while self.running:
if self.sensor.read_weight():
with self.sensor.lock:
self.current_weight = self.sensor.current_weight
with self.status_lock:
self.network.status['current_weight'] = self.current_weight
if self.current_weight >= self.network.status['target_weight']:
self.network.status['measuring'] = False
if self.network.status['set_tare'] == True:
if self.sensor.tare():
self.network.status['set_tare_num_time'] +=1
self.network.status['set_tare'] = False
if self.network.status['get_weight'] == True:
with self.sensor.lock:
self.network.status['current_weight'] = self.sensor.current_weight
self.network.status['get_weight'] = False
time.sleep(0.05) # 20Hz
# time.sleep(0.1) # 20Hz
def _control_loop(self):
while self.running:
with self.status_lock:
measuring = self.network.status['measuring']
target = self.network.status['target_weight']
algorithm = self.network.status['algorithm']
if measuring:
self._select_algorithm(algorithm)
speed, error = self.algorithm.calculate_speed(self.current_weight, target)
rpm = speed # 假设速度单位直接对应RPM
pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60
self.gpio.set_speed(8, pulse_rate, error)
else:
self.gpio.set_speed(8, 0, 0)
while not self.running and not self.network.status['measuring']:
time.sleep(0.1) # 短暂休眠避免CPU占用过高
time.sleep(0.1)
def _control_vibrate(self): # 振动控制
while self.running:
with self.status_lock:
vibrating = self.network.status['set_vibrate']
vibrate_time = self.network.status['set_vibrate_time']
if vibrating:
self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate)
time.sleep(vibrate_time)
self.gpio._write_value(9, 0)
self.network.status['set_vibrate'] = False
self.network.status['set_vibrate_time'] = 0
time.sleep(1)
def _select_algorithm(self, name):
if name == 'pid':
params = self.config['algorithm']['params']['pid']
self.algorithm = PIDAlgorithm(**params)
elif name == 'fuzzy':
self.algorithm = FuzzyLogicAlgorithm()
else:
self.algorithm = PIDAlgorithm()
def _watchdog(self):
while self.running:
# 检查线程状态
threads_alive = all([
threading.current_thread().is_alive(),
self.sensor.running,
self.network.running
])
if not threads_alive:
logger.critical("检测到线程异常,触发紧急停止!")
self.emergency_stop()
break
time.sleep(5)
def emergency_stop(self):
with self.status_lock:
self.network.status['measuring'] = False
self.network.status['error'] = 'emergency_stop'
self.gpio.cleanup()
self.sensor.stop()
self.running = False
logger.info("系统已紧急停止")
def shutdown(self):
self.running = False
self.gpio.cleanup()
self.sensor.stop()
self.network.running = False
logger.info("系统正常关闭")