227 lines
6.8 KiB
Python
227 lines
6.8 KiB
Python
import sys
|
||
import os
|
||
# import cv2
|
||
import time
|
||
import numpy as np
|
||
from ctypes import *
|
||
from .MvImport.MvCameraControl_class import *
|
||
|
||
|
||
class Camera:
|
||
def __init__(self):
|
||
"""
|
||
初始化相机类
|
||
"""
|
||
self.cam = MvCamera()
|
||
self.device_list = MV_CC_DEVICE_INFO_LIST()
|
||
self.handle_created = False
|
||
self.device_opened = False
|
||
self.is_grabbing = False
|
||
|
||
# 统计信息
|
||
self.frame_count = 0
|
||
self.start_time = 0
|
||
|
||
# 缓冲区变量 (将在打开设备后初始化)
|
||
self.data_buf = None
|
||
self.payload_size = 0
|
||
self.frame_info = MV_FRAME_OUT_INFO_EX()
|
||
|
||
def _open(self):
|
||
"""
|
||
枚举并打开相机
|
||
:return: bool, 成功返回 True,失败返回 False
|
||
"""
|
||
# 1. 枚举设备
|
||
ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, self.device_list)
|
||
if ret != 0 or self.device_list.nDeviceNum == 0:
|
||
print("[Error] No device found!")
|
||
return False
|
||
|
||
print(f"[Info] Found {self.device_list.nDeviceNum} device(s). Using the first one.")
|
||
|
||
# 2. 选择第一个设备并创建句柄
|
||
st_device = cast(self.device_list.pDeviceInfo[0], POINTER(MV_CC_DEVICE_INFO)).contents
|
||
ret = self.cam.MV_CC_CreateHandle(st_device)
|
||
if ret != 0:
|
||
print(f"[Error] Create handle failed, ret={ret}")
|
||
return False
|
||
self.handle_created = True
|
||
|
||
# 3. 打开设备
|
||
ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
|
||
if ret != 0:
|
||
print(f"[Error] Open device failed, ret={ret}")
|
||
self._close()
|
||
return False
|
||
self.device_opened = True
|
||
|
||
# 4. 配置采集模式 (连续采集)
|
||
self.cam.MV_CC_SetEnumValue("AcquisitionMode", 2)
|
||
self.cam.MV_CC_SetEnumValue("TriggerMode", 0)
|
||
|
||
# 5. 开始取流
|
||
ret = self.cam.MV_CC_StartGrabbing()
|
||
if ret != 0:
|
||
print(f"[Error] Start grabbing failed, ret={ret}")
|
||
self._close()
|
||
return False
|
||
self.is_grabbing = True
|
||
|
||
# 6. 获取 PayloadSize 并分配缓冲区
|
||
st_payload = MVCC_INTVALUE_EX()
|
||
ret = self.cam.MV_CC_GetIntValueEx("PayloadSize", st_payload)
|
||
if ret == 0:
|
||
self.payload_size = st_payload.nCurValue
|
||
self.data_buf = (c_ubyte * self.payload_size)()
|
||
print(f"[Info] Camera opened successfully. PayloadSize: {self.payload_size}")
|
||
else:
|
||
print("[Error] Failed to get PayloadSize")
|
||
self._close()
|
||
return False
|
||
|
||
return True
|
||
|
||
def _close(self):
|
||
"""
|
||
停止采集并关闭相机资源
|
||
"""
|
||
if self.is_grabbing:
|
||
self.cam.MV_CC_StopGrabbing()
|
||
self.is_grabbing = False
|
||
|
||
if self.device_opened:
|
||
self.cam.MV_CC_CloseDevice()
|
||
self.device_opened = False
|
||
|
||
if self.handle_created:
|
||
self.cam.MV_CC_DestroyHandle()
|
||
self.handle_created = False
|
||
|
||
print("[Info] Camera resources released.")
|
||
|
||
def get_img(self, timeout_ms=1000):
|
||
"""
|
||
获取一帧图像并进行像素转换
|
||
:param timeout_ms: 获取帧的超时时间 (毫秒)
|
||
:return: numpy array (BGR格式) 或 None (如果获取失败)
|
||
"""
|
||
if not self.is_grabbing or self.data_buf is None:
|
||
return None
|
||
|
||
ret = self.cam.MV_CC_GetOneFrameTimeout(self.data_buf, self.payload_size, self.frame_info, timeout_ms)
|
||
if ret != 0:
|
||
return None
|
||
|
||
# 更新统计
|
||
self.frame_count += 1
|
||
if self.frame_count == 1:
|
||
self.start_time = time.time()
|
||
|
||
w, h = self.frame_info.nWidth, self.frame_info.nHeight
|
||
|
||
# 像素转换参数设置
|
||
convert = MV_CC_PIXEL_CONVERT_PARAM()
|
||
memset(byref(convert), 0, sizeof(convert))
|
||
convert.nWidth = w
|
||
convert.nHeight = h
|
||
convert.pSrcData = self.data_buf
|
||
convert.nSrcDataLen = self.frame_info.nFrameLen
|
||
convert.enSrcPixelType = self.frame_info.enPixelType
|
||
convert.enDstPixelType = PixelType_Gvsp_BGR8_Packed
|
||
convert.nDstBufferSize = w * h * 3
|
||
|
||
dst = (c_ubyte * convert.nDstBufferSize)()
|
||
convert.pDstBuffer = dst
|
||
|
||
ret_conv = self.cam.MV_CC_ConvertPixelType(convert)
|
||
if ret_conv != 0:
|
||
return None
|
||
|
||
# 转换为 numpy 数组
|
||
img = np.frombuffer(dst, dtype=np.uint8).reshape(h, w, 3)
|
||
return img
|
||
|
||
def get_fps_stats(self):
|
||
"""获取当前的 FPS 统计信息,便于观察打印取流fps"""
|
||
if self.frame_count == 0 or self.start_time == 0:
|
||
return 0.0, 0, 0.0
|
||
|
||
total_time = time.time() - self.start_time
|
||
fps = self.frame_count / total_time if total_time > 0 else 0
|
||
return fps, self.frame_count, total_time
|
||
|
||
# ---------对外接口---------
|
||
def show_img():
|
||
try:
|
||
# 1. 实例化相机
|
||
cam_obj = Camera()
|
||
|
||
# 2. 非正常打开报警
|
||
if not cam_obj._open():
|
||
print("Failed to open camera. Exiting.")
|
||
sys.exit(1)
|
||
|
||
print("Capturing one frame...")
|
||
|
||
while(True):
|
||
# 3. 获取图像
|
||
img = cam_obj.get_img()
|
||
return img
|
||
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
finally:
|
||
# 4. 关闭相机
|
||
cam_obj._close()
|
||
print("Camera closed.")
|
||
|
||
# if img is not None:
|
||
# # 生成带时间戳的文件名,防止覆盖
|
||
# timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
# filename = f"capture_{timestamp}.jpg"
|
||
|
||
# # 保存图像
|
||
# # ret = cv2.imwrite(filename, img)
|
||
# # if ret:
|
||
# # print(f"Success! Image saved as: {filename}")
|
||
# # else:
|
||
# # print("Error: Failed to save image.")
|
||
# else:
|
||
# print("Error: Failed to capture any valid frame.")
|
||
|
||
# =====================================================
|
||
# 主程序示例
|
||
# =====================================================
|
||
if __name__ == "__main__":
|
||
# 1. 实例化相机
|
||
cam_obj = Camera()
|
||
|
||
# 2. 非正常打开报警
|
||
if not cam_obj._open():
|
||
print("Failed to open camera. Exiting.")
|
||
sys.exit(1)
|
||
|
||
print("Capturing one frame...")
|
||
|
||
# 3. 获取图像
|
||
img = cam_obj.get_img()
|
||
|
||
if img is not None:
|
||
# 生成带时间戳的文件名,防止覆盖
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
filename = f"capture_{timestamp}.jpg"
|
||
|
||
# 保存图像
|
||
# ret = cv2.imwrite(filename, img)
|
||
# if ret:
|
||
# print(f"Success! Image saved as: {filename}")
|
||
# else:
|
||
# print("Error: Failed to save image.")
|
||
else:
|
||
print("Error: Failed to capture any valid frame.")
|
||
|
||
# 4. 关闭相机
|
||
cam_obj._close()
|
||
print("Camera closed.")
|