添加中转点和破袋点

This commit is contained in:
hjw
2024-09-12 15:06:14 +08:00
parent 05f468f7c9
commit fa61a7248a
49 changed files with 27660 additions and 6 deletions

3
AutoControlSystem/.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
AutoControlSystem/.idea/misc.xml generated Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (torch_v8)" project-jdk-type="Python SDK" />
</project>

8
AutoControlSystem/.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/AutoControlSystem-master.iml" filepath="$PROJECT_DIR$/.idea/AutoControlSystem-master.iml" />
</modules>
</component>
</project>

6
AutoControlSystem/.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@ -0,0 +1,69 @@
from COM.COM_TCP import TCPClient
import queue
import json
from Model.RobotModel import DataAddress,DATARequest,DATAReply
class RobotClient(TCPClient):
    """TCP client that talks to the HC robot controller.

    Commands are queued via add_sendQuene() and drained by the base-class
    run() loop, which alternates send_Status() / send_Command().
    """

    def __init__(self, ip, port, command_quene, status_model: DataAddress):
        super().__init__(ip, port)
        self.command_quene = command_quene  # queue.Queue of JSON command strings
        self.status_model = status_model    # shared DataAddress updated by send_Status
        self.errorCommands = {}             # reserved: failed command -> controller reply

    def add_sendQuene(self, command):
        # TODO: support priority levels so urgent commands can bypass the queue.
        self.command_quene.put(command)

    def send_Command(self):
        """Pop one queued command (if any) and send it.

        Returns True on success or when the queue is empty, False when the
        socket send fails. The original dead ``if False:`` reply-validation
        branch (never executed) has been removed.
        """
        try:
            if not self.command_quene.empty():
                command = self.command_quene.get()
                self.client_socket.send(command.encode())
            return True
        except Exception as e:
            print('连接断开')
            print(e)
            return False

    def send_Status(self):
        """Query the controller's world/axis coordinates and store them in
        ``self.status_model``.

        Returns True on success, False on any send/recv/parse failure.
        Bug fix: the handler previously called ``e.with_traceback()`` with no
        argument, which raises TypeError and masked the original error.
        """
        request = DATARequest()
        request.queryAddr.append(DataAddress())
        request_status_json = request.toString()
        try:
            self.client_socket.send(request_status_json.encode())
            response = self.client_socket.recv(1024).decode('utf-8')
            response_message = json.loads(response)
            data_status = DATAReply()
            data_status.__dict__ = response_message
            self.status_model.setPosition(*data_status.queryData)
            return True
        except Exception as e:
            print(e)
            return False

View File

@ -0,0 +1,58 @@
import json
import socket
import threading
import time
class TCPClient:
    """Minimal auto-reconnecting TCP client.

    Subclasses override send_Command()/send_Status(); run() polls them in an
    endless loop and re-establishes the connection after repeated failures.
    """
    def __init__(self, ip, port):
        self.error_count=0        # consecutive failed poll iterations
        self.IPAddress = ip
        self.port = port
        self.connected = False    # snapshot flag: True while error_count == 0
    def CreatConnect(self):
        # (Re)create the socket; the 5 s timeout bounds connect/recv calls.
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.settimeout(5)
        self.client_socket.connect((self.IPAddress, self.port))
    def is_Connect(self):
        # Probe liveness with a zero-byte send; OSError means the socket is dead.
        try:
            self.client_socket.send(b'')
            return True
        except OSError:
            return False
    def run(self):
        """Poll loop (intended for a dedicated thread): every 200 ms send a
        status query and at most one queued command; after more than five
        consecutive exceptions, try to reconnect."""
        while True:
            time.sleep(0.2)
            self.connected = (self.error_count == 0)
            try:
                if (self.send_Status() and self.send_Command()):
                    self.error_count = 0
            except Exception as e:
                self.error_count += 1
            if self.error_count> 5:
                print("Error: TCPClient is not connected")
                try:
                    self.CreatConnect()
                except OSError as e1:
                    # 10056 == WSAEISCONN (Windows: socket already connected);
                    # close it so the next attempt starts from a fresh socket.
                    if e1.errno == 10056:
                        self.client_socket.close()
                        print("Error: TCPClient is not connected")
                except Exception as e2:
                    print(e2)
    def send_Command(self):
        # Overridden by subclasses; the base implementation reports failure.
        return False
    def send_Status(self):
        # Overridden by subclasses; the base implementation reports failure.
        return False

View File

@ -0,0 +1,16 @@
from enum import Enum
class Status(Enum):
    """Lifecycle of a Command.

    NOTE(review): the misspelled member names ('Prepareing', 'Runing') are
    kept — they are the public identifiers other code references.
    """
    Prepareing = 0  # initial state assigned by Command.__init__
    Runing = 1      # presumably "executing" — not set in the visible code
    End = 2         # presumably "finished" — not set in the visible code
class Command:
    """Base robot command; every command starts in the preparing state."""

    def __init__(self):
        # Lifecycle marker, advanced by whoever executes the command.
        self.status = Status.Prepareing
class FeedCommand(Command):
    """Command that carries the feeding configuration to execute."""

    def __init__(self, feedingConfig):
        super().__init__()
        # FeedingConfig describing which line to use and how many bags.
        self.feed_config = feedingConfig

View File

@ -0,0 +1,155 @@
import time
import Constant
from Model.Position import Real_Position, Detection_Position
from enum import Enum
from COM.COM_Robot import RobotClient
from Model.RobotModel import CMDInstructRequest, MoveType
from Vision.camera_coordinate_dete import Detection
class FeedStatus(Enum):
    """States of the Feeding state machine (see Feeding.run)."""
    FNone = 0      # idle: no job configured or the cycle is finished
    FStart = 1     # job accepted, about to move to the safe pose
    FSafeP = 2     # travelling to / waiting at the safe pose
    FPhoto = 3     # at the photo pose; run vision to locate a bag
    FTake = 4      # moving to the computed grasp pose
    FSafeF = 5     # retreating to the mid pose with the bag
    FFeedP = 6     # moving to the feed pose
    FBroken = 7    # moving to the bag-breaking pose
    FFinished = 8  # reserved; never assigned in the visible code
class FeedLine:
    """Named set of taught poses for one feeding line (reactor)."""

    def __init__(self, name, safe_position: Real_Position, photo_position: Real_Position,
                 mid_position: Real_Position, broken_position: Real_Position,
                 feed_position: Real_Position):
        self.name = name
        # Taught waypoints of the pick/feed cycle.
        self.safe_position = safe_position
        self.photo_position = photo_position
        self.mid_position = mid_position
        self.broken_position = broken_position
        self.feed_position = feed_position
        # Grasp pose, computed at runtime from the vision result.
        self.take_position = None
class FeedingConfig:
    """Pairs a remaining feed count with the FeedLine to run it on."""

    def __init__(self, num: int, feedLine: FeedLine):
        # Number of bags still to feed on this line.
        self.num = num
        self.feedLine = feedLine

    def get_line_info(self):
        """Placeholder for returning a summary of the configured line."""
        pass
class Feeding():
    """State machine for one feeding cycle.

    FStart -> FSafeP (safe pose) -> FPhoto (shoot & locate a bag) -> FTake
    (grasp) -> FSafeF (mid pose) -> FFeedP (feed pose) -> FBroken (bag
    breaker) -> back to FSafeP, until feedConfig.num bags are done.

    run() is polled repeatedly; each tick compares the robot's live pose with
    the current target and, once reached, queues the next waypoint.
    """
    def __init__(self,robotClient:RobotClient,detection):
        self.feedConfig = None              # active FeedingConfig; None -> idle
        self.feedStatus = FeedStatus.FNone  # current state of the cycle
        self.robotClient = robotClient      # outgoing command queue + live status model
        self.detection = detection          # vision pipeline (get_position)
        pass
    def run(self):
        """Advance the state machine by one tick."""
        # Snapshot the robot's current world pose from the status poller.
        real_position = Real_Position()
        real_position.init_position(self.robotClient.status_model.world_0,
            self.robotClient.status_model.world_1,
            self.robotClient.status_model.world_2,
            self.robotClient.status_model.world_3,
            self.robotClient.status_model.world_4,
            self.robotClient.status_model.world_5)
        # real_position.init_position(0,
        #                             0,
        #                             0,
        #                             0,
        #                             0,
        #                             0);
        # Guard: no active job (or counter exhausted) forces the idle state.
        if self.feedConfig == None:
            self.feedStatus = FeedStatus.FNone
        elif self.feedConfig.num == 0:
            self.feedStatus = FeedStatus.FNone
        if self.feedStatus==FeedStatus.FNone:
            pass
        elif self.feedStatus==FeedStatus.FStart:
            if self.feedConfig.num != 0:
                self.feedStatus = FeedStatus.FSafeP
                self.sendTargPosition(self.feedConfig.feedLine.safe_position)
                #print(request_command)
            pass
        elif self.feedStatus==FeedStatus.FSafeP:
            # Arrived at the safe pose -> go shoot the photo.
            if self.feedConfig.feedLine.safe_position.compare(real_position):
                self.feedStatus = FeedStatus.FPhoto
                self.sendTargPosition(self.feedConfig.feedLine.photo_position)
        elif self.feedStatus == FeedStatus.FPhoto:
            if self.feedConfig.feedLine.photo_position.compare(real_position):
                _, _, xyz, uvw = self.detection.get_position()  # vision result: point (mm) + normal
                if xyz!=None:
                    self.feedStatus = FeedStatus.FTake
                    #dp = Detection_Position().init_position(*xyz, *uvw)
                    from Trace.handeye_calibration import R_matrix,getPosition
                    # NOTE(review): world_2-10 applies a -10 Z offset to the pose
                    # fed into the hand-eye transform — confirm the intent.
                    rotation = R_matrix(self.robotClient.status_model.world_0,
                        self.robotClient.status_model.world_1,
                        self.robotClient.status_model.world_2-10,
                        self.robotClient.status_model.world_3,
                        self.robotClient.status_model.world_4,
                        self.robotClient.status_model.world_5)
                    # Transform the vision-side xyz and surface normal into the base frame.
                    target_position, noraml_base = getPosition(*xyz, *uvw,rotation)
                    self.feedConfig.feedLine.take_position = Real_Position().init_position(*target_position[:3],*noraml_base)
                    self.sendTargPosition(self.feedConfig.feedLine.take_position)
                else:
                    # No usable detection; TODO per original note: log and save the failed image.
                    print("打印日志,保存失败图像")
        elif self.feedStatus == FeedStatus.FTake:
            if self.feedConfig.feedLine.take_position != None and self.feedConfig.feedLine.take_position.compare(real_position):
                self.feedStatus = FeedStatus.FSafeF
                time.sleep(1)
                self.sendTargPosition(self.feedConfig.feedLine.mid_position)
                pass  # TODO: open the suction nozzle, then retreat
        elif self.feedStatus == FeedStatus.FSafeF:
            if self.feedConfig.feedLine.mid_position.compare(real_position):
                self.feedStatus = FeedStatus.FFeedP
                self.sendTargPosition(self.feedConfig.feedLine.feed_position)
                pass  # TODO: start the suction nozzle
        elif self.feedStatus==FeedStatus.FFeedP:
            if self.feedConfig.feedLine.feed_position.compare(real_position):
                self.feedStatus = FeedStatus.FBroken
                self.sendTargPosition(self.feedConfig.feedLine.broken_position)
                pass  # TODO: trigger the bag breaker
        elif self.feedStatus==FeedStatus.FBroken:
            if self.feedConfig.feedLine.broken_position.compare(real_position):
                # One bag done; either finish or loop back for the next one.
                self.feedConfig.num = self.feedConfig.num-1
                if self.feedConfig.num == 0:
                    self.feedStatus=FeedStatus.FNone
                    self.sendTargPosition(self.feedConfig.feedLine.safe_position)
                else:
                    self.feedStatus = FeedStatus.FSafeP
                    self.sendTargPosition(self.feedConfig.feedLine.safe_position)
            pass
    def sendTargPosition(self,real_position):
        """Wrap *real_position* in a WORLD-mode Instruction and queue it on the robot client."""
        from Model.RobotModel import Instruction
        position_instruction = Instruction()
        position_instruction.speed = Constant.speed
        position_instruction.m0 = real_position.X
        position_instruction.m1 = real_position.Y
        position_instruction.m2 = real_position.Z
        position_instruction.m3 = real_position.U
        position_instruction.m4 = real_position.V
        position_instruction.m5 = real_position.W
        position_instruction.action = MoveType.WORLD.value
        instruction_command = CMDInstructRequest()
        instruction_command.instructions.append(position_instruction)
        request_command = instruction_command.toString()
        self.robotClient.add_sendQuene(request_command)
        pass

View File

@ -0,0 +1,54 @@
[FeedLine1]
name = 反应釜1
safeposition_x = 534.907898
safeposition_y = 5.525342
safeposition_z = 737.267822
safeposition_u = -2.196312
safeposition_v = -4.094595
safeposition_w = -5.294128
photoposition_x = 534.907898
photoposition_y = 5.525342
photoposition_z = 737.267822
photoposition_u = -2.196312
photoposition_v = -4.094595
photoposition_w = -5.294128
midposition_x = 638.432
midposition_y = 16.854
midposition_z = 406.721
midposition_u = -3.256
midposition_v = 7.334
midposition_w = -4.307
brokenposition_x = 357.269
brokenposition_y = 478.874
brokenposition_z = 209.979
brokenposition_u = -3.100
brokenposition_v = -3.198
brokenposition_w = 49.945
feedposition_x = 315.549
feedposition_y = 427.090
feedposition_z = 209.985
feedposition_u = -2.793
feedposition_v = -1.829
feedposition_w = 50.000
[FeedLine2]
; NOTE(review): this section is missing the midposition_* and brokenposition_*
; keys that FeedLine1 defines — confirm the loader tolerates their absence.
name = 反应釜2
safeposition_x = 0.0
safeposition_y = 0.0
safeposition_z = 0.0
safeposition_u = 0.0
safeposition_v = 0.0
safeposition_w = 0.0
photoposition_x = 0.0
photoposition_y = 0.0
photoposition_z = 0.0
photoposition_u = 0.0
photoposition_v = 0.0
photoposition_w = 0.0
feedposition_x = 0.0
feedposition_y = 0.0
feedposition_z = 0.0
feedposition_u = 0.0
feedposition_v = 0.0
feedposition_w = 0.0

View File

@ -0,0 +1,8 @@
import os
# Per-axis tolerance used by Position.compare to decide a target pose is reached.
position_accuracy = 0.05
# Step size for manual jogging — presumably one unit per adjustment; confirm.
manual_adjust_accuracy = 1
# Default motion speed written into Instruction.speed.
speed = 10
# Section-name prefix and path of the feed-line INI configuration.
feedLine_set_section = 'FeedLine'
feedLine_set_file = f'.{os.sep}Config{os.sep}feedLine.ini'
# Main settings file (spelling 'Seting.ini' matches the shipped file name).
set_ini = 'Seting.ini'

View File

@ -0,0 +1,14 @@
from enum import Enum
class Error_Code(Enum):
    """System-level result codes."""
    SYS_SUCCESS = 100
    SYS_NETERROR = 104   # network failure
    SYS_NONEPoint = 105  # presumably "no target point found" — confirm
class VisionError_Code(Enum):
    """Vision-subsystem result codes."""
    CAMERA_SUCCESS = 200

Binary file not shown.

After

