Files
Feeding_control_system/view/widgets/vibration_video_widget.py

645 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
from PySide6.QtCore import Qt, QTimer, Signal, QObject
from PySide6.QtGui import QImage, QPixmap, QPalette, QBrush, QCursor, QIcon, QPainter, QFont
from PySide6.QtWidgets import (
QWidget,
QVBoxLayout,
QHBoxLayout,
QLabel,
QApplication,
QFrame,
QPushButton,
QStackedWidget,
QSizePolicy
)
from typing import Optional
import cv2
import time
from threading import Thread, Lock
import sys
from .switch_button import SwitchButton
import resources.resources_rc
from utils.image_paths import ImagePaths
class VideoStream(QObject):
# 系统(信息)信号
info_signal = Signal(str, str)
# 警告信号(重试): 发送摄像头名称和状态描述
warning_signal = Signal(str, str)
# 异常错误信号(超次数)
error_signal = Signal(str, str)
def __init__(self, rtsp_url, name="Stream", max_retry=3):
super().__init__()
self.rtsp_url = rtsp_url
self.name = name
self.cap = None
self.frame = None
self.timestamp = 0
self.lock = Lock()
self.running = False
self.thread = None
self.max_retry = max_retry
self.retry_count = 0
self.manual_stop = False # 是否停止自动重连
def start(self):
self.running = True
self.thread = Thread(target=self.update, daemon=True)
self.thread.start()
return self
def reset_retry(self):
with self.lock:
self.retry_count = 0
self.manual_stop = False
if self.cap:
self.cap.release()
self.cap = None
self.info_signal.emit(self.name, "已手动重置, 开始尝试连接...")
def _handle_max_retry(self, failure_type: str):
"""
处理超过最大重试次数的情况
failure_type: 失败类型("连接""读取"
"""
self.manual_stop = True
error_msg = f"{failure_type}摄像头失败, 已经超过最大次数({self.max_retry}次),可能设备故障,请检查后手动点击重连"
print(f"[{self.name}] {error_msg}")
self.error_signal.emit(self.name, error_msg)
time.sleep(1)
def _handle_retry_delay(self):
"""计算并执行重试前的延迟(递增间隔)"""
sleep_time = min(0.5 + self.retry_count * 0.5, 5)
warning_msg = (
f"{self.retry_count}次连接失败,将在{sleep_time:.1f}秒后再次尝试..."
)
self.warning_signal.emit(self.name, warning_msg)
print(f"[{self.name}] {warning_msg}")
time.sleep(sleep_time)
def update(self):
while self.running:
if self.manual_stop:
time.sleep(1)
continue
if self.cap is None or not self.cap.isOpened():
print(
f"[{self.name}]正在连接 RTSP(第{self.retry_count+1}次): {self.rtsp_url}"
)
info_msg = f"正在连接 RTSP(第{self.retry_count+1}次): {self.rtsp_url}"
self.info_signal.emit(self.name, info_msg)
try:
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if hasattr(cv2, "CAP_PROP_BUFFERSIZE"):
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
if hasattr(cv2, "CAP_PROP_READ_TIMEOUT"):
self.cap.set(cv2.CAP_PROP_READ_TIMEOUT, 2000)
if hasattr(cv2, "CAP_PROP_TCP_NODELAY"):
self.cap.set(cv2.CAP_PROP_TCP_NODELAY, 1)
except Exception as ex:
print(f"[{self.name}]连接失败: {ex}")
warning_msg = f"连接失败: {str(ex)},准备重试..."
self.warning_signal.emit(self.name, warning_msg)
if self.cap:
self.cap.release()
self.cap = None
if self.cap is None or not self.cap.isOpened():
self.retry_count += 1
if self.retry_count >= self.max_retry:
self._handle_max_retry("连接")
continue
self._handle_retry_delay()
continue
else:
self.retry_count = 0
self.info_signal.emit(self.name, "连接成功")
ret, frame = self.cap.read()
if ret:
with self.lock:
self.frame = frame.copy()
self.timestamp = time.time()
self.retry_count = 0
else:
print(f"[{self.name}] 读取失败,准备重连...")
warning_msg = "读取帧失败,准备重连..."
self.warning_signal.emit(self.name, warning_msg)
if self.cap:
self.cap.release()
self.cap = None
self.retry_count += 1
if self.retry_count >= self.max_retry:
self._handle_max_retry("读取")
continue
self._handle_retry_delay()
def read(self):
with self.lock:
if self.frame is not None and time.time() - self.timestamp < 1.5:
return self.frame.copy()
else:
return None
def stop(self):
self.running = False
if self.thread is not None:
self.thread.join(timeout=0.5)
if self.cap is not None:
self.cap.release()
print(f"[{self.name}] 视频流已停止")
# ====================== 摄像头框功能模块 ======================
class CameraModule(QWidget):
"""单个摄像头模块:原图显示"""
# 重置信号,用于通知需要重置连接 (重连)
reset_signal = Signal()
# 视频显示信号,用于通知是否显示视频(刷新视频帧)
# 发送摄像头名 和 是否显示
video_display_signal = Signal(str, bool)
def __init__(
self, camera_name="摄像头", rtsp_url="", need_rotate_180=True, show_ai = False, parent=None
):
super().__init__(parent)
self.setObjectName("cameraModule")
self.camera_name = camera_name
self.rtsp_url = rtsp_url
self.need_rotate_180 = need_rotate_180 # 画面是否需要旋转180度后显示
self.show_ai = show_ai # 是否需要展示ai为True会多出AI显示按钮
# 初始化AI显示相关变量为None
self.ai_display_group = None
self.ai_display_label = None
self.ai_display_switch = None
self.setup_ui()
def setup_ui(self):
layout = QVBoxLayout(self)
layout.setSpacing(0)
layout.setContentsMargins(0, 0, 0, 0)
# --- 0. 标题布局: 标题 + 显示标签及开关 ---
title_layout = QHBoxLayout()
title_layout.setContentsMargins(0, 0, 0, 4)
title_layout.setSpacing(0)
self.title_label = QLabel()
self.title_label.setAlignment(Qt.AlignLeft)
self.title_label.setText(f"{self.camera_name}视频")
# self.title_label.setFixedWidth(212)
self.title_label.setFixedSize(212, 21)
self.title_label.setObjectName("cameraTitleLabel")
# background-image: url({ImagePaths.VIDEO_TITLE_BACKGROUND});
# min-width: 126px;
self.title_label.setStyleSheet(
f"""
#cameraTitleLabel {{
font-size: 18px;
color: #16ffff;
background-image: url({ImagePaths.VIDEO_TITLE_BACKGROUND});
padding-left: 12px;
}}
"""
)
# 1、创建【显示标签+开关】组合容器
self.display_group = QWidget() # 容器:包裹标签和开关
display_group_layout = QHBoxLayout(self.display_group)
display_group_layout.setContentsMargins(0, 0, 0, 0) # 容器内边距为0
display_group_layout.setSpacing(0)
# 显示标签
self.display_label = QLabel("显示")
self.display_label.setObjectName("displayLabel")
# font-weight: bold;
self.display_label.setStyleSheet("""
#displayLabel {
font-size: 18px;
color: #16ffff;
margin: 0px;
padding: 0px;
}
""")
# 让标签垂直居中
self.display_label.setFixedWidth(40)
self.display_label.setAlignment(Qt.AlignVCenter | Qt.AlignLeft)
# 显示开关
self.display_switch = SwitchButton()
self.display_switch.setChecked(True)
self.display_switch.switched.connect(self.onDisplayButtonSwitched)
# 将显示标签和开关添加到组合容器布局
display_group_layout.addWidget(self.display_label)
display_group_layout.addWidget(self.display_switch, alignment=Qt.AlignLeft)
if self.show_ai: # 当需要显示AI时才生成
# 2、创建【AI算法标签+开关】组合容器
self.ai_display_group = QWidget() # 容器包裹AI显示标签和开关
ai_display_group_layout = QHBoxLayout(self.ai_display_group)
ai_display_group_layout.setContentsMargins(0, 0, 0, 0) # 容器内边距为0
ai_display_group_layout.setSpacing(0)
# AI算法显示标签
self.ai_display_label = QLabel("AI算法")
self.ai_display_label.setObjectName("aiDisplayLabel")
# font-weight: bold;
self.ai_display_label.setStyleSheet("""
#aiDisplayLabel {
font-size: 18px;
color: #16ffff;
margin: 0px;
padding-left: 2px;
}
""")
# self.ai_display_label.setFixedWidth(40)
self.ai_display_label.setAlignment(Qt.AlignVCenter | Qt.AlignLeft)
# AI开关
self.ai_display_switch = SwitchButton()
self.ai_display_switch.setChecked(False)
# self.ai_display_switch.switched.connect(self.onDisplayButtonSwitched) # 在camera_controller中处理
# 将AI显示标签和开关添加到组合容器布局
ai_display_group_layout.addWidget(self.ai_display_label)
ai_display_group_layout.addWidget(self.ai_display_switch, alignment=Qt.AlignLeft)
# 添加到标题布局
title_layout.addWidget(self.title_label, alignment=Qt.AlignLeft)
title_layout.addWidget(self.display_group, alignment=Qt.AlignRight)
if self.show_ai: # 需要添加 AI显示控件
title_layout.addWidget(self.ai_display_group, alignment=Qt.AlignLeft)
self.title_label.setFixedSize(139, 21) # 调整 xxx视频标签的宽度
# 视频显示容器:使用堆叠布局,让重连按钮和视频标签分层显示
self.video_container = QWidget()
self.video_container.setObjectName("videoContainer")
self.video_container.setFixedSize(327, 199) # 需要同步修改下面的尺寸
# #011d6b #033474
# self.video_container.setStyleSheet(
# "background-color: #011d6b; border: none;"
# ) # 明确视频显示背景色
self.video_container.setStyleSheet(
f"""
#videoContainer {{
border-image: url({ImagePaths.VIDEO_FRAME_BACKGROUND});
}}
"""
)
video_layout = QVBoxLayout(self.video_container)
video_layout.setContentsMargins(6, 6, 5, 6)
# 原始图像显示
self.raw_label = QLabel()
self.raw_label.setAlignment(Qt.AlignCenter)
self.raw_label.setStyleSheet("border: none;") # 移除背景,使用容器背景
# self.raw_label.setStyleSheet("background-color: red;")
self.raw_label.setText(f"{self.camera_name}摄像头, 连接中...")
# self.raw_label.setFixedSize(327, 199)
# self.raw_label.setFixedSize(314, 187) # 需要根据视频框背景大小调整
self.raw_label.setFixedSize(316, 187) # 需要根据视频框背景大小调整
# 重置按钮:居中显示、透明背景
self.reset_button = QPushButton()
# self.reset_button.setFixedSize(327, 199)
# self.reset_button.setFixedSize(314, 187) # 同上
self.reset_button.setFixedSize(316, 187) # 同上
self.reset_button.setStyleSheet(
"""
QPushButton {
background-color: transparent;
border: none;
padding: 0px;
}
"""
)
self.reset_button.setCursor(QCursor(Qt.PointingHandCursor))
self.reset_button.clicked.connect(self.onRestButtonClicked)
# 加载视频重新连接图片
try:
pixmap = QPixmap(ImagePaths.INTERFACE_REFRESH)
if not pixmap.isNull():
self.reset_button.setIcon(QIcon(pixmap))
self.reset_button.setIconSize(pixmap.size())
else:
# 图片加载失败(文件不存在或格式错误)
self.reset_button.setText("点击重置连接")
except Exception as e:
print(f"加载复位图片失败: {e}")
self.reset_button.setText("点击重置连接")
# 堆叠布局:重连按钮和视频标签分层
self.stacked_widget = QStackedWidget()
self.stacked_widget.addWidget(self.raw_label)
self.stacked_widget.addWidget(self.reset_button)
video_layout.addWidget(self.stacked_widget, alignment=Qt.AlignCenter)
# 添加到主布局
layout.addLayout(title_layout)
layout.addWidget(self.video_container)
# 显示开关切换槽函数
def onDisplayButtonSwitched(self, state:bool):
# 显示开关打开state 为 True; 显示开关关闭state 为 False
if state:
self.stacked_widget.setCurrentWidget(self.raw_label) # 视频显示标签
self.stacked_widget.setHidden(False)
else:
self.stacked_widget.setHidden(True)
self.video_display_signal.emit(self.camera_name, state)
# 重连按钮点击槽函数
def onRestButtonClicked(self):
self.stacked_widget.setCurrentWidget(self.raw_label)
self.raw_label.setText(f"{self.camera_name}摄像头, 连接中...")
self.reset_signal.emit() # 发送重连信号
def showResetButton(self):
self.raw_label.setText("")
self.stacked_widget.setCurrentWidget(self.reset_button)
def update_raw_image(self, image: Optional[QImage]):
if image:
pixmap = QPixmap.fromImage(image)
self.raw_label.setPixmap(
pixmap.scaled(
self.raw_label.size(),
Qt.IgnoreAspectRatio, # 忽略比例,占满窗口 可选Qt.KeepAspectRatio
Qt.SmoothTransformation,
)
)
self.raw_label.setText("")
else:
self.raw_label.setPixmap(QPixmap())
# 没有图片数据stream流会自动重连 或 手动重连
self.raw_label.setText(f"{self.camera_name}摄像头, 连接中...")
# 视频显示区域 总的标题 (如 振捣视频)
class VideoTitleWidget(QWidget):
def __init__(self, video_name, parent=None):
super().__init__(parent)
self.setStyleSheet("background-color: transparent;") # 整体透明
self.setup_ui(video_name)
def setup_ui(self, video_name):
# 水平布局:文字 + 图标
layout = QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0) # 布局内边距为0
# 文字标签
self.text_label = QLabel(f"{video_name}视频")
self.text_label.setStyleSheet("""
font-family: "微软雅黑";
font-size: 20px;
font-weight: bold;
color: #16FFFF;
background-color: transparent;
""")
font = self.text_label.font()
font.setLetterSpacing(QFont.SpacingType.AbsoluteSpacing, 3) # 字间距3px可按需加大
self.text_label.setFont(font)
# 图标标签
self.icon_label = QLabel()
self.icon_label.setPixmap(QPixmap(ImagePaths.VIDEO_CAMERA_MARK)) # 需要换为实际的图片的路径
self.icon_label.setStyleSheet("background-color: transparent;")
self.icon_label.setFixedSize(23, 16)
self.icon_label.setScaledContents(True)
# 添加到布局
layout.addWidget(self.text_label, alignment=Qt.AlignCenter)
layout.addWidget(self.icon_label, alignment=Qt.AlignCenter)
class VibrationVideoWidget(QWidget):
def __init__(self, parent=None):
super().__init__(parent=parent)
self.setObjectName("vibrationVideoWidget")
# 1. 加载背景图片(替换为你的图片路径)
self.background_image = QImage() # 初始化图片对象
# 注意:图片路径需正确,建议使用绝对路径或项目相对路径
self.background_path = ImagePaths.VIDEO_BACKGROUND # 你的背景图片路径
if not self.background_image.load(self.background_path):
# 图片加载失败时的容错处理(显示红色背景作为提示)
print(f"警告:无法加载背景图片 {self.background_path}")
self.background_image = QImage(388, 839, QImage.Format_RGB32)
self.background_image.fill(Qt.GlobalColor.red) # 加载失败时用红色填充
# 显示摄像头画面的模组
# 需要修改为相应的地址!!!
# 注在camera_controller中设置地址url
self.cam1 = CameraModule("上位料斗")
self.cam2 = CameraModule("下位料斗", show_ai=False)
self.cam3 = CameraModule("模具车", need_rotate_180=False)
self.setup_ui()
self.connect_signals()
def set_camera_urls(self, cam1_url, cam2_url, cam3_url):
"""设置三个摄像头的RTSP URL
注意: 在CameraController中调用
"""
self.cam1.rtsp_url = cam1_url
self.cam2.rtsp_url = cam2_url
self.cam3.rtsp_url = cam3_url
def setup_ui(self):
# 视频widget样式
# self.setFixedSize(387, 720) # 宽387、高792
self.setFixedSize(388, 839) # 背景图片宽388、高879
# 1、总的标题
self.title_widget = VideoTitleWidget("振捣")
# 布局设置
# 2. 创建摄像头容器专门存放cam1/cam2/cam3
self.camera_container = QWidget() # 容器widget
self.camera_container.setFixedSize(350, 720) # 容器固定尺寸
# 给容器添加样式
self.camera_container.setStyleSheet("""
background-color: transparent;
border: none;
""")
# 3. 容器内部布局(管理三个摄像头模块)
container_layout = QVBoxLayout(self.camera_container)
container_layout.setContentsMargins(6, 6, 6, 6) # 容器内边距
container_layout.setSpacing(6) # 摄像头模块之间的间距
# 将三个摄像头添加到容器布局中
container_layout.addWidget(self.cam1, alignment=Qt.AlignCenter)
container_layout.addWidget(self.cam2, alignment=Qt.AlignCenter)
container_layout.addWidget(self.cam3, alignment=Qt.AlignCenter)
# 4. 主窗口布局(管理摄像头容器等其他部件)
main_layout = QVBoxLayout(self) # 主窗口的布局
main_layout.setContentsMargins(0, 0, 9, 30) # 主布局内边距为0
# 添加标题部件
main_layout.addWidget(self.title_widget, alignment=Qt.AlignCenter) # 标题居中
# 添加43px间距标题与摄像头容器之间
main_layout.addSpacing(16)
# 把摄像头容器添加到主布局中(作为一个整体)
main_layout.addWidget(self.camera_container, alignment=Qt.AlignBottom | Qt.AlignHCenter)
# 初始化视频流 (注意设置好了camera_urls之后调用)
def init_streams(self):
url1 = self.cam1.rtsp_url
url2 = self.cam2.rtsp_url
url3 = self.cam3.rtsp_url
self.stream1 = VideoStream(url1, "Cam1").start()
self.stream2 = VideoStream(url2, "Cam2").start()
self.stream3 = VideoStream(url3, "Cam3").start()
self.timer1 = QTimer()
self.timer1.timeout.connect(self.update_frame1)
self.timer2 = QTimer()
self.timer2.timeout.connect(self.update_frame2)
self.timer3 = QTimer()
self.timer3.timeout.connect(self.update_frame3)
# 定时读取视频流
self.timer1.start(33)
self.timer2.start(33)
self.timer3.start(33)
def connect_signals(self):
# 流Stream相关的槽函数连接放在了 camera_controller之中
# 视频显示相关的槽函数连接
self.cam1.video_display_signal.connect(self.onVideoDisplay)
self.cam2.video_display_signal.connect(self.onVideoDisplay)
self.cam3.video_display_signal.connect(self.onVideoDisplay)
def onVideoDisplay(self, cam_name: str, state: bool):
# 1. 摄像头与 stream、timer 的映射关系
cam_mapping = {
"上位料斗": (self.stream1, self.timer1),
"下位料斗": (self.stream2, self.timer2),
"模具车": (self.stream3, self.timer3)
}
# 2. 获取当前摄像头对应的 stream 和 timer处理无效名称的情况
stream, timer = cam_mapping.get(cam_name, (None, None))
if not stream or not timer:
print(f"警告:未知摄像头名称 {cam_name}")
return
# 3. 根据 state 执行操作
if state: # 显示视频
stream.reset_retry() # 视频流重连
timer.start(33) # 定时器开始,显示视频帧
else: # 关闭视频
timer.stop() # 定时器停止
stream.manual_stop = True # 视频流停止读取
def onStreamError(self, stream_name):
if stream_name == "Cam1":
self.cam1.showResetButton()
elif stream_name == "Cam2":
self.cam2.showResetButton()
elif stream_name == "Cam3":
self.cam3.showResetButton()
def update_frame1(self):
frame = self.stream1.read()
if frame is not None:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if self.cam1.need_rotate_180:
frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180)
h, w, ch = frame_rgb.shape
q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888)
self.cam1.update_raw_image(q_img)
else:
self.cam1.update_raw_image(None)
def update_frame2(self):
frame = self.stream2.read()
if frame is not None:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if self.cam2.need_rotate_180:
frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180)
h, w, ch = frame_rgb.shape
q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888)
self.cam2.update_raw_image(q_img)
else:
self.cam2.update_raw_image(None)
def update_frame3(self):
frame = self.stream3.read()
if frame is not None:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
if self.cam3.need_rotate_180:
frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180)
h, w, ch = frame_rgb.shape
q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888)
self.cam3.update_raw_image(q_img)
else:
self.cam3.update_raw_image(None)
def paintEvent(self, event):
# 1. 先调用父类的paintEvent
super().paintEvent(event)
# 2. 创建画家对象,绘制背景图片
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.SmoothPixmapTransform) # 图片缩放平滑
# 3. 获取部件的实际尺寸这里是387x720与setup_ui中一致
widget_rect = self.rect()
# 4. 绘制背景图片(缩放至部件大小)
# 可选缩放策略:
# - Qt.KeepAspectRatio保持图片比例可能有黑边
# - Qt.IgnoreAspectRatio拉伸填充可能变形
# - Qt.KeepAspectRatioByExpanding按比例放大至覆盖部件可能裁剪边缘
scaled_pixmap = QPixmap.fromImage(self.background_image).scaled(
widget_rect.size(),
Qt.KeepAspectRatio, # 推荐:保持比例,避免变形
Qt.SmoothTransformation # 平滑缩放
)
# 5. 计算图片绘制位置(居中显示,若有黑边则均匀分布)
x = (widget_rect.width() - scaled_pixmap.width()) // 2
y = (widget_rect.height() - scaled_pixmap.height()) // 2
painter.drawPixmap(x, y, scaled_pixmap)
# 6. 结束绘制
painter.end()
# ====================== 清理资源 ======================
def closeEvent(self, event):
self.hide()
self.timer1.stop()
self.timer2.stop()
self.timer3.stop()
self.stream1.stop()
self.stream2.stop()
self.stream3.stop()
super().closeEvent(event)