# coding:utf-8 from PySide6.QtCore import Qt, QTimer, Signal, QObject from PySide6.QtGui import QImage, QPixmap from PySide6.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QLabel, QApplication from typing import Optional import cv2 import time from threading import Thread, Lock import sys # ====================== 后台视频流管理(自动重连)====================== class VideoStream: """后台读取 RTSP 流,自动重连,只返回新鲜帧""" def __init__(self, rtsp_url, name="Stream"): self.rtsp_url = rtsp_url self.name = name self.cap = None self.frame = None self.timestamp = 0 self.lock = Lock() self.running = False self.thread = None def start(self): self.running = True self.thread = Thread(target=self.update, args=(), daemon=True) self.thread.start() return self def update(self): while self.running: if self.cap is None or not self.cap.isOpened(): print(f"[{self.name}] 正在连接 RTSP: {self.rtsp_url}") try: self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) if hasattr(cv2, 'CAP_PROP_BUFFERSIZE'): self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if hasattr(cv2, 'CAP_PROP_READ_TIMEOUT'): self.cap.set(cv2.CAP_PROP_READ_TIMEOUT, 2000) if hasattr(cv2, 'CAP_PROP_TCP_NODELAY'): self.cap.set(cv2.CAP_PROP_TCP_NODELAY, 1) except Exception as e: print(f"[{self.name}] 连接失败: {e}") time.sleep(1) continue ret, frame = self.cap.read() if ret: with self.lock: self.frame = frame.copy() self.timestamp = time.time() else: print(f"[{self.name}] 读取失败,准备重连...") if self.cap: self.cap.release() self.cap = None time.sleep(1) def read(self): with self.lock: if self.frame is not None and time.time() - self.timestamp < 1.5: return self.frame.copy() else: return None def stop(self): self.running = False if self.thread is not None: self.thread.join(timeout=0.5) if self.cap is not None: self.cap.release() print(f"[{self.name}] 视频流已停止") # ====================== 摄像头功能模块 ====================== class CameraModule(QWidget): """单个摄像头模块:原图显示""" def __init__(self, camera_name="摄像头", rtsp_url="", need_rotate_180=True, parent=None): super().__init__(parent) self.camera_name = camera_name self.rtsp_url = rtsp_url self.need_rotate_180 = need_rotate_180 # 画面是否需要旋转180度后显示 self.setup_ui() def setup_ui(self): layout = QVBoxLayout(self) layout.setSpacing(0) # layout.setContentsMargins(8, 8, 8, 8) layout.setContentsMargins(0, 0, 0, 0) # --- 0. 标题 --- self.title_label = QLabel() self.title_label.setFixedSize(106, 25) self.title_label.setAlignment(Qt.AlignLeft) self.title_label.setText(f"{self.camera_name}视频") self.title_label.setObjectName("cameraTitleLabel") self.title_label.setStyleSheet(""" #cameraTitleLabel { font-size: 16px; color: black; font-weight: bold; } """) # --- 1. 原始图像 --- self.raw_label = QLabel() # 显示图像的 label # self.raw_label.setFixedSize(320, 240) self.raw_label.setFixedSize(327, 199) # 显示的图像的宽、高 palette = self.raw_label.palette() palette.setColor(self.raw_label.foregroundRole(), Qt.white) # 字体颜色:白色 self.raw_label.setPalette(palette) self.raw_label.setAlignment(Qt.AlignCenter) # self.raw_label.setStyleSheet("background: black; border: 1px solid #ccc;") self.raw_label.setStyleSheet("background: #033474; border: none") # 视频显示框样式 self.raw_label.setText(f"{self.camera_name}摄像头, 连接中...") # --- 添加到主布局 --- layout.addWidget(self.title_label) # 标题 layout.addWidget(self.raw_label) # 图像 def update_raw_image(self, image: Optional[QImage]): if image: pixmap = QPixmap.fromImage(image) self.raw_label.setPixmap(pixmap.scaled( self.raw_label.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation )) self.raw_label.setText("") else: self.raw_label.setPixmap(QPixmap()) self.raw_label.setText(f"{self.camera_name}摄像头, 断线中...") class VibrationVideoWidget(QWidget): def __init__(self, parent=None): super().__init__(parent=parent) self.setObjectName("vibrationVideoWidget") # 显示摄像头画面的模组 # 需要修改为相应的地址!!! self.cam1 = CameraModule("上位料斗", "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101") self.cam2 = CameraModule("下位料斗", "rtsp://admin:XJ123456@192.168.1.51:554/streaming/channels/101") self.cam3 = CameraModule("模具车", "rtsp://admin:XJ123456@192.168.1.51:554/streaming/channels/101") self.setup_ui() self.init_streams() self.connect_signals() def setup_ui(self): # 视频widget样式 self.setFixedSize(387, 720) # 宽387、高792 self.setAutoFillBackground(True) self.setStyleSheet(""" #vibrationVideoWidget { background-color: #043d76; border: none; /* 可选:去除默认边框,避免视觉干扰 */ } """) # 布局设置 layout = QVBoxLayout(self) layout.setContentsMargins(16, 16, 16, 16) layout.setSpacing(6) # 每两个摄像头模块间隔6px layout.addWidget(self.cam1) layout.addWidget(self.cam2) layout.addWidget(self.cam3) def init_streams(self): url1 = self.cam1.rtsp_url url2 = self.cam2.rtsp_url url3 = self.cam3.rtsp_url self.stream1 = VideoStream(url1, "Cam1").start() self.stream2 = VideoStream(url2, "Cam2").start() self.stream3 = VideoStream(url3, "Cam3").start() self.timer1 = QTimer() self.timer1.timeout.connect(self.update_frame1) self.timer2 = QTimer() self.timer2.timeout.connect(self.update_frame2) self.timer3 = QTimer() self.timer3.timeout.connect(self.update_frame3) # 定时读取视频流 self.timer1.start(33) self.timer2.start(33) self.timer3.start(33) def connect_signals(self): pass def update_frame1(self): frame = self.stream1.read() if frame is not None: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if self.cam1.need_rotate_180: frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180) h, w, ch = frame_rgb.shape q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888) self.cam1.update_raw_image(q_img) else: self.cam1.update_raw_image(None) def update_frame2(self): frame = self.stream2.read() if frame is not None: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if self.cam2.need_rotate_180: frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180) h, w, ch = frame_rgb.shape q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888) self.cam2.update_raw_image(q_img) else: self.cam2.update_raw_image(None) def update_frame3(self): frame = self.stream3.read() if frame is not None: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if self.cam3.need_rotate_180: frame_rgb = cv2.rotate(frame_rgb, cv2.ROTATE_180) h, w, ch = frame_rgb.shape q_img = QImage(frame_rgb.data, w, h, ch * w, QImage.Format_RGB888) self.cam3.update_raw_image(q_img) else: self.cam3.update_raw_image(None) # ====================== 清理资源 ====================== def closeEvent(self, event): self.hide() self.stream1.stop() self.stream2.stop() self.stream3.stop() self.timer1.stop() self.timer2.stop() self.timer3.stop() super().closeEvent(event) if __name__ == "__main__": app = QApplication([]) window = VibrationVideoWidget() window.show() sys.exit(app.exec())