Width:  |  Height:  |  Size: 779 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,48 @@
from Constant import position_accuracy
class Position:
    """6-DOF pose: XYZ translation plus UVW orientation angles."""

    def __init__(self):
        self.X = 0.0
        self.Y = 0.0
        self.Z = 0.0
        self.U = 0.0
        self.V = 0.0
        self.W = 0.0

    def compare(self, position):
        """Return True when this pose matches *position* on every axis to
        within ``position_accuracy``.

        Bug fix: the original compared *signed* differences, so a pose far
        below the target on any axis still compared equal; use abs().
        """
        return (abs(self.X - position.X) < position_accuracy and
                abs(self.Y - position.Y) < position_accuracy and
                abs(self.Z - position.Z) < position_accuracy and
                abs(self.U - position.U) < position_accuracy and
                abs(self.V - position.V) < position_accuracy and
                abs(self.W - position.W) < position_accuracy)
"""
摄像头识别位置和角度
"""
class Detection_Position(Position):
    """Camera-frame position/orientation of a detected object."""

    def __init__(self):
        super().__init__()

    def init_position(self, X, Y, Z, U, V, W):
        """Populate all six axes and return self.

        Fix for consistency with Real_Position.init_position: callers chain
        ``Detection_Position().init_position(...)`` and previously got None.
        """
        self.X = X
        self.Y = Y
        self.Z = Z
        self.U = U
        self.V = V
        self.W = W
        return self
class Real_Position(Position):
    """Robot base-frame pose; init_position returns self for chaining."""

    def __init__(self):
        super().__init__()

    def init_position(self, X, Y, Z, U, V, W):
        """Set all six axes from positional values and return self."""
        for axis, value in zip('XYZUVW', (X, Y, Z, U, V, W)):
            setattr(self, axis, value)
        return self

View File

@ -0,0 +1,169 @@
from enum import Enum
from Model.Position import Real_Position
class MoveType(Enum):
    """Motion modes written into Instruction.action."""
    AXIS = 4    # free joint-space path
    WORLD = 10  # straight-line move in world/pose coordinates
class DATARequest:
    """Status-query request for the HC remote-monitor protocol."""

    def __init__(self):
        self.dsID = 'www.hc-system.com.RemoteMonitor'
        self.reqType = 'query'
        self.queryAddr = []  # DataAddress entries whose fields are queried

    def toString(self):
        """Serialize to the controller's JSON wire format.

        Generalized from the original, which hard-coded ``queryAddr[0]`` and
        raised IndexError on an empty list; all entries are now emitted.
        """
        addrs = ','.join(item.toString() for item in self.queryAddr)
        return ('{' + f'"dsID":"{self.dsID}","reqType":"{self.reqType}",'
                f'"queryAddr":[{addrs}]' + '}')
class DataAddress:
    """Field map for the remote-monitor status query plus the last decoded
    world/axis coordinate values."""

    # Query fields that default to the empty string.
    _STR_FIELDS = ('version', 'curMold', 'counterList', 'counter_n', 'curMode',
                   'boardIONum', 'input_n', 'output_n', 'axis_n', 'curAlarm',
                   'curCycle', 'lastCycle', 'machineName', 'curTorque_n',
                   'curSpeed_n', 'curAccount', 'origin', 'moldList',
                   'isMoving', 'M_n')

    def __init__(self):
        for field in self._STR_FIELDS:
            setattr(self, field, '')
        self.axisNum = '6'  # six-axis arm
        # world_0..world_5 and axis_0..axis_5 start at zero.
        for prefix in ('world', 'axis'):
            for idx in range(6):
                setattr(self, f'{prefix}_{idx}', 0)

    def setPosition(self, w0, w1, w2, w3, w4, w5, a0, a1, a2, a3, a4, a5):
        """Store the 12 coordinate values (strings or numbers) as floats."""
        for idx, value in enumerate((w0, w1, w2, w3, w4, w5,
                                     a0, a1, a2, a3, a4, a5)):
            prefix = 'world' if idx < 6 else 'axis'
            setattr(self, f'{prefix}_{idx % 6}', float(value))

    def getRealPosition(self):
        """Return the world pose wrapped in a Real_Position."""
        return Real_Position().init_position(self.world_0, self.world_1,
                                             self.world_2, self.world_3,
                                             self.world_4, self.world_5)

    def setAngle(self, a0, a1, a2, a3, a4, a5):
        """Placeholder; axis angles are currently set via setPosition."""
        pass

    def toString(self):
        """The comma-joined list of address names sent inside a DATARequest."""
        return ('"world-0","world-1","world-2","world-3","world-4","world-5",'
                '"axis-0","axis-1","axis-2","axis-3","axis-4","axis-5"')
class DATAReply:
    """Controller reply to a DATARequest; callers currently fill the fields
    by assigning the parsed JSON dict straight to ``__dict__``."""

    def __init__(self):
        self.dsID = ''
        self.reqType = ''
        # Flat list of the queried values (world-0..5, axis-0..5).
        self.queryData = []

    def JsonToObject(self):
        """Placeholder for explicit JSON decoding."""
        return
class CMDRequest:
    """Generic command request carrying a list of command strings."""

    def __init__(self):
        self.dsID = 'www.hc-system.com.RemoteMonitor'
        self.reqType = 'command'
        self.cmdData = []  # command tokens, each serialized as a quoted string

    def toString(self):
        """Serialize to the controller's JSON wire format.

        Bug fix: with an empty cmdData the original emitted ``"cmdData":}``
        (invalid JSON); an empty list now serializes as ``[]``.
        """
        payload = ','.join(f'"{item}"' for item in self.cmdData)
        return ('{' + f'"dsID":"{self.dsID}","reqType":"{self.reqType}",'
                f'"cmdData":[{payload}]' + '}')
class CMDReply:
    """Controller reply to a CMDRequest."""

    def __init__(self):
        self.dsID = 'www.hc-system.com.RemoteMonitor'
        self.reqType = 'command'
        # Reply payload; left empty until filled by the caller.
        self.cmdData = []
class Instruction:
    """One motion instruction inside an AddRCC payload."""

    # Serialization order must match the controller's expected field order.
    _FIELD_ORDER = ('oneshot', 'action', 'm0', 'm1', 'm2', 'm3', 'm4', 'm5',
                    'ckStatus', 'speed', 'delay', 'smooth', 'tool')

    def __init__(self):
        self.oneshot = 1
        self.action = 4  # 4: free path, 10: straight line in pose space, 17: pose curve
        self.m0 = 0.0
        self.m1 = 0.0
        self.m2 = 0.0
        self.m3 = 0.0
        self.m4 = 0.0
        self.m5 = 0.0
        self.ckStatus = '0x3F'
        self.speed = 10
        self.delay = 1.0
        self.smooth = 0
        self.tool = 2

    def toString(self):
        """Serialize as a comma-joined list of "key":"value" pairs (no
        surrounding braces — the enclosing request adds them)."""
        return ','.join(f'"{name}":"{getattr(self, name)}"'
                        for name in self._FIELD_ORDER)
class CMDInstructRequest:
    """AddRCC request carrying motion instructions for the controller."""

    def __init__(self):
        self.dsID = 'www.hc-system.com.HCRemoteCommand'
        self.reqType = "AddRCC"
        # NOTE(review): '1' appears to ask the controller to clear its pending
        # instruction list first — confirm against the protocol docs.
        self.emptyList = '1'
        self.instructions = []  # Instruction objects, serialized in order

    def toString(self):
        """Serialize to the controller's JSON wire format.

        Generalized from the original, which serialized only
        ``instructions[0]``; the stray debug print of every outgoing frame
        was also removed.
        """
        model_str = ('{' + f'"dsID":"{self.dsID}","reqType":"{self.reqType}",'
                     f'"emptyList":"{self.emptyList}"')
        if self.instructions:
            body = ','.join('{' + ins.toString() + '}' for ins in self.instructions)
            model_str += ',"instructions":[' + body + ']'
        return model_str + '}'
class CMDInstructReply:
    """Controller reply to a CMDInstructRequest."""

    def __init__(self):
        self.dsID = 'www.hc-system.com.HCRemoteCommand'
        self.reqType = 'command'
        # Reply payload; left empty until filled by the caller.
        self.cmdReply = []

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,24 @@
[Main]
[Robot_Feed]
IPAddress=192.168.20.6
Port=502
j1_min=-150
j1_max=+150
j2_min=-150
j2_max=+150
j3_min=-150
j3_max=+150
j4_min=-150
j4_max=+150
j5_min=-150
j5_max=+150
j6_min=-150
j6_max=+150
LineCount=2
[Camera_Feed]
IPAddress=127.0.0.1

View File

