"""Thin wrapper around the MvCameraControl SDK for grabbing BGR frames."""
import os
import sys
import time
from ctypes import *

# import cv2
import numpy as np

from .MvImport.MvCameraControl_class import *


class Camera:
    """Helper that opens the first enumerated camera and returns BGR frames."""

    def __init__(self):
        """Create the SDK camera object and reset all state flags."""
        self.cam = MvCamera()
        self.device_list = MV_CC_DEVICE_INFO_LIST()
        self.handle_created = False
        self.device_opened = False
        self.is_grabbing = False

        # Statistics
        self.frame_count = 0
        self.start_time = 0

        # Buffer state (filled in once the device has been opened)
        self.data_buf = None
        self.payload_size = 0
        self.frame_info = MV_FRAME_OUT_INFO_EX()

    def _open(self) -> bool:
        """Enumerate devices, open the first one and start streaming.

        Any resource acquired before a failing step is released again via
        ``_close()``.

        :return: True on success, False on any failure.
        """
        # Release anything left over from a previous _open() so the state
        # flags and the SDK resources cannot get out of sync.
        if self.handle_created or self.device_opened or self.is_grabbing:
            self._close()

        # 1. Enumerate devices
        ret = MvCamera.MV_CC_EnumDevices(
            MV_GIGE_DEVICE | MV_USB_DEVICE, self.device_list
        )
        if ret != 0 or self.device_list.nDeviceNum == 0:
            print("[Error] No device found!")
            return False

        print(f"[Info] Found {self.device_list.nDeviceNum} device(s). Using the first one.")

        # 2. Pick the first device and create a handle for it
        st_device = cast(
            self.device_list.pDeviceInfo[0], POINTER(MV_CC_DEVICE_INFO)
        ).contents
        ret = self.cam.MV_CC_CreateHandle(st_device)
        if ret != 0:
            print(f"[Error] Create handle failed, ret={ret}")
            return False
        self.handle_created = True

        # 3. Open the device
        ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            print(f"[Error] Open device failed, ret={ret}")
            self._close()
            return False
        self.device_opened = True

        # 4. Configure continuous acquisition. A failure here is reported
        #    but not fatal, because the camera may already be in the
        #    requested mode.
        for node, value in (("AcquisitionMode", 2), ("TriggerMode", 0)):
            ret = self.cam.MV_CC_SetEnumValue(node, value)
            if ret != 0:
                print(f"[Warn] Set {node}={value} failed, ret={ret}")

        # 5. Start streaming
        ret = self.cam.MV_CC_StartGrabbing()
        if ret != 0:
            print(f"[Error] Start grabbing failed, ret={ret}")
            self._close()
            return False
        self.is_grabbing = True

        # 6. Query PayloadSize and allocate the raw frame buffer
        st_payload = MVCC_INTVALUE_EX()
        ret = self.cam.MV_CC_GetIntValueEx("PayloadSize", st_payload)
        if ret != 0 or st_payload.nCurValue <= 0:
            print("[Error] Failed to get PayloadSize")
            self._close()
            return False

        self.payload_size = st_payload.nCurValue
        self.data_buf = (c_ubyte * self.payload_size)()

        # Start the statistics from scratch for this streaming session.
        self.frame_count = 0
        self.start_time = 0

        print(f"[Info] Camera opened successfully. PayloadSize: {self.payload_size}")
        return True

    def _close(self) -> None:
        """Stop streaming and release the device, handle and frame buffer.

        Safe to call in any state: each step runs only if the matching flag
        says the resource is currently held.
        """
        if self.is_grabbing:
            self.cam.MV_CC_StopGrabbing()
            self.is_grabbing = False

        if self.device_opened:
            self.cam.MV_CC_CloseDevice()
            self.device_opened = False

        if self.handle_created:
            self.cam.MV_CC_DestroyHandle()
            self.handle_created = False

        # Drop the raw buffer so a stale one is never reused after re-open.
        self.data_buf = None
        self.payload_size = 0

        print("[Info] Camera resources released.")

    def get_img(self, timeout_ms: int = 1000):
        """Grab one frame and convert it to packed BGR.

        :param timeout_ms: frame timeout in milliseconds.
        :return: ``numpy.uint8`` array of shape (height, width, 3), or None
            if the camera is not streaming, the grab fails, or the pixel
            conversion fails.
        """
        if not self.is_grabbing or self.data_buf is None:
            return None

        ret = self.cam.MV_CC_GetOneFrameTimeout(
            self.data_buf, self.payload_size, self.frame_info, timeout_ms
        )
        if ret != 0:
            return None

        # Update statistics; the clock starts at the first grabbed frame.
        self.frame_count += 1
        if self.frame_count == 1:
            self.start_time = time.time()

        w, h = self.frame_info.nWidth, self.frame_info.nHeight
        if w == 0 or h == 0:
            # Nothing to convert; avoid handing a zero-sized buffer to the SDK.
            return None

        # Pixel conversion parameters
        convert = MV_CC_PIXEL_CONVERT_PARAM()
        memset(byref(convert), 0, sizeof(convert))
        convert.nWidth = w
        convert.nHeight = h
        convert.pSrcData = self.data_buf
        convert.nSrcDataLen = self.frame_info.nFrameLen
        convert.enSrcPixelType = self.frame_info.enPixelType
        convert.enDstPixelType = PixelType_Gvsp_BGR8_Packed
        convert.nDstBufferSize = w * h * 3  # 3 bytes per pixel for BGR8

        # A fresh destination buffer per frame: the returned array is a view
        # of it, so reusing one buffer would overwrite earlier frames.
        dst = (c_ubyte * convert.nDstBufferSize)()
        convert.pDstBuffer = dst

        ret_conv = self.cam.MV_CC_ConvertPixelType(convert)
        if ret_conv != 0:
            return None

        # Wrap the converted buffer as a numpy array
        return np.frombuffer(dst, dtype=np.uint8).reshape(h, w, 3)

    def get_fps_stats(self):
        """Return grab statistics for printing the streaming FPS.

        :return: tuple ``(fps, frame_count, total_time)``; all zeros until
            the first frame has been grabbed.
        """
        if self.frame_count == 0 or self.start_time == 0:
            return 0.0, 0, 0.0

        total_time = time.time() - self.start_time
        fps = self.frame_count / total_time if total_time > 0 else 0.0
        return fps, self.frame_count, total_time


