Files
general-system-framework/wndMain/camera_img_get.py

227 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
# import cv2
import time
import numpy as np
from ctypes import *
from .MvImport.MvCameraControl_class import *
class Camera:
    """Wrapper around the vendor ``MvCamera`` SDK object for grabbing frames.

    Typical use is ``_open()`` -> repeated ``get_img()`` -> ``_close()``.
    The class can also be used as a context manager, in which case the
    camera is opened on entry and released on exit.
    """

    def __init__(self):
        """Create the SDK objects and reset all state; ``_open`` does the device work."""
        self.cam = MvCamera()
        self.device_list = MV_CC_DEVICE_INFO_LIST()
        # Flags recording how far _open() got, so _close() undoes only those steps.
        self.handle_created = False
        self.device_opened = False
        self.is_grabbing = False
        # Statistics: frames grabbed and the time.time() of the first frame.
        self.frame_count = 0
        self.start_time = 0
        # Raw frame buffer; allocated in _open() once PayloadSize is known.
        self.data_buf = None
        self.payload_size = 0
        self.frame_info = MV_FRAME_OUT_INFO_EX()

    def __enter__(self):
        """Open the camera and return ``self``.

        :raises RuntimeError: if ``_open()`` reports failure.
        """
        if not self._open():
            raise RuntimeError("Failed to open camera")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the camera; exceptions from the ``with`` body are not suppressed."""
        self._close()
        return False

    def _open(self):
        """Enumerate devices, open the first one and start grabbing.

        Calling this while the camera is already grabbing is a no-op that
        returns True. On any failure after the handle was created, everything
        acquired so far is released again.

        :return: bool, True on success, False on failure
        """
        if self.is_grabbing and self.data_buf is not None:
            return True
        if self.handle_created:
            # Left over from an interrupted open: release it first so the
            # old SDK handle is not overwritten and leaked.
            self._close()

        # 1. Enumerate devices
        ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, self.device_list)
        if ret != 0 or self.device_list.nDeviceNum == 0:
            print("[Error] No device found!")
            return False
        print(f"[Info] Found {self.device_list.nDeviceNum} device(s). Using the first one.")

        # 2. Pick the first device and create a handle for it
        st_device = cast(self.device_list.pDeviceInfo[0], POINTER(MV_CC_DEVICE_INFO)).contents
        ret = self.cam.MV_CC_CreateHandle(st_device)
        if ret != 0:
            print(f"[Error] Create handle failed, ret={ret}")
            return False
        self.handle_created = True

        # 3. Open the device
        ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            print(f"[Error] Open device failed, ret={ret}")
            self._close()
            return False
        self.device_opened = True

        # 4. Configure the acquisition mode (continuous acquisition, per the
        #    original author's note; TriggerMode 0 is presumably "off" --
        #    confirm against the SDK enum values). A failure is reported but
        #    not fatal, matching the original best-effort behaviour.
        for node, value in (("AcquisitionMode", 2), ("TriggerMode", 0)):
            ret = self.cam.MV_CC_SetEnumValue(node, value)
            if ret != 0:
                print(f"[Warn] Set {node}={value} failed, ret={ret}")

        # 5. Start streaming
        ret = self.cam.MV_CC_StartGrabbing()
        if ret != 0:
            print(f"[Error] Start grabbing failed, ret={ret}")
            self._close()
            return False
        self.is_grabbing = True

        # 6. Query PayloadSize and allocate the raw frame buffer
        st_payload = MVCC_INTVALUE_EX()
        ret = self.cam.MV_CC_GetIntValueEx("PayloadSize", st_payload)
        if ret != 0:
            print("[Error] Failed to get PayloadSize")
            self._close()
            return False
        self.payload_size = st_payload.nCurValue
        self.data_buf = (c_ubyte * self.payload_size)()

        # Start the statistics afresh for this session.
        self.frame_count = 0
        self.start_time = 0
        print(f"[Info] Camera opened successfully. PayloadSize: {self.payload_size}")
        return True

    def _close(self):
        """Stop grabbing and release the device and handle.

        Only the steps that ``_open`` actually completed are undone, so this
        is safe to call in any state, including repeatedly.
        """
        if self.is_grabbing:
            self.cam.MV_CC_StopGrabbing()
            self.is_grabbing = False
        if self.device_opened:
            self.cam.MV_CC_CloseDevice()
            self.device_opened = False
        if self.handle_created:
            self.cam.MV_CC_DestroyHandle()
            self.handle_created = False
        # Drop the raw buffer so a closed camera does not keep it alive.
        self.data_buf = None
        self.payload_size = 0
        print("[Info] Camera resources released.")

    def get_img(self, timeout_ms=1000):
        """Grab one frame and convert it to packed BGR8.

        :param timeout_ms: timeout passed to the SDK frame grab (milliseconds)
        :return: ``numpy.uint8`` array of shape ``(height, width, 3)``, or
                 None if the camera is not grabbing, the grab fails or the
                 pixel conversion fails
        """
        if not self.is_grabbing or self.data_buf is None:
            return None
        ret = self.cam.MV_CC_GetOneFrameTimeout(
            self.data_buf, self.payload_size, self.frame_info, timeout_ms
        )
        if ret != 0:
            return None

        # Update statistics; the clock starts when the first frame arrives.
        self.frame_count += 1
        if self.frame_count == 1:
            self.start_time = time.time()

        w, h = self.frame_info.nWidth, self.frame_info.nHeight

        # Fill in the pixel-conversion request.
        convert = MV_CC_PIXEL_CONVERT_PARAM()
        memset(byref(convert), 0, sizeof(convert))
        convert.nWidth = w
        convert.nHeight = h
        convert.pSrcData = self.data_buf
        convert.nSrcDataLen = self.frame_info.nFrameLen
        convert.enSrcPixelType = self.frame_info.enPixelType
        convert.enDstPixelType = PixelType_Gvsp_BGR8_Packed
        convert.nDstBufferSize = w * h * 3  # three bytes per pixel
        # A fresh destination buffer per call: the returned array shares this
        # memory, so it must not be overwritten by the next grab.
        dst = (c_ubyte * convert.nDstBufferSize)()
        convert.pDstBuffer = dst
        ret_conv = self.cam.MV_CC_ConvertPixelType(convert)
        if ret_conv != 0:
            return None

        # View the converted bytes as an image array.
        return np.frombuffer(dst, dtype=np.uint8).reshape(h, w, 3)

    def get_fps_stats(self):
        """Return streaming statistics for printing or monitoring.

        The clock starts when the first frame arrives, so ``frame_count``
        frames span ``frame_count - 1`` frame intervals; the rate is computed
        over those intervals and is 0.0 after a single frame.

        :return: tuple ``(fps, frame_count, seconds_since_first_frame)``;
                 ``(0.0, 0, 0.0)`` if no frame has been grabbed yet
        """
        if self.frame_count == 0 or self.start_time == 0:
            return 0.0, 0, 0.0
        total_time = time.time() - self.start_time
        intervals = self.frame_count - 1
        fps = intervals / total_time if total_time > 0 else 0.0
        return fps, self.frame_count, total_time
# ---------对外接口---------
def show_img(max_attempts=1):
    """Open the first camera, grab one frame, release the camera and return the frame.

    :param max_attempts: how many times to call ``get_img`` before giving up;
                         values below 1 are treated as 1. The default of 1
                         matches the original single-grab behaviour.
    :return: the image returned by ``Camera.get_img``, or None if no frame
             could be grabbed or an exception occurred
    :raises SystemExit: with code 1 if the camera cannot be opened
    """
    # Bound before the try so the finally clause can tell whether there is
    # anything to close even when Camera() itself raises.
    cam_obj = None
    try:
        # 1. Instantiate the camera
        cam_obj = Camera()
        # 2. Abort if it cannot be opened
        if not cam_obj._open():
            print("Failed to open camera. Exiting.")
            sys.exit(1)
        print("Capturing one frame...")
        # 3. Grab a frame, retrying up to max_attempts times
        for _ in range(max(1, max_attempts)):
            img = cam_obj.get_img()
            if img is not None:
                return img
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None
    finally:
        # 4. Release the camera, if it was ever created
        if cam_obj is not None:
            cam_obj._close()
            print("Camera closed.")
# if img is not None:
# # 生成带时间戳的文件名,防止覆盖
# timestamp = time.strftime("%Y%m%d_%H%M%S")
# filename = f"capture_{timestamp}.jpg"
# # 保存图像
# # ret = cv2.imwrite(filename, img)
# # if ret:
# # print(f"Success! Image saved as: {filename}")
# # else:
# # print("Error: Failed to save image.")
# else:
# print("Error: Failed to capture any valid frame.")
# =====================================================
# 主程序示例
# =====================================================
if __name__ == "__main__":
    # Demo: open the first camera, grab one frame, then release the camera.
    # 1. Instantiate the camera
    cam_obj = Camera()
    # 2. Abort if it cannot be opened
    if not cam_obj._open():
        print("Failed to open camera. Exiting.")
        sys.exit(1)
    try:
        print("Capturing one frame...")
        # 3. Grab a frame
        img = cam_obj.get_img()
        if img is not None:
            # Timestamped file name so successive captures do not overwrite
            # each other. Saving is currently disabled because the cv2 import
            # at the top of the file is commented out.
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"capture_{timestamp}.jpg"
            # Save the image
            # ret = cv2.imwrite(filename, img)
            # if ret:
            #     print(f"Success! Image saved as: {filename}")
            # else:
            #     print("Error: Failed to save image.")
        else:
            print("Error: Failed to capture any valid frame.")
    finally:
        # 4. Release the camera even if grabbing raised
        cam_obj._close()
        print("Camera closed.")