@ -0,0 +1,42 @@
import sys
import time
from PySide6.QtCore import QThread, Signal, Slot
from PySide6.QtWidgets import QMainWindow, QApplication, QLineEdit
from ui_untitled import Ui_MainWindow
class WorkerThread(QThread):
    """Demo worker: sums 1..10 with a one-second pause per step, then emits
    the result on result_signal followed by finished."""
    # Carries the accumulated sum back to the main thread.
    result_signal = Signal(int)
    # NOTE(review): this shadows QThread's built-in `finished` signal — confirm
    # that is intentional.
    finished = Signal()
    def run(self):
        count = 0
        for i in range(1, 11):
            count += i
            time.sleep(1)  # simulate a slow operation
        self.result_signal.emit(count)
        # Signal completion to any listeners.
        self.finished.emit()
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main application window; widgets come from the generated Ui_MainWindow."""
    def __init__(self):
        super(MainWindow, self).__init__()
        self.setupUi(self)
        #self.line = QLineEdit("设置")
        # self.btn_1.clicked.connect(self.start_count)
        # (worker-thread creation/startup was left disabled here)
if __name__ == '__main__':
    # Standard Qt bootstrap: create the application, show the window, run the event loop.
    app = QApplication(sys.argv)
    win = MainWindow()
    win.show()
    app.exec()

View File

@ -0,0 +1,8 @@
# 路径处理
使用model的Position类和Expection的code定义
## 获取世界坐标
```
def fun1(detection_position: Detection_Position):
    return Code, RealPosition
```

View File

@ -0,0 +1,4 @@
0.101852 -0.994553 -0.0221425 42.7527
-0.990341 -0.0992653 -0.0968082 30.2341
0.0940829 0.0317887 -0.995057 80.0869
0 0 0 1

View File

@ -0,0 +1,78 @@
import os
import numpy as np
from scipy.spatial.transform import Rotation as R
def vec2rpy(normal):
    """Convert a surface normal into tool RPY angles (degrees).

    The end effector's Z axis is pointed opposite to *normal*; the X axis is
    built by Gram-Schmidt from a fixed seed, and Y completes a right-handed
    frame.

    Bug fix: the original selected the seed axis with an exact float test
    (``abs(z[2]) != 1``) and never guarded the projection, so a normal lying
    along the world X axis produced a zero-length X axis and NaN angles.
    Previously working inputs return exactly the same result.
    """
    # New tool Z points against the (normalized) normal.
    z_axis = -normal / np.linalg.norm(normal)
    # Same seed choice as the original, preserving old answers for valid inputs.
    if abs(z_axis[2]) != 1:
        x_axis = np.array([1.0, 0.0, 0.0])
    else:
        x_axis = np.array([0.0, 1.0, 0.0])
    # Gram-Schmidt: drop the component of the seed along Z.
    x_axis = x_axis - np.dot(x_axis, z_axis) * z_axis
    norm = np.linalg.norm(x_axis)
    if norm < 1e-9:
        # Seed was (anti)parallel to Z; the Y seed is then guaranteed independent.
        x_axis = np.array([0.0, 1.0, 0.0])
        x_axis = x_axis - np.dot(x_axis, z_axis) * z_axis
        norm = np.linalg.norm(x_axis)
    x_axis = x_axis / norm
    # Right-handed frame: Y = Z x X.
    y_axis = np.cross(z_axis, x_axis)
    # Columns of the rotation matrix are the new axes.
    rotation_matrix = np.vstack([x_axis, y_axis, z_axis]).T
    # Convert to roll/pitch/yaw in degrees.
    rpy = R.from_matrix(rotation_matrix).as_euler('xyz', degrees=True)
    return rpy
# Pose supplied by the robot side as (x, y, z, u, v, w).
def R_matrix(x, y, z, u, v, w):
    """Build the 4x4 homogeneous transform for a robot pose.

    (x, y, z) is the translation; (u, v, w) are rotations in degrees about
    the fixed X, Y, Z axes, composed as Rz @ Ry @ Rx — identical to the
    original element-wise construction.
    """
    transformation_matrix = np.eye(4)
    # scipy's lowercase 'xyz' order is extrinsic: Rz(w) @ Ry(v) @ Rx(u).
    transformation_matrix[:3, :3] = R.from_euler('xyz', [u, v, w],
                                                 degrees=True).as_matrix()
    transformation_matrix[:3, 3] = (x, y, z)
    return transformation_matrix
# Vision result: detected xyz point and surface normal (camera frame).
def getPosition(x,y,z,a,b,c,rotation):
    """Transform a camera-frame point and normal into the robot base frame.

    x, y, z: detected point in the camera frame (upstream passes millimetres).
    a, b, c: surface-normal components in the camera frame.
    rotation: 4x4 base<-flange transform (output of R_matrix).
    Returns (target_position, noraml_base): the homogeneous base-frame point
    and the RPY angles derived from the transformed normal via vec2rpy.
    """
    target = np.asarray([x, y, z,1])
    # Hand-eye calibration: flange<-camera transform loaded from disk.
    # NOTE(review): relative path with a hard-coded '/' separator — the original
    # comment suggests this was meant to use os.sep; confirm on Windows.
    camera2robot = np.loadtxt('./Trace/com_pose.txt', delimiter=' ')
    robot2base = rotation
    camera2base = robot2base @ camera2robot
    target_position = np.dot(camera2base, target)
    angle = np.asarray([a,b,c])
    # Rotate the normal (direction only — the translation part is irrelevant).
    noraml = camera2base[:3, :3]@angle
    noraml_base = vec2rpy(noraml)
    print(target_position, noraml_base)
    return target_position,noraml_base

View File

@ -0,0 +1,40 @@
import configparser
def writeFeedLine_to_ini(feedLine_dirt, file_path: str):
    """Persist FeedLine objects to an INI file.

    feedLine_dirt: mapping of section name -> FeedLine-like object exposing
    ``name`` plus safe/photo/mid/broken/feed ``*_position`` attributes, each
    with X, Y, Z, U, V, W fields.
    file_path: destination INI path (written as UTF-8).
    Returns True on success.

    Bug fixes vs. the original: MidPosition_w was never written (the line
    mistakenly wrote BrokenPosition_w a second time), and the file handle
    passed to config.write() was never closed.
    """
    config = configparser.ConfigParser()
    # (INI key prefix, attribute holding the pose) in serialization order.
    prefixes = (
        ('SafePosition', 'safe_position'),
        ('PhotoPosition', 'photo_position'),
        ('MidPosition', 'mid_position'),
        ('BrokenPosition', 'broken_position'),
        ('FeedPosition', 'feed_position'),
    )
    for key, value in feedLine_dirt.items():
        config.add_section(key)
        config.set(key, 'name', value.name)
        for prefix, attr in prefixes:
            pos = getattr(value, attr)
            for axis in ('x', 'y', 'z', 'u', 'v', 'w'):
                config.set(key, f'{prefix}_{axis}', str(getattr(pos, axis.upper())))
    # Close the handle deterministically (the original leaked it).
    with open(file_path, 'w', encoding='utf-8') as fh:
        config.write(fh)
    return True

View File

@ -0,0 +1,13 @@
import logging
class QTextEditLogger(logging.Handler):
    """logging.Handler that appends formatted records to a Qt text widget."""

    def __init__(self, text_widget):
        super().__init__()
        # Target widget; anything with an ``append(str)`` method works.
        self.widget = text_widget

    def emit(self, record):
        """Format *record* and append it to the widget.

        NOTE(review): ``append`` runs on whichever thread logs the record —
        confirm callers only log from the GUI thread.
        """
        self.widget.append(self.format(record))

View File

@ -0,0 +1,18 @@
# 视觉处理
Code 在 Expection的VisionError_Code里 增加定义1
## 1 提供视频流,
```
func1()
return Code,frame #返回错误代码和当前帧
```
## 2 提供当前摄像头位置识别结果
```
func2()
return Code,frame,position #返回带识别标签的当前图像 和 识别坐标结果Detection_Position对象
```
**注意:** 识别不到报异常Code且带有识别结果标签和位置都绘制出来的图像保存到本地位置
保存位置在Seting.ini里面的[Camera_Feed]定义

View File

@ -0,0 +1,217 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project AutoControlSystem-master
@File camera_coordinate_dete.py
@IDE PyCharm
@Author hjw
@Date 2024/8/27 14:24
'''
import numpy as np
import cv2
import open3d as o3d
from Vision.tool.CameraRVC import camera
from Vision.yolo.yolov8_pt_seg import yolov8_segment
from Vision.yolo.yolov8_openvino import yolov8_segment_openvino
from Vision.tool.utils import find_position
from Vision.tool.utils import class_names
import os
class Detection:
    """YOLOv8 segmentation + RVC structured-light camera pipeline.

    Captures a frame and per-pixel point map, segments candidate bags, fits a
    plane to each segment's point cloud, and returns the centre point and
    surface normal of the best pick.
    """
    def __init__(self, use_openvino_model = True):
        # Select the inference backend: OpenVINO IR model or the raw .pt model.
        self.use_openvino_model = use_openvino_model
        if self.use_openvino_model==False:
            model_path = ''.join([os.getcwd(), '/Vision/model/pt/best.pt'])
            device = 'cpu'
            self.camera_rvc = camera()
            self.model = yolov8_segment()
            self.model.load_model(model_path, device)
        else:
            model_path = ''.join([os.getcwd(), '/Vision/model/openvino/best.xml'])
            device = 'CPU'
            self.camera_rvc = camera()
            self.model = yolov8_segment_openvino(model_path, device, conf_thres=0.3, iou_thres=0.3)
    def get_position(self, Point_isVision=False):
        """Capture one frame, segment targets, pick the best candidate.

        :param Point_isVision: when True, also open an Open3D window showing
            the selected segment's point cloud (debugging; blocks until closed).
        :return: (ret, img, xyz, normal) — ret is 0 on camera failure, 1
            otherwise; img is the annotated frame; xyz is [x, y, z] of the
            chosen centre (point-map values * 1000, i.e. millimetres if the
            map is metres — confirm) and normal is [nx, ny, nz]; both are
            None when nothing usable was detected.
        """
        ret, img, pm = self.camera_rvc.get_img_and_point_map()  # capture image + per-pixel 3-D point map
        if self.camera_rvc.caminit_isok == True:
            if ret == 1:
                # Run segmentation with the configured backend.
                if self.use_openvino_model == False:
                    flag, det_cpu, dst_img, masks, category_names = self.model.model_inference(img, 0)
                else:
                    flag, det_cpu, scores, masks, category_names = self.model.segment_objects(img)
                if flag == 1:
                    # Per-candidate accumulators, indexed in detection order.
                    xyz = []
                    nx_ny_nz = []
                    RegionalArea = []
                    Depth_Z = []
                    uv = []
                    seg_point = []
                    if Point_isVision==True:
                        # Debug point cloud of the full scene, lifted 0.25 in Z
                        # so the selected segment remains visible beneath it.
                        pm2 = pm.copy()
                        pm2 = pm2.reshape(-1, 3)
                        pm2 = pm2[~np.isnan(pm2).all(axis=-1), :]
                        pm2[:, 2] = pm2[:, 2] + 0.25
                        pcd2 = o3d.geometry.PointCloud()
                        pcd2.points = o3d.utility.Vector3dVector(pm2)
                        # o3d.visualization.draw_geometries([pcd2])
                    for i, item in enumerate(det_cpu):
                        # Draw the detection box label.
                        box_x1, box_y1, box_x2, box_y2 = item[0:4].astype(np.int32)
                        if self.use_openvino_model == False:
                            label = category_names[int(item[5])]
                        else:
                            label = class_names[int(item[4])]
                        rand_color = (0, 255, 255)
                        score = item[4]
                        org = (int((box_x1 + box_x2) / 2), int((box_y1 + box_y2) / 2))
                        x_center = int((box_x1 + box_x2) / 2)
                        y_center = int((box_y1 + box_y2) / 2)
                        text = '{}|{:.2f}'.format(label, score)
                        cv2.putText(img, text, org=org, fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8,
                                    color=rand_color,
                                    thickness=2)
                        # Build the per-instance mask, cropped to the box.
                        # mask = masks[i].cpu().numpy().astype(int)
                        if self.use_openvino_model == False:
                            mask = masks[i].cpu().data.numpy().astype(int)
                        else:
                            mask = masks[i].astype(int)
                        mask = mask[box_y1:box_y2, box_x1:box_x2]
                        # mask = masks[i].numpy().astype(int)
                        h, w = box_y2 - box_y1, box_x2 - box_x1
                        mask_colored = np.zeros((h, w, 3), dtype=np.uint8)
                        mask_colored[np.where(mask)] = rand_color
                        ##################################
                        imgray = cv2.cvtColor(mask_colored, cv2.COLOR_BGR2GRAY)
                        # cv2.imshow('mask',imgray)
                        # cv2.waitKey(1)
                        # Binarize the mask image.
                        ret, binary = cv2.threshold(imgray, 10, 255, 0)
                        # cv2.imshow('bin',binary)
                        # cv2.waitKey(1)
                        contours, hierarchy = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
                        # all_piont_list = contours_in(contours)
                        # print(len(all_piont_list))
                        max_contour = None
                        max_perimeter = 0
                        # Keep only the largest contour, discarding small/noisy regions.
                        for contour in contours:
                            perimeter = cv2.arcLength(contour, True)
                            if perimeter > max_perimeter:
                                max_perimeter = perimeter
                                max_contour = contour
                        # Collect the (x, y) pixels inside the kept contour.
                        mask_inside = np.zeros(binary.shape, np.uint8)
                        cv2.drawContours(mask_inside, [max_contour], 0, 255, -1)
                        pixel_point2 = cv2.findNonZero(mask_inside)
                        # result = np.zeros_like(color_image)
                        select_point = []
                        # NOTE(review): this reuses loop variable `i`, shadowing the
                        # detection index — harmless only because `i` is not read
                        # again afterwards.
                        for i in range(pixel_point2.shape[0]):
                            select_point.append(pm[pixel_point2[i][0][1]+box_y1, pixel_point2[i][0][0]+box_x1])
                        select_point = np.array(select_point)
                        pm_seg = select_point.reshape(-1, 3)
                        pm_seg = pm_seg[~np.isnan(pm_seg).all(axis=-1), :]  # drop NaN rows
                        if pm_seg.size<300:
                            print("分割点云数量较少,无法拟合平面")
                            continue
                        # cv2.imshow('result', piont_result)
                        # Fit a plane to the segment's point cloud; (a, b, c)
                        # is its normal.
                        pcd = o3d.geometry.PointCloud()
                        pcd.points = o3d.utility.Vector3dVector(pm_seg)
                        plane_model, inliers = pcd.segment_plane(distance_threshold=0.01,
                                                                 ransac_n=5,
                                                                 num_iterations=5000)
                        [a, b, c, d] = plane_model
                        #print(f"Plane equation: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0")
                        # inlier_cloud = pcd.select_by_index(inliers)
                        # inlier_cloud.paint_uniform_color([1.0, 0, 0])
                        # outlier_cloud = pcd.select_by_index(inliers, invert=True)
                        # outlier_cloud.paint_uniform_color([0, 1, 0])
                        # o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud])
                        # Fit the minimum-area rectangle and take its centre.
                        rect = cv2.minAreaRect(max_contour)
                        # cv2.boxPoints converts the rect into its four corner points.
                        box = cv2.boxPoints(rect)
                        # Roll so the corners are in clockwise order (drawing is unaffected).
                        startidx = box.sum(axis=1).argmin()
                        box = np.roll(box, 4 - startidx, 0)
                        # Shift corners back into full-image coordinates.
                        box = box.reshape((-1, 1, 2)).astype(np.int32)
                        box = box + [[[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]]]
                        x_rotation_center = int((box[0][0][0] + box[1][0][0] + box[2][0][0] + box[3][0][0]) / 4)
                        y_rotation_center = int((box[0][0][1] + box[1][0][1] + box[2][0][1] + box[3][0][1]) / 4)
                        point_x, point_y, point_z = pm[y_rotation_center, x_rotation_center]
                        cv2.circle(img, (x_rotation_center, y_rotation_center), 4, (255, 255, 255), 5)  # mark the centre
                        if np.isnan(point_x):  # invalid point-cloud value at the centre
                            continue
                        else:
                            # Record this candidate (point map values scaled by 1000).
                            xyz.append([point_x*1000, point_y*1000, point_z*1000])
                            Depth_Z.append(point_z*1000)
                            nx_ny_nz.append([a, b, c])
                            RegionalArea.append(cv2.contourArea(max_contour))
                            uv.append([x_rotation_center, y_rotation_center])
                            seg_point.append(pm_seg)
                            cv2.polylines(img, [box], True, (0, 255, 0), 2)
                    # Choose the best candidate by depth/area (see find_position).
                    _idx = find_position(Depth_Z, RegionalArea, 100,True)
                    if _idx == None:
                        return 1, img, None, None
                    else:
                        cv2.circle(img, (uv[_idx][0], uv[_idx][1]), 30, (255, 0, 0), 20)  # mark the chosen centre
                        if Point_isVision==True:
                            # Debug view: re-fit the chosen segment and show
                            # inliers (red) vs outliers (green) over the scene.
                            pcd = o3d.geometry.PointCloud()
                            pcd.points = o3d.utility.Vector3dVector(seg_point[_idx])
                            plane_model, inliers = pcd.segment_plane(distance_threshold=0.01,
                                                                     ransac_n=5,
                                                                     num_iterations=5000)
                            inlier_cloud = pcd.select_by_index(inliers)
                            inlier_cloud.paint_uniform_color([1.0, 0, 0])
                            outlier_cloud = pcd.select_by_index(inliers, invert=True)
                            outlier_cloud.paint_uniform_color([0, 1, 0])
                            o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud, pcd2])
                        return 1, img, xyz[_idx], nx_ny_nz[_idx]
                else:
                    return 1, img, None, None
            else:
                print("RVC X Camera capture failed!")
                return 0, None, None, None
        else:
            print("RVC X Camera is not opened!")
            return 0, None, None, None
    def release(self):
        # Free the camera and inference resources.
        self.camera_rvc.release()
        self.model.clear()

View File

@ -0,0 +1,216 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project AutoControlSystem-master
@File camera_coordinate_dete.py
@IDE PyCharm
@Author hjw
@Date 2024/8/27 14:24
'''
import numpy as np
import cv2
import open3d as o3d
from Vision.tool.CameraRVC import camera
from Vision.yolo.yolov8_pt_seg import yolov8_segment
from Vision.yolo.yolov8_openvino import yolov8_segment_openvino
from Vision.tool.utils import find_position
from Vision.tool.utils import class_names
import os
class Detection:
def __init__(self, use_openvino_model = True):
self.use_openvino_model = use_openvino_model
if self.use_openvino_model == False:
model_path = ''.join([os.getcwd(), '/Vision/model/pt/best.pt'])
device = 'cpu'
self.model = yolov8_segment()
self.model.load_model(model_path, device)
else:
model_path = ''.join([os.getcwd(), '/Vision/model/openvino/best.xml'])
device = 'CPU'
self.model = yolov8_segment_openvino(model_path, device, conf_thres=0.3, iou_thres=0.3)
img_path = ''.join([os.getcwd(), '/Vision/model/data/test0910.png'])
point_path = ''.join([os.getcwd(), '/Vision/model/data/test0910.xyz'])
self.img = cv2.imread(img_path)
self.point = np.loadtxt(point_path).reshape((1080, 1440, 3))
def get_position(self, Point_isVision=False):
""
'''
:param api: None
:return: ret , img, (x,y,z), (nx, ny, nz)
'''
#ret, img, pm = self.camera_rvc.get_img_and_point_map() # 拍照,获取图像及
ret = 1
img = self.img
pm = self.point
if ret == 1:
if self.use_openvino_model == False:
flag, det_cpu, dst_img, masks, category_names = self.model.model_inference(img, 0)
else:
flag, det_cpu, scores, masks, category_names = self.model.segment_objects(img)
if flag == 1:
xyz = []
nx_ny_nz = []
RegionalArea = []
Depth_Z = []
uv = []
seg_point = []
if Point_isVision == True:
pm2 = pm.copy()
pm2 = pm2.reshape(-1, 3)
pm2 = pm2[~np.isnan(pm2).all(axis=-1), :]
pm2[:, 2] = pm2[:, 2] + 0.25
pcd2 = o3d.geometry.PointCloud()
pcd2.points = o3d.utility.Vector3dVector(pm2)
# o3d.visualization.draw_geometries([pcd2])
for i, item in enumerate(det_cpu):
# 画box
box_x1, box_y1, box_x2, box_y2 = item[0:4].astype(np.int32)
if self.use_openvino_model == False:
label = category_names[int(item[5])]
else:
label = class_names[int(item[4])]
rand_color = (0, 255, 255)
score = item[4]
org = (int((box_x1 + box_x2) / 2), int((box_y1 + box_y2) / 2))
x_center = int((box_x1 + box_x2) / 2)
y_center = int((box_y1 + box_y2) / 2)
text = '{}|{:.2f}'.format(label, score)
cv2.putText(img, text, org=org, fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8,
color=rand_color,
thickness=2)
# 画mask
# mask = masks[i].cpu().numpy().astype(int)
if self.use_openvino_model == False:
mask = masks[i].cpu().data.numpy().astype(int)
else:
mask = masks[i].astype(int)
mask = mask[box_y1:box_y2, box_x1:box_x2]
# mask = masks[i].numpy().astype(int)
h, w = box_y2 - box_y1, box_x2 - box_x1
mask_colored = np.zeros((h, w, 3), dtype=np.uint8)
mask_colored[np.where(mask)] = rand_color
##################################
imgray = cv2.cvtColor(mask_colored, cv2.COLOR_BGR2GRAY)
# cv2.imshow('mask',imgray)
# cv2.waitKey(1)
# 2、二进制图像
ret, binary = cv2.threshold(imgray, 10, 255, 0)
# 阈值 二进制图像
# cv2.imshow('bin',binary)
# cv2.waitKey(1)
contours, hierarchy = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
# all_piont_list = contours_in(contours)
# print(len(all_piont_list))
max_contour = None
max_perimeter = 0
for contour in contours: # 排除小分割区域或干扰区域
perimeter = cv2.arcLength(contour, True)
if perimeter > max_perimeter:
max_perimeter = perimeter
max_contour = contour
'''
提取区域范围内的x, y
'''
mask_inside = np.zeros(binary.shape, np.uint8)
cv2.drawContours(mask_inside, [max_contour], 0, 255, -1)
pixel_point2 = cv2.findNonZero(mask_inside)
# result = np.zeros_like(color_image)
select_point = []
for i in range(pixel_point2.shape[0]):
select_point.append(pm[pixel_point2[i][0][1] + box_y1, pixel_point2[i][0][0] + box_x1])
select_point = np.array(select_point)
pm_seg = select_point.reshape(-1, 3)
pm_seg = pm_seg[~np.isnan(pm_seg).all(axis=-1), :] # 剔除 nan
if pm_seg.size < 300:
print("分割点云数量较少,无法拟合平面")
continue
# cv2.imshow('result', piont_result)
'''
拟合平面,计算法向量
'''
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pm_seg)
plane_model, inliers = pcd.segment_plane(distance_threshold=0.01,
ransac_n=5,
num_iterations=5000)
[a, b, c, d] = plane_model
# print(f"Plane equation: {a:.2f}x + {b:.2f}y + {c:.2f}z + {d:.2f} = 0")
# inlier_cloud = pcd.select_by_index(inliers) # 点云可视化
# inlier_cloud.paint_uniform_color([1.0, 0, 0])
# outlier_cloud = pcd.select_by_index(inliers, invert=True)
# outlier_cloud.paint_uniform_color([0, 1, 0])
# o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud])
'''
拟合最小外接矩形,计算矩形中心
'''
rect = cv2.minAreaRect(max_contour)
# cv2.boxPoints可以将轮廓点转换为四个角点坐标
box = cv2.boxPoints(rect)
# 这一步不影响后面的画图,但是可以保证四个角点坐标为顺时针
startidx = box.sum(axis=1).argmin()
box = np.roll(box, 4 - startidx, 0)
# 在原图上画出预测的外接矩形
box = box.reshape((-1, 1, 2)).astype(np.int32)
box = box + [[[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]]]
x_rotation_center = int((box[0][0][0] + box[1][0][0] + box[2][0][0] + box[3][0][0]) / 4)
y_rotation_center = int((box[0][0][1] + box[1][0][1] + box[2][0][1] + box[3][0][1]) / 4)
point_x, point_y, point_z = pm[y_rotation_center, x_rotation_center]
cv2.circle(img, (x_rotation_center, y_rotation_center), 4, (255, 255, 255), 5) # 标出中心点
if np.isnan(point_x): # 点云值为无效值
continue
else:
xyz.append([point_x * 1000, point_y * 1000, point_z * 1000])
Depth_Z.append(point_z * 1000)
nx_ny_nz.append([a, b, c])
RegionalArea.append(cv2.contourArea(max_contour))
uv.append([x_rotation_center, y_rotation_center])
seg_point.append(pm_seg)
cv2.polylines(img, [box], True, (0, 255, 0), 2)
_idx = find_position(Depth_Z, RegionalArea, 100, True)
if _idx == None:
return 1, img, None, None
else:
cv2.circle(img, (uv[_idx][0], uv[_idx][1]), 30, (0, 0, 255), 20) # 标出中心点
if Point_isVision == True:
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(seg_point[_idx])
plane_model, inliers = pcd.segment_plane(distance_threshold=0.01,
ransac_n=5,
num_iterations=5000)
inlier_cloud = pcd.select_by_index(inliers) # 点云可视化
inlier_cloud.paint_uniform_color([1.0, 0, 0])
outlier_cloud = pcd.select_by_index(inliers, invert=True)
outlier_cloud.paint_uniform_color([0, 1, 0])
o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud, pcd2])
return 1, img, xyz[_idx], nx_ny_nz[_idx]
else:
return 1, img, None, None
else:
print("RVC X Camera capture failed!")
return 0, None, None, None
def release(self):
    """Free the detector's resources by clearing the loaded segmentation model."""
    self.model.clear()

