93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
|
|
#!/usr/bin/env python 主程序
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
'''
|
|||
|
|
# @Time : 2025/7/21
|
|||
|
|
# @Author : hx
|
|||
|
|
# @File : can_main.py
|
|||
|
|
'''
|
|||
|
|
|
|||
|
|
import can
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
# 导入你写的 generate_position_command 函数
|
|||
|
|
from motor_control_can_all import generate_position_command
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========================================
|
|||
|
|
# 发送CAN帧函数
|
|||
|
|
# ========================================
|
|||
|
|
def send_can_frame(data, can_id=0x01, channel='vcan0', interface='socketcan'):
|
|||
|
|
"""
|
|||
|
|
发送CAN帧
|
|||
|
|
:param data: 7字节的列表
|
|||
|
|
:param can_id: CAN ID
|
|||
|
|
:param channel: CAN 通道
|
|||
|
|
:param interface: 总线类型
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
bus = can.interface.Bus(channel=channel, interface=interface)
|
|||
|
|
msg = can.Message(arbitration_id=can_id, data=data, is_extended_id=False)
|
|||
|
|
bus.send(msg)
|
|||
|
|
print(f"[发送CAN帧] ID={can_id:02X} 数据=[{', '.join(f'0x{x:02X}' for x in data)}]")
|
|||
|
|
bus.shutdown()
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"[发送失败] {e}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========================================
|
|||
|
|
# 监听CAN反馈函数
|
|||
|
|
# ========================================
|
|||
|
|
def listen_can_feedback(channel='vcan0', interface='socketcan', timeout=2.0):
|
|||
|
|
"""
|
|||
|
|
监听是否有CAN反馈数据
|
|||
|
|
:param channel: CAN 通道
|
|||
|
|
:param interface: 总线类型
|
|||
|
|
:param timeout: 等待反馈的超时时间(秒)
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
bus = can.interface.Bus(channel=channel, interface=interface)
|
|||
|
|
print(f"[监听反馈] 等待来自 {channel} 的反馈({timeout}秒超时)...")
|
|||
|
|
msg = bus.recv(timeout=timeout)
|
|||
|
|
if msg:
|
|||
|
|
print(f"[收到反馈] ID={msg.arbitration_id:02X} 数据=[{', '.join(f'0x{x:02X}' for x in msg.data)}]")
|
|||
|
|
else:
|
|||
|
|
print("[未收到反馈]")
|
|||
|
|
bus.shutdown()
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"[监听失败] {e}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ========================================
|
|||
|
|
# 主函数:发送位置控制指令并监听反馈
|
|||
|
|
# ========================================
|
|||
|
|
|
|||
|
|
def can_motor_contral(direction=0, angle=100, microstep=32, can_id=0x01, channel='vcan0', listen_feedback=False):
|
|||
|
|
print("=== 开始发送CAN位置控制指令 ===")
|
|||
|
|
print(f"参数:方向={direction}(0=逆时针,1=顺时针),角度={angle}°,细分={microstep}")
|
|||
|
|
|
|||
|
|
# 生成CAN数据帧
|
|||
|
|
can_data = generate_position_command(direction=direction, microstep=microstep, angle=angle)
|
|||
|
|
print(f"生成的CAN数据帧: [{', '.join(f'0x{x:02X}' for x in can_data)}]")
|
|||
|
|
|
|||
|
|
# 发送CAN帧
|
|||
|
|
send_can_frame(can_data, can_id=can_id, channel=channel)
|
|||
|
|
|
|||
|
|
# 如果启用监听,等待反馈
|
|||
|
|
if listen_feedback:
|
|||
|
|
listen_can_feedback(channel=channel)
|
|||
|
|
|
|||
|
|
print("=== 电机控制流程完成 ===")
|
|||
|
|
|
|||
|
|
# ========================================
|
|||
|
|
# 程序入口(直接运行时使用)
|
|||
|
|
# ========================================
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
# 这里写死参数,方便调试
|
|||
|
|
can_motor_contral(
|
|||
|
|
direction=1, # 顺时针
|
|||
|
|
angle=180, # 180度
|
|||
|
|
microstep=16, # 细分值16
|
|||
|
|
can_id=0x02, # CAN ID 0x02
|
|||
|
|
channel='vcan0', # 使用虚拟CAN
|
|||
|
|
listen_feedback=True # 是否监听反馈
|
|||
|
|
)
|