Files
ailai_show/workflow_test.py
2025-07-29 13:16:30 +08:00

354 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import threading
from enum import IntEnum
# --- 1. 简化模拟必要的 Enum 和类 ---
class FeedStatus(IntEnum):
FNone = 0
FStart = 1
FCheck = 2
FMid = 3
FPhoto = 4
FTake = 5
FDropBag = 9 # 简化,只保留关键状态
class CatchStatus:
CNone = 0
CTake = 1
COk = 2
CDrop = 3
class Real_Position:
"""简化的位置表示"""
def __init__(self, name=""):
self.name = name
def compare(self, other, is_action=False):
# 简化比较,只比较名字
return self.name == other.name
def __str__(self):
return self.name
class PositionModel:
"""简化的位置模型"""
def __init__(self, status, position):
self.status = status
self.position = position
def get_position(self):
return self.position
class FeedLine:
"""简化 FeedLine只包含关键点"""
def __init__(self, name, positions: list): # positions: list of PositionModel
self.name = name
# 假设 positions 按顺序排列,且包含 FTake 和 FDropBag 点
self.positions = positions
self.take_position_model = None
self.drop_position_model = None
self._find_key_positions()
def _find_key_positions(self):
for pm in self.positions:
if pm.status == FeedStatus.FTake:
self.take_position_model = pm
elif pm.status == FeedStatus.FDropBag:
self.drop_position_model = pm
def get_take_position(self):
return self.take_position_model
# --- 2. 简化模拟 Catch 模块 ---
class SimulatedCatch:
def __init__(self, log_func):
self.catch_status = CatchStatus.CNone
self.log_signal = log_func # 用于打印日志的函数
def run(self):
"""模拟抓取动作"""
if self.catch_status == CatchStatus.CTake:
self.log_signal(logging.INFO, "🤖 Sim Catch: 开始执行抓取动作...")
time.sleep(1) # 模拟抓取耗时
self.catch_status = CatchStatus.COk
self.log_signal(logging.INFO, "🎉 Sim Catch: 抓取完成!")
elif self.catch_status == CatchStatus.CDrop:
self.log_signal(logging.INFO, "📦 Sim Catch: 开始执行投料动作...")
time.sleep(0.5) # 模拟投料耗时
self.catch_status = CatchStatus.COk
self.log_signal(logging.INFO, "🎊 Sim Catch: 投料完成!")
# --- 3. 简化 FeedingConfig ---
class FeedingConfig:
def __init__(self, num: int, feedLine: FeedLine, drop_positions: list):
self.num = num
self.feedLine = feedLine
self.drop_positions = drop_positions # list of Real_Position
# --- 4. 简化 RobotClient 状态 ---
class SimulatedRobotClient:
def __init__(self, initial_position_name):
self.status_model = type('obj', (object,), {'world_0': 0, 'world_1': 0, 'world_2': 0, 'world_3': 0, 'world_4': 0, 'world_5': 0})()
self.origin_position = Real_Position("Origin")
self.feed_speed = 10
# 当前模拟位置
self.current_position = Real_Position(initial_position_name)
def getRealPosition(self):
return self.current_position
# --- 5. 模拟传感器 ---
class SimulatedSensor:
def __init__(self, name, initial_state=False):
self.name = name
self.state = initial_state
self.thread = None
self._running = False
def start_simulation(self, sequence):
"""根据一个序列 (时间, 状态) 来模拟传感器状态变化"""
self._running = True
def _run():
start_time = time.time()
seq_index = 0
while self._running and seq_index < len(sequence):
elapsed = time.time() - start_time
if elapsed >= sequence[seq_index][0]: # sequence[seq_index][0] 是触发时间
self.state = sequence[seq_index][1] # sequence[seq_index][1] 是触发状态
print(f" 📡 [{self.name}] 模拟状态变为: {self.state}")
seq_index += 1
time.sleep(0.1) # 检查间隔
self.thread = threading.Thread(target=_run, daemon=True)
self.thread.start()
def stop_simulation(self):
self._running = False
if self.thread:
self.thread.join()
# --- 6. 简化版 Feeding 类 (核心逻辑) ---
import logging
class SimulatedFeeding:
def __init__(self, robotClient: SimulatedRobotClient, sensor2: SimulatedSensor):
self.feedConfig = None
self.feedStatus = FeedStatus.FNone
self.robotClient = robotClient
self.sensor2 = sensor2 # 使用模拟传感器
self.pause = False
self.is_reverse = False
self.catch = SimulatedCatch(self._simple_log) # 使用模拟 Catch
# --- 码垛模式新增 ---
self.current_drop_index = 0
# 简化的日志信号
# self.log_signal = self._simple_log
self.log_signal = lambda level, message: self._simple_log(level, message)
def _simple_log(self, level, message):
level_str = {logging.INFO: "INFO", logging.ERROR: "ERROR", logging.WARNING: "WARN"}.get(level, "LOG")
print(f"[{level_str}] {message}")
def set_config(self, config: FeedingConfig):
self.feedConfig = config
def start(self):
if self.feedConfig:
self.feedStatus = FeedStatus.FStart
self.log_signal(logging.INFO, "🚀 启动仿真投料流程")
# self.log_signal(logging.INFO, "🚀 启动仿真投料流程")
def run(self):
"""仿真运行一次主循环"""
if self.feedConfig is None or self.feedStatus == FeedStatus.FNone or self.pause:
return
# 获取当前模拟位置
real_position = self.robotClient.current_position
# --- 简化版 FStart ---
if self.feedStatus == FeedStatus.FStart:
# self.log_signal(logging.INFO, "🟢 FStart: 初始化流程")
self.log_signal(logging.INFO, "🟢 FStart: 初始化流程")
self.current_drop_index = 0 # 重置投料点索引
# 模拟移动到抓取点 (直接设置位置)
take_pos_model = self.feedConfig.feedLine.get_take_position()
if take_pos_model:
self.robotClient.current_position = take_pos_model.get_position()
self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}")
# self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}")
self.feedStatus = FeedStatus.FTake
else:
# self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!")
self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!")
self.feedStatus = FeedStatus.FNone
return # 等待下次循环
# --- 简化版 FTake ---
elif self.feedStatus == FeedStatus.FTake:
# self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号")
self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号")
take_position_model = self.feedConfig.feedLine.get_take_position()
if not take_position_model or not take_position_model.get_position():
self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!")
# self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!")
self.feedStatus = FeedStatus.FNone
return
if not take_position_model.get_position().compare(real_position):
self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}")
# self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}")
# 在仿真中,我们假设它会移动到位,所以直接设置
self.robotClient.current_position = take_position_model.get_position()
self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}")
# self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}")
# 模拟等待传感器2信号
# 在实际仿真中,我们可以通过预设序列或用户输入来控制
# 这里用一个简单的循环等待 (实际应用中应避免阻塞)
wait_start = time.time()
timeout = 10 # 10秒超时
while not self.sensor2.state:
if time.time() - wait_start > timeout:
# self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!")
self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!")
self.feedStatus = FeedStatus.FNone
return
self.log_signal(logging.INFO, "⏳ FTake: 等待传感器2信号...")
time.sleep(1) # 每秒检查一次
self.log_signal(logging.INFO, "✅ FTake: 传感器2信号收到开始抓取")
# 执行抓取动作
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CTake
return # 等待下一周期 Catch 完成
if self.catch.catch_status == CatchStatus.CTake:
return # 等待 Catch 模块处理
if self.catch.catch_status == CatchStatus.COk:
self.catch.catch_status = CatchStatus.CNone
self.log_signal(logging.INFO, "👏 FTake: 抓取成功,准备投料")
# 移动到第一个投料点
if self.feedConfig.drop_positions:
target_drop_pos = self.feedConfig.drop_positions[self.current_drop_index]
self.robotClient.current_position = target_drop_pos
self.log_signal(logging.INFO, f"🚚 FTake: 移动到投料点 {self.current_drop_index} ({target_drop_pos})")
self.feedStatus = FeedStatus.FDropBag
else:
self.log_signal(logging.ERROR, "❌ FTake: 投料点列表为空!")
self.feedStatus = FeedStatus.FNone
return
# --- 简化版 FDropBag (码垛核心) ---
elif self.feedStatus == FeedStatus.FDropBag:
if not self.feedConfig.drop_positions:
self.log_signal(logging.ERROR, "❌ FDropBag: 投料点列表为空!")
self.feedStatus = FeedStatus.FNone
return
target_drop_position = self.feedConfig.drop_positions[self.current_drop_index]
self.log_signal(logging.INFO, f"📦 FDropBag: 当前目标投料点索引: {self.current_drop_index} ({target_drop_position})")
# 检查是否到达目标投料点 (仿真中直接比较)
if target_drop_position.compare(real_position):
self.log_signal(logging.INFO, f"🎯 FDropBag: 到达投料点 {self.current_drop_index}")
# 执行投料动作
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CDrop
self.log_signal(logging.INFO, "🪂 FDropBag: 开始执行投料动作...")
return # 等待下一周期检查 COk
if self.catch.catch_status == CatchStatus.CDrop:
self.log_signal(logging.INFO, "🪂 FDropBag: 投料动作进行中...")
return # 等待 Catch 模块完成
if self.catch.catch_status == CatchStatus.COk:
self.catch.catch_status = CatchStatus.CNone
self.log_signal(logging.INFO, f"🎉 FDropBag: 在投料点 {self.current_drop_index} 投料完成")
# 更新投料计数
self.feedConfig.num -= 1
self.log_signal(logging.INFO, f'🔢 剩余投料次数: {self.feedConfig.num}')
# 检查是否所有投料任务完成
if self.feedConfig.num > 0:
# 更新投料点索引 (循环)
self.current_drop_index = (self.current_drop_index + 1) % len(self.feedConfig.drop_positions)
self.log_signal(logging.INFO, f"🔄 更新投料点索引为: {self.current_drop_index}")
# 移动到固定的抓取点 (仿真中直接设置位置)
take_position_model = self.feedConfig.feedLine.get_take_position()
if take_position_model:
self.log_signal(logging.INFO, f"🔄 移动到固定抓取点...")
self.robotClient.current_position = take_position_model.get_position()
self.log_signal(logging.INFO, f"📍 已移动到抓取点 {self.robotClient.current_position}")
self.feedStatus = FeedStatus.FTake # 设置状态为 FTake
else:
self.log_signal(logging.ERROR, "❌ 无法获取抓取点位置!")
self.feedStatus = FeedStatus.FNone
else:
# 所有投料完成
self.log_signal(logging.INFO, "🎊 所有投料任务完成")
self.feedStatus = FeedStatus.FNone # 结束流程
else:
# 如果还没到达目标投料点 (仿真中直接移动)
self.log_signal(logging.INFO, f"🚚 正在移动到投料点 {self.current_drop_index} ({target_drop_position})")
self.robotClient.current_position = target_drop_position
self.log_signal(logging.INFO, f"📍 已移动到投料点 {self.current_drop_index} ({self.robotClient.current_position})")
# --- 7. 主程序入口 ---
if __name__ == "__main__":
# --- 配置仿真环境 ---
logging.basicConfig(level=logging.INFO) # 设置日志级别
# 1. 定义位置点
origin_pos = Real_Position("Origin")
take_pos = Real_Position("Fixed_Take_Point")
drop_pos_1 = Real_Position("Drop_Point_1")
drop_pos_2 = Real_Position("Drop_Point_2")
drop_pos_3 = Real_Position("Drop_Point_3")
# 2. 创建 FeedLine (简化)
feedline_positions = [
PositionModel(FeedStatus.FTake, take_pos),
PositionModel(FeedStatus.FDropBag, Real_Position("Placeholder_Drop_Pos")) # 占位,实际由 drop_positions 列表决定
]
feed_line = FeedLine("SimFeedLine", feedline_positions)
# 3. 定义投料点列表 (码垛点)
drop_points = [drop_pos_1, drop_pos_2, drop_pos_3]
# 4. 创建 FeedingConfig
total_bags = 5 # 总共要投的料包数
feeding_config = FeedingConfig(num=total_bags, feedLine=feed_line, drop_positions=drop_points)
# 5. 创建模拟机器人和传感器
robot_client = SimulatedRobotClient("Origin")
sensor2 = SimulatedSensor("Sensor2", initial_state=False) # 初始无信号
# 6. 创建并配置仿真 Feeding
sim_feeding = SimulatedFeeding(robot_client, sensor2)
sim_feeding.set_config(feeding_config)
# 7. (可选) 预设传感器2信号序列 [(时间(秒), 状态), ...]
# 例如在第2秒时变为True第8秒时变为False第15秒时再变为True...
sensor2_sequence = [(2, True), (8, False), (15, True), (22, False), (30, True)]
sensor2.start_simulation(sensor2_sequence)
# 8. 启动流程
sim_feeding.start()
# 9. 运行仿真主循环
try:
while sim_feeding.feedStatus != FeedStatus.FNone:
sim_feeding.run()
sim_feeding.catch.run() # 触发模拟抓取/投料逻辑
time.sleep(0.5) # 控制仿真循环速度
except KeyboardInterrupt:
print("\n🛑 仿真被用户中断")
finally:
sensor2.stop_simulation()
print("🏁 仿真结束")