View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2024/9/5 14:57
# @Author : hjw
# @File : camera_coordinate_dete_test.py.py
'''
# Smoke test: continuously run the detector, print the chosen 3-D point and
# its surface normal, show the annotated frame and report per-cycle latency.
import time
from camera_coordinate_dete import Detection
import cv2
detection = Detection()
while True:
    t1 = time.time()
    # ret == 1 when a valid target with usable point-cloud data was found.
    ret, img, xyz, nx_ny_nz = detection.get_position()
    if ret==1:
        print('xyz点云坐标', xyz)
        print('nx_ny_nz法向量', nx_ny_nz)
        # Downscale only for display purposes.
        img = cv2.resize(img,(720, 540))
        cv2.imshow('img', img)
        cv2.waitKey(1)
    t2 = time.time()
    print(t2-t1)  # end-to-end time of one detection cycle, in seconds

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2024/9/7 11:49
# @Author : hjw
# @File : get_position_test.py
'''
# Smoke test: detect a target in camera coordinates, then transform the point
# and its normal into the robot base frame via the hand-eye calibration.
from camera_coordinate_dete import Detection
from Trace.handeye_calibration import *
import cv2
detection = Detection()
while True:
    ret, img, xyz, nx_ny_nz = detection.get_position()
    if ret==1:
        print('xyz点云坐标', xyz)
        print('nx_ny_nz法向量', nx_ny_nz)
        img = cv2.resize(img,(720, 540))
        # Fixed camera pose relative to the robot base; arguments are
        # (x, y, z, u, v, w) — presumably mm and degrees, TODO confirm.
        transformation_matrix = R_matrix(521.534, 0.705, 850.03, 0.0, 90.0, 0.0) # (x, y, z, u, v, w)
        target_position, noraml_base = getPosition(xyz[0], xyz[1], xyz[2], nx_ny_nz[0], nx_ny_nz[1], nx_ny_nz[2], transformation_matrix)
        print("target_position:", target_position)
        print("noraml_base", noraml_base)
        cv2.imshow('img', img)
        cv2.waitKey(0)

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,14 @@
description: Ultralytics YOLOv8n-seg model trained on D:\work\ultralytics-main\ultralytics\cfg\datasets\coco8-seg-dy.yaml
author: Ultralytics
date: '2024-09-10T13:48:59.901222'
version: 8.2.86
license: AGPL-3.0 License (https://ultralytics.com/license)
docs: https://docs.ultralytics.com
stride: 32
task: segment
batch: 1
imgsz:
- 640
- 640
names:
0: box

Binary file not shown.

View File

@ -0,0 +1 @@
# 模型文件超3M (model file exceeds 3 MB, so it is not stored in the repository)

View File

@ -0,0 +1,149 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project my_work
@File camera.py
@IDE PyCharm
@Author hjw
@Date 2024/8/13 11:34
'''
import PyRVC as RVC
import numpy as np
class camera:
    """Thin wrapper around a PyRVC (RVC X) structured-light camera.

    The constructor initialises the RVC system, discovers a GigE camera and
    opens its left imager.  Capture helpers return ``(ret, data...)`` tuples
    where ``ret`` is 1 on success and 0 on failure.
    """

    def __init__(self):
        # True only after a camera has been found, created and opened.
        self.caminit_isok = False
        RVC.SystemInit()
        # Choose RVC X Camera type (USB, GigE or All)
        opt = RVC.SystemListDeviceTypeEnum.GigE
        # Scan all RVC X Camera devices.
        ret, devices = RVC.SystemListDevices(opt)
        print("RVC X Camera devices number:%d" % len(devices))
        # Find whether any RVC X Camera is connected or not.
        if len(devices) == 0:
            print("Can not find any RVC X Camera!")
            RVC.SystemShutdown()
        else:
            print("devices size = %d" % len(devices))
            # Create a RVC X Camera and choose use left side camera.
            self.x = RVC.X1.Create(devices[0], RVC.CameraID_Left)
            # x = RVC.X1.Create(devices[0], RVC.CameraID_Right)
            # Test RVC X Camera is valid or not.
            if self.x.IsValid() == True:
                print("RVC X Camera is valid!")
                # Open RVC X Camera.
                ret1 = self.x.Open()
                # Test RVC X Camera is opened or not.
                if ret1 and self.x.IsOpen() == True:
                    print("RVC X Camera is opened!")
                    self.caminit_isok = True
                else:
                    print("RVC X Camera is not opened!")
                    RVC.X1.Destroy(self.x)
                    RVC.SystemShutdown()
                    self.caminit_isok = False
            else:
                print("RVC X Camera is not valid!")
                RVC.X1.Destroy(self.x)
                RVC.SystemShutdown()
                self.caminit_isok = False

    def _teardown_after_capture_failure(self):
        """Release the camera and shut the RVC system down after a failed capture.

        Bug fix: also reset ``caminit_isok`` so that subsequent calls fail fast
        with ``(0, ...)`` instead of using a destroyed camera handle.
        """
        print("RVC X Camera capture failed!")
        self.x.Close()
        RVC.X1.Destroy(self.x)
        RVC.SystemShutdown()
        self.caminit_isok = False

    def get_img(self):
        """Capture one 2-D image.

        :return: ``(1, image ndarray)`` on success, ``(0, None)`` on failure.
        """
        if self.caminit_isok == False:
            return 0, None
        # Capture a point map and a image.
        ret2 = self.x.Capture()
        if ret2 == True:
            print("RVC X Camera capture successed!")
            # Get image data and convert it to a numpy array (no copy).
            img = self.x.GetImage()
            img = np.array(img, copy=False)
            return 1, img
        self._teardown_after_capture_failure()
        return 0, None

    def get_point_map(self):
        """Capture one point map (values in metres).

        :return: ``(1, point-map ndarray)`` on success, ``(0, None)`` on failure.
        """
        if self.caminit_isok == False:
            return 0, None
        ret2 = self.x.Capture()
        if ret2 == True:
            print("RVC X Camera capture successed!")
            # Convert point map (m) to array (no copy).
            pm = np.array(self.x.GetPointMap(), copy=False)
            return 1, pm
        self._teardown_after_capture_failure()
        return 0, None

    def get_img_and_point_map(self):
        """Capture one image together with its point map from a single exposure.

        :return: ``(1, img, point_map)`` on success, ``(0, None, None)`` on failure.
        """
        if self.caminit_isok == False:
            return 0, None, None
        ret2 = self.x.Capture()
        if ret2 == True:
            print("RVC X Camera capture successed!")
            # Image and point map come from the same capture, so they are aligned.
            img = np.array(self.x.GetImage(), copy=False)
            pm = np.array(self.x.GetPointMap(), copy=False)
            return 1, img, pm
        self._teardown_after_capture_failure()
        return 0, None, None

    def release(self):
        """Destroy the camera (if one was opened) and shut the RVC system down."""
        if self.caminit_isok == False:
            RVC.SystemShutdown()
        else:
            RVC.X1.Destroy(self.x)
            RVC.SystemShutdown()

View File

@ -0,0 +1,188 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project AutoControlSystem-master
@File utils.py
@IDE PyCharm
@Author hjw
@Date 2024/8/29 15:07
'''
import numpy as np
import cv2
def find_position(Depth_Z, RegionalArea, RegionalArea_Threshold, first_depth=True):
    """Pick the index of the best pick-up candidate.

    Args:
        Depth_Z: per-candidate depth values; smaller = closer to the camera.
        RegionalArea: per-candidate contour area (same length as Depth_Z).
        RegionalArea_Threshold: minimum area for a candidate to be accepted.
        first_depth: when True prefer the shallowest candidate, otherwise
            prefer the largest-area candidate.

    Returns:
        Index (into the original lists) of the first candidate in preference
        order whose area exceeds the threshold, or None if none qualifies.
    """
    if first_depth:
        # Closest candidate (smallest depth) first.
        order = sorted(range(len(Depth_Z)), key=lambda k: Depth_Z[k])
    else:
        # Largest area first.
        order = sorted(range(len(RegionalArea)), key=lambda k: RegionalArea[k], reverse=True)
    for idx in order:
        if RegionalArea[idx] > RegionalArea_Threshold:
            return idx
    return None  # explicit: no candidate passed the area threshold
# Class labels predicted by the segmentation model (index == class id).
class_names = ['box', 'other']
# Create a list of colors for each class where each color is a tuple of 3 integer values
rng = np.random.default_rng(3)  # fixed seed: colors are stable across runs
colors = rng.uniform(0, 255, size=(len(class_names), 3))
def nms(boxes, scores, iou_threshold):
    """Greedy non-maximum suppression.

    Repeatedly keeps the highest-scoring remaining box and discards every
    other box whose IoU with it reaches ``iou_threshold``.

    Returns the list of kept box indices.
    """
    # Candidate indices, best score first.
    order = np.argsort(scores)[::-1]
    keep = []
    while order.size > 0:
        best = order[0]
        keep.append(best)
        # IoU of the kept box against every remaining candidate.
        overlaps = compute_iou(boxes[best, :], boxes[order[1:], :])
        # Indices (into order[1:]) of candidates that survive this round.
        survivors = np.where(overlaps < iou_threshold)[0]
        order = order[survivors + 1]
    return keep
def compute_iou(box, boxes):
    """IoU of one xyxy box against an (N, 4) array of xyxy boxes."""
    # Corners of the intersection rectangle with each box.
    ix1 = np.maximum(box[0], boxes[:, 0])
    iy1 = np.maximum(box[1], boxes[:, 1])
    ix2 = np.minimum(box[2], boxes[:, 2])
    iy2 = np.minimum(box[3], boxes[:, 3])
    # Clamp negative extents: disjoint boxes intersect with area 0.
    inter = np.maximum(0, ix2 - ix1) * np.maximum(0, iy2 - iy1)
    # Union = sum of the individual areas minus their overlap.
    area_single = (box[2] - box[0]) * (box[3] - box[1])
    area_many = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    return inter / (area_single + area_many - inter)
def xywh2xyxy(x):
    """Convert boxes from center (x, y, w, h) form to corner (x1, y1, x2, y2) form."""
    out = np.copy(x)
    half_w = x[..., 2] / 2
    half_h = x[..., 3] / 2
    out[..., 0] = x[..., 0] - half_w  # left
    out[..., 1] = x[..., 1] - half_h  # top
    out[..., 2] = x[..., 0] + half_w  # right
    out[..., 3] = x[..., 1] + half_h  # bottom
    return out
def sigmoid(x):
    """Element-wise logistic function 1 / (1 + e^-x)."""
    return np.reciprocal(1.0 + np.exp(-x))
def draw_detections(image, boxes, scores, class_ids, mask_alpha=0.3, mask_maps=None):
    """Render masks plus labelled bounding boxes for every detection.

    Returns a new annotated image; ``image`` itself is left untouched
    (draw_masks works on a copy).
    """
    img_height, img_width = image.shape[:2]
    # Scale font size and thickness with the smaller image dimension.
    size = min([img_height, img_width]) * 0.0006
    text_thickness = int(min([img_height, img_width]) * 0.001)
    mask_img = draw_masks(image, boxes, class_ids, mask_alpha, mask_maps)
    # Draw bounding boxes and labels of detections
    for box, score, class_id in zip(boxes, scores, class_ids):
        color = colors[class_id]
        x1, y1, x2, y2 = box.astype(int)
        # Draw rectangle
        cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, 2)
        label = class_names[class_id]
        caption = f'{label} {int(score * 100)}%'
        (tw, th), _ = cv2.getTextSize(text=caption, fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                      fontScale=size, thickness=text_thickness)
        th = int(th * 1.2)
        # Filled banner behind the caption so the text stays readable.
        cv2.rectangle(mask_img, (x1, y1),
                      (x1 + tw, y1 - th), color, -1)
        cv2.putText(mask_img, caption, (x1, y1),
                    cv2.FONT_HERSHEY_SIMPLEX, size, (255, 255, 255), text_thickness, cv2.LINE_AA)
    return mask_img
def draw_masks(image, boxes, class_ids, mask_alpha=0.3, mask_maps=None):
    """Overlay per-detection masks (or filled boxes when no masks are given),
    alpha-blended with the original image.  Returns a new image.
    """
    mask_img = image.copy()
    # Draw bounding boxes and labels of detections
    for i, (box, class_id) in enumerate(zip(boxes, class_ids)):
        color = colors[class_id]
        x1, y1, x2, y2 = box.astype(int)
        # Draw fill mask image
        if mask_maps is None:
            cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, -1)
        else:
            # Blend the class color into the pixels covered by this mask crop.
            crop_mask = mask_maps[i][y1:y2, x1:x2, np.newaxis]
            crop_mask_img = mask_img[y1:y2, x1:x2]
            crop_mask_img = crop_mask_img * (1 - crop_mask) + crop_mask * color
            mask_img[y1:y2, x1:x2] = crop_mask_img
    # Final alpha blend of the painted layer over the original image.
    return cv2.addWeighted(mask_img, mask_alpha, image, 1 - mask_alpha, 0)
