新增步进电机方向控制,振动引脚

This commit is contained in:
HJW
2025-02-19 16:18:41 +08:00
parent 2b96624a1a
commit b95980afe1
9 changed files with 189 additions and 14 deletions

View File

@ -22,7 +22,7 @@ class PIDAlgorithm(ControlAlgorithm):
self.kd = kd
self.integral = 0
self.last_error = 0
self.min_speed = 10 # 最小运行速度
self.min_speed = 1 # 最小运行速度
self.max_speed = 100 # 最大运行速度
def calculate_speed(self, current, target):

View File

@ -71,10 +71,13 @@ class Controller:
self.network.status['current_weight'] = self.current_weight
if self.current_weight >= self.network.status['target_weight']:
self.network.status['measuring'] = False
self.network.status['start_weighting'] = False
self.network.status['weighting_isok'] = True
if self.network.status['set_tare'] == True:
if self.sensor.tare():
self.network.status['set_tare_num_time'] +=1
self.network.status['set_tare_num_time'] += 1
self.network.status['set_tare'] = False
if self.network.status['get_weight'] == True:
@ -90,6 +93,10 @@ class Controller:
measuring = self.network.status['measuring']
target = self.network.status['target_weight']
algorithm = self.network.status['algorithm']
direction_control = self.network.status['direction_control']
if direction_control== True: # 步进电机方向控制
self.gpio._write_value(12, 1)
self.network.status['direction_control'] = False
if measuring:
self._select_algorithm(algorithm)
@ -116,6 +123,7 @@ class Controller:
self.gpio._write_value(9, 0)
self.network.status['set_vibrate'] = False
self.network.status['set_vibrate_time'] = 0
self.network.status['vibrate_isok'] = True
time.sleep(1)
def _select_algorithm(self, name):
if name == 'pid':
@ -149,6 +157,7 @@ class Controller:
self.gpio.cleanup()
self.sensor.stop()
self.running = False
logger.info("系统已紧急停止")
def shutdown(self):

View File

@ -71,10 +71,10 @@ class GPIOManager:
return
# 计算间隔时间400脉冲/转)
if error>20:
if error>10:
interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平
else:
interval = 1.0 / (speed) # 每个周期包含高低电平
interval = 1.0 / speed # 每个周期包含高低电平
self.stop_flags[pin_id] = False
self.threads[pin_id] = threading.Thread(
target=self._pulse_loop,

View File

@ -42,7 +42,11 @@ class NetworkHandler:
'set_tare_num_time': 0,
'get_weight': False,
'set_vibrate': False,
'set_vibrate_time': 0
'set_vibrate_time': 0,
'direction_control': False,
'start_weighting': False,
'weighting_isok': False,
'vibrate_isok': False
}
self.lock = threading.Lock()
self.running = True
@ -71,6 +75,23 @@ class NetworkHandler:
self._process_command(cmd)
time.sleep(0.5)
# 返回当前状态
if self.status['start_weighting'] == True :
while True:
if self.status['start_weighting'] == False:
with self.lock:
response = json.dumps(self.status)
conn.send(response.encode())
break
time.sleep(0.5)
elif self.status['set_vibrate'] == True:
while True:
if self.status['set_vibrate'] == False:
with self.lock:
response = json.dumps(self.status)
conn.send(response.encode())
break
time.sleep(0.5)
else:
with self.lock:
response = json.dumps(self.status)
conn.send(response.encode())
@ -87,10 +108,14 @@ class NetworkHandler:
self.status['target_weight'] = cmd['payload']['target_weight']
self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid')
self.status['direction_control'] = cmd['payload']['direction_control']
self.status['measuring'] = True
self.status['start_weighting'] = True
self.status['weighting_isok'] = False
print("收到指令set_target:", self.status['target_weight'])
elif cmd.get('command') == 'stop':
self.status['measuring'] = False
self.status['start_weighting'] = False
elif cmd.get('command') == 'set_zero':
self.status['set_tare'] = True
@ -100,4 +125,5 @@ class NetworkHandler:
elif cmd.get('command') == 'set_vibrate':
self.status['set_vibrate'] = True
self.status['set_vibrate_time'] = cmd['payload']['time']
self.status['set_vibrate_time'] = cmd['payload']['time']
self.status['vibrate_isok'] = False

View File

@ -1,4 +1,32 @@
密胺计量程序
## 密胺计量
[TOC]
![img.png](rk3506扩展IO引脚图.png)
一、接线
1、在图rk3506扩展IO引脚图.png
配置文件 config.yaml
注意配置 network
驱动器rk引脚9接脉冲+rk引脚19接脉冲-引脚9对应代码pin8
振动平台rk引脚7接振动平台12rk引脚11接振动平台10rk引脚11对应代码pin9
步进电机方向控制rk引脚17接驱动器方向+rk引脚17对应代码pin12驱动器方向-接rk引脚19。
二、使用方法:
1、开启服务python main.py
2、发送称量重量、获取重量、置零 python send_target.py
三、发送端配置 send_target.py
cmd_set_target = {
# 称量
"command": "set_target",
"payload": {
"target_weight": 200, # 称量的目标重量
"algorithm": "pid",
"direction_control": False # 控制步进电机的方向 True:给驱动器方向+ 输入高电平
}
}
数据返回:
{"measuring": false, "error": null, "current_weight": 239, "target_weight": 0, "": "pid", "set_tare": false,
"set_tare_num_time": 0, "get_weight": false, "set_vibrate":true, "set_vibrate_time": 30}

