2025-02-18 11:28:24 +08:00
|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
'''
|
|
|
|
|
|
# @Time : 2025/2/18 10:03
|
|
|
|
|
|
# @Author : hjw
|
|
|
|
|
|
# @File : gpio.py
|
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
|
|
import threading
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import os
|
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Configure the logging system: INFO level and above, written both to
# 'controller.log' (relative to the current working directory) and to the
# default stream handler.
logging.basicConfig(
    handlers=[
        logging.FileHandler('controller.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GPIOManager:
    """Drive sysfs GPIO output pins and generate software pulse trains.

    Each configured pin is exported through ``/sys/class/gpio``, switched to
    output direction, and its ``value`` file is kept open for the lifetime of
    the manager. ``set_speed`` starts (or replaces) one daemon thread per pin
    that toggles the pin high/low with a fixed half-period.

    ``set_speed`` and ``cleanup`` are serialized through ``self.lock``.

    Attributes:
        pins: Pin numbers taken from ``config['gpio']['pins']``.
        pulse_per_rev: Value of ``config['gpio']['pulse_per_rev']``; stored
            but not used by any method of this class.
        threads: Maps pin -> currently running pulse thread.
        stop_flags: Maps pin -> bool; ``True`` asks that pin's thread to stop.
        gpio_files: Maps pin -> open file object for the pin's value file.
        lock: Guards ``set_speed`` and ``cleanup``.
    """

    def __init__(self, config):
        """Export and open every pin listed in ``config['gpio']['pins']``.

        Args:
            config: Mapping with a ``'gpio'`` entry that provides ``'pins'``
                (iterable of pin numbers) and ``'pulse_per_rev'``.

        Raises:
            Exception: Whatever ``_init_gpio`` raised for the first pin that
                failed. Handles opened for earlier pins are closed first.
        """
        gpio_config = config['gpio']
        self.pins = gpio_config['pins']
        self.pulse_per_rev = gpio_config['pulse_per_rev']
        self.threads = {}
        self.stop_flags = {}
        # One Event per running worker so a stop request can interrupt the
        # worker's wait instead of waiting for a full sleep to elapse.
        self._stop_events = {}
        self.gpio_files = {}
        self.lock = threading.Lock()

        try:
            for pin in self.pins:
                self._init_gpio(pin)
        except Exception:
            # Do not leak the value-file handles of pins that were already
            # initialised before the failure.
            self._close_files()
            raise

    def _init_gpio(self, pin):
        """Export ``pin`` if needed, set it to output and open its value file.

        Raises:
            Exception: Any error from the sysfs file operations is logged and
                re-raised unchanged.
        """
        try:
            export_path = '/sys/class/gpio/export'
            gpio_path = f'/sys/class/gpio/gpio{pin}'

            if not os.path.exists(gpio_path):
                with open(export_path, 'w') as f:
                    f.write(str(pin))
                # Presumably gives the kernel time to create the gpioN
                # directory before it is accessed below.
                time.sleep(0.1)

            direction_path = f'{gpio_path}/direction'
            with open(direction_path, 'w') as f:
                f.write('out')

            # Kept open so every level change avoids an open()/close() pair.
            self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+')
            logger.info(f"GPIO {pin} 初始化完成")
        except Exception as e:
            logger.error(f"GPIO {pin} 初始化失败: {e}")
            raise

    def set_speed(self, pin_id, speed, error):
        """Replace the pulse train on ``pin_id`` with one for ``speed``.

        Any running pulse thread for the pin is stopped first. If ``speed`` is
        zero or negative the pin is driven low and no new thread is started.

        Args:
            pin_id: Pin number; must be one of ``self.pins``.
            speed: Requested speed (units are defined by the caller). The
                half-period is ``1 / (4 * speed)`` seconds when ``error > 10``
                and ``1 / speed`` seconds otherwise.
            error: Caller-supplied error value that selects the formula above.

        Raises:
            ValueError: If ``pin_id`` is not a configured pin.
        """
        with self.lock:
            if pin_id not in self.pins:
                raise ValueError(f"无效的GPIO引脚: {pin_id}")

            # Stop the existing thread, if any.
            self._stop_worker(pin_id)

            if speed <= 0:
                self._write_value(pin_id, 0)
                return

            # Time spent at each level; one full cycle is high + low.
            if error > 10:
                interval = 1.0 / (2 * speed * 2)
            else:
                interval = 1.0 / speed

            self.stop_flags[pin_id] = False
            self._stop_events[pin_id] = threading.Event()
            worker = threading.Thread(
                target=self._pulse_loop,
                args=(pin_id, interval),
                daemon=True,
            )
            self.threads[pin_id] = worker
            worker.start()

    def _stop_worker(self, pin_id):
        """Signal the pulse thread of ``pin_id`` (if any) and wait for it.

        The caller must hold ``self.lock``.
        """
        worker = self.threads.pop(pin_id, None)
        if worker is None:
            return
        self.stop_flags[pin_id] = True
        stop_event = self._stop_events.pop(pin_id, None)
        if stop_event is not None:
            stop_event.set()
        worker.join()

    def _pulse_loop(self, pin_id, interval):
        """Toggle ``pin_id`` high/low every ``interval`` seconds until stopped.

        The loop ends when the pin's stop event is set or
        ``self.stop_flags[pin_id]`` becomes true. The pin is always written
        low after having been written high.
        """
        stop_event = self._stop_events.setdefault(pin_id, threading.Event())
        while not (stop_event.is_set() or self.stop_flags.get(pin_id, False)):
            self._write_value(pin_id, 1)
            # wait() returns True as soon as a stop is requested, so a long
            # interval no longer delays set_speed()/cleanup().
            interrupted = stop_event.wait(interval)
            self._write_value(pin_id, 0)
            if interrupted or stop_event.wait(interval):
                break

    def _write_value(self, pin_id, value):
        """Write ``value`` to the open value file of ``pin_id``.

        Failures are logged and not raised, so a failing write does not
        terminate the pulse thread.
        """
        handle = self.gpio_files.get(pin_id)
        if handle is None or handle.closed:
            logger.error(f"GPIO写入错误: GPIO {pin_id} 未打开")
            return
        try:
            handle.seek(0)
            handle.write(str(value))
            handle.truncate()
            handle.flush()
            os.fsync(handle.fileno())
        except (OSError, ValueError) as e:
            # ValueError covers a handle closed concurrently by cleanup().
            logger.error(f"GPIO写入错误: {e}")

    def _close_files(self):
        """Close every open value file and forget the handles."""
        for pin, handle in self.gpio_files.items():
            try:
                handle.close()
            except OSError as e:
                logger.error(f"GPIO {pin} 关闭失败: {e}")
        self.gpio_files.clear()

    def cleanup(self):
        """Stop all pulse threads, drive every pin low and close the files.

        Calling it again is safe: pins whose file is already closed are
        skipped.
        """
        with self.lock:
            for pin in self.pins:
                self._stop_worker(pin)
                handle = self.gpio_files.get(pin)
                if handle is not None and not handle.closed:
                    self._write_value(pin, 0)
            self._close_files()
            logger.info("GPIO资源已释放")
|