Files
wire_controlsystem/conveyor_controller/conveyor_master_controller1.py

502 lines
21 KiB
Python
Raw Permalink Normal View History

2026-01-06 16:01:15 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/12/22
# @Author : reenrr
# @File : conveyor_master_controller.py
# @Desc : 传送带1和2协同控制 使用一个串口两个轴地址485总线 每隔4秒两个传送带同步走一个挡板的距离
'''
import logging
import threading
import time
from datetime import datetime
import serial
from EMV import RelayController
# 彻底屏蔽pymodbus所有日志
logging.getLogger("pymodbus").setLevel(logging.CRITICAL)
logging.basicConfig(level=logging.INFO)
# --- 全局参数配置 ---
SERIAL_PORT = '/dev/ttyUSB0' # 单串口485总线
BAUD_RATE = 115200
ACTION_DELAY = 5
SLAVE_ID_1 = 1 # 传送带1轴地址
SLAVE_ID_2 = 2 # 传送带2轴地址
SERIAL_TIMEOUT = 0.05 # 串口超时50ms
SYNC_STOP_DURATION = 4.0 # 同步停止时间4秒
SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间200ms
DETECTION_INTERVAL = 0.05 # 传感器检测间隔50ms
RELEASE_WAIT_TIMEOUT = 5.0 # 等待挡板离开超时5秒
# 全局串口锁485总线必须单指令发送避免冲突
GLOBAL_SERIAL_LOCK = threading.Lock()
class SingleMotorController:
"""单个电机控制器(按轴地址自动匹配指令集,复用全局串口)"""
def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock):
self.slave_id = slave_id # 轴地址1/2
self.conveyor_id = conveyor_id # 传送带编号1/2
self.action_delay = action_delay
self.ser = serial_obj # 复用主控制器的串口实例
self.global_serial_lock = global_serial_lock # 全局串口锁
# 初始化指令
self._init_commands()
# 核心状态标志
self.status_thread_is_running = False
self.conveyor_thread_is_running = False
self.sensor_triggered = False # 传感器触发标志
self.sensor_locked = False # 传感器锁定(防抖)
self.stop_flag = False # 本地停止标志
self.wait_sensor_release = False # 重启后等待挡板离开标志
self.last_sensor_trigger = 0 # 上次触发时间(防抖)
# 线程对象
self.monitor_thread = None
self.run_speed_thread = None
self.relay_controller = RelayController()
# 锁
self.sensor_lock = threading.Lock()
self.state_lock = threading.Lock()
def _init_commands(self):
"""根据轴地址初始化指令集"""
if self.slave_id == 1:
# --------传送带1轴地址1指令集--------
self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令
self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令
self.speed_commands = [
bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式
bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30
bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度
bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度
]
elif self.slave_id == 2:
# --------传送带2轴地址2指令集--------
self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令
self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令
self.speed_commands = [
bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式
bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30
bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度
bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度
]
else:
raise ValueError(f"不支持的轴地址:{self.slave_id}仅支持1/2")
# 打印指令(调试用)
logging.info(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:")
logging.info(f" 启动指令: {self.start_command.hex(' ')}")
logging.info(f" 停止指令: {self.stop_command.hex(' ')}")
def send_command_list(self, command_list, delay=0.05):
"""批量发送指令列表(加全局锁,指令间延时避免总线冲突)"""
if not (self.ser and self.ser.is_open):
logging.info(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送")
return
for idx, cmd in enumerate(command_list):
self.send_and_receive_raw(cmd, f"指令{idx + 1}")
time.sleep(delay) # 485总线指令间必须加延时
def clear_buffer(self):
"""清空串口缓冲区(加全局锁)"""
if self.ser and self.ser.is_open:
with self.global_serial_lock:
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
while self.ser.in_waiting > 0:
self.ser.read(self.ser.in_waiting)
time.sleep(0.001)
def send_and_receive_raw(self, command, description):
"""底层串口通信(核心:使用全局锁保证单指令发送)"""
if not (self.ser and self.ser.is_open):
logging.info(f"传送带{self.conveyor_id}串口未打开,跳过发送")
return None
try:
self.clear_buffer()
# 全局锁:同一时间仅一个设备发送指令
with self.global_serial_lock:
send_start = time.perf_counter()
self.ser.write(command)
self.ser.flush()
send_cost = (time.perf_counter() - send_start) * 1000
# 接收响应(仅对应轴地址的设备会回复)
recv_start = time.perf_counter()
response = b""
while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT:
if self.ser.in_waiting > 0:
chunk = self.ser.read(8)
response += chunk
if len(response) >= 8:
break
time.sleep(0.001)
recv_cost = (time.perf_counter() - recv_start) * 1000
# 处理响应
valid_resp = response[:8] if len(response) >= 8 else response
logging.info(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]")
logging.info(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)")
logging.info(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)")
return valid_resp
except Exception as e:
logging.info(f"传送带{self.conveyor_id}通信异常 ({description}): {e}")
return None
def emergency_stop(self):
"""紧急停止(调用对应轴地址的停止指令)"""
with self.state_lock:
if self.stop_flag:
return
self.stop_flag = True
self.send_and_receive_raw(self.stop_command, "停止寄存器")
logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已紧急停止")
def resume_motor(self):
"""恢复电机运行(调用对应轴地址的启动指令)"""
with self.state_lock:
self.stop_flag = False
self.sensor_triggered = False
self.sensor_locked = False
self.last_sensor_trigger = 0
self.wait_sensor_release = True # 标记需要等待挡板离开
# 主动发送运行指令
self.send_and_receive_raw(self.start_command, "重启寄存器")
logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已重启,等待挡板离开传感器...")
def wait_for_sensor_release(self):
"""等待挡板离开传感器"""
start_time = time.time()
logging.info(f"[传送带{self.conveyor_id}] 开始等待挡板离开(超时{RELEASE_WAIT_TIMEOUT}秒)")
while time.time() - start_time < RELEASE_WAIT_TIMEOUT:
# 读取传感器状态True=无遮挡False=有遮挡)
if self.conveyor_id == 1:
sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
else:
sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
# 传感器恢复为无遮挡True说明挡板已离开
if sensor_status:
logging.info(f"[传送带{self.conveyor_id}] 挡板已离开传感器,恢复正常检测")
with self.state_lock:
self.wait_sensor_release = False
return True
time.sleep(DETECTION_INTERVAL)
# 超时处理
logging.info(f"[传送带{self.conveyor_id}] 等待挡板离开超时!强制恢复检测")
with self.state_lock:
self.wait_sensor_release = False
return False
def monitor_conveyors_sensor_status(self, master_controller):
"""传感器检测线程"""
logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s")
while self.status_thread_is_running and not master_controller.global_stop_flag:
try:
with self.sensor_lock:
# 1. 全局停止/电机停止时跳过检测
if master_controller.global_stop_flag or self.stop_flag:
time.sleep(DETECTION_INTERVAL)
continue
# 2. 重启后先等待挡板离开
if self.wait_sensor_release:
self.wait_for_sensor_release()
continue
# 3. 读取传感器状态
if self.conveyor_id == 1:
sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors')
else:
sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors')
current_time = time.time()
# 4. 检测到挡板且满足防抖条件
if sensor_status and (not self.sensor_locked) and \
2026-01-06 16:01:15 +08:00
(current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME:
logging.info(f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即响应")
self.last_sensor_trigger = current_time
with self.state_lock:
self.sensor_triggered = True
self.sensor_locked = True
# 立即通知主控制器
threading.Thread(
target=master_controller.on_sensor_triggered,
args=(self.conveyor_id,),
daemon=True
).start()
time.sleep(DETECTION_INTERVAL)
except Exception as e:
logging.info(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}")
time.sleep(0.1)
logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已停止")
def run_speed_mode(self, master_controller):
"""电机速度模式线程"""
logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已启动(轴地址{self.slave_id}")
self.conveyor_thread_is_running = True
while self.conveyor_thread_is_running and not master_controller.global_stop_flag:
try:
# 串口未打开则退出循环(由主控制器管理串口连接)
if not (self.ser and self.ser.is_open):
logging.info(f"[传送带{self.conveyor_id}] 串口已关闭,停止发送运行指令")
time.sleep(1)
continue
# 仅当未停止时发送运行指令
if not self.stop_flag:
self.send_and_receive_raw(self.start_command, "持续运行指令")
# 启动传感器线程(仅一次)
if not self.status_thread_is_running and (
self.monitor_thread is None or not self.monitor_thread.is_alive()):
self.status_thread_is_running = True
self.monitor_thread = threading.Thread(
target=self.monitor_conveyors_sensor_status,
args=(master_controller,),
daemon=True
)
self.monitor_thread.start()
time.sleep(0.5)
except Exception as e:
logging.info(f"[传送带{self.conveyor_id}] 电机运行异常: {e}")
time.sleep(1)
self.emergency_stop()
self.conveyor_thread_is_running = False
logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已停止")
def start_run_speed_thread(self, master_controller):
"""启动电机线程"""
if self.run_speed_thread and self.run_speed_thread.is_alive():
with self.state_lock:
self.conveyor_thread_is_running = False
self.run_speed_thread.join(timeout=2)
with self.state_lock:
self.stop_flag = False
self.sensor_triggered = False
self.sensor_locked = False
self.wait_sensor_release = True
self.last_sensor_trigger = 0
self.run_speed_thread = threading.Thread(
target=self.run_speed_mode,
args=(master_controller,),
daemon=True
)
self.run_speed_thread.start()
class MasterConveyorController:
"""主控制器 - 单串口管理两个传送带485总线"""
def __init__(self):
self.global_stop_flag = False
self.sync_running = False
# 1. 初始化单串口485总线
self.ser = None
self._init_serial()
# 2. 初始化两个电机控制器(复用同一个串口)
self.conveyor1 = SingleMotorController(
slave_id=SLAVE_ID_1,
conveyor_id=1,
action_delay=ACTION_DELAY,
serial_obj=self.ser,
global_serial_lock=GLOBAL_SERIAL_LOCK
)
self.conveyor2 = SingleMotorController(
slave_id=SLAVE_ID_2,
conveyor_id=2,
action_delay=ACTION_DELAY,
serial_obj=self.ser,
global_serial_lock=GLOBAL_SERIAL_LOCK
)
# 同步锁
self.sync_lock = threading.Lock()
self.sync_condition = threading.Condition(self.sync_lock)
def _init_serial(self):
"""初始化485总线串口主控制器统一管理"""
try:
self.ser = serial.Serial(
port=SERIAL_PORT,
baudrate=BAUD_RATE,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=SERIAL_TIMEOUT,
write_timeout=SERIAL_TIMEOUT,
xonxoff=False,
rtscts=False,
dsrdtr=False
)
if self.ser.is_open:
logging.info(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE}")
# 初始化时清空缓冲区
with GLOBAL_SERIAL_LOCK:
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
else:
raise RuntimeError("串口初始化失败:无法打开串口")
except Exception as e:
raise RuntimeError(f"485串口初始化失败: {e}")
def on_sensor_triggered(self, conveyor_id):
"""传感器触发回调"""
with self.sync_condition:
if self.sync_running:
return
if conveyor_id == 1:
self.conveyor1.sensor_triggered = True
logging.info(f"\n[主控制器] 传送带1检测到挡板等待传送带2...")
else:
self.conveyor2.sensor_triggered = True
logging.info(f"\n[主控制器] 传送带2检测到挡板等待传送带1...")
# 立即停止当前触发的传送带
if conveyor_id == 1:
self.conveyor1.emergency_stop()
else:
self.conveyor2.emergency_stop()
# 两个都触发后同步停止
if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered:
self.sync_running = True
logging.info(f"\n[主控制器] 两个传送带都检测到挡板!开始同步停止 {SYNC_STOP_DURATION}")
self.conveyor1.emergency_stop()
self.conveyor2.emergency_stop()
# 同步停止+重启逻辑
def sync_stop_and_resume():
try:
time.sleep(SYNC_STOP_DURATION)
with self.sync_condition:
logging.info(f"\n[主控制器] 同步停止结束,重启所有传送带")
# 重启两个电机
self.conveyor1.resume_motor()
self.conveyor2.resume_motor()
# 重置所有状态
self.conveyor1.sensor_triggered = False
self.conveyor2.sensor_triggered = False
self.conveyor1.sensor_locked = False
self.conveyor2.sensor_locked = False
self.conveyor1.last_sensor_trigger = 0
self.conveyor2.last_sensor_trigger = 0
# 强制重启电机线程
self.conveyor1.start_run_speed_thread(self)
self.conveyor2.start_run_speed_thread(self)
self.sync_running = False
self.sync_condition.notify_all()
except Exception as e:
logging.info(f"[主控制器] 同步恢复异常: {e}")
self.sync_running = False
threading.Thread(target=sync_stop_and_resume, daemon=True).start()
def start_all_conveyors(self):
"""启动所有传送带"""
logging.info("\n=== 主控制器启动所有传送带485总线===")
# 检查串口是否正常
if not (self.ser and self.ser.is_open):
logging.info("[主控制器] 485串口未打开无法启动")
return False
# 发送速度模式指令(按轴地址发送)
self.conveyor1.send_command_list(self.conveyor1.speed_commands[:4])
self.conveyor2.send_command_list(self.conveyor2.speed_commands[:4])
# 启动电机线程
self.conveyor1.start_run_speed_thread(self)
self.conveyor2.start_run_speed_thread(self)
logging.info("[主控制器] 所有传送带已启动,等待挡板离开后开始检测...")
return True
def stop_all_conveyors(self):
"""停止所有传送带并关闭串口"""
logging.info("\n=== 主控制器:停止所有传送带 ===")
self.global_stop_flag = True
with self.sync_condition:
self.sync_running = False
self.conveyor1.emergency_stop()
self.conveyor2.emergency_stop()
# 关闭串口
time.sleep(1)
with GLOBAL_SERIAL_LOCK:
if self.ser and self.ser.is_open:
self.ser.close()
logging.info(f"485总线串口 {SERIAL_PORT} 已关闭")
logging.info("[主控制器] 所有传送带已停止并关闭串口")
def run(self):
"""主运行函数"""
try:
if not self.start_all_conveyors():
return
# 主线程保持运行
while not self.global_stop_flag:
time.sleep(0.5)
except KeyboardInterrupt:
logging.info("\n\n[主控制器] 检测到退出指令,正在停止系统...")
except Exception as e:
logging.info(f"\n[主控制器] 程序异常: {e}")
finally:
self.stop_all_conveyors()
logging.info("\n=== 主控制器:程序执行完毕 ===")
def main():
"""主函数"""
try:
master = MasterConveyorController()
master.run()
except RuntimeError as e:
logging.info(f"系统启动失败: {e}")
except Exception as e:
logging.info(f"未知异常: {e}")
if __name__ == '__main__':
main()