#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/2/18 10:11 # @Author : hjw # @File : controller.py ''' import time import threading import logging from Network import NetworkHandler from ControlAlgor import PIDAlgorithm, FuzzyLogicAlgorithm from Gpio import GPIOManager from tool import parse_config from RS485 import RS485Reader # 配置日志系统 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('controller.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class Controller: def __init__(self, config_path='./config.yaml'): self.config = parse_config(config_path) # 初始化组件 self.gpio = GPIOManager(self.config) self.sensor = RS485Reader(self.config) self.network = NetworkHandler(self.config) self.algorithm = None self.running = False # 共享状态 self.current_weight = 0 self.target_weight = 0 self.system_status = 'idle' self.status_lock = threading.Lock() # 看门狗线程 self.watchdog = threading.Thread(target=self._watchdog, daemon=True) def start(self): self.running = True threads = [ threading.Thread(target=self._weight_reading_loop), threading.Thread(target=self._control_loop), threading.Thread(target=self.network.start_server), threading.Thread(target=self._control_vibrate), self.watchdog ] for t in threads: t.start() logger.info("系统启动完成") def _weight_reading_loop(self): while self.running: if self.sensor.read_weight(): with self.sensor.lock: self.current_weight = self.sensor.current_weight with self.status_lock: self.network.status['current_weight'] = self.current_weight if self.current_weight >= self.network.status['target_weight']: self.network.status['measuring'] = False if self.network.status['set_tare'] == True: if self.sensor.tare(): self.network.status['set_tare_num_time'] +=1 self.network.status['set_tare'] = False if self.network.status['get_weight'] == True: with self.sensor.lock: self.network.status['current_weight'] = self.sensor.current_weight self.network.status['get_weight'] = False time.sleep(0.05) # 20Hz # time.sleep(0.1) # 20Hz def _control_loop(self): while self.running: with self.status_lock: measuring = self.network.status['measuring'] target = self.network.status['target_weight'] algorithm = self.network.status['algorithm'] if measuring: self._select_algorithm(algorithm) speed, error = self.algorithm.calculate_speed(self.current_weight, target) rpm = speed # 假设速度单位直接对应RPM pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 self.gpio.set_speed(8, pulse_rate, error) else: self.gpio.set_speed(8, 0, 0) while not self.running and not self.network.status['measuring']: time.sleep(0.1) # 短暂休眠,避免CPU占用过高 time.sleep(0.1) def _control_vibrate(self): # 振动控制 while self.running: with self.status_lock: vibrating = self.network.status['set_vibrate'] vibrate_time = self.network.status['set_vibrate_time'] if vibrating: self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) time.sleep(vibrate_time) self.gpio._write_value(9, 0) self.network.status['set_vibrate'] = False self.network.status['set_vibrate_time'] = 0 time.sleep(1) def _select_algorithm(self, name): if name == 'pid': params = self.config['algorithm']['params']['pid'] self.algorithm = PIDAlgorithm(**params) elif name == 'fuzzy': self.algorithm = FuzzyLogicAlgorithm() else: self.algorithm = PIDAlgorithm() def _watchdog(self): while self.running: # 检查线程状态 threads_alive = all([ threading.current_thread().is_alive(), self.sensor.running, self.network.running ]) if not threads_alive: logger.critical("检测到线程异常,触发紧急停止!") self.emergency_stop() break time.sleep(5) def emergency_stop(self): with self.status_lock: self.network.status['measuring'] = False self.network.status['error'] = 'emergency_stop' self.gpio.cleanup() self.sensor.stop() self.running = False logger.info("系统已紧急停止") def shutdown(self): self.running = False self.gpio.cleanup() self.sensor.stop() self.network.running = False logger.info("系统正常关闭")