import time import threading from enum import IntEnum # --- 1. 简化模拟必要的 Enum 和类 --- class FeedStatus(IntEnum): FNone = 0 FStart = 1 FCheck = 2 FMid = 3 FPhoto = 4 FTake = 5 FDropBag = 9 # 简化,只保留关键状态 class CatchStatus: CNone = 0 CTake = 1 COk = 2 CDrop = 3 class Real_Position: """简化的位置表示""" def __init__(self, name=""): self.name = name def compare(self, other, is_action=False): # 简化比较,只比较名字 return self.name == other.name def __str__(self): return self.name class PositionModel: """简化的位置模型""" def __init__(self, status, position): self.status = status self.position = position def get_position(self): return self.position class FeedLine: """简化 FeedLine,只包含关键点""" def __init__(self, name, positions: list): # positions: list of PositionModel self.name = name # 假设 positions 按顺序排列,且包含 FTake 和 FDropBag 点 self.positions = positions self.take_position_model = None self.drop_position_model = None self._find_key_positions() def _find_key_positions(self): for pm in self.positions: if pm.status == FeedStatus.FTake: self.take_position_model = pm elif pm.status == FeedStatus.FDropBag: self.drop_position_model = pm def get_take_position(self): return self.take_position_model # --- 2. 简化模拟 Catch 模块 --- class SimulatedCatch: def __init__(self, log_func): self.catch_status = CatchStatus.CNone self.log_signal = log_func # 用于打印日志的函数 def run(self): """模拟抓取动作""" if self.catch_status == CatchStatus.CTake: self.log_signal(logging.INFO, "🤖 Sim Catch: 开始执行抓取动作...") time.sleep(1) # 模拟抓取耗时 self.catch_status = CatchStatus.COk self.log_signal(logging.INFO, "🎉 Sim Catch: 抓取完成!") elif self.catch_status == CatchStatus.CDrop: self.log_signal(logging.INFO, "📦 Sim Catch: 开始执行投料动作...") time.sleep(0.5) # 模拟投料耗时 self.catch_status = CatchStatus.COk self.log_signal(logging.INFO, "🎊 Sim Catch: 投料完成!") # --- 3. 简化 FeedingConfig --- class FeedingConfig: def __init__(self, num: int, feedLine: FeedLine, drop_positions: list): self.num = num self.feedLine = feedLine self.drop_positions = drop_positions # list of Real_Position # --- 4. 简化 RobotClient 状态 --- class SimulatedRobotClient: def __init__(self, initial_position_name): self.status_model = type('obj', (object,), {'world_0': 0, 'world_1': 0, 'world_2': 0, 'world_3': 0, 'world_4': 0, 'world_5': 0})() self.origin_position = Real_Position("Origin") self.feed_speed = 10 # 当前模拟位置 self.current_position = Real_Position(initial_position_name) def getRealPosition(self): return self.current_position # --- 5. 模拟传感器 --- class SimulatedSensor: def __init__(self, name, initial_state=False): self.name = name self.state = initial_state self.thread = None self._running = False def start_simulation(self, sequence): """根据一个序列 (时间, 状态) 来模拟传感器状态变化""" self._running = True def _run(): start_time = time.time() seq_index = 0 while self._running and seq_index < len(sequence): elapsed = time.time() - start_time if elapsed >= sequence[seq_index][0]: # sequence[seq_index][0] 是触发时间 self.state = sequence[seq_index][1] # sequence[seq_index][1] 是触发状态 print(f" 📡 [{self.name}] 模拟状态变为: {self.state}") seq_index += 1 time.sleep(0.1) # 检查间隔 self.thread = threading.Thread(target=_run, daemon=True) self.thread.start() def stop_simulation(self): self._running = False if self.thread: self.thread.join() # --- 6. 简化版 Feeding 类 (核心逻辑) --- import logging class SimulatedFeeding: def __init__(self, robotClient: SimulatedRobotClient, sensor2: SimulatedSensor): self.feedConfig = None self.feedStatus = FeedStatus.FNone self.robotClient = robotClient self.sensor2 = sensor2 # 使用模拟传感器 self.pause = False self.is_reverse = False self.catch = SimulatedCatch(self._simple_log) # 使用模拟 Catch # --- 码垛模式新增 --- self.current_drop_index = 0 # 简化的日志信号 # self.log_signal = self._simple_log self.log_signal = lambda level, message: self._simple_log(level, message) def _simple_log(self, level, message): level_str = {logging.INFO: "INFO", logging.ERROR: "ERROR", logging.WARNING: "WARN"}.get(level, "LOG") print(f"[{level_str}] {message}") def set_config(self, config: FeedingConfig): self.feedConfig = config def start(self): if self.feedConfig: self.feedStatus = FeedStatus.FStart self.log_signal(logging.INFO, "🚀 启动仿真投料流程") # self.log_signal(logging.INFO, "🚀 启动仿真投料流程") def run(self): """仿真运行一次主循环""" if self.feedConfig is None or self.feedStatus == FeedStatus.FNone or self.pause: return # 获取当前模拟位置 real_position = self.robotClient.current_position # --- 简化版 FStart --- if self.feedStatus == FeedStatus.FStart: # self.log_signal(logging.INFO, "🟢 FStart: 初始化流程") self.log_signal(logging.INFO, "🟢 FStart: 初始化流程") self.current_drop_index = 0 # 重置投料点索引 # 模拟移动到抓取点 (直接设置位置) take_pos_model = self.feedConfig.feedLine.get_take_position() if take_pos_model: self.robotClient.current_position = take_pos_model.get_position() self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}") # self.log_signal(logging.INFO, f"🚚 FStart: 移动到抓取点 {self.robotClient.current_position}") self.feedStatus = FeedStatus.FTake else: # self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!") self.log_signal(logging.ERROR, "❌ FStart: 未找到抓取点!") self.feedStatus = FeedStatus.FNone return # 等待下次循环 # --- 简化版 FTake --- elif self.feedStatus == FeedStatus.FTake: # self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号") self.log_signal(logging.INFO, "✋ FTake: 检查是否到达抓取点并等待传感器信号") take_position_model = self.feedConfig.feedLine.get_take_position() if not take_position_model or not take_position_model.get_position(): self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!") # self.log_signal(logging.ERROR, "❌ FTake: 抓取点配置错误!") self.feedStatus = FeedStatus.FNone return if not take_position_model.get_position().compare(real_position): self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}") # self.log_signal(logging.WARNING, f"⚠️ FTake: 机器人未在抓取点 ({take_position_model.get_position()}), 当前在 {real_position}") # 在仿真中,我们假设它会移动到位,所以直接设置 self.robotClient.current_position = take_position_model.get_position() self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}") # self.log_signal(logging.INFO, f"🔧 强制移动到抓取点 {self.robotClient.current_position}") # 模拟等待传感器2信号 # 在实际仿真中,我们可以通过预设序列或用户输入来控制 # 这里用一个简单的循环等待 (实际应用中应避免阻塞) wait_start = time.time() timeout = 10 # 10秒超时 while not self.sensor2.state: if time.time() - wait_start > timeout: # self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!") self.log_signal(logging.ERROR, "⏰ FTake: 等待传感器2超时!") self.feedStatus = FeedStatus.FNone return self.log_signal(logging.INFO, "⏳ FTake: 等待传感器2信号...") time.sleep(1) # 每秒检查一次 self.log_signal(logging.INFO, "✅ FTake: 传感器2信号收到,开始抓取") # 执行抓取动作 if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CTake return # 等待下一周期 Catch 完成 if self.catch.catch_status == CatchStatus.CTake: return # 等待 Catch 模块处理 if self.catch.catch_status == CatchStatus.COk: self.catch.catch_status = CatchStatus.CNone self.log_signal(logging.INFO, "👏 FTake: 抓取成功,准备投料") # 移动到第一个投料点 if self.feedConfig.drop_positions: target_drop_pos = self.feedConfig.drop_positions[self.current_drop_index] self.robotClient.current_position = target_drop_pos self.log_signal(logging.INFO, f"🚚 FTake: 移动到投料点 {self.current_drop_index} ({target_drop_pos})") self.feedStatus = FeedStatus.FDropBag else: self.log_signal(logging.ERROR, "❌ FTake: 投料点列表为空!") self.feedStatus = FeedStatus.FNone return # --- 简化版 FDropBag (码垛核心) --- elif self.feedStatus == FeedStatus.FDropBag: if not self.feedConfig.drop_positions: self.log_signal(logging.ERROR, "❌ FDropBag: 投料点列表为空!") self.feedStatus = FeedStatus.FNone return target_drop_position = self.feedConfig.drop_positions[self.current_drop_index] self.log_signal(logging.INFO, f"📦 FDropBag: 当前目标投料点索引: {self.current_drop_index} ({target_drop_position})") # 检查是否到达目标投料点 (仿真中直接比较) if target_drop_position.compare(real_position): self.log_signal(logging.INFO, f"🎯 FDropBag: 到达投料点 {self.current_drop_index}") # 执行投料动作 if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CDrop self.log_signal(logging.INFO, "🪂 FDropBag: 开始执行投料动作...") return # 等待下一周期检查 COk if self.catch.catch_status == CatchStatus.CDrop: self.log_signal(logging.INFO, "🪂 FDropBag: 投料动作进行中...") return # 等待 Catch 模块完成 if self.catch.catch_status == CatchStatus.COk: self.catch.catch_status = CatchStatus.CNone self.log_signal(logging.INFO, f"🎉 FDropBag: 在投料点 {self.current_drop_index} 投料完成") # 更新投料计数 self.feedConfig.num -= 1 self.log_signal(logging.INFO, f'🔢 剩余投料次数: {self.feedConfig.num}') # 检查是否所有投料任务完成 if self.feedConfig.num > 0: # 更新投料点索引 (循环) self.current_drop_index = (self.current_drop_index + 1) % len(self.feedConfig.drop_positions) self.log_signal(logging.INFO, f"🔄 更新投料点索引为: {self.current_drop_index}") # 移动到固定的抓取点 (仿真中直接设置位置) take_position_model = self.feedConfig.feedLine.get_take_position() if take_position_model: self.log_signal(logging.INFO, f"🔄 移动到固定抓取点...") self.robotClient.current_position = take_position_model.get_position() self.log_signal(logging.INFO, f"📍 已移动到抓取点 {self.robotClient.current_position}") self.feedStatus = FeedStatus.FTake # 设置状态为 FTake else: self.log_signal(logging.ERROR, "❌ 无法获取抓取点位置!") self.feedStatus = FeedStatus.FNone else: # 所有投料完成 self.log_signal(logging.INFO, "🎊 所有投料任务完成") self.feedStatus = FeedStatus.FNone # 结束流程 else: # 如果还没到达目标投料点 (仿真中直接移动) self.log_signal(logging.INFO, f"🚚 正在移动到投料点 {self.current_drop_index} ({target_drop_position})") self.robotClient.current_position = target_drop_position self.log_signal(logging.INFO, f"📍 已移动到投料点 {self.current_drop_index} ({self.robotClient.current_position})") # --- 7. 主程序入口 --- if __name__ == "__main__": # --- 配置仿真环境 --- logging.basicConfig(level=logging.INFO) # 设置日志级别 # 1. 定义位置点 origin_pos = Real_Position("Origin") take_pos = Real_Position("Fixed_Take_Point") drop_pos_1 = Real_Position("Drop_Point_1") drop_pos_2 = Real_Position("Drop_Point_2") drop_pos_3 = Real_Position("Drop_Point_3") # 2. 创建 FeedLine (简化) feedline_positions = [ PositionModel(FeedStatus.FTake, take_pos), PositionModel(FeedStatus.FDropBag, Real_Position("Placeholder_Drop_Pos")) # 占位,实际由 drop_positions 列表决定 ] feed_line = FeedLine("SimFeedLine", feedline_positions) # 3. 定义投料点列表 (码垛点) drop_points = [drop_pos_1, drop_pos_2, drop_pos_3] # 4. 创建 FeedingConfig total_bags = 5 # 总共要投的料包数 feeding_config = FeedingConfig(num=total_bags, feedLine=feed_line, drop_positions=drop_points) # 5. 创建模拟机器人和传感器 robot_client = SimulatedRobotClient("Origin") sensor2 = SimulatedSensor("Sensor2", initial_state=False) # 初始无信号 # 6. 创建并配置仿真 Feeding sim_feeding = SimulatedFeeding(robot_client, sensor2) sim_feeding.set_config(feeding_config) # 7. (可选) 预设传感器2信号序列 [(时间(秒), 状态), ...] # 例如:在第2秒时变为True,第8秒时变为False,第15秒时再变为True... sensor2_sequence = [(2, True), (8, False), (15, True), (22, False), (30, True)] sensor2.start_simulation(sensor2_sequence) # 8. 启动流程 sim_feeding.start() # 9. 运行仿真主循环 try: while sim_feeding.feedStatus != FeedStatus.FNone: sim_feeding.run() sim_feeding.catch.run() # 触发模拟抓取/投料逻辑 time.sleep(0.5) # 控制仿真循环速度 except KeyboardInterrupt: print("\n🛑 仿真被用户中断") finally: sensor2.stop_simulation() print("🏁 仿真结束")