first commit

This commit is contained in:
琉璃月光
2025-12-16 15:12:02 +08:00
commit 8506c3af79
227 changed files with 4060 additions and 0 deletions

View File

@ -0,0 +1,63 @@
# camera_capture.py
import os
from pypylon import pylon
import cv2
from datetime import datetime
def search_get_device():
"""搜索并返回 Basler GigE 相机对象"""
tl_factory = pylon.TlFactory.GetInstance()
for dev_info in tl_factory.EnumerateDevices():
if dev_info.GetDeviceClass() == 'BaslerGigE':
print(f"Found Basler GigE Camera: {dev_info.GetModelName()} IP: {dev_info.GetIpAddress()}")
return pylon.InstantCamera(tl_factory.CreateDevice(dev_info))
raise EnvironmentError("没有找到 Basler GigE 相机")
def grab_image(save=True):
"""
采集一张图像并返回 numpy ndarrayBGR
save=True 时自动保存 PNG 文件到 camera/ 文件夹
"""
# 创建保存目录
save_dir = "camera"
os.makedirs(save_dir, exist_ok=True)
cam = search_get_device()
cam.Open()
# 设置图像转换器BGR8 → OpenCV格式
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
grabResult = cam.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
image = converter.Convert(grabResult)
img = image.GetArray() # OpenCV 可处理的 ndarray
print("成功采集图像:", img.shape)
# 保存 PNG
if save:
filename = os.path.join(save_dir, f"Image_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
cv2.imwrite(filename, img)
print("已保存:", filename)
else:
print("采集失败:", grabResult.ErrorCode, grabResult.ErrorDescription)
img = None
grabResult.Release()
cam.Close()
return img
if __name__ == "__main__":
img = grab_image(save=True)
if img is not None:
print("图像类型:", type(img))
print("图像尺寸:", img.shape)

View File

@ -0,0 +1,108 @@
import threading
import queue
import cv2
from pypylon import pylon
from datetime import datetime
import os
# =================== 配置 ===================
SAVE_DIR = "camera/continuous"
SAVE_EVERY_N_FRAMES = 1 # 每N帧保存一张None表示不保存
SHOW = True
QUEUE_MAXSIZE = 50
# =================== 相机搜索 ===================
def search_get_device():
"""搜索并返回 Basler GigE 相机对象"""
tl_factory = pylon.TlFactory.GetInstance()
for dev_info in tl_factory.EnumerateDevices():
if dev_info.GetDeviceClass() == 'BaslerGigE':
print(f"Found Basler GigE Camera: {dev_info.GetModelName()} IP: {dev_info.GetIpAddress()}")
return pylon.InstantCamera(tl_factory.CreateDevice(dev_info))
raise EnvironmentError("没有找到 Basler GigE 相机")
# =================== 异步处理线程 ===================
frame_queue = queue.Queue(maxsize=QUEUE_MAXSIZE)
exit_event = threading.Event()
def worker(save_dir, save_every_n_frames, show):
frame_count = 0
while not exit_event.is_set():
try:
item = frame_queue.get(timeout=0.1) # 队列空时短暂等待
except queue.Empty:
continue
if item is None:
break
img, frame_num = item
frame_count += 1
# 显示图像
if show:
cv2.imshow("Basler Live Feed", img)
if cv2.waitKey(1) & 0xFF in (ord('q'), 27):
exit_event.set()
break
# 保存图像
if save_every_n_frames and frame_num % save_every_n_frames == 0:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = os.path.join(save_dir, f"frame_{frame_num:06d}_{timestamp}.png")
cv2.imwrite(filename, img)
print(f"[Saved] {filename}")
if show:
cv2.destroyAllWindows()
# =================== 主采集函数 ===================
def continuous_grab(save_dir=SAVE_DIR, show=SHOW, save_every_n_frames=SAVE_EVERY_N_FRAMES):
os.makedirs(save_dir, exist_ok=True)
cam = search_get_device()
cam.Open()
# Load the User Set 1 user set
cam.UserSetSelector.Value = "UserSet1"
cam.UserSetLoad.Execute()
cam.MaxNumBuffer = 30
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
# 启动异步处理线程
t = threading.Thread(target=worker, args=(save_dir, save_every_n_frames, show), daemon=True)
t.start()
cam.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
frame_count = 0
try:
while cam.IsGrabbing() and not exit_event.is_set():
grab_result = cam.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grab_result.GrabSucceeded():
image = converter.Convert(grab_result)
img = image.GetArray()
frame_count += 1
# 拷贝一份到队列
if not frame_queue.full():
frame_queue.put((img.copy(), frame_count))
grab_result.Release()
except KeyboardInterrupt:
print("被 Ctrl+C 中断")
cam.StopGrabbing()
cam.Close()
exit_event.set()
finally:
cam.StopGrabbing()
cam.Close()
# 通知线程退出
frame_queue.put(None)
t.join()
print(f"共采集 {frame_count}")
# =================== 主入口 ===================
if __name__ == "__main__":
continuous_grab()