#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/2/18 10:03 # @Author : hjw # @File : gpio.py ''' import threading import logging import os import time # 配置日志系统 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('controller.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class GPIOManager: def __init__(self, config): self.pins = config['gpio']['pins'] self.pulse_per_rev = config['gpio']['pulse_per_rev'] self.threads = {} self.stop_flags = {} self.gpio_files = {} self.lock = threading.Lock() for pin in self.pins: self._init_gpio(pin) def _init_gpio(self, pin): try: export_path = '/sys/class/gpio/export' gpio_path = f'/sys/class/gpio/gpio{pin}' if not os.path.exists(gpio_path): with open(export_path, 'w') as f: f.write(str(pin)) time.sleep(0.1) direction_path = f'{gpio_path}/direction' with open(direction_path, 'w') as f: f.write('out') self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') logger.info(f"GPIO {pin} 初始化完成") except Exception as e: logger.error(f"GPIO {pin} 初始化失败: {str(e)}") raise def set_speed(self, pin_id, speed, error): with self.lock: if pin_id not in self.pins: raise ValueError(f"无效的GPIO引脚: {pin_id}") # 停止现有线程 if pin_id in self.threads: self.stop_flags[pin_id] = True self.threads[pin_id].join() if speed <= 0: self._write_value(pin_id, 0) return # 计算间隔时间(400脉冲/转) if error>10: interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 else: interval = 1.0 / speed # 每个周期包含高低电平 self.stop_flags[pin_id] = False self.threads[pin_id] = threading.Thread( target=self._pulse_loop, args=(pin_id, interval), daemon=True ) self.threads[pin_id].start() def _pulse_loop(self, pin_id, interval): # print(interval) while not self.stop_flags.get(pin_id, False): self._write_value(pin_id, 1) time.sleep(interval) # precise_delay_us(100) self._write_value(pin_id, 0) time.sleep(interval) # precise_delay_us(100) def _write_value(self, pin_id, value): try: self.gpio_files[pin_id].seek(0) self.gpio_files[pin_id].write(str(value)) self.gpio_files[pin_id].truncate() self.gpio_files[pin_id].flush() os.fsync(self.gpio_files[pin_id].fileno()) except IOError as e: logger.error(f"GPIO写入错误: {str(e)}") def cleanup(self): with self.lock: for pin in self.pins: if pin in self.threads: self.stop_flags[pin] = True self.threads[pin].join() self._write_value(pin, 0) for f in self.gpio_files.values(): f.close() logger.info("GPIO资源已释放")