diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..19dc141
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/wire_controlsystem.iml b/.idea/wire_controlsystem.iml
index 4a631ed..428c7b1 100644
--- a/.idea/wire_controlsystem.iml
+++ b/.idea/wire_controlsystem.iml
@@ -2,7 +2,7 @@
-
+
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..4b5a294
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+ "python-envs.defaultEnvManager": "ms-python.python:conda",
+ "python-envs.defaultPackageManager": "ms-python.python:conda"
+}
\ No newline at end of file
diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py
index 0689d90..fdb58e4 100644
--- a/EMV/EMV_test.py
+++ b/EMV/EMV_test.py
@@ -110,6 +110,7 @@ sensor_name_map = {
CONVEYOR2_SENSOR: '传送带2开关'
}
+
class RelayController:
def __init__(self):
"""初始化继电器控制器"""
@@ -253,6 +254,7 @@ class RelayController:
# private
_GLOBAL_RELAY = RelayController()
+
def ng_push():
"""NG推料流程"""
try:
@@ -272,6 +274,7 @@ def ng_push():
print(f"NG推料失败:{e}")
raise RuntimeError("NG推料流程异常") from e
+
def write_do(device_name: str, state: bool):
"""
控制单个数字输出设备(DO)的开关状态
@@ -297,6 +300,7 @@ def write_do(device_name: str, state: bool):
except Exception as e:
raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}")
+
def read_all_io() -> dict[str, dict[str, bool]]:
"""
读取所有DI(传感器)和DO(设备)状态
@@ -314,6 +318,7 @@ def read_all_io() -> dict[str, dict[str, bool]]:
print(f"读取IO状态失败:{e}")
raise RuntimeError("读取IO失败") from e
+
# ------------测试接口-------------
if __name__ == '__main__':
diff --git a/RK1106.zip b/RK1106.zip
new file mode 100644
index 0000000..fb1d78c
Binary files /dev/null and b/RK1106.zip differ
diff --git a/RK1106/RK1106_server.py b/RK1106/RK1106_server.py
index 6466deb..1d15e5a 100644
--- a/RK1106/RK1106_server.py
+++ b/RK1106/RK1106_server.py
@@ -1,210 +1,217 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
-# @Time : 2026/1/9 10:45
+# @Time : 2026/3/16 15:00
# @Author : reenrr
# @File : RK1106_server.py
-# @Desc : RK1106服务端,等待工控机调用 通信为JSON格式
+# @Desc : RK1106服务端(类形式,多线程),等待工控机调用 通信为JSON格式
'''
import socket
import logging
-import sys
import json
+import threading
from stepper_motor import motor_start, align_wire, cleanup, motor_stop
-# ------------日志配置(终端+文件双输出)--------------
-logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- # 核心新增:日志文件配置
- handlers=[
- # 1. 文件处理器:保存到.log文件
- logging.FileHandler(
- "RK1106_server.log", # Buildroot推荐路径,临时测试可改/tmp/1106_server.log
- mode='a', # 追加模式(不会覆盖历史日志)
- encoding='utf-8' # 防止中文乱码(必加)
- ),
- # 2. 终端处理器:输出到控制台
- logging.StreamHandler(sys.stdout)
- ]
-)
-
# --------配置TCP服务端----------
-HOST = "192.168.0.100"
+HOST = "192.168.5.100"
PORT = 8888
+CONFIG_FILE = "motor_config.json"
-# 全局参数缓存
-MOTOR_CONFIG = {
- "speed": 2500, # 默认速度
- "cycle": 10.0, # 默认旋转圈数
-}
-def handle_setting(para_type: str, para_value: str) ->dict:
+class RK1106Server:
"""
- 处理客户端发送的参数设置指令(cmd:"setting"),更新全局电机配置
- :param para_type: 要设置的参数类型,仅支持"speed"或"cycle"
- :param para_value: 参数值字符串,将尝试转换为int(speed)或float(cycle)
- :return: 标准化相应字典dict,如:
- {
- "Result": "1"表示成功,"0"表示失败,
- "ErrMsg": str # 成功提示或错误详情
- }
+ RK1106服务端类
+ 在初始化时加载配置,提供配置管理和指令处理功能
"""
- try:
- if para_type == "speed":
- MOTOR_CONFIG["speed"] = int(para_value)
- elif para_type == "cycle":
- MOTOR_CONFIG["cycle"] = float(para_value)
- else:
- return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"}
- return {"Result": "1", "ErrMsg": "设置成功"}
- except ValueError as e:
- return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"}
+
+ def __init__(self):
+ """初始化服务端,加载配置"""
+ self.config = self._load_config()
+ logging.info(f"服务初始化完成,加载配置:{self.config}")
+
+ def _load_config(self):
+ """
+ 从JSON文件加载电机配置
+ :return: 加载的配置字典
+ """
+ try:
+ with open(CONFIG_FILE, "r") as f:
+ return json.load(f)
+ except FileNotFoundError:
+ logging.warning(f"配置文件 {CONFIG_FILE} 未找到,将使用默认配置")
+ return {"speed": 2500, "cycle": 10.0}
+ except json.JSONDecodeError:
+ logging.error(f"配置文件 {CONFIG_FILE} 格式错误")
+ return {"speed": 2500, "cycle": 10.0}
+
+ def _save_config(self):
+ """
+ 将电机配置保存到JSON文件
+ """
+ try:
+ with open(CONFIG_FILE, "w") as f:
+ json.dump(self.config, f, indent=4, ensure_ascii=False)
+ logging.info(f"配置已保存:{self.config}")
+ except IOError as e:
+ logging.error(f"配置文件 {CONFIG_FILE} 保存失败: {str(e)}")
+
+ def handle_setting(self, para_type: str, para_value: str) -> dict:
+ """
+ 处理客户端发送的参数设置指令(cmd:"setting"),更新电机配置
+ :param para_type: 要设置的参数类型,仅支持"speed"或"cycle"
+ :param para_value: 参数值字符串,将尝试转换为int(speed)或float(cycle)
+ :return: 标准化相应字典dict
+ """
+ try:
+ if para_type == "speed":
+ self.config["speed"] = int(para_value)
+ elif para_type == "cycle":
+ self.config["cycle"] = float(para_value)
+ else:
+ return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"}
+
+ # 保存到文件
+ self._save_config()
+ return {"Result": "1", "ErrMsg": "设置成功"}
+ except ValueError as e:
+ return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"}
+
+ def handle_start(self, para_type: str, para_value: str) -> dict:
+ """
+ 处理启动电机指令(cmd: "start"),使用当前配置运行电机
+ :param para_type:为"direction"时,使用"para_value"作为临时方向
+ :param para_value:为0或1
+ :return: 标准化相应字典dict
+ """
+ try:
+ if para_type == "direction":
+ direction = int(para_value)
+ if direction not in (0, 1):
+ return {"Result": "0", "ErrMsg": "方向必须为0或1"}
-def handle_start(para_type: str, para_value: str) -> dict:
- """
- 处理启动电机指令(cmd: "start"),使用当前MOTOR_CONFIG配置运行电机
- :param para_type:为"direction"时,使用"para_value"作为临时方向
- :param para_value:为0或1
- :return: 标准化相应字典dict,如:
- {
- "Result": "1"表示成功,"0"表示失败,
- "ErrMsg": str #执行结果或异常信息
- }
- """
- try:
- if para_type == "direction":
- direction = int(para_value)
- if direction not in (0,1):
- return {"Result": "0", "ErrMsg": "方向必须为0或1"}
-
- motor_start(speed=MOTOR_CONFIG["speed"],
- cycle=MOTOR_CONFIG["cycle"],
+ motor_start(speed=self.config["speed"],
+ cycle=self.config["cycle"],
direction=direction)
- dir_str = "正向" if direction == 1 else "负向"
- return {"Result": "1", "ErrMsg": f"电机启动成功({dir_str})"}
+
+ dir_str = "正向" if direction == 1 else "负向"
+ return {"Result": "1", "ErrMsg": f"电机启动指令已发送({dir_str})"}
+ else:
+ return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"}
+
+ def handle_stop(self) -> dict:
+ """
+ 处理停止电机指令(cmd: "stop")
+ :return: 标准化相应字典dict
+ """
+ try:
+ motor_stop()
+ return {"Result": "1", "ErrMsg": "电机停止指令已发送"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"}
+
+ def handle_align(self) -> dict:
+ """
+ 处理线条对齐(挡板一来一回)
+ :return: dict
+ """
+ try:
+ align_wire(self.config['speed'], self.config['cycle'])
+ return {"Result": "1", "ErrMsg": "线条对齐指令已发送"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": "线条对齐失败"}
+
+ def parse_json_command(self, data: str) -> dict:
+ """
+ 解析客户端发送的原始JSON字符串指令,并分发至对应处理函数
+ :param data: 客户端发送的原始JSON字符串
+ :return dict:标准化响应字典
+ """
+ try:
+ cmd_obj = json.loads(data.strip())
+ except json.JSONDecodeError as e:
+ return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"}
+
+ cmd = cmd_obj.get("cmd", "").strip()
+ para_type = cmd_obj.get("para_type", "").strip()
+ para_value = cmd_obj.get("para_value", "").strip()
+
+ if cmd == "setting":
+ return self.handle_setting(para_type, para_value)
+ elif cmd == "start":
+ return self.handle_start(para_type, para_value)
+ elif cmd == "stop":
+ if para_type == "none" and para_value == "none":
+ return self.handle_stop()
+ else:
+ return {"Result": "0", "ErrMsg": "stop指令参数必须为none"}
+ elif cmd == "alignment":
+ if para_type == "none" and para_value == "none":
+ return self.handle_align()
+ else:
+ return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"}
else:
- return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"}
- except Exception as e:
- return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"}
+ return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"}
+
+ def run_server(self):
+ """
+ 启动TCP服务端,监听指定端口,接收工控机连接并循环处理JSON指令
+ """
+ # 创建TCP socket
+ server_socket = None
+ conn = None
-def handle_stop() -> dict:
- """
- 处理停止电机指令(cmd: "stop")
- :return: 标准化相应字典dict,如:
- {
- "Result": "1"表示成功,"0"表示失败,
- "ErrMsg": str #执行结果或异常信息
- }
- """
- try:
- motor_stop()
- return {"Result": "1", "ErrMsg": "电机已停止"}
- except Exception as e:
- return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"}
+ try:
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind((HOST, PORT))
+ server_socket.listen(1) # 只允许1个工控机连接
+ logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...")
-def handle_align() -> dict:
- """
- 处理线条对齐(挡板一来一回)
- :return: dict
- """
- try:
- align_wire(MOTOR_CONFIG['speed'], MOTOR_CONFIG['cycle'])
- return {"Result": "1", "ErrMsg": "处理线条对齐"}
- except Exception as e:
- return {"Result": "0", "ErrMsg": "线条对齐失败"}
+ while True: # 持续接受新连接
+ try:
+ # 等待工控机连接
+ conn, addr = server_socket.accept()
+ logging.info(f"[1106] 工控机已连接:{addr}")
-def parse_json_command(data: str) -> dict:
- """
- 解析客户端发送的原始JSON字符串指令,并分发至对应处理函数
- :param data: 客户端发送的原始JSON字符串
- :return dict:标准化响应字典
- """
- try:
- cmd_obj = json.loads(data.strip())
- except json.JSONDecodeError as e:
- return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"}
+ # 循环接收指令
+ while True:
+ # 接收指令(最大1024字节)
+ data = conn.recv(1024).decode()
+ if not data:
+ logging.warning("客户端断开连接")
+ break
- cmd = cmd_obj.get("cmd", "").strip()
- para_type = cmd_obj.get("para_type", "").strip()
- para_value = cmd_obj.get("para_value", "").strip()
+ logging.info(f"\n[1106] 收到工控机指令:{data}")
- if cmd == "setting":
- return handle_setting(para_type, para_value)
- elif cmd == "start":
- return handle_start(para_type, para_value)
- elif cmd == "stop":
- if para_type == "none" and para_value == "none":
- return handle_stop()
- else:
- return {"Result": "0", "ErrMsg": "stop指令参数必须为none"}
- elif cmd == "alignment":
- if para_type == "none" and para_value == "none":
- return handle_align()
- else:
- return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"}
- else:
- return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"}
+ # 解析指令
+ response_dict = self.parse_json_command(data)
+ response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n
+ # 发送响应给工控机
+ conn.sendall(response_json.encode("utf-8"))
+ logging.info(f"[1106] 已发送响应:{response_json}")
+ except ConnectionError:
+ logging.info("客户端异常断开")
+ except Exception as e:
+ logging.info(f"处理连接时发生错误: {e}")
+ finally:
+ if conn is not None:
+ conn.close()
+ conn = None # 重置,避免重复关闭
+ logging.info("客户端连接已关闭,等待新连接...")
-# ----------对外接口----------
-def server():
- """启动TCP服务端,监听指定端口,接收工控机连接并循环处理JSON指令"""
- # 创建TCP socket
- server_socket = None
- conn = None
-
- try:
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_socket.bind((HOST, PORT))
- server_socket.listen(1) # 只允许1个工控机连接
- logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...")
-
- while True: # 持续接受新连接
- try:
- # 等待工控机连接
- conn, addr = server_socket.accept()
- logging.info(f"[1106] 工控机已连接:{addr}")
-
- # 循环接收指令
- while True:
- # 接收指令(最大1024字节)
- data = conn.recv(1024).decode()
- if not data:
- logging.warning("客户端断开连接")
- break
-
- logging.info(f"\n[1106] 收到工控机指令:{data}")
-
- # 解析指令
- response_dict = parse_json_command(data)
- response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n
-
- # 发送响应给工控机
- conn.sendall(response_json.encode("utf-8"))
- logging.info(f"[1106] 已发送响应:{response_json}")
-
- except ConnectionError:
- logging.info("客户端异常断开")
- except Exception as e:
- logging.info(f"处理连接时发生错误: {e}")
- finally:
- if conn is not None:
- conn.close()
- conn = None # 重置,避免重复关闭
- logging.info("客户端连接已关闭,等待新连接...")
-
- except KeyboardInterrupt:
- logging.info("\n收到 Ctrl+C,正在关闭服务...")
- finally:
- if server_socket:
- server_socket.close()
- logging.info("服务已停止,监听 socket 已释放")
+ except KeyboardInterrupt:
+ logging.info("\n收到 Ctrl+C,正在关闭服务...")
+ finally:
+ if server_socket:
+ server_socket.close()
+ logging.info("服务已停止,监听 socket 已释放")
# ----------测试接口----------
if __name__ == "__main__":
- server()
-
+ SERVER = RK1106Server()
+ SERVER.run_server()
diff --git a/RK1106/RK1106_server_test.py b/RK1106/RK1106_server_test.py
new file mode 100644
index 0000000..47faaca
--- /dev/null
+++ b/RK1106/RK1106_server_test.py
@@ -0,0 +1,253 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+# @Time : 2026/3/16 15:00
+# @Author : reenrr
+# @File : RK1106_server_test.py
+# @Desc : RK1106服务端(类形式,多线程),等待工控机调用 通信为JSON格式 --待测试,加了线程
+'''
+import socket
+import logging
+import json
+import threading
+
+from stepper_motor import motor_start, align_wire, cleanup, motor_stop
+
+
+# --------配置TCP服务端----------
+HOST = "192.168.5.100"
+PORT = 8888
+CONFIG_FILE = "motor_config.json"
+
+
+class RK1106Server:
+ """
+ RK1106服务端类
+ 在初始化时加载配置,提供配置管理和指令处理功能
+ """
+
+ def __init__(self):
+ """初始化服务端,加载配置"""
+ self.config = self._load_config()
+ self.current_thread = None
+ logging.info(f"服务初始化完成,加载配置:{self.config}")
+
+ def _load_config(self):
+ """
+ 从JSON文件加载电机配置
+ :return: 加载的配置字典
+ """
+ try:
+ with open(CONFIG_FILE, "r") as f:
+ return json.load(f)
+ except FileNotFoundError:
+ logging.warning(f"配置文件 {CONFIG_FILE} 未找到,将使用默认配置")
+ return {"speed": 2500, "cycle": 10.0}
+ except json.JSONDecodeError:
+ logging.error(f"配置文件 {CONFIG_FILE} 格式错误")
+ return {"speed": 2500, "cycle": 10.0}
+
+ def _save_config(self):
+ """
+ 将电机配置保存到JSON文件
+ """
+ try:
+ with open(CONFIG_FILE, "w") as f:
+ json.dump(self.config, f, indent=4, ensure_ascii=False)
+ logging.info(f"配置已保存:{self.config}")
+ except IOError as e:
+ logging.error(f"配置文件 {CONFIG_FILE} 保存失败: {str(e)}")
+
+ def _execute_motor_command(self, cmd_func: callable, *args, **kwargs):
+ """
+ 在单独的线程中执行电机命令
+ :param cmd_func: 要执行的电机命令函数
+ :param args: 传递给cmd_func的位置参数
+ :param kwargs: 传递给cmd_func的关键字参数
+ """
+ try:
+ result = cmd_func(*args, **kwargs)
+ logging.info(f"电机命令执行完成: {result}")
+ except Exception as e:
+ logging.error(f"执行电机命令时出错: {str(e)}")
+
+ def handle_setting(self, para_type: str, para_value: str) -> dict:
+ """
+ 处理客户端发送的参数设置指令(cmd:"setting"),更新电机配置
+ :param para_type: 要设置的参数类型,仅支持"speed"或"cycle"
+ :param para_value: 参数值字符串,将尝试转换为int(speed)或float(cycle)
+ :return: 标准化相应字典dict
+ """
+ try:
+ if para_type == "speed":
+ self.config["speed"] = int(para_value)
+ elif para_type == "cycle":
+ self.config["cycle"] = float(para_value)
+ else:
+ return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"}
+
+ # 保存到文件
+ self._save_config()
+ return {"Result": "1", "ErrMsg": "设置成功"}
+ except ValueError as e:
+ return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"}
+
+ def handle_start(self, para_type: str, para_value: str) -> dict:
+ """
+ 处理启动电机指令(cmd: "start"),使用当前配置运行电机
+ :param para_type:为"direction"时,使用"para_value"作为临时方向
+ :param para_value:为0或1
+ :return: 标准化相应字典dict
+ """
+ try:
+ if para_type == "direction":
+ direction = int(para_value)
+ if direction not in (0, 1):
+ return {"Result": "0", "ErrMsg": "方向必须为0或1"}
+
+ # 在新线程中启动电机
+ thread = threading.Thread(
+ target=self._execute_motor_command,
+ args=(motor_start,),
+ kwargs={
+ "speed": self.config["speed"],
+ "cycle": self.config["cycle"],
+ "direction": direction
+ }
+ )
+ thread.start()
+
+ dir_str = "正向" if direction == 1 else "负向"
+ return {"Result": "1", "ErrMsg": f"电机启动指令已发送({dir_str})"}
+ else:
+ return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"}
+
+ def handle_stop(self) -> dict:
+ """
+ 处理停止电机指令(cmd: "stop")
+ :return: 标准化相应字典dict
+ """
+ try:
+ # 在新线程中停止电机
+ thread = threading.Thread(
+ target=self._execute_motor_command,
+ args=(motor_stop,)
+ )
+ thread.start()
+ return {"Result": "1", "ErrMsg": "电机停止指令已发送"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"}
+
+ def handle_align(self) -> dict:
+ """
+ 处理线条对齐(挡板一来一回)
+ :return: dict
+ """
+ try:
+ # 在新线程中对齐线条
+ thread = threading.Thread(
+ target=self._execute_motor_command,
+ args=(align_wire,),
+ kwargs={
+ "speed": self.config['speed'],
+ "cycle": self.config['cycle']
+ }
+ )
+ thread.start()
+ return {"Result": "1", "ErrMsg": "线条对齐指令已发送"}
+ except Exception as e:
+ return {"Result": "0", "ErrMsg": "线条对齐失败"}
+
+ def parse_json_command(self, data: str) -> dict:
+ """
+ 解析客户端发送的原始JSON字符串指令,并分发至对应处理函数
+ :param data: 客户端发送的原始JSON字符串
+ :return dict:标准化响应字典
+ """
+ try:
+ cmd_obj = json.loads(data.strip())
+ except json.JSONDecodeError as e:
+ return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"}
+
+ cmd = cmd_obj.get("cmd", "").strip()
+ para_type = cmd_obj.get("para_type", "").strip()
+ para_value = cmd_obj.get("para_value", "").strip()
+
+ if cmd == "setting":
+ return self.handle_setting(para_type, para_value)
+ elif cmd == "start":
+ return self.handle_start(para_type, para_value)
+ elif cmd == "stop":
+ if para_type == "none" and para_value == "none":
+ return self.handle_stop()
+ else:
+ return {"Result": "0", "ErrMsg": "stop指令参数必须为none"}
+ elif cmd == "alignment":
+ if para_type == "none" and para_value == "none":
+ return self.handle_align()
+ else:
+ return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"}
+ else:
+ return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"}
+
+ def run_server(self):
+ """
+ 启动TCP服务端,监听指定端口,接收工控机连接并循环处理JSON指令
+ """
+ # 创建TCP socket
+ server_socket = None
+ conn = None
+
+ try:
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind((HOST, PORT))
+ server_socket.listen(1) # 只允许1个工控机连接
+ logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...")
+
+ while True: # 持续接受新连接
+ try:
+ # 等待工控机连接
+ conn, addr = server_socket.accept()
+ logging.info(f"[1106] 工控机已连接:{addr}")
+
+ # 循环接收指令
+ while True:
+ # 接收指令(最大1024字节)
+ data = conn.recv(1024).decode()
+ if not data:
+ logging.warning("客户端断开连接")
+ break
+
+ logging.info(f"\n[1106] 收到工控机指令:{data}")
+
+ # 解析指令
+ response_dict = self.parse_json_command(data)
+ response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n
+ # 发送响应给工控机
+ conn.sendall(response_json.encode("utf-8"))
+ logging.info(f"[1106] 已发送响应:{response_json}")
+
+ except ConnectionError:
+ logging.info("客户端异常断开")
+ except Exception as e:
+ logging.info(f"处理连接时发生错误: {e}")
+ finally:
+ if conn is not None:
+ conn.close()
+ conn = None # 重置,避免重复关闭
+ logging.info("客户端连接已关闭,等待新连接...")
+
+ except KeyboardInterrupt:
+ logging.info("\n收到 Ctrl+C,正在关闭服务...")
+ finally:
+ if server_socket:
+ server_socket.close()
+ logging.info("服务已停止,监听 socket 已释放")
+
+# ----------测试接口----------
+if __name__ == "__main__":
+ SERVER = RK1106Server()
+ SERVER.run_server()
diff --git a/RK1106/image.png b/RK1106/image.png
new file mode 100644
index 0000000..f79a397
Binary files /dev/null and b/RK1106/image.png differ
diff --git a/RK1106/motor_config.json b/RK1106/motor_config.json
new file mode 100644
index 0000000..22e1cdc
--- /dev/null
+++ b/RK1106/motor_config.json
@@ -0,0 +1,4 @@
+{
+ "speed": 2500,
+ "cycle": 10.0
+}
\ No newline at end of file
diff --git a/RK1106/readme.md b/RK1106/readme.md
index 4a59aa5..2aaa2bf 100644
--- a/RK1106/readme.md
+++ b/RK1106/readme.md
@@ -8,4 +8,7 @@

GPIO32 --脉冲引脚
GPIO33 --方向引脚
-剩下两个引脚为地线
\ No newline at end of file
+剩下两个引脚为地线
+
+# 自启动设置:
+ 
\ No newline at end of file
diff --git a/RK1106/stepper_motor.py b/RK1106/stepper_motor.py
index cfc1025..993b9b2 100644
--- a/RK1106/stepper_motor.py
+++ b/RK1106/stepper_motor.py
@@ -7,9 +7,29 @@
# @Desc : 线条厂控制步进电机测试 应该不会丢步
"""
import time
+import logging
+import sys
from periphery import GPIO
+# ------------日志配置(终端+文件双输出)--------------
+
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s',
+ # 核心新增:日志文件配置
+ handlers=[
+ # 1. 文件处理器:保存到.log文件
+ logging.FileHandler(
+ "RK1106_server.log", # Buildroot推荐路径,临时测试可改/tmp/1106_server.log
+ mode='a', # 追加模式(不会覆盖历史日志)
+ encoding='utf-8' # 防止中文乱码(必加)
+ ),
+ # 2. 终端处理器:输出到控制台
+ logging.StreamHandler(sys.stdout)
+ ]
+)
+
# ------------参数配置-------------
# 1. 脉冲(PUL)引脚配置 → GPIO32
PUL_Pin = 32
@@ -66,8 +86,8 @@ class StepperMotor:
self.pul_gpio.write(False)
self.dir_gpio.write(False)
- print(f"✅ PUL引脚初始化完成:{self.pul_pin} 引脚")
- print(f"✅ DIR引脚初始化完成:{self.dir_pin} 引脚")
+ logging.info(f"✅ PUL引脚初始化完成:{self.pul_pin} 引脚")
+ logging.info(f"✅ DIR引脚初始化完成:{self.dir_pin} 引脚")
except PermissionError:
raise RuntimeError("权限不足!请用sudo运行程序(sudo python xxx.py)")
@@ -76,10 +96,10 @@ class StepperMotor:
def _validate_params(self, rounds: float, direction: int) -> bool:
if rounds <= 0:
- print("圈数必须为正数")
+ logging.info("圈数必须为正数")
return False
if direction not in (0, 1):
- print("方向必须为0(逆时针)或1(顺时针)")
+ logging.info("方向必须为0(逆时针)或1(顺时针)")
return False
return True
@@ -97,17 +117,17 @@ class StepperMotor:
# 设置旋转方向(DIR电平)
if direction == 1: # 顺时针
self.dir_gpio.write(self.clockwise_level)
- print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
+ logging.info(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level)
- print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
+ logging.info(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = int(rounds * self.pulses_per_round)
pulse_period = 1.0 / pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%(MA860H最优)
- print(f"总脉冲数:{total_pulses} | 频率:{pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
+ logging.info(f"总脉冲数:{total_pulses} | 频率:{pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步)
# 发送脉冲序列(核心:占空比50%的方波)
@@ -122,13 +142,13 @@ class StepperMotor:
# 更新下一个脉冲的起始时间
start_time += pulse_period
- print("✅ 旋转完成")
+ logging.info("✅ 旋转完成")
def stop(self):
"""紧急停止(置低脉冲引脚)"""
if self.pul_gpio:
self.pul_gpio.write(False)
- print("🛑 电机已停止")
+ logging.info("🛑 电机已停止")
def close(self):
"""释放GPIO资源"""
@@ -136,12 +156,12 @@ class StepperMotor:
if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close()
- print("\n✅ PUL引脚已关闭(电平置低)")
+ logging.info("\n✅ PUL引脚已关闭(电平置低)")
if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close()
- print("✅ DIR引脚已关闭(电平置低)")
+ logging.info("✅ DIR引脚已关闭(电平置低)")
# 重置GPIO对象
self.pul_gpio = None
@@ -163,16 +183,16 @@ def motor_start(speed: int, cycle: float, direction: int):
:param direction: 0=负向(逆时针),1=正向(顺时针)
"""
try:
- print("\n=== 启动步进电机 ===")
+ logging.info("\n=== 启动步进电机 ===")
GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=direction)
time.sleep(5) # 暂停5秒
except ImportError:
- print("\n❌ 缺少依赖:请安装python-periphery")
- print("命令:pip install python-periphery")
+ logging.info("\n❌ 缺少依赖:请安装python-periphery")
+ logging.info("命令:pip install python-periphery")
except Exception as e:
- print(f"\n❌ 程序异常:{str(e)}")
+ logging.info(f"\n❌ 程序异常:{str(e)}")
def motor_stop():
"""紧急停止(仅停止脉冲,保留实例)"""
@@ -180,7 +200,8 @@ def motor_stop():
if GLOBAL_MOTOR:
GLOBAL_MOTOR.stop()
except Exception as e:
- print("停止失败:{e}")
+ logging.info("停止失败:{e}")
+
def align_wire(speed: int, cycle: float):
"""
@@ -189,20 +210,20 @@ def align_wire(speed: int, cycle: float):
:param cycle: 旋转圈数
"""
try:
- print("\n=== 启动线条对齐 ===")
+ logging.info("\n=== 启动线条对齐 ===")
# 靠近电机方向 逆时针
GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=0)
- time.sleep(5) # 暂停5秒
+ time.sleep(2) # 暂停3秒
# 远离电机方向 顺时针
GLOBAL_MOTOR.rotate(pulse_frequency=speed,rounds=cycle, direction=1)
- time.sleep(5) # 暂停5秒
+ time.sleep(2) # 暂停3秒
except ImportError:
- print("\n❌ 缺少依赖:请安装python-periphery")
- print("命令:pip install python-periphery")
+ logging.info("\n❌ 缺少依赖:请安装python-periphery")
+ logging.info("命令:pip install python-periphery")
except Exception as e:
- print(f"\n❌ 程序异常:{str(e)}")
+ logging.info(f"\n❌ 程序异常:{str(e)}")
def cleanup():
"""程序退出时统一清理"""
@@ -212,14 +233,14 @@ def cleanup():
if __name__ == '__main__':
align_wire(speed=2500, cycle=10.0)
- time.sleep(10) # 电机运动需要的时间
+ time.sleep(5) # 电机运动需要的时间
- # 测试是否电机能停止
+ # # 测试是否电机能停止
motor_start(speed=2500, cycle=5.0, direction=0)
time.sleep(1) # 电机运动需要的时间
- motor_stop()
+ motor_stop()
time.sleep(0.2)
-
- while(True): # 防止程序退出
+
+ while True: # 防止程序退出
time.sleep(1)
diff --git a/conveyor_controller/__pycache__/error_code.cpython-39.pyc b/conveyor_controller/__pycache__/error_code.cpython-39.pyc
new file mode 100644
index 0000000..f7e2f8c
Binary files /dev/null and b/conveyor_controller/__pycache__/error_code.cpython-39.pyc differ
diff --git a/conveyor_controller/__pycache__/modbus.cpython-39.pyc b/conveyor_controller/__pycache__/modbus.cpython-39.pyc
new file mode 100644
index 0000000..3ab2374
Binary files /dev/null and b/conveyor_controller/__pycache__/modbus.cpython-39.pyc differ
diff --git a/conveyor_controller/conveyor_motor.py b/conveyor_controller/conveyor_motor.py
index b0517c5..92b8382 100644
--- a/conveyor_controller/conveyor_motor.py
+++ b/conveyor_controller/conveyor_motor.py
@@ -11,7 +11,7 @@ import time
from modbus import (write_single_register, open_serial_port, RTU_HANDLE_MAP, RTU_HANDLE_LOCK, read_holding_register)
from error_code import ModbusError
-PORT = 'COM4'
+PORT = '/dev/ttyUSB0'
BAUDRATE = 115200
DATABITS = 8
STOPBITS = 1
@@ -19,15 +19,23 @@ PARITY = 0
# --------寄存器地址---------
MODE_REG_ADDR = 0x6200
+POSITION_H_REG_ADDR = 0x6201
+POSITION_L_REG_ADDR = 0x6202
SPEED_REG_ADDR = 0x6203
ACCELERATION_REG_ADDR = 0x6204
DECELERATION_REG_ADDR = 0x6205
START_OR_STOP_REG_ADDR = 0x6002
+ENCODER_H_REG_ADDR = 0x0B1C # 读取电机位置--编码器单位
+COMMAND_H_REG_ADDR = 0x602C # 读电机位置--指令单位
+POSITION_ERROR_H_REG_ADDR = 0x0B1E # 位置误差
+
+
# ---------相关写入值------------
-START_CMD = 0x0010
-STOP_CMD = 0x0040
-SPEED_CMD = 0x0002
-ABSOLUTE_POSITION_CMD = 0x0001
+START_CMD = 0x0010 # 启动指令
+STOP_CMD = 0x0040 # 停止指令
+SPEED_CMD = 0x0002 # 速度指令
+ABSOLUTE_POSITION_CMD = 0x0001 # 绝对位置指令
+RELATIVE_POSITION_CMD = 0x0041 # 相对位置指令
handle1 = open_serial_port(
port=PORT,
@@ -51,11 +59,60 @@ def _check_handle_valid():
print(f"错误:{err_msg}")
raise RuntimeError(err_msg)
+
+def set_motor_position(station_addr: int, position_value: int) -> bool:
+ """
+ 设置伺服电机位置
+ :param station_addr: 从机地址
+ :param position_value: 位置值(32位整数) 正数:远离电机方向;负数:靠近电机方向
+ :return: True--设置成功,False--设置失败
+ """
+ _check_handle_valid()
+
+ try:
+ # 有符号32位位置值拆分为高16位和低16位
+ position_value = position_value & 0xFFFFFFFF # 确保为32位无符号整数
+ position_h = (position_value >> 16) & 0xFFFF # 高16位
+ position_l = position_value & 0xFFFF # 低16位
+ print(f"位置值拆分:{position_value} → 高16位:(0x{position_h:04x}) 低16位:(0x{position_l:04x})")
+
+ except Exception as e:
+ err_msg = f"位置值拆分失败 - {str(e)}"
+ print(f"错误:{err_msg}")
+ raise ValueError(err_msg) from e
+
+ try:
+ # 写入高16位到0x6201
+ ret_code_h = write_single_register(handle1, station_addr, POSITION_H_REG_ADDR,
+ position_h, resp_offset=0, use_crc=1)
+ if ret_code_h != ModbusError.MODBUS_SUCCESS:
+ err_desc = ModbusError.get_error_desc(ret_code_h)
+ raise OSError(f"位置高位写入失败:{err_desc}", ret_code_h)
+
+ # 写入低16位到 0x6202
+ ret_code_l = write_single_register(handle1, station_addr, POSITION_L_REG_ADDR,
+ position_l, resp_offset=0, use_crc=1)
+ if ret_code_l != ModbusError.MODBUS_SUCCESS:
+ err_desc = ModbusError.get_error_desc(ret_code_l)
+ raise OSError(f"位置低位写入失败:{err_desc}", ret_code_l)
+
+ print(f"成功:电机位置设置完成(从站{station_addr},位置值:{position_value})")
+ return True
+
+ except OSError:
+ raise
+ except Exception as e:
+ err_msg = f"位置设置异常 - {str(e)}"
+ print(f"异常:{err_msg}")
+ raise Exception(err_msg) from e
+
+
def set_motor_speed(station_addr: int, speed_value: int) -> bool:
"""
设置伺服电机速度
:param station_addr: 从机地址
- :param speed_value: 速度值
+ :param speed_value: 速度值 速度模式时,正负值表示方向,正值--远离电机方向,负值--靠近电机方向
+ 其他模式时,速度没有方向概念,只表示速度大小
:return: True--设置成功,False--设置失败
"""
_check_handle_valid()
@@ -64,7 +121,7 @@ def set_motor_speed(station_addr: int, speed_value: int) -> bool:
try:
value = handle_obj.decimal_to_16bit(speed_value)
- print(f"速度值转换:{speed_value} → {value}(0x{value:04X})")
+ # print(f"速度值转换:{speed_value} → {value}(0x{value:04X})")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
@@ -93,6 +150,7 @@ def set_motor_speed(station_addr: int, speed_value: int) -> bool:
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
+
def set_motor_acceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机加速度
@@ -106,7 +164,7 @@ def set_motor_acceleration(station_addr: int, value: int) -> bool:
try:
conv_value = handle_obj.decimal_to_16bit(value)
- print(f"加速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
+ # print(f"加速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
@@ -138,6 +196,7 @@ def set_motor_acceleration(station_addr: int, value: int) -> bool:
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
+
def set_motor_deceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机减速度
@@ -151,7 +210,7 @@ def set_motor_deceleration(station_addr: int, value: int) -> bool:
try:
conv_value = handle_obj.decimal_to_16bit(value)
- print(f"减速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
+ # print(f"减速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
@@ -178,24 +237,28 @@ def set_motor_deceleration(station_addr: int, value: int) -> bool:
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
-def set_motor_mode(station_addr: int, value: int) -> bool:
+
+def set_motor_mode(station_addr: int, mode: int) -> bool:
"""
设置电机模式
:param station_addr: 从机地址
- :param value: 0--速度模式 1--绝对位置模式
+ :param mode: 0--速度模式 1--绝对位置模式 2--相对位置模式
:return: bool 设置是否成功
"""
_check_handle_valid()
try:
- if value == 0:
+ if mode == 0:
write_cmd = SPEED_CMD
mode_desc = "速度模式"
- elif value == 1:
+ elif mode == 1:
write_cmd = ABSOLUTE_POSITION_CMD
mode_desc = "绝对位置模式"
+ elif mode == 2:
+ write_cmd = RELATIVE_POSITION_CMD
+ mode_desc = "相对位置模式"
- ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, 0, 1)
+ ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, resp_offset=0, use_crc=1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机模式设置完成(从站{station_addr},地址0x{MODE_REG_ADDR:04X},模式:{mode_desc})")
@@ -215,6 +278,7 @@ def set_motor_mode(station_addr: int, value: int) -> bool:
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
+
def get_motor_speed(station_addr: int) -> int:
"""
获取电机速度
@@ -247,6 +311,7 @@ def get_motor_speed(station_addr: int) -> int:
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
+
def get_motor_acceleration(station_addr: int) -> int:
"""
获取电机加速度
@@ -279,6 +344,7 @@ def get_motor_acceleration(station_addr: int) -> int:
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
+
def get_motor_deceleration(station_addr: int) -> int:
"""
获取电机减速度
@@ -310,6 +376,7 @@ def get_motor_deceleration(station_addr: int) -> int:
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
+
def move_motor(station_addr: int, value: bool) -> bool:
"""
启停电机
@@ -347,6 +414,7 @@ def move_motor(station_addr: int, value: bool) -> bool:
print(f"异常:{err_msg}")
raise Exception(err_msg)
+
def sync_motor_move(value: bool):
"""
同步传送带电机
@@ -372,25 +440,479 @@ def sync_motor_move(value: bool):
raise RuntimeError(err_msg) from e
-# ------------调试接口----------
-if __name__ == '__main__':
- # 配置传送带电机参数 只需配置一次
- set_motor_mode(1, 0) # 配置成速度模式
- set_motor_speed(1, -30)
- set_motor_acceleration(1, 50)
- set_motor_deceleration(1, 50)
+def read_motor_value(station_addr: int) -> int:
+ """
+ 读取电机位置值
+ :param station_addr: 从站地址
+ :return: 电机位置值
+ """
+ _check_handle_valid()
- set_motor_mode(2, 0) # 配置成速度模式
- set_motor_speed(2, -30)
- set_motor_acceleration(2, 50)
- set_motor_deceleration(2, 50)
+ handle_obj = RTU_HANDLE_MAP.get(handle1)
+
+ try:
+ reg_values = read_holding_register(handle1, station_addr, ENCODER_H_REG_ADDR,
+ reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
+ if not reg_values or len(reg_values) != 2:
+ err_msg = f"电机位置值读取失败,数据长度错误:{reg_values}"
+ print(f"错误:{err_msg}")
+ raise OSError(err_msg)
+
+ # 组合高16位和低16位
+ encoder_h = reg_values[0] # 高16位
+ encoder_l = reg_values[1] # 低16位
+ encoder_value = (encoder_h << 16) | encoder_l
+ # 转换为有符号数
+ signed_encoder_value = handle_obj.uint32_to_int32(encoder_value)
+
+ print(f"成功:读取电机位置值(从站{station_addr})-->高位:{encoder_h} 低位:{encoder_l} 总值:{signed_encoder_value}")
+ return signed_encoder_value
+
+ except OSError:
+ raise
+ except Exception as e:
+ err_msg = f"电机位置值读取异常 - {str(e)}"
+ print(f"异常:{err_msg}")
+ raise Exception(err_msg) from e
+
+
+def read_motor_value_command(station_addr: int) -> int:
+ """
+ 读取电机位置值--指令单位
+ :param station_addr: 从站地址
+ :return: 电机位置值
+ """
+ _check_handle_valid()
+
+ handle_obj = RTU_HANDLE_MAP.get(handle1)
+
+ try:
+ reg_values = read_holding_register(handle1, station_addr, COMMAND_H_REG_ADDR,
+ reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
+ if not reg_values or len(reg_values) != 2:
+ err_msg = f"电机位置值读取失败,数据长度错误:{reg_values}"
+ print(f"错误:{err_msg}")
+ raise OSError(err_msg)
+
+ # 组合高16位和低16位
+ command_value_h = reg_values[0] # 高16位
+ command_value_l = reg_values[1] # 低16位
+ command_value = (command_value_h << 16) | command_value_l
+ # 转换为有符号数
+ signed_command_value = handle_obj.uint32_to_int32(command_value)
+
+ print(f"成功:读取电机位置值(从站{station_addr})-->高位:{command_value_h} 低位:{command_value_l} 总值:{signed_command_value}")
+ return signed_command_value
+
+ except OSError:
+ raise
+ except Exception as e:
+ err_msg = f"电机位置值读取异常 - {str(e)}"
+ print(f"异常:{err_msg}")
+ raise Exception(err_msg) from e
+
+def read_position_error(station_addr: int) -> int:
+ """
+ 读取电机位置误差--编码器单位
+ :param station_addr: 从站地址
+ :return: 电机位置误差
+ """
+ _check_handle_valid()
+
+ handle_obj = RTU_HANDLE_MAP.get(handle1)
+
+ try:
+ reg_values = read_holding_register(handle1, station_addr, POSITION_ERROR_H_REG_ADDR,
+ reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
+ if not reg_values or len(reg_values) != 2:
+ err_msg = f"电机位置误差读取失败,数据长度错误:{reg_values}"
+ print(f"错误:{err_msg}")
+ raise OSError(err_msg)
+
+ # 组合高16位和低16位
+ position_error_h = reg_values[0] # 高16位
+ position_error_l = reg_values[1] # 低16位
+ position_error = (position_error_h << 16) | position_error_l
+ # 转换为有符号数
+ signed_position_error = handle_obj.uint32_to_int32(position_error)
+
+ print(f"成功:读取电机位置误差(从站{station_addr})-->高位:{position_error_h} 低位:{position_error_l} 总值:{signed_position_error}")
+ return signed_position_error
+
+ except OSError:
+ raise
+ except Exception as e:
+ err_msg = f"电机位置误差读取异常 - {str(e)}"
+ print(f"异常:{err_msg}")
+ raise Exception(err_msg) from e
+
+# ------------对外测试接口------------
+def test_speed_mode():
+ # 配置传送带电机参数 只需配置一次
+ # 速度模式
+ set_motor_mode(station_addr=1, mode=0) # 配置成速度模式
+ set_motor_speed(station_addr=1, speed_value=-30)
+ set_motor_acceleration(station_addr=1, acceleration=50)
+ set_motor_deceleration(station_addr=1, deceleration=50)
+
+ set_motor_mode(station_addr=2, mode=0) # 配置成速度模式
+ set_motor_speed(station_addr=2, speed_value=-30)
+ set_motor_acceleration(station_addr=2, acceleration=50)
+ set_motor_deceleration(station_addr=2, deceleration=50)
sync_motor_move(True)
- time.sleep(1)
+ time.sleep(2)
sync_motor_move(False)
time.sleep(0.5)
-
- while(True):
+
+ while True:
time.sleep(1)
+
+def test_absolute_position_mode():
+ # 绝对位置模式
+ set_motor_mode(station_addr=1, mode=1) # 配置成绝对位置模式
+ set_motor_position(station_addr=1, position_value=0) # 每圈10000脉冲,跑两圈
+ set_motor_speed(station_addr=1, speed_value=30)
+ set_motor_acceleration(station_addr=1, value=50)
+ set_motor_deceleration(station_addr=1, value=50)
+
+ set_motor_mode(station_addr=2, mode=1) # 配置成绝对位置模式
+ set_motor_position(station_addr=2, position_value=0)
+ set_motor_speed(station_addr=2, speed_value=30)
+ set_motor_acceleration(station_addr=2, value=50)
+ set_motor_deceleration(station_addr=2, value=50)
+
+ # 运行前查看电机位置初始值
+ print("\n=== 运行前电机位置初始值 ===")
+ motor1_initial = read_motor_value(station_addr=1)
+ motor2_initial = read_motor_value(station_addr=2)
+ print(f"电机1初始电机位置值: {motor1_initial}")
+ print(f"电机2初始电机位置值: {motor2_initial}")
+
+ sync_motor_move(True)
+
+ # 等待电机完成运动(根据实际情况调整等待时间)
+ time.sleep(5)
+
+ # 读取编码器值检查是否丢步
+ print("\n=== 检查电机是否丢步 ===")
+ target_position = 0 # 目标位置值
+
+ # 检查电机1
+ motor1 = read_motor_value(station_addr=1)
+ error1 = abs(motor1 - target_position)
+ print(f"电机1 - 目标位置: {target_position}, 实际位置: {motor1}, 误差: {error1}")
+
+ # 检查电机2
+ encoder2 = read_motor_value(station_addr=2)
+ error2 = abs(encoder2 - target_position)
+ print(f"电机2 - 目标位置: {target_position}, 实际位置: {encoder2}, 误差: {error2}")
+
+ encoder_unit3 = read_motor_value_command(station_addr=1)
+ encoder_unit4 = read_motor_value_command(station_addr=2)
+ print(f"电机1运动后电机的指令值: {encoder_unit3}")
+ print(f"电机2运动后电机的指令值: {encoder_unit4}")
+
+ # 读取电机位置误差
+ position_error1 = read_position_error(station_addr=1)
+ position_error2 = read_position_error(station_addr=2)
+ print(f"电机1位置误差: {position_error1}")
+ print(f"电机2位置误差: {position_error2}")
+
+ while True:
+ time.sleep(1)
+
+def test_relative_position_mode():
+ """
+ 相对位置模式测试(已修复CRC错误、数据符号、通信延时问题)
+ 功能:每3秒相对移动6000脉冲,实时监控编码器误差,误差>100自动补偿
+ """
+ try:
+ # ===================== 1. 基础配置 =====================
+ station_list = [1, 2]
+ base_pulse = 6000
+ interval = 3
+ error_threshold = 100
+ move_wait_time = 1.5
+
+ # 补偿值
+ compensate_map = {1: 0, 2: 0}
+
+ print("===== 相对位置模式 + 误差监控 + 自动补偿(稳定版)=====")
+ print(f"基础移动:{base_pulse} 脉冲 | 误差阈值:>{error_threshold}")
+
+ # 初始化电机
+ for station in station_list:
+ set_motor_mode(station, 2)
+ set_motor_speed(station, 30)
+ set_motor_acceleration(station, 50)
+ set_motor_deceleration(station, 50)
+ time.sleep(0.1)
+ print(f"✅ 电机{station} 配置完成")
+
+ print("\n===== 开始循环运动 =====")
+ count = 0
+
+ while True:
+ count += 1
+ print(f"\n==================== 第{count}次运动 ====================")
+
+ # 写入位置(带重试 + 间隔,解决CRC)
+ for station in station_list:
+ compensate = compensate_map[station]
+ final_pulse = base_pulse - compensate
+
+ # 关键修复:相对位置必须用 有符号32位
+ final_pulse = int(final_pulse)
+
+ retry = 3
+ while retry > 0:
+ try:
+ set_motor_position(station, final_pulse)
+ time.sleep(0.05)
+ break
+ except:
+ retry -= 1
+ time.sleep(0.1)
+ if retry == 0:
+ print(f"❌ 电机{station} 写入位置失败")
+
+ print(f"电机{station}:移动 {final_pulse} 脉冲")
+
+ # 启动
+ sync_motor_move(True)
+ time.sleep(move_wait_time)
+
+ # 停止
+ sync_motor_move(False)
+ time.sleep(0.2)
+
+ # 读取误差
+ print("\n---------------- 误差监控 ----------------")
+ for station in station_list:
+ try:
+ real_pos = read_motor_value(station)
+ pos_error = read_position_error(station)
+
+ if abs(pos_error) > error_threshold:
+ new_comp = pos_error - error_threshold
+ compensate_map[station] = new_comp
+ status = f"⚠️ 补偿:{new_comp}"
+ else:
+ compensate_map[station] = 0
+ status = "✅ 正常"
+
+ print(f"电机{station} | 实际:{real_pos} | 误差:{pos_error} | {status}")
+ except:
+ print(f"⚠️ 电机{station} 读取失败")
+
+ print(f"\n⏳ {interval}秒后继续...")
+ time.sleep(interval)
+
+ except KeyboardInterrupt:
+ print("\n🛑 手动停止")
+ sync_motor_move(False)
+ except Exception as e:
+ print(f"\n❌ 异常:{e}")
+ sync_motor_move(False)
+
+def test_cycle_relative_position_mode():
+ """
+ 相对位置模式 —— 前20次每次+5,之后固定使用6007脉冲
+ 稳定版:防CRC报错 + 自动重试
+ """
+ try:
+ # ===================== 配置 =====================
+ station_list = [1, 2]
+ base_pulse = 6007 # 初始脉冲
+ move_wait_time = 3 # 运动时间
+ send_interval = 2 # 间隔2秒
+ speed = 30
+ acc = 50
+ dec = 50
+
+ max_increase_times = 50 # 前20次递增
+ increase_step = 2 # 每次加5
+ current_count = 0 # 运行次数计数
+
+ print("===== 前20次每次+5,20次后恢复6007脉冲 =====")
+
+ # 初始化电机
+ for station in station_list:
+ set_motor_mode(station, 2)
+ time.sleep(0.1)
+ set_motor_speed(station, speed)
+ time.sleep(0.1)
+ set_motor_acceleration(station, acc)
+ time.sleep(0.1)
+ set_motor_deceleration(station, dec)
+ time.sleep(0.1)
+ print(f"✅ 电机{station} 初始化完成")
+
+ # ===================== 主循环 =====================
+ while True:
+ current_count += 1
+ print(f"\n==================== 第 {current_count} 次运动 ====================")
+
+ # ============== 核心逻辑 ==============
+ if current_count <= max_increase_times:
+ # 前20次:每次 +5
+ send_pulse = base_pulse + current_count * increase_step
+ print(f"📈 前20次递增 → 本次发送:{send_pulse}")
+ else:
+ # 20次以后:固定 6007
+ send_pulse = 6007
+ print(f"✅ 已超过20次 → 固定发送:{send_pulse}")
+
+ # ========== 写入位置(带重试)==========
+ for station in station_list:
+ retry = 5
+ while retry > 0:
+ try:
+ set_motor_position(station, send_pulse)
+ time.sleep(0.1)
+ break
+ except Exception as e:
+ retry -= 1
+ print(f"⚠️ 电机{station} 写入失败,重试 {retry} 次")
+ time.sleep(0.2)
+
+ # ========== 启动电机 ==========
+ sync_motor_move(True)
+
+ # ========== 运动中实时监控误差 ==========
+ print("\n------------ 运动中实时误差 ------------")
+ start_time = time.time()
+ while time.time() - start_time < move_wait_time:
+ for station in station_list:
+ try:
+ err = read_position_error(station)
+ print(f"电机{station} 实时误差 → {err}")
+ except:
+ pass
+ time.sleep(0.1)
+
+ # ========== 停止 + 等待 ==========
+ sync_motor_move(False)
+ time.sleep(0.3)
+ print(f"\n✅ 运动完成,等待 {send_interval} 秒后继续...")
+ time.sleep(send_interval)
+
+ except KeyboardInterrupt:
+ print("\n🛑 手动停止")
+ try:
+ sync_motor_move(False)
+ except:
+ pass
+ except Exception as e:
+ print(f"\n❌ 异常:{e}")
+ try:
+ sync_motor_move(False)
+ except:
+ pass
+
+def test_cycle_relative_position_mode_test():
+ """
+ 相对位置模式 —— 电机1每次+2,电机2每次+4
+ 稳定版:防CRC报错 + 自动重试
+ """
+ try:
+ # ===================== 配置 =====================
+ station_list = [1, 2]
+ base_pulse = 6007 # 初始基准脉冲
+ move_wait_time = 3 # 运动时间
+ send_interval = 2 # 间隔2秒
+ speed = 30
+ acc = 50
+ dec = 50
+
+ current_count = 0 # 运行次数计数
+
+ print("===== 电机1每次+2,电机2每次+4 循环运行 =====")
+
+ # 初始化电机
+ for station in station_list:
+ set_motor_mode(station, 2)
+ time.sleep(0.1)
+ set_motor_speed(station, speed)
+ time.sleep(0.1)
+ set_motor_acceleration(station, acc)
+ time.sleep(0.1)
+ set_motor_deceleration(station, dec)
+ time.sleep(0.1)
+ print(f"✅ 电机{station} 初始化完成")
+
+ # ===================== 主循环 =====================
+ while True:
+ current_count += 1
+ print(f"\n==================== 第 {current_count} 次运动 ====================")
+
+ # ============== 核心:两台电机不同增量 ==============
+ pulse_1 = base_pulse + current_count * 10 # 电机1:每次 +2 靠近出料口的电机
+ pulse_2 = base_pulse + current_count * 5 # 电机2:每次 +4 远离出料口的电机
+
+ print(f"📌 电机1本次脉冲:{pulse_1}")
+ print(f"📌 电机2本次脉冲:{pulse_2}")
+
+ # ========== 分别写入两台电机(带重试)==========
+ # 写入电机1
+ retry = 5
+ while retry > 0:
+ try:
+ set_motor_position(1, pulse_1)
+ time.sleep(0.1)
+ break
+ except Exception as e:
+ retry -= 1
+ print(f"⚠️ 电机1写入失败,重试 {retry} 次")
+ time.sleep(0.2)
+
+ # 写入电机2
+ retry = 5
+ while retry > 0:
+ try:
+ set_motor_position(2, pulse_2)
+ time.sleep(0.1)
+ break
+ except Exception as e:
+ retry -= 1
+ print(f"⚠️ 电机2写入失败,重试 {retry} 次")
+ time.sleep(0.2)
+
+ # ========== 启动电机 ==========
+ sync_motor_move(True)
+
+ # ========== 运动中实时监控误差 ==========
+ print("\n------------ 运动中实时误差 ------------")
+ start_time = time.time()
+ while time.time() - start_time < move_wait_time:
+ for station in station_list:
+ try:
+ err = read_position_error(station)
+ print(f"电机{station} 实时误差 → {err}")
+ except:
+ pass
+ time.sleep(0.1)
+
+ time.sleep(1)
+ print(f"\n✅ 运动完成,等待 {send_interval} 秒后继续...")
+ time.sleep(send_interval)
+
+ except KeyboardInterrupt:
+ print("\n🛑 手动停止")
+ try:
+ sync_motor_move(False)
+ except:
+ pass
+ except Exception as e:
+ print(f"\n❌ 异常:{e}")
+ try:
+ sync_motor_move(False)
+ except:
+ pass
+
+# ------------调试接口----------
+if __name__ == '__main__':
+ test_cycle_relative_position_mode()
diff --git a/conveyor_controller/modbus.py b/conveyor_controller/modbus.py
index 54d547a..3493914 100644
--- a/conveyor_controller/modbus.py
+++ b/conveyor_controller/modbus.py
@@ -161,10 +161,29 @@ class RTUSerialHandle:
else:
return unsigned_value
+ def uint32_to_int32(self, unsigned_value: int) -> int:
+ """
+ 将32位无符号十进制数转换为32位有符号十进制数
+ :param unsigned_value: 无符号十进制数
+ :return: 有符号十进制数
+ """
+ # 先校验输入范围(必须是32位无符号数)
+ if not isinstance(unsigned_value, int):
+ raise ValueError(f"输入必须是整数,当前是{type(unsigned_value)}")
+ if unsigned_value < 0 or unsigned_value > 4294967295:
+ raise ValueError(f"输入必须是0~4294967295的整数,当前是{unsigned_value}")
+
+ # 核心转换逻辑
+ if unsigned_value > 2147483647: # 0x7FFFFFFF
+ return unsigned_value - 4294967296 # 0x100000000
+ else:
+ return unsigned_value
+
def __del__(self):
"""析构函数,程序退出时自动关闭串口,防止资源泄露"""
self.close()
+
# -------对外接口--------
# public
def open_serial_port(port: str, baudrate: int, databits: int, stopbits: int, parity: int) -> Optional[int]:
@@ -223,6 +242,7 @@ def open_serial_port(port: str, baudrate: int, databits: int, stopbits: int, par
print(f"串口[{port}]打开成功,句柄ID:{handle_id}")
return handle_id
+
def close_serial_port(handle: int):
"""
关闭Modbus串口
@@ -245,6 +265,7 @@ def close_serial_port(handle: int):
print(f"句柄{handle}(串口[{handle_obj.port}])已关闭")
+
# --------Modbus RTU CRC16校验函数-----------
def modbus_crc16(data: bytes) -> bytes:
"""
@@ -269,6 +290,7 @@ def modbus_crc16(data: bytes) -> bytes:
# 把16位CRC值拆成2个字节(小端序:低字节在前,高字节在后)
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
+
def verify_modbus_crc(data: bytes) -> bool:
"""
验证Modbus RTU数据的CRC16校验码
@@ -411,6 +433,7 @@ def read_holding_register(handle: int, station_addr: int, start_reg_addr: int,
print(f"读寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 数据:{out_buffer}")
return out_buffer
+
def write_single_register(handle: int, station_addr: int, reg_addr: int, write_value: int,
resp_offset: int, use_crc: int) -> int:
"""
@@ -498,9 +521,10 @@ def write_single_register(handle: int, station_addr: int, reg_addr: int, write_v
print(f"写响应不匹配 | 请求:{expected_resp.hex(' ')} | 响应:{response.hex(' ')}")
return ModbusError.MODBUS_ERR_RESPONSE
- print(f"写单个寄存器成功 | 从站{station_addr} | 地址{reg_addr} | 值{write_value}")
+ # print(f"写单个寄存器成功 | 从站{station_addr} | 地址{reg_addr} | 值{write_value}")
return ModbusError.MODBUS_SUCCESS
+
def write_multi_register(handle: int, station_addr: int, start_reg_addr: int, reg_count: int,
write_values: list[int], resp_offset: int, use_crc: int) -> int:
"""
@@ -620,6 +644,7 @@ def write_multi_register(handle: int, station_addr: int, start_reg_addr: int, re
print(f"批量写寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 值:{write_values}")
return ModbusError.MODBUS_SUCCESS
+
# ---------测试接口--------
if __name__ == '__main__':
# handle1 = open_serial_port(
diff --git a/img.png b/img.png
new file mode 100644
index 0000000..9f631ca
Binary files /dev/null and b/img.png differ
diff --git a/readme.md b/readme.md
index 9e859af..5c3b791 100644
--- a/readme.md
+++ b/readme.md
@@ -13,22 +13,35 @@ sudo chmod 666 /dev/ttyACM0
# python版本
3.9
-# 过年之后调试,需要注意
-## 1.修改网络继电器的IP,将1网段改成5网段
-
# 各设备IP地址
-工控机:192.168.5.50
+工控机:192.168.5.50 用户名:teamhd 密码:teamhd
笔记本:192.168.5.105
网络继电器:192.168.5.18
RK1106:192.168.5.100
-相机1:192.168.5.10
-相机2:192.168.5.11
+相机1:192.168.5.10(黑色)
+相机2:192.168.5.11(黑色)
+相机3:192.168.5.164(白色) 账号:admin 密码:XJ123456
# 网络继电器传感器:
-1、传感器1:DI5
+1、传感器1:DI5 (右边那个)
2、传感器2:DI4
-3、光纤传感器:DI1
+3、光纤传感器:DI1 拉低延时125ms(当有信号时,物体离开,信号会持续125ms)
4、双按压传感器:DI2、3
5、吸取装置电磁阀:DO4、5、6、7、8
+# 传送带伺服电机使用:
+## 使能:P04.00将01值改成83(失能就是把83值改成01)
+
+## 该型号伺服电机,每旋转一圈指令单位为10000,编码器单位为2^17=131072(该电机为17位电机)
+
+指令单位:编码器单位 = 10000 / 131072 = 0.78125
+## 参数:刚性--15,惯量比--1250
+
+电机:小轴 = 8 :15 = 1.875 10000个脉冲走25.8cm
+ 6007个脉冲走15.5cm
+
+## PR模式:将P00.01设为6(PR模式)
+### PR模式下的绝对位置模式,设置原点:
+
+
diff --git a/servo/servo_control.py b/servo/servo_control.py
index 42e114c..0ee8d1f 100644
--- a/servo/servo_control.py
+++ b/servo/servo_control.py
@@ -33,6 +33,7 @@ custom_config = {
'acc': ACC,
}
+
class ServoInterface:
def __init__(self, config=None):
self.servo_controller = ServoController(config)
@@ -46,9 +47,11 @@ class ServoInterface:
except Exception as e:
raise RuntimeError(f"舵机初始化失败:{str(e)}") from e
+
# 全局变量
_Servo = ServoInterface(custom_config)
+
def _degree_to_raw(degree: int) -> int:
"""
角度转原始值
@@ -61,6 +64,7 @@ def _degree_to_raw(degree: int) -> int:
return max(0, min(raw_value, 4095))
+
def _raw_to_degree(raw: int) -> float:
"""
原始值转角度
@@ -69,6 +73,7 @@ def _raw_to_degree(raw: int) -> float:
"""
return round(raw * RAW_TO_DEGREE_RATIO, 2)
+
def set_servo_speed(value: int):
"""
设置舵机速度
@@ -82,6 +87,7 @@ def set_servo_speed(value: int):
else:
raise ValueError("速度值超出限定范围")
+
def set_servo_acceleration(value: int):
"""
设置舵机加速度
@@ -94,6 +100,7 @@ def set_servo_acceleration(value: int):
else:
raise ValueError("加速度值超出限定范围")
+
def set_servo_ori_position(ori_position: int):
"""
设置舵机原点位
@@ -103,6 +110,7 @@ def set_servo_ori_position(ori_position: int):
POS_START = _degree_to_raw(ori_position)
print(f"舵机原点位置已设置为:{ori_position}度(对应原始值:{POS_START})")
+
def set_servo_rot_position(rot_position: int):
"""
设置舵机翻转位置
@@ -112,6 +120,7 @@ def set_servo_rot_position(rot_position: int):
POS_END = _degree_to_raw(rot_position)
print(f"舵机翻转位置已设置为:{rot_position}度(对应原始值:{POS_END})")
+
def move_to_rot_position():
"""舵机旋转到翻转位置"""
try:
@@ -123,6 +132,7 @@ def move_to_rot_position():
except Exception as e:
raise RuntimeError(f"舵机移动到翻转位置失败:{str(e)}") from e
+
def move_to_ori_position():
"""舵机旋转到原点"""
try:
@@ -134,6 +144,7 @@ def move_to_ori_position():
except Exception as e:
raise RuntimeError(f"舵机移动到原点位置失败:{str(e)}") from e
+
# ----------调试接口----------
if __name__ == '__main__':
set_servo_speed(1500)
@@ -143,9 +154,9 @@ if __name__ == '__main__':
move_to_rot_position() # 旋转180度
time.sleep(1)
- move_to_ori_position() # 旋转0度
- time.sleep(1)
+ # move_to_ori_position() # 旋转0度
+ # time.sleep(1)
- while(True):
+ while True:
time.sleep(1)
diff --git a/servo/servo_test.py b/servo/servo_test.py
index 2386b9e..583e9cf 100644
--- a/servo/servo_test.py
+++ b/servo/servo_test.py
@@ -11,7 +11,7 @@ import logging
# -------------参数配置--------------
BAUDRATE = 115200 # 舵机的波特率
-PORT = 'COM4'
+PORT = '/dev/ttyACM0 '
SERVO_IDS = [1, 2, 3, 4, 5] # 舵机们的 ID 号
POS_START = 2047
POS_END = 0
@@ -20,14 +20,15 @@ ACC = 0
TIME_INTERVAL1 = ((2047-0) / 1500) + 2 # 翻转回来的时间
TIME_INTERVAL2 = 10 # 不用翻转运行的时间
+
class ServoController:
def __init__(self, config=None):
"""
初始化舵机控制器
"""
self.config = config
- self.time_interval1 = self.config['time_interval1']
- self.time_interval2 = self.config['time_interval2']
+ # self.time_interval1 = self.config['time_interval1']
+ # self.time_interval2 = self.config['time_interval2']
# 初始化串口和舵机处理器
self.port_handler = PortHandler(self.config['port'])
@@ -130,14 +131,14 @@ class ServoController:
self.config['speed'],
self.config['acc'])
print("运动到180度")
- time.sleep(self.time_interval1)
+ time.sleep(2)
# 运动到结束位置(0度)
self.write_position(self.config['pos_end'],
self.config['speed'],
self.config['acc'])
print("运动到0度")
- time.sleep(self.time_interval2)
+ time.sleep(2)
def run(self):
"""循环运行:复位位置之后,先转到180度,延时TIME_INTERVAL1一段时间,再转到0度"""