Files
ElecScalesMeasur/Gpio.py

116 lines
3.5 KiB
Python
Raw Normal View History

2025-02-18 11:28:24 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 10:03
# @Author : hjw
# @File : gpio.py
'''
import threading
import logging
import os
import time
# Configure the logging system: records are sent both to the file
# 'controller.log' and to the default stream handler.
logging.basicConfig(
    handlers=[logging.FileHandler('controller.log'), logging.StreamHandler()],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)
class GPIOManager:
    """Drive sysfs GPIO output pins and generate software pulse trains.

    Every pin listed in ``config['gpio']['pins']`` is exported through the
    sysfs GPIO tree (if not already present), switched to ``out`` direction,
    and its ``value`` file is kept open until ``cleanup`` is called.
    ``set_speed`` runs one daemon thread per pin that alternately writes 1
    and 0 to that pin until it is asked to stop.

    Attributes:
        pins: The pin collection taken from ``config['gpio']['pins']``.
        pulse_per_rev: ``config['gpio']['pulse_per_rev']``; stored but not
            read by any method of this class.
        threads: Maps pin -> the pulse thread currently running for it.
        stop_flags: Maps pin -> bool; ``True`` asks that pin's thread to exit.
        gpio_files: Maps pin -> open ``value`` file object.
        lock: Serialises ``set_speed`` and ``cleanup``.
    """

    # Root of the sysfs GPIO tree. Kept as a class attribute so a subclass
    # can point the manager at a different location; the default yields
    # exactly the paths that used to be hard-coded.
    SYSFS_GPIO_ROOT = '/sys/class/gpio'

    def __init__(self, config):
        """Read the GPIO settings from *config* and initialise every pin.

        Args:
            config: Mapping with a ``'gpio'`` entry providing ``'pins'`` and
                ``'pulse_per_rev'``.

        Raises:
            KeyError: If a required configuration key is missing.
            Exception: Whatever ``_init_gpio`` raised for a pin. Handles
                opened for earlier pins are closed before re-raising.
        """
        self.pins = config['gpio']['pins']
        self.pulse_per_rev = config['gpio']['pulse_per_rev']
        self.threads = {}
        self.stop_flags = {}
        self.gpio_files = {}
        self.lock = threading.Lock()
        try:
            for pin in self.pins:
                self._init_gpio(pin)
        except Exception:
            # Do not leak the value files already opened for earlier pins.
            self._close_files()
            raise

    def _init_gpio(self, pin):
        """Export *pin* if needed, set it to output and open its value file.

        Any failure is logged and re-raised unchanged.
        """
        root = self.SYSFS_GPIO_ROOT
        gpio_path = f'{root}/gpio{pin}'
        try:
            if not os.path.exists(gpio_path):
                with open(f'{root}/export', 'w') as f:
                    f.write(str(pin))
                # Presumably gives sysfs time to create the gpioN directory
                # after the export request.
                time.sleep(0.1)
            with open(f'{gpio_path}/direction', 'w') as f:
                f.write('out')
            self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+')
            logger.info(f"GPIO {pin} 初始化完成")
        except Exception as e:
            logger.error(f"GPIO {pin} 初始化失败: {str(e)}")
            raise

    def set_speed(self, pin_id, speed, error):
        """Restart (or stop) the pulse train on *pin_id*.

        Any pulse thread already running for the pin is stopped first. With
        ``speed <= 0`` the pin is written low and no new thread is started.
        Otherwise a daemon thread toggles the pin with a half-period of
        ``1 / (4 * speed)`` seconds when ``error > 10`` and ``1 / speed``
        seconds otherwise.

        Args:
            pin_id: Pin to drive; must be contained in ``self.pins``.
            speed: Requested speed; non-positive values stop the output.
            error: Selects the interval formula (see above).
                NOTE(review): presumably the current control error reported
                by the caller - confirm.

        Raises:
            ValueError: If *pin_id* is not one of the configured pins.
        """
        with self.lock:
            if pin_id not in self.pins:
                raise ValueError(f"无效的GPIO引脚: {pin_id}")

            # Stop the existing thread, if any.
            self._stop_thread(pin_id)

            if speed <= 0:
                self._write_value(pin_id, 0)
                return

            # `interval` is the duration of each high and each low phase,
            # so one full pulse period lasts 2 * interval.
            if error > 10:
                interval = 1.0 / (2 * speed * 2)
            else:
                interval = 1.0 / speed

            self.stop_flags[pin_id] = False
            worker = threading.Thread(
                target=self._pulse_loop,
                args=(pin_id, interval),
                daemon=True,
            )
            worker.start()
            # Register only after a successful start so that a later join()
            # never sees a thread that was not started.
            self.threads[pin_id] = worker

    def _stop_thread(self, pin_id):
        """Signal the pulse thread of *pin_id* to exit, wait for it, forget it."""
        worker = self.threads.pop(pin_id, None)
        if worker is not None:
            self.stop_flags[pin_id] = True
            worker.join()

    def _pulse_loop(self, pin_id, interval):
        """Toggle *pin_id* high/low every *interval* seconds until stopped.

        The stop flag is checked once per full cycle, so the loop always
        finishes with the pin written low.
        """
        while not self.stop_flags.get(pin_id, False):
            self._write_value(pin_id, 1)
            time.sleep(interval)
            self._write_value(pin_id, 0)
            time.sleep(interval)

    def _write_value(self, pin_id, value):
        """Overwrite the value file of *pin_id* with ``str(value)``.

        I/O errors are logged and swallowed so a pulse thread keeps running.

        Raises:
            KeyError: If no value file is open for *pin_id*.
        """
        handle = self.gpio_files[pin_id]
        try:
            handle.seek(0)
            handle.write(str(value))
            handle.truncate()
            handle.flush()
            os.fsync(handle.fileno())
        except OSError as e:
            logger.error(f"GPIO写入错误: {str(e)}")

    def _close_files(self):
        """Close every open value file and forget the handles."""
        for handle in self.gpio_files.values():
            handle.close()
        self.gpio_files.clear()

    def cleanup(self):
        """Stop all pulse threads, drive the pins low and close their files.

        Safe to call more than once: pins whose value file has already been
        released are skipped instead of being written to.
        """
        with self.lock:
            for pin in self.pins:
                self._stop_thread(pin)
                if pin in self.gpio_files:
                    self._write_value(pin, 0)
            self._close_files()
            logger.info("GPIO资源已释放")