View File

@ -1,5 +1,5 @@
gpio:
pins: [7,8,9]
pins: [8,9,12]
pulse_per_rev: 400
serial:

88
export_pin.py Normal file
View File

@ -0,0 +1,88 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/2/18 17:49
# @Author : hjw
# @File : export_pin.py
'''
import os
import time
class GPIOChipController:
def __init__(self, chip_path="/sys/class/gpio/gpiochip0"): # 注意引脚分组
# 读取GPIO控制器信息
self.base = int(self._read_file(os.path.join(chip_path, "base")))
self.ngpio = int(self._read_file(os.path.join(chip_path, "ngpio")))
self.label = self._read_file(os.path.join(chip_path, "label"))
print(f"GPIO控制器信息: {self.label} (base={self.base}, ngpio={self.ngpio})")
def _read_file(self, path):
with open(path, 'r') as f:
return f.read().strip()
def _write_file(self, path, value):
with open(path, 'w') as f:
f.write(str(value))
def export_gpio(self, offset):
"""导出指定控制器的GPIO引脚"""
if offset < 0 or offset >= self.ngpio:
raise ValueError(f"偏移量必须在0-{self.ngpio - 1}范围内")
gpio_number = self.base + offset
export_path = "/sys/class/gpio/export"
if not os.path.exists(f"/sys/class/gpio/gpio{gpio_number}"):
try:
self._write_file(export_path, gpio_number)
time.sleep(0.1) # 等待系统创建目录
except IOError as e:
if "Device or resource busy" in str(e):
print(f"GPIO{gpio_number} 已经导出")
else:
raise
return gpio_number
def setup_gpio(self, gpio_number, direction='out'):
"""配置GPIO方向"""
direction_path = f"/sys/class/gpio/gpio{gpio_number}/direction"
self._write_file(direction_path, direction)
def set_gpio(self, gpio_number, value):
"""设置GPIO电平"""
value_path = f"/sys/class/gpio/gpio{gpio_number}/value"
self._write_file(value_path, 1 if value else 0)
def read_gpio(self, gpio_number):
"""读取GPIO电平"""
value_path = f"/sys/class/gpio/gpio{gpio_number}/value"
return int(self._read_file(value_path))
if __name__ == "__main__":
"""
rk3506导出引脚
该脚本导出扩展口引脚 SAI1_SDO017 对应 GPIO0_B4_d 8 + 4 = 12
"""
try:
# 初始化GPIO控制器
ctrl = GPIOChipController()
# 导出并配置GPIO (假设控制器的base=0偏移量B4即12)
gpio_num = ctrl.export_gpio(12)
ctrl.setup_gpio(gpio_num, 'out')
# 引脚高低电平测试
for _ in range(10):
ctrl.set_gpio(gpio_num, 1)
print(f"GPIO{gpio_num} 置高")
time.sleep(10)
ctrl.set_gpio(gpio_num, 0)
print(f"GPIO{gpio_num} 置低")
time.sleep(1)
except PermissionError:
print("需要root权限请使用sudo运行")
except Exception as e:
print(f"发生错误: {str(e)}")

BIN
rk3506扩展IO引脚图.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 178 KiB

View File

@ -7,13 +7,15 @@
'''
import socket
import json
import time
cmd_set_target = {
# 称量
"command": "set_target",
"payload": {
"target_weight": 200,
"algorithm": "pid"
"algorithm": "pid",
"direction_control": False
}
}
@ -43,8 +45,30 @@ cmd_set_vibrate = { # 振动控制
}
}
# 使用 with 语句确保 socket 在使用完毕后正确关闭
with socket.socket() as s:
s.connect(('127.0.0.1', 5000))
s.send(json.dumps(cmd_set_target).encode())
print(s.recv(1024).decode())
# 发送命令
try:
s.sendall(json.dumps(cmd_set_target).encode())
except socket.error as e:
print(f"发送数据时发生错误: {e}")
# break # 如果发送失败,退出循环
start_time = time.time()
# 接收数据
while True:
try:
data = s.recv(1024)
if not data:
print("没有收到数据,连接可能已关闭")
# break # 如果没有数据,退出循环
time.sleep(1)
print(data.decode())
end_time = time.time()
elapsed_time = end_time - start_time
print(f"代码执行时间:{elapsed_time:.6f}")
break
except socket.error as e:
print(f"接收数据时发生错误: {e}")
#break # 如果接收失败,退出循环