def draw_comparison(img1, img2, name1, name2, fontsize=2.6, text_thickness=3):
    """Stamp a title banner onto each image and return them concatenated
    side by side.  Both input images are modified in place; the combined
    result is downscaled to at most 3840x2160.
    """
    (tw, th), _ = cv2.getTextSize(text=name1, fontFace=cv2.FONT_HERSHEY_DUPLEX,
                                  fontScale=fontsize, thickness=text_thickness)
    x1 = img1.shape[1] // 3
    y1 = th
    offset = th // 5
    # Orange banner behind the left image's title.
    cv2.rectangle(img1, (x1 - offset * 2, y1 + offset),
                  (x1 + tw + offset * 2, y1 - th - offset), (0, 115, 255), -1)
    cv2.putText(img1, name1,
                (x1, y1),
                cv2.FONT_HERSHEY_DUPLEX, fontsize,
                (255, 255, 255), text_thickness)
    (tw, th), _ = cv2.getTextSize(text=name2, fontFace=cv2.FONT_HERSHEY_DUPLEX,
                                  fontScale=fontsize, thickness=text_thickness)
    x1 = img2.shape[1] // 3
    y1 = th
    offset = th // 5
    # Red banner behind the right image's title.
    cv2.rectangle(img2, (x1 - offset * 2, y1 + offset),
                  (x1 + tw + offset * 2, y1 - th - offset), (94, 23, 235), -1)
    cv2.putText(img2, name2,
                (x1, y1),
                cv2.FONT_HERSHEY_DUPLEX, fontsize,
                (255, 255, 255), text_thickness)
    combined_img = cv2.hconcat([img1, img2])
    # Cap the output at 4K so very large inputs stay displayable.
    if combined_img.shape[1] > 3840:
        combined_img = cv2.resize(combined_img, (3840, 2160))
    return combined_img

View File

@ -0,0 +1,232 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2024/9/10 16:33
# @Author : hjw
# @File : yolov8_openvino.py
'''
import math
import time
import cv2
import numpy as np
from openvino.runtime import Core
from Vision.tool.utils import xywh2xyxy, nms, draw_detections, sigmoid
class yolov8_segment_openvino:
    """YOLOv8 instance-segmentation inference through the OpenVINO runtime.

    Call the instance with a BGR image; it returns
    ``(1, boxes (N,5 with score), scores, mask_maps, class_ids)`` when at
    least one detection survives, otherwise ``(0, None, None, None, None)``.
    """

    def __init__(self, path, device_name="CPU", conf_thres=0.7, iou_thres=0.5, num_masks=32):
        self.conf_threshold = conf_thres
        self.iou_threshold = iou_thres
        self.num_masks = num_masks  # number of mask coefficients per prediction
        # Initialize model
        self.initialize_model(path, device_name)

    def __call__(self, image):
        return self.segment_objects(image)

    def initialize_model(self, path, device_name):
        """Compile the IR model for ``device_name`` and cache its NCHW input size."""
        self.core = Core()
        self.net = self.core.compile_model(path, device_name=device_name)
        self.ir = self.net.create_infer_request()
        input_shape = self.net.inputs[0].shape
        self.input_height = input_shape[2]
        self.input_width = input_shape[3]

    def segment_objects(self, image):
        """Full pipeline: preprocess, infer, decode boxes, build mask maps."""
        input_tensor = self.prepare_input(image)
        # Perform inference on the image
        outputs = self.ir.infer(input_tensor)
        self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(outputs[0])
        # Bug fix: with zero detections process_box_output returns plain
        # Python lists, so the .tolist() calls below raised AttributeError.
        # Short-circuit before touching them.
        if len(self.boxes) == 0:
            self.mask_maps = []
            return 0, None, None, None, None
        self.mask_maps = self.process_mask_output(mask_pred, outputs[1])
        self.boxes = self.boxes.tolist()
        self.scores = self.scores.tolist()
        # Append each score as a fifth column: [x1, y1, x2, y2, score].
        for t in range(len(self.scores)):
            self.boxes[t].append(self.scores[t])
        self.boxes = np.array(self.boxes)
        return 1, self.boxes, self.scores, self.mask_maps, self.class_ids

    def prepare_input(self, image):
        """BGR image -> normalised float32 NCHW tensor matching the model input."""
        self.img_height, self.img_width = image.shape[:2]
        input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Resize input image
        input_img = cv2.resize(input_img, (self.input_width, self.input_height))
        # Scale input pixel values to 0 to 1
        input_img = input_img / 255.0
        input_img = input_img.transpose(2, 0, 1)
        input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32)
        return input_tensor

    def process_box_output(self, box_output):
        """Decode the detection head: filter by confidence, pick classes, run NMS.

        Returns (boxes, scores, class_ids, mask_coefficients); all empty when
        nothing passes the confidence threshold.
        """
        predictions = np.squeeze(box_output).T
        num_classes = box_output.shape[1] - self.num_masks - 4
        # Filter out object confidence scores below threshold
        scores = np.max(predictions[:, 4:4+num_classes], axis=1)
        predictions = predictions[scores > self.conf_threshold, :]
        scores = scores[scores > self.conf_threshold]
        if len(scores) == 0:
            return [], [], [], np.array([])
        box_predictions = predictions[..., :num_classes+4]
        mask_predictions = predictions[..., num_classes+4:]
        # Get the class with the highest confidence
        class_ids = np.argmax(box_predictions[:, 4:], axis=1)
        # Get bounding boxes for each object
        boxes = self.extract_boxes(box_predictions)
        # Apply non-maxima suppression to suppress weak, overlapping bounding boxes
        indices = nms(boxes, scores, self.iou_threshold)
        return boxes[indices], scores[indices], class_ids[indices], mask_predictions[indices]

    def process_mask_output(self, mask_predictions, mask_output):
        """Combine mask coefficients with the prototype masks into per-box
        full-resolution binary mask maps.
        """
        if mask_predictions.shape[0] == 0:
            return []
        mask_output = np.squeeze(mask_output)
        # Calculate the mask maps for each box
        num_mask, mask_height, mask_width = mask_output.shape  # CHW
        masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1)))
        masks = masks.reshape((-1, mask_height, mask_width))
        # Downscale the boxes to match the mask size
        scale_boxes = self.rescale_boxes(self.boxes,
                                         (self.img_height, self.img_width),
                                         (mask_height, mask_width))
        # For every box/mask pair, get the mask map
        mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width))
        blur_size = (int(self.img_width / mask_width), int(self.img_height / mask_height))
        for i in range(len(scale_boxes)):
            scale_x1 = int(math.floor(scale_boxes[i][0]))
            scale_y1 = int(math.floor(scale_boxes[i][1]))
            scale_x2 = int(math.ceil(scale_boxes[i][2]))
            scale_y2 = int(math.ceil(scale_boxes[i][3]))
            x1 = int(math.floor(self.boxes[i][0]))
            y1 = int(math.floor(self.boxes[i][1]))
            x2 = int(math.ceil(self.boxes[i][2]))
            y2 = int(math.ceil(self.boxes[i][3]))
            # Upscale the low-resolution mask crop to the box size, smooth it,
            # then threshold at 0.5 into a binary mask.
            scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2]
            crop_mask = cv2.resize(scale_crop_mask,
                                   (x2 - x1, y2 - y1),
                                   interpolation=cv2.INTER_CUBIC)
            crop_mask = cv2.blur(crop_mask, blur_size)
            crop_mask = (crop_mask > 0.5).astype(np.uint8)
            mask_maps[i, y1:y2, x1:x2] = crop_mask
        return mask_maps

    def extract_boxes(self, box_predictions):
        """Scale the raw xywh boxes to original image size and convert to xyxy."""
        # Extract boxes from predictions
        boxes = box_predictions[:, :4]
        # Scale boxes to original image dimensions
        boxes = self.rescale_boxes(boxes,
                                   (self.input_height, self.input_width),
                                   (self.img_height, self.img_width))
        # Convert boxes to xyxy format
        boxes = xywh2xyxy(boxes)
        # Check the boxes are within the image
        boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width)
        boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height)
        boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width)
        boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height)
        return boxes

    def draw_detections(self, image, draw_scores=True, mask_alpha=0.4):
        """Draw labelled bounding boxes for the last inference onto ``image``."""
        return draw_detections(image, self.boxes, self.scores,
                               self.class_ids, mask_alpha)

    def draw_masks(self, image, draw_scores=True, mask_alpha=0.5):
        """Draw labelled boxes plus segmentation masks for the last inference."""
        return draw_detections(image, self.boxes, self.scores,
                               self.class_ids, mask_alpha, mask_maps=self.mask_maps)

    @staticmethod
    def rescale_boxes(boxes, input_shape, image_shape):
        """Rescale boxes from ``input_shape`` (h, w) to ``image_shape`` (h, w)."""
        input_shape = np.array([input_shape[1], input_shape[0], input_shape[1], input_shape[0]])
        boxes = np.divide(boxes, input_shape, dtype=np.float32)
        boxes *= np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]])
        return boxes
# frame = cv2.imread('../1.png')
# # frame = cv2.resize(frame,(640,640))
# model_path = ("../runs/segment/train2/weights/last-0903_openvino_model/last-0903.xml")
# #model_path = ("../yolov8n-seg_openvino_model/yolov8n-seg.xml")
# device_name = "GPU"
# yoloseg = yolov8_segment_openvino(model_path, device_name, conf_thres=0.3, iou_thres=0.3)
#
# start = time.time()
# boxes, scores, class_ids, masks = yoloseg(frame)
# # postprocess and draw masks
# combined_img = yoloseg.draw_masks(frame)
# end = time.time()
# # show FPS
# print(end- start)
# fps = (1 / (end - start))
# fps_label = "Throughput: %.2f FPS" % fps
# cv2.putText(combined_img, fps_label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# cv2.imwrite('rs2.jpg', combined_img)
# # show ALL
# cv2.imshow("YOLOv8 Segmentation OpenVINO inference Demo", combined_img)
#
# cv2.waitKey(0)
# # Initialize the VideoCapture
# cap = cv2.VideoCapture("store-aisle-detection.mp4")
# img = cv2.imread('../1.png')
# # Initialize YOLOv5 Instance Segmentator
# model_path = "yolov8n-seg.xml"
# device_name = "GPU"
# yoloseg = YOLOSeg(model_path, device_name, conf_thres=0.3, iou_thres=0.3)
# while cap.isOpened():
# # Read frame from the video
# ret, frame = cap.read()
# if not ret:
# break
# # Update object localizer
# start = time.time()
# boxes, scores, class_ids, masks = yoloseg(frame)
# # postprocess and draw masks
# combined_img = yoloseg.draw_masks(frame)
# end = time.time()
# # show FPS
# fps = (1 / (end - start))
# fps_label = "Throughput: %.2f FPS" % fps
# cv2.putText(combined_img, fps_label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
# # show ALL
# cv2.imshow("YOLOv8 Segmentation OpenVINO inference Demo", combined_img)
#
# # Press Any key stop
# if cv2.waitKey(1) > -1:
# print("finished by user")
# break
#
# cap.release()
# cv2.destroyAllWindows()

View File

@ -0,0 +1,365 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project -> File yolov8_segment.py
@IDE PyCharm
@Author hjw
@Version : 1.0.0
@Date 2024/8/20 9:25
@Function
'''
# yolov8 pt模型实例分割推理
import cv2
import time
import numpy as np
import torch, torchvision
import torch.nn.functional as F
def load_model(model_path, device):
    """Load a YOLOv8-seg checkpoint and return a fused FP32 model in eval mode.

    The class-name list is attached to the returned model as ``model.CLASSES``.
    SECURITY NOTE: torch.load unpickles arbitrary objects — only load
    checkpoint files from trusted sources.
    """
    model = torch.load(model_path, map_location=device)
    # Class names: prefer an explicit 'CLASSES' entry, else the nested model's names.
    category_list = model.get('CLASSES', model.get('model').names)
    # EMA weights win over the plain model when present.
    model = (model.get('ema') or model['model']).float() # FP32 model
    model.__setattr__('CLASSES', category_list)
    model.fuse().eval()  # fuse conv+bn layers and switch to inference mode
    #model = model.cuda()
    return model
def data_preprocess(model, img, img_scale, device):
    """Letterbox, channel-swap and normalise a BGR frame into a (1, 3, H, W) tensor.

    NOTE(review): ``img_scale`` appears to be [h, w] (e.g. [640, 640]) — confirm
    against callers.
    """
    stride, auto = 32, True
    # Honour the model's own stride if it is larger than 32.
    stride = max(int(model.stride.max()), 32)
    img = letterbox(img, new_shape=img_scale, stride=stride, auto=auto)[0] # padded resize
    img = np.ascontiguousarray(img.transpose((2, 0, 1))[::-1]) # HWC to CHW, BGR to RGB,contiguous
    #img = torch.from_numpy(img) # ndarray to tensor
    img = torch.from_numpy(img).to(device)
    #img = torch.from_numpy(img)
    img = img.float() # uint8 to fp32
    img /= 255 # 0 - 255 to 0.0 - 1.0
    if len(img.shape) == 3:
        img = img[None] # expand for batch dim
    return img
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    """Resize ``im`` to fit ``new_shape`` while keeping aspect ratio, padding
    the borders with ``color``.

    Returns (padded image, (w_ratio, h_ratio), (dw, dh)) where dw/dh are the
    per-side paddings actually applied.
    """
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2] # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup: # only scale down, do not scale up (for better val mAP)
        r = min(r, 1.0)
    # Compute padding
    ratio = r, r # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
    if auto: # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding
    elif scaleFill: # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios
    dw /= 2 # divide padding into 2 sides
    dh /= 2
    if shape[::-1] != new_unpad: # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    # The ±0.1 trick splits an odd padding total across the two sides.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
    return im, ratio, (dw, dh)
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False,
                        labels=(), max_det=300, nc=0, max_time_img=0.05, max_nms=30000, max_wh=7680, ):
    """Class-aware non-maximum suppression on raw YOLOv8 head output.

    Args:
        prediction: raw model output of shape (bs, 4 + nc + nm, anchors); a
            (inference_out, loss_out) tuple is also accepted (validation mode).
        conf_thres / iou_thres: confidence and IoU thresholds, both in [0, 1].
        classes: optional list of class ids to keep.
        agnostic: if True, suppress across classes instead of per class.
        multi_label: allow multiple labels per box (only when nc > 1).
        labels: optional a-priori labels appended per image (autolabelling).
        max_det: maximum detections kept per image.
        nc: number of classes (0 = infer from the tensor shape).
        max_time_img / max_nms / max_wh: safety limits on runtime / candidate
            count / class offset used for batched NMS.

    Returns:
        List of length bs; each entry is a tensor of shape (n, 6 + nm) holding
        (x1, y1, x2, y2, conf, cls, mask coefficients...).
    """
    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
    if isinstance(prediction, (list, tuple)): # YOLOv8 model in validation model, output = (inference_out, loss_out)
        prediction = prediction[0] # select only inference output
    device = prediction.device
    mps = 'mps' in device.type # Apple MPS
    if mps: # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0] # batch size
    nc = nc or (prediction.shape[1] - 4) # number of classes
    nm = prediction.shape[1] - nc - 4
    mi = 4 + nc # mask start index
    xc = prediction[:, 4:mi].amax(1) > conf_thres # candidates
    # Settings
    # min_wh = 2 # (pixels) minimum box width and height
    time_limit = 0.5 + max_time_img * bs # seconds to quit after
    multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img)
    prediction = prediction.transpose(-1, -2) # shape(1,84,6300) to shape(1,6300,84)
    prediction[..., :4] = xywh2xyxy(prediction[..., :4]) # xywh to xyxy
    t = time.time()
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction): # image index, image inference
        # Apply constraints
        # x[((x[:, 2:4] < min_wh) | (x[:, 2:4] > max_wh)).any(1), 4] = 0 # width-height
        x = x[xc[xi]] # confidence
        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 4), device=x.device)
            v[:, :4] = xywh2xyxy(lb[:, 1:5]) # box
            v[range(len(lb)), lb[:, 0].long() + 4] = 1.0 # cls
            x = torch.cat((x, v), 0)
        # If none remain process next image
        if not x.shape[0]:
            continue
        # Detections matrix nx6 (xyxy, conf, cls)
        box, cls, mask = x.split((4, nc, nm), 1)
        if multi_label:
            i, j = torch.where(cls > conf_thres)
            x = torch.cat((box[i], x[i, 4 + j, None], j[:, None].float(), mask[i]), 1)
        else: # best class only
            conf, j = cls.max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]
        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]
        # Check shape
        n = x.shape[0] # number of boxes
        if not n: # no boxes
            continue
        if n > max_nms: # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]] # sort by confidence and remove excess boxes
        # Batched NMS: offsetting boxes by class id keeps classes separated.
        c = x[:, 5:6] * (0 if agnostic else max_wh) # classes
        boxes, scores = x[:, :4] + c, x[:, 4] # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres) # NMS
        i = i[:max_det] # limit detections
        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)
        if (time.time() - t) > time_limit:
            print(f'WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded')
            break # time limit exceeded
    return output
