# vision/camera.py import cv2 import os import time from datetime import datetime from typing import Optional import numpy as np import shutil # 用于检查磁盘空间 class CameraUtil: """ 海康摄像头工具类,用于初始化、设置配置、捕获图像等操作 """ def __init__(self): self.camera = None self.camera_type = "ip" self.camera_ip = "192.168.0.125" self.camera_port = 554 self.camera_username = "admin" self.camera_password = "ailaimiye123" self.camera_channel = 1 def set_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1): """ 设置摄像头配置 """ self.camera_type = camera_type if ip: self.camera_ip = ip if port: self.camera_port = port if username: self.camera_username = username if password: self.camera_password = password self.camera_channel = channel def setup_capture(self, camera_index=0): """ 设置摄像头捕获 """ try: rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01" self.camera = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if not self.camera.isOpened(): print(f"无法打开网络摄像头: {rtsp_url}") return False print(f"网络摄像头初始化成功,地址: {rtsp_url}") # 关键优化:设置极低的缓冲区大小 self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 确保设置合理的帧率 self.camera.set(cv2.CAP_PROP_FPS, 25) # 设置超时参数 self.camera.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) # 5秒超时 self.camera.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 1000) # 1秒读取超时 return True except Exception as e: print(f"摄像头设置失败: {e}") return False def capture_frame(self): """捕获当前帧并返回numpy数组,设置5秒总超时""" try: if self.camera is None: print("摄像头未初始化") return None # 设置总超时时间为5秒 total_timeout = 5.0 # 5秒总超时时间 start_time = time.time() # 跳20帧,获取最新图像 frames_skipped = 0 while frames_skipped < 1: # 检查总超时 if time.time() - start_time > total_timeout: print("捕获图像总超时") return None self.camera.grab() # time.sleep(0.05) # 稍微增加延迟,确保有新帧到达 frames_skipped += 1 # 尝试读取帧,使用同一超时计时器 read_attempts = 0 max_read_attempts = 3 if self.camera.grab(): while read_attempts < max_read_attempts: # 使用同一个超时计时器检查 if time.time() - start_time > total_timeout: print("捕获图像总超时") return None ret, frame = self.camera.retrieve() if ret: return frame else: print(f"尝试读取图像帧失败,重试 ({read_attempts+1}/{max_read_attempts})") read_attempts += 1 # 短暂延迟后重试 time.sleep(0.05) print("多次尝试后仍无法捕获有效图像帧") return None except Exception as e: print(f"图像捕获失败: {e}") return None def release(self): """释放摄像头资源""" if self.camera is not None: self.camera.release() self.camera = None print("摄像头资源已释放") def save_frame(self, frame, save_dir="captured_frames", filename=None): """ 保存图像帧到指定目录,按日期创建子文件夹 :param frame: 要保存的图像帧 :param save_dir: 保存根目录 :param filename: 保存的文件名,如果为None则使用时分秒格式 :return: 保存的文件路径 """ try: # 确保保存根目录存在 if not os.path.exists(save_dir): os.makedirs(save_dir) print(f"创建保存根目录: {save_dir}") # 检查磁盘可用空间 - 修复变量名称错误 total, used, free = shutil.disk_usage(save_dir) if free < 1024 * 1024 * 20: # 小于 20MB 就停止 print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024**3):.2f} GB),停止运行。") return None # 获取当前日期,创建日期子文件夹 current_date = datetime.now().strftime("%Y%m%d") date_dir = os.path.join(save_dir, current_date) # 确保日期子文件夹存在 if not os.path.exists(date_dir): os.makedirs(date_dir) print(f"创建日期文件夹: {date_dir}") # 生成文件名(时分秒格式) if filename is None: time_str = datetime.now().strftime("%Y%m%d%H%M%S") filename = f"{time_str}.jpg" # 保存图像到日期子文件夹 save_path = os.path.join(date_dir, filename) loc_status=cv2.imwrite(save_path, frame) if not loc_status: print(f"保存图像失败: {save_path}") return None print(f"图像已保存到: {save_path}") return save_path except Exception as e: print(f"保存图像失败: {e}") return None def save_img(self)->Optional[str]: """ 保存当前帧 :return: 保存的文件路径 """ if self.camera is None: self.setup_capture() frame = self.capture_frame() if frame is not None: loc_filepath=self.save_frame(frame) self.release() return loc_filepath else: print("无法捕获图像帧") return None def __del__(self): """关闭 """ self.release() if __name__ == "__main__": camera = CameraUtil() for i in range(10): camera.save_img() time.sleep(5) camera.release() # camera.set_config(camera_type="ip", ip="192.168.0.234", port=554, username="admin", password="ailaimiye123", channel=1) # camera.setup_capture() # for i in range(10): # frame = camera.capture_frame() # if frame is not None: # 保存当前帧 # camera.save_frame(frame) # cv2.imshow("Camera Feed", frame) print('success') # camera.release() # cv2.destroyAllWindows()