Files
琉璃月光 8506c3af79 first commit
2025-12-16 15:12:02 +08:00

109 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import queue
import cv2
from pypylon import pylon
from datetime import datetime
import os
# =================== 配置 ===================
SAVE_DIR = "camera/continuous"
SAVE_EVERY_N_FRAMES = 1 # 每N帧保存一张None表示不保存
SHOW = True
QUEUE_MAXSIZE = 50
# =================== 相机搜索 ===================
def search_get_device():
"""搜索并返回 Basler GigE 相机对象"""
tl_factory = pylon.TlFactory.GetInstance()
for dev_info in tl_factory.EnumerateDevices():
if dev_info.GetDeviceClass() == 'BaslerGigE':
print(f"Found Basler GigE Camera: {dev_info.GetModelName()} IP: {dev_info.GetIpAddress()}")
return pylon.InstantCamera(tl_factory.CreateDevice(dev_info))
raise EnvironmentError("没有找到 Basler GigE 相机")
# =================== 异步处理线程 ===================
frame_queue = queue.Queue(maxsize=QUEUE_MAXSIZE)
exit_event = threading.Event()
def worker(save_dir, save_every_n_frames, show):
frame_count = 0
while not exit_event.is_set():
try:
item = frame_queue.get(timeout=0.1) # 队列空时短暂等待
except queue.Empty:
continue
if item is None:
break
img, frame_num = item
frame_count += 1
# 显示图像
if show:
cv2.imshow("Basler Live Feed", img)
if cv2.waitKey(1) & 0xFF in (ord('q'), 27):
exit_event.set()
break
# 保存图像
if save_every_n_frames and frame_num % save_every_n_frames == 0:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = os.path.join(save_dir, f"frame_{frame_num:06d}_{timestamp}.png")
cv2.imwrite(filename, img)
print(f"[Saved] {filename}")
if show:
cv2.destroyAllWindows()
# =================== 主采集函数 ===================
def continuous_grab(save_dir=SAVE_DIR, show=SHOW, save_every_n_frames=SAVE_EVERY_N_FRAMES):
os.makedirs(save_dir, exist_ok=True)
cam = search_get_device()
cam.Open()
# Load the User Set 1 user set
cam.UserSetSelector.Value = "UserSet1"
cam.UserSetLoad.Execute()
cam.MaxNumBuffer = 30
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
# 启动异步处理线程
t = threading.Thread(target=worker, args=(save_dir, save_every_n_frames, show), daemon=True)
t.start()
cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
frame_count = 0
try:
while cam.IsGrabbing() and not exit_event.is_set():
grab_result = cam.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
image = converter.Convert(grab_result)
img = image.GetArray()
frame_count += 1
# 拷贝一份到队列
if not frame_queue.full():
frame_queue.put((img.copy(), frame_count))
grab_result.Release()
except KeyboardInterrupt:
print("被 Ctrl+C 中断")
cam.StopGrabbing()
cam.Close()
exit_event.set()
finally:
cam.StopGrabbing()
cam.Close()
# 通知线程退出
frame_queue.put(None)
t.join()
print(f"共采集 {frame_count}")
# =================== 主入口 ===================
if __name__ == "__main__":
continuous_grab()