Files
wire_controlsystem/main_control.py

212 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2025/12/12 11:05
# @Author : reenrr
# @File : main_control.py
# @Desc : 主控程序
"""
import time
from EMV.EMV_test import press_sensors_triggered, control_solenoid, global_relay
from visual_algorithm.visual_algorithm import flaw_detection
import logging
import os
from conveyor_controller.conveyor_master_controller2_test import conveyor_control
from client import IndustrialPCClient
from DM.DM_Motor_controller import DMMotorController, Control_Type, dm_motor_control
# ------------ 日志+参数配置 ------------
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(script_dir, "main_control.log")
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file_path, encoding='utf-8')
]
)
# -------------------------- 电机参数配置 --------------------------
SLAVE_ID = 0x01
MASTER_ID = 0x11
PORT = 'COM10'
BAUDRATE = 921600
class LineMainControlSystem:
"""
线条主控系统类
"""
def __init__(self):
"""
构造方法:初始化类属性
"""
# 客户端
self.client = IndustrialPCClient(ip='127.0.0.1', port=8888, timeout=10)
# 连接服务端
if not self.client.connect():
return
# 合格线条处理记数
self.qualified_line_count = 0
self.qualified_trigger_threshold = 5
# 创建达妙电机实例、参数
self.motor_controller = DMMotorController(
slave_id=SLAVE_ID,
master_id=MASTER_ID,
port=PORT,
baudrate=BAUDRATE
)
self.dm_motor_init()
def dm_motor_init(self):
# 切换到位置-速度模式
self.motor_controller.switch_control_mode(Control_Type.POS_VEL)
# 使能电机
self.motor_controller.enable_motor()
# 将电机位置恢复到初始位置(防止断电,电机停在某个位置上)
dm_motor_position = self.motor_controller.get_position()
logging.info(f"初始位置{dm_motor_position}")
# 将位置移动到0位
self.motor_controller.control_pos_vel(p_desired=0, v_desired=30)
time.sleep(10)
def start_thread(self):
"""开启各种线程"""
logging.info("开启各类监听线程")
global_relay.start_press_sensors_monitor() # 双压传感器监听线程
def stop_thread(self):
"""关闭各种线程"""
logging.info("关闭各类监听线程")
global_relay.stop_press_sensors_monitor() # 双压传感器监听线程
def handle_line(self):
try:
logging.info("线条开始质量检测:")
# 执行质量检测
result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08})
if result == "qualified":
self._handle_qualified_line()
elif result == "unqualified":
self._handle_unqualified_line()
except Exception as e:
logging.error(f"质量检测失败:{e}", exc_info=True)
finally:
# 关闭连接服务端
if self.client:
self.client.close()
logging.info("关闭连接服务端")
def _handle_qualified_line(self):
"""
处理合格线条的流程
"""
result = "合格"
logging.info(f"该线条是否合格:{result}")
logging.info("等待线条落到传送带(双压传感器触发)...")
# 等待时间触发超时时间设为10秒避免无限等待
if press_sensors_triggered.wait(timeout=10):
press_sensors_triggered.clear()
logging.info("线条已落到传送带,控制两个传送带向前移动")
# 传送带运行
conveyor_control()
# 验证传送带运行结果
press_sensor1_status = global_relay.get_device_status('press_sensor1', 'sensors')
press_sensor2_status = global_relay.get_device_status('press_sensor2', 'sensors')
if not press_sensor1_status and not press_sensor2_status:
logging.info("传送带运行正常,线条已移动")
self.qualified_line_count += 1
logging.info(f"已处理合格线条{self.qualified_line_count}/{self.qualified_trigger_threshold}")
# 控制步进电机移动
logging.info("控制步进电机移动")
success, msg = self.client.send_command("test")
if not success:
logging.error(f"调用失败:{msg}")
else:
logging.info("步进电机移动成功")
# 判断线条个数
if self.qualified_line_count >= self.qualified_trigger_threshold:
# 舵机、电机相关控制
self.qualified_line_count = 0
else:
logging.warning("传送带运行异常,线条未正常移动,不更新合格计数")
else:
logging.info("线条未落到传送带,请检查传感器或设备状态")
def _handle_unqualified_line(self):
"""
处理不合格线条的流程
"""
result = "不合格"
logging.info("该线条是否合格:", result)
logging.info("等待线条落到传送带上")
# 等待时间触发超时时间设为10秒避免无限等待
if press_sensors_triggered.wait(timeout=10):
press_sensors_triggered.clear()
logging.info("线条已落到传送带")
# 执行NG动作控制电磁阀
logging.info("进入NG动作")
control_solenoid() # 执行NG动作,控制电磁阀
logging.info("NG动作结束")
def gripper_control(self):
"""
气动夹爪相关控制:达妙电机的移动、舵机的翻转
"""
# 打开电磁阀3
global_relay.open('solenoid_valve3', "devices")
# 控制达妙电机收缩
self.motor_controller.control_pos_vel(p_desired=282.6, v_desired=30) # 450mm 665-215
time.sleep(10)
# -----------对外接口-------------
def main_control():
main_control_system = LineMainControlSystem()
try:
logging.info("开启各种线程")
main_control_system.start_thread()
while True: # 防止跳出循环
logging.info("开始摆放线条")
# 质量检测
main_control_system.handle_line()
except KeyboardInterrupt:
logging.info("用户手动退出")
finally:
# 程序退出时,关闭各类线程
main_control_system.stop_thread()
logging.info("主控程序已正常退出")
# ------------测试接口-------------
if __name__ == '__main__':
main_control()