From 76d8086a780b6801f63f7d0cf8e962fd3e9abb7b Mon Sep 17 00:00:00 2001 From: HJW <1576345902@qq.com> Date: Tue, 18 Feb 2025 11:28:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=86=E8=83=BA=E8=AE=A1=E9=87=8F=E7=A8=8B?= =?UTF-8?q?=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ControlAlgor.py | 58 ++++++ Controller.py | 159 +++++++++++++++ Gpio.py | 115 +++++++++++ Network.py | 103 ++++++++++ RS485.py | 109 ++++++++++ config.yaml | 19 ++ controller.log | 0 main.py | 34 ++++ send_target.py | 50 +++++ test.py | 448 +++++++++++++++++++++++++++++++++++++++++ test2.py | 522 ++++++++++++++++++++++++++++++++++++++++++++++++ tool.py | 57 ++++++ 12 files changed, 1674 insertions(+) create mode 100644 ControlAlgor.py create mode 100644 Controller.py create mode 100644 Gpio.py create mode 100644 Network.py create mode 100644 RS485.py create mode 100644 config.yaml create mode 100644 controller.log create mode 100644 main.py create mode 100644 send_target.py create mode 100644 test.py create mode 100644 test2.py create mode 100644 tool.py diff --git a/ControlAlgor.py b/ControlAlgor.py new file mode 100644 index 0000000..bca84b6 --- /dev/null +++ b/ControlAlgor.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:06 +# @Author : hjw +# @File : controlAlgor.py +''' + +from abc import ABC, abstractmethod + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)), error # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 diff --git a/Controller.py b/Controller.py new file mode 100644 index 0000000..dfa1498 --- /dev/null +++ b/Controller.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:11 +# @Author : hjw +# @File : controller.py +''' + +import time +import threading +import logging +from Network import NetworkHandler +from ControlAlgor import PIDAlgorithm, FuzzyLogicAlgorithm +from Gpio import GPIOManager +from tool import parse_config +from RS485 import RS485Reader + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class Controller: + def __init__(self, config_path='./config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + threading.Thread(target=self._control_vibrate), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + + if self.network.status['set_tare'] == True: + if self.sensor.tare(): + self.network.status['set_tare_num_time'] +=1 + self.network.status['set_tare'] = False + + if self.network.status['get_weight'] == True: + with self.sensor.lock: + self.network.status['current_weight'] = self.sensor.current_weight + self.network.status['get_weight'] = False + time.sleep(0.05) # 20Hz + # time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed, error = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate, error) + else: + self.gpio.set_speed(8, 0, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + + time.sleep(0.1) + + def _control_vibrate(self): # 振动控制 + while self.running: + with self.status_lock: + vibrating = self.network.status['set_vibrate'] + vibrate_time = self.network.status['set_vibrate_time'] + + if vibrating: + self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) + time.sleep(vibrate_time) + self.gpio._write_value(9, 0) + self.network.status['set_vibrate'] = False + self.network.status['set_vibrate_time'] = 0 + time.sleep(1) + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") \ No newline at end of file diff --git a/Gpio.py b/Gpio.py new file mode 100644 index 0000000..07976e1 --- /dev/null +++ b/Gpio.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:03 +# @Author : hjw +# @File : gpio.py +''' + +import threading +import logging +import os +import time + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed, error): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + if error>20: + interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 + else: + interval = 1.0 / (speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + # print(interval) + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + # precise_delay_us(100) + self._write_value(pin_id, 0) + time.sleep(interval) + # precise_delay_us(100) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") \ No newline at end of file diff --git a/Network.py b/Network.py new file mode 100644 index 0000000..e2de026 --- /dev/null +++ b/Network.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:08 +# @Author : hjw +# @File : Network.py +''' + +import time +import threading +import socket +import logging +import json +from queue import Queue + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid', + 'set_tare': False, + 'set_tare_num_time': 0, + 'get_weight': False, + 'set_vibrate': False, + 'set_vibrate_time': 0 + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + time.sleep(0.5) + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + elif cmd.get('command') == 'set_zero': + self.status['set_tare'] = True + + elif cmd.get('command') == 'get_weight': + self.status['get_weight'] = True + + elif cmd.get('command') == 'set_vibrate': + self.status['set_vibrate'] = True + self.status['set_vibrate_time'] = cmd['payload']['time'] \ No newline at end of file diff --git a/RS485.py b/RS485.py new file mode 100644 index 0000000..2f9db51 --- /dev/null +++ b/RS485.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 9:59 +# @Author : hjw +# @File : RS485.py +''' + +import time +import threading +import serial +from collections import deque +import logging + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def tare(self): # 去皮方法 + cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 + try: + if self.read_weight(): # 确保有最新数据 + self.ser.write(cmd) + response = self.ser.read(8) + if len(response) == 8 and response[:3] == bytes.fromhex("010600"): + self.buffer.clear() # 清空缓冲 + self.current_weight = 0 # 重置当前重量 + self.read_weight() # 确保有最新数据 + return True + else: + self.buffer.clear() # 清空缓冲 + return False + return False + except Exception as e: + logger.error(f"去皮操作失败: {str(e)}") + self._reconnect() + return False + + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..124d938 --- /dev/null +++ b/config.yaml @@ -0,0 +1,19 @@ +gpio: + pins: [7,8,9] + pulse_per_rev: 400 + +serial: + port: /dev/ttyS3 + baudrate: 115200 + +network: + host: 0.0.0.0 + port: 5000 + +algorithm: + default: pid + params: + pid: + kp: 0.6 + ki: 0.02 + kd: 0.15 \ No newline at end of file diff --git a/controller.log b/controller.log new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..01c6d3a --- /dev/null +++ b/main.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' + +import time +import logging +from Controller import Controller + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/send_target.py b/send_target.py new file mode 100644 index 0000000..3ae7148 --- /dev/null +++ b/send_target.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:44 +# @Author : hjw +# @File : send_target.py +''' +import socket +import json + +cmd_set_target = { + # 称量 + "command": "set_target", + "payload": { + "target_weight": 200, + "algorithm": "pid" + } +} + +cmd_get_weight = { + # 获取重量 + "command": "get_weight" + # "payload": { + # "target_weight": 200, + # "algorithm": "pid" + # } +} + +cmd_set_zero = { + # 去皮 + "command": "set_zero" + # "payload": { + # "target_weight": 200, + # "algorithm": "pid" + # } +} + + +cmd_set_vibrate = { # 振动控制 + "command": "set_vibrate", + "payload": { + "time": 0 # 单位S + } +} + + +with socket.socket() as s: + s.connect(('127.0.0.1', 5000)) + s.send(json.dumps(cmd_set_target).encode()) + print(s.recv(1024).decode()) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..9cb1f8b --- /dev/null +++ b/test.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' +import os +import time +import threading +import serial +import socket +import logging +import json +from queue import Queue +from abc import ABC, abstractmethod +from collections import deque + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + interval = 1.0 / (2 * speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + self._write_value(pin_id, 0) + time.sleep(interval) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)) # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid' + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + +class Controller: + def __init__(self, config_path='config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + time.sleep(0.05) # 20Hz + #time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate) + else: + self.gpio.set_speed(8, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + time.sleep(0.1) + + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/test2.py b/test2.py new file mode 100644 index 0000000..6d0d695 --- /dev/null +++ b/test2.py @@ -0,0 +1,522 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' +import os +import time +import threading +import serial +import socket +import logging +import json +from queue import Queue +from abc import ABC, abstractmethod +from collections import deque + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +def precise_delay_us(us): + """实现微秒级延时""" + start_time = time.perf_counter() + #print(time.perf_counter()) + while (time.perf_counter() - start_time) * 1e6 < us: + pass + + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed, error): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + if error>20: + interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 + else: + interval = 1.0 / (speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + # print(interval) + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + # precise_delay_us(100) + self._write_value(pin_id, 0) + time.sleep(interval) + # precise_delay_us(100) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def tare(self): # 去皮方法 + cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 + try: + if self.read_weight(): # 确保有最新数据 + self.ser.write(cmd) + response = self.ser.read(8) + if len(response) == 8 and response[:3] == bytes.fromhex("010600"): + self.buffer.clear() # 清空缓冲 + self.current_weight = 0 # 重置当前重量 + self.read_weight() # 确保有最新数据 + return True + else: + self.buffer.clear() # 清空缓冲 + return False + return False + except Exception as e: + logger.error(f"去皮操作失败: {str(e)}") + self._reconnect() + return False + + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)), error # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid', + 'set_tare': False, + 'set_tare_num_time': 0, + 'get_weight': False, + 'set_vibrate': False, + 'set_vibrate_time': 0 + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + time.sleep(0.5) + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + elif cmd.get('command') == 'set_zero': + self.status['set_tare'] = True + + elif cmd.get('command') == 'get_weight': + self.status['get_weight'] = True + + elif cmd.get('command') == 'set_vibrate': + self.status['set_vibrate'] = True + self.status['set_vibrate_time'] = cmd['payload']['time'] + + +class Controller: + def __init__(self, config_path='config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + threading.Thread(target=self._control_vibrate), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + + if self.network.status['set_tare'] == True: + if self.sensor.tare(): + self.network.status['set_tare_num_time'] +=1 + self.network.status['set_tare'] = False + + if self.network.status['get_weight'] == True: + with self.sensor.lock: + self.network.status['current_weight'] = self.sensor.current_weight + self.network.status['get_weight'] = False + time.sleep(0.05) # 20Hz + # time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed, error = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate, error) + else: + self.gpio.set_speed(8, 0, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + + time.sleep(0.1) + + def _control_vibrate(self): # 振动控制 + while self.running: + with self.status_lock: + vibrating = self.network.status['set_vibrate'] + vibrate_time = self.network.status['set_vibrate_time'] + + if vibrating: + self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) + time.sleep(vibrate_time) + self.gpio._write_value(9, 0) + self.network.status['set_vibrate'] = False + self.network.status['set_vibrate_time'] = 0 + time.sleep(1) + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/tool.py b/tool.py new file mode 100644 index 0000000..1c03721 --- /dev/null +++ b/tool.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:02 +# @Author : hjw +# @File : tool.py +''' +import time + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + +def precise_delay_us(us): + """实现微秒级延时""" + start_time = time.perf_counter() + #print(time.perf_counter()) + while (time.perf_counter() - start_time) * 1e6 < us: + pass \ No newline at end of file