227 lines
6.8 KiB
Python
227 lines
6.8 KiB
Python
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
# import cv2
|
|||
|
|
import time
|
|||
|
|
import numpy as np
|
|||
|
|
from ctypes import *
|
|||
|
|
from .MvImport.MvCameraControl_class import *
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Camera:
|
|||
|
|
def __init__(self):
|
|||
|
|
"""
|
|||
|
|
初始化相机类
|
|||
|
|
"""
|
|||
|
|
self.cam = MvCamera()
|
|||
|
|
self.device_list = MV_CC_DEVICE_INFO_LIST()
|
|||
|
|
self.handle_created = False
|
|||
|
|
self.device_opened = False
|
|||
|
|
self.is_grabbing = False
|
|||
|
|
|
|||
|
|
# 统计信息
|
|||
|
|
self.frame_count = 0
|
|||
|
|
self.start_time = 0
|
|||
|
|
|
|||
|
|
# 缓冲区变量 (将在打开设备后初始化)
|
|||
|
|
self.data_buf = None
|
|||
|
|
self.payload_size = 0
|
|||
|
|
self.frame_info = MV_FRAME_OUT_INFO_EX()
|
|||
|
|
|
|||
|
|
def _open(self):
|
|||
|
|
"""
|
|||
|
|
枚举并打开相机
|
|||
|
|
:return: bool, 成功返回 True,失败返回 False
|
|||
|
|
"""
|
|||
|
|
# 1. 枚举设备
|
|||
|
|
ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, self.device_list)
|
|||
|
|
if ret != 0 or self.device_list.nDeviceNum == 0:
|
|||
|
|
print("[Error] No device found!")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
print(f"[Info] Found {self.device_list.nDeviceNum} device(s). Using the first one.")
|
|||
|
|
|
|||
|
|
# 2. 选择第一个设备并创建句柄
|
|||
|
|
st_device = cast(self.device_list.pDeviceInfo[0], POINTER(MV_CC_DEVICE_INFO)).contents
|
|||
|
|
ret = self.cam.MV_CC_CreateHandle(st_device)
|
|||
|
|
if ret != 0:
|
|||
|
|
print(f"[Error] Create handle failed, ret={ret}")
|
|||
|
|
return False
|
|||
|
|
self.handle_created = True
|
|||
|
|
|
|||
|
|
# 3. 打开设备
|
|||
|
|
ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
|
|||
|
|
if ret != 0:
|
|||
|
|
print(f"[Error] Open device failed, ret={ret}")
|
|||
|
|
self._close()
|
|||
|
|
return False
|
|||
|
|
self.device_opened = True
|
|||
|
|
|
|||
|
|
# 4. 配置采集模式 (连续采集)
|
|||
|
|
self.cam.MV_CC_SetEnumValue("AcquisitionMode", 2)
|
|||
|
|
self.cam.MV_CC_SetEnumValue("TriggerMode", 0)
|
|||
|
|
|
|||
|
|
# 5. 开始取流
|
|||
|
|
ret = self.cam.MV_CC_StartGrabbing()
|
|||
|
|
if ret != 0:
|
|||
|
|
print(f"[Error] Start grabbing failed, ret={ret}")
|
|||
|
|
self._close()
|
|||
|
|
return False
|
|||
|
|
self.is_grabbing = True
|
|||
|
|
|
|||
|
|
# 6. 获取 PayloadSize 并分配缓冲区
|
|||
|
|
st_payload = MVCC_INTVALUE_EX()
|
|||
|
|
ret = self.cam.MV_CC_GetIntValueEx("PayloadSize", st_payload)
|
|||
|
|
if ret == 0:
|
|||
|
|
self.payload_size = st_payload.nCurValue
|
|||
|
|
self.data_buf = (c_ubyte * self.payload_size)()
|
|||
|
|
print(f"[Info] Camera opened successfully. PayloadSize: {self.payload_size}")
|
|||
|
|
else:
|
|||
|
|
print("[Error] Failed to get PayloadSize")
|
|||
|
|
self._close()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
def _close(self):
|
|||
|
|
"""
|
|||
|
|
停止采集并关闭相机资源
|
|||
|
|
"""
|
|||
|
|
if self.is_grabbing:
|
|||
|
|
self.cam.MV_CC_StopGrabbing()
|
|||
|
|
self.is_grabbing = False
|
|||
|
|
|
|||
|
|
if self.device_opened:
|
|||
|
|
self.cam.MV_CC_CloseDevice()
|
|||
|
|
self.device_opened = False
|
|||
|
|
|
|||
|
|
if self.handle_created:
|
|||
|
|
self.cam.MV_CC_DestroyHandle()
|
|||
|
|
self.handle_created = False
|
|||
|
|
|
|||
|
|
print("[Info] Camera resources released.")
|
|||
|
|
|
|||
|
|
def get_img(self, timeout_ms=1000):
|
|||
|
|
"""
|
|||
|
|
获取一帧图像并进行像素转换
|
|||
|
|
:param timeout_ms: 获取帧的超时时间 (毫秒)
|
|||
|
|
:return: numpy array (BGR格式) 或 None (如果获取失败)
|
|||
|
|
"""
|
|||
|
|
if not self.is_grabbing or self.data_buf is None:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
ret = self.cam.MV_CC_GetOneFrameTimeout(self.data_buf, self.payload_size, self.frame_info, timeout_ms)
|
|||
|
|
if ret != 0:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 更新统计
|
|||
|
|
self.frame_count += 1
|
|||
|
|
if self.frame_count == 1:
|
|||
|
|
self.start_time = time.time()
|
|||
|
|
|
|||
|
|
w, h = self.frame_info.nWidth, self.frame_info.nHeight
|
|||
|
|
|
|||
|
|
# 像素转换参数设置
|
|||
|
|
convert = MV_CC_PIXEL_CONVERT_PARAM()
|
|||
|
|
memset(byref(convert), 0, sizeof(convert))
|
|||
|
|
convert.nWidth = w
|
|||
|
|
convert.nHeight = h
|
|||
|
|
convert.pSrcData = self.data_buf
|
|||
|
|
convert.nSrcDataLen = self.frame_info.nFrameLen
|
|||
|
|
convert.enSrcPixelType = self.frame_info.enPixelType
|
|||
|
|
convert.enDstPixelType = PixelType_Gvsp_BGR8_Packed
|
|||
|
|
convert.nDstBufferSize = w * h * 3
|
|||
|
|
|
|||
|
|
dst = (c_ubyte * convert.nDstBufferSize)()
|
|||
|
|
convert.pDstBuffer = dst
|
|||
|
|
|
|||
|
|
ret_conv = self.cam.MV_CC_ConvertPixelType(convert)
|
|||
|
|
if ret_conv != 0:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# 转换为 numpy 数组
|
|||
|
|
img = np.frombuffer(dst, dtype=np.uint8).reshape(h, w, 3)
|
|||
|
|
return img
|
|||
|
|
|
|||
|
|
def get_fps_stats(self):
|
|||
|
|
"""获取当前的 FPS 统计信息,便于观察打印取流fps"""
|
|||
|
|
if self.frame_count == 0 or self.start_time == 0:
|
|||
|
|
return 0.0, 0, 0.0
|
|||
|
|
|
|||
|
|
total_time = time.time() - self.start_time
|
|||
|
|
fps = self.frame_count / total_time if total_time > 0 else 0
|
|||
|
|
return fps, self.frame_count, total_time
|
|||
|
|
|
|||
|
|
# ---------对外接口---------
|
|||
|
|
def show_img():
|
|||
|
|
try:
|
|||
|
|
# 1. 实例化相机
|
|||
|
|
cam_obj = Camera()
|
|||
|
|
|
|||
|
|
# 2. 非正常打开报警
|
|||
|
|
if not cam_obj._open():
|
|||
|
|
print("Failed to open camera. Exiting.")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
print("Capturing one frame...")
|
|||
|
|
|
|||
|
|
while(True):
|
|||
|
|
# 3. 获取图像
|
|||
|
|
img = cam_obj.get_img()
|
|||
|
|
return img
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"Error: {e}")
|
|||
|
|
finally:
|
|||
|
|
# 4. 关闭相机
|
|||
|
|
cam_obj._close()
|
|||
|
|
print("Camera closed.")
|
|||
|
|
|
|||
|
|
# if img is not None:
|
|||
|
|
# # 生成带时间戳的文件名,防止覆盖
|
|||
|
|
# timestamp = time.strftime("%Y%m%d_%H%M%S")
|
|||
|
|
# filename = f"capture_{timestamp}.jpg"
|
|||
|
|
|
|||
|
|
# # 保存图像
|
|||
|
|
# # ret = cv2.imwrite(filename, img)
|
|||
|
|
# # if ret:
|
|||
|
|
# # print(f"Success! Image saved as: {filename}")
|
|||
|
|
# # else:
|
|||
|
|
# # print("Error: Failed to save image.")
|
|||
|
|
# else:
|
|||
|
|
# print("Error: Failed to capture any valid frame.")
|
|||
|
|
|
|||
|
|
# =====================================================
|
|||
|
|
# 主程序示例
|
|||
|
|
# =====================================================
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
# 1. 实例化相机
|
|||
|
|
cam_obj = Camera()
|
|||
|
|
|
|||
|
|
# 2. 非正常打开报警
|
|||
|
|
if not cam_obj._open():
|
|||
|
|
print("Failed to open camera. Exiting.")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
print("Capturing one frame...")
|
|||
|
|
|
|||
|
|
# 3. 获取图像
|
|||
|
|
img = cam_obj.get_img()
|
|||
|
|
|
|||
|
|
if img is not None:
|
|||
|
|
# 生成带时间戳的文件名,防止覆盖
|
|||
|
|
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
|||
|
|
filename = f"capture_{timestamp}.jpg"
|
|||
|
|
|
|||
|
|
# 保存图像
|
|||
|
|
# ret = cv2.imwrite(filename, img)
|
|||
|
|
# if ret:
|
|||
|
|
# print(f"Success! Image saved as: {filename}")
|
|||
|
|
# else:
|
|||
|
|
# print("Error: Failed to save image.")
|
|||
|
|
else:
|
|||
|
|
print("Error: Failed to capture any valid frame.")
|
|||
|
|
|
|||
|
|
# 4. 关闭相机
|
|||
|
|
cam_obj._close()
|
|||
|
|
print("Camera closed.")
|