forked from huangxin/ailai
first commit
This commit is contained in:
353
workflow_test.py
Normal file
353
workflow_test.py
Normal file
@ -0,0 +1,353 @@
|
||||
import time
|
||||
import threading
|
||||
from enum import IntEnum
|
||||
|
||||
# --- 1. 简化模拟必要的 Enum 和类 ---
|
||||
|
||||
class FeedStatus(IntEnum):
|
||||
FNone = 0
|
||||
FStart = 1
|
||||
FCheck = 2
|
||||
FMid = 3
|
||||
FPhoto = 4
|
||||
FTake = 5
|
||||
FDropBag = 9 # 简化,只保留关键状态
|
||||
|
||||
class CatchStatus:
|
||||
CNone = 0
|
||||
CTake = 1
|
||||
COk = 2
|
||||
CDrop = 3
|
||||
|
||||
class Real_Position:
|
||||
"""简化的位置表示"""
|
||||
def __init__(self, name=""):
|
||||
self.name = name
|
||||
|
||||
def compare(self, other, is_action=False):
|
||||
# 简化比较,只比较名字
|
||||
return self.name == other.name
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
class PositionModel:
|
||||
"""简化的位置模型"""
|
||||
def __init__(self, status, position):
|
||||
self.status = status
|
||||
self.position = position
|
||||
|
||||
def get_position(self):
|
||||
return self.position
|
||||
|
||||
class FeedLine:
|
||||
"""简化 FeedLine,只包含关键点"""
|
||||
def __init__(self, name, positions: list): # positions: list of PositionModel
|
||||
self.name = name
|
||||
# 假设 positions 按顺序排列,且包含 FTake 和 FDropBag 点
|
||||
self.positions = positions
|
||||
self.take_position_model = None
|
||||
self.drop_position_model = None
|
||||
self._find_key_positions()
|
||||
|
||||
def _find_key_positions(self):
|
||||
for pm in self.positions:
|
||||
if pm.status == FeedStatus.FTake:
|
||||
self.take_position_model = pm
|
||||
elif pm.status == FeedStatus.FDropBag:
|
||||
self.drop_position_model = pm
|
||||
|
||||
def get_take_position(self):
|
||||
return self.take_position_model
|
||||
|
||||
# --- 2. 简化模拟 Catch 模块 ---
|
||||
class SimulatedCatch:
|
||||
def __init__(self, log_func):
|
||||
self.catch_status = CatchStatus.CNone
|
||||
self.log_signal = log_func # 用于打印日志的函数
|
||||
|
||||
def run(self):
|
||||
"""模拟抓取动作"""
|
||||
if self.catch_status == CatchStatus.CTake:
|
||||
self.log_signal(logging.INFO, "🤖 Sim Catch: 开始执行抓取动作...")
|
||||
time.sleep(1) # 模拟抓取耗时
|
||||
self.catch_status = CatchStatus.COk
|
||||
self.log_signal(logging.INFO, "🎉 Sim Catch: 抓取完成!")
|
||||
elif self.catch_status == CatchStatus.CDrop:
|
||||
self.log_signal(logging.INFO, "📦 Sim Catch: 开始执行投料动作...")
|
||||
time.sleep(0.5) # 模拟投料耗时
|
||||
self.catch_status = CatchStatus.COk
|
||||
self.log_signal(logging.INFO, "🎊 Sim Catch: 投料完成!")
|
||||
|
||||
# --- 3. 简化 FeedingConfig ---
|
||||
class FeedingConfig:
|
||||
def __init__(self, num: int, feedLine: FeedLine, drop_positions: list):
|
||||
self.num = num
|
||||
self.feedLine = feedLine
|
||||
self.drop_positions = drop_positions # list of Real_Position
|
||||
|
||||
# --- 4. 简化 RobotClient 状态 ---
|
||||
class SimulatedRobotClient:
|
||||
def __init__(self, initial_position_name):
|
||||
self.status_model = type('obj', (object,), {'world_0': 0, 'world_1': 0, 'world_2': 0, 'world_3': 0, 'world_4': 0, 'world_5': 0})()
|
||||
self.origin_position = Real_Position("Origin")
|
||||
self.feed_speed = 10
|
||||
# 当前模拟位置
|
||||
self.current_position = Real_Position(initial_position_name)
|
||||
|
||||
def getRealPosition(self):
|
||||
return self.current_position
|
||||
|
||||
# --- 5. 模拟传感器 ---
|
||||
class SimulatedSensor:
|
||||
def __init__(self, name, initial_state=False):
|
||||
self.name = name
|
||||
self.state = initial_state
|
||||
self.thread = None
|
||||
self._running = False
|
||||
|
||||
def start_simulation(self, sequence):
|
||||
"""根据一个序列 (时间, 状态) 来模拟传感器状态变化"""
|
||||
self._running = True
|
||||
def _run():
|
||||
start_time = time.time()
|
||||
seq_index = 0
|
||||
while self._running and seq_index < len(sequence):
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed >= sequence[seq_index][0]: # sequence[seq_index][0] 是触发时间
|
||||
self.state = sequence[seq_index][1] # sequence[seq_index][1] 是触发状态
|
||||
print(f" 📡 [{self.name}] 模拟状态变为: {self.state}")
|
||||
seq_index += 1
|
||||
time.sleep(0.1) # 检查间隔
|
||||
self.thread = threading.Thread(target=_run, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def stop_simulation(self):
|
||||
self._running = False
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
|
||||
# --- 6. 简化版 Feeding 类 (核心逻辑) ---
|
||||
import logging
|
||||
|
||||
class SimulatedFeeding:
|
||||
def __init__(self, robotClient: SimulatedRobotClient, sensor2: SimulatedSensor):
|
||||
self.feedConfig = None
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
self.robotClient = robotClient
|
||||
self.sensor2 = sensor2 # 使用模拟传感器
|
||||
self.pause = False
|
||||
self.is_reverse = False
|
||||
self.catch = SimulatedCatch(self._simple_log) # 使用模拟 Catch
|
||||
# --- 码垛模式新增 ---
|
||||
self.current_drop_index = 0
|
||||
|
||||
# 简化的日志信号
|
||||
# self.log_signal = self._simple_log
|
||||
self.log_signal = lambda level, message: self._simple_log(level, message)
|
||||
|
||||
|
||||
def _simple_log(self, level, message):
|
||||
level_str = {logging.INFO: "INFO", logging.ERROR: "ERROR", logging.WARNING: "WARN"}.get(level, "LOG")
|
||||
print(f"[{level_str}] {message}")
|
||||
|
||||
def set_config(self, config: FeedingConfig):
|
||||
self.feedConfig = config
|
||||
|
||||
def start(self):
|
||||
if self.feedConfig:
|
||||
self.feedStatus = FeedStatus.FStart
|
||||
self.log_signal(logging.INFO, "🚀 启动仿真投料流程")
|
||||
# self.log_signal(logging.INFO, "🚀 启动仿真投料流程")
|
||||
|
||||
def run(self):
|
||||
"""仿真运行一次主循环"""
|
||||
if self.feedConfig is None or self.feedStatus == FeedStatus.FNone or self.pause:
|
||||
return
|
||||
|
||||
# 获取当前模拟位置
|
||||
real_position = self.robotClient.current_position
|
||||
|
||||
# --- 简化版 FStart ---
|
||||
if self.feedStatus == FeedStatus.FStart:
|
||||
# self.log_signal(logging.INFO, "🟢 FStart: 初始化流程")
|
||||
self.log_signal(logging.INFO, "🟢 FStart: 初始化流程")
|
||||
self.current_drop_index = 0 # 重置投料点索引
|
||||
# 模拟移动到抓取点 (直接设置位置)
|
||||
take_pos_model = self.feedConfig.feedLine.get_take_position()
|
||||
if take_pos_model:
|
||||
self.robotClient.current_position = take_pos_model.get_position()
|
||||
self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}")
|
||||
# self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}")
|
||||
self.feedStatus = FeedStatus.FTake
|
||||
else:
|
||||
# self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!")
|
||||
self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
return # 等待下次循环
|
||||
|
||||
# --- 简化版 FTake ---
|
||||
elif self.feedStatus == FeedStatus.FTake:
|
||||
# self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号")
|
||||
self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号")
|
||||
take_position_model = self.feedConfig.feedLine.get_take_position()
|
||||
if not take_position_model or not take_position_model.get_position():
|
||||
self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!")
|
||||
# self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
return
|
||||
|
||||
if not take_position_model.get_position().compare(real_position):
|
||||
self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}")
|
||||
# self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}")
|
||||
# 在仿真中,我们假设它会移动到位,所以直接设置
|
||||
self.robotClient.current_position = take_position_model.get_position()
|
||||
self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}")
|
||||
# self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}")
|
||||
|
||||
# 模拟等待传感器2信号
|
||||
# 在实际仿真中,我们可以通过预设序列或用户输入来控制
|
||||
# 这里用一个简单的循环等待 (实际应用中应避免阻塞)
|
||||
wait_start = time.time()
|
||||
timeout = 10 # 10秒超时
|
||||
while not self.sensor2.state:
|
||||
if time.time() - wait_start > timeout:
|
||||
# self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!")
|
||||
self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
return
|
||||
self.log_signal(logging.INFO, "⏳ FTake: 等待传感器2信号...")
|
||||
time.sleep(1) # 每秒检查一次
|
||||
|
||||
self.log_signal(logging.INFO, "✅ FTake: 传感器2信号收到,开始抓取")
|
||||
# 执行抓取动作
|
||||
if self.catch.catch_status == CatchStatus.CNone:
|
||||
self.catch.catch_status = CatchStatus.CTake
|
||||
return # 等待下一周期 Catch 完成
|
||||
if self.catch.catch_status == CatchStatus.CTake:
|
||||
return # 等待 Catch 模块处理
|
||||
if self.catch.catch_status == CatchStatus.COk:
|
||||
self.catch.catch_status = CatchStatus.CNone
|
||||
self.log_signal(logging.INFO, "👏 FTake: 抓取成功,准备投料")
|
||||
# 移动到第一个投料点
|
||||
if self.feedConfig.drop_positions:
|
||||
target_drop_pos = self.feedConfig.drop_positions[self.current_drop_index]
|
||||
self.robotClient.current_position = target_drop_pos
|
||||
self.log_signal(logging.INFO, f"🚚 FTake: 移动到投料点 {self.current_drop_index} ({target_drop_pos})")
|
||||
self.feedStatus = FeedStatus.FDropBag
|
||||
else:
|
||||
self.log_signal(logging.ERROR, "❌ FTake: 投料点列表为空!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
return
|
||||
|
||||
# --- 简化版 FDropBag (码垛核心) ---
|
||||
elif self.feedStatus == FeedStatus.FDropBag:
|
||||
if not self.feedConfig.drop_positions:
|
||||
self.log_signal(logging.ERROR, "❌ FDropBag: 投料点列表为空!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
return
|
||||
|
||||
target_drop_position = self.feedConfig.drop_positions[self.current_drop_index]
|
||||
self.log_signal(logging.INFO, f"📦 FDropBag: 当前目标投料点索引: {self.current_drop_index} ({target_drop_position})")
|
||||
|
||||
# 检查是否到达目标投料点 (仿真中直接比较)
|
||||
if target_drop_position.compare(real_position):
|
||||
self.log_signal(logging.INFO, f"🎯 FDropBag: 到达投料点 {self.current_drop_index}")
|
||||
|
||||
# 执行投料动作
|
||||
if self.catch.catch_status == CatchStatus.CNone:
|
||||
self.catch.catch_status = CatchStatus.CDrop
|
||||
self.log_signal(logging.INFO, "🪂 FDropBag: 开始执行投料动作...")
|
||||
return # 等待下一周期检查 COk
|
||||
if self.catch.catch_status == CatchStatus.CDrop:
|
||||
self.log_signal(logging.INFO, "🪂 FDropBag: 投料动作进行中...")
|
||||
return # 等待 Catch 模块完成
|
||||
if self.catch.catch_status == CatchStatus.COk:
|
||||
self.catch.catch_status = CatchStatus.CNone
|
||||
self.log_signal(logging.INFO, f"🎉 FDropBag: 在投料点 {self.current_drop_index} 投料完成")
|
||||
|
||||
# 更新投料计数
|
||||
self.feedConfig.num -= 1
|
||||
self.log_signal(logging.INFO, f'🔢 剩余投料次数: {self.feedConfig.num}')
|
||||
|
||||
# 检查是否所有投料任务完成
|
||||
if self.feedConfig.num > 0:
|
||||
# 更新投料点索引 (循环)
|
||||
self.current_drop_index = (self.current_drop_index + 1) % len(self.feedConfig.drop_positions)
|
||||
self.log_signal(logging.INFO, f"🔄 更新投料点索引为: {self.current_drop_index}")
|
||||
|
||||
# 移动到固定的抓取点 (仿真中直接设置位置)
|
||||
take_position_model = self.feedConfig.feedLine.get_take_position()
|
||||
if take_position_model:
|
||||
self.log_signal(logging.INFO, f"🔄 移动到固定抓取点...")
|
||||
self.robotClient.current_position = take_position_model.get_position()
|
||||
self.log_signal(logging.INFO, f"📍 已移动到抓取点 {self.robotClient.current_position}")
|
||||
self.feedStatus = FeedStatus.FTake # 设置状态为 FTake
|
||||
else:
|
||||
self.log_signal(logging.ERROR, "❌ 无法获取抓取点位置!")
|
||||
self.feedStatus = FeedStatus.FNone
|
||||
|
||||
else:
|
||||
# 所有投料完成
|
||||
self.log_signal(logging.INFO, "🎊 所有投料任务完成")
|
||||
self.feedStatus = FeedStatus.FNone # 结束流程
|
||||
|
||||
else:
|
||||
# 如果还没到达目标投料点 (仿真中直接移动)
|
||||
self.log_signal(logging.INFO, f"🚚 正在移动到投料点 {self.current_drop_index} ({target_drop_position})")
|
||||
self.robotClient.current_position = target_drop_position
|
||||
self.log_signal(logging.INFO, f"📍 已移动到投料点 {self.current_drop_index} ({self.robotClient.current_position})")
|
||||
|
||||
# --- 7. 主程序入口 ---
|
||||
if __name__ == "__main__":
|
||||
# --- 配置仿真环境 ---
|
||||
logging.basicConfig(level=logging.INFO) # 设置日志级别
|
||||
|
||||
# 1. 定义位置点
|
||||
origin_pos = Real_Position("Origin")
|
||||
take_pos = Real_Position("Fixed_Take_Point")
|
||||
drop_pos_1 = Real_Position("Drop_Point_1")
|
||||
drop_pos_2 = Real_Position("Drop_Point_2")
|
||||
drop_pos_3 = Real_Position("Drop_Point_3")
|
||||
|
||||
# 2. 创建 FeedLine (简化)
|
||||
feedline_positions = [
|
||||
PositionModel(FeedStatus.FTake, take_pos),
|
||||
PositionModel(FeedStatus.FDropBag, Real_Position("Placeholder_Drop_Pos")) # 占位,实际由 drop_positions 列表决定
|
||||
]
|
||||
feed_line = FeedLine("SimFeedLine", feedline_positions)
|
||||
|
||||
# 3. 定义投料点列表 (码垛点)
|
||||
drop_points = [drop_pos_1, drop_pos_2, drop_pos_3]
|
||||
|
||||
# 4. 创建 FeedingConfig
|
||||
total_bags = 5 # 总共要投的料包数
|
||||
feeding_config = FeedingConfig(num=total_bags, feedLine=feed_line, drop_positions=drop_points)
|
||||
|
||||
# 5. 创建模拟机器人和传感器
|
||||
robot_client = SimulatedRobotClient("Origin")
|
||||
sensor2 = SimulatedSensor("Sensor2", initial_state=False) # 初始无信号
|
||||
|
||||
# 6. 创建并配置仿真 Feeding
|
||||
sim_feeding = SimulatedFeeding(robot_client, sensor2)
|
||||
sim_feeding.set_config(feeding_config)
|
||||
|
||||
# 7. (可选) 预设传感器2信号序列 [(时间(秒), 状态), ...]
|
||||
# 例如:在第2秒时变为True,第8秒时变为False,第15秒时再变为True...
|
||||
sensor2_sequence = [(2, True), (8, False), (15, True), (22, False), (30, True)]
|
||||
sensor2.start_simulation(sensor2_sequence)
|
||||
|
||||
# 8. 启动流程
|
||||
sim_feeding.start()
|
||||
|
||||
# 9. 运行仿真主循环
|
||||
try:
|
||||
while sim_feeding.feedStatus != FeedStatus.FNone:
|
||||
sim_feeding.run()
|
||||
sim_feeding.catch.run() # 触发模拟抓取/投料逻辑
|
||||
time.sleep(0.5) # 控制仿真循环速度
|
||||
except KeyboardInterrupt:
|
||||
print("\n🛑 仿真被用户中断")
|
||||
finally:
|
||||
sensor2.stop_simulation()
|
||||
print("🏁 仿真结束")
|
||||
Reference in New Issue
Block a user