# --------- Public interface ---------
def show_img():
    """Open the first camera, grab a single frame, and release the camera.

    Exits the process with status 1 if the camera cannot be opened.

    :return: BGR numpy array, or None if the grab failed or an exception
        was raised (the exception is printed).
    """
    # Bound before the try so the finally clause can tell whether there is
    # anything to close.
    cam_obj = None
    try:
        # 1. Instantiate the camera
        cam_obj = Camera()

        # 2. Report a failed open
        if not cam_obj._open():
            print("Failed to open camera. Exiting.")
            sys.exit(1)

        print("Capturing one frame...")

        # 3. Grab the image (single attempt)
        return cam_obj.get_img()
    except Exception as e:
        print(f"Error: {e}")
        return None
    finally:
        # 4. Close the camera
        if cam_obj is not None:
            cam_obj._close()
            print("Camera closed.")


# =====================================================
# Example main program
# =====================================================
if __name__ == "__main__":
    # 1. Instantiate the camera
    cam_obj = Camera()

    # 2. Report a failed open
    if not cam_obj._open():
        print("Failed to open camera. Exiting.")
        sys.exit(1)

    try:
        print("Capturing one frame...")

        # 3. Grab the image
        img = cam_obj.get_img()

        if img is not None:
            # Timestamped file name so earlier captures are not overwritten
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"capture_{timestamp}.jpg"

            # Saving is disabled while the cv2 import is commented out:
            # ret = cv2.imwrite(filename, img)
            # if ret:
            #     print(f"Success! Image saved as: {filename}")
            # else:
            #     print("Error: Failed to save image.")
        else:
            print("Error: Failed to capture any valid frame.")
    finally:
        # 4. Close the camera even if grabbing raised
        cam_obj._close()
        print("Camera closed.")