stage_one

This commit is contained in:
2025-12-12 18:00:14 +08:00
parent dc4ef9002e
commit b8b9679bc8
55 changed files with 6541 additions and 459 deletions

193
test_angle.py Normal file
View File

@ -0,0 +1,193 @@
# main.py
import time
from config.settings import app_set_config
from core.system import FeedingControlSystem
from hardware import relay
from hardware.relay import RelayController
import threading
import time
import cv2
import os
import sys
import io
class OutputRedirector(io.StringIO):
"""
自定义输出重定向器,用于处理所有线程的输出
确保输入提示能在其他线程输出后重新显示,且保留用户当前输入
"""
def __init__(self, original_stdout):
super().__init__()
self.original_stdout = original_stdout
self.input_prompt = "请输入新值:"
self.input_thread_active = False
self.current_input = "" # 跟踪用户当前输入
self.lock = threading.Lock()
def write(self, text):
with self.lock:
# 写入原始输出
self.original_stdout.write(text)
self.original_stdout.flush()
# 如果输入线程活跃,并且输出是换行符,重新显示输入提示和当前输入
if self.input_thread_active and '\n' in text:
# 清除当前行并重新显示输入提示和用户当前输入
self.original_stdout.write("\r" + " " * 100 + "\r")
self.original_stdout.write(f"{self.input_prompt}{self.current_input}")
self.original_stdout.flush()
def flush(self):
with self.lock:
self.original_stdout.flush()
def main():
# 加载配置
# 初始化系统
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open')
# time.sleep(1)
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close')
# replay_controller.control(replay_controller.DOOR_LOWER_CLOSE, 'close')
# replay_controller.control_upper_open()
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open')
# time.sleep(5)
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close')
# image=cv2.imread(os.path.join(app_set_config.project_root,'test.jpeg'))
# image=cv2.flip(image, 0)
# cv2.imshow('test',image)
# cv2.waitKey(1)
# replay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open')
system = FeedingControlSystem()
# system.vision_detector.detect_angle()
# while True:
# system.feeding_controller.pulse_control_door_for_maintaining()
# time.sleep(5)
# 初始设置
system.state.vehicle_aligned=True
#假设在 fertilize room
system.state._upper_door_position='over_lower'
# 创建输出重定向器
original_stdout = sys.stdout
output_redirector = OutputRedirector(original_stdout)
sys.stdout = output_redirector
# 创建输入线程函数
def input_thread():
import sys
import threading
# 初始提示只显示一次
initial_prompt = """
输入线程启动提示
--------------------------
可以随时输入新值来更新系统状态
格式:需求重量,完成重量,溢出状态
例如500,300,大堆料 或 500,300,小堆料 或 500,300,未堆料
输入'q'退出程序
--------------------------
"""
print(initial_prompt)
# 标记输入线程为活跃状态
output_redirector.input_thread_active = True
while True:
try:
import termios
import tty
# 获取当前终端设置
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno()) # 设置为cbreak模式允许逐字符读取
# 显示初始提示和当前输入
sys.stdout.write("\r" + " " * 100 + "\r")
sys.stdout.write(f"{output_redirector.input_prompt}{output_redirector.current_input}")
sys.stdout.flush()
while True:
char = sys.stdin.read(1) # 读取一个字符
if char == '\x03': # Ctrl+C
raise KeyboardInterrupt
elif char in ['\r', '\n']: # 回车键
sys.stdout.write("\n")
sys.stdout.flush()
break
elif char == '\x7f': # 退格键
if len(output_redirector.current_input) > 0:
output_redirector.current_input = output_redirector.current_input[:-1]
# 从显示中删除最后一个字符
sys.stdout.write("\b \b")
sys.stdout.flush()
else:
output_redirector.current_input += char
sys.stdout.write(char)
sys.stdout.flush()
finally:
# 恢复终端设置
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# 获取最终输入并重置当前输入
user_input = output_redirector.current_input
output_redirector.current_input = "" # 重置为下一次输入准备
if user_input.lower() == 'q':
print("\n收到退出信号,正在关闭系统...")
output_redirector.input_thread_active = False
system.stop()
break
# 分割输入值,处理不同参数数量
input_parts = [part.strip() for part in user_input.split(',')]
if len(input_parts) >= 2:
# 更新基本参数
system.state._mould_need_weight = float(input_parts[0])
system.state._mould_finish_weight = float(input_parts[1])
if system.state._mould_finish_weight>400:
system.state.overflow_detected='大堆料'
# 输出更新结果,使用换行符分隔
update_msg = f"\n已更新:\n 需求重量 = {system.state._mould_need_weight} kg\n 完成重量 = {system.state._mould_finish_weight} kg"
print(f"溢出状态 = {system.state.overflow_detected}")
print(update_msg)
else:
print("\n输入格式错误:至少需要输入需求重量和完成重量")
except ValueError as e:
print(f"\n输入格式错误,请重新输入。错误信息:{e}")
except Exception as e:
print(f"\n发生错误:{e}")
output_redirector.input_thread_active = False
break
# 启动输入线程
input_thread = threading.Thread(target=input_thread, daemon=True)
input_thread.start()
system.relay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open')
# 启动视觉控制
system.camera_controller.start_cameras()
system.start_visual_control()
# 主线程保持运行
while True:
# print(f'当前重量22:{system.state._mould_finish_weight:.2f}kg, 目标重量:{system.state._mould_need_weight:.2f}kg')
time.sleep(1)
if __name__ == "__main__":
main()