# main.py import time from config.settings import app_set_config from core.system import FeedingControlSystem from hardware import relay from hardware.relay import RelayController import threading import time import cv2 import os import sys import io class OutputRedirector(io.StringIO): """ 自定义输出重定向器,用于处理所有线程的输出 确保输入提示能在其他线程输出后重新显示,且保留用户当前输入 """ def __init__(self, original_stdout): super().__init__() self.original_stdout = original_stdout self.input_prompt = "请输入新值:" self.input_thread_active = False self.current_input = "" # 跟踪用户当前输入 self.lock = threading.Lock() def write(self, text): with self.lock: # 写入原始输出 self.original_stdout.write(text) self.original_stdout.flush() # 如果输入线程活跃,并且输出是换行符,重新显示输入提示和当前输入 if self.input_thread_active and '\n' in text: # 清除当前行并重新显示输入提示和用户当前输入 self.original_stdout.write("\r" + " " * 100 + "\r") self.original_stdout.write(f"{self.input_prompt}{self.current_input}") self.original_stdout.flush() def flush(self): with self.lock: self.original_stdout.flush() def main(): # 加载配置 # 初始化系统 # replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open') # time.sleep(1) # replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close') # replay_controller.control(replay_controller.DOOR_LOWER_CLOSE, 'close') # replay_controller.control_upper_open() # replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open') # time.sleep(5) # replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close') # image=cv2.imread(os.path.join(app_set_config.project_root,'test.jpeg')) # image=cv2.flip(image, 0) # cv2.imshow('test',image) # cv2.waitKey(1) # replay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open') system = FeedingControlSystem() # system.vision_detector.detect_angle() # while True: # system.feeding_controller.pulse_control_door_for_maintaining() # time.sleep(5) # 初始设置 system.state.vehicle_aligned=True #假设在 fertilize room system.state._upper_door_position='over_lower' # 创建输出重定向器 original_stdout = sys.stdout output_redirector = OutputRedirector(original_stdout) sys.stdout = output_redirector # 创建输入线程函数 def input_thread(): import sys import threading # 初始提示只显示一次 initial_prompt = """ 输入线程启动提示 -------------------------- 可以随时输入新值来更新系统状态 格式:需求重量,完成重量,溢出状态 例如:500,300,大堆料 或 500,300,小堆料 或 500,300,未堆料 输入'q'退出程序 -------------------------- """ print(initial_prompt) # 标记输入线程为活跃状态 output_redirector.input_thread_active = True while True: try: import termios import tty # 获取当前终端设置 old_settings = termios.tcgetattr(sys.stdin) try: tty.setcbreak(sys.stdin.fileno()) # 设置为cbreak模式,允许逐字符读取 # 显示初始提示和当前输入 sys.stdout.write("\r" + " " * 100 + "\r") sys.stdout.write(f"{output_redirector.input_prompt}{output_redirector.current_input}") sys.stdout.flush() while True: char = sys.stdin.read(1) # 读取一个字符 if char == '\x03': # Ctrl+C raise KeyboardInterrupt elif char in ['\r', '\n']: # 回车键 sys.stdout.write("\n") sys.stdout.flush() break elif char == '\x7f': # 退格键 if len(output_redirector.current_input) > 0: output_redirector.current_input = output_redirector.current_input[:-1] # 从显示中删除最后一个字符 sys.stdout.write("\b \b") sys.stdout.flush() else: output_redirector.current_input += char sys.stdout.write(char) sys.stdout.flush() finally: # 恢复终端设置 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) # 获取最终输入并重置当前输入 user_input = output_redirector.current_input output_redirector.current_input = "" # 重置为下一次输入准备 if user_input.lower() == 'q': print("\n收到退出信号,正在关闭系统...") output_redirector.input_thread_active = False system.stop() break # 分割输入值,处理不同参数数量 input_parts = [part.strip() for part in user_input.split(',')] if len(input_parts) >= 2: # 更新基本参数 system.state._mould_need_weight = float(input_parts[0]) system.state._mould_finish_weight = float(input_parts[1]) if system.state._mould_finish_weight>400: system.state.overflow_detected='大堆料' # 输出更新结果,使用换行符分隔 update_msg = f"\n已更新:\n 需求重量 = {system.state._mould_need_weight} kg\n 完成重量 = {system.state._mould_finish_weight} kg" print(f"溢出状态 = {system.state.overflow_detected}") print(update_msg) else: print("\n输入格式错误:至少需要输入需求重量和完成重量") except ValueError as e: print(f"\n输入格式错误,请重新输入。错误信息:{e}") except Exception as e: print(f"\n发生错误:{e}") output_redirector.input_thread_active = False break # 启动输入线程 input_thread = threading.Thread(target=input_thread, daemon=True) input_thread.start() system.relay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open') # 启动视觉控制 system.camera_controller.start_cameras() system.start_visual_control() # 主线程保持运行 while True: # print(f'当前重量22:{system.state._mould_finish_weight:.2f}kg, 目标重量:{system.state._mould_need_weight:.2f}kg') time.sleep(1) if __name__ == "__main__": main()