import sys
import os
import cv2
import time
import queue
import threading
import numpy as np
from ctypes import *

# ================= MVS SDK =================
# Adjust this path to wherever the MVS SDK Python bindings are installed.
sys.path.append("/opt/MVS/Samples/64/Python/MvImport")
from MvCameraControl_class import *

SAVE_DIR = "./capture"
os.makedirs(SAVE_DIR, exist_ok=True)

SAVE_QUEUE_SIZE = 200  # maximum number of frames buffered for the saver
JPEG_QUALITY = 95

# Carries (capture_time, image) pairs from the grab loop to the saver thread.
frame_queue = queue.Queue(maxsize=SAVE_QUEUE_SIZE)
# Set to True by main() to tell the saver thread to stop once the queue drains.
exit_flag = False


class _CameraError(RuntimeError):
    """Raised when an MVS SDK call that the capture cannot proceed without fails."""


def _require_ok(ret, action):
    """Raise _CameraError if the SDK status code *ret* for *action* is non-zero."""
    if ret != 0:
        # Mask to 32 bits so the code prints the same whether the binding
        # returned it as a signed or an unsigned integer.
        raise _CameraError(f"{action} failed, ret=0x{ret & 0xFFFFFFFF:08x}")


def _frame_path(captured_at):
    """Return the JPEG path for a frame grabbed at *captured_at* (epoch seconds).

    The file name is the local time with millisecond precision:
    ``YYYYmmdd_HHMMSS_mmm.jpg`` inside SAVE_DIR.
    """
    millis = int((captured_at - int(captured_at)) * 1000)
    stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime(captured_at))
    return os.path.join(SAVE_DIR, f"{stamp}_{millis:03d}.jpg")


# =====================================================
# Saver thread
# =====================================================
def save_worker():
    """Write queued frames to SAVE_DIR as JPEG files until told to stop.

    Runs until exit_flag is set AND the queue is empty, so frames that are
    still queued at shutdown are written rather than discarded.
    """
    while not exit_flag or not frame_queue.empty():
        try:
            captured_at, img = frame_queue.get(timeout=0.1)
        except queue.Empty:
            # Short timeout so exit_flag is re-checked regularly.
            continue

        path = _frame_path(captured_at)
        try:
            written = cv2.imwrite(
                path, img, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
            )
        except cv2.error as err:
            # Keep the worker alive; one bad frame must not stop the rest.
            print(f"failed to write {path}: {err}")
            continue
        if not written:
            print(f"failed to write {path}")


def _convert_to_bgr(cam, data_buf, frame_info):
    """Convert the raw frame held in *data_buf* to a BGR image.

    Returns an ``(height, width, 3)`` uint8 array backed by a buffer that is
    allocated for this frame only, or None if the SDK reports that the pixel
    conversion failed.
    """
    w, h = frame_info.nWidth, frame_info.nHeight
    dst_size = w * h * 3
    dst = (c_ubyte * dst_size)()

    convert = MV_CC_PIXEL_CONVERT_PARAM()
    memset(byref(convert), 0, sizeof(convert))
    convert.nWidth = w
    convert.nHeight = h
    convert.pSrcData = data_buf
    convert.nSrcDataLen = frame_info.nFrameLen
    convert.enSrcPixelType = frame_info.enPixelType
    convert.enDstPixelType = PixelType_Gvsp_BGR8_Packed
    convert.nDstBufferSize = dst_size
    convert.pDstBuffer = dst

    if cam.MV_CC_ConvertPixelType(convert) != 0:
        return None
    # The array keeps a reference to dst, and dst is never reused for another
    # frame, so the result can be queued without an extra copy.
    return np.frombuffer(dst, dtype=np.uint8).reshape(h, w, 3)