def xywh2xyxy(x):
    """Convert (cx, cy, w, h) boxes to corner (x1, y1, x2, y2) form.

    Works on both numpy arrays and torch tensors; the last axis must hold
    exactly the four box values.  Returns a new array/tensor of the same type.
    """
    assert x.shape[-1] == 4, f'input shape last dimension expected 4 but input shape is {x.shape}'
    # Allocate output of the matching type without copying the input.
    out = torch.empty_like(x) if isinstance(x, torch.Tensor) else np.empty_like(x)
    half_w = x[..., 2] / 2
    half_h = x[..., 3] / 2
    out[..., 0] = x[..., 0] - half_w  # top left x
    out[..., 1] = x[..., 1] - half_h  # top left y
    out[..., 2] = x[..., 0] + half_w  # bottom right x
    out[..., 3] = x[..., 1] + half_h  # bottom right y
    return out
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None, padding=True):
    """Map xyxy ``boxes`` from the letterboxed frame ``img1_shape`` (h, w)
    back into the original frame ``img0_shape`` (h, w), clipping the result
    to the image bounds.

    ``ratio_pad`` may supply a precomputed ((gain, ...), (pad_x, pad_y));
    when omitted, gain and padding are derived from the two shapes.  With
    ``padding=False`` only the gain is undone (plain rescaling).  Boxes are
    modified in place and also returned.
    """
    if ratio_pad is None:
        # Recover the letterbox gain and per-side padding from the shapes.
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
        pad = (round((img1_shape[1] - img0_shape[1] * gain) / 2 - 0.1),
               round((img1_shape[0] - img0_shape[0] * gain) / 2 - 0.1))
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    if padding:
        boxes[..., [0, 2]] -= pad[0]  # undo horizontal padding
        boxes[..., [1, 3]] -= pad[1]  # undo vertical padding
    boxes[..., :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes
def clip_boxes(boxes, shape):
    """Clamp xyxy ``boxes`` in place so they lie inside an image of the given
    (height, width) ``shape``.  Accepts torch tensors or numpy arrays.
    """
    if isinstance(boxes, torch.Tensor):
        # Tensors: clamp each coordinate column individually (faster).
        boxes[..., 0].clamp_(0, shape[1])  # x1
        boxes[..., 1].clamp_(0, shape[0])  # y1
        boxes[..., 2].clamp_(0, shape[1])  # x2
        boxes[..., 3].clamp_(0, shape[0])  # y2
    else:
        # ndarrays: clip the x and y column pairs together (faster grouped).
        boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1])  # x1, x2
        boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0])  # y1, y2
def process_mask(protos, masks_in, bboxes, shape, ori_shape):
    """
    Crop after upsample.
    proto_out: [mask_dim, mask_h, mask_w]
    out_masks: [n, mask_dim], n is number of masks after nms
    bboxes: [n, 4], n is number of masks after nms
    shape: input_image_size, (h, w)
    return: list of n per-box boolean masks at original-image resolution
    """
    # Project mask coefficients onto the prototype masks, then upsample to
    # the network input size.
    c, mh, mw = protos.shape # CHW
    masks = (masks_in @ protos.float().view(c, -1)).sigmoid().view(-1, mh, mw)
    masks = F.interpolate(masks[None], shape, mode='bilinear', align_corners=False)[0] # CHW
    # Undo the letterbox: strip the padding and rescale to the original image size.
    gain = min(shape[0] / ori_shape[0], shape[1] / ori_shape[1]) # gain = old / new
    pad = (shape[1] - ori_shape[1] * gain) / 2, (shape[0] - ori_shape[0] * gain) / 2 # wh padding
    top, left = int(pad[1]), int(pad[0]) # y, x
    bottom, right = int(shape[0] - pad[1]), int(shape[1] - pad[0])
    if len(masks.shape) < 2:
        raise ValueError(f'"len of masks shape" should be 2 or 3, but got {len(masks.shape)}')
    masks = masks[:, top:bottom, left:right]
    masks = F.interpolate(masks[None], ori_shape, mode='bilinear', align_corners=False)[0] # CHW
    # Crop away everything outside each detection's bounding box and binarise.
    crop_masks = []
    for i, mask in enumerate(masks):
        mask = mask[int(bboxes[i][1]):int(bboxes[i][3]), int(bboxes[i][0]):int(bboxes[i][2])]
        crop_masks.append(mask.gt_(0.5))
    return crop_masks
def plot_result(det_cpu, dst_img, masks, category_names):
    """Annotate ``dst_img`` in place with a label and min-area-rect outline
    for every detection, and return it.

    Args:
        det_cpu: (N, >=6) array of [x1, y1, x2, y2, score, class_id, ...].
        dst_img: BGR image the detections refer to (modified in place).
        masks: per-detection mask tensors cropped to each box.
        category_names: class-id -> name mapping.
    """
    for i, item in enumerate(det_cpu):
        # rand_color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        # Label at the box centre.
        box_x1, box_y1, box_x2, box_y2 = item[0:4].astype(np.int32)
        label = category_names[int(item[5])]
        rand_color = (0, 255, 255)
        #cv2.rectangle(dst_img, (box_x1, box_y1), (box_x2, box_y2), color=rand_color, thickness=2)
        score = item[4]
        org = (int((box_x1+box_x2)/2), int((box_y1+box_y2)/2))
        text = '{}|{:.2f}'.format(label, score)
        cv2.putText(dst_img, text, org=org, fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8, color=rand_color, thickness=2)
        # Rasterize this detection's mask into a colored box-sized image.
        mask = masks[i].cpu().data.numpy().astype(int)
        h, w = box_y2 - box_y1, box_x2 - box_x1
        mask_colored = np.zeros((h, w, 3), dtype=np.uint8)
        mask_colored[np.where(mask)] = rand_color
        # Binarise the mask and find its longest-perimeter contour.
        imgray = cv2.cvtColor(mask_colored, cv2.COLOR_BGR2GRAY)
        ret, binary = cv2.threshold(imgray, 10, 255, 0)
        contours, hierarchy = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
        max_contour = None
        max_perimeter = 0
        for contour in contours:
            perimeter = cv2.arcLength(contour, True)
            if perimeter > max_perimeter:
                max_perimeter = perimeter
                max_contour = contour
        # Bug fix: an empty mask yields no contours; skip this detection
        # instead of passing None to cv2.minAreaRect (which raises).
        if max_contour is None:
            continue
        rect = cv2.minAreaRect(max_contour)
        # cv2.boxPoints converts the rect into its four corner points.
        box = cv2.boxPoints(rect)
        # Roll corners so they start from the top-left one (clockwise order);
        # this does not change the drawing, only the corner ordering.
        startidx = box.sum(axis=1).argmin()
        box = np.roll(box, 4 - startidx, 0)
        # Shift corners from box-local back to full-image coordinates and draw.
        box = box.reshape((-1, 1, 2)).astype(np.int32)
        box = box + [[[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]], [[box_x1, box_y1]]]
        cv2.polylines(dst_img, [box], True, (0, 255, 0), 2)
    return dst_img
    # cv2.imwrite('rs.jpg', dst_img)
# cv2.imwrite('rs.jpg', dst_img)
class yolov8_segment():
    """Convenience wrapper bundling checkpoint loading, preprocessing,
    inference, NMS and mask post-processing for YOLOv8 segmentation.
    """

    def __init__(self):
        super(yolov8_segment, self).__init__()

    def load_model(self, model_path, device):
        """Load the YOLOv8-seg checkpoint onto ``device`` and remember the device."""
        self.model = load_model(model_path, device)
        self.device = device

    def model_inference(self, frame, upd_arr):
        """Segment one BGR frame.

        Returns (1, detections, original image, masks, class names) when at
        least one object survives NMS, otherwise (0, None, None, None, None).
        """
        net_input = data_preprocess(self.model, frame, [640, 640], self.device)
        source_img = frame.copy()
        # Forward pass: raw predictions plus mask prototypes.
        raw = self.model(net_input, augment=False)
        preds = raw[0]
        proto = raw[1][-1]
        # NMS
        det = non_max_suppression(preds, conf_thres=0.25, iou_thres=0.3, nc=len(self.model.CLASSES))[0]
        if det.shape[0] == 0:
            return 0, None, None, None, None
        # Map boxes back to the original frame, then build per-box masks.
        det[:, :4] = scale_boxes(net_input.shape[2:], det[:, :4], source_img.shape)
        masks = process_mask(proto[0], det[:, 6:], det[:, :4], net_input.shape[2:], source_img.shape[0:2])
        # 画图
        # result_frame = plot_result(det.cpu().data.numpy(), ori_img, masks, category_names)
        return 1, det.cpu().data.numpy(), source_img, masks, self.model.CLASSES

    def clear(self):
        """Drop the reference to the loaded model so it can be garbage-collected."""
        del self.model
# model = yolov8_segment()
# model.load_model('./pt_model/yolov8n-seg.pt','cpu')
# cap = cv2.VideoCapture(1)
# while True:
# # count_file = len(os.listdir('E:\\A_panckg\\cv_sdk_discharge\\video_save')) # 数量
# ret, frame = cap.read()
# if ret:
# frame_save_count = 1000
# frame = cv2.resize(frame, (1280, 720))
# img = model.model_inference(frame, 0)
# cv2.imshow("imgrr", img)
# cv2.waitKey(1)
# #videoWriter(img)

735
AutoControlSystem/app.py Normal file
View File

