#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2024/10/11 10:43
# @Author : hjw
# @File : CameraHIK.py

Thin wrapper around cv2.VideoCapture for reading frames from an RTSP camera,
with a TCP reachability probe performed before each (re)connection.
'''
import cv2
import socket


def portisopen(ip, port):
    """Return True if a TCP connection to (ip, port) succeeds within 1 second.

    Args:
        ip: Host name or IP address to probe.
        port: TCP port number to probe.

    Returns:
        True when ``connect_ex`` reports success (0), otherwise False. An
        ``OSError`` raised while connecting (for example when the host name
        cannot be resolved) is also reported as False.
    """
    # The context manager closes the probe socket on every path; without it
    # each call leaks a file descriptor, and get_img() probes once per frame.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        try:
            return sock.connect_ex((ip, port)) == 0
        except OSError:
            # connect_ex only converts C-level connect() errors into a return
            # code; address resolution problems still raise.
            return False


class camera_HIK():
    """RTSP camera reader built on cv2.VideoCapture.

    Attributes set by the constructor:
        camera_url: RTSP URL assembled from the constructor arguments.
        ip, port: Address used for the TCP reachability probe.
        init_success: Status flag; True after the port probe succeeded in
            ``__init__`` or after a reconnect whose test read succeeded.
        cap: The current cv2.VideoCapture, or None if none was ever opened.
    """

    def __init__(self, ip, port, name, pw):
        """Probe the camera port and, if reachable, open the RTSP stream.

        Args:
            ip: Camera host name or IP address.
            port: Camera RTSP port.
            name: Login user name placed in the URL.
            pw: Login password placed in the URL.
        """
        # URL layout: rtsp://<name>:<pw>@<ip>:<port>
        self.camera_url = (
            "rtsp://" + str(name) + ":" + str(pw) + "@" + str(ip) + ":" + str(port)
        )
        self.ip = ip
        self.port = port
        self.init_success = False
        # Always define cap so release_camera()/reconnect_camera() are safe
        # to call even when the initial connection was never made.
        self.cap = None
        if portisopen(ip, port):
            self.cap = cv2.VideoCapture(self.camera_url)
            self.init_success = True
        else:
            print('海康摄像头网络错误,请检测IP')

    def get_img(self):
        """Reconnect to the camera and read one frame.

        Returns:
            The ``(ret, frame)`` pair from ``cv2.VideoCapture.read()``, or
            ``(False, None)`` when the camera port is not reachable.
        """
        if not portisopen(self.ip, self.port):
            print('海康摄像头网络断开')
            return False, None
        # The stream is reopened on every call, whatever init_success says;
        # this keeps the original behaviour, where both branches reconnected
        # before reading. NOTE(review): presumably done to avoid stale
        # buffered frames -- confirm before changing.
        self.reconnect_camera()
        ret, frame = self.cap.read()
        return ret, frame

    def reconnect_camera(self):
        """Reopen the RTSP stream and refresh ``init_success``.

        Any existing capture object is released first. One frame is read from
        the new capture as a connectivity test, and ``init_success`` is set
        to the outcome of that read.
        """
        # Release whenever a capture exists, not only when init_success is
        # True; otherwise the handle left over from a failed reconnect would
        # be overwritten without being released.
        if self.cap is not None:
            self.cap.release()
        self.cap = cv2.VideoCapture(self.camera_url)
        ret, _ = self.cap.read()
        self.init_success = bool(ret)
        print("海康摄像头重连")

    def release_camera(self):
        """Release the underlying capture object, if one was ever created."""
        if self.cap is not None:
            self.cap.release()