料袋末尾检测
This commit is contained in:
160
Mv3D/CameraImg.py
Normal file
160
Mv3D/CameraImg.py
Normal file
@ -0,0 +1,160 @@
|
||||
from tokenize import String
|
||||
from typing import Optional
|
||||
import numpy as np
|
||||
from PySide6.QtCore import Signal, QObject
|
||||
import threading
|
||||
import ctypes
|
||||
import time
|
||||
import cv2
|
||||
import os
|
||||
import datetime
|
||||
from .Mv3dRgbdImport.Mv3dRgbdApi import *
|
||||
from .Mv3dRgbdImport.Mv3dRgbdDefine import *
|
||||
from .Mv3dRgbdImport.Mv3dRgbdDefine import DeviceType_Ethernet, DeviceType_USB, MV3D_RGBD_FLOAT_EXPOSURETIME, \
|
||||
ParamType_Enum, ParamType_Int, CoordinateType_Depth, MV3D_RGBD_FLOAT_Z_UNIT, MV3D_RGBD_OK, \
|
||||
FileType_BMP,ImageType_Depth, ImageType_RGB8_Planar, ImageType_YUV420SP_NV12 ,ImageType_YUV420SP_NV21 , ImageType_YUV422, ImageType_Mono8
|
||||
|
||||
|
||||
class CameraImg(QObject):
|
||||
# camera_image_finish = Signal(str) # 原有信号,传递文件路径
|
||||
# stream_image_data = Signal(np.ndarray) # 新信号,直接传递图像数据
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.camera:Optional[Mv3dRgbd]=None
|
||||
self.running=False
|
||||
self.fetchimg=False
|
||||
# self._feeding=feeding
|
||||
# 连接信号到启动保存线程的方法,而不是直接连接到save_frame_path
|
||||
# self._feeding.take_photo_sigal.connect(self._set_fetch)
|
||||
pass
|
||||
|
||||
def _set_fetch(self):
|
||||
"""设置保存图片
|
||||
Args:
|
||||
value (bool): 暂停状态
|
||||
"""
|
||||
self.fetchimg=True
|
||||
|
||||
def _camera_info(self)->Optional[MV3D_RGBD_DEVICE_INFO_LIST]:
|
||||
"""获取相机信息
|
||||
Returns:
|
||||
Optional[MV3D_RGBD_DEVICE_INFO_LIST]: 相机信息列表
|
||||
"""
|
||||
_nDeviceNum=ctypes.c_uint(0)
|
||||
_nDeviceNum_p=byref(_nDeviceNum)
|
||||
# ch:获取设备数量 | en:Get device number
|
||||
ret=Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(DeviceType_Ethernet | DeviceType_USB, _nDeviceNum_p)
|
||||
if ret!=0:
|
||||
return None
|
||||
if _nDeviceNum==0:
|
||||
print("find no device!")
|
||||
return None
|
||||
stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
|
||||
ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(DeviceType_Ethernet | DeviceType_USB, pointer(stDeviceList.DeviceInfo[0]), 2, _nDeviceNum_p)
|
||||
if ret != 0:
|
||||
return None
|
||||
return stDeviceList
|
||||
|
||||
def open_camera(self)->bool:
|
||||
"""打开相机
|
||||
Returns:
|
||||
bool: 是否成功
|
||||
"""
|
||||
if self.camera is not None:
|
||||
return True
|
||||
_stDeviceInfo=self._camera_info()
|
||||
if _stDeviceInfo is None:
|
||||
return False
|
||||
_stDeviceInfo=_stDeviceInfo.DeviceInfo[0]
|
||||
# ch:创建相机示例 | en:Create a camera instance
|
||||
camera=Mv3dRgbd()
|
||||
# ch:打开设备 | en:Open device
|
||||
ret=camera.MV3D_RGBD_OpenDevice(pointer(_stDeviceInfo))
|
||||
if ret!=0:
|
||||
print ("MV3D_RGBD_OpenDevice fail! ret[0x%x]" % ret)
|
||||
return False
|
||||
|
||||
# ch:开始取流 | en:Start grabbing
|
||||
ret=camera.MV3D_RGBD_Start()
|
||||
if ret != 0:
|
||||
print ("start fail! ret[0x%x]" % ret)
|
||||
camera.MV3D_RGBD_CloseDevice()
|
||||
return False
|
||||
self.camera=camera
|
||||
return True
|
||||
|
||||
def close_camera(self):
|
||||
"""关闭相机
|
||||
"""
|
||||
if self.camera is not None:
|
||||
self.camera.MV3D_RGBD_CloseDevice()
|
||||
self.camera=None
|
||||
|
||||
def save_frame_path(self):
|
||||
"""保存当前帧并返回路径(内部实现,由线程调用)
|
||||
Returns:
|
||||
Optional[str]: 保存的文件名
|
||||
"""
|
||||
try:
|
||||
if self.camera is None:
|
||||
#打开相机
|
||||
if not self.open_camera():
|
||||
return None
|
||||
stFrameData=MV3D_RGBD_FRAME_DATA()
|
||||
# ch:获取图像数据 | en:Get image data
|
||||
ret=self.camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)
|
||||
_now=datetime.datetime.now()
|
||||
# 创建日期格式文件夹
|
||||
_current_date = _now.strftime('%Y-%m-%d')
|
||||
if not os.path.exists(_current_date):
|
||||
os.makedirs(_current_date)
|
||||
|
||||
if MV3D_RGBD_OK == ret:
|
||||
print("MV3D_RGBD_FetchFrame success.")
|
||||
for i in range(0, stFrameData.nImageCount):
|
||||
if ImageType_RGB8_Planar == stFrameData.stImageData[i].enImageType or ImageType_YUV420SP_NV12 == stFrameData.stImageData[i].enImageType or ImageType_YUV420SP_NV21 == stFrameData.stImageData[i].enImageType or ImageType_YUV422 == stFrameData.stImageData[i].enImageType:
|
||||
try:
|
||||
_current_time = _now.strftime('%H%M%S')
|
||||
_chColorFileName = os.path.join(_current_date, f"{_current_time}_1")
|
||||
ret = self.camera.MV3D_RGBD_SaveImage(pointer(stFrameData.stImageData[i]), FileType_BMP, _chColorFileName)
|
||||
|
||||
if MV3D_RGBD_OK == ret:
|
||||
print(f"Save color image success: {_chColorFileName}")
|
||||
# self.camera_image_finish.emit(f"{_chColorFileName}.bmp")
|
||||
return _chColorFileName
|
||||
else:
|
||||
print("保存图片失败.")
|
||||
except Exception as e:
|
||||
print(f"保存图片异常: {e}")
|
||||
return None
|
||||
else:
|
||||
print("相机拍照失败!")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"保存图片异常: {e}")
|
||||
return None
|
||||
|
||||
def stop(self):
|
||||
self.running=False
|
||||
self.fetchimg=False
|
||||
|
||||
# def run(self):
|
||||
# """线程运行方法,用于保存当前帧
|
||||
# """
|
||||
# while self.running:
|
||||
# if self.fetchimg:
|
||||
# self.fetchimg=False
|
||||
# self.update_camera_image.emit(r"Image\032446.bmp")
|
||||
# # self.save_frame_path()
|
||||
# time.sleep(5)
|
||||
|
||||
def __del__(self):
|
||||
"""关闭
|
||||
"""
|
||||
self.close_camera()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
camera_img=CameraImg()
|
||||
camera_img.save_frame_path()
|
||||
Reference in New Issue
Block a user