@ -0,0 +1,735 @@
import configparser
import json
import logging
import queue
import sys
from multiprocessing import Process
from PyQt5.uic.properties import QtWidgets
from PySide6.QtCore import QThread, Signal, Slot, QObject, QEvent
from PySide6.QtGui import QIntValidator, QStandardItemModel, QStandardItem, Qt
from PySide6.QtWidgets import QApplication, QMainWindow, QPushButton, QLabel, QHeaderView, QTableWidget, \
QTableWidgetItem, QWidget, QHBoxLayout, QAbstractItemView, QMessageBox
from datetime import datetime
from Util.util_ini import writeFeedLine_to_ini
import Constant
from CU.Command import FeedCommand
from CU.Feeding import FeedLine, FeedingConfig, Feeding, FeedStatus
from Util.util_log import QTextEditLogger
from Vision.camera_coordinate_dete import Detection
from ui_untitled import Ui_MainWindow
from COM.COM_Robot import RobotClient
from Expection import Error_Code
from queue import Queue
from Model.RobotModel import *
import time
from queue import Queue
from Model.Position import Real_Position, Detection_Position
from threading import Thread
from CU.Command import Status
class MainWindow(QMainWindow,Ui_MainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.setupUi(self)
self.init_UI()
self.init_Run()
self.init_robot_info()
self.init_FeedLine()
self.start_Runing()
self.init_log()
def init_log(self):
self.logger_textEdit = logging.getLogger("QTextEditLogger")
self.logger_textEdit.setLevel(logging.DEBUG)
text_edit_handler = QTextEditLogger(self.textEdit_log_info)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
text_edit_handler.setFormatter(formatter)
self.logger_textEdit.addHandler(text_edit_handler)
def init_UI(self):
self.pushButton_num1.clicked.connect(self.send_num_button_click)
self.pushButton_num2.clicked.connect(self.send_num_button_click)
self.pushButton_num3.clicked.connect(self.send_num_button_click)
self.pushButton_num4.clicked.connect(self.send_num_button_click)
self.pushButton_num5.clicked.connect(self.send_num_button_click)
self.pushButton_num6.clicked.connect(self.send_num_button_click)
self.pushButton_AddNum.clicked.connect(self.send_addNum_button_click)
self.pushButton_SubNum.clicked.connect(self.send_subNum_button_click)
self.pushButton_num_free.clicked.connect(self.send_num_button_click)
self.lineEdit_j1.returnPressed.connect(self.send_position_returnPressed)
self.lineEdit_j2.returnPressed.connect(self.send_position_returnPressed)
self.lineEdit_j3.returnPressed.connect(self.send_position_returnPressed)
self.lineEdit_j4.returnPressed.connect(self.send_position_returnPressed)
self.lineEdit_j5.returnPressed.connect(self.send_position_returnPressed)
self.lineEdit_j6.returnPressed.connect(self.send_position_returnPressed)
self.pushButton_j1_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j2_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j3_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j4_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j5_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j6_sub.clicked.connect(self.send_subOneAsix_button_click)
self.pushButton_j1_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_j2_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_j3_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_j4_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_j5_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_j6_add.clicked.connect(self.send_addOneAsix_button_click)
self.pushButton_stopFeed.clicked.connect(self.send_stopFeed_button_click)
self.pushButton_pauseFeed.clicked.connect(self.send_pauseFeed_button_click)
self.pushButton_clearAlarm.clicked.connect(self.send_clear_alarm_command)
self.pushButton_speed.setText(str(Constant.speed))
self.pushButton_speed.clicked.connect(self.send_setSpeed_label_doubelClick)
self.lineEdit_speed.returnPressed.connect(self.send_setSpeed_lineEdit_returePressed)
self.lineEdit_num.returnPressed.connect(self.send_custom_num_returnPressed)
int_validator = QIntValidator(0, 100, self.lineEdit_num)
self.lineEdit_num.setValidator(int_validator)
# self.horizontalSlider_J1.sliderReleased
self.pushButton_startFeed.clicked.connect(self.send_startFeed_button_click)
self.pushButton_stack_feedControl.clicked.connect(lambda _, index=0: self.send_stack_feedSet_button_click(index))
self.pushButton_stack_feedSet.clicked.connect(lambda _,index=1:self.send_stack_feedSet_button_click(index))
self.horizontalSlider_feedingNum.blockSignals(True)
self.horizontalSlider_feedingNum.setMinimum(0)
self.lineEdit_speed.hide()
self.pushButton_stopFeed.hide()
self.pushButton_pauseFeed.hide()
def init_Run(self):
self.robotClient = None
self.configReader = configparser.ConfigParser()
self.detection = Detection()
self.command_position_quene = Queue()
self.status_address = DataAddress()
self.feedLine_dict = {}
self.command_quene = Queue()
self.main_threading = None
self.configReader.read(Constant.set_ini)
ip = self.configReader.get('Robot_Feed', 'IPAddress')
port = int(self.configReader.get('Robot_Feed', 'Port'))
self.robotClient = RobotClient(ip, port, self.command_position_quene, self.status_address)
self.feeding = Feeding(self.robotClient,self.detection) # 临时
self.last_time=time.time()
try:
self.robotClient.CreatConnect()
except:
self.set_label_status_style(False)
return Error_Code.SYS_NETERROR
if self.robotClient.connected:
return 0
else:
return Error_Code.SYS_NETERROR
def init_seting_frame(self):
rows = len(self.feedLine_dict.keys())+1
self.tableWidget_feedSeting.setRowCount(rows)
self.tableWidget_feedSeting.setColumnCount(19)
# 设置第一重表头的合并 (三列一组)
self.tableWidget_feedSeting_addtional_col_num = 2
self.tableWidget_feedSeting.setSpan(0, 0+self.tableWidget_feedSeting_addtional_col_num, 1, 6) # 合并前3列
self.tableWidget_feedSeting.setSpan(0, 6+self.tableWidget_feedSeting_addtional_col_num, 1, 6) # 合并后3列
self.tableWidget_feedSeting.setSpan(0, 12+self.tableWidget_feedSeting_addtional_col_num, 1, 6) # 合并后3列
self.tableWidget_feedSeting.itemChanged.connect(self.send_tabelFeedSet_itemChanged)
btn_safe = QPushButton("获取安全位置")
widget_safe = QWidget()
layout_safe = QHBoxLayout()
layout_safe.addWidget(btn_safe)
# 调整布局的间距,使之更紧凑
layout_safe.setContentsMargins(0, 0, 0, 0)
# 将布局设置到 QWidget 容器中
widget_safe.setLayout(layout_safe)
btn_photo = QPushButton("获取拍照位置")
widget_photo = QWidget()
layout_photo = QHBoxLayout()
layout_photo.addWidget(btn_photo)
# 调整布局的间距,使之更紧凑
layout_photo.setContentsMargins(0, 0, 0, 0)
# 将布局设置到 QWidget 容器中
widget_photo.setLayout(layout_photo)
btn_feed = QPushButton("获取投料位置")
widget_feed = QWidget()
layout_feed = QHBoxLayout()
layout_feed.addWidget(btn_feed)
# 调整布局的间距,使之更紧凑
layout_feed.setContentsMargins(0, 0, 0, 0)
# 将布局设置到 QWidget 容器中
widget_feed.setLayout(layout_feed)
btn_safe.clicked.connect(self.send_get_safe_position_button_click)
btn_photo.clicked.connect(self.send_get_photo_position_button_click)
btn_feed.clicked.connect(self.send_get_feed_position_button_click)
# 添加第一重表头项
self.tableWidget_feedSeting.setCellWidget(0, 0+self.tableWidget_feedSeting_addtional_col_num, widget_safe)
self.tableWidget_feedSeting.setCellWidget(0, 6+self.tableWidget_feedSeting_addtional_col_num, widget_photo)
self.tableWidget_feedSeting.setCellWidget(0, 12+self.tableWidget_feedSeting_addtional_col_num, widget_feed) # 设置在合并的第2组
self.tableWidget_feedSeting.setSelectionBehavior(QTableWidget.SelectRows)
self.tableWidget_feedSeting.setAutoScroll(True)
# 添加第二重表头
self.tableWidget_feedSeting.setHorizontalHeaderLabels(['header','线名','X1','Y1','Z1','U1','V1','W1','X2','Y2','Z2','U2','V2','W2','X3','Y3','Z3','U3','V3','W3'])
self.tableWidget_feedSeting.hideColumn(0)
# 填充数据行
for row,(feed_line_key,feed_line)in enumerate(self.feedLine_dict.items()):
self.tableWidget_feedSeting.setItem(row+1, 0, QTableWidgetItem(feed_line_key))
self.tableWidget_feedSeting.setItem(row+1, 1, QTableWidgetItem(feed_line.name))
self.set_position_to_tabel(row+1, 0, feed_line.safe_position)
self.set_position_to_tabel(row+1, 1, feed_line.photo_position)
self.set_position_to_tabel(row+1, 2, feed_line.feed_position)
# 禁用自动表头
self.tableWidget_feedSeting.verticalHeader().setVisible(True)
self.pushButton_tableFeedSet_addRow.clicked.connect(self.send_tabelFeedSet_addRow)
self.pushButton_tableFeedSet_deleRow.clicked.connect(self.send_tabelFeedSet_delRow)
self.pushButton_tableFeedSet_save.clicked.connect(self.send_tabelFeedSet_save)
def init_FeedLine(self):
line_count = self.configReader.get('Robot_Feed', 'LineCount', fallback=0)
self.configReader.read(Constant.feedLine_set_file, encoding='utf-8')
for i in range(int(line_count)):
line_str = f'FeedLine{i+1}'
safe_position = Real_Position()
photo_position = Real_Position()
mid_position = Real_Position()
broken_position = Real_Position()
feed_position = Real_Position()
safe_position.X = float(self.configReader.get(line_str, 'SafePosition_x', fallback=0))
safe_position.Y = float(self.configReader.get(line_str, 'SafePosition_y', fallback=0))
safe_position.Z = float(self.configReader.get(line_str, 'SafePosition_z', fallback=0))
safe_position.U = float(self.configReader.get(line_str, 'SafePosition_u', fallback=0))
safe_position.V = float(self.configReader.get(line_str, 'SafePosition_v', fallback=0))
safe_position.W = float(self.configReader.get(line_str, 'SafePosition_w', fallback=0))
photo_position.X = float(self.configReader.get(line_str, 'PhotoPosition_x', fallback=0))
photo_position.Y = float(self.configReader.get(line_str, 'PhotoPosition_y', fallback=0))
photo_position.Z = float(self.configReader.get(line_str, 'PhotoPosition_z', fallback=0))
photo_position.U = float(self.configReader.get(line_str, 'PhotoPosition_u', fallback=0))
photo_position.V = float(self.configReader.get(line_str, 'PhotoPosition_v', fallback=0))
photo_position.W = float(self.configReader.get(line_str, 'PhotoPosition_w', fallback=0))
mid_position.X = float(self.configReader.get(line_str, 'MidPosition_x', fallback=0))
mid_position.Y = float(self.configReader.get(line_str, 'MidPosition_y', fallback=0))
mid_position.Z = float(self.configReader.get(line_str, 'MidPosition_z', fallback=0))
mid_position.U = float(self.configReader.get(line_str, 'MidPosition_u', fallback=0))
mid_position.V = float(self.configReader.get(line_str, 'MidPosition_v', fallback=0))
mid_position.W = float(self.configReader.get(line_str, 'MidPosition_w', fallback=0))
broken_position.X = float(self.configReader.get(line_str, 'BrokenPosition_x', fallback=0))
broken_position.Y = float(self.configReader.get(line_str, 'BrokenPosition_y', fallback=0))
broken_position.Z = float(self.configReader.get(line_str, 'BrokenPosition_z', fallback=0))
broken_position.U = float(self.configReader.get(line_str, 'BrokenPosition_u', fallback=0))
broken_position.V = float(self.configReader.get(line_str, 'BrokenPosition_v', fallback=0))
broken_position.W = float(self.configReader.get(line_str, 'BrokenPosition_w', fallback=0))
feed_position.X = float(self.configReader.get(line_str, 'FeedPosition_x', fallback=0))
feed_position.Y = float(self.configReader.get(line_str, 'FeedPosition_y', fallback=0))
feed_position.Z = float(self.configReader.get(line_str, 'FeedPosition_z', fallback=0))
feed_position.U = float(self.configReader.get(line_str, 'FeedPosition_u', fallback=0))
feed_position.V = float(self.configReader.get(line_str, 'FeedPosition_v', fallback=0))
feed_position.W = float(self.configReader.get(line_str, 'FeedPosition_w', fallback=0))
name = self.configReader.get(line_str, 'Name', fallback='未命名')
self.feedLine_dict[f'{Constant.feedLine_set_section}{i+1}'] = FeedLine(name ,safe_position, photo_position,mid_position, broken_position,feed_position)
self.init_seting_frame()
self.updateUI_Select_Line()
pass
def init_robot_info(self):
j1_min = int(self.configReader.get('Robot_Feed', 'j1_min'))
j1_max = int(self.configReader.get('Robot_Feed', 'j1_max'))
j2_min = int(self.configReader.get('Robot_Feed', 'j2_min'))
j2_max = int(self.configReader.get('Robot_Feed', 'j2_max'))
j3_min = int(self.configReader.get('Robot_Feed', 'j3_min'))
j3_max = int(self.configReader.get('Robot_Feed', 'j3_max'))
j4_min = int(self.configReader.get('Robot_Feed', 'j4_min'))
j4_max = int(self.configReader.get('Robot_Feed', 'j4_max'))
j5_min = int(self.configReader.get('Robot_Feed', 'j5_min'))
j5_max = int(self.configReader.get('Robot_Feed', 'j5_max'))
j6_min = int(self.configReader.get('Robot_Feed', 'j6_min'))
j6_max = int(self.configReader.get('Robot_Feed', 'j6_max'))
self.horizontalSlider_J1.setMinimum(j1_min)
self.horizontalSlider_J1.setMaximum(j1_max)
self.horizontalSlider_J2.setMinimum(j2_min)
self.horizontalSlider_J2.setMaximum(j2_max)
self.horizontalSlider_J3.setMinimum(j3_min)
self.horizontalSlider_J3.setMaximum(j3_max)
self.horizontalSlider_J4.setMinimum(j4_min)
self.horizontalSlider_J4.setMaximum(j4_max)
self.horizontalSlider_J5.setMinimum(j5_min)
self.horizontalSlider_J5.setMaximum(j5_max)
self.horizontalSlider_J6.setMinimum(j6_min)
self.horizontalSlider_J6.setMaximum(j6_max)
self.label_j1_min.setText(j1_min.__str__())
self.label_j1_max.setText(j1_max.__str__())
self.label_j2_min.setText(j2_min.__str__())
self.label_j2_max.setText(j2_max.__str__())
self.label_j3_min.setText(j3_min.__str__())
self.label_j3_max.setText(j3_max.__str__())
self.label_j4_min.setText(j4_min.__str__())
self.label_j4_max.setText(j4_max.__str__())
self.label_j5_min.setText(j5_min.__str__())
self.label_j5_max.setText(j5_max.__str__())
self.label_j6_min.setText(j6_min.__str__())
self.label_j6_max.setText(str(j6_max))
def start_Runing(self):
self.main_threading = Thread(target=self.run)
self.robot_connect_threading = Thread(target=self.robotClient.run)
self.main_threading.start()
self.robot_connect_threading.start()
pass
def send_startFeed_button_click(self):
num = self.horizontalSlider_feedingNum.maximum()
line_index = str(self.comboBox_lineIndex.currentIndex()+1)
line_head = self.comboBox_lineIndex.currentData()
self.command_quene.put(FeedCommand(FeedingConfig(num, self.feedLine_dict[line_head])))
self.stackedWidget_num.setCurrentIndex(1)
self.set_run_status_button(True)
def send_num_button_click(self):
button = self.sender()
if button.text() != "自定义":
num = int(button.text())
self.horizontalSlider_feedingNum.setMaximum(num)
self.label_maxNum.setText(str(num))
self.horizontalSlider_feedingNum.setValue(0)
else:
self.pushButton_num_free.hide()
self.lineEdit_num.show()
def send_subOneAsix_button_click(self):
btn_str = self.sender().objectName()
Constant.manual_adjust_accuracy = float(self.lineEdit_manual_adjust_accuracy.text())
x1 = self.horizontalSlider_J1.value()
x2 = self.horizontalSlider_J2.value()
x3 = self.horizontalSlider_J3.value()
x4 = self.horizontalSlider_J4.value()
x5 = self.horizontalSlider_J5.value()
x6 = self.horizontalSlider_J6.value()
if 'j1' in btn_str:
x1 = x1 - Constant.manual_adjust_accuracy
elif 'j2' in btn_str:
x2 = x2 - Constant.manual_adjust_accuracy
pass
elif 'j3' in btn_str:
x3 = x3 - Constant.manual_adjust_accuracy
pass
elif 'j4' in btn_str:
x4 = x4 - Constant.manual_adjust_accuracy
pass
elif 'j5' in btn_str:
x5 = x5 - Constant.manual_adjust_accuracy
pass
elif 'j6' in btn_str:
x6 = x6 - Constant.manual_adjust_accuracy
self.send_position_command(x1,x2,x3,x4,x5,x6,MoveType.AXIS)
def send_addOneAsix_button_click(self, btn):
btn_str = self.sender().objectName()
Constant.manual_adjust_accuracy = float(self.lineEdit_manual_adjust_accuracy.text())
x1 = self.horizontalSlider_J1.value()
x2 = self.horizontalSlider_J2.value()
x3 = self.horizontalSlider_J3.value()
x4 = self.horizontalSlider_J4.value()
x5 = self.horizontalSlider_J5.value()
x6 = self.horizontalSlider_J6.value()
if 'j1' in btn_str:
x1 = x1 + Constant.manual_adjust_accuracy
elif 'j2' in btn_str:
x2 = x2 + Constant.manual_adjust_accuracy
pass
elif 'j3' in btn_str:
x3 = x3 + Constant.manual_adjust_accuracy
pass
elif 'j4' in btn_str:
x4 = x4 + Constant.manual_adjust_accuracy
pass
elif 'j5' in btn_str:
x5 = x5 + Constant.manual_adjust_accuracy
pass
elif 'j6' in btn_str:
x6 = x6 + Constant.manual_adjust_accuracy
self.send_position_command(x1, x2, x3, x4, x5, x6, move_type=MoveType.AXIS)
def send_addNum_button_click(self):
self.feeding.feedConfig.num +=1
def send_subNum_button_click(self):
self.feeding.feedConfig.num -=1
def send_custom_num_returnPressed(self):
self.pushButton_num_free.show()
self.lineEdit_num.hide()
self.horizontalSlider_feedingNum.setMaximum(int(self.lineEdit_num.text()))
self.horizontalSlider_feedingNum.setValue(0)
self.label_maxNum.setText(self.lineEdit_num.text())
def send_stack_feedSet_button_click(self,index):
#self.logger.info("This is an info message")
self.stackedWidget_feed.setCurrentIndex(index)
if index == 0:
self.updateUI_Select_Line()
def on_button_click(self):
self.button.setText("Clicked!")
def send_position_button_click(self):
# if True:
# cmd_command = CMDRequest()
# cmd_command.cmdData = ['rewriteData', '800', f'{position_instruction.m0}', 0]
# request_command = vars(cmd_command)
# self.robotClient.add_sendQuene(request_command)
return
def send_position_returnPressed(self):
self.send_position_command(float(self.lineEdit_j1.text()),
float(self.lineEdit_j2.text()),
float(self.lineEdit_j3.text()),
float(self.lineEdit_j4.text()),
float(self.lineEdit_j5.text()),
float(self.lineEdit_j6.text()))
def send_setSpeed_label_doubelClick(self):
self.pushButton_speed.hide()
self.lineEdit_speed.show()
pass
def send_setSpeed_lineEdit_returePressed(self):
Constant.speed = float(self.lineEdit_speed.text())
self.lineEdit_speed.hide()
self.pushButton_speed.setText(str(Constant.speed))
self.pushButton_speed.show()
pass
def send_get_safe_position_button_click(self):
real_position = self.robotClient.status_model.getRealPosition()
row_i = self.tableWidget_feedSeting.currentRow()
head = self.tableWidget_feedSeting.item(row_i, 0).text()
self.feedLine_dict[head].safe_position.init_position(real_position.X,real_position.Y,real_position.Z,real_position.U,real_position.V,real_position.W)
self.set_position_to_tabel(row_i,0,real_position)
def send_get_photo_position_button_click(self):
real_position = self.robotClient.status_model.getRealPosition()
row_i = self.tableWidget_feedSeting.currentRow()
head = self.tableWidget_feedSeting.item(row_i, 0).text()
self.feedLine_dict[head].photo_position.init_position(real_position.X, real_position.Y, real_position.Z, real_position.U, real_position.V, real_position.W)
self.set_position_to_tabel(row_i, 1, real_position)
def send_get_feed_position_button_click(self):
real_position = self.robotClient.status_model.getRealPosition()
row_i = self.tableWidget_feedSeting.currentRow()
head = self.tableWidget_feedSeting.item(row_i, 0).text()
self.feedLine_dict[head].feed_position.init_position(real_position.X, real_position.Y, real_position.Z, real_position.U, real_position.V, real_position.W)
self.set_position_to_tabel(row_i, 2, real_position)
def send_tabelFeedSet_addRow(self):
for i in range(1,20):
head = f'{Constant.feedLine_set_section}{i}'
if head not in self.feedLine_dict:
row_position = self.tableWidget_feedSeting.rowCount() # 当前行数
self.tableWidget_feedSeting.insertRow(row_position)
safe_position = Real_Position()
photo_position = Real_Position()
feed_position = Real_Position()
self.feedLine_dict[head] = FeedLine('新建', safe_position, photo_position, feed_position)
self.tableWidget_feedSeting.setItem(self.tableWidget_feedSeting.rowCount() - 1, 0,
QTableWidgetItem(head))
self.tableWidget_feedSeting.setItem(self.tableWidget_feedSeting.rowCount() - 1, 1,
QTableWidgetItem('新建'))
self.set_position_to_tabel(self.tableWidget_feedSeting.rowCount() - 1, 0, safe_position)
self.set_position_to_tabel(self.tableWidget_feedSeting.rowCount() - 1, 1, photo_position)
self.set_position_to_tabel(self.tableWidget_feedSeting.rowCount() - 1, 2, feed_position)
break
def send_tabelFeedSet_delRow(self):
selectRow = self.tableWidget_feedSeting.currentRow()
if selectRow >= 0:
for feed_line_key, feed_line in self.feedLine_dict.items():
if feed_line_key == self.tableWidget_feedSeting.item(selectRow, 0).text():
self.feedLine_dict.pop(feed_line_key)
self.tableWidget_feedSeting.removeRow(selectRow)
break
else:
QMessageBox.information(self, "提示", "请选择要删除的行")
def send_tabelFeedSet_save(self):
count = len(self.feedLine_dict.keys())
self.configReader.read(Constant.set_ini)
self.configReader.set('Robot_Feed', 'LineCount', str(count))
writeFeedLine_to_ini(self.feedLine_dict, Constant.feedLine_set_file)
pass
def send_stopFeed_button_click(self):
pass
# 清空状态
self.feeding.feedStatus = FeedStatus.FNone
# 清空运行命令
self.send_clear_auto_command()
#
def send_pauseFeed_button_click(self):
if self.pushButton_pauseFeed.text()=='暂停':
self.pushButton_pauseFeed.setText('继续')
self.send_pause_command(True)
else:
self.pushButton_pauseFeed.setText('暂停')
self.send_pause_command(False)
pass
def send_tabelFeedSet_itemChanged(self, item):
row = item.row()
column = item.column()
value = item.text()
head = self.tableWidget_feedSeting.item(row, 0).text()
if column == 1:
self.feedLine_dict[head].name = value
elif column==(0+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.X = float(value)
elif column == (1+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.Y = float(value)
elif column == (2+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.Z = float(value)
elif column == (3+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.U = float(value)
elif column == (4+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.V = float(value)
elif column == (5+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].safe_position.W = float(value)
# elif column == 7:
elif column==(6+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.X = float(value)
elif column == (7+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.Y = float(value)
elif column == (8+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.Z = float(value)
elif column == (9+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.U = float(value)
elif column == (10+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.V = float(value)
elif column == (11+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].photo_position.W = float(value)
elif column==(12+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.X = float(value)
elif column == (13+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.Y = float(value)
elif column == (14+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.Z = float(value)
elif column == (15+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.U = float(value)
elif column == (16+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.V = float(value)
elif column == (17+self.tableWidget_feedSeting_addtional_col_num):
self.feedLine_dict[head].feed_position.W = float(value)
def run(self):
while True:
time.sleep(0.2)
if not self.command_quene.empty():
command = self.command_quene.get()
if isinstance(command, FeedCommand) and command.status == Status.Prepareing:
if self.feeding.feedStatus == FeedStatus.FNone:
self.feeding.feedConfig = command.feed_config
self.feeding.feedStatus = FeedStatus.FStart
command.status = Status.Runing
try:
self.feeding.run()
except:
print(Error_Code.SYS_NONEPoint)
self.updateUI()
#pass #主线程
    def updateUI(self):
        """Refresh connection indicator, feed progress, page stack and clock.

        Called each tick from the worker loop in ``run``.
        NOTE(review): this touches Qt widgets from a non-GUI thread, which
        Qt does not guarantee to be safe — consider routing through
        signals/slots; confirm before relying on it.
        """
        # Green/red connection pill.
        if self.robotClient.connected:
            self.set_label_status_style(True)
        else:
            self.set_label_status_style(False)
        # While feeding: progress = total - remaining; otherwise restore idle buttons.
        if self.feeding.feedStatus != FeedStatus.FNone:
            self.horizontalSlider_feedingNum.setValue(self.horizontalSlider_feedingNum.maximum()-self.feeding.feedConfig.num)
        else:
            self.set_run_status_button(False)
        # Page 0 = amount-selection page, page 1 = progress page.
        if self.feeding.feedStatus == FeedStatus.FNone:
            self.stackedWidget_num.setCurrentIndex(0)
        else:
            self.stackedWidget_num.setCurrentIndex(1)
        self.label_date.setText(datetime.now().strftime("%Y-%m-%d"))
        self.label_time.setText(datetime.now().strftime("%H:%M:%S"))
        self.updateUI_Position()
def updateUI_Select_Line(self):
self.comboBox_lineIndex.clear()
for key,value in self.feedLine_dict.items():
self.comboBox_lineIndex.addItem(value.name,key)
def updateUI_Position(self):
self.horizontalSlider_J1.setValue(self.status_address.axis_0)
self.horizontalSlider_J2.setValue(self.status_address.axis_1)
self.horizontalSlider_J3.setValue(self.status_address.axis_2)
self.horizontalSlider_J4.setValue(self.status_address.axis_3)
self.horizontalSlider_J5.setValue(self.status_address.axis_4)
self.horizontalSlider_J6.setValue(self.status_address.axis_5)
self.label_j1.setText(str(self.status_address.axis_0))
self.label_j2.setText(str(self.status_address.axis_1))
self.label_j3.setText(str(self.status_address.axis_2))
self.label_j4.setText(str(self.status_address.axis_3))
self.label_j5.setText(str(self.status_address.axis_4))
self.label_j6.setText(str(self.status_address.axis_5))
def set_position_to_tabel(self,row_i,position_j,real_Position:Real_Position):
self.tableWidget_feedSeting.setItem(row_i, position_j*6+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.X)))
self.tableWidget_feedSeting.setItem(row_i, position_j*6+1+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.Y)))
self.tableWidget_feedSeting.setItem(row_i, position_j*6+2+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.Z)))
self.tableWidget_feedSeting.setItem(row_i, position_j*6+3+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.U)))
self.tableWidget_feedSeting.setItem(row_i, position_j*6+4+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.V)))
self.tableWidget_feedSeting.setItem(row_i, position_j*6+5+self.tableWidget_feedSeting_addtional_col_num, QTableWidgetItem(str(real_Position.W)))
def set_label_status_style(self, connected: bool):
if connected:
self.label_connect_status.setStyleSheet("""
QLabel {
background-color: #A2EF4D; /* 设置背景颜色 */
color: #ffffff; /* 设置字体颜色 */
border-radius: 8px; /* 圆角半径设置为 QLabel 的一半,形成圆形 */
border: 1px solid #A2EF4D; /* 设置边框颜色和宽度 */
qproperty-alignment: 'AlignCenter'; /* 设置文本居中 */
}
""")
else:
self.label_connect_status.setStyleSheet("""
QLabel {
background-color: #FD3251; /* 设置背景颜色 */
color: #ffffff; /* 设置字体颜色 */
border-radius: 8px; /* 圆角半径设置为 QLabel 的一半,形成圆形 */
border: 1px solid #FD3251; /* 设置边框颜色和宽度 */
qproperty-alignment: 'AlignCenter'; /* 设置文本居中 */
}
""")
def set_run_status_button(self, isRuning:bool):
self.pushButton_pauseFeed.setText("暂停")
if isRuning:
self.pushButton_pauseFeed.show()
self.pushButton_stopFeed.show()
self.pushButton_startFeed.hide()
else:
self.pushButton_pauseFeed.hide()
self.pushButton_stopFeed.hide()
self.pushButton_startFeed.show()
def send_clear_auto_command(self):
clear_command = CMDInstructRequest()
request_command = clear_command.toString()
print(request_command)
log_str = f'清除自动指令'
self.command_quene.put(request_command)
def send_position_command(self,x1,x2,x3,x4,x5,x6,move_type:MoveType=MoveType.WORLD):
position_instruction = Instruction()
position_instruction.m0 = float(x1)
position_instruction.m1 = float(x2)
position_instruction.m2 = float(x3)
position_instruction.m3 = float(x4)
position_instruction.m4 = float(x5)
position_instruction.m5 = float(x6)
position_instruction.action = move_type.value
instruction_command = CMDInstructRequest()
instruction_command.instructions.append(position_instruction)
request_command = instruction_command.toString()
log_str = f'移动到位置:{"自由路径" if move_type==MoveType.AXIS else "姿势直线"}' \
f'm0:{position_instruction.m0}-' \
f'm2:{position_instruction.m1}-' \
f'm3:{position_instruction.m2}-' \
f'm4:{position_instruction.m3}-' \
f'm5:{position_instruction.m4}-' \
f'm6:{position_instruction.m5}'
self.logger_textEdit.info(log_str)
self.robotClient.add_sendQuene(request_command)
def send_pause_command(self,pause:bool):
pause_command = CMDRequest()
pause_command.cmdData.append("actionPause")
if pause:
pause_command.cmdData.append("1")
else:
pause_command.cmdData.append("0")
request_command = pause_command.toString()
print(request_command)
log_str = f'暂停:{pause}'
self.robotClient.add_sendQuene(request_command)
def send_clear_alarm_command(self,pause:bool):
pause_command = CMDRequest()
pause_command.cmdData.append("clearAlarmContinue")
pause_command.cmdData.append("1")
request_command = pause_command.toString()
print(request_command)
log_str = f'暂停:{pause}'
self.robotClient.add_sendQuene(request_command)
def send_switch_tool_command(self):
switch_command = CMDRequest()
switch_command.cmdData.append("switchTool")
switch_command.cmdData.append("2")
request_command = switch_command.toString()
print(request_command)
log_str = f'切换工具'
self.robotClient.add_sendQuene(request_command)
def send_start_tool_command(self):
switch_command = CMDRequest()
switch_command.cmdData.append("startButton")
switch_command.cmdData.append("1")
request_command = switch_command.toString()
print(request_command)
log_str = f'切换工具'
self.robotClient.add_sendQuene(request_command)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec())

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2024/9/7 11:49
# @Author : hjw
# @File : get_position_test.py
'''
from Vision.camera_coordinate_dete_img import Detection
from Trace.handeye_calibration import *
import cv2
detection = Detection()
while True:
ret, img, xyz, nx_ny_nz = detection.get_position()
if ret==1:
print('xyz点云坐标', xyz)
print('nx_ny_nz法向量', nx_ny_nz)
img = cv2.resize(img,(720, 540))
transformation_matrix = R_matrix(521.534, 0.705, 850.03, 0.0, 90.0, 0.0) # (x, y, z, u, v, w)
target_position, noraml_base = getPosition(xyz[0], xyz[1], xyz[2], nx_ny_nz[0], nx_ny_nz[1], nx_ny_nz[2], transformation_matrix)
print("target_position:", target_position)
print("noraml_base", noraml_base)
cv2.imshow('img', img)
cv2.waitKey(0)

View File

@ -0,0 +1,26 @@
colorthief=0.2.1
darkdetect=0.8.0
numpy=2.0.1
pillow=10.4
pip=24.0
PyQt5=5.15.11
PyQt5-Frameless-Window=0.3.9
PyQt5-Qt5=5.15.2
PyQt5_sip=12.15.0
PySide6=6.7.2
PySide6_Addons=6.7.2
PySide6_Essentials=6.7.2
pywin32=306
scipy=1.13.1
setuptools=69.5.1
shiboken6=6.7.2
wheel=0.43.0
python=3.9.19
open3d=0.18.0
opencv-python=4.7.0
PyRVC=1.10.0
torch=1.13.1
torchvision=0.14.1
ultralytics=8.2.86
openvino=2024.0.0
lapx=0.5.10

View File

@ -0,0 +1,5 @@
<RCC>
<qresource prefix="bg">
<file>Image/robot.png</file>
</qresource>
</RCC>

File diff suppressed because it is too large Load Diff

49
AutoControlSystem/test.py Normal file
View File

@ -0,0 +1,49 @@
from PySide6.QtWidgets import QApplication, QWidget, QVBoxLayout, QSlider, QLabel, QPushButton
from PySide6.QtCore import Qt #211
class SliderExample(QWidget):
    """Minimal PySide6 demo: a horizontal slider, a label mirroring its
    value, and a button that reads the slider on demand."""
    def __init__(self):
        super().__init__()
        self.init_ui()
    def init_ui(self):
        """Build the widget tree and wire the button's click handler."""
        # Vertical layout hosting label, slider and button in order.
        layout = QVBoxLayout()
        # Label that displays the slider's current value.
        self.label = QLabel("当前值: 0", self)
        layout.addWidget(self.label)
        # Horizontal slider, range 0..100, ticks every 10 below the groove.
        self.slider = QSlider(Qt.Horizontal, self)
        self.slider.setMinimum(0)
        self.slider.setMaximum(100)
        self.slider.setValue(0)  # start at 0
        self.slider.setTickPosition(QSlider.TicksBelow)
        self.slider.setTickInterval(10)
        layout.addWidget(self.slider)
        # Button that reads and reports the slider's current value.
        self.button = QPushButton("获取滑块值", self)
        self.button.clicked.connect(self.show_slider_value)
        layout.addWidget(self.button)
        # Install the layout and set the window title.
        self.setLayout(layout)
        self.setWindowTitle('QSlider 示例')
    def show_slider_value(self):
        """Fetch the slider's value, print it (debug aid) and update the label."""
        value = self.slider.value()
        print(f"滑块的值是: {value}")  # debug print
        self.label.setText(f"当前值: {value}")

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
import Constant
import time
from Model.Position import Real_Position, Detection_Position
from enum import Enum
from COM.COM_Robot import RobotClient
@ -102,6 +103,7 @@ class Feeding():
elif self.feedStatus == FeedStatus.FTake:
if self.feedConfig.feedLine.take_position != None and self.feedConfig.feedLine.take_position.compare(real_position):
self.feedStatus = FeedStatus.FSafeF
time.sleep(2)
self.sendTargPosition(self.feedConfig.feedLine.safe_position)
pass #打开吸嘴并返回

View File

@ -12,12 +12,12 @@ photoposition_z = 737.267822
photoposition_u = -2.196312
photoposition_v = -4.094595
photoposition_w = -5.294128
feedposition_x = 534.907898
feedposition_y = 5.525342
feedposition_z = 737.267822
feedposition_u = -2.196312
feedposition_v = -4.094595
feedposition_w = -5.294128
feedposition_x = 392.634
feedposition_y = 431.593
feedposition_z = 238.453
feedposition_u = -3.386
feedposition_v = 8.849
feedposition_w = 41.829
[FeedLine2]
name = 反应釜2