From 6833bb7973c23a8ea6b7d3aa85976123f8b2510e Mon Sep 17 00:00:00 2001 From: pengqi Date: Tue, 30 Sep 2025 14:57:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=80=E6=96=B0=E7=89=88=E6=9C=AC=E5=AF=86?= =?UTF-8?q?=E8=83=BA=E8=AE=A1=E9=87=8F=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ControlAlgor.py | 2 +- Controller.py | 49 ++-- Gpio.py | 1 + Network.py | 14 +- README.md | 48 +++- RS485.py | 8 +- controller.log | 5 + export_pin.py | 1 - network_controller.py | 198 +++++++++++++++ send_target.py | 62 +++-- send_target1.py | 99 ++++++++ send_vibration_control.py | 67 +++++ servo_motor_IO.py | 108 ++++++++ servo_motor_PWM.py | 138 ++++++++++ test.py | 462 ++------------------------------- test2.py | 522 -------------------------------------- 16 files changed, 767 insertions(+), 1017 deletions(-) create mode 100644 network_controller.py create mode 100644 send_target1.py create mode 100644 send_vibration_control.py create mode 100644 servo_motor_IO.py create mode 100644 servo_motor_PWM.py delete mode 100644 test2.py diff --git a/ControlAlgor.py b/ControlAlgor.py index 0d9b089..db440c9 100644 --- a/ControlAlgor.py +++ b/ControlAlgor.py @@ -42,7 +42,7 @@ class PIDAlgorithm(ControlAlgorithm): #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 return max(self.min_speed, min(self.max_speed, output)), error # - +# 不是真正的模糊控制算法,是一个简单的分段阈值控制 class FuzzyLogicAlgorithm(ControlAlgorithm): def calculate_speed(self, current, target): error = target - current diff --git a/Controller.py b/Controller.py index 195419c..1f630ac 100644 --- a/Controller.py +++ b/Controller.py @@ -51,11 +51,16 @@ class Controller: def start(self): self.running = True + + # 创建线程列表(包含5个并发任务) threads = [ + # 线程1:重量读取循环 threading.Thread(target=self._weight_reading_loop), threading.Thread(target=self._control_loop), threading.Thread(target=self.network.start_server), + # 线程4:振动控制 threading.Thread(target=self._control_vibrate), + # 线程5:看门狗线程(未使用thread()包装) self.watchdog ] for t in threads: @@ -65,26 +70,27 @@ class Controller: def _weight_reading_loop(self): while self.running: if self.sensor.read_weight(): - with self.sensor.lock: - self.current_weight = self.sensor.current_weight - with self.status_lock: + with self.sensor.lock: # 进入上下文,获取锁 获取传感器数据锁 + self.current_weight = self.sensor.current_weight # 在锁保护下操作共享数据 + # 退出上下文,自动释放锁 + with self.status_lock: # 获取传感器状态锁 self.network.status['current_weight'] = self.current_weight if self.current_weight >= self.network.status['target_weight']: self.network.status['measuring'] = False self.network.status['start_weighting'] = False - self.network.status['weighting_isok'] = True + self.network.status['weighting_isok'] = True # 标记称重完成 - - if self.network.status['set_tare'] == True: - if self.sensor.tare(): - self.network.status['set_tare_num_time'] += 1 + if self.network.status['set_tare'] == True: # 检查是否需要执行去皮操作 + if self.sensor.tare(): # 执行传感器去皮 + self.network.status['set_tare_num_time'] += 1 # 增加去皮计数器 self.network.status['set_tare'] = False + # 主动获取重量请求处理 if self.network.status['get_weight'] == True: with self.sensor.lock: self.network.status['current_weight'] = self.sensor.current_weight self.network.status['get_weight'] = False - time.sleep(0.05) # 20Hz + time.sleep(0.05) # 控制循环频率为20Hz(每秒20次循环) # time.sleep(0.1) # 20Hz def _control_loop(self): @@ -92,24 +98,24 @@ class Controller: with self.status_lock: measuring = self.network.status['measuring'] target = self.network.status['target_weight'] - algorithm = self.network.status['algorithm'] - direction_control = self.network.status['direction_control'] - if direction_control== True: # 步进电机方向控制 - self.gpio._write_value(12, 1) + algorithm = self.network.status['algorithm'] # 使用的控制算法类型 + direction_control = self.network.status['direction_control'] # 电机方向控制标志 + if direction_control == True: # 步进电机方向控制 + self.gpio._write_value(12, 1) # 电机方向 self.network.status['direction_control'] = False if measuring: - self._select_algorithm(algorithm) + self._select_algorithm(algorithm) # 选择控制算法 speed, error = self.algorithm.calculate_speed(self.current_weight, target) - rpm = speed # 假设速度单位直接对应RPM - pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 - self.gpio.set_speed(8, pulse_rate, error) + rpm = speed # 假设速度单位直接对应RPM(转/分钟) + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 # 计算脉冲频率 + self.gpio.set_speed(8, pulse_rate, error) # 电机驱动 else: self.gpio.set_speed(8, 0, 0) while not self.running and not self.network.status['measuring']: time.sleep(0.1) # 短暂休眠,避免CPU占用过高 - time.sleep(0.1) + time.sleep(0.1) # 以10hz的频率运行 def _control_vibrate(self): # 振动控制 while self.running: @@ -124,7 +130,8 @@ class Controller: self.network.status['set_vibrate'] = False self.network.status['set_vibrate_time'] = 0 self.network.status['vibrate_isok'] = True - time.sleep(1) + time.sleep(1) # 每秒检查一次振动请求 + def _select_algorithm(self, name): if name == 'pid': params = self.config['algorithm']['params']['pid'] @@ -145,8 +152,8 @@ class Controller: if not threads_alive: logger.critical("检测到线程异常,触发紧急停止!") - self.emergency_stop() - break + self.emergency_stop() # 执行紧急停止操作 + break # 退出看门狗循环 time.sleep(5) diff --git a/Gpio.py b/Gpio.py index ab1867d..c10c78c 100644 --- a/Gpio.py +++ b/Gpio.py @@ -103,6 +103,7 @@ class GPIOManager: except IOError as e: logger.error(f"GPIO写入错误: {str(e)}") + def cleanup(self): with self.lock: for pin in self.pins: diff --git a/Network.py b/Network.py index 2a942c8..4d62314 100644 --- a/Network.py +++ b/Network.py @@ -56,11 +56,11 @@ class NetworkHandler: self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: self.sock.bind((self.host, self.port)) - self.sock.listen(5) + self.sock.listen(5) # 最多接受多少个等待连接的客户端 logger.info(f"网络服务启动于 {self.host}:{self.port}") while self.running: - conn, addr = self.sock.accept() + conn, addr = self.sock.accept() # coon--数据sock threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() except Exception as e: logger.error(f"网络服务异常: {str(e)}") @@ -69,7 +69,7 @@ class NetworkHandler: def _handle_client(self, conn, addr): try: - data = conn.recv(1024) + data = conn.recv(1024) # 1024--指定从接收缓冲区里最多读取多少字节 if data: cmd = json.loads(data.decode()) self._process_command(cmd) @@ -119,11 +119,17 @@ class NetworkHandler: elif cmd.get('command') == 'set_zero': self.status['set_tare'] = True + print("收到指令set_tare:", self.status['set_tare']) elif cmd.get('command') == 'get_weight': self.status['get_weight'] = True + print("收到指令get_weight:", self.status['get_weight']) elif cmd.get('command') == 'set_vibrate': self.status['set_vibrate'] = True self.status['set_vibrate_time'] = cmd['payload']['time'] - self.status['vibrate_isok'] = False \ No newline at end of file + + # 打印状态 + print("振动状态 set_vibrate:", self.status['set_vibrate']) + print("振动时间 set_vibrate_time:", self.status['set_vibrate_time']) + self.status['vibrate_isok'] = False diff --git a/README.md b/README.md index b13543c..fef0597 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,16 @@ 驱动器:rk引脚9接脉冲+,rk引脚19接脉冲-,引脚9对应代码(pin8)。 -振动平台:rk引脚7接振动平台12,rk引脚11接振动平台10,rk引脚11对应代码(pin9)。 +振动平台:rk引脚7接振动平台12(GND),rk引脚11接振动平台10,rk引脚11对应代码(pin9)。 + +舵机:rk引脚 步进电机方向控制:rk引脚17接驱动器方向+,(rk引脚17对应代码pin12),驱动器方向-接rk引脚19。 二、使用方法: 1、开启服务:python main.py -2、发送称量重量、获取重量、置零 python send_target.py +2、发送称量重量、获取重量、置零 python send_target1.py 或 测试控制振动平台 python send_vibration_control.py 三、发送端配置 send_target.py @@ -32,4 +34,44 @@ cmd_set_target = { "set_tare_num_time": 0, "get_weight": false, "set_vibrate":true, "set_vibrate_time": 30} 四、秤资料 -链接 https://3121226.ma3you.cn/static/tourguide/content_show_v2/index.html?v=20250115&article_id=A022Emr#/ \ No newline at end of file +链接 https://3121226.ma3you.cn/static/tourguide/content_show_v2/index.html?v=20250115&article_id=A022Emr#/ + +五、配置固定IP,可修改/oem/usr/user_init.sh文件 +```python +#!/bin/sh +# 自动配置固定 IP 和 DNS + +# 配置网络 +ifconfig eth0 192.168.1.100 netmask 255.255.255.0 up +route add default gw 192.168.1.1 + +# 配置 DNS +echo "nameserver 8.8.8.8" > /userdata/resolv.conf + +# 动态覆盖只读文件 /etc/resolv.conf +mount --bind /userdata/resolv.conf /etc/resolv.conf +``` +六、设置开机自启动某程序,可修改/oem/usr/user_init.sh文件(官方留给用户添加程序自启动的地方) +```python +python /userdata/helloword.py > /userdata/helloword.log 2>&1 & +``` +执行结果保存在/userdata/helloword.log + +## 可能会出现的问题 +1、如果RK3506并没有出现运输的过程(存在撞件的问题),但是却出现如网口、串口等识别不了的情况: + +可能考虑重新刷一遍出厂固件(除了硬件坏了之外,其他的没有什么是不能通过刷机解决的) + +## 测试3506上关于PWM设备树的引脚 +```python +# 1. 查看系统中已注册的PWM控制器 +ls /sys/class/pwm/ # 输出如pwmchip0、pwmchip1(对应PWM0、PWM1控制器) + +# 2. 查看PWM0控制器的设备树配置(需内核支持调试) +cat /proc/device-tree/pwm@ff200000/pwm0_ch0/rockchip,pin # 查看PWM0_CH0绑定的引脚 +# 输出可能类似“gpio0 0”,对应GPIO0_A0(参考文档2.6节引脚编号规则) + +# 3. 查看引脚的功能复用状态(确认是否为PWM功能) +cat /sys/kernel/debug/pinctrl/pinctrl-rockchip:gpio0/pins | grep -A5 "gpio0-0" +# 输出中若有“function: pwm0_ch0”,说明GPIO0_A0已复用为PWM0_CH0 +``` \ No newline at end of file diff --git a/RS485.py b/RS485.py index 2f9db51..92877a6 100644 --- a/RS485.py +++ b/RS485.py @@ -51,15 +51,15 @@ class RS485Reader: raise def read_weight(self): - cmd = bytes.fromhex('010300000002c40b') + cmd = bytes.fromhex('010300000002c40b') # 从地址 01 的设备读取 2 个保持寄存器,从地址 0 开始 try: self.ser.write(cmd) - response = self.ser.read(9) + response = self.ser.read(9) # 然后读取设备返回的 9 字节响应 if len(response) == 9 and response[:3] == bytes.fromhex("010304"): weight = int.from_bytes(response[3:5], 'big') with self.lock: self.buffer.append(weight) - self.current_weight = sum(self.buffer) // len(self.buffer) + self.current_weight = sum(self.buffer) // len(self.buffer) # 平滑滤波 return True return False except Exception as e: @@ -68,7 +68,7 @@ class RS485Reader: return False def tare(self): # 去皮方法 - cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 + cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 发送命令给设备地址 1,将寄存器地址 0x0064 设置为 0x0001,执行“去皮”指令。 try: if self.read_weight(): # 确保有最新数据 self.ser.write(cmd) diff --git a/controller.log b/controller.log index e69de29..d31b3fd 100644 --- a/controller.log +++ b/controller.log @@ -0,0 +1,5 @@ +2025-04-03 15:21:08,435 - Gpio - ERROR - GPIO 8 ʼʧ: [Errno 2] No such file or directory: '/sys/class/gpio/export' +2025-04-07 10:40:31,493 - Gpio - ERROR - GPIO 8 ʼʧ: [Errno 2] No such file or directory: '/sys/class/gpio/export' +2025-05-13 14:04:13,116 - Gpio - ERROR - GPIO 8 ʼʧ: [Errno 2] No such file or directory: '/sys/class/gpio/export' +2025-07-01 11:29:16,062 - Gpio - ERROR - GPIO 8 ʼʧ: [Errno 2] No such file or directory: '/sys/class/gpio/export' +2025-07-01 11:32:40,605 - Gpio - ERROR - GPIO 8 ʼʧ: [Errno 2] No such file or directory: '/sys/class/gpio/export' diff --git a/export_pin.py b/export_pin.py index 73d2fea..1928e28 100644 --- a/export_pin.py +++ b/export_pin.py @@ -9,7 +9,6 @@ import os import time - class GPIOChipController: def __init__(self, chip_path="/sys/class/gpio/gpiochip0"): # 注意引脚分组 # 读取GPIO控制器信息 diff --git a/network_controller.py b/network_controller.py new file mode 100644 index 0000000..ee31007 --- /dev/null +++ b/network_controller.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/7/1 11:07 +# @Author : reenrr +# @File : network_controller.py +''' +import socket +import binascii +import time + +# 网络继电器的 IP 和端口 +HOST = '192.168.0.18' +PORT = 50000 + +# 控件命名映射 +CONVEYOR1 = 'conveyor1' +PUSHER = 'pusher' +CONVEYOR2 = 'conveyor2' +CLAMP = 'clamp' # 机械臂抓夹,初始状态是开 + +# 传感器命名映射 +SENSOR1 = 'sensor1' +SENSOR2 = 'sensor2' + +# 按钮控制报文 +valve_commands = { + CONVEYOR1: { + 'open': '00000000000601050000FF00', + 'close': '000000000006010500000000', + }, + PUSHER: { + 'open': '00000000000601050001FF00', + 'close': '000000000006010500010000', + }, + CONVEYOR2: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500020000', + }, + CLAMP: { + 'open': '00000000000601050003FF00', + 'close': '000000000006010500030000', + } +} + +sensors_commands = { + SENSOR1: { + 'signal': '00000000000401020101', + 'nosignal': '00000000000401020100', + }, + SENSOR2: { + 'signal': '00000000000401020102', + 'nosignal': '00000000000401020100', + }, +} + +# 读取状态命令 +read_status_command = { + 'devices':'000000000006010100000008', + 'sensors':'000000000006010200000008' +} + +# 控件对应 DO 位(从低到高) +device_bit_map = { + CONVEYOR1: 0, + PUSHER: 1, + CONVEYOR2: 2, + CLAMP: 3, +} + +device_name_map = { + CONVEYOR1: "传送带1", + PUSHER: "推板", + CONVEYOR2: "传送带2", + CLAMP: "机械臂抓夹" +} + +# 传感器对应位(从低到高) +sensor_bit_map = { + SENSOR1: 0, + SENSOR2: 1, + # 根据你继电器的配置,继续添加更多传感器 +} + +sensor_name_map = { + SENSOR1: '位置传感器1', + SENSOR2: '位置传感器2', +} + + +# 将十六进制字符串转换为字节数据并发送 +def send_command(command): + byte_data = binascii.unhexlify(command) + + # 创建套接字并连接到继电器 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + sock.connect((HOST, PORT)) + sock.send(byte_data) + # 接收响应 + response = sock.recv(1024) + # print(f"收到响应: {binascii.hexlify(response)}") + # 校验响应 + return response + except Exception as e: + print(f"通信错误: {e}") + return False + +def get_all_device_status(command_type='devices'): + command = read_status_command.get(command_type) + if not command: + print(f"未知的读取类型: {command_type}") + return {} + + response = send_command(command) + status_dict = {} + + if response and len(response) >= 10: + status_byte = response[9] # 状态在第10字节 + status_bin = f"{status_byte:08b}"[::-1] + + if command_type == 'devices': + bit_map = device_bit_map + name_map = device_name_map + elif command_type == 'sensors': + bit_map = sensor_bit_map + name_map = sensor_name_map + else: + print("不支持的映射类型") + return{} + + for key, bit_index in bit_map.items(): + state = status_bin[bit_index] == '1' + status_dict[key] = state + # readable = "开启" if state else "关闭" + # print(f"{device.capitalize()} 状态: {readable}") + else: + print("读取状态失败或响应无效") + + return status_dict + +# 获取单个控件状态(True: 开启, False: 关闭, None: 无法读取) +def get_device_status(device_name, command_type='devices'): + status = get_all_device_status(command_type) + return status.get(device_name, None) + +# 根据状态决定是否执行开操作 +def open(conveyor1=False, pusher=False, conveyor2=False, clamp=False): + status = get_all_device_status() + + if conveyor1 and not status.get(CONVEYOR1, False): + print("打开传送带1") + send_command(valve_commands[CONVEYOR1]['open']) + time.sleep(1) + + if pusher and not status.get(PUSHER, False): + print("打开推板") + send_command(valve_commands[PUSHER]['open']) + time.sleep(0.05) + + if conveyor2 and not status.get(CONVEYOR2, False): + print("打开传送带2") + send_command(valve_commands[CONVEYOR2]['open']) + time.sleep(1) + + if clamp and not status.get(CLAMP, False): + print("启动机械臂") + send_command(valve_commands[CLAMP]['open']) + time.sleep(0.5) + +# 根据状态决定是否执行关操作 +def close(conveyor1=False, pusher=False, conveyor2=False, clamp=False): + status = get_all_device_status() + + if conveyor1 and status.get(CONVEYOR1, True): + print("关闭传送带1") + send_command(valve_commands[CONVEYOR1]['close']) + time.sleep(1) + + if pusher and status.get(PUSHER, True): + print("关闭推板") + send_command(valve_commands[PUSHER]['close']) + time.sleep(0.05) + + if conveyor2 and status.get(CONVEYOR2, True): + print("关闭传送带2") + send_command(valve_commands[CONVEYOR2]['close']) + time.sleep(1) + + if clamp and status.get(CLAMP, True): + print("停止机械臂") + send_command(valve_commands[CLAMP]['close']) + time.sleep(0.5) + + + + + diff --git a/send_target.py b/send_target.py index 638d425..0d0495c 100644 --- a/send_target.py +++ b/send_target.py @@ -9,11 +9,14 @@ import socket import json import time +target_weight = 200 +set_vibrate_time = 10 + cmd_set_target = { # 称量 "command": "set_target", "payload": { - "target_weight": 200, + "target_weight": target_weight, "algorithm": "pid", "direction_control": False } @@ -30,45 +33,54 @@ cmd_get_weight = { cmd_set_zero = { # 去皮 - "command": "set_zero" - # "payload": { - # "target_weight": 200, - # "algorithm": "pid" - # } + "command": "set_zero", + "payload": { + # "target_weight": 200, + # "algorithm": "pid" + } } cmd_set_vibrate = { # 振动控制 "command": "set_vibrate", "payload": { - "time": 0 # 单位S + "time": set_vibrate_time # 单位S } } + # 使用 with 语句确保 socket 在使用完毕后正确关闭 with socket.socket() as s: s.connect(('127.0.0.1', 5000)) - # 发送命令 + # 发送称重命令 try: s.sendall(json.dumps(cmd_set_target).encode()) + print("已发送目标值设定指令") + + start_time = time.time() + # 接收数据 + while True: + data = s.recv(1024) + if not data: + print("连接已关闭") + break + + decoded = data.decode() + print(f"收到数据: {decoded}") + + try: + response = json.loads(decoded) + current_weight = response.get("current_weight") + if current_weight is not None: + print(f"当前重量:{current_weight}") + if current_weight >= target_weight: + print("目标已达到,发送振动控制指令") + s.sendall(json.dumps(cmd_set_vibrate).encode()) + break + except socket.error as e: + print(f"接收数据时发生错误: {e}") + # break # 如果接收失败,退出循环 except socket.error as e: print(f"发送数据时发生错误: {e}") # break # 如果发送失败,退出循环 - start_time = time.time() - # 接收数据 - while True: - try: - data = s.recv(1024) - if not data: - print("没有收到数据,连接可能已关闭") - # break # 如果没有数据,退出循环 - time.sleep(1) - print(data.decode()) - end_time = time.time() - elapsed_time = end_time - start_time - print(f"代码执行时间:{elapsed_time:.6f} 秒") - break - except socket.error as e: - print(f"接收数据时发生错误: {e}") - #break # 如果接收失败,退出循环 \ No newline at end of file diff --git a/send_target1.py b/send_target1.py new file mode 100644 index 0000000..1646a5d --- /dev/null +++ b/send_target1.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/6/18 10:27 +# @Author : reenrr +# @File : send_target-test.py +''' +import socket +import json +import time + +target_weight = 200 +set_vibrate_time = 10 + +cmd_set_target = { + # 称量 + "command": "set_target", + "payload": { + "target_weight": target_weight, + "algorithm": "pid", + "direction_control": False + } +} + +cmd_get_weight = { + # 获取重量 + "command": "get_weight" +} + +cmd_set_zero = { + # 去皮 + "command": "set_zero", + "payload": { + } +} + +cmd_set_vibrate = { # 振动控制 + "command": "set_vibrate", + "payload": { + "time": set_vibrate_time # 单位S + } +} + +def start_weight(host='127.0.0.1', port=5000): + with socket.socket() as s: + try: + s.connect((host, port)) + s.sendall(json.dumps(cmd_set_zero).encode()) + print("已发送去皮指令") + time.sleep(1) # 等待去皮完成 + + s.sendall(json.dumps(cmd_get_weight).encode()) + print("请求当前重量...") + + data = s.recv(1024) + response = json.loads(data.decode()) + current_weight = response.get("current_weight") + + print(f"当前重量:{current_weight}") + except Exception as e: + print(f"操作过程中发生异常:{e}") + + with socket.socket() as s: + try: + if isinstance(current_weight, (int, float)) and abs(current_weight) < 0.01: + print("重量为0,开始称量流程") + + s.sendall(json.dumps(cmd_set_target).encode()) + print("已发送目标值设定指令") + + while True: + try: + data = s.recv(1024) + if not data: + print("连接已关闭") + break + + response = json.loads(data.decode()) + current_weight = response.get("current_weight") + + if current_weight is not None: + print(f"当前重量:{current_weight}") + if current_weight >= target_weight: + print("目标已达到") + break + except Exception as e: + print(f"接收或解析重量数据失败:{e}") + break + else: + print("当前重量不为0,中止称量流程,请检查设备") + except Exception as e: + print(f"操作过程中发生异常:{e}") + +if __name__ == "__main__": + if start_weight(host='127.0.0.1', port=5000): + print("称重完毕") + else: + print("称重发生错误") + exit(-1) \ No newline at end of file diff --git a/send_vibration_control.py b/send_vibration_control.py new file mode 100644 index 0000000..d4c24a8 --- /dev/null +++ b/send_vibration_control.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/6/18 10:27 +# @Author : reenrr +# @File : send_target-test.py +''' +import socket +import json +import time + +target_weight = 200 +set_vibrate_time = 10 + +cmd_set_target = { + # 称量 + "command": "set_target", + "payload": { + "target_weight": target_weight, + "algorithm": "pid", + "direction_control": False + } +} + +cmd_get_weight = { + # 获取重量 + "command": "get_weight" + # "payload": { + # "target_weight": 200, + # "algorithm": "pid" + # } +} + +cmd_set_zero = { + # 去皮 + "command": "set_zero", + "payload": { + # "target_weight": 200, + # "algorithm": "pid" + } +} + +cmd_set_vibrate = { # 振动控制 + "command": "set_vibrate", + "payload": { + "time": set_vibrate_time # 单位S + } +} + +# 使用 with 语句确保 socket 在使用完毕后正确关闭 +with socket.socket() as s: + try: + s.connect(('127.0.0.1', 5000)) + s.sendall(json.dumps(cmd_set_vibrate).encode()) + print("已发送振动控制指令") + # level = gpio.read() + # print(f"GPIO 9 当前电平值为: {level}") + + # 接收响应(可选) + data = s.recv(1024) + if data: + print("收到响应:", data.decode()) + else: + print("无响应,连接可能已关闭") + + except socket.error as e: + print(f"通信错误: {e}") diff --git a/servo_motor_IO.py b/servo_motor_IO.py new file mode 100644 index 0000000..1a8e6a6 --- /dev/null +++ b/servo_motor_IO.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/23 18:21 +# @Author : reenrr +# @File : servo_motor.py +# 功能描述:RK3506通过GPIO模拟PWM,控制DS3235舵机(模拟持续旋转) +# 参考文档:DS3235 Coreless datasheet.pdf、接线图.docx +''' +import time +import signal +from periphery import GPIO + +# -------------------------- 1. 硬件配置(必须根据实际接线调整) -------------------------- +# 1.1 舵机PWM信号输出引脚(RK3506的GPIO编号,需与硬件接线对应) +# 例:若舵机信号线接GPIO3_B2_d,对应Linux GPIO编号为10(参考你提供的vibration_control-test.py) +SERVO_GPIO_NUM = 10 # 替换为你的实际舵机信号引脚GPIO编号 + +# 1.2 DS3235舵机核心参数(来自DS3235规格书) +PWM_PERIOD_MS = 20 # PWM周期:20ms(规格书4-6条“控制频率50Hz”,1/50Hz=20ms) +MIN_PULSE_US = 500 # 最小脉宽:500μsec(对应0°,规格书4-2条) +MAX_PULSE_US = 2500 # 最大脉宽:2500μsec(对应180°,规格书4-2条) +ROTATE_DELAY = 0.1 # 角度切换间隔(越小转动越“连续”,建议0.1-0.5秒) + +# 1.3 舵机角度模式选择(根据需求切换,参考接线图.docx) +MODE = "180" # 180°模式:500μsec=0°,2500μsec=180° + +# -------------------------- 2. 全局变量初始化 -------------------------- +gpio = None # 舵机GPIO引脚对象 +is_running = True # 程序运行标志 + + +# -------------------------- 3. PWM信号生成函数 -------------------------- +def set_servo_pulse(pulse_us): + ''' + 生成指定脉宽的PWM信号(通过GPIO高低电平模拟) + 参数:pulse_us - 高电平脉宽(单位:微秒) + ''' + global gpio + # 脉宽范围限制(避免舵机过载,严格遵循规格书) + pulse_us = max(MIN_PULSE_US, min(MAX_PULSE_US, pulse_us)) + + # 1. 输出高电平(持续pulse_us微秒) + gpio.write(True) + time.sleep(pulse_us / 1000000) # 转换为秒单位 + + # 2. 输出低电平(补全PWM周期,20ms - 高电平时间) + low_time_ms = PWM_PERIOD_MS - (pulse_us / 1000) # 低电平时间(毫秒) + gpio.write(False) + time.sleep(low_time_ms / 1000) # 转换为秒单位 + + +# -------------------------- 4. 模拟持续旋转函数 -------------------------- +def continuous_rotate_sim(): + ''' + 模拟持续旋转:交替输出“最小脉宽(0°)”和“最大脉宽(180°)” + ''' + global is_running + print(f"DS3235舵机已启动({MODE}°模式,按Ctrl+C停止)...") + + while is_running: + # 第一步:输出最小脉宽(对应0°) + set_servo_pulse(MIN_PULSE_US) + print(f"当前脉宽:{MIN_PULSE_US}μsec(0°)") + time.sleep(ROTATE_DELAY) + + # 第二步:输出最大脉宽(对应180°/270°) + set_servo_pulse(MAX_PULSE_US) + if MODE == "180": + print(f"当前脉宽:{MAX_PULSE_US}μsec(180°)") + else: + print(f"当前脉宽:{MAX_PULSE_US}μsec(270°)") + time.sleep(ROTATE_DELAY) + + +# -------------------------- 5. 信号处理(Ctrl+C优雅退出) -------------------------- +def signal_handler(signum, frame): + '''捕获Ctrl+C信号,设置退出标志''' + global is_running + print("\n收到停止指令,正在清理资源...") + is_running = False + + +# -------------------------- 6. 主函数 -------------------------- +if __name__ == "__main__": + # 1. 注册信号处理(确保Ctrl+C能正常退出) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # 2. 初始化舵机GPIO引脚(输出模式,参考你的vibration_control-test.py风格) + gpio = GPIO(SERVO_GPIO_NUM, "out") + print(f"舵机GPIO引脚(编号:{SERVO_GPIO_NUM})初始化完成") + + # 3. 启动模拟持续旋转 + continuous_rotate_sim() + + except Exception as e: + # 异常处理(如GPIO初始化失败) + print(f"程序异常:{str(e)}") + + finally: + # 4. 资源清理(必做!避免GPIO引脚异常) + if gpio is not None: + gpio.write(False) # 程序结束前拉低GPIO,避免舵机持续受力 + gpio.close() + print("舵机GPIO引脚已关闭") + print("程序退出完成") \ No newline at end of file diff --git a/servo_motor_PWM.py b/servo_motor_PWM.py new file mode 100644 index 0000000..5cfaa97 --- /dev/null +++ b/servo_motor_PWM.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/24 16:55 +# @Author : reenrr +# @File : servo_motor_PWM.py +''' +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/24 10:30 +# @Author : reenrr +# @File : servo_motor_pwm.py +# 功能描述:RK3506通过硬件PWM控制DS3235舵机(模拟持续旋转) +# 参考文档:DS3235 Coreless datasheet.pdf、瑞芯微RK3506 PWM控制器文档 +''' +import time +import signal +import os + +# -------------------------- 1. 硬件配置(必须根据实际接线调整) -------------------------- +# 1.1 舵机PWM通道(RK3506的PWM通道,需与硬件接线对应) +# 瑞芯微RK3506通常有多个PWM通道,如pwm0, pwm1等,根据实际使用的通道修改 +PWM_CHIP = 0 # PWM芯片编号 +PWM_CHANNEL = 0 # PWM通道编号 +PWM_PATH = f"/sys/class/pwm/pwmchip{PWM_CHIP}/pwm{PWM_CHANNEL}" # 已经通过设备树绑定到了相关的引脚上了 + +# 1.2 DS3235舵机核心参数(来自DS3235规格书) +PWM_PERIOD_MS = 20 # PWM周期:20ms(规格书4-6条“控制频率50Hz”,1/50Hz=20ms) +MIN_PULSE_US = 500 # 最小脉宽:500μsec(对应0°,规格书4-2条) +MAX_PULSE_US = 2500 # 最大脉宽:2500μsec(对应180°,规格书4-2条) +ROTATE_DELAY = 0.1 # 角度切换间隔(越小转动越“连续”,建议0.1-0.5秒) + +# 1.3 舵机角度模式选择 +MODE = "180" # 180°模式 + +# -------------------------- 2. 全局变量初始化 -------------------------- +is_running = True # 程序运行标志 + + +# -------------------------- 3. 硬件PWM控制函数 -------------------------- +def export_pwm(): + """导出PWM通道(使能PWM功能)""" + if not os.path.exists(PWM_PATH): + with open(f"/sys/class/pwm/pwmchip{PWM_CHIP}/export", "w") as f: + f.write(str(PWM_CHANNEL)) + # 等待文件系统创建完成 + time.sleep(0.1) + + +def unexport_pwm(): + """取消导出PWM通道(禁用PWM功能)""" + if os.path.exists(PWM_PATH): + with open(f"/sys/class/pwm/pwmchip{PWM_CHIP}/unexport", "w") as f: + f.write(str(PWM_CHANNEL)) + + +def set_pwm_period(period_ns): + """设置PWM周期(单位:纳秒)""" + with open(f"{PWM_PATH}/period", "w") as f: + f.write(str(period_ns)) + + +def set_pwm_duty_cycle(duty_ns): + """设置PWM占空比(单位:纳秒)""" + # 确保占空比在有效范围内 + duty_ns = max(MIN_PULSE_US, min(MAX_PULSE_US, duty_ns)) + with open(f"{PWM_PATH}/duty_cycle", "w") as f: + f.write(str(duty_ns)) + + +def enable_pwm(enable=True): + """启用或禁用PWM输出""" + with open(f"{PWM_PATH}/enable", "w") as f: + f.write("1" if enable else "0") + + +def set_servo_angle(pulse_ns): + """通过设置脉宽控制舵机角度""" + set_pwm_duty_cycle(pulse_ns) + + +# -------------------------- 4. 模拟持续旋转函数 -------------------------- +def continuous_rotate_sim(): + """模拟持续旋转:在最小和最大脉宽之间交替切换""" + global is_running + print(f"DS3235舵机已启动({MODE}°模式,硬件PWM控制,按Ctrl+C停止)...") + + # 初始化PWM + export_pwm() + set_pwm_period(PWM_PERIOD_MS) + enable_pwm(True) + + try: + while is_running: + # 输出最小脉宽(对应0°) + set_servo_angle(MIN_PULSE_US) + print(f"当前脉宽:{MIN_PULSE_US}ns(0°)") + time.sleep(ROTATE_DELAY) + + # 输出最大脉宽(对应180°) + set_servo_angle(MAX_PULSE_US) + if MODE == "180": + print(f"当前脉宽:{MAX_PULSE_US}ns(180°)") + else: + print(f"当前脉宽:{MAX_PULSE_US}ns(270°)") + time.sleep(ROTATE_DELAY) + finally: + # 关闭PWM输出 + enable_pwm(False) + + +# -------------------------- 5. 信号处理(Ctrl+C优雅退出) -------------------------- +def signal_handler(signum, frame): + """捕获Ctrl+C信号,设置退出标志""" + global is_running + print("\n收到停止指令,正在清理资源...") + is_running = False + + +# -------------------------- 6. 主函数 -------------------------- +if __name__ == "__main__": + # 注册信号处理 + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # 启动模拟持续旋转 + continuous_rotate_sim() + + except Exception as e: + print(f"程序异常:{str(e)}") + + finally: + # 资源清理 + unexport_pwm() + print("舵机PWM通道已关闭") + print("程序退出完成") diff --git a/test.py b/test.py index 9cb1f8b..0ac5a0d 100644 --- a/test.py +++ b/test.py @@ -1,448 +1,38 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- ''' -# @Time : 2025/2/14 14:43 -# @Author : hjw +# @Time : 2025/9/24 14:07 +# @Author : reenrr # @File : test.py ''' -import os +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/6/17 09:26 +# @Author : reenrr +# @File : vibration_control-test.py +''' import time -import threading -import serial -import socket -import logging -import json -from queue import Queue -from abc import ABC, abstractmethod -from collections import deque +from periphery import GPIO -# 配置日志系统 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler('controller.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) +# GPIO0_B2_d 对应的 Linux GPIO 编号为 10 +gpio = GPIO(10, "out") # 设置为输出模式 +try: + print("开始电平切换 (按 Ctrl+C 停止)...") + while True: + gpio.write(True) # 输出高电平 + print("高电平") + time.sleep(10) # 保持 1 秒 -def parse_config(file_path): - config = {} - current_level = [config] # 使用栈来跟踪当前层级的字典 - indent_stack = [] # 记录缩进层级 + gpio.write(False) # 输出低电平 + print("低电平") + time.sleep(5) - with open(file_path, 'r', encoding='utf-8') as file: - for line in file: - line = line.rstrip() # 去掉行尾的换行符和空格 - if not line or line.startswith('#'): # 跳过空行和注释 - continue +except KeyboardInterrupt: + print("已手动停止") - # 检测缩进层级 - indent = len(line) - len(line.lstrip()) - while indent_stack and indent <= indent_stack[-1]: - current_level.pop() - indent_stack.pop() +finally: + gpio.write(False) # 程序结束前确保拉低电平 + gpio.close() - key, value = line.split(':', 1) # 分割键和值 - key = key.strip() - value = value.strip() - - # 处理值 - if value.startswith('[') and value.endswith(']'): # 列表 - value = [int(x.strip()) for x in value[1:-1].split(',')] - elif value.isdigit(): # 数字 - value = int(value) - elif value.replace('.', '', 1).isdigit(): # 浮点数 - value = float(value) - elif value.lower() in ('true', 'false'): # 布尔值 - value = value.lower() == 'true' - - # 插入到当前层级的字典中 - if value: # 如果有值,则直接赋值 - current_level[-1][key] = value - else: # 如果没有值,则创建一个新字典 - new_dict = {} - current_level[-1][key] = new_dict - current_level.append(new_dict) - indent_stack.append(indent) - - return config - - -class GPIOManager: - def __init__(self, config): - self.pins = config['gpio']['pins'] - self.pulse_per_rev = config['gpio']['pulse_per_rev'] - self.threads = {} - self.stop_flags = {} - self.gpio_files = {} - self.lock = threading.Lock() - - for pin in self.pins: - self._init_gpio(pin) - - def _init_gpio(self, pin): - try: - export_path = '/sys/class/gpio/export' - gpio_path = f'/sys/class/gpio/gpio{pin}' - - if not os.path.exists(gpio_path): - with open(export_path, 'w') as f: - f.write(str(pin)) - time.sleep(0.1) - - direction_path = f'{gpio_path}/direction' - with open(direction_path, 'w') as f: - f.write('out') - - self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') - logger.info(f"GPIO {pin} 初始化完成") - except Exception as e: - logger.error(f"GPIO {pin} 初始化失败: {str(e)}") - raise - - def set_speed(self, pin_id, speed): - with self.lock: - if pin_id not in self.pins: - raise ValueError(f"无效的GPIO引脚: {pin_id}") - - # 停止现有线程 - if pin_id in self.threads: - self.stop_flags[pin_id] = True - self.threads[pin_id].join() - - if speed <= 0: - self._write_value(pin_id, 0) - return - - # 计算间隔时间(400脉冲/转) - interval = 1.0 / (2 * speed) # 每个周期包含高低电平 - self.stop_flags[pin_id] = False - self.threads[pin_id] = threading.Thread( - target=self._pulse_loop, - args=(pin_id, interval), - daemon=True - ) - self.threads[pin_id].start() - - def _pulse_loop(self, pin_id, interval): - while not self.stop_flags.get(pin_id, False): - self._write_value(pin_id, 1) - time.sleep(interval) - self._write_value(pin_id, 0) - time.sleep(interval) - - def _write_value(self, pin_id, value): - try: - self.gpio_files[pin_id].seek(0) - self.gpio_files[pin_id].write(str(value)) - self.gpio_files[pin_id].truncate() - self.gpio_files[pin_id].flush() - os.fsync(self.gpio_files[pin_id].fileno()) - except IOError as e: - logger.error(f"GPIO写入错误: {str(e)}") - - def cleanup(self): - with self.lock: - for pin in self.pins: - if pin in self.threads: - self.stop_flags[pin] = True - self.threads[pin].join() - self._write_value(pin, 0) - for f in self.gpio_files.values(): - f.close() - logger.info("GPIO资源已释放") - - -class RS485Reader: - def __init__(self, config): - self.port = config['serial']['port'] - self.baudrate = config['serial']['baudrate'] - self.ser = None - self.current_weight = 0 - self.running = True - self.lock = threading.Lock() - self.buffer = deque(maxlen=10) # 数据缓冲 - self._connect() - - def _connect(self): - try: - self.ser = serial.Serial( - port=self.port, - baudrate=self.baudrate, - bytesize=8, - parity='N', - stopbits=1, - timeout=0.1 - ) - logger.info(f"成功连接串口设备 {self.port}") - except Exception as e: - logger.error(f"串口连接失败: {str(e)}") - raise - - def read_weight(self): - cmd = bytes.fromhex('010300000002c40b') - try: - self.ser.write(cmd) - response = self.ser.read(9) - if len(response) == 9 and response[:3] == bytes.fromhex("010304"): - weight = int.from_bytes(response[3:5], 'big') - with self.lock: - self.buffer.append(weight) - self.current_weight = sum(self.buffer) // len(self.buffer) - return True - return False - except Exception as e: - logger.error(f"读取重量失败: {str(e)}") - self._reconnect() - return False - - def _reconnect(self): - max_retries = 3 - for i in range(max_retries): - try: - if self.ser: - self.ser.close() - self._connect() - return True - except Exception as e: - logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") - time.sleep(1) - logger.error("串口重连失败") - return False - - def stop(self): - self.running = False - if self.ser and self.ser.is_open: - self.ser.close() - - -class ControlAlgorithm(ABC): - @abstractmethod - def calculate_speed(self, current, target): - pass - - -class PIDAlgorithm(ControlAlgorithm): - def __init__(self, kp=0.5, ki=0.01, kd=0.1): - self.kp = kp - self.ki = ki - self.kd = kd - self.integral = 0 - self.last_error = 0 - self.min_speed = 10 # 最小运行速度 - self.max_speed = 100 # 最大运行速度 - - def calculate_speed(self, current, target): - error = target - current - self.integral += error - derivative = error - self.last_error - self.last_error = error - - # 抗积分饱和 - if self.integral > 1000: - self.integral = 1000 - elif self.integral < -1000: - self.integral = -1000 - - output = self.kp * error + self.ki * self.integral + self.kd * derivative - #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 - #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 - return max(self.min_speed, min(self.max_speed, output)) # - - -class FuzzyLogicAlgorithm(ControlAlgorithm): - def calculate_speed(self, current, target): - error = target - current - abs_error = abs(error) - - if abs_error > 100: - return 100 - elif abs_error > 50: - return 70 - elif abs_error > 20: - return 50 - else: - return 30 - - -class NetworkHandler: - def __init__(self, config): - self.host = config['network']['host'] - self.port = config['network']['port'] - self.sock = None - self.command_queue = Queue() - self.status = { - 'measuring': False, - 'error': None, - 'current_weight': 0, - 'target_weight': 0, - 'algorithm': 'pid' - } - self.lock = threading.Lock() - self.running = True - - def start_server(self): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.sock.bind((self.host, self.port)) - self.sock.listen(5) - logger.info(f"网络服务启动于 {self.host}:{self.port}") - - while self.running: - conn, addr = self.sock.accept() - threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() - except Exception as e: - logger.error(f"网络服务异常: {str(e)}") - finally: - self.sock.close() - - def _handle_client(self, conn, addr): - try: - data = conn.recv(1024) - if data: - cmd = json.loads(data.decode()) - self._process_command(cmd) - - # 返回当前状态 - with self.lock: - response = json.dumps(self.status) - conn.send(response.encode()) - except json.JSONDecodeError: - logger.warning("收到无效的JSON指令") - except Exception as e: - logger.error(f"客户端处理异常: {str(e)}") - finally: - conn.close() - - def _process_command(self, cmd): - with self.lock: - if cmd.get('command') == 'set_target': - - self.status['target_weight'] = cmd['payload']['target_weight'] - self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') - self.status['measuring'] = True - print("收到指令set_target:", self.status['target_weight']) - elif cmd.get('command') == 'stop': - self.status['measuring'] = False - - -class Controller: - def __init__(self, config_path='config.yaml'): - - self.config = parse_config(config_path) - - # 初始化组件 - self.gpio = GPIOManager(self.config) - self.sensor = RS485Reader(self.config) - self.network = NetworkHandler(self.config) - self.algorithm = None - self.running = False - - # 共享状态 - self.current_weight = 0 - self.target_weight = 0 - self.system_status = 'idle' - self.status_lock = threading.Lock() - - # 看门狗线程 - self.watchdog = threading.Thread(target=self._watchdog, daemon=True) - - def start(self): - self.running = True - threads = [ - threading.Thread(target=self._weight_reading_loop), - threading.Thread(target=self._control_loop), - threading.Thread(target=self.network.start_server), - self.watchdog - ] - for t in threads: - t.start() - logger.info("系统启动完成") - - def _weight_reading_loop(self): - while self.running: - if self.sensor.read_weight(): - with self.sensor.lock: - self.current_weight = self.sensor.current_weight - with self.status_lock: - self.network.status['current_weight'] = self.current_weight - if self.current_weight >= self.network.status['target_weight']: - self.network.status['measuring'] = False - time.sleep(0.05) # 20Hz - #time.sleep(0.1) # 20Hz - - def _control_loop(self): - while self.running: - with self.status_lock: - measuring = self.network.status['measuring'] - target = self.network.status['target_weight'] - algorithm = self.network.status['algorithm'] - - if measuring: - self._select_algorithm(algorithm) - speed = self.algorithm.calculate_speed(self.current_weight, target) - rpm = speed # 假设速度单位直接对应RPM - pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 - self.gpio.set_speed(8, pulse_rate) - else: - self.gpio.set_speed(8, 0) - while not self.running and not self.network.status['measuring']: - time.sleep(0.1) # 短暂休眠,避免CPU占用过高 - time.sleep(0.1) - - def _select_algorithm(self, name): - if name == 'pid': - params = self.config['algorithm']['params']['pid'] - self.algorithm = PIDAlgorithm(**params) - elif name == 'fuzzy': - self.algorithm = FuzzyLogicAlgorithm() - else: - self.algorithm = PIDAlgorithm() - - def _watchdog(self): - while self.running: - # 检查线程状态 - threads_alive = all([ - threading.current_thread().is_alive(), - self.sensor.running, - self.network.running - ]) - - if not threads_alive: - logger.critical("检测到线程异常,触发紧急停止!") - self.emergency_stop() - break - - time.sleep(5) - - def emergency_stop(self): - with self.status_lock: - self.network.status['measuring'] = False - self.network.status['error'] = 'emergency_stop' - self.gpio.cleanup() - self.sensor.stop() - self.running = False - logger.info("系统已紧急停止") - - def shutdown(self): - self.running = False - self.gpio.cleanup() - self.sensor.stop() - self.network.running = False - logger.info("系统正常关闭") - - -if __name__ == "__main__": - ctrl = Controller() - try: - ctrl.start() - while True: - time.sleep(1) - except KeyboardInterrupt: - logger.info("收到停止信号...") - finally: - ctrl.shutdown() \ No newline at end of file diff --git a/test2.py b/test2.py deleted file mode 100644 index 6d0d695..0000000 --- a/test2.py +++ /dev/null @@ -1,522 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2025/2/14 14:43 -# @Author : hjw -# @File : test.py -''' -import os -import time -import threading -import serial -import socket -import logging -import json -from queue import Queue -from abc import ABC, abstractmethod -from collections import deque - -# 配置日志系统 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler('controller.log'), - logging.StreamHandler() - ] -) -logger = logging.getLogger(__name__) - -def precise_delay_us(us): - """实现微秒级延时""" - start_time = time.perf_counter() - #print(time.perf_counter()) - while (time.perf_counter() - start_time) * 1e6 < us: - pass - - -def parse_config(file_path): - config = {} - current_level = [config] # 使用栈来跟踪当前层级的字典 - indent_stack = [] # 记录缩进层级 - - with open(file_path, 'r', encoding='utf-8') as file: - for line in file: - line = line.rstrip() # 去掉行尾的换行符和空格 - if not line or line.startswith('#'): # 跳过空行和注释 - continue - - # 检测缩进层级 - indent = len(line) - len(line.lstrip()) - while indent_stack and indent <= indent_stack[-1]: - current_level.pop() - indent_stack.pop() - - key, value = line.split(':', 1) # 分割键和值 - key = key.strip() - value = value.strip() - - # 处理值 - if value.startswith('[') and value.endswith(']'): # 列表 - value = [int(x.strip()) for x in value[1:-1].split(',')] - elif value.isdigit(): # 数字 - value = int(value) - elif value.replace('.', '', 1).isdigit(): # 浮点数 - value = float(value) - elif value.lower() in ('true', 'false'): # 布尔值 - value = value.lower() == 'true' - - # 插入到当前层级的字典中 - if value: # 如果有值,则直接赋值 - current_level[-1][key] = value - else: # 如果没有值,则创建一个新字典 - new_dict = {} - current_level[-1][key] = new_dict - current_level.append(new_dict) - indent_stack.append(indent) - - return config - - -class GPIOManager: - def __init__(self, config): - self.pins = config['gpio']['pins'] - self.pulse_per_rev = config['gpio']['pulse_per_rev'] - self.threads = {} - self.stop_flags = {} - self.gpio_files = {} - self.lock = threading.Lock() - - for pin in self.pins: - self._init_gpio(pin) - - def _init_gpio(self, pin): - try: - export_path = '/sys/class/gpio/export' - gpio_path = f'/sys/class/gpio/gpio{pin}' - - if not os.path.exists(gpio_path): - with open(export_path, 'w') as f: - f.write(str(pin)) - time.sleep(0.1) - - direction_path = f'{gpio_path}/direction' - with open(direction_path, 'w') as f: - f.write('out') - - self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') - logger.info(f"GPIO {pin} 初始化完成") - except Exception as e: - logger.error(f"GPIO {pin} 初始化失败: {str(e)}") - raise - - def set_speed(self, pin_id, speed, error): - with self.lock: - if pin_id not in self.pins: - raise ValueError(f"无效的GPIO引脚: {pin_id}") - - # 停止现有线程 - if pin_id in self.threads: - self.stop_flags[pin_id] = True - self.threads[pin_id].join() - - if speed <= 0: - self._write_value(pin_id, 0) - return - - # 计算间隔时间(400脉冲/转) - if error>20: - interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 - else: - interval = 1.0 / (speed) # 每个周期包含高低电平 - self.stop_flags[pin_id] = False - self.threads[pin_id] = threading.Thread( - target=self._pulse_loop, - args=(pin_id, interval), - daemon=True - ) - self.threads[pin_id].start() - - def _pulse_loop(self, pin_id, interval): - # print(interval) - while not self.stop_flags.get(pin_id, False): - self._write_value(pin_id, 1) - time.sleep(interval) - # precise_delay_us(100) - self._write_value(pin_id, 0) - time.sleep(interval) - # precise_delay_us(100) - - def _write_value(self, pin_id, value): - try: - self.gpio_files[pin_id].seek(0) - self.gpio_files[pin_id].write(str(value)) - self.gpio_files[pin_id].truncate() - self.gpio_files[pin_id].flush() - os.fsync(self.gpio_files[pin_id].fileno()) - except IOError as e: - logger.error(f"GPIO写入错误: {str(e)}") - - def cleanup(self): - with self.lock: - for pin in self.pins: - if pin in self.threads: - self.stop_flags[pin] = True - self.threads[pin].join() - self._write_value(pin, 0) - for f in self.gpio_files.values(): - f.close() - logger.info("GPIO资源已释放") - - -class RS485Reader: - def __init__(self, config): - self.port = config['serial']['port'] - self.baudrate = config['serial']['baudrate'] - self.ser = None - self.current_weight = 0 - self.running = True - self.lock = threading.Lock() - self.buffer = deque(maxlen=10) # 数据缓冲 - self._connect() - - def _connect(self): - try: - self.ser = serial.Serial( - port=self.port, - baudrate=self.baudrate, - bytesize=8, - parity='N', - stopbits=1, - timeout=0.1 - ) - logger.info(f"成功连接串口设备 {self.port}") - except Exception as e: - logger.error(f"串口连接失败: {str(e)}") - raise - - def read_weight(self): - cmd = bytes.fromhex('010300000002c40b') - try: - self.ser.write(cmd) - response = self.ser.read(9) - if len(response) == 9 and response[:3] == bytes.fromhex("010304"): - weight = int.from_bytes(response[3:5], 'big') - with self.lock: - self.buffer.append(weight) - self.current_weight = sum(self.buffer) // len(self.buffer) - return True - return False - except Exception as e: - logger.error(f"读取重量失败: {str(e)}") - self._reconnect() - return False - - def tare(self): # 去皮方法 - cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 - try: - if self.read_weight(): # 确保有最新数据 - self.ser.write(cmd) - response = self.ser.read(8) - if len(response) == 8 and response[:3] == bytes.fromhex("010600"): - self.buffer.clear() # 清空缓冲 - self.current_weight = 0 # 重置当前重量 - self.read_weight() # 确保有最新数据 - return True - else: - self.buffer.clear() # 清空缓冲 - return False - return False - except Exception as e: - logger.error(f"去皮操作失败: {str(e)}") - self._reconnect() - return False - - - def _reconnect(self): - max_retries = 3 - for i in range(max_retries): - try: - if self.ser: - self.ser.close() - self._connect() - return True - except Exception as e: - logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") - time.sleep(1) - logger.error("串口重连失败") - return False - - def stop(self): - self.running = False - if self.ser and self.ser.is_open: - self.ser.close() - - -class ControlAlgorithm(ABC): - @abstractmethod - def calculate_speed(self, current, target): - pass - - -class PIDAlgorithm(ControlAlgorithm): - def __init__(self, kp=0.5, ki=0.01, kd=0.1): - self.kp = kp - self.ki = ki - self.kd = kd - self.integral = 0 - self.last_error = 0 - self.min_speed = 10 # 最小运行速度 - self.max_speed = 100 # 最大运行速度 - - def calculate_speed(self, current, target): - error = target - current - self.integral += error - derivative = error - self.last_error - self.last_error = error - - # 抗积分饱和 - if self.integral > 1000: - self.integral = 1000 - elif self.integral < -1000: - self.integral = -1000 - - output = self.kp * error + self.ki * self.integral + self.kd * derivative - #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 - #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 - return max(self.min_speed, min(self.max_speed, output)), error # - - -class FuzzyLogicAlgorithm(ControlAlgorithm): - def calculate_speed(self, current, target): - error = target - current - abs_error = abs(error) - - if abs_error > 100: - return 100 - elif abs_error > 50: - return 70 - elif abs_error > 20: - return 50 - else: - return 30 - - -class NetworkHandler: - def __init__(self, config): - self.host = config['network']['host'] - self.port = config['network']['port'] - self.sock = None - self.command_queue = Queue() - self.status = { - 'measuring': False, - 'error': None, - 'current_weight': 0, - 'target_weight': 0, - 'algorithm': 'pid', - 'set_tare': False, - 'set_tare_num_time': 0, - 'get_weight': False, - 'set_vibrate': False, - 'set_vibrate_time': 0 - } - self.lock = threading.Lock() - self.running = True - - def start_server(self): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.sock.bind((self.host, self.port)) - self.sock.listen(5) - logger.info(f"网络服务启动于 {self.host}:{self.port}") - - while self.running: - conn, addr = self.sock.accept() - threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() - except Exception as e: - logger.error(f"网络服务异常: {str(e)}") - finally: - self.sock.close() - - def _handle_client(self, conn, addr): - try: - data = conn.recv(1024) - if data: - cmd = json.loads(data.decode()) - self._process_command(cmd) - time.sleep(0.5) - # 返回当前状态 - with self.lock: - response = json.dumps(self.status) - conn.send(response.encode()) - except json.JSONDecodeError: - logger.warning("收到无效的JSON指令") - except Exception as e: - logger.error(f"客户端处理异常: {str(e)}") - finally: - conn.close() - - def _process_command(self, cmd): - with self.lock: - if cmd.get('command') == 'set_target': - - self.status['target_weight'] = cmd['payload']['target_weight'] - self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') - self.status['measuring'] = True - print("收到指令set_target:", self.status['target_weight']) - elif cmd.get('command') == 'stop': - self.status['measuring'] = False - - elif cmd.get('command') == 'set_zero': - self.status['set_tare'] = True - - elif cmd.get('command') == 'get_weight': - self.status['get_weight'] = True - - elif cmd.get('command') == 'set_vibrate': - self.status['set_vibrate'] = True - self.status['set_vibrate_time'] = cmd['payload']['time'] - - -class Controller: - def __init__(self, config_path='config.yaml'): - - self.config = parse_config(config_path) - - # 初始化组件 - self.gpio = GPIOManager(self.config) - self.sensor = RS485Reader(self.config) - self.network = NetworkHandler(self.config) - self.algorithm = None - self.running = False - - # 共享状态 - self.current_weight = 0 - self.target_weight = 0 - self.system_status = 'idle' - self.status_lock = threading.Lock() - - # 看门狗线程 - self.watchdog = threading.Thread(target=self._watchdog, daemon=True) - - def start(self): - self.running = True - threads = [ - threading.Thread(target=self._weight_reading_loop), - threading.Thread(target=self._control_loop), - threading.Thread(target=self.network.start_server), - threading.Thread(target=self._control_vibrate), - self.watchdog - ] - for t in threads: - t.start() - logger.info("系统启动完成") - - def _weight_reading_loop(self): - while self.running: - if self.sensor.read_weight(): - with self.sensor.lock: - self.current_weight = self.sensor.current_weight - with self.status_lock: - self.network.status['current_weight'] = self.current_weight - if self.current_weight >= self.network.status['target_weight']: - self.network.status['measuring'] = False - - if self.network.status['set_tare'] == True: - if self.sensor.tare(): - self.network.status['set_tare_num_time'] +=1 - self.network.status['set_tare'] = False - - if self.network.status['get_weight'] == True: - with self.sensor.lock: - self.network.status['current_weight'] = self.sensor.current_weight - self.network.status['get_weight'] = False - time.sleep(0.05) # 20Hz - # time.sleep(0.1) # 20Hz - - def _control_loop(self): - while self.running: - with self.status_lock: - measuring = self.network.status['measuring'] - target = self.network.status['target_weight'] - algorithm = self.network.status['algorithm'] - - if measuring: - self._select_algorithm(algorithm) - speed, error = self.algorithm.calculate_speed(self.current_weight, target) - rpm = speed # 假设速度单位直接对应RPM - pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 - self.gpio.set_speed(8, pulse_rate, error) - else: - self.gpio.set_speed(8, 0, 0) - while not self.running and not self.network.status['measuring']: - time.sleep(0.1) # 短暂休眠,避免CPU占用过高 - - time.sleep(0.1) - - def _control_vibrate(self): # 振动控制 - while self.running: - with self.status_lock: - vibrating = self.network.status['set_vibrate'] - vibrate_time = self.network.status['set_vibrate_time'] - - if vibrating: - self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) - time.sleep(vibrate_time) - self.gpio._write_value(9, 0) - self.network.status['set_vibrate'] = False - self.network.status['set_vibrate_time'] = 0 - time.sleep(1) - def _select_algorithm(self, name): - if name == 'pid': - params = self.config['algorithm']['params']['pid'] - self.algorithm = PIDAlgorithm(**params) - elif name == 'fuzzy': - self.algorithm = FuzzyLogicAlgorithm() - else: - self.algorithm = PIDAlgorithm() - - def _watchdog(self): - while self.running: - # 检查线程状态 - threads_alive = all([ - threading.current_thread().is_alive(), - self.sensor.running, - self.network.running - ]) - - if not threads_alive: - logger.critical("检测到线程异常,触发紧急停止!") - self.emergency_stop() - break - - time.sleep(5) - - def emergency_stop(self): - with self.status_lock: - self.network.status['measuring'] = False - self.network.status['error'] = 'emergency_stop' - self.gpio.cleanup() - self.sensor.stop() - self.running = False - logger.info("系统已紧急停止") - - def shutdown(self): - self.running = False - self.gpio.cleanup() - self.sensor.stop() - self.network.running = False - logger.info("系统正常关闭") - - -if __name__ == "__main__": - ctrl = Controller() - try: - ctrl.start() - while True: - time.sleep(1) - except KeyboardInterrupt: - logger.info("收到停止信号...") - finally: - ctrl.shutdown() \ No newline at end of file