Files
Feeding_control_system/test_angle.py
2025-12-12 18:00:14 +08:00

194 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# main.py
import time
from config.settings import app_set_config
from core.system import FeedingControlSystem
from hardware import relay
from hardware.relay import RelayController
import threading
import time
import cv2
import os
import sys
import io
class OutputRedirector(io.StringIO):
"""
自定义输出重定向器,用于处理所有线程的输出
确保输入提示能在其他线程输出后重新显示,且保留用户当前输入
"""
def __init__(self, original_stdout):
super().__init__()
self.original_stdout = original_stdout
self.input_prompt = "请输入新值:"
self.input_thread_active = False
self.current_input = "" # 跟踪用户当前输入
self.lock = threading.Lock()
def write(self, text):
with self.lock:
# 写入原始输出
self.original_stdout.write(text)
self.original_stdout.flush()
# 如果输入线程活跃,并且输出是换行符,重新显示输入提示和当前输入
if self.input_thread_active and '\n' in text:
# 清除当前行并重新显示输入提示和用户当前输入
self.original_stdout.write("\r" + " " * 100 + "\r")
self.original_stdout.write(f"{self.input_prompt}{self.current_input}")
self.original_stdout.flush()
def flush(self):
with self.lock:
self.original_stdout.flush()
def main():
# 加载配置
# 初始化系统
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open')
# time.sleep(1)
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close')
# replay_controller.control(replay_controller.DOOR_LOWER_CLOSE, 'close')
# replay_controller.control_upper_open()
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'open')
# time.sleep(5)
# replay_controller.control(replay_controller.DOOR_LOWER_OPEN, 'close')
# image=cv2.imread(os.path.join(app_set_config.project_root,'test.jpeg'))
# image=cv2.flip(image, 0)
# cv2.imshow('test',image)
# cv2.waitKey(1)
# replay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open')
system = FeedingControlSystem()
# system.vision_detector.detect_angle()
# while True:
# system.feeding_controller.pulse_control_door_for_maintaining()
# time.sleep(5)
# 初始设置
system.state.vehicle_aligned=True
#假设在 fertilize room
system.state._upper_door_position='over_lower'
# 创建输出重定向器
original_stdout = sys.stdout
output_redirector = OutputRedirector(original_stdout)
sys.stdout = output_redirector
# 创建输入线程函数
def input_thread():
import sys
import threading
# 初始提示只显示一次
initial_prompt = """
输入线程启动提示
--------------------------
可以随时输入新值来更新系统状态
格式:需求重量,完成重量,溢出状态
例如500,300,大堆料 或 500,300,小堆料 或 500,300,未堆料
输入'q'退出程序
--------------------------
"""
print(initial_prompt)
# 标记输入线程为活跃状态
output_redirector.input_thread_active = True
while True:
try:
import termios
import tty
# 获取当前终端设置
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno()) # 设置为cbreak模式允许逐字符读取
# 显示初始提示和当前输入
sys.stdout.write("\r" + " " * 100 + "\r")
sys.stdout.write(f"{output_redirector.input_prompt}{output_redirector.current_input}")
sys.stdout.flush()
while True:
char = sys.stdin.read(1) # 读取一个字符
if char == '\x03': # Ctrl+C
raise KeyboardInterrupt
elif char in ['\r', '\n']: # 回车键
sys.stdout.write("\n")
sys.stdout.flush()
break
elif char == '\x7f': # 退格键
if len(output_redirector.current_input) > 0:
output_redirector.current_input = output_redirector.current_input[:-1]
# 从显示中删除最后一个字符
sys.stdout.write("\b \b")
sys.stdout.flush()
else:
output_redirector.current_input += char
sys.stdout.write(char)
sys.stdout.flush()
finally:
# 恢复终端设置
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
# 获取最终输入并重置当前输入
user_input = output_redirector.current_input
output_redirector.current_input = "" # 重置为下一次输入准备
if user_input.lower() == 'q':
print("\n收到退出信号,正在关闭系统...")
output_redirector.input_thread_active = False
system.stop()
break
# 分割输入值,处理不同参数数量
input_parts = [part.strip() for part in user_input.split(',')]
if len(input_parts) >= 2:
# 更新基本参数
system.state._mould_need_weight = float(input_parts[0])
system.state._mould_finish_weight = float(input_parts[1])
if system.state._mould_finish_weight>400:
system.state.overflow_detected='大堆料'
# 输出更新结果,使用换行符分隔
update_msg = f"\n已更新:\n 需求重量 = {system.state._mould_need_weight} kg\n 完成重量 = {system.state._mould_finish_weight} kg"
print(f"溢出状态 = {system.state.overflow_detected}")
print(update_msg)
else:
print("\n输入格式错误:至少需要输入需求重量和完成重量")
except ValueError as e:
print(f"\n输入格式错误,请重新输入。错误信息:{e}")
except Exception as e:
print(f"\n发生错误:{e}")
output_redirector.input_thread_active = False
break
# 启动输入线程
input_thread = threading.Thread(target=input_thread, daemon=True)
input_thread.start()
system.relay_controller.control(system.relay_controller.DOOR_LOWER_OPEN, 'open')
# 启动视觉控制
system.camera_controller.start_cameras()
system.start_visual_control()
# 主线程保持运行
while True:
# print(f'当前重量22:{system.state._mould_finish_weight:.2f}kg, 目标重量:{system.state._mould_need_weight:.2f}kg')
time.sleep(1)
if __name__ == "__main__":
main()