# =====================================================
def main():
    """Grab frames from the first detected camera, preview them and save them.

    Opens the first GigE/USB device in continuous acquisition mode, shows a
    640x480 preview window, and hands every converted frame to the saver
    thread. Pressing ``q`` in the preview window stops the capture and prints
    FPS statistics. The saver thread is always drained and the camera always
    released on the way out, including after an error or Ctrl+C.
    """
    global exit_flag

    # ---------- Enumerate devices ----------
    device_list = MV_CC_DEVICE_INFO_LIST()
    ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, device_list)
    if ret != 0 or device_list.nDeviceNum == 0:
        print("No device")
        return

    cam = MvCamera()
    st_device = cast(
        device_list.pDeviceInfo[0], POINTER(MV_CC_DEVICE_INFO)
    ).contents

    saver = None
    handle_created = opened = grabbing = False
    finished_cleanly = False
    try:
        _require_ok(cam.MV_CC_CreateHandle(st_device), "MV_CC_CreateHandle")
        handle_created = True
        _require_ok(cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0), "MV_CC_OpenDevice")
        opened = True

        # Continuous acquisition, free-running (trigger off). A camera that
        # rejects one of these may still deliver frames, so only warn.
        for node, value in (("AcquisitionMode", 2), ("TriggerMode", 0)):
            ret = cam.MV_CC_SetEnumValue(node, value)
            if ret != 0:
                print(f"warning: set {node} failed, ret=0x{ret & 0xFFFFFFFF:08x}")

        _require_ok(cam.MV_CC_StartGrabbing(), "MV_CC_StartGrabbing")
        grabbing = True

        st_payload = MVCC_INTVALUE_EX()
        _require_ok(
            cam.MV_CC_GetIntValueEx("PayloadSize", st_payload),
            "MV_CC_GetIntValueEx(PayloadSize)",
        )
        payload = st_payload.nCurValue
        data_buf = (c_ubyte * payload)()
        frame_info = MV_FRAME_OUT_INFO_EX()

        # Start the saver thread. Reset the flag first so a previous run of
        # main() cannot make the new worker exit immediately.
        exit_flag = False
        saver = threading.Thread(target=save_worker, daemon=True)
        saver.start()

        print("Grabbing... press q to exit")

        # ================= FPS statistics =================
        frame_count = 0
        start_time = time.time()

        while True:
            ret = cam.MV_CC_GetOneFrameTimeout(data_buf, payload, frame_info, 1000)
            if ret == 0:
                captured_at = time.time()
                frame_count += 1  # counts successfully grabbed frames

                img = _convert_to_bgr(cam, data_buf, frame_info)
                if img is None:
                    print("pixel conversion failed, drop frame")
                else:
                    # Preview
                    cv2.imshow("Camera", cv2.resize(img, (640, 480)))
                    # Enqueue for saving; never block the grab loop.
                    try:
                        frame_queue.put_nowait((captured_at, img))
                    except queue.Full:
                        print("queue full, drop frame")

            # Poll the keyboard on every iteration, including failed grabs,
            # so the user can still quit while the camera is not delivering.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # ================= FPS report =================
        total_time = time.time() - start_time
        fps = frame_count / total_time if total_time > 0 else 0

        print("\n===== Capture FPS Stats =====")
        print(f"Frames captured : {frame_count}")
        print(f"Total time (s)  : {total_time:.3f}")
        print(f"Average FPS     : {fps:.2f}")
        print("=============================\n")
        finished_cleanly = True
    except _CameraError as err:
        print(err)
    finally:
        # =================== Orderly shutdown ===================
        exit_flag = True
        if saver is not None:
            saver.join()  # waits for the queued frames to be written
        if grabbing:
            cam.MV_CC_StopGrabbing()
        if opened:
            cam.MV_CC_CloseDevice()
        if handle_created:
            cam.MV_CC_DestroyHandle()
        cv2.destroyAllWindows()

    if finished_cleanly:
        print("Exit clean")


# =====================================================
if __name__ == "__main__":
    main()