commit 76d8086a780b6801f63f7d0cf8e962fd3e9abb7b Author: HJW <1576345902@qq.com> Date: Tue Feb 18 11:28:24 2025 +0800 密胺计量程序 diff --git a/ControlAlgor.py b/ControlAlgor.py new file mode 100644 index 0000000..bca84b6 --- /dev/null +++ b/ControlAlgor.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:06 +# @Author : hjw +# @File : controlAlgor.py +''' + +from abc import ABC, abstractmethod + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)), error # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 diff --git a/Controller.py b/Controller.py new file mode 100644 index 0000000..dfa1498 --- /dev/null +++ b/Controller.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:11 +# @Author : hjw +# @File : controller.py +''' + +import time +import threading +import logging +from Network import NetworkHandler +from ControlAlgor import PIDAlgorithm, FuzzyLogicAlgorithm +from Gpio import GPIOManager +from tool import parse_config +from RS485 import RS485Reader + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class Controller: + def __init__(self, config_path='./config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + threading.Thread(target=self._control_vibrate), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + + if self.network.status['set_tare'] == True: + if self.sensor.tare(): + self.network.status['set_tare_num_time'] +=1 + self.network.status['set_tare'] = False + + if self.network.status['get_weight'] == True: + with self.sensor.lock: + self.network.status['current_weight'] = self.sensor.current_weight + self.network.status['get_weight'] = False + time.sleep(0.05) # 20Hz + # time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed, error = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate, error) + else: + self.gpio.set_speed(8, 0, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + + time.sleep(0.1) + + def _control_vibrate(self): # 振动控制 + while self.running: + with self.status_lock: + vibrating = self.network.status['set_vibrate'] + vibrate_time = self.network.status['set_vibrate_time'] + + if vibrating: + self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) + time.sleep(vibrate_time) + self.gpio._write_value(9, 0) + self.network.status['set_vibrate'] = False + self.network.status['set_vibrate_time'] = 0 + time.sleep(1) + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") \ No newline at end of file diff --git a/Gpio.py b/Gpio.py new file mode 100644 index 0000000..07976e1 --- /dev/null +++ b/Gpio.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:03 +# @Author : hjw +# @File : gpio.py +''' + +import threading +import logging +import os +import time + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed, error): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + if error>20: + interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 + else: + interval = 1.0 / (speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + # print(interval) + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + # precise_delay_us(100) + self._write_value(pin_id, 0) + time.sleep(interval) + # precise_delay_us(100) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") \ No newline at end of file diff --git a/Network.py b/Network.py new file mode 100644 index 0000000..e2de026 --- /dev/null +++ b/Network.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:08 +# @Author : hjw +# @File : Network.py +''' + +import time +import threading +import socket +import logging +import json +from queue import Queue + + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid', + 'set_tare': False, + 'set_tare_num_time': 0, + 'get_weight': False, + 'set_vibrate': False, + 'set_vibrate_time': 0 + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + time.sleep(0.5) + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + elif cmd.get('command') == 'set_zero': + self.status['set_tare'] = True + + elif cmd.get('command') == 'get_weight': + self.status['get_weight'] = True + + elif cmd.get('command') == 'set_vibrate': + self.status['set_vibrate'] = True + self.status['set_vibrate_time'] = cmd['payload']['time'] \ No newline at end of file diff --git a/RS485.py b/RS485.py new file mode 100644 index 0000000..2f9db51 --- /dev/null +++ b/RS485.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 9:59 +# @Author : hjw +# @File : RS485.py +''' + +import time +import threading +import serial +from collections import deque +import logging + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def tare(self): # 去皮方法 + cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 + try: + if self.read_weight(): # 确保有最新数据 + self.ser.write(cmd) + response = self.ser.read(8) + if len(response) == 8 and response[:3] == bytes.fromhex("010600"): + self.buffer.clear() # 清空缓冲 + self.current_weight = 0 # 重置当前重量 + self.read_weight() # 确保有最新数据 + return True + else: + self.buffer.clear() # 清空缓冲 + return False + return False + except Exception as e: + logger.error(f"去皮操作失败: {str(e)}") + self._reconnect() + return False + + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..124d938 --- /dev/null +++ b/config.yaml @@ -0,0 +1,19 @@ +gpio: + pins: [7,8,9] + pulse_per_rev: 400 + +serial: + port: /dev/ttyS3 + baudrate: 115200 + +network: + host: 0.0.0.0 + port: 5000 + +algorithm: + default: pid + params: + pid: + kp: 0.6 + ki: 0.02 + kd: 0.15 \ No newline at end of file diff --git a/controller.log b/controller.log new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..01c6d3a --- /dev/null +++ b/main.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' + +import time +import logging +from Controller import Controller + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/send_target.py b/send_target.py new file mode 100644 index 0000000..3ae7148 --- /dev/null +++ b/send_target.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:44 +# @Author : hjw +# @File : send_target.py +''' +import socket +import json + +cmd_set_target = { + # 称量 + "command": "set_target", + "payload": { + "target_weight": 200, + "algorithm": "pid" + } +} + +cmd_get_weight = { + # 获取重量 + "command": "get_weight" + # "payload": { + # "target_weight": 200, + # "algorithm": "pid" + # } +} + +cmd_set_zero = { + # 去皮 + "command": "set_zero" + # "payload": { + # "target_weight": 200, + # "algorithm": "pid" + # } +} + + +cmd_set_vibrate = { # 振动控制 + "command": "set_vibrate", + "payload": { + "time": 0 # 单位S + } +} + + +with socket.socket() as s: + s.connect(('127.0.0.1', 5000)) + s.send(json.dumps(cmd_set_target).encode()) + print(s.recv(1024).decode()) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..9cb1f8b --- /dev/null +++ b/test.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' +import os +import time +import threading +import serial +import socket +import logging +import json +from queue import Queue +from abc import ABC, abstractmethod +from collections import deque + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + interval = 1.0 / (2 * speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + self._write_value(pin_id, 0) + time.sleep(interval) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)) # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid' + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + +class Controller: + def __init__(self, config_path='config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + time.sleep(0.05) # 20Hz + #time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate) + else: + self.gpio.set_speed(8, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + time.sleep(0.1) + + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/test2.py b/test2.py new file mode 100644 index 0000000..6d0d695 --- /dev/null +++ b/test2.py @@ -0,0 +1,522 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/14 14:43 +# @Author : hjw +# @File : test.py +''' +import os +import time +import threading +import serial +import socket +import logging +import json +from queue import Queue +from abc import ABC, abstractmethod +from collections import deque + +# 配置日志系统 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('controller.log'), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +def precise_delay_us(us): + """实现微秒级延时""" + start_time = time.perf_counter() + #print(time.perf_counter()) + while (time.perf_counter() - start_time) * 1e6 < us: + pass + + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + + +class GPIOManager: + def __init__(self, config): + self.pins = config['gpio']['pins'] + self.pulse_per_rev = config['gpio']['pulse_per_rev'] + self.threads = {} + self.stop_flags = {} + self.gpio_files = {} + self.lock = threading.Lock() + + for pin in self.pins: + self._init_gpio(pin) + + def _init_gpio(self, pin): + try: + export_path = '/sys/class/gpio/export' + gpio_path = f'/sys/class/gpio/gpio{pin}' + + if not os.path.exists(gpio_path): + with open(export_path, 'w') as f: + f.write(str(pin)) + time.sleep(0.1) + + direction_path = f'{gpio_path}/direction' + with open(direction_path, 'w') as f: + f.write('out') + + self.gpio_files[pin] = open(f'{gpio_path}/value', 'r+') + logger.info(f"GPIO {pin} 初始化完成") + except Exception as e: + logger.error(f"GPIO {pin} 初始化失败: {str(e)}") + raise + + def set_speed(self, pin_id, speed, error): + with self.lock: + if pin_id not in self.pins: + raise ValueError(f"无效的GPIO引脚: {pin_id}") + + # 停止现有线程 + if pin_id in self.threads: + self.stop_flags[pin_id] = True + self.threads[pin_id].join() + + if speed <= 0: + self._write_value(pin_id, 0) + return + + # 计算间隔时间(400脉冲/转) + if error>20: + interval = 1.0 / (2 * speed * 2) # 每个周期包含高低电平 + else: + interval = 1.0 / (speed) # 每个周期包含高低电平 + self.stop_flags[pin_id] = False + self.threads[pin_id] = threading.Thread( + target=self._pulse_loop, + args=(pin_id, interval), + daemon=True + ) + self.threads[pin_id].start() + + def _pulse_loop(self, pin_id, interval): + # print(interval) + while not self.stop_flags.get(pin_id, False): + self._write_value(pin_id, 1) + time.sleep(interval) + # precise_delay_us(100) + self._write_value(pin_id, 0) + time.sleep(interval) + # precise_delay_us(100) + + def _write_value(self, pin_id, value): + try: + self.gpio_files[pin_id].seek(0) + self.gpio_files[pin_id].write(str(value)) + self.gpio_files[pin_id].truncate() + self.gpio_files[pin_id].flush() + os.fsync(self.gpio_files[pin_id].fileno()) + except IOError as e: + logger.error(f"GPIO写入错误: {str(e)}") + + def cleanup(self): + with self.lock: + for pin in self.pins: + if pin in self.threads: + self.stop_flags[pin] = True + self.threads[pin].join() + self._write_value(pin, 0) + for f in self.gpio_files.values(): + f.close() + logger.info("GPIO资源已释放") + + +class RS485Reader: + def __init__(self, config): + self.port = config['serial']['port'] + self.baudrate = config['serial']['baudrate'] + self.ser = None + self.current_weight = 0 + self.running = True + self.lock = threading.Lock() + self.buffer = deque(maxlen=10) # 数据缓冲 + self._connect() + + def _connect(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=8, + parity='N', + stopbits=1, + timeout=0.1 + ) + logger.info(f"成功连接串口设备 {self.port}") + except Exception as e: + logger.error(f"串口连接失败: {str(e)}") + raise + + def read_weight(self): + cmd = bytes.fromhex('010300000002c40b') + try: + self.ser.write(cmd) + response = self.ser.read(9) + if len(response) == 9 and response[:3] == bytes.fromhex("010304"): + weight = int.from_bytes(response[3:5], 'big') + with self.lock: + self.buffer.append(weight) + self.current_weight = sum(self.buffer) // len(self.buffer) + return True + return False + except Exception as e: + logger.error(f"读取重量失败: {str(e)}") + self._reconnect() + return False + + def tare(self): # 去皮方法 + cmd = bytes.fromhex('01060064000109d5') # 01 06 00 64 00 01 09 d5 + try: + if self.read_weight(): # 确保有最新数据 + self.ser.write(cmd) + response = self.ser.read(8) + if len(response) == 8 and response[:3] == bytes.fromhex("010600"): + self.buffer.clear() # 清空缓冲 + self.current_weight = 0 # 重置当前重量 + self.read_weight() # 确保有最新数据 + return True + else: + self.buffer.clear() # 清空缓冲 + return False + return False + except Exception as e: + logger.error(f"去皮操作失败: {str(e)}") + self._reconnect() + return False + + + def _reconnect(self): + max_retries = 3 + for i in range(max_retries): + try: + if self.ser: + self.ser.close() + self._connect() + return True + except Exception as e: + logger.warning(f"重连尝试 {i + 1}/{max_retries} 失败") + time.sleep(1) + logger.error("串口重连失败") + return False + + def stop(self): + self.running = False + if self.ser and self.ser.is_open: + self.ser.close() + + +class ControlAlgorithm(ABC): + @abstractmethod + def calculate_speed(self, current, target): + pass + + +class PIDAlgorithm(ControlAlgorithm): + def __init__(self, kp=0.5, ki=0.01, kd=0.1): + self.kp = kp + self.ki = ki + self.kd = kd + self.integral = 0 + self.last_error = 0 + self.min_speed = 10 # 最小运行速度 + self.max_speed = 100 # 最大运行速度 + + def calculate_speed(self, current, target): + error = target - current + self.integral += error + derivative = error - self.last_error + self.last_error = error + + # 抗积分饱和 + if self.integral > 1000: + self.integral = 1000 + elif self.integral < -1000: + self.integral = -1000 + + output = self.kp * error + self.ki * self.integral + self.kd * derivative + #误差5g 5*0.5 + 1000* 0.01 + [0.5*5] = 2.5 + 10 + ... = 12.5 + #误差500g 500*0.5 + 1000* 0.01 + [0.5*10] = 250 + 10 + ... = 260 + return max(self.min_speed, min(self.max_speed, output)), error # + + +class FuzzyLogicAlgorithm(ControlAlgorithm): + def calculate_speed(self, current, target): + error = target - current + abs_error = abs(error) + + if abs_error > 100: + return 100 + elif abs_error > 50: + return 70 + elif abs_error > 20: + return 50 + else: + return 30 + + +class NetworkHandler: + def __init__(self, config): + self.host = config['network']['host'] + self.port = config['network']['port'] + self.sock = None + self.command_queue = Queue() + self.status = { + 'measuring': False, + 'error': None, + 'current_weight': 0, + 'target_weight': 0, + 'algorithm': 'pid', + 'set_tare': False, + 'set_tare_num_time': 0, + 'get_weight': False, + 'set_vibrate': False, + 'set_vibrate_time': 0 + } + self.lock = threading.Lock() + self.running = True + + def start_server(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + self.sock.listen(5) + logger.info(f"网络服务启动于 {self.host}:{self.port}") + + while self.running: + conn, addr = self.sock.accept() + threading.Thread(target=self._handle_client, args=(conn, addr), daemon=True).start() + except Exception as e: + logger.error(f"网络服务异常: {str(e)}") + finally: + self.sock.close() + + def _handle_client(self, conn, addr): + try: + data = conn.recv(1024) + if data: + cmd = json.loads(data.decode()) + self._process_command(cmd) + time.sleep(0.5) + # 返回当前状态 + with self.lock: + response = json.dumps(self.status) + conn.send(response.encode()) + except json.JSONDecodeError: + logger.warning("收到无效的JSON指令") + except Exception as e: + logger.error(f"客户端处理异常: {str(e)}") + finally: + conn.close() + + def _process_command(self, cmd): + with self.lock: + if cmd.get('command') == 'set_target': + + self.status['target_weight'] = cmd['payload']['target_weight'] + self.status['algorithm'] = cmd['payload'].get('algorithm', 'pid') + self.status['measuring'] = True + print("收到指令set_target:", self.status['target_weight']) + elif cmd.get('command') == 'stop': + self.status['measuring'] = False + + elif cmd.get('command') == 'set_zero': + self.status['set_tare'] = True + + elif cmd.get('command') == 'get_weight': + self.status['get_weight'] = True + + elif cmd.get('command') == 'set_vibrate': + self.status['set_vibrate'] = True + self.status['set_vibrate_time'] = cmd['payload']['time'] + + +class Controller: + def __init__(self, config_path='config.yaml'): + + self.config = parse_config(config_path) + + # 初始化组件 + self.gpio = GPIOManager(self.config) + self.sensor = RS485Reader(self.config) + self.network = NetworkHandler(self.config) + self.algorithm = None + self.running = False + + # 共享状态 + self.current_weight = 0 + self.target_weight = 0 + self.system_status = 'idle' + self.status_lock = threading.Lock() + + # 看门狗线程 + self.watchdog = threading.Thread(target=self._watchdog, daemon=True) + + def start(self): + self.running = True + threads = [ + threading.Thread(target=self._weight_reading_loop), + threading.Thread(target=self._control_loop), + threading.Thread(target=self.network.start_server), + threading.Thread(target=self._control_vibrate), + self.watchdog + ] + for t in threads: + t.start() + logger.info("系统启动完成") + + def _weight_reading_loop(self): + while self.running: + if self.sensor.read_weight(): + with self.sensor.lock: + self.current_weight = self.sensor.current_weight + with self.status_lock: + self.network.status['current_weight'] = self.current_weight + if self.current_weight >= self.network.status['target_weight']: + self.network.status['measuring'] = False + + if self.network.status['set_tare'] == True: + if self.sensor.tare(): + self.network.status['set_tare_num_time'] +=1 + self.network.status['set_tare'] = False + + if self.network.status['get_weight'] == True: + with self.sensor.lock: + self.network.status['current_weight'] = self.sensor.current_weight + self.network.status['get_weight'] = False + time.sleep(0.05) # 20Hz + # time.sleep(0.1) # 20Hz + + def _control_loop(self): + while self.running: + with self.status_lock: + measuring = self.network.status['measuring'] + target = self.network.status['target_weight'] + algorithm = self.network.status['algorithm'] + + if measuring: + self._select_algorithm(algorithm) + speed, error = self.algorithm.calculate_speed(self.current_weight, target) + rpm = speed # 假设速度单位直接对应RPM + pulse_rate = (int(self.config['gpio']['pulse_per_rev']) * rpm) / 60 + self.gpio.set_speed(8, pulse_rate, error) + else: + self.gpio.set_speed(8, 0, 0) + while not self.running and not self.network.status['measuring']: + time.sleep(0.1) # 短暂休眠,避免CPU占用过高 + + time.sleep(0.1) + + def _control_vibrate(self): # 振动控制 + while self.running: + with self.status_lock: + vibrating = self.network.status['set_vibrate'] + vibrate_time = self.network.status['set_vibrate_time'] + + if vibrating: + self.gpio._write_value(9, 1) #self.gpio.set_speed(8, pulse_rate) + time.sleep(vibrate_time) + self.gpio._write_value(9, 0) + self.network.status['set_vibrate'] = False + self.network.status['set_vibrate_time'] = 0 + time.sleep(1) + def _select_algorithm(self, name): + if name == 'pid': + params = self.config['algorithm']['params']['pid'] + self.algorithm = PIDAlgorithm(**params) + elif name == 'fuzzy': + self.algorithm = FuzzyLogicAlgorithm() + else: + self.algorithm = PIDAlgorithm() + + def _watchdog(self): + while self.running: + # 检查线程状态 + threads_alive = all([ + threading.current_thread().is_alive(), + self.sensor.running, + self.network.running + ]) + + if not threads_alive: + logger.critical("检测到线程异常,触发紧急停止!") + self.emergency_stop() + break + + time.sleep(5) + + def emergency_stop(self): + with self.status_lock: + self.network.status['measuring'] = False + self.network.status['error'] = 'emergency_stop' + self.gpio.cleanup() + self.sensor.stop() + self.running = False + logger.info("系统已紧急停止") + + def shutdown(self): + self.running = False + self.gpio.cleanup() + self.sensor.stop() + self.network.running = False + logger.info("系统正常关闭") + + +if __name__ == "__main__": + ctrl = Controller() + try: + ctrl.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到停止信号...") + finally: + ctrl.shutdown() \ No newline at end of file diff --git a/tool.py b/tool.py new file mode 100644 index 0000000..1c03721 --- /dev/null +++ b/tool.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/2/18 10:02 +# @Author : hjw +# @File : tool.py +''' +import time + +def parse_config(file_path): + config = {} + current_level = [config] # 使用栈来跟踪当前层级的字典 + indent_stack = [] # 记录缩进层级 + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + line = line.rstrip() # 去掉行尾的换行符和空格 + if not line or line.startswith('#'): # 跳过空行和注释 + continue + + # 检测缩进层级 + indent = len(line) - len(line.lstrip()) + while indent_stack and indent <= indent_stack[-1]: + current_level.pop() + indent_stack.pop() + + key, value = line.split(':', 1) # 分割键和值 + key = key.strip() + value = value.strip() + + # 处理值 + if value.startswith('[') and value.endswith(']'): # 列表 + value = [int(x.strip()) for x in value[1:-1].split(',')] + elif value.isdigit(): # 数字 + value = int(value) + elif value.replace('.', '', 1).isdigit(): # 浮点数 + value = float(value) + elif value.lower() in ('true', 'false'): # 布尔值 + value = value.lower() == 'true' + + # 插入到当前层级的字典中 + if value: # 如果有值,则直接赋值 + current_level[-1][key] = value + else: # 如果没有值,则创建一个新字典 + new_dict = {} + current_level[-1][key] = new_dict + current_level.append(new_dict) + indent_stack.append(indent) + + return config + +def precise_delay_us(us): + """实现微秒级延时""" + start_time = time.perf_counter() + #print(time.perf_counter()) + while (time.perf_counter() - start_time) * 1e6 < us: + pass \ No newline at end of file