205 lines
7.1 KiB
Python
205 lines
7.1 KiB
Python
# vision/camera.py
|
||
import cv2
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
import numpy as np
|
||
import shutil # 用于检查磁盘空间
|
||
|
||
class CameraUtil:
|
||
"""
|
||
海康摄像头工具类,用于初始化、设置配置、捕获图像等操作
|
||
"""
|
||
def __init__(self):
|
||
self.camera = None
|
||
self.camera_type = "ip"
|
||
self.camera_ip = "192.168.0.125"
|
||
self.camera_port = 554
|
||
self.camera_username = "admin"
|
||
self.camera_password = "ailaimiye123"
|
||
self.camera_channel = 1
|
||
|
||
def set_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1):
|
||
"""
|
||
设置摄像头配置
|
||
"""
|
||
self.camera_type = camera_type
|
||
if ip:
|
||
self.camera_ip = ip
|
||
if port:
|
||
self.camera_port = port
|
||
if username:
|
||
self.camera_username = username
|
||
if password:
|
||
self.camera_password = password
|
||
self.camera_channel = channel
|
||
|
||
def setup_capture(self, camera_index=0):
|
||
"""
|
||
设置摄像头捕获
|
||
"""
|
||
try:
|
||
rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01"
|
||
self.camera = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
|
||
|
||
if not self.camera.isOpened():
|
||
print(f"无法打开网络摄像头: {rtsp_url}")
|
||
return False
|
||
print(f"网络摄像头初始化成功,地址: {rtsp_url}")
|
||
# 关键优化:设置极低的缓冲区大小
|
||
self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||
# 确保设置合理的帧率
|
||
self.camera.set(cv2.CAP_PROP_FPS, 25)
|
||
|
||
# 设置超时参数
|
||
self.camera.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) # 5秒超时
|
||
self.camera.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 1000) # 1秒读取超时
|
||
return True
|
||
except Exception as e:
|
||
print(f"摄像头设置失败: {e}")
|
||
return False
|
||
|
||
def capture_frame(self):
|
||
"""捕获当前帧并返回numpy数组,设置5秒总超时"""
|
||
try:
|
||
if self.camera is None:
|
||
print("摄像头未初始化")
|
||
return None
|
||
|
||
# 设置总超时时间为5秒
|
||
total_timeout = 5.0 # 5秒总超时时间
|
||
start_time = time.time()
|
||
|
||
# 跳20帧,获取最新图像
|
||
frames_skipped = 0
|
||
while frames_skipped < 1:
|
||
# 检查总超时
|
||
if time.time() - start_time > total_timeout:
|
||
print("捕获图像总超时")
|
||
return None
|
||
self.camera.grab()
|
||
# time.sleep(0.05) # 稍微增加延迟,确保有新帧到达
|
||
frames_skipped += 1
|
||
|
||
# 尝试读取帧,使用同一超时计时器
|
||
read_attempts = 0
|
||
max_read_attempts = 3
|
||
if self.camera.grab():
|
||
while read_attempts < max_read_attempts:
|
||
# 使用同一个超时计时器检查
|
||
if time.time() - start_time > total_timeout:
|
||
print("捕获图像总超时")
|
||
return None
|
||
|
||
ret, frame = self.camera.retrieve()
|
||
if ret:
|
||
return frame
|
||
else:
|
||
print(f"尝试读取图像帧失败,重试 ({read_attempts+1}/{max_read_attempts})")
|
||
read_attempts += 1
|
||
# 短暂延迟后重试
|
||
time.sleep(0.05)
|
||
|
||
print("多次尝试后仍无法捕获有效图像帧")
|
||
return None
|
||
except Exception as e:
|
||
print(f"图像捕获失败: {e}")
|
||
return None
|
||
|
||
def release(self):
|
||
"""释放摄像头资源"""
|
||
if self.camera is not None:
|
||
self.camera.release()
|
||
self.camera = None
|
||
print("摄像头资源已释放")
|
||
|
||
def save_frame(self, frame, save_dir="captured_frames", filename=None):
|
||
"""
|
||
保存图像帧到指定目录,按日期创建子文件夹
|
||
:param frame: 要保存的图像帧
|
||
:param save_dir: 保存根目录
|
||
:param filename: 保存的文件名,如果为None则使用时分秒格式
|
||
:return: 保存的文件路径
|
||
"""
|
||
try:
|
||
# 确保保存根目录存在
|
||
if not os.path.exists(save_dir):
|
||
os.makedirs(save_dir)
|
||
print(f"创建保存根目录: {save_dir}")
|
||
|
||
# 检查磁盘可用空间 - 修复变量名称错误
|
||
total, used, free = shutil.disk_usage(save_dir)
|
||
if free < 1024 * 1024 * 20: # 小于 20MB 就停止
|
||
print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024**3):.2f} GB),停止运行。")
|
||
return None
|
||
|
||
# 获取当前日期,创建日期子文件夹
|
||
current_date = datetime.now().strftime("%Y%m%d")
|
||
date_dir = os.path.join(save_dir, current_date)
|
||
|
||
# 确保日期子文件夹存在
|
||
if not os.path.exists(date_dir):
|
||
os.makedirs(date_dir)
|
||
print(f"创建日期文件夹: {date_dir}")
|
||
|
||
# 生成文件名(时分秒格式)
|
||
if filename is None:
|
||
time_str = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
filename = f"{time_str}.jpg"
|
||
|
||
# 保存图像到日期子文件夹
|
||
save_path = os.path.join(date_dir, filename)
|
||
loc_status=cv2.imwrite(save_path, frame)
|
||
if not loc_status:
|
||
print(f"保存图像失败: {save_path}")
|
||
return None
|
||
print(f"图像已保存到: {save_path}")
|
||
return save_path
|
||
except Exception as e:
|
||
print(f"保存图像失败: {e}")
|
||
return None
|
||
|
||
def save_img(self)->Optional[str]:
|
||
"""
|
||
保存当前帧
|
||
:return: 保存的文件路径
|
||
"""
|
||
if self.camera is None:
|
||
self.setup_capture()
|
||
frame = self.capture_frame()
|
||
if frame is not None:
|
||
loc_filepath=self.save_frame(frame)
|
||
self.release()
|
||
return loc_filepath
|
||
else:
|
||
print("无法捕获图像帧")
|
||
return None
|
||
|
||
|
||
def __del__(self):
|
||
"""关闭
|
||
"""
|
||
self.release()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
camera = CameraUtil()
|
||
for i in range(10):
|
||
camera.save_img()
|
||
time.sleep(5)
|
||
camera.release()
|
||
# camera.set_config(camera_type="ip", ip="192.168.0.234", port=554, username="admin", password="ailaimiye123", channel=1)
|
||
# camera.setup_capture()
|
||
# for i in range(10):
|
||
# frame = camera.capture_frame()
|
||
|
||
# if frame is not None:
|
||
# 保存当前帧
|
||
# camera.save_frame(frame)
|
||
# cv2.imshow("Camera Feed", frame)
|
||
print('success')
|
||
|
||
# camera.release()
|
||
# cv2.destroyAllWindows()
|