线条厂各设备控制代码V1.0
This commit is contained in:
3
.idea/.gitignore
generated
vendored
Normal file
3
.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
# 默认忽略的文件
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
18
.idea/inspectionProfiles/Project_Default.xml
generated
Normal file
18
.idea/inspectionProfiles/Project_Default.xml
generated
Normal file
@ -0,0 +1,18 @@
|
||||
<component name="InspectionProjectProfileManager">
|
||||
<profile version="1.0">
|
||||
<option name="myName" value="Project Default" />
|
||||
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
|
||||
<option name="ignoredPackages">
|
||||
<value>
|
||||
<list size="5">
|
||||
<item index="0" class="java.lang.String" itemvalue="scipy" />
|
||||
<item index="1" class="java.lang.String" itemvalue="numpy" />
|
||||
<item index="2" class="java.lang.String" itemvalue="snap7" />
|
||||
<item index="3" class="java.lang.String" itemvalue="jsonchema" />
|
||||
<item index="4" class="java.lang.String" itemvalue="werkzeung" />
|
||||
</list>
|
||||
</value>
|
||||
</option>
|
||||
</inspection_tool>
|
||||
</profile>
|
||||
</component>
|
||||
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<component name="InspectionProjectProfileManager">
|
||||
<settings>
|
||||
<option name="USE_PROJECT_PROFILE" value="false" />
|
||||
<version value="1.0" />
|
||||
</settings>
|
||||
</component>
|
||||
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/wire_controlsystem.iml" filepath="$PROJECT_DIR$/.idea/wire_controlsystem.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
||||
6
.idea/vcs.xml
generated
Normal file
6
.idea/vcs.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
12
.idea/wire_controlsystem.iml
generated
Normal file
12
.idea/wire_controlsystem.iml
generated
Normal file
@ -0,0 +1,12 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PyDocumentationSettings">
|
||||
<option name="format" value="PLAIN" />
|
||||
<option name="myDocStringFormat" value="Plain" />
|
||||
</component>
|
||||
</module>
|
||||
257
DM/DM_Motor_test.py
Normal file
257
DM/DM_Motor_test.py
Normal file
@ -0,0 +1,257 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2026/1/6 13:55
|
||||
# @Author : reenrr
|
||||
# @File : DM_Motor_test.py
|
||||
# @Desc : 达妙电机测试
|
||||
'''
|
||||
from DM_CAN import *
|
||||
import serial
|
||||
import time
|
||||
|
||||
# -------------------------- 电机参数配置 --------------------------
|
||||
SLAVE_ID = 0x01
|
||||
MASTER_ID = 0x11
|
||||
PORT = 'COM10'
|
||||
BAUDRATE = 921600
|
||||
|
||||
|
||||
class DMMotorController:
|
||||
"""达妙电机控制器类"""
|
||||
def __init__(self, slave_id=None, master_id=None, port=None, baudrate=None):
|
||||
"""
|
||||
初始化电机控制器
|
||||
:param slave_id: 从机ID,默认0x01
|
||||
:param master_id: 主机ID,默认0x11
|
||||
:param port: 串口端口,默认COM6
|
||||
:param baudrate: 波特率,默认921600
|
||||
"""
|
||||
# 初始化参数
|
||||
self.slave_id = slave_id if slave_id is not None else SLAVE_ID
|
||||
self.master_id = master_id if master_id is not None else MASTER_ID
|
||||
self.port = port if port is not None else PORT
|
||||
self.baudrate = baudrate if baudrate is not None else BAUDRATE
|
||||
|
||||
# 核心属性初始化
|
||||
self.serial_device = None # 串口设备
|
||||
self.motor = None # 电机实例
|
||||
self.motor_control = None # 电机控制器实例
|
||||
|
||||
# 初始化电机和串口
|
||||
self.init_motor()
|
||||
|
||||
def init_motor(self):
|
||||
"""初始化电机和串口"""
|
||||
try:
|
||||
# 1.初始化串口
|
||||
self.serial_device = serial.Serial(
|
||||
port=self.port,
|
||||
baudrate=self.baudrate,
|
||||
timeout=0.5
|
||||
)
|
||||
|
||||
# 2.创建电机实例
|
||||
self.motor = Motor(DM_Motor_Type.DM4310, self.slave_id, self.master_id)
|
||||
# 3.创建电机控制器实例
|
||||
self.motor_control = MotorControl(self.serial_device)
|
||||
# 4.添加电机
|
||||
self.motor_control.addMotor(self.motor)
|
||||
|
||||
print(f"✅ 电机初始化成功")
|
||||
print(f" - 串口:{self.port} | 波特率:{self.baudrate}")
|
||||
print(f" - 从机ID:{hex(self.slave_id)} | 主机ID:{hex(self.master_id)}")
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"❌ 电机初始化失败:{e}")
|
||||
|
||||
def switch_control_mode(self, control_type):
|
||||
"""
|
||||
切换电机控制模式
|
||||
:param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT)
|
||||
:return: 切换成功返回True,否则返回False
|
||||
"""
|
||||
try:
|
||||
result = self.motor_control.switchControlMode(self.motor, control_type)
|
||||
mode_name = self._get_mode_name(control_type)
|
||||
if result:
|
||||
print(f"✅ 切换到{mode_name}模式成功")
|
||||
# 切换模式后保存参数
|
||||
self.motor_control.save_motor_param(self.motor)
|
||||
else:
|
||||
print(f"❌ 切换到{mode_name}模式失败")
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"❌ 切换模式出错:{str(e)}")
|
||||
return False
|
||||
|
||||
def enable_motor(self):
|
||||
"""使能电机"""
|
||||
try:
|
||||
self.motor_control.enable(self.motor)
|
||||
print("✅ 电机使能成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ 电机使能失败:{str(e)}")
|
||||
return False
|
||||
|
||||
def disable_motor(self):
|
||||
"""失能电机"""
|
||||
try:
|
||||
self.motor_control.disable(self.motor)
|
||||
print("✅ 电机失能成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ 电机失能失败:{str(e)}")
|
||||
return False
|
||||
|
||||
def control_pos_vel(self, p_desired, v_desired):
|
||||
"""
|
||||
位置-速度模式控制
|
||||
:param p_desired: 目标位置(rad, 范围[-300, 300])
|
||||
:param v_desired: 目标速度(rad/s, 范围[-30, 30])
|
||||
"""
|
||||
try:
|
||||
# 归零 + 发送运动指令
|
||||
self.motor_control.set_zero_position(self.motor)
|
||||
self.motor_control.control_Pos_Vel(self.motor, p_desired, v_desired)
|
||||
time.sleep(0.1)
|
||||
print(f"✅ 位置-速度控制:位置={p_desired}rad | 速度={v_desired}rad/s")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 位置-速度控制出错:{str(e)}")
|
||||
return False
|
||||
|
||||
def close_serial(self):
|
||||
"""关闭串口"""
|
||||
try:
|
||||
if self.serial_device and self.serial_device.is_open:
|
||||
self.serial_device.close()
|
||||
print("✅ 串口关闭成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ 串口关闭失败:{str(e)}")
|
||||
return False
|
||||
|
||||
def _get_mode_name(self, control_type):
|
||||
"""
|
||||
获取模式名称
|
||||
:param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT)
|
||||
"""
|
||||
# 1.定义[枚举值--中文名称]的映射字典
|
||||
mode_map = {
|
||||
Control_Type.POS_VEL: "位置-速度模式",
|
||||
Control_Type.VEL: "速度模式",
|
||||
Control_Type.MIT: "MIT模式"
|
||||
}
|
||||
# 2.根据控制模式值获取中文名称 字典方法
|
||||
return mode_map.get(control_type, "未知模式") # “未知模式”默认值
|
||||
|
||||
def save_param(self):
|
||||
"""保存所有电机参数"""
|
||||
try:
|
||||
if self.motor is None:
|
||||
raise ValueError("电机实例为None,无法保存参数")
|
||||
|
||||
self.motor_control.save_motor_param(self.motor)
|
||||
print("电机参数保存成功")
|
||||
except Exception as e:
|
||||
print(f"❌ 电机参数保存失败:{str(e)}")
|
||||
|
||||
def refresh_motor_status(self):
|
||||
"""获得电机状态"""
|
||||
try:
|
||||
if self.motor is None:
|
||||
raise ValueError("电机实例为None,无法保存参数")
|
||||
|
||||
self.motor_control.refresh_motor_status(self.motor)
|
||||
print("电机状态刷新成功")
|
||||
except Exception as e:
|
||||
print(f"❌ 电机状态刷新失败:{str(e)}")
|
||||
|
||||
def get_position(self):
|
||||
"""获取电机位置"""
|
||||
try:
|
||||
if self.motor is None:
|
||||
raise ValueError("电机实例为None,无法保存参数")
|
||||
|
||||
position = self.motor.getPosition()
|
||||
print(f"获取电机位置成功,当前位置: {position}")
|
||||
return position
|
||||
except Exception as e:
|
||||
print(f"获取电机位置失败: {str(e)}")
|
||||
|
||||
def change_limit_param(self, motor_type, pmax, vmax, tmax):
|
||||
"""
|
||||
改变电机的PMAX VMAX TMAX
|
||||
:param motor_type: 电机的类型
|
||||
:param pmax: 电机的PMAX
|
||||
:param vmax: 电机的VMAX
|
||||
:param tmax: 电机的TAMX
|
||||
"""
|
||||
try:
|
||||
self.motor_control.change_limit_param(motor_type, pmax, vmax, tmax)
|
||||
print(
|
||||
f"电机限位参数修改成功 | 类型: {motor_type} | PMAX: {pmax} | VMAX: {vmax} | TMAX: {tmax}"
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"修改电机限位参数失败: {str(e)}")
|
||||
|
||||
def __del__(self):
|
||||
"""析构函数:确保程序退出时失能电机、关闭串口"""
|
||||
try:
|
||||
# 先检查串口是否打开,避免重复操作
|
||||
if self.serial_device and self.serial_device.is_open:
|
||||
self.disable_motor()
|
||||
self.close_serial()
|
||||
else:
|
||||
# 串口已关闭,无需重复操作,仅打印日志
|
||||
print("ℹ️ 串口已关闭,析构函数无需重复释放资源")
|
||||
except Exception as e:
|
||||
print(f"ℹ️ 析构函数执行警告:{str(e)}")
|
||||
|
||||
|
||||
def dm_motor_control():
|
||||
# 1.创建电机控制器实例
|
||||
motor_controller = DMMotorController(
|
||||
slave_id=SLAVE_ID,
|
||||
master_id=MASTER_ID,
|
||||
port=PORT,
|
||||
baudrate=BAUDRATE
|
||||
)
|
||||
|
||||
try:
|
||||
# 切换到位置-速度模式
|
||||
motor_controller.switch_control_mode(Control_Type.POS_VEL)
|
||||
|
||||
# 使能电机
|
||||
motor_controller.enable_motor()
|
||||
|
||||
# 循环控制电机
|
||||
while True:
|
||||
print("运动前的位置", motor_controller.get_position()) # 需要测试断电后是否能读取得到
|
||||
motor_controller.control_pos_vel(p_desired=282.6, v_desired=30) # 450mm 665-215
|
||||
time.sleep(20)
|
||||
|
||||
motor_controller.refresh_motor_status()
|
||||
print("运动1的位置", motor_controller.get_position()) # 刷新的比较慢,可以等位置不变一段时间之后,再获取位置
|
||||
motor_controller.refresh_motor_status()
|
||||
|
||||
motor_controller.control_pos_vel(p_desired=-282.6, v_desired=30)
|
||||
time.sleep(20)
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n⚠️ 用户手动停止程序")
|
||||
except Exception as e:
|
||||
print(f"\n❌ 程序运行出错:{str(e)}")
|
||||
finally:
|
||||
# 5. 无论是否出错,最终都要失能电机、关闭串口
|
||||
motor_controller.disable_motor()
|
||||
motor_controller.close_serial()
|
||||
print("✅ 程序正常退出")
|
||||
|
||||
# ---------调试接口----------
|
||||
if __name__ == '__main__':
|
||||
dm_motor_control()
|
||||
369
EMV/EMV.py
Normal file
369
EMV/EMV.py
Normal file
@ -0,0 +1,369 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2025/12/12 14:39
|
||||
# @Author : reenrr
|
||||
# @File : EMV.py
|
||||
# @Desc : 网络继电器控制输入、输出设备
|
||||
'''
|
||||
import socket
|
||||
import binascii
|
||||
import time
|
||||
from threading import Event, Lock
|
||||
import threading
|
||||
import logging
|
||||
|
||||
# 网络继电器的 IP 和端口
|
||||
HOST = '192.168.1.18'
|
||||
PORT = 50000
|
||||
|
||||
# 控件命名映射
|
||||
SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1
|
||||
SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2
|
||||
SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3
|
||||
|
||||
# 传感器命名映射
|
||||
CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关
|
||||
CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关
|
||||
PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1
|
||||
PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2
|
||||
FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器
|
||||
|
||||
# 控件控制报文
|
||||
valve_commands = {
|
||||
SOLENOID_VALVE1: {
|
||||
'open': '00000000000601050000FF00',
|
||||
'close': '000000000006010500000000',
|
||||
},
|
||||
SOLENOID_VALVE2: {
|
||||
'open': '00000000000601050001FF00',
|
||||
'close': '000000000006010500010000',
|
||||
},
|
||||
SOLENOID_VALVE3: {
|
||||
'open': '00000000000601050002FF00',
|
||||
'close': '000000000006010500020000',
|
||||
}
|
||||
}
|
||||
|
||||
# 读取状态命令
|
||||
read_status_command = {
|
||||
'devices': '000000000006010100000008',
|
||||
'sensors': '000000000006010200000008'
|
||||
}
|
||||
|
||||
# 控件对应 DO 位(从低到高)
|
||||
device_bit_map = {
|
||||
SOLENOID_VALVE1: 0,
|
||||
SOLENOID_VALVE2: 1,
|
||||
SOLENOID_VALVE3: 2,
|
||||
}
|
||||
|
||||
device_name_map = {
|
||||
SOLENOID_VALVE1: "电磁阀1",
|
||||
SOLENOID_VALVE2: "电磁阀2",
|
||||
SOLENOID_VALVE3: "电磁阀3",
|
||||
}
|
||||
|
||||
# 传感器对应位(从低到高)
|
||||
sensor_bit_map = {
|
||||
CONVEYOR1_SENSOR: 0,
|
||||
CONVEYOR2_SENSOR: 1,
|
||||
PRESS_SENSOR1: 2,
|
||||
PRESS_SENSOR2: 3,
|
||||
FIBER_SENSOR: 6
|
||||
# 根据你继电器的配置,继续添加更多传感器
|
||||
}
|
||||
|
||||
sensor_name_map = {
|
||||
CONVEYOR1_SENSOR: '传送带1开关',
|
||||
CONVEYOR2_SENSOR: '传送带2开关',
|
||||
PRESS_SENSOR1: '按压开关1',
|
||||
PRESS_SENSOR2: '按压开关2',
|
||||
FIBER_SENSOR: '光纤传感器'
|
||||
}
|
||||
|
||||
# -------------全局事件-------------
|
||||
sensor_triggered = Event()
|
||||
fiber_triggered = Event() # 光纤传感器触发事件
|
||||
fiber_lock = Lock() # 线程锁,保护共享变量
|
||||
valve1_open_time = 0.0 # 电磁阀1打开时间戳
|
||||
valve1_open_flag = False # 电磁阀1打开标志
|
||||
|
||||
|
||||
class RelayController:
|
||||
def __init__(self):
|
||||
"""初始化继电器控制器"""
|
||||
self.socket = None
|
||||
|
||||
def send_command(self, command):
|
||||
"""
|
||||
将十六进制字符串转换为字节数据并发送
|
||||
:param command: 十六进制字符串
|
||||
:return: 响应字节数据 / False
|
||||
"""
|
||||
try:
|
||||
byte_data = binascii.unhexlify(command)
|
||||
|
||||
# 创建套接字并连接到继电器
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.connect((HOST, PORT))
|
||||
sock.send(byte_data)
|
||||
# 接收响应
|
||||
response = sock.recv(1024)
|
||||
# logging.info(f"收到响应: {binascii.hexlify(response)}")
|
||||
# 校验响应
|
||||
return response
|
||||
except Exception as e:
|
||||
logging.info(f"通信错误: {e}")
|
||||
return False
|
||||
|
||||
def get_all_device_status(self, command_type='devices'):
|
||||
"""
|
||||
获取所有设备/传感器状态
|
||||
:param command_type: 'devices'(控件) / 'sensors'(传感器)
|
||||
:return: 状态字典 {设备名: 状态(bool)}
|
||||
"""
|
||||
command = read_status_command.get(command_type)
|
||||
if not command:
|
||||
logging.info(f"未知的读取类型: {command_type}")
|
||||
return {}
|
||||
|
||||
response = self.send_command(command)
|
||||
status_dict = {}
|
||||
|
||||
if response and len(response) >= 10:
|
||||
status_byte = response[9] # 状态在第10字节
|
||||
status_bin = f"{status_byte:08b}"[::-1]
|
||||
|
||||
if command_type == 'devices':
|
||||
bit_map = device_bit_map
|
||||
name_map = device_name_map
|
||||
elif command_type == 'sensors':
|
||||
bit_map = sensor_bit_map
|
||||
name_map = sensor_name_map
|
||||
else:
|
||||
logging.info("不支持的映射类型")
|
||||
return{}
|
||||
|
||||
for key, bit_index in bit_map.items():
|
||||
state = status_bin[bit_index] == '1'
|
||||
status_dict[key] = state
|
||||
# readable = "开启" if state else "关闭"
|
||||
# logging.info(f"{device.capitalize()} 状态: {readable}")
|
||||
else:
|
||||
logging.info("读取状态失败或响应无效")
|
||||
|
||||
return status_dict
|
||||
|
||||
def get_device_status(self, device_name, command_type='devices'):
|
||||
"""
|
||||
获取单个控件/传感器状态
|
||||
:param device_name:设备名称
|
||||
:param command_type: 'devices'/'sensors'
|
||||
:return:True(开启) / False(关闭) / None(无法读取)
|
||||
"""
|
||||
status = self.get_all_device_status(command_type)
|
||||
return status.get(device_name, None)
|
||||
|
||||
def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False):
|
||||
"""
|
||||
根据状态决定是否执行开操作
|
||||
:param solenoid_valve1:是否打开电磁阀1
|
||||
:param solenoid_valve2:是否打开电磁阀2
|
||||
:param solenoid_valve3:是否打开电磁阀3
|
||||
:return:
|
||||
"""
|
||||
global valve1_open_time, valve1_open_flag
|
||||
status = self.get_all_device_status()
|
||||
|
||||
if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False):
|
||||
logging.info("打开电磁阀1")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE1]['open'])
|
||||
# 记录电磁阀1打开时的时间戳和标志
|
||||
with fiber_lock:
|
||||
valve1_open_time = time.time()
|
||||
valve1_open_flag = True
|
||||
|
||||
if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False):
|
||||
logging.info("打开电磁阀2")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE2]['open'])
|
||||
|
||||
if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False):
|
||||
logging.info("打开电磁阀3")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE3]['open'])
|
||||
time.sleep(1) # 实际测试需要考虑这个延时是否合适
|
||||
|
||||
# 根据状态决定是否执行关操作
|
||||
def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False):
|
||||
"""
|
||||
根据状态决定是否执行关操作
|
||||
:param solenoid_valve1:是否关闭电磁阀1
|
||||
:param solenoid_valve2:是否关闭电磁阀2
|
||||
:param solenoid_valve3:是否关闭电磁阀3
|
||||
:return:
|
||||
"""
|
||||
global valve1_open_flag
|
||||
|
||||
status = self.get_all_device_status()
|
||||
|
||||
if solenoid_valve1 and status.get(SOLENOID_VALVE1, True):
|
||||
logging.info("关闭电磁阀1")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE1]['close'])
|
||||
# 重置电磁阀1打开标志
|
||||
with fiber_lock:
|
||||
valve1_open_flag = False
|
||||
|
||||
if solenoid_valve2 and status.get(SOLENOID_VALVE2, True):
|
||||
logging.info("关闭电磁阀2")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE2]['close'])
|
||||
|
||||
if solenoid_valve2 and status.get(SOLENOID_VALVE3, True):
|
||||
logging.info("关闭电磁阀3")
|
||||
self.send_command(valve_commands[SOLENOID_VALVE3]['close'])
|
||||
time.sleep(1) # 实际测试需要考虑这个延时是否合适
|
||||
|
||||
def control_solenoid(self):
|
||||
"""
|
||||
控制电磁阀,并检测光纤传感器触发状态
|
||||
"""
|
||||
global fiber_triggered
|
||||
|
||||
try:
|
||||
# 重置光纤传感器触发事件
|
||||
fiber_triggered.clear()
|
||||
|
||||
# 同时打开
|
||||
self.open(solenoid_valve1=True, solenoid_valve2=True)
|
||||
logging.info("电磁阀1、2已打开")
|
||||
# 等待线条掉落(最多等待1秒)
|
||||
timeout = 2.0
|
||||
start_time = time.time()
|
||||
fiber_detected = False
|
||||
# 等待红外传感器触发或超时
|
||||
while time.time() - start_time < timeout:
|
||||
if fiber_triggered.is_set():
|
||||
fiber_detected = True
|
||||
logging.info("该NG线条掉入费料区")
|
||||
break
|
||||
else:
|
||||
logging.info("出问题!!!,红外传感器未检测到线条")
|
||||
|
||||
time.sleep(0.2) # 等待线条掉落
|
||||
self.close(solenoid_valve1=True, solenoid_valve2=True)
|
||||
logging.info("电磁阀1、2已关闭")
|
||||
except Exception as e:
|
||||
logging.info(f"操作电磁阀失败:{str(e)}")
|
||||
|
||||
def fiber_sensor_monitor(self):
|
||||
"""
|
||||
光纤传感器监听线程,专门检测电磁阀打开后的触发状态
|
||||
"""
|
||||
global fiber_triggered, valve_open_time, valve_open_flag
|
||||
logging.info("光纤传感器监听线程已启动")
|
||||
|
||||
while True:
|
||||
try:
|
||||
# 增加短休眠,降低CPU占用,避免错过信号
|
||||
time.sleep(0.005)
|
||||
|
||||
# 获取光纤传感器状态
|
||||
fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors')
|
||||
# 检测是否检测到信号
|
||||
if fiber_status:
|
||||
with fiber_lock:
|
||||
# 检查电磁阀1是否处于打开状态
|
||||
if valve1_open_flag:
|
||||
fiber_triggered.set()
|
||||
# 防止重复触发
|
||||
time.sleep(0.1)
|
||||
fiber_triggered.clear()
|
||||
except Exception as e:
|
||||
logging.info(f"光纤传感器监听异常:{e}")
|
||||
time.sleep(0.1) # 异常时增加休眠
|
||||
|
||||
def press_sensors_monitor(self, check_interval=0.1):
|
||||
"""
|
||||
双压传感器监听线程
|
||||
:param check_interval: 检测间隔
|
||||
:return:
|
||||
"""
|
||||
global sensor_triggered
|
||||
logging.info("双压传感器监听线程已启动")
|
||||
while True:
|
||||
# 检测两个传感器任意一个是否触发
|
||||
press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors')
|
||||
press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors')
|
||||
if press_sensor1_status or press_sensor2_status:
|
||||
sensor_triggered.set() # 触发事件,通知主线程
|
||||
logging.info("双压传感器触发:线条已落到传送带")
|
||||
# 重置事件(等待下一次触发)
|
||||
time.sleep(1) # 防重复触发
|
||||
sensor_triggered.clear()
|
||||
time.sleep(check_interval)
|
||||
|
||||
|
||||
# 全局初始化:启动传感器监听线程
|
||||
def init_sensor_monitor():
|
||||
relay = RelayController()
|
||||
press_sensor_thread = threading.Thread(
|
||||
target=relay.press_sensors_monitor,
|
||||
args=(0.1,),
|
||||
daemon=True
|
||||
)
|
||||
# press_sensor_thread.start()
|
||||
|
||||
# 启动红外传感器监听线程
|
||||
infrared_sensor_thread = threading.Thread(
|
||||
target=relay.fiber_sensor_monitor,
|
||||
daemon=True
|
||||
)
|
||||
# infrared_sensor_thread.start()
|
||||
|
||||
return relay
|
||||
|
||||
# 全局继电器实例
|
||||
global_relay = init_sensor_monitor()
|
||||
|
||||
# ------------对外接口----------
|
||||
def control_solenoid():
|
||||
"""
|
||||
控制电磁阀,并检测光纤传感器触发状态
|
||||
"""
|
||||
# 创建控制器实例
|
||||
controller = RelayController()
|
||||
|
||||
global fiber_triggered
|
||||
|
||||
try:
|
||||
# 重置光纤传感器触发事件
|
||||
fiber_triggered.clear()
|
||||
|
||||
# 同时打开
|
||||
controller.open(solenoid_valve1=True, solenoid_valve2=True)
|
||||
logging.info("电磁阀1、2已打开")
|
||||
# 等待线条掉落(最多等待1秒)
|
||||
timeout = 2.0
|
||||
start_time = time.time()
|
||||
fiber_detected = False
|
||||
# 等待光纤传感器触发或超时
|
||||
while time.time() - start_time < timeout:
|
||||
if fiber_triggered.is_set():
|
||||
fiber_detected = True
|
||||
logging.info("该NG线条掉入费料区")
|
||||
break
|
||||
else:
|
||||
logging.info("出问题!!!,红外传感器未检测到线条")
|
||||
|
||||
time.sleep(0.2) # 等待线条掉落
|
||||
controller.close(solenoid_valve1=True, solenoid_valve2=True)
|
||||
logging.info("电磁阀1、2已关闭")
|
||||
except Exception as e:
|
||||
logging.info(f"操作电磁阀失败:{str(e)}")
|
||||
|
||||
# ------------测试接口-------------
|
||||
if __name__ == '__main__':
|
||||
control_solenoid()
|
||||
|
||||
|
||||
|
||||
|
||||
501
conveyor_controller/conveyor_master_controller1.py
Normal file
501
conveyor_controller/conveyor_master_controller1.py
Normal file
@ -0,0 +1,501 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2025/12/22
|
||||
# @Author : reenrr
|
||||
# @File : conveyor_master_controller.py
|
||||
# @Desc : 传送带1和2协同控制 使用一个串口两个轴地址(485总线) 每隔4秒两个传送带同步走一个挡板的距离
|
||||
'''
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
import serial
|
||||
from EMV import RelayController
|
||||
|
||||
# 彻底屏蔽pymodbus所有日志
|
||||
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
# --- 全局参数配置 ---
|
||||
SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线)
|
||||
BAUD_RATE = 115200
|
||||
ACTION_DELAY = 5
|
||||
SLAVE_ID_1 = 1 # 传送带1轴地址
|
||||
SLAVE_ID_2 = 2 # 传送带2轴地址
|
||||
SERIAL_TIMEOUT = 0.05 # 串口超时50ms
|
||||
SYNC_STOP_DURATION = 4.0 # 同步停止时间(4秒)
|
||||
SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms)
|
||||
DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms)
|
||||
RELEASE_WAIT_TIMEOUT = 5.0 # 等待挡板离开超时(5秒)
|
||||
|
||||
# 全局串口锁(485总线必须单指令发送,避免冲突)
|
||||
GLOBAL_SERIAL_LOCK = threading.Lock()
|
||||
|
||||
|
||||
class SingleMotorController:
|
||||
"""单个电机控制器(按轴地址自动匹配指令集,复用全局串口)"""
|
||||
|
||||
def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock):
|
||||
self.slave_id = slave_id # 轴地址(1/2)
|
||||
self.conveyor_id = conveyor_id # 传送带编号(1/2)
|
||||
self.action_delay = action_delay
|
||||
self.ser = serial_obj # 复用主控制器的串口实例
|
||||
self.global_serial_lock = global_serial_lock # 全局串口锁
|
||||
|
||||
# 初始化指令
|
||||
self._init_commands()
|
||||
|
||||
# 核心状态标志
|
||||
self.status_thread_is_running = False
|
||||
self.conveyor_thread_is_running = False
|
||||
self.sensor_triggered = False # 传感器触发标志
|
||||
self.sensor_locked = False # 传感器锁定(防抖)
|
||||
self.stop_flag = False # 本地停止标志
|
||||
self.wait_sensor_release = False # 重启后等待挡板离开标志
|
||||
self.last_sensor_trigger = 0 # 上次触发时间(防抖)
|
||||
|
||||
# 线程对象
|
||||
self.monitor_thread = None
|
||||
self.run_speed_thread = None
|
||||
self.relay_controller = RelayController()
|
||||
|
||||
# 锁
|
||||
self.sensor_lock = threading.Lock()
|
||||
self.state_lock = threading.Lock()
|
||||
|
||||
def _init_commands(self):
|
||||
"""根据轴地址初始化指令集"""
|
||||
if self.slave_id == 1:
|
||||
# --------传送带1(轴地址1)指令集--------
|
||||
self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令
|
||||
self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式
|
||||
bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30
|
||||
bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度
|
||||
bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度
|
||||
]
|
||||
elif self.slave_id == 2:
|
||||
# --------传送带2(轴地址2)指令集--------
|
||||
self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令
|
||||
self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式
|
||||
bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30
|
||||
bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度
|
||||
bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度
|
||||
]
|
||||
else:
|
||||
raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2")
|
||||
|
||||
# 打印指令(调试用)
|
||||
logging.info(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:")
|
||||
logging.info(f" 启动指令: {self.start_command.hex(' ')}")
|
||||
logging.info(f" 停止指令: {self.stop_command.hex(' ')}")
|
||||
|
||||
def send_command_list(self, command_list, delay=0.05):
|
||||
"""批量发送指令列表(加全局锁,指令间延时避免总线冲突)"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
logging.info(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送")
|
||||
return
|
||||
for idx, cmd in enumerate(command_list):
|
||||
self.send_and_receive_raw(cmd, f"指令{idx + 1}")
|
||||
time.sleep(delay) # 485总线指令间必须加延时
|
||||
|
||||
def clear_buffer(self):
|
||||
"""清空串口缓冲区(加全局锁)"""
|
||||
if self.ser and self.ser.is_open:
|
||||
with self.global_serial_lock:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
while self.ser.in_waiting > 0:
|
||||
self.ser.read(self.ser.in_waiting)
|
||||
time.sleep(0.001)
|
||||
|
||||
def send_and_receive_raw(self, command, description):
|
||||
"""底层串口通信(核心:使用全局锁保证单指令发送)"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
logging.info(f"传送带{self.conveyor_id}串口未打开,跳过发送")
|
||||
return None
|
||||
|
||||
try:
|
||||
self.clear_buffer()
|
||||
# 全局锁:同一时间仅一个设备发送指令
|
||||
with self.global_serial_lock:
|
||||
send_start = time.perf_counter()
|
||||
self.ser.write(command)
|
||||
self.ser.flush()
|
||||
send_cost = (time.perf_counter() - send_start) * 1000
|
||||
|
||||
# 接收响应(仅对应轴地址的设备会回复)
|
||||
recv_start = time.perf_counter()
|
||||
response = b""
|
||||
while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT:
|
||||
if self.ser.in_waiting > 0:
|
||||
chunk = self.ser.read(8)
|
||||
response += chunk
|
||||
if len(response) >= 8:
|
||||
break
|
||||
time.sleep(0.001)
|
||||
recv_cost = (time.perf_counter() - recv_start) * 1000
|
||||
|
||||
# 处理响应
|
||||
valid_resp = response[:8] if len(response) >= 8 else response
|
||||
logging.info(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]")
|
||||
logging.info(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)")
|
||||
logging.info(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)")
|
||||
|
||||
return valid_resp
|
||||
|
||||
except Exception as e:
|
||||
logging.info(f"传送带{self.conveyor_id}通信异常 ({description}): {e}")
|
||||
return None
|
||||
|
||||
def emergency_stop(self):
|
||||
"""紧急停止(调用对应轴地址的停止指令)"""
|
||||
with self.state_lock:
|
||||
if self.stop_flag:
|
||||
return
|
||||
self.stop_flag = True
|
||||
|
||||
self.send_and_receive_raw(self.stop_command, "停止寄存器")
|
||||
logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已紧急停止")
|
||||
|
||||
def resume_motor(self):
|
||||
"""恢复电机运行(调用对应轴地址的启动指令)"""
|
||||
with self.state_lock:
|
||||
self.stop_flag = False
|
||||
self.sensor_triggered = False
|
||||
self.sensor_locked = False
|
||||
self.last_sensor_trigger = 0
|
||||
self.wait_sensor_release = True # 标记需要等待挡板离开
|
||||
|
||||
# 主动发送运行指令
|
||||
self.send_and_receive_raw(self.start_command, "重启寄存器")
|
||||
logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已重启,等待挡板离开传感器...")
|
||||
|
||||
def wait_for_sensor_release(self):
|
||||
"""等待挡板离开传感器"""
|
||||
start_time = time.time()
|
||||
logging.info(f"[传送带{self.conveyor_id}] 开始等待挡板离开(超时{RELEASE_WAIT_TIMEOUT}秒)")
|
||||
|
||||
while time.time() - start_time < RELEASE_WAIT_TIMEOUT:
|
||||
# 读取传感器状态(True=无遮挡,False=有遮挡)
|
||||
if self.conveyor_id == 1:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
|
||||
else:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
|
||||
|
||||
# 传感器恢复为无遮挡(True),说明挡板已离开
|
||||
if sensor_status:
|
||||
logging.info(f"[传送带{self.conveyor_id}] 挡板已离开传感器,恢复正常检测")
|
||||
with self.state_lock:
|
||||
self.wait_sensor_release = False
|
||||
return True
|
||||
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
|
||||
# 超时处理
|
||||
logging.info(f"[传送带{self.conveyor_id}] 等待挡板离开超时!强制恢复检测")
|
||||
with self.state_lock:
|
||||
self.wait_sensor_release = False
|
||||
return False
|
||||
|
||||
def monitor_conveyors_sensor_status(self, master_controller):
|
||||
"""传感器检测线程"""
|
||||
logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)")
|
||||
|
||||
while self.status_thread_is_running and not master_controller.global_stop_flag:
|
||||
try:
|
||||
with self.sensor_lock:
|
||||
# 1. 全局停止/电机停止时跳过检测
|
||||
if master_controller.global_stop_flag or self.stop_flag:
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
continue
|
||||
|
||||
# 2. 重启后先等待挡板离开
|
||||
if self.wait_sensor_release:
|
||||
self.wait_for_sensor_release()
|
||||
continue
|
||||
|
||||
# 3. 读取传感器状态
|
||||
if self.conveyor_id == 1:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
|
||||
else:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# 4. 检测到挡板且满足防抖条件
|
||||
if (not sensor_status) and (not self.sensor_locked) and \
|
||||
(current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME:
|
||||
logging.info(f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即响应")
|
||||
self.last_sensor_trigger = current_time
|
||||
|
||||
with self.state_lock:
|
||||
self.sensor_triggered = True
|
||||
self.sensor_locked = True
|
||||
|
||||
# 立即通知主控制器
|
||||
threading.Thread(
|
||||
target=master_controller.on_sensor_triggered,
|
||||
args=(self.conveyor_id,),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
|
||||
except Exception as e:
|
||||
logging.info(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}")
|
||||
time.sleep(0.1)
|
||||
|
||||
logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已停止")
|
||||
|
||||
def run_speed_mode(self, master_controller):
|
||||
"""电机速度模式线程"""
|
||||
logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已启动(轴地址{self.slave_id})")
|
||||
self.conveyor_thread_is_running = True
|
||||
|
||||
while self.conveyor_thread_is_running and not master_controller.global_stop_flag:
|
||||
try:
|
||||
# 串口未打开则退出循环(由主控制器管理串口连接)
|
||||
if not (self.ser and self.ser.is_open):
|
||||
logging.info(f"[传送带{self.conveyor_id}] 串口已关闭,停止发送运行指令")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# 仅当未停止时发送运行指令
|
||||
if not self.stop_flag:
|
||||
self.send_and_receive_raw(self.start_command, "持续运行指令")
|
||||
|
||||
# 启动传感器线程(仅一次)
|
||||
if not self.status_thread_is_running and (
|
||||
self.monitor_thread is None or not self.monitor_thread.is_alive()):
|
||||
self.status_thread_is_running = True
|
||||
self.monitor_thread = threading.Thread(
|
||||
target=self.monitor_conveyors_sensor_status,
|
||||
args=(master_controller,),
|
||||
daemon=True
|
||||
)
|
||||
self.monitor_thread.start()
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
logging.info(f"[传送带{self.conveyor_id}] 电机运行异常: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
self.emergency_stop()
|
||||
self.conveyor_thread_is_running = False
|
||||
logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已停止")
|
||||
|
||||
def start_run_speed_thread(self, master_controller):
|
||||
"""启动电机线程"""
|
||||
if self.run_speed_thread and self.run_speed_thread.is_alive():
|
||||
with self.state_lock:
|
||||
self.conveyor_thread_is_running = False
|
||||
self.run_speed_thread.join(timeout=2)
|
||||
|
||||
with self.state_lock:
|
||||
self.stop_flag = False
|
||||
self.sensor_triggered = False
|
||||
self.sensor_locked = False
|
||||
self.wait_sensor_release = True
|
||||
self.last_sensor_trigger = 0
|
||||
|
||||
self.run_speed_thread = threading.Thread(
|
||||
target=self.run_speed_mode,
|
||||
args=(master_controller,),
|
||||
daemon=True
|
||||
)
|
||||
self.run_speed_thread.start()
|
||||
|
||||
|
||||
class MasterConveyorController:
|
||||
"""主控制器 - 单串口管理两个传送带(485总线)"""
|
||||
|
||||
def __init__(self):
|
||||
self.global_stop_flag = False
|
||||
self.sync_running = False
|
||||
|
||||
# 1. 初始化单串口(485总线)
|
||||
self.ser = None
|
||||
self._init_serial()
|
||||
|
||||
# 2. 初始化两个电机控制器(复用同一个串口)
|
||||
self.conveyor1 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_1,
|
||||
conveyor_id=1,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
self.conveyor2 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_2,
|
||||
conveyor_id=2,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
|
||||
# 同步锁
|
||||
self.sync_lock = threading.Lock()
|
||||
self.sync_condition = threading.Condition(self.sync_lock)
|
||||
|
||||
def _init_serial(self):
|
||||
"""初始化485总线串口(主控制器统一管理)"""
|
||||
try:
|
||||
self.ser = serial.Serial(
|
||||
port=SERIAL_PORT,
|
||||
baudrate=BAUD_RATE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
timeout=SERIAL_TIMEOUT,
|
||||
write_timeout=SERIAL_TIMEOUT,
|
||||
xonxoff=False,
|
||||
rtscts=False,
|
||||
dsrdtr=False
|
||||
)
|
||||
if self.ser.is_open:
|
||||
logging.info(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})")
|
||||
# 初始化时清空缓冲区
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
else:
|
||||
raise RuntimeError("串口初始化失败:无法打开串口")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"485串口初始化失败: {e}")
|
||||
|
||||
def on_sensor_triggered(self, conveyor_id):
|
||||
"""传感器触发回调"""
|
||||
with self.sync_condition:
|
||||
if self.sync_running:
|
||||
return
|
||||
|
||||
if conveyor_id == 1:
|
||||
self.conveyor1.sensor_triggered = True
|
||||
logging.info(f"\n[主控制器] 传送带1检测到挡板,等待传送带2...")
|
||||
else:
|
||||
self.conveyor2.sensor_triggered = True
|
||||
logging.info(f"\n[主控制器] 传送带2检测到挡板,等待传送带1...")
|
||||
|
||||
# 立即停止当前触发的传送带
|
||||
if conveyor_id == 1:
|
||||
self.conveyor1.emergency_stop()
|
||||
else:
|
||||
self.conveyor2.emergency_stop()
|
||||
|
||||
# 两个都触发后同步停止
|
||||
if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered:
|
||||
self.sync_running = True
|
||||
logging.info(f"\n[主控制器] 两个传送带都检测到挡板!开始同步停止 {SYNC_STOP_DURATION} 秒")
|
||||
|
||||
self.conveyor1.emergency_stop()
|
||||
self.conveyor2.emergency_stop()
|
||||
|
||||
# 同步停止+重启逻辑
|
||||
def sync_stop_and_resume():
|
||||
try:
|
||||
time.sleep(SYNC_STOP_DURATION)
|
||||
|
||||
with self.sync_condition:
|
||||
logging.info(f"\n[主控制器] 同步停止结束,重启所有传送带")
|
||||
# 重启两个电机
|
||||
self.conveyor1.resume_motor()
|
||||
self.conveyor2.resume_motor()
|
||||
|
||||
# 重置所有状态
|
||||
self.conveyor1.sensor_triggered = False
|
||||
self.conveyor2.sensor_triggered = False
|
||||
self.conveyor1.sensor_locked = False
|
||||
self.conveyor2.sensor_locked = False
|
||||
self.conveyor1.last_sensor_trigger = 0
|
||||
self.conveyor2.last_sensor_trigger = 0
|
||||
|
||||
# 强制重启电机线程
|
||||
self.conveyor1.start_run_speed_thread(self)
|
||||
self.conveyor2.start_run_speed_thread(self)
|
||||
|
||||
self.sync_running = False
|
||||
self.sync_condition.notify_all()
|
||||
except Exception as e:
|
||||
logging.info(f"[主控制器] 同步恢复异常: {e}")
|
||||
self.sync_running = False
|
||||
|
||||
threading.Thread(target=sync_stop_and_resume, daemon=True).start()
|
||||
|
||||
def start_all_conveyors(self):
|
||||
"""启动所有传送带"""
|
||||
logging.info("\n=== 主控制器:启动所有传送带(485总线)===")
|
||||
|
||||
# 检查串口是否正常
|
||||
if not (self.ser and self.ser.is_open):
|
||||
logging.info("[主控制器] 485串口未打开,无法启动")
|
||||
return False
|
||||
|
||||
# 发送速度模式指令(按轴地址发送)
|
||||
self.conveyor1.send_command_list(self.conveyor1.speed_commands[:4])
|
||||
self.conveyor2.send_command_list(self.conveyor2.speed_commands[:4])
|
||||
|
||||
# 启动电机线程
|
||||
self.conveyor1.start_run_speed_thread(self)
|
||||
self.conveyor2.start_run_speed_thread(self)
|
||||
|
||||
logging.info("[主控制器] 所有传送带已启动,等待挡板离开后开始检测...")
|
||||
return True
|
||||
|
||||
def stop_all_conveyors(self):
|
||||
"""停止所有传送带并关闭串口"""
|
||||
logging.info("\n=== 主控制器:停止所有传送带 ===")
|
||||
self.global_stop_flag = True
|
||||
|
||||
with self.sync_condition:
|
||||
self.sync_running = False
|
||||
self.conveyor1.emergency_stop()
|
||||
self.conveyor2.emergency_stop()
|
||||
|
||||
# 关闭串口
|
||||
time.sleep(1)
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
if self.ser and self.ser.is_open:
|
||||
self.ser.close()
|
||||
logging.info(f"485总线串口 {SERIAL_PORT} 已关闭")
|
||||
|
||||
logging.info("[主控制器] 所有传送带已停止并关闭串口")
|
||||
|
||||
def run(self):
|
||||
"""主运行函数"""
|
||||
try:
|
||||
if not self.start_all_conveyors():
|
||||
return
|
||||
|
||||
# 主线程保持运行
|
||||
while not self.global_stop_flag:
|
||||
time.sleep(0.5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("\n\n[主控制器] 检测到退出指令,正在停止系统...")
|
||||
except Exception as e:
|
||||
logging.info(f"\n[主控制器] 程序异常: {e}")
|
||||
finally:
|
||||
self.stop_all_conveyors()
|
||||
logging.info("\n=== 主控制器:程序执行完毕 ===")
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
try:
|
||||
master = MasterConveyorController()
|
||||
master.run()
|
||||
except RuntimeError as e:
|
||||
logging.info(f"系统启动失败: {e}")
|
||||
except Exception as e:
|
||||
logging.info(f"未知异常: {e}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
381
conveyor_controller/conveyor_master_controller2.py
Normal file
381
conveyor_controller/conveyor_master_controller2.py
Normal file
@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2026/1/6 09:41
|
||||
# @Author : reenrr
|
||||
# @File : conveyor_master_controller2.py
|
||||
# @Desc : 传送带1和2协同控制 两个传送带同步走一个挡板的距离
|
||||
'''
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
import serial
|
||||
from EMV import RelayController
|
||||
|
||||
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
|
||||
|
||||
# --- 全局参数配置 ---
|
||||
SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线)
|
||||
BAUD_RATE = 115200
|
||||
ACTION_DELAY = 5
|
||||
SLAVE_ID_1 = 1 # 传送带1轴地址
|
||||
SLAVE_ID_2 = 2 # 传送带2轴地址
|
||||
SERIAL_TIMEOUT = 0.05 # 串口超时50ms
|
||||
SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms)
|
||||
DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms)
|
||||
|
||||
# 全局串口锁(485总线必须单指令发送,避免冲突)
|
||||
GLOBAL_SERIAL_LOCK = threading.Lock()
|
||||
|
||||
class SingleMotorController:
|
||||
"""单个电机控制器(按轴地址自动匹配指令集,复用全局串口)"""
|
||||
def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock):
|
||||
self.slave_id = slave_id # 轴地址(1/2)
|
||||
self.conveyor_id = conveyor_id # 传送带编号(1/2)
|
||||
self.action_delay = action_delay
|
||||
self.ser = serial_obj # 复用主控制器的串口实例
|
||||
self.global_serial_lock = global_serial_lock # 全局串口锁
|
||||
|
||||
# 初始化指令
|
||||
self._init_commands()
|
||||
|
||||
# 核心状态标志(简化版)
|
||||
self.status_thread_is_running = False # 传感器线程运行标志
|
||||
self.is_running = False # 电机是否已启动(一次性)
|
||||
self.is_stopped = False # 电机是否已停止(触发传感器后)
|
||||
self.sensor_triggered = False # 传感器触发标志
|
||||
self.sensor_locked = False # 传感器锁定(防抖)
|
||||
self.last_sensor_trigger = 0 # 上次触发时间(防抖)
|
||||
|
||||
# 线程对象
|
||||
self.monitor_thread = None
|
||||
self.relay_controller = RelayController()
|
||||
|
||||
# 锁
|
||||
self.sensor_lock = threading.Lock()
|
||||
self.state_lock = threading.Lock()
|
||||
|
||||
def _init_commands(self):
|
||||
"""根据轴地址初始化指令集"""
|
||||
if self.slave_id == 1:
|
||||
# --------传送带1(轴地址1)指令集--------
|
||||
self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令
|
||||
self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式
|
||||
bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30
|
||||
bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度
|
||||
bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度
|
||||
]
|
||||
elif self.slave_id == 2:
|
||||
# --------传送带2(轴地址2)指令集--------
|
||||
self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令
|
||||
self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式
|
||||
bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30
|
||||
bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度
|
||||
bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度
|
||||
]
|
||||
else:
|
||||
raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2")
|
||||
|
||||
# 打印指令(调试用)
|
||||
print(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:")
|
||||
print(f" 启动指令: {self.start_command.hex(' ')}")
|
||||
print(f" 停止指令: {self.stop_command.hex(' ')}")
|
||||
|
||||
def send_command_list(self, command_list, delay=0.05):
|
||||
"""
|
||||
批量发送指令列表(加全局锁,指令间延时避免总线冲突)
|
||||
:param command_list: 指令列表
|
||||
:param delay: 指令间延时(默认0.05s)
|
||||
"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送")
|
||||
return
|
||||
for idx, cmd in enumerate(command_list):
|
||||
self.send_and_receive_raw(cmd, f"指令{idx + 1}")
|
||||
time.sleep(delay) # 485总线指令间必须加延时
|
||||
|
||||
def clear_buffer(self):
|
||||
"""清空串口缓冲区(加全局锁)"""
|
||||
if self.ser and self.ser.is_open:
|
||||
with self.global_serial_lock:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
while self.ser.in_waiting > 0:
|
||||
self.ser.read(self.ser.in_waiting)
|
||||
time.sleep(0.001)
|
||||
|
||||
def send_and_receive_raw(self, command, description):
|
||||
"""
|
||||
底层串口通信(核心:使用全局锁保证单指令发送)
|
||||
:param command: 指令
|
||||
:param description: 指令描述
|
||||
:return: 返回响应
|
||||
"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print(f"传送带{self.conveyor_id}串口未打开,跳过发送")
|
||||
return None
|
||||
|
||||
try:
|
||||
self.clear_buffer()
|
||||
# 全局锁:同一时间仅一个设备发送指令
|
||||
with self.global_serial_lock:
|
||||
send_start = time.perf_counter()
|
||||
self.ser.write(command)
|
||||
self.ser.flush()
|
||||
send_cost = (time.perf_counter() - send_start) * 1000
|
||||
|
||||
# 接收响应(仅对应轴地址的设备会回复)
|
||||
recv_start = time.perf_counter()
|
||||
response = b""
|
||||
while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT:
|
||||
if self.ser.in_waiting > 0:
|
||||
chunk = self.ser.read(8)
|
||||
response += chunk
|
||||
if len(response) >= 8:
|
||||
break
|
||||
time.sleep(0.001)
|
||||
recv_cost = (time.perf_counter() - recv_start) * 1000
|
||||
|
||||
# 处理响应
|
||||
valid_resp = response[:8] if len(response) >= 8 else response
|
||||
print(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]")
|
||||
print(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)")
|
||||
print(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)")
|
||||
|
||||
return valid_resp
|
||||
|
||||
except Exception as e:
|
||||
print(f"传送带{self.conveyor_id}通信异常 ({description}): {e}")
|
||||
return None
|
||||
|
||||
def start_motor_once(self):
|
||||
"""一次性启动电机"""
|
||||
with self.state_lock:
|
||||
if self.is_running or self.is_stopped:
|
||||
print(f"[传送带{self.conveyor_id}] 电机已启动/停止,无需重复启动")
|
||||
return
|
||||
|
||||
# 1. 发送速度模式配置指令
|
||||
print(f"[传送带{self.conveyor_id}] 配置速度模式...")
|
||||
self.send_command_list(self.speed_commands[:4])
|
||||
|
||||
# 2. 发送启动指令
|
||||
print(f"[传送带{self.conveyor_id}] 发送启动指令")
|
||||
self.send_and_receive_raw(self.start_command, "启动传送带")
|
||||
|
||||
with self.state_lock:
|
||||
self.is_running = True
|
||||
|
||||
def stop_motor(self):
|
||||
"""停止电机"""
|
||||
with self.state_lock:
|
||||
if self.is_stopped:
|
||||
return
|
||||
self.is_stopped = True
|
||||
self.is_running = False
|
||||
|
||||
# 发送停止指令
|
||||
print(f"[传送带{self.conveyor_id}] 发送停止指令")
|
||||
self.send_and_receive_raw(self.stop_command, "停止传送带")
|
||||
|
||||
def monitor_conveyors_sensor_status(self, master_controller):
|
||||
"""传感器检测线程(仅检测,触发后停止电机)"""
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)")
|
||||
|
||||
while self.status_thread_is_running and not master_controller.global_stop_flag:
|
||||
try:
|
||||
with self.sensor_lock:
|
||||
# 1. 全局停止/电机已停止时跳过检测
|
||||
if master_controller.global_stop_flag or self.is_stopped:
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
continue
|
||||
|
||||
# 2. 读取传感器状态
|
||||
if self.conveyor_id == 1:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
|
||||
else:
|
||||
sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# 3. 检测到挡板且满足防抖条件
|
||||
if (not sensor_status) and (not self.sensor_locked) and \
|
||||
(current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME:
|
||||
print(
|
||||
f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即停止")
|
||||
self.last_sensor_trigger = current_time
|
||||
self.sensor_triggered = True
|
||||
self.sensor_locked = True
|
||||
|
||||
# 立即停止电机
|
||||
self.stop_motor()
|
||||
|
||||
# 通知主控制器(可选:用于同步两个传送带停止)
|
||||
threading.Thread(
|
||||
target=master_controller.on_sensor_triggered,
|
||||
args=(self.conveyor_id,),
|
||||
daemon=True
|
||||
).start()
|
||||
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}")
|
||||
time.sleep(0.1)
|
||||
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测线程已停止")
|
||||
|
||||
def start_sensor_thread(self, master_controller):
|
||||
"""启动传感器检测线程"""
|
||||
if self.monitor_thread and self.monitor_thread.is_alive():
|
||||
return
|
||||
|
||||
self.status_thread_is_running = True
|
||||
self.monitor_thread = threading.Thread(
|
||||
target=self.monitor_conveyors_sensor_status,
|
||||
args=(master_controller,),
|
||||
daemon=True
|
||||
)
|
||||
self.monitor_thread.start()
|
||||
|
||||
class MasterConveyorController:
|
||||
"""主控制器 - 单串口管理两个传送带(485总线)"""
|
||||
def __init__(self):
|
||||
self.global_stop_flag = False
|
||||
self.both_stopped = False # 两个传送带是否都已停止
|
||||
|
||||
# 1. 初始化单串口(485总线)
|
||||
self.ser = None
|
||||
self._init_serial()
|
||||
|
||||
# 2. 初始化两个电机控制器(复用同一个串口)
|
||||
self.conveyor1 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_1,
|
||||
conveyor_id=1,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
self.conveyor2 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_2,
|
||||
conveyor_id=2,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
|
||||
# 同步锁
|
||||
self.sync_lock = threading.Lock()
|
||||
|
||||
def _init_serial(self):
|
||||
"""初始化485总线串口(主控制器统一管理)"""
|
||||
try:
|
||||
self.ser = serial.Serial(
|
||||
port=SERIAL_PORT,
|
||||
baudrate=BAUD_RATE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
timeout=SERIAL_TIMEOUT,
|
||||
write_timeout=SERIAL_TIMEOUT,
|
||||
xonxoff=False,
|
||||
rtscts=False,
|
||||
dsrdtr=False
|
||||
)
|
||||
if self.ser.is_open:
|
||||
print(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})")
|
||||
# 初始化时清空缓冲区
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
else:
|
||||
raise RuntimeError("串口初始化失败:无法打开串口")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"485串口初始化失败: {e}")
|
||||
|
||||
def on_sensor_triggered(self):
|
||||
"""传感器触发回调(同步两个传送带停止)"""
|
||||
with self.sync_lock:
|
||||
# 检查是否两个传送带都已触发
|
||||
if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered:
|
||||
self.both_stopped = True
|
||||
print(f"\n[主控制器] 两个传送带都已检测到挡板并停止!任务完成")
|
||||
self.global_stop_flag = True # 标记全局停止
|
||||
|
||||
def start_all_conveyors(self):
|
||||
"""启动所有传送带"""
|
||||
print("\n=== 主控制器:启动所有传送带===")
|
||||
|
||||
# 检查串口是否正常
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print("[主控制器] 485串口未打开,无法启动")
|
||||
return False
|
||||
# 1. 一次性启动两个电机
|
||||
self.conveyor1.start_motor_once()
|
||||
self.conveyor2.start_motor_once()
|
||||
|
||||
# 2. 启动传感器检测线程
|
||||
self.conveyor1.start_sensor_thread(self)
|
||||
self.conveyor2.start_sensor_thread(self)
|
||||
|
||||
print("[主控制器] 所有传送带已一次性启动,传感器开始检测...")
|
||||
return True
|
||||
|
||||
def stop_all_conveyors(self):
|
||||
"""停止所有传送带并关闭串口"""
|
||||
print("\n=== 主控制器:停止所有传送带 ===")
|
||||
self.global_stop_flag = True
|
||||
|
||||
# 强制停止两个电机
|
||||
self.conveyor1.stop_motor()
|
||||
self.conveyor2.stop_motor()
|
||||
|
||||
# 关闭串口
|
||||
time.sleep(1)
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
if self.ser and self.ser.is_open:
|
||||
self.ser.close()
|
||||
print(f"485总线串口 {SERIAL_PORT} 已关闭")
|
||||
|
||||
print("[主控制器] 所有传送带已停止并关闭串口")
|
||||
|
||||
def run(self):
|
||||
"""主运行函数"""
|
||||
try:
|
||||
if not self.start_all_conveyors():
|
||||
return
|
||||
|
||||
# 主线程等待:直到两个传送带都停止或手动退出
|
||||
while not self.global_stop_flag:
|
||||
if self.both_stopped:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n[主控制器] 检测到退出指令,正在停止系统...")
|
||||
except Exception as e:
|
||||
print(f"\n[主控制器] 程序异常: {e}")
|
||||
finally:
|
||||
self.stop_all_conveyors()
|
||||
print("\n=== 主控制器:程序执行完毕 ===")
|
||||
|
||||
# -----------传送带对外接口--------------
|
||||
def conveyor_control():
|
||||
"""主函数"""
|
||||
try:
|
||||
master = MasterConveyorController()
|
||||
master.run()
|
||||
except RuntimeError as e:
|
||||
print(f"系统启动失败: {e}")
|
||||
except Exception as e:
|
||||
print(f"未知异常: {e}")
|
||||
|
||||
|
||||
# ------------测试接口--------------
|
||||
if __name__ == '__main__':
|
||||
conveyor_control()
|
||||
431
conveyor_controller/conveyor_master_controller2_test.py
Normal file
431
conveyor_controller/conveyor_master_controller2_test.py
Normal file
@ -0,0 +1,431 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2026/1/6 10:41
|
||||
# @Author : reenrr
|
||||
# @File : conveyor_master_controller2_test.py
|
||||
# @Desc : 传送带1和2协同控制 两个传送带同步走一个挡板的距离--测试代码
|
||||
'''
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
import serial
|
||||
from EMV import RelayController
|
||||
|
||||
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
|
||||
|
||||
# --- 全局参数配置 ---
|
||||
SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线)
|
||||
BAUD_RATE = 115200
|
||||
ACTION_DELAY = 5
|
||||
SLAVE_ID_1 = 1 # 传送带1轴地址
|
||||
SLAVE_ID_2 = 2 # 传送带2轴地址
|
||||
SERIAL_TIMEOUT = 0.05 # 串口超时50ms
|
||||
SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms)
|
||||
DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms)
|
||||
WAIT_INIT_RELEASE_TIMEOUT = 3.0 # 等待初始挡板离开超时(10秒)
|
||||
|
||||
# 全局串口锁(485总线必须单指令发送,避免冲突)
|
||||
GLOBAL_SERIAL_LOCK = threading.Lock()
|
||||
|
||||
class SingleMotorController:
|
||||
"""单个电机控制器(按轴地址自动匹配指令集,复用全局串口)"""
|
||||
def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock):
|
||||
self.slave_id = slave_id # 轴地址(1/2)
|
||||
self.conveyor_id = conveyor_id # 传送带编号(1/2)
|
||||
self.action_delay = action_delay
|
||||
self.ser = serial_obj # 复用主控制器的串口实例
|
||||
self.global_serial_lock = global_serial_lock # 全局串口锁
|
||||
|
||||
# 初始化指令
|
||||
self._init_commands()
|
||||
|
||||
# 核心状态标志
|
||||
self.status_thread_is_running = False # 传感器线程运行标志
|
||||
self.is_running = False # 电机是否已启动
|
||||
self.is_stopped = False # 电机是否已停止
|
||||
self.sensor_triggered = False # 传感器检测到挡板(无信号)
|
||||
self.sensor_locked = False # 传感器锁定(防抖)
|
||||
self.last_sensor_trigger = 0 # 上次触发时间(防抖)
|
||||
self.init_release_done = False # 初始挡板是否已离开
|
||||
|
||||
# 线程对象
|
||||
self.monitor_thread = None
|
||||
self.relay_controller = RelayController()
|
||||
|
||||
# 锁
|
||||
self.sensor_lock = threading.Lock()
|
||||
self.state_lock = threading.Lock()
|
||||
|
||||
def _init_commands(self):
|
||||
"""根据轴地址初始化指令集"""
|
||||
if self.slave_id == 1:
|
||||
# --------传送带1(轴地址1)指令集--------
|
||||
self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令
|
||||
self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式
|
||||
bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30
|
||||
bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度
|
||||
bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度
|
||||
]
|
||||
elif self.slave_id == 2:
|
||||
# --------传送带2(轴地址2)指令集--------
|
||||
self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令
|
||||
self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令
|
||||
self.speed_commands = [
|
||||
bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式
|
||||
bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30
|
||||
bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度
|
||||
bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度
|
||||
]
|
||||
else:
|
||||
raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2")
|
||||
|
||||
# 打印指令(调试用)
|
||||
print(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:")
|
||||
print(f" 启动指令: {self.start_command.hex(' ')}")
|
||||
print(f" 停止指令: {self.stop_command.hex(' ')}")
|
||||
|
||||
def send_command_list(self, command_list, delay=0.05):
|
||||
"""
|
||||
批量发送指令列表(加全局锁,指令间延时避免总线冲突)
|
||||
:param command_list: 待发送的指令列表
|
||||
:param delay: 指令间延时(默认0.05s)
|
||||
"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送")
|
||||
return
|
||||
for idx, cmd in enumerate(command_list):
|
||||
self.send_and_receive_raw(cmd, f"指令{idx + 1}")
|
||||
time.sleep(delay) # 485总线指令间必须加延时
|
||||
|
||||
def clear_buffer(self):
|
||||
"""清空串口缓冲区(加全局锁)"""
|
||||
if self.ser and self.ser.is_open:
|
||||
with self.global_serial_lock:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
while self.ser.in_waiting > 0:
|
||||
self.ser.read(self.ser.in_waiting)
|
||||
time.sleep(0.001)
|
||||
|
||||
def send_and_receive_raw(self, command, description):
|
||||
"""
|
||||
底层串口通信(核心:使用全局锁保证单指令发送)
|
||||
:param command: 待发送的指令
|
||||
:param description: 指令描述
|
||||
:return: 接收到的响应
|
||||
"""
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print(f"传送带{self.conveyor_id}串口未打开,跳过发送")
|
||||
return None
|
||||
|
||||
try:
|
||||
self.clear_buffer()
|
||||
# 全局锁:同一时间仅一个设备发送指令
|
||||
with self.global_serial_lock:
|
||||
send_start = time.perf_counter()
|
||||
self.ser.write(command)
|
||||
self.ser.flush()
|
||||
send_cost = (time.perf_counter() - send_start) * 1000
|
||||
|
||||
# 接收响应(仅对应轴地址的设备会回复)
|
||||
recv_start = time.perf_counter()
|
||||
response = b""
|
||||
while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT:
|
||||
if self.ser.in_waiting > 0:
|
||||
chunk = self.ser.read(8)
|
||||
response += chunk
|
||||
if len(response) >= 8:
|
||||
break
|
||||
time.sleep(0.001)
|
||||
recv_cost = (time.perf_counter() - recv_start) * 1000
|
||||
|
||||
# 处理响应
|
||||
valid_resp = response[:8] if len(response) >= 8 else response
|
||||
print(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]")
|
||||
print(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)")
|
||||
print(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)")
|
||||
|
||||
return valid_resp
|
||||
|
||||
except Exception as e:
|
||||
print(f"传送带{self.conveyor_id}通信异常 ({description}): {e}")
|
||||
return None
|
||||
|
||||
def start_motor(self):
|
||||
"""启动电机"""
|
||||
with self.state_lock:
|
||||
if self.is_running or self.is_stopped:
|
||||
print(f"[传送带{self.conveyor_id}] 电机已启动/停止,无需重复启动")
|
||||
return
|
||||
|
||||
# 1. 发送速度模式配置指令
|
||||
print(f"[传送带{self.conveyor_id}] 配置速度模式...")
|
||||
self.send_command_list(self.speed_commands[:4])
|
||||
|
||||
# 2. 发送启动指令
|
||||
print(f"[传送带{self.conveyor_id}] 发送启动指令")
|
||||
self.send_and_receive_raw(self.start_command, "启动传送带")
|
||||
|
||||
with self.state_lock:
|
||||
self.is_running = True
|
||||
|
||||
def stop_motor(self):
|
||||
"""停止电机"""
|
||||
with self.state_lock:
|
||||
if self.is_stopped:
|
||||
return
|
||||
self.is_stopped = True
|
||||
self.is_running = False
|
||||
|
||||
# 发送停止指令
|
||||
print(f"[传送带{self.conveyor_id}] 发送停止指令")
|
||||
self.send_and_receive_raw(self.stop_command, "停止传送带")
|
||||
|
||||
def wait_init_sensor_release(self):
|
||||
"""等待初始挡板离开传感器(传感器从无信号→有信号)"""
|
||||
print(f"[传送带{self.conveyor_id}] 等待初始挡板离开传感器(超时{WAIT_INIT_RELEASE_TIMEOUT}秒)")
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < WAIT_INIT_RELEASE_TIMEOUT:
|
||||
# 读取传感器状态:True=有信号(无遮挡),False=无信号(遮挡)
|
||||
sensor_status = self.get_sensor_status()
|
||||
|
||||
if sensor_status: # 挡板离开,传感器有信号
|
||||
print(f"[传送带{self.conveyor_id}] 初始挡板已离开传感器")
|
||||
with self.state_lock:
|
||||
self.init_release_done = True
|
||||
return True
|
||||
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
|
||||
# 超时处理
|
||||
print(f"[传送带{self.conveyor_id}] 等待初始挡板离开超时!强制标记为已离开")
|
||||
with self.state_lock:
|
||||
self.init_release_done = True
|
||||
return False
|
||||
|
||||
def get_sensor_status(self):
|
||||
"""读取传感器状态"""
|
||||
if self.conveyor_id == 1:
|
||||
return self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
|
||||
else:
|
||||
return self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
|
||||
|
||||
def monitor_conveyors_sensor_status(self, master_controller):
|
||||
"""
|
||||
传感器检测线程(仅检测运行中挡板遮挡)
|
||||
:param master_controller: 主控制器对象
|
||||
"""
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)")
|
||||
|
||||
while self.status_thread_is_running and not master_controller.global_stop_flag:
|
||||
try:
|
||||
with self.sensor_lock:
|
||||
# 1. 全局停止/电机已停止时跳过检测
|
||||
if master_controller.global_stop_flag or self.is_stopped:
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
continue
|
||||
|
||||
# 2. 等待初始挡板离开后再开始检测
|
||||
if not self.init_release_done:
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
continue
|
||||
|
||||
# 3. 读取传感器状态
|
||||
sensor_status = self.get_sensor_status()
|
||||
current_time = time.time()
|
||||
|
||||
# 4. 检测到挡板遮挡(无信号)且满足防抖条件 → 触发停止
|
||||
if (not sensor_status) and (not self.sensor_locked) and \
|
||||
(current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME:
|
||||
print(
|
||||
f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板遮挡!准备停止")
|
||||
self.last_sensor_trigger = current_time
|
||||
self.sensor_triggered = True
|
||||
self.sensor_locked = True
|
||||
|
||||
# 立即停止当前传送带
|
||||
self.stop_motor()
|
||||
|
||||
# 通知主控制器同步状态
|
||||
master_controller.on_sensor_triggered()
|
||||
|
||||
time.sleep(DETECTION_INTERVAL)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}")
|
||||
time.sleep(0.1)
|
||||
|
||||
print(f"[传送带{self.conveyor_id}] 传感器检测线程已停止")
|
||||
|
||||
def start_sensor_thread(self, master_controller):
|
||||
"""
|
||||
启动传感器检测线程
|
||||
:param master_controller: 主控制器对象
|
||||
"""
|
||||
if self.monitor_thread and self.monitor_thread.is_alive():
|
||||
return
|
||||
|
||||
self.status_thread_is_running = True
|
||||
self.monitor_thread = threading.Thread(
|
||||
target=self.monitor_conveyors_sensor_status,
|
||||
args=(master_controller,),
|
||||
daemon=True
|
||||
)
|
||||
self.monitor_thread.start()
|
||||
|
||||
class MasterConveyorController:
|
||||
"""主控制器 - 单串口管理两个传送带(485总线)"""
|
||||
def __init__(self):
|
||||
self.global_stop_flag = False
|
||||
self.both_stopped = False # 两个传送带是否都已停止
|
||||
|
||||
# 1. 初始化单串口(485总线)
|
||||
self.ser = None
|
||||
self._init_serial()
|
||||
|
||||
# 2. 初始化两个电机控制器(复用同一个串口)
|
||||
self.conveyor1 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_1,
|
||||
conveyor_id=1,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
self.conveyor2 = SingleMotorController(
|
||||
slave_id=SLAVE_ID_2,
|
||||
conveyor_id=2,
|
||||
action_delay=ACTION_DELAY,
|
||||
serial_obj=self.ser,
|
||||
global_serial_lock=GLOBAL_SERIAL_LOCK
|
||||
)
|
||||
|
||||
# 同步锁
|
||||
self.sync_lock = threading.Lock()
|
||||
|
||||
def _init_serial(self):
|
||||
"""初始化485总线串口(主控制器统一管理)"""
|
||||
try:
|
||||
self.ser = serial.Serial(
|
||||
port=SERIAL_PORT,
|
||||
baudrate=BAUD_RATE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
timeout=SERIAL_TIMEOUT,
|
||||
write_timeout=SERIAL_TIMEOUT,
|
||||
xonxoff=False,
|
||||
rtscts=False,
|
||||
dsrdtr=False
|
||||
)
|
||||
if self.ser.is_open:
|
||||
print(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})")
|
||||
# 初始化时清空缓冲区
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.reset_output_buffer()
|
||||
else:
|
||||
raise RuntimeError("串口初始化失败:无法打开串口")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"485串口初始化失败: {e}")
|
||||
|
||||
def on_sensor_triggered(self):
|
||||
"""传感器触发回调(同步两个传送带停止)"""
|
||||
with self.sync_lock:
|
||||
# 检查是否两个传送带都检测到挡板遮挡
|
||||
if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered:
|
||||
# 停止未停止的传送带
|
||||
if not self.conveyor1.is_stopped:
|
||||
self.conveyor1.stop_motor()
|
||||
if not self.conveyor2.is_stopped:
|
||||
self.conveyor2.stop_motor()
|
||||
|
||||
self.both_stopped = True
|
||||
print(f"\n[主控制器] 两个传送带都已检测到挡板并停止!任务完成")
|
||||
self.global_stop_flag = True # 标记全局停止
|
||||
|
||||
def start_all_conveyors(self):
|
||||
"""启动所有传送带(按需求顺序:启动电机→等待初始挡板离开→开启传感器检测)"""
|
||||
print("\n=== 主控制器:启动所有传送带===")
|
||||
|
||||
# 检查串口是否正常
|
||||
if not (self.ser and self.ser.is_open):
|
||||
print("[主控制器] 485串口未打开,无法启动")
|
||||
return False
|
||||
|
||||
# 1. 第一步:同步启动两个电机
|
||||
print("[主控制器] 同步启动两个传送带(初始挡板遮挡传感器)")
|
||||
self.conveyor1.start_motor()
|
||||
self.conveyor2.start_motor()
|
||||
|
||||
# 2. 第二步:等待两个传送带的初始挡板都离开传感器
|
||||
print("[主控制器] 等待两个传送带的初始挡板离开传感器...")
|
||||
self.conveyor1.wait_init_sensor_release()
|
||||
self.conveyor2.wait_init_sensor_release()
|
||||
|
||||
# 3. 第三步:初始挡板都离开后,启动传感器检测线程
|
||||
print("[主控制器] 初始挡板已全部离开,启动传感器检测线程")
|
||||
self.conveyor1.start_sensor_thread(self)
|
||||
self.conveyor2.start_sensor_thread(self)
|
||||
|
||||
print("[主控制器] 传送带启动流程完成,等待检测挡板遮挡...")
|
||||
return True
|
||||
|
||||
def stop_all_conveyors(self):
|
||||
"""停止所有传送带并关闭串口"""
|
||||
print("\n=== 主控制器:停止所有传送带 ===")
|
||||
self.global_stop_flag = True
|
||||
|
||||
# 强制停止两个电机
|
||||
self.conveyor1.stop_motor()
|
||||
self.conveyor2.stop_motor()
|
||||
|
||||
# 关闭串口
|
||||
time.sleep(1)
|
||||
with GLOBAL_SERIAL_LOCK:
|
||||
if self.ser and self.ser.is_open:
|
||||
self.ser.close()
|
||||
print(f"485总线串口 {SERIAL_PORT} 已关闭")
|
||||
|
||||
print("[主控制器] 所有传送带已停止并关闭串口")
|
||||
|
||||
def run(self):
|
||||
"""主运行函数"""
|
||||
try:
|
||||
if not self.start_all_conveyors():
|
||||
return
|
||||
|
||||
# 主线程等待:直到两个传送带都停止或手动退出
|
||||
while not self.global_stop_flag:
|
||||
if self.both_stopped:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n[主控制器] 检测到退出指令,正在停止系统...")
|
||||
except Exception as e:
|
||||
print(f"\n[主控制器] 程序异常: {e}")
|
||||
finally:
|
||||
self.stop_all_conveyors()
|
||||
print("\n=== 主控制器:程序执行完毕 ===")
|
||||
|
||||
# -----------传送带对外接口--------------
|
||||
def conveyor_control():
|
||||
"""主函数"""
|
||||
try:
|
||||
master = MasterConveyorController()
|
||||
master.run()
|
||||
except RuntimeError as e:
|
||||
print(f"系统启动失败: {e}")
|
||||
except Exception as e:
|
||||
print(f"未知异常: {e}")
|
||||
|
||||
# ------------测试接口--------------
|
||||
if __name__ == '__main__':
|
||||
conveyor_control()
|
||||
|
||||
|
||||
61
main_control.py
Normal file
61
main_control.py
Normal file
@ -0,0 +1,61 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
'''
|
||||
# @Time : 2025/12/12 11:05
|
||||
# @Author : reenrr
|
||||
# @File : main_control.py
|
||||
# @Desc : 主控程序
|
||||
'''
|
||||
import multiprocessing # 多进程模块
|
||||
import threading
|
||||
from threading import Event
|
||||
import time
|
||||
from EMV import sensor_triggered, global_relay, control_solenoid
|
||||
from visual_algorithm import flaw_detection
|
||||
|
||||
# ------------全局事件-------------
|
||||
manger = multiprocessing.Manager()
|
||||
conveyor_start_event = manger.Event()
|
||||
|
||||
|
||||
def quality_testing():
|
||||
print("线条开始质量检测:")
|
||||
|
||||
# 执行质量检测
|
||||
result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08})
|
||||
if result == "qualified":
|
||||
result = "合格"
|
||||
print("该线条是否合格:", result)
|
||||
print("等待线条落到传送带(双压传感器触发)...")
|
||||
# 等待时间触发,超时时间设为10秒(避免无限等待)
|
||||
if sensor_triggered.wait(timeout=10):
|
||||
print("线条已落到传送带,控制两个传送带向前移动")
|
||||
# 触发传送带启动事件
|
||||
conveyor_start_event.set()
|
||||
else:
|
||||
print("超时警告:线条未落到传送带,请检查")
|
||||
elif result == "unqualified":
|
||||
result = "不合格"
|
||||
print("该线条是否合格:", result)
|
||||
print("进入NG动作")
|
||||
control_solenoid() # 执行NG动作,控制电磁阀
|
||||
print("NG动作结束")
|
||||
# print("判断NG线条是否落入肥料区:")
|
||||
|
||||
# -----------对外接口-------------
|
||||
def main_control():
|
||||
print("开始摆放线条")
|
||||
|
||||
# 质量检测
|
||||
quality_testing()
|
||||
|
||||
while True: # 防止跳出循环
|
||||
time.sleep(1)
|
||||
|
||||
# ------------测试接口-------------
|
||||
if __name__ == '__main__':
|
||||
main_control()
|
||||
|
||||
|
||||
|
||||
|
||||
14
readme.md
Normal file
14
readme.md
Normal file
@ -0,0 +1,14 @@
|
||||
# 开发板上找串口号
|
||||
## 找串口号
|
||||
sudo ls /dev/tty{S*,ACM*,USB*,rfcomm*}
|
||||
看看是哪儿个串口号,比如我的是/dev/ttyACM0
|
||||
|
||||
使用sudo dmesg | grep -i acm查看,如果出现:
|
||||
ttyACM0: USB ACM device
|
||||
说明是这个串口
|
||||
|
||||
## 给串口赋权限
|
||||
sudo chmod 666 /dev/ttyACM0
|
||||
|
||||
# python版本
|
||||
3.9
|
||||
50
visual_algorithm/visual_algorithm.py
Normal file
50
visual_algorithm/visual_algorithm.py
Normal file
@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
# @Time : 2025/12/23 16:44
|
||||
# @Author : reenrr
|
||||
# @File : visual_algorithm.py
|
||||
# @Desc : 留给视觉--质量检测的接口
|
||||
"""
|
||||
import random
|
||||
|
||||
|
||||
# -------------------------- 核心算法接口(后续替换此处即可) --------------------------
|
||||
def visual_algorithm_core(line_data: dict) -> str:
|
||||
"""
|
||||
视觉算法核心判定函数(模拟版本)
|
||||
后续接入真实视觉算法时,直接替换此函数的实现逻辑即可
|
||||
|
||||
param: line_data: 线条特征数据(字典格式,可根据实际需求扩展字段)
|
||||
示例:{"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}
|
||||
return: 判定结果,固定返回 "qualified" 或 "unqualified"
|
||||
"""
|
||||
# 模拟随机返回合格/不合格(真实算法时,替换为实际判定逻辑)
|
||||
# return random.choice(["qualified", "unqualified"])
|
||||
return "qualified"
|
||||
|
||||
|
||||
# --------------外部接口---------------
|
||||
def flaw_detection(line_data: dict) -> str:
|
||||
"""
|
||||
视觉算法缺陷检测统一接口(对外调用入口)
|
||||
封装核心算法,保证接口格式统一,后续替换算法不影响调用方
|
||||
|
||||
:param line_data: 线条特征数据(需和核心算法入参一致)
|
||||
:return: 检测结果,"qualified"(合格) / "unqualified"(不合格)
|
||||
"""
|
||||
# 调用核心算法(后续仅需修改 visual_algorithm_core 函数)
|
||||
result = visual_algorithm_core(line_data)
|
||||
# 结果校验(保证返回值符合规范)
|
||||
if result not in ["qualified", "unqualified"]:
|
||||
raise ValueError("视觉算法返回值异常,仅支持 'qualified' 或 'unqualified'")
|
||||
return result
|
||||
|
||||
# ------------测试接口---------------
|
||||
if __name__ == '__main__':
|
||||
result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08})
|
||||
print("线条质量检测")
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user