Merge branch 'feature' into feature_cpp_test
This commit is contained in:
@@ -1,47 +0,0 @@
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
|
||||
import os
|
||||
import ast
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value='True'),
|
||||
DeclareLaunchArgument('output_masks', default_value='False'),
|
||||
DeclareLaunchArgument('color_image_topic', default_value='/camera/color/image_raw'),
|
||||
DeclareLaunchArgument('depth_image_topic', default_value='/camera/depth/image_raw'),
|
||||
DeclareLaunchArgument('camera_info_topic', default_value='/camera/color/camera_info'),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
color_image_topic = LaunchConfiguration('color_image_topic').perform(context)
|
||||
depth_image_topic = LaunchConfiguration('depth_image_topic').perform(context)
|
||||
camera_info_topic = LaunchConfiguration('camera_info_topic').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
package='vision_detect',
|
||||
executable='crossboard_detect_node',
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'color_image_topic': color_image_topic,
|
||||
'depth_image_topic': depth_image_topic,
|
||||
'camera_info_topic': camera_info_topic,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
return LaunchDescription(args_detect + [
|
||||
OpaqueFunction(function=create_detect_node),
|
||||
])
|
||||
@@ -1,49 +0,0 @@
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
|
||||
import os
|
||||
import ast
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/detect_service.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('checkpoint_name', default_value=configs['checkpoint_name']),
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('set_confidence', default_value=configs['set_confidence']),
|
||||
DeclareLaunchArgument('classes', default_value=configs['classes']),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
checkpoint = LaunchConfiguration('checkpoint_name').perform(context)
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
conf = LaunchConfiguration('set_confidence').perform(context)
|
||||
classes = LaunchConfiguration('classes').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
package='vision_detect',
|
||||
executable='detect_service_node',
|
||||
parameters=[{
|
||||
'checkpoint_name': checkpoint,
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'set_confidence': float(conf),
|
||||
'classes': classes,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
return LaunchDescription(args_detect + [
|
||||
OpaqueFunction(function=create_detect_node),
|
||||
])
|
||||
@@ -31,10 +31,6 @@ setup(
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
# 'detect_service_node = vision_detect.detect_service:main',
|
||||
# 'detect_topic_node = vision_detect.detect_topic:main',
|
||||
|
||||
# 'box_detect_service_node = vision_detect.detect_box_service:main',
|
||||
# 'red_detect_topic_node = vision_detect.detect_red_topic:main',
|
||||
# 'red_detect_service_node = vision_detect.detect_red_service:main',
|
||||
|
||||
@@ -42,11 +38,8 @@ setup(
|
||||
'flexiv_detect_service_node = vision_detect.flexivaidk_detect_service:main',
|
||||
|
||||
'sub_pose_node = vision_detect.sub_pose:main',
|
||||
# 'calibration_node = vision_detect.hand_eye_calibration:main',
|
||||
# 'crossboard_detect_node = vision_detect.crossboard_detect:main',
|
||||
'service_client_node = vision_detect.service_client:main',
|
||||
# 'get_camera_pose_node = vision_detect.get_camera_pose:main',
|
||||
# 'calculate_node = vision_detect.calculate:main',
|
||||
'get_camera_pose_node = vision_detect.get_camera_pose:main',
|
||||
|
||||
'detect_node = vision_detect.detect_node:main',
|
||||
],
|
||||
|
||||
@@ -7,8 +7,6 @@ from rclpy.node import Node
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
|
||||
from ..utils import read_calibration_mat
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
@@ -82,7 +80,7 @@ class ConfigBase(Node):
|
||||
self.detect_mode = self.configs['detect_mode']
|
||||
self.calculate_mode = self.configs['calculate_mode']
|
||||
|
||||
self._read_calibration_mat()
|
||||
# self._read_calibration_mat()
|
||||
|
||||
if self.get_camera_mode == "Service":
|
||||
self.service_name = self.configs["Service_configs"]["service_name"]
|
||||
@@ -133,22 +131,22 @@ class ConfigBase(Node):
|
||||
|
||||
self.get_logger().info("Get parameters done")
|
||||
|
||||
def _read_calibration_mat(self):
|
||||
eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
|
||||
eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
|
||||
eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
|
||||
# def _read_calibration_mat(self):
|
||||
# eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
|
||||
# eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
|
||||
# eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
|
||||
|
||||
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"left_mat: {info}")
|
||||
# self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
|
||||
# if not sign:
|
||||
# self.get_logger().warning(f"left_mat: {info}")
|
||||
|
||||
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"right_mat: {info}")
|
||||
# self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
|
||||
# if not sign:
|
||||
# self.get_logger().warning(f"right_mat: {info}")
|
||||
|
||||
self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"head_mat: {info}")
|
||||
# self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
|
||||
# if not sign:
|
||||
# self.get_logger().warning(f"head_mat: {info}")
|
||||
|
||||
self.get_logger().info("Read calibration mat file done")
|
||||
# self.get_logger().info("Read calibration mat file done")
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import torch
|
||||
@@ -12,7 +13,7 @@ from sensor_msgs.msg import Image, CameraInfo
|
||||
from interfaces.msg import PoseArrayClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
|
||||
from ..utils import calculate_pose_pca, calculate_pose_icp
|
||||
from ..utils import calculate_pose_pca, calculate_pose_icp, read_calibration_mat
|
||||
from .config_node import ConfigBase
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
@@ -21,6 +22,7 @@ class InitBase(ConfigBase):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.future = Future()
|
||||
self._read_calibration_mat()
|
||||
|
||||
if self.get_camera_mode == "Service":
|
||||
self._init_service()
|
||||
@@ -145,6 +147,28 @@ class InitBase(ConfigBase):
|
||||
|
||||
self.get_logger().info("Initialize subscriber done")
|
||||
|
||||
def _read_calibration_mat(self):
|
||||
eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
|
||||
eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
|
||||
eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
|
||||
|
||||
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
|
||||
self.get_logger().info(f"left_hand_mat: {self.eye_in_left_hand_mat}")
|
||||
if not sign:
|
||||
self.get_logger().warning(f"left_mat: {info}")
|
||||
|
||||
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
|
||||
self.get_logger().info(f"right_hand_mat: {self.eye_in_right_hand_mat}")
|
||||
if not sign:
|
||||
self.get_logger().warning(f"right_mat: {info}")
|
||||
|
||||
self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
|
||||
self.get_logger().info(f"head_mat: {self.eye_to_hand_mat}")
|
||||
if not sign:
|
||||
self.get_logger().warning(f"head_mat: {info}")
|
||||
|
||||
self.get_logger().info("Read calibration mat file done")
|
||||
|
||||
def _camera_info_callback(self, msg: CameraInfo):
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,260 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# coding: utf-8
|
||||
import transforms3d as tfs
|
||||
import numpy as np
|
||||
import math
|
||||
import json
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
|
||||
def get_matrix_quat(x, y, z, rw, rx, ry, rz):
|
||||
"""从单位四元数构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
q = [rw, rx, ry, rz]
|
||||
rmat = tfs.quaternions.quat2mat(q)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def get_matrix_eular_radu(x, y, z, rx, ry, rz):
|
||||
"""从欧拉角构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
rmat = tfs.euler.euler2mat(
|
||||
# math.radians(rx), math.radians(ry), math.radians(rz)
|
||||
rx, ry, rz
|
||||
)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def get_matrix_rotvector(x, y, z, rx, ry, rz):
|
||||
"""从旋转向量构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
rvec = np.array([rx, ry, rz])
|
||||
theta = np.linalg.norm(rvec)
|
||||
if theta < 1e-8:
|
||||
rmat = np.eye(3) # 小角度直接返回单位矩阵
|
||||
else:
|
||||
axis = rvec / theta
|
||||
rmat = tfs.axangles.axangle2mat(axis, theta)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def skew(v):
|
||||
return np.array([[0, -v[2], v[1]],
|
||||
[v[2], 0, -v[0]],
|
||||
[-v[1], v[0], 0]])
|
||||
|
||||
|
||||
def R2P(T):
|
||||
"""旋转矩阵 --> 修正罗德里格斯向量"""
|
||||
axis, angle= tfs.axangles.mat2axangle(T[0:3, 0:3])
|
||||
P = 2 * math.sin(angle / 2) * axis
|
||||
return P
|
||||
|
||||
|
||||
def P2R(P):
|
||||
"""修正罗德里格斯向量 --> 旋转矩阵"""
|
||||
P2 = np.dot(P.T, P)
|
||||
part_1 = (1 - P2 / 2) * np.eye(3)
|
||||
part_2 = np.add(np.dot(P, P.T), np.sqrt(4- P2) * skew(P.flatten().T))
|
||||
R = np.add(part_1, np.divide(part_2, 2))
|
||||
return R
|
||||
|
||||
|
||||
def calculate(Hgs, Hcs):
|
||||
"""计算标定矩阵"""
|
||||
# H代表矩阵、h代表标量
|
||||
Hgijs = []
|
||||
Hcijs = []
|
||||
A = []
|
||||
B = []
|
||||
size = 0
|
||||
for i in range(len(Hgs)):
|
||||
for j in range(i + 1, len(Hgs)):
|
||||
size += 1
|
||||
Hgij = np.dot(np.linalg.inv(Hgs[j]), Hgs[i])
|
||||
Hgijs.append(Hgij)
|
||||
Pgij = np.dot(2, R2P(Hgij))
|
||||
|
||||
Hcij = np.dot(Hcs[j], np.linalg.inv(Hcs[i]))
|
||||
Hcijs.append(Hcij)
|
||||
Pcij = np.dot(2, R2P(Hcij))
|
||||
|
||||
A.append(skew(np.add(Pgij, Pcij)))
|
||||
B.append(np.subtract(Pcij, Pgij).reshape(3, 1))
|
||||
|
||||
MA = np.vstack(A)
|
||||
MB = np.vstack(B)
|
||||
Pcg_ = np.dot(np.linalg.pinv(MA), MB)
|
||||
pcg = np.sqrt(np.add(1, np.dot(Pcg_.T, Pcg_)))
|
||||
Pcg = np.dot(np.dot(2, Pcg_), np.linalg.inv(pcg))
|
||||
Rcg = P2R(Pcg).reshape(3, 3)
|
||||
|
||||
A = []
|
||||
B = []
|
||||
id = 0
|
||||
for i in range(len(Hgs)):
|
||||
for j in range(i + 1, len(Hgs)):
|
||||
Hgij = Hgijs[id]
|
||||
Hcij = Hcijs[id]
|
||||
A.append(np.subtract(Hgij[0:3, 0:3], np.eye(3, 3)))
|
||||
B.append(np.subtract(np.dot(Rcg, Hcij[0:3, 3:4]), Hgij[0:3, 3:4]))
|
||||
id += 1
|
||||
|
||||
MA = np.asarray(A).reshape(size * 3, 3)
|
||||
MB = np.asarray(B).reshape(size * 3, 1)
|
||||
Tcg = np.dot(np.linalg.pinv(MA), MB).reshape(3, )
|
||||
|
||||
return tfs.affines.compose(Tcg, np.squeeze(Rcg), [1, 1, 1])
|
||||
|
||||
|
||||
class Calibration(Node):
|
||||
def __init__(self, name):
|
||||
super(Calibration, self).__init__(name)
|
||||
self.sync_subscriber = None
|
||||
self.sub_camera_pose = None
|
||||
self.sub_hand_pose = None
|
||||
self.Hgs, self.Hcs = [], []
|
||||
self.hand, self.camera = [], []
|
||||
self.calibration_matrix = None
|
||||
|
||||
self.declare_parameter('matrix_name', 'eye_to_hand')
|
||||
self.matrix_name = self.get_parameter('matrix_name').value
|
||||
|
||||
self.declare_parameter('mode', 'eye_to_hand')
|
||||
self.mode = self.get_parameter('mode').value.lower()
|
||||
|
||||
if self.mode not in ['eye_in_hand', 'eye_to_hand']:
|
||||
raise ValueError("mode must be 'eye_in_hand' or 'eye_to_hand'")
|
||||
|
||||
self.declare_parameter('input', 'quat')
|
||||
self.input = self.get_parameter('input').value.lower()
|
||||
if self.input == 'eular':
|
||||
self.function = get_matrix_eular_radu
|
||||
elif self.input == 'rotvertor':
|
||||
self.function = get_matrix_rotvector
|
||||
elif self.input == 'quat':
|
||||
self.function = get_matrix_quat
|
||||
else:
|
||||
raise ValueError("INPUT ERROR")
|
||||
|
||||
self.declare_parameter('camera_pose_path', 'camera_pose_data.json')
|
||||
self.declare_parameter('hand_pose_path', 'hand_pose_data.json')
|
||||
self.camera_pose_path = self.get_parameter('camera_pose_path').value
|
||||
self.hand_pose_path = self.get_parameter('hand_pose_path').value
|
||||
|
||||
self.get_pose()
|
||||
|
||||
self.done = False
|
||||
|
||||
def get_pose(self):
|
||||
with open(f'{self.camera_pose_path}', 'r', encoding='utf-8') as f:
|
||||
self.camera = json.load(f)
|
||||
|
||||
with open(f'{self.hand_pose_path}', 'r', encoding='utf-8') as f:
|
||||
self.hand = json.load(f)
|
||||
|
||||
self.calculate_calibration()
|
||||
|
||||
print(self.hand)
|
||||
print(self.camera)
|
||||
|
||||
self.get_logger().info(f"{self.calibration_matrix}")
|
||||
hand_eye_result = {
|
||||
"T": self.calibration_matrix.tolist()
|
||||
}
|
||||
with open(f"{self.matrix_name}_matrix.json", "w") as f:
|
||||
json.dump(hand_eye_result, f, indent=4)
|
||||
self.get_logger().info(f"Save as {self.matrix_name}_matrix.json")
|
||||
|
||||
self.done = True
|
||||
|
||||
def calculate_data(self):
|
||||
if self.input == 'quat':
|
||||
for i in range(0, len(self.hand), 7):
|
||||
self.Hgs.append(
|
||||
np.linalg.inv(
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5], self.hand[i + 6]
|
||||
)
|
||||
)
|
||||
if self.mode == 'eye_to_hand' else
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5], self.hand[i + 6]
|
||||
)
|
||||
)
|
||||
self.Hcs.append(
|
||||
self.function(
|
||||
self.camera[i], self.camera[i + 1], self.camera[i + 2],
|
||||
self.camera[i + 3], self.camera[i + 4], self.camera[i + 5], self.camera[i + 6]
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
for i in range(0, len(self.hand), 6):
|
||||
self.Hgs.append(
|
||||
np.linalg.inv(
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5]
|
||||
)
|
||||
)
|
||||
if self.mode == 'eye_to_hand' else
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5]
|
||||
)
|
||||
)
|
||||
self.Hcs.append(
|
||||
self.function(
|
||||
self.camera[i], self.camera[i + 1], self.camera[i + 2],
|
||||
self.camera[i + 3], self.camera[i + 4], self.camera[i + 5]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def calculate_calibration(self):
|
||||
self.calculate_data()
|
||||
self.calibration_matrix = calculate(self.Hgs, self.Hcs)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = Calibration('calibration')
|
||||
try:
|
||||
while rclpy.ok() and not node.done:
|
||||
rclpy.spin_once(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@@ -1,323 +0,0 @@
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import cv2
|
||||
import open3d as o3d
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from message_filters import ApproximateTimeSynchronizer, Subscriber
|
||||
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def get_map(K, D, camera_size):
|
||||
h, w = camera_size[::-1]
|
||||
K = np.array(K).reshape(3, 3)
|
||||
D = np.array(D)
|
||||
new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
|
||||
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
|
||||
|
||||
return map1, map2, new_K.flatten()
|
||||
|
||||
|
||||
def pca(data, sort=True):
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics):
|
||||
"""计算位态"""
|
||||
depth_img_mask = np.zeros_like(depth_img)
|
||||
depth_img_mask[mask > 0] = depth_img[mask > 0]
|
||||
|
||||
depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
|
||||
|
||||
point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
|
||||
depth=depth_o3d,
|
||||
intrinsic=intrinsics,
|
||||
depth_scale=1000.0,
|
||||
depth_trunc=2,
|
||||
)
|
||||
|
||||
point_cloud = point_cloud.remove_non_finite_points()
|
||||
|
||||
down_pcd = point_cloud.voxel_down_sample(voxel_size=0.02)
|
||||
clean_pcd, _ = down_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
|
||||
|
||||
if len(clean_pcd.points) == 0:
|
||||
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
center = clean_pcd.get_center()
|
||||
x, y, z = center
|
||||
|
||||
w, v = pca(np.asarray(clean_pcd.points))
|
||||
|
||||
if w is not None and v is not None:
|
||||
vx, vy, vz = v[:,0], v[:,1], v[:,2]
|
||||
|
||||
if vx[0] < 0:
|
||||
vx = -vx
|
||||
if vy[1] < 0:
|
||||
vy = -vy
|
||||
if not np.allclose(np.cross(vx, vy), vz):
|
||||
vz = -vz
|
||||
|
||||
R = np.column_stack((vx, vy, vz))
|
||||
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
|
||||
|
||||
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
|
||||
|
||||
# point = [
|
||||
# [x, y, z], [x, y, z] + vx, [x, y, z] + vy, [x, y, z] + vz,
|
||||
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ] # 画点:原点、第一主成分、第二主成分
|
||||
# lines = [
|
||||
# [0, 1], [0, 2], [0, 3], [4, 5], [4, 6], [4, 7]
|
||||
# ] # 画出三点之间两两连线
|
||||
# colors = [
|
||||
# [1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ]
|
||||
# # 构造open3d中的LineSet对象,用于主成分显示
|
||||
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(point), lines=o3d.utility.Vector2iVector(lines))
|
||||
# line_set.colors = o3d.utility.Vector3dVector(colors)
|
||||
# o3d.visualization.draw_geometries([point_cloud, line_set])
|
||||
# o3d.visualization.draw_geometries([clean_pcd, line_set])
|
||||
|
||||
return x, y, z, rw, rx, ry, rz
|
||||
|
||||
else:
|
||||
return 0.0, 0.0, 0.0, None, None, None, None
|
||||
|
||||
|
||||
def distortion_correction(color_image, depth_image, map1, map2):
|
||||
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
|
||||
undistorted_color = undistorted_color.astype(color_image.dtype)
|
||||
|
||||
undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
|
||||
undistorted_depth = undistorted_depth.astype(depth_image.dtype)
|
||||
|
||||
return undistorted_color, undistorted_depth
|
||||
|
||||
|
||||
def crop_mask_bbox(depth_img, mask):
|
||||
"""
|
||||
输入:
|
||||
depth_img: H x W
|
||||
mask: H x W (0/1 或 bool)
|
||||
输出:
|
||||
depth_crop, mask_crop
|
||||
"""
|
||||
high, width = depth_img.shape
|
||||
ys, xs = np.where(mask > 0)
|
||||
|
||||
x_min, x_max = int(round(xs.min())), int(round(xs.max()))
|
||||
y_min, y_max = int(round(ys.min())), int(round(ys.max()))
|
||||
|
||||
depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
|
||||
return depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
|
||||
|
||||
|
||||
class DetectNode(Node):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.K = self.D = None
|
||||
self.map1 = self.map2 = None
|
||||
self.intrinsics = None
|
||||
|
||||
self.function = self._test_image
|
||||
self.calculate_function = calculate_pose_cpu
|
||||
self.cv_bridge = CvBridge()
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
|
||||
self.declare_parameter('output_masks', False)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('color_image_topic', '/camera/color/image_raw')
|
||||
self.color_image_topic = self.get_parameter('color_image_topic').value
|
||||
|
||||
self.declare_parameter('depth_image_topic', '/camera/depth/image_raw')
|
||||
self.depth_image_topic = self.get_parameter('depth_image_topic').value
|
||||
|
||||
self.declare_parameter('camera_info_topic', '/camera/color/camera_info')
|
||||
self.camera_info_topic = self.get_parameter('camera_info_topic').value
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init_publisher"""
|
||||
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
|
||||
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init_subscriber"""
|
||||
self.sub_camera_info = self.create_subscription(
|
||||
CameraInfo,
|
||||
self.camera_info_topic,
|
||||
self._camera_info_callback,
|
||||
10
|
||||
)
|
||||
|
||||
'''sync get color and depth img'''
|
||||
self.sub_color_image = Subscriber(self, Image, self.color_image_topic)
|
||||
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic)
|
||||
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_color_image, self.sub_depth_image],
|
||||
queue_size=10,
|
||||
slop=0.1
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self._sync_callback)
|
||||
|
||||
def _camera_info_callback(self, msg: CameraInfo):
|
||||
"""Get camera info"""
|
||||
self.K = msg.k
|
||||
self.D = msg.d
|
||||
|
||||
self.camera_size = [msg.width, msg.height]
|
||||
|
||||
if self.K is not None and self.D is not None:
|
||||
self.map1, self.map2, self.K = get_map(msg.k, msg.d, self.camera_size)
|
||||
|
||||
if len(self.D) != 0:
|
||||
self.destroy_subscription(self.sub_camera_info)
|
||||
else:
|
||||
self.D = [0, 0, 0, 0, 0]
|
||||
self.destroy_subscription(self.sub_camera_info)
|
||||
else:
|
||||
raise "K and D are not defined"
|
||||
|
||||
|
||||
def _sync_callback(self, color_img_ros, depth_img_ros):
|
||||
"""同步回调函数"""
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
|
||||
|
||||
img, pose_dict = self.function(color_img_cv, depth_img_cv)
|
||||
|
||||
"""masks为空,结束这一帧"""
|
||||
if img is None:
|
||||
img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
|
||||
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image.publish(img)
|
||||
|
||||
if pose_dict:
|
||||
pose_list_all = PoseArrayClassAndID()
|
||||
for (class_id, class_name), pose_list in pose_dict.items():
|
||||
pose_list_all.objects.append(
|
||||
PoseClassAndID(
|
||||
class_name = class_name,
|
||||
class_id = class_id,
|
||||
pose_list = pose_list
|
||||
)
|
||||
)
|
||||
pose_list_all.header.stamp = self.get_clock().now().to_msg()
|
||||
pose_list_all.header.frame_id = "pose_list"
|
||||
self.pub_pose_list.publish(pose_list_all)
|
||||
|
||||
def _test_image(self, rgb_img, depth_img):
|
||||
pose_dict = defaultdict(list)
|
||||
rgb_img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
|
||||
pattern_size = (8, 5)
|
||||
# pattern_size = (15, 7)
|
||||
ret, corners = cv2.findChessboardCorners(rgb_img_gray, pattern_size, cv2.CALIB_CB_FAST_CHECK)
|
||||
if ret:
|
||||
# 角点亚像素精确化(提高标定精度)
|
||||
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
|
||||
corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
|
||||
|
||||
corners_subpix = corners_subpix.reshape(pattern_size[1], pattern_size[0], 2)
|
||||
mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
|
||||
|
||||
for i in range(0, pattern_size[1] - 1):
|
||||
for j in range(0, pattern_size[0] - 1):
|
||||
pts = np.array([
|
||||
corners_subpix[i, j],
|
||||
corners_subpix[i, j + 1],
|
||||
corners_subpix[i + 1, j + 1],
|
||||
corners_subpix[i + 1, j]
|
||||
], dtype=np.int32)
|
||||
cv2.fillConvexPoly(mask, pts, 1)
|
||||
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
orig_shape = rgb_img_gray.shape
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(depth_img, mask)
|
||||
|
||||
if depth_crop is None:
|
||||
return None, None
|
||||
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0]),
|
||||
int(self.camera_size[1]),
|
||||
self.K[0],
|
||||
self.K[4],
|
||||
self.K[2] - x_min,
|
||||
self.K[5] - y_min
|
||||
)
|
||||
|
||||
x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, depth_crop, intrinsics)
|
||||
self.get_logger().info(f"{x}, {y}, {z}, {rw}, {rx}, {ry}, {rz}")
|
||||
|
||||
if (x, y, z) != (0.0, 0.0, 0.0):
|
||||
pose = Pose()
|
||||
pose.position = Point(x=x, y=y, z=z)
|
||||
pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
|
||||
pose_dict[int(99), 'crossboard'].append(pose)
|
||||
|
||||
cv2.putText(rgb_img, f'cs: x: {x:.3f}, y: {y:.3f}, z: {z:.3f}', (0, 0 + 30),cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
cv2.putText(rgb_img, f'quat: rw: {rw:.3f}, rx: {rx:.3f}, ry: {ry:.3f}, rz: {rz:.3f}', (0, 0 + 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
else:
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), None
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DetectNode('detect')
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,575 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import ast
|
||||
from collections import defaultdict
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import cv2
|
||||
import open3d as o3d
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
from interfaces.msg import PoseClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def draw_pointcloud(pcd, T):
|
||||
|
||||
R = T[0:3, 0:3]
|
||||
point = T[0:3, 3:4].flatten()
|
||||
x, y, z = R[:, 0], R[:, 1], R[:, 2]
|
||||
|
||||
points = [
|
||||
[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
|
||||
point, point + x, point + y, point + z
|
||||
|
||||
] # 画点:原点、第一主成分、第二主成分
|
||||
lines = [
|
||||
[0, 1], [0, 2], [0, 3],
|
||||
[4, 5], [4, 6], [4, 7]
|
||||
] # 画出三点之间两两连线
|
||||
colors = [
|
||||
[1, 0, 0], [0, 1, 0], [0, 0, 1],
|
||||
[1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
]
|
||||
|
||||
line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(points), lines=o3d.utility.Vector2iVector(lines))
|
||||
line_set.colors = o3d.utility.Vector3dVector(colors)
|
||||
|
||||
pcd.append(line_set)
|
||||
o3d.visualization.draw_geometries(pcd)
|
||||
|
||||
|
||||
def preprocess_point_cloud(pcd, voxel_size):
|
||||
|
||||
pcd_down = pcd.voxel_down_sample(voxel_size)
|
||||
radius_normal = voxel_size * 2
|
||||
|
||||
pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))
|
||||
|
||||
radius_feature = voxel_size * 5
|
||||
|
||||
pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
|
||||
pcd_down,
|
||||
o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100)
|
||||
)
|
||||
|
||||
return pcd_down, pcd_fpfh
|
||||
|
||||
def prepare_dataset(source, target, voxel_size):
|
||||
|
||||
trans_init = np.identity(4)
|
||||
source.transform(trans_init)
|
||||
|
||||
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
|
||||
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
|
||||
|
||||
return source_down, target_down, source_fpfh, target_fpfh
|
||||
|
||||
|
||||
def execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
|
||||
|
||||
distance_threshold = voxel_size * 1.5
|
||||
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
|
||||
source_down,
|
||||
target_down,
|
||||
source_fpfh,
|
||||
target_fpfh,
|
||||
True,
|
||||
distance_threshold,
|
||||
o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
|
||||
3,
|
||||
[
|
||||
o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
|
||||
o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
|
||||
],
|
||||
o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
|
||||
|
||||
return result.transformation
|
||||
|
||||
|
||||
def object_icp(
|
||||
target: o3d.geometry.PointCloud,
|
||||
source: o3d.geometry.PointCloud | str,
|
||||
ransac_voxel_size: float = 0.005,
|
||||
icp_voxel_radius: list[float] | None = None,
|
||||
icp_max_iter: list[int] | None = None,
|
||||
):
|
||||
|
||||
if icp_voxel_radius is None:
|
||||
icp_voxel_radius = [0.004, 0.002, 0.001]
|
||||
|
||||
if icp_max_iter is None:
|
||||
icp_max_iter = [50, 30, 14]
|
||||
|
||||
if isinstance(source, str):
|
||||
source = o3d.io.read_point_cloud(source)
|
||||
elif isinstance(source, o3d.geometry.PointCloud):
|
||||
pass
|
||||
else:
|
||||
raise TypeError(f"Unsupported Type {type(source)}")
|
||||
|
||||
voxel_size = 0.005 # means 5mm for this dataset
|
||||
source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(source, target, voxel_size)
|
||||
|
||||
T = execute_global_registration(
|
||||
source_down,
|
||||
target_down,
|
||||
source_fpfh,
|
||||
target_fpfh,
|
||||
ransac_voxel_size
|
||||
)
|
||||
|
||||
for scale in range(3):
|
||||
iter = icp_max_iter[scale]
|
||||
radius = icp_voxel_radius[scale]
|
||||
# print([iter, radius, scale])
|
||||
|
||||
source_down = source.voxel_down_sample(radius)
|
||||
target_down = target.voxel_down_sample(radius)
|
||||
|
||||
source_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
|
||||
target_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
|
||||
|
||||
result_icp = o3d.pipelines.registration.registration_icp(
|
||||
source_down,
|
||||
target_down,
|
||||
radius * 5,
|
||||
T,
|
||||
o3d.pipelines.registration.TransformationEstimationPointToPoint(),
|
||||
o3d.pipelines.registration.ICPConvergenceCriteria(
|
||||
relative_fitness=1e-6,
|
||||
relative_rmse=1e-6,
|
||||
max_iteration=iter
|
||||
)
|
||||
)
|
||||
|
||||
T = result_icp.transformation
|
||||
|
||||
draw_pointcloud([source.transform(T), target], T)
|
||||
|
||||
return T
|
||||
|
||||
|
||||
def get_map(K, D, camera_size):
|
||||
h, w = camera_size[::-1]
|
||||
K = np.array(K).reshape(3, 3)
|
||||
D = np.array(D)
|
||||
new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
|
||||
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
|
||||
|
||||
return map1, map2, new_K.flatten()
|
||||
|
||||
|
||||
def pca(data, sort=True):
|
||||
"""主成分分析 """
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics, model_pcd):
|
||||
"""计算位态"""
|
||||
depth_img_mask = np.zeros_like(depth_img)
|
||||
depth_img_mask[mask > 0] = depth_img[mask > 0]
|
||||
|
||||
depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
|
||||
|
||||
point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
|
||||
depth=depth_o3d,
|
||||
intrinsic=intrinsics,
|
||||
depth_scale=1000.0,
|
||||
depth_trunc=2.0,
|
||||
)
|
||||
|
||||
point_cloud = point_cloud.remove_non_finite_points()
|
||||
|
||||
clean_pcd, _ = point_cloud.remove_radius_outlier(nb_points=10, radius=0.1)
|
||||
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
|
||||
|
||||
if len(clean_pcd.points) == 0:
|
||||
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
|
||||
rmat = object_icp(model_pcd, clean_pcd)
|
||||
|
||||
x, y, z = rmat[0:3, 3:4].flatten()
|
||||
|
||||
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
|
||||
|
||||
return x, y, z, rw, rx, ry, rz
|
||||
|
||||
|
||||
def draw_box(set_confidence, rgb_img, result):
|
||||
"""绘制目标检测边界框"""
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
if confidences[i] >= set_confidence:
|
||||
x_center, y_center, width, height = box[:4]
|
||||
|
||||
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
|
||||
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
|
||||
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
|
||||
|
||||
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
|
||||
|
||||
def draw_mask(set_confidence, rgb_img, result):
|
||||
"""绘制实例分割mask"""
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
|
||||
for i, mask in enumerate(masks):
|
||||
if confidences[i] >= set_confidence:
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
else:
|
||||
continue
|
||||
|
||||
|
||||
def distortion_correction(color_image, depth_image, map1, map2):
|
||||
"""畸变矫正"""
|
||||
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
|
||||
undistorted_color = undistorted_color.astype(color_image.dtype)
|
||||
|
||||
undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
|
||||
undistorted_depth = undistorted_depth.astype(depth_image.dtype)
|
||||
|
||||
return undistorted_color, undistorted_depth
|
||||
|
||||
|
||||
def crop_mask_bbox(depth_img, mask, box):
|
||||
"""
|
||||
输入:
|
||||
depth_img: H x W
|
||||
mask: H x W (0/1 或 bool)
|
||||
输出:
|
||||
depth_crop, mask_crop
|
||||
"""
|
||||
high, width = depth_img.shape
|
||||
x_center, y_center, w, h = box[:4]
|
||||
|
||||
x_min, x_max = int(round(x_center - w/2)), int(round(x_center + w/2))
|
||||
y_min, y_max = int(round(y_center - h/2)), int(round(y_center + h/2))
|
||||
|
||||
depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
|
||||
return depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
|
||||
|
||||
|
||||
def calculate_grav_width(mask, K, depth):
|
||||
"""计算重心宽度"""
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
if contours:
|
||||
box = cv2.boxPoints(cv2.minAreaRect(contours[0]))
|
||||
if np.linalg.norm(box[1] - box[0]) < np.linalg.norm(box[1] - box[2]):
|
||||
point_diff = box[1] - box[0]
|
||||
else:
|
||||
point_diff = box[1] - box[2]
|
||||
|
||||
grab_width = depth * np.sqrt(point_diff[0]**2 / K[0]**2 + point_diff[1]**2 / K[4]**2)
|
||||
return grab_width
|
||||
else:
|
||||
return 0.0
|
||||
|
||||
|
||||
class DetectNode(Node):
|
||||
def __init__(self, name, mode):
|
||||
super().__init__(name)
|
||||
self.mode = mode
|
||||
self.device = None
|
||||
self.checkpoint_path = None
|
||||
self.checkpoint_name = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.function = None
|
||||
self.calculate_function = None
|
||||
|
||||
self.K = None
|
||||
|
||||
self.fx = self.fy = 0.5
|
||||
self.camera_data = {}
|
||||
|
||||
self.cv_bridge = CvBridge()
|
||||
self.lock = threading.Lock()
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
|
||||
self.source_model = o3d.io.read_point_cloud(self.source_model_path)
|
||||
|
||||
if mode == 'detect':
|
||||
self._init_model()
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error('Error: Mode Unknown')
|
||||
|
||||
if self.device == 'cpu':
|
||||
self.calculate_function = calculate_pose_cpu
|
||||
elif self.device == 'gpu':
|
||||
raise NotImplementedError('Function: calculate_pose_gpu not implemented')
|
||||
else:
|
||||
raise ValueError(f"device must be cpu or gpu, now {self.device}")
|
||||
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
self._init_service()
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
|
||||
self.checkpoint_name = self.get_parameter('checkpoint_name').value
|
||||
self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
|
||||
|
||||
self.source_model_path = os.path.join(share_dir, 'pointclouds/bottle_model.pcd')
|
||||
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
|
||||
self.declare_parameter('output_masks', False)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('set_confidence', 0.25)
|
||||
self.set_confidence = self.get_parameter('set_confidence').value
|
||||
|
||||
self.declare_parameter('device', 'cpu')
|
||||
self.device = self.get_parameter('device').value
|
||||
|
||||
self.declare_parameter('classes', 'None')
|
||||
self.classes = ast.literal_eval(self.get_parameter('classes').value)
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device_model)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
if self.checkpoint_name.endswith('-seg.pt'):
|
||||
self.function = self._seg_image
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init publisher"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
def _init_service(self):
|
||||
"""init service server"""
|
||||
self.server = self.create_service(
|
||||
VisionObjectRecognition,
|
||||
"/vision_object_recognition",
|
||||
self._service_callback
|
||||
)
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init subscriber"""
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._sub_callback,
|
||||
10
|
||||
)
|
||||
|
||||
def _sub_callback(self, msg):
|
||||
"""同步回调函数"""
|
||||
with self.lock:
|
||||
self.camera_data[msg.position] = [
|
||||
msg.image_color,
|
||||
msg.image_depth,
|
||||
msg.karr,
|
||||
msg.darr
|
||||
]
|
||||
|
||||
def _service_callback(self, request, response):
|
||||
response.header.stamp = self.get_clock().now().to_msg()
|
||||
response.header.frame_id = "camera_detect"
|
||||
|
||||
with self.lock:
|
||||
if request.camera_position in self.camera_data:
|
||||
color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
|
||||
else:
|
||||
response.success = False
|
||||
response.info = f"{request.camera_position} Camera data is empty or name is wrong"
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
self.camera_size = [color_img_ros.width, color_img_ros.height]
|
||||
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
map1, map2, self.K = get_map(self.K, D, self.camera_size)
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
|
||||
|
||||
img, pose_dict = self.function(color_img_cv, depth_img_cv)
|
||||
|
||||
"""masks为空,结束这一帧"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
if img is None:
|
||||
img = color_img_ros
|
||||
self.pub_detect_image.publish(img)
|
||||
|
||||
if pose_dict:
|
||||
response.info = "Success get pose"
|
||||
response.success = True
|
||||
# self.get_logger().info('get_pose')
|
||||
for (class_id, class_name), pose_list in pose_dict.items():
|
||||
response.objects.append(
|
||||
PoseClassAndID(
|
||||
class_name = class_name,
|
||||
class_id = class_id,
|
||||
pose_list = pose_list
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
response.info = "pose dict is empty"
|
||||
response.success = False
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
|
||||
"""Use segmentation model"""
|
||||
pose_dict = defaultdict(list)
|
||||
|
||||
'''Get Predict Results'''
|
||||
# time1 = time.time()
|
||||
results = self.model(rgb_img, retina_masks=True, conf=self.set_confidence, classes=[39])
|
||||
# time2 = time.time()
|
||||
result = results[0]
|
||||
|
||||
'''Get masks'''
|
||||
if result.masks is None or len(result.masks) == 0:
|
||||
return None, None
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
|
||||
'''Get boxes'''
|
||||
boxes = result.boxes.data.cpu().numpy()
|
||||
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
# time3 = time.time()
|
||||
|
||||
for i, (mask, box) in enumerate(zip(masks, boxes)):
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(depth_img, mask, box)
|
||||
|
||||
if depth_crop is None:
|
||||
continue
|
||||
|
||||
if mask.shape[0] >= (orig_shape[0] * 0.5) and mask.shape[1] >= (orig_shape[1] * 0.5):
|
||||
mask_crop = cv2.resize(mask_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
|
||||
depth_crop = cv2.resize(depth_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0] * self.fx),
|
||||
int(self.camera_size[1] * self.fy),
|
||||
self.K[0] * self.fx,
|
||||
self.K[4] * self.fy,
|
||||
(self.K[2] - x_min) * self.fx,
|
||||
(self.K[5] - y_min) * self.fy
|
||||
)
|
||||
else:
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0]),
|
||||
int(self.camera_size[1]),
|
||||
self.K[0],
|
||||
self.K[4],
|
||||
self.K[2] - x_min,
|
||||
self.K[5] - y_min
|
||||
)
|
||||
|
||||
x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, depth_crop, intrinsics, self.source_model)
|
||||
|
||||
grab_width = calculate_grav_width(mask, self.K, z)
|
||||
z = z + grab_width * 0.45
|
||||
|
||||
if (x, y, z) == (None, None, None):
|
||||
self.get_logger().error("have wrong pose")
|
||||
continue
|
||||
|
||||
if (x, y, z) != (0.0, 0.0, 0.0):
|
||||
pose = Pose()
|
||||
pose.position = Point(x=x, y=y, z=z)
|
||||
pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
|
||||
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
|
||||
|
||||
# time4 = time.time()
|
||||
|
||||
|
||||
# self.get_logger().info(f'start')
|
||||
# self.get_logger().info(f'{(time2 - time1)*1000} ms, model predict')
|
||||
# self.get_logger().info(f'{(time3 - time2)*1000} ms, get mask and some param')
|
||||
# self.get_logger().info(f'{(time4 - time3)*1000} ms, calculate all mask PCA')
|
||||
# self.get_logger().info(f'{(time4 - time1)*1000} ms, completing a picture entire process')
|
||||
# self.get_logger().info(f'end')
|
||||
|
||||
'''mask_img and box_img is or not output'''
|
||||
if self.output_boxes and not self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif self.output_boxes and self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif not self.output_boxes and self.output_masks:
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
else:
|
||||
return None, pose_dict
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DetectNode('detect', 'detect')
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,446 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import ast
|
||||
from collections import defaultdict
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import cv2
|
||||
import open3d as o3d
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
from interfaces.msg import PoseClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def get_map(K, D, camera_size):
|
||||
h, w = camera_size[::-1]
|
||||
K = np.array(K).reshape(3, 3)
|
||||
D = np.array(D)
|
||||
new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
|
||||
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
|
||||
|
||||
return map1, map2, new_K.flatten()
|
||||
|
||||
|
||||
def pca(data, sort=True):
|
||||
"""主成分分析 """
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics, hand_eye_mat):
|
||||
"""计算位态"""
|
||||
depth_img_mask = np.zeros_like(depth_img)
|
||||
depth_img_mask[mask > 0] = depth_img[mask > 0]
|
||||
|
||||
depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
|
||||
|
||||
point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
|
||||
depth=depth_o3d,
|
||||
intrinsic=intrinsics,
|
||||
depth_scale=1000.0,
|
||||
depth_trunc=8.0,
|
||||
)
|
||||
|
||||
point_cloud = point_cloud.remove_non_finite_points()
|
||||
|
||||
down_pcd = point_cloud.voxel_down_sample(voxel_size=0.022)
|
||||
clean_pcd, _ = down_pcd.remove_radius_outlier(nb_points=10, radius=0.1)
|
||||
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
|
||||
|
||||
if len(clean_pcd.points) == 0:
|
||||
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
center = clean_pcd.get_center()
|
||||
x, y, z = center
|
||||
|
||||
w, v = pca(np.asarray(clean_pcd.points))
|
||||
|
||||
if w is not None and v is not None:
|
||||
vx, vy, vz = v[:,0], v[:,1], v[:,2]
|
||||
|
||||
if vx[0] < 0:
|
||||
vx = -vx
|
||||
if vy[1] < 0:
|
||||
vy = -vy
|
||||
if not np.allclose(np.cross(vx, vy), vz):
|
||||
vz = -vz
|
||||
|
||||
R = np.column_stack((vx, vy, vz))
|
||||
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
|
||||
rmat = hand_eye_mat @ rmat
|
||||
|
||||
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
|
||||
x, y, z = rmat[0:3, 3].flatten()
|
||||
|
||||
# point = [
|
||||
# [x, y, z], [x, y, z] + vx, [x, y, z] + vy, [x, y, z] + vz,
|
||||
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ] # 画点:原点、第一主成分、第二主成分
|
||||
# lines = [
|
||||
# [0, 1], [0, 2], [0, 3], [4, 5], [4, 6], [4, 7]
|
||||
# ] # 画出三点之间两两连线
|
||||
# colors = [
|
||||
# [1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ]
|
||||
# # 构造open3d中的LineSet对象,用于主成分显示
|
||||
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(point), lines=o3d.utility.Vector2iVector(lines))
|
||||
# line_set.colors = o3d.utility.Vector3dVector(colors)
|
||||
# o3d.visualization.draw_geometries([point_cloud, line_set])
|
||||
# o3d.visualization.draw_geometries([clean_pcd, line_set])
|
||||
|
||||
return x, y, z, rw, rx, ry, rz
|
||||
|
||||
|
||||
def draw_box(set_confidence, rgb_img, result):
|
||||
"""绘制目标检测边界框"""
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
if confidences[i] >= set_confidence:
|
||||
x_center, y_center, width, height = box[:4]
|
||||
|
||||
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
|
||||
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
|
||||
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
|
||||
|
||||
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
|
||||
|
||||
def draw_mask(set_confidence, rgb_img, result):
|
||||
"""绘制实例分割mask"""
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
|
||||
for i, mask in enumerate(masks):
|
||||
if confidences[i] >= set_confidence:
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
else:
|
||||
continue
|
||||
|
||||
|
||||
def distortion_correction(color_image, depth_image, map1, map2):
|
||||
"""畸变矫正"""
|
||||
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
|
||||
undistorted_color = undistorted_color.astype(color_image.dtype)
|
||||
|
||||
undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
|
||||
undistorted_depth = undistorted_depth.astype(depth_image.dtype)
|
||||
|
||||
return undistorted_color, undistorted_depth
|
||||
|
||||
|
||||
def crop_mask_bbox(depth_img, mask, box):
|
||||
"""
|
||||
输入:
|
||||
depth_img: H x W
|
||||
mask: H x W (0/1 或 bool)
|
||||
输出:
|
||||
depth_crop, mask_crop
|
||||
"""
|
||||
high, width = depth_img.shape
|
||||
x_center, y_center, w, h = box[:4]
|
||||
|
||||
x_min, x_max = int(round(x_center - w/2)), int(round(x_center + w/2))
|
||||
y_min, y_max = int(round(y_center - h/2)), int(round(y_center + h/2))
|
||||
|
||||
depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
|
||||
return depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
|
||||
|
||||
|
||||
|
||||
class DetectNode(Node):
|
||||
def __init__(self, name, mode):
|
||||
super().__init__(name)
|
||||
self.mode = mode
|
||||
self.device = None
|
||||
self.checkpoint_path = None
|
||||
self.checkpoint_name = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.function = None
|
||||
self.calculate_function = None
|
||||
self.eye_in_hand_mat = None
|
||||
self.eye_to_hand_mat = None
|
||||
|
||||
self.K = None
|
||||
|
||||
self.fx = self.fy = 0.5
|
||||
self.camera_data = {}
|
||||
|
||||
self.cv_bridge = CvBridge()
|
||||
self.lock = threading.Lock()
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
|
||||
|
||||
if mode == 'detect':
|
||||
self._init_model()
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error('Error: Mode Unknown')
|
||||
|
||||
if self.device == 'cpu':
|
||||
self.calculate_function = calculate_pose_cpu
|
||||
elif self.device == 'gpu':
|
||||
raise NotImplementedError('Function: calculate_pose_gpu not implemented')
|
||||
else:
|
||||
raise ValueError(f"device must be cpu or gpu, now {self.device}")
|
||||
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
self._init_service()
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('checkpoint_name', 'hivecorebox-seg.pt')
|
||||
self.checkpoint_name = self.get_parameter('checkpoint_name').value
|
||||
self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
|
||||
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
|
||||
self.declare_parameter('output_masks', True)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('set_confidence', 0.60)
|
||||
self.set_confidence = self.get_parameter('set_confidence').value
|
||||
|
||||
self.declare_parameter('device', 'cpu')
|
||||
self.device = self.get_parameter('device').value
|
||||
|
||||
self.declare_parameter('classes', 'None')
|
||||
self.classes = ast.literal_eval(self.get_parameter('classes').value)
|
||||
|
||||
self.declare_parameter('eye_in_hand', [1.0, 0.0, 0.0, 0.0,
|
||||
0.0, 1.0, 0.0, 0.0,
|
||||
0.0, 0.0, 1.0, 0.0,
|
||||
0.0, 0.0, 0.0, 1.0])
|
||||
self.eye_in_hand_mat = np.array(self.get_parameter('eye_in_hand').value).reshape(4, 4)
|
||||
|
||||
self.declare_parameter('eye_to_hand', [1.0, 0.0, 0.0, 0.0,
|
||||
0.0, 1.0, 0.0, 0.0,
|
||||
0.0, 0.0, 1.0, 0.0,
|
||||
0.0, 0.0, 0.0, 1.0])
|
||||
self.eye_to_hand_mat = np.array(self.get_parameter('eye_to_hand').value).reshape(4, 4)
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device_model)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
if self.checkpoint_name.endswith('-seg.pt'):
|
||||
self.function = self._seg_image
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init publisher"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
def _init_service(self):
|
||||
"""init service server"""
|
||||
self.server = self.create_service(
|
||||
VisionObjectRecognition,
|
||||
"/vision_object_recognition",
|
||||
self._service_callback
|
||||
)
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init subscriber"""
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._sub_callback,
|
||||
10
|
||||
)
|
||||
|
||||
def _sub_callback(self, msg):
|
||||
"""同步回调函数"""
|
||||
with self.lock:
|
||||
self.camera_data[msg.position] = [
|
||||
msg.image_color,
|
||||
msg.image_depth,
|
||||
msg.karr,
|
||||
msg.darr
|
||||
]
|
||||
|
||||
def _service_callback(self, request, response):
|
||||
response.header.stamp = self.get_clock().now().to_msg()
|
||||
response.header.frame_id = "camera_detect"
|
||||
|
||||
with self.lock:
|
||||
if request.camera_position in self.camera_data:
|
||||
color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
|
||||
else:
|
||||
response.success = False
|
||||
response.info = f"{request.camera_position} Camera data is empty or name is wrong"
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
if request.camera_position == 'left' or request.camera_position == 'right':
|
||||
hand_eye_mat = self.eye_in_hand_mat
|
||||
else:
|
||||
hand_eye_mat = self.eye_to_hand_mat
|
||||
|
||||
self.camera_size = [color_img_ros.width, color_img_ros.height]
|
||||
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
map1, map2, self.K = get_map(self.K, D, self.camera_size)
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
|
||||
|
||||
img, pose_dict = self.function(color_img_cv, depth_img_cv, hand_eye_mat)
|
||||
|
||||
"""masks为空,结束这一帧"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
if img is None:
|
||||
img = color_img_ros
|
||||
self.pub_detect_image.publish(img)
|
||||
|
||||
if pose_dict:
|
||||
response.info = "Success get pose"
|
||||
response.success = True
|
||||
# self.get_logger().info('get_pose')
|
||||
for (class_id, class_name), pose_list in pose_dict.items():
|
||||
response.objects.append(
|
||||
PoseClassAndID(
|
||||
class_name = class_name,
|
||||
class_id = class_id,
|
||||
pose_list = pose_list
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
response.info = "pose dict is empty"
|
||||
response.success = False
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray, hand_eye_mat):
|
||||
"""Use segmentation model"""
|
||||
pose_dict = defaultdict(list)
|
||||
|
||||
depth_filter_mask = np.zeros_like(depth_img, dtype=np.uint8)
|
||||
depth_filter_mask[(depth_img > 0) & (depth_img < 2000)] = 1
|
||||
|
||||
rgb_img[depth_filter_mask == 0] = 0
|
||||
|
||||
'''Get Predict Results'''
|
||||
results = self.model(rgb_img, retina_masks=True, conf=self.set_confidence, classes=[0])
|
||||
result = results[0]
|
||||
|
||||
'''Get masks'''
|
||||
if result.masks is None or len(result.masks) == 0:
|
||||
return None, None
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
|
||||
'''Get boxes'''
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
for i, (mask, box) in enumerate(zip(masks, boxes)):
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(depth_img, mask, box)
|
||||
|
||||
if depth_crop is None:
|
||||
continue
|
||||
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0]),
|
||||
int(self.camera_size[1]),
|
||||
self.K[0],
|
||||
self.K[4],
|
||||
self.K[2] - x_min,
|
||||
self.K[5] - y_min
|
||||
)
|
||||
|
||||
x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, depth_crop, intrinsics, hand_eye_mat)
|
||||
|
||||
if (x, y, z) == (None, None, None):
|
||||
self.get_logger().error("have wrong pose")
|
||||
continue
|
||||
|
||||
if (x, y, z) != (0.0, 0.0, 0.0):
|
||||
pose = Pose()
|
||||
pose.position = Point(x=x, y=y, z=z)
|
||||
pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
|
||||
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
|
||||
|
||||
'''mask_img and box_img is or not output'''
|
||||
if self.output_boxes and not self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif self.output_boxes and self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif not self.output_boxes and self.output_masks:
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
else:
|
||||
return None, pose_dict
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DetectNode('detect', 'detect')
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,415 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import ast
|
||||
from collections import defaultdict
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import cv2
|
||||
import open3d as o3d
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
from interfaces.msg import PoseClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def get_map(K, D, camera_size):
|
||||
h, w = camera_size[::-1]
|
||||
K = np.array(K).reshape(3, 3)
|
||||
D = np.array(D)
|
||||
new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
|
||||
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
|
||||
|
||||
return map1, map2, new_K.flatten()
|
||||
|
||||
|
||||
def pca(data, sort=True):
|
||||
"""主成分分析 """
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics, hand_eye_mat):
|
||||
"""计算位态"""
|
||||
depth_img_mask = np.zeros_like(depth_img)
|
||||
depth_img_mask[mask > 0] = depth_img[mask > 0]
|
||||
|
||||
depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
|
||||
|
||||
point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
|
||||
depth=depth_o3d,
|
||||
intrinsic=intrinsics,
|
||||
depth_scale=1000.0,
|
||||
depth_trunc=3.0,
|
||||
)
|
||||
|
||||
point_cloud = point_cloud.remove_non_finite_points()
|
||||
|
||||
down_pcd = point_cloud.voxel_down_sample(voxel_size=0.020)
|
||||
clean_pcd, _ = down_pcd.remove_radius_outlier(nb_points=10, radius=0.1)
|
||||
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
|
||||
|
||||
if len(clean_pcd.points) == 0:
|
||||
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
center = clean_pcd.get_center()
|
||||
x, y, z = center
|
||||
|
||||
w, v = pca(np.asarray(clean_pcd.points))
|
||||
|
||||
if w is not None and v is not None:
|
||||
vx, vy, vz = v[:,0], v[:,1], v[:,2]
|
||||
|
||||
if vx[0] < 0:
|
||||
vx = -vx
|
||||
if vy[1] < 0:
|
||||
vy = -vy
|
||||
if not np.allclose(np.cross(vx, vy), vz):
|
||||
vz = -vz
|
||||
|
||||
R = np.column_stack((vx, vy, vz))
|
||||
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
|
||||
rmat = hand_eye_mat @ rmat
|
||||
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
|
||||
x, y, z = rmat[0:3, 3].flatten()
|
||||
|
||||
return x, y, z, rw, rx, ry, rz
|
||||
|
||||
else:
|
||||
return 0.0, 0.0, 0.0, None, None, None, None
|
||||
|
||||
|
||||
def draw_box(set_confidence, rgb_img, result):
|
||||
"""绘制目标检测边界框"""
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
if confidences[i] >= set_confidence:
|
||||
x_center, y_center, width, height = box[:4]
|
||||
|
||||
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
|
||||
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
|
||||
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
|
||||
|
||||
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
|
||||
|
||||
def draw_mask(set_confidence, rgb_img, result):
|
||||
"""绘制实例分割mask"""
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
|
||||
for i, mask in enumerate(masks):
|
||||
if confidences[i] >= set_confidence:
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
else:
|
||||
continue
|
||||
|
||||
|
||||
def distortion_correction(color_image, depth_image, map1, map2):
|
||||
"""畸变矫正"""
|
||||
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
|
||||
undistorted_color = undistorted_color.astype(color_image.dtype)
|
||||
|
||||
undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
|
||||
undistorted_depth = undistorted_depth.astype(depth_image.dtype)
|
||||
|
||||
return undistorted_color, undistorted_depth
|
||||
|
||||
|
||||
def crop_mask_bbox(depth_img, mask, box):
|
||||
"""
|
||||
输入:
|
||||
depth_img: H x W
|
||||
mask: H x W (0/1 或 bool)
|
||||
输出:
|
||||
depth_crop, mask_crop
|
||||
"""
|
||||
high, width = depth_img.shape
|
||||
x_center, y_center, w, h = box[:4]
|
||||
|
||||
x_min, x_max = int(round(x_center - w/2)), int(round(x_center + w/2))
|
||||
y_min, y_max = int(round(y_center - h/2)), int(round(y_center + h/2))
|
||||
|
||||
depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
|
||||
return depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
|
||||
|
||||
|
||||
class DetectNode(Node):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.device = None
|
||||
self.checkpoint_path = None
|
||||
self.checkpoint_name = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.function = None
|
||||
self.K = None
|
||||
self.eye_in_hand_mat = None
|
||||
self.eye_to_hand_mat = None
|
||||
|
||||
self.calculate_function = calculate_pose_cpu
|
||||
self.fx = self.fy = 0.5
|
||||
self.camera_data = {}
|
||||
self.cv_bridge = CvBridge()
|
||||
self.lock = threading.Lock()
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
self._init_model()
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
self._init_service()
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
|
||||
self.checkpoint_name = self.get_parameter('checkpoint_name').value
|
||||
self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
|
||||
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
|
||||
self.declare_parameter('output_masks', False)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('set_confidence', 0.25)
|
||||
self.set_confidence = self.get_parameter('set_confidence').value
|
||||
|
||||
self.declare_parameter('classes', 'None')
|
||||
self.classes = ast.literal_eval(self.get_parameter('classes').value)
|
||||
|
||||
self.declare_parameter('eye_in_hand', [1.0, 0.0, 0.0, 0.0,
|
||||
0.0, 1.0, 0.0, 0.0,
|
||||
0.0, 0.0, 1.0, 0.0,
|
||||
0.0, 0.0, 0.0, 1.0])
|
||||
self.eye_in_hand_mat = np.array(self.get_parameter('eye_in_hand').value).reshape(4, 4)
|
||||
|
||||
self.declare_parameter('eye_to_hand', [1, 0, 0, 0,
|
||||
0, 1, 0, 0,
|
||||
0, 0, 1, 0,
|
||||
0, 0, 0, 1])
|
||||
self.eye_to_hand_mat = np.array(self.get_parameter('eye_to_hand').value).reshape(4, 4)
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device_model)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
if self.checkpoint_name.endswith('-seg.pt'):
|
||||
self.function = self._seg_image
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init publisher"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
def _init_service(self):
|
||||
"""init service server"""
|
||||
self.server = self.create_service(
|
||||
VisionObjectRecognition,
|
||||
"/vision_object_recognition",
|
||||
self._service_callback
|
||||
)
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init subscriber"""
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._sub_callback,
|
||||
10
|
||||
)
|
||||
|
||||
def _sub_callback(self, msg):
|
||||
"""同步回调函数"""
|
||||
with self.lock:
|
||||
self.camera_data[msg.position] = [
|
||||
msg.image_color,
|
||||
msg.image_depth,
|
||||
msg.karr,
|
||||
msg.darr
|
||||
]
|
||||
|
||||
def _service_callback(self, request, response):
|
||||
response.header.stamp = self.get_clock().now().to_msg()
|
||||
response.header.frame_id = "camera_detect"
|
||||
|
||||
with self.lock:
|
||||
if request.camera_position in self.camera_data:
|
||||
color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
|
||||
else:
|
||||
response.success = False
|
||||
response.info = f"{request.camera_position} Camera data is empty or name is wrong"
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
if request.camera_position == 'left' or request.camera_position == 'right':
|
||||
hand_eye_mat = self.eye_in_hand_mat
|
||||
else:
|
||||
hand_eye_mat = self.eye_to_hand_mat
|
||||
|
||||
self.camera_size = [color_img_ros.width, color_img_ros.height]
|
||||
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
map1, map2, self.K = get_map(self.K, D, self.camera_size)
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
|
||||
|
||||
img, pose_dict = self.function(color_img_cv, depth_img_cv, hand_eye_mat)
|
||||
|
||||
"""masks为空,结束这一帧"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
if img is None:
|
||||
img = color_img_ros
|
||||
self.pub_detect_image.publish(img)
|
||||
|
||||
if pose_dict:
|
||||
response.info = "Success get pose"
|
||||
response.success = True
|
||||
for (class_id, class_name), pose_list in pose_dict.items():
|
||||
response.objects.append(
|
||||
PoseClassAndID(
|
||||
class_name = class_name,
|
||||
class_id = class_id,
|
||||
pose_list = pose_list
|
||||
)
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
else:
|
||||
response.info = "pose dict is empty"
|
||||
response.success = False
|
||||
response.objects = []
|
||||
|
||||
return response
|
||||
|
||||
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray, hand_eye_mat):
|
||||
"""Use segmentation model"""
|
||||
pose_dict = defaultdict(list)
|
||||
|
||||
'''Get Predict Results'''
|
||||
time1 = time.time()
|
||||
results = self.model(rgb_img, retina_masks=True, conf=self.set_confidence, classes=self.classes)
|
||||
time2 = time.time()
|
||||
result = results[0]
|
||||
|
||||
'''Get masks'''
|
||||
if result.masks is None or len(result.masks) == 0:
|
||||
return None, None
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
|
||||
'''Get boxes'''
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
time3 = time.time()
|
||||
|
||||
for i, (mask, box) in enumerate(zip(masks, boxes)):
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(depth_img, mask, box)
|
||||
|
||||
if depth_crop is None:
|
||||
continue
|
||||
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0]),
|
||||
int(self.camera_size[1]),
|
||||
self.K[0],
|
||||
self.K[4],
|
||||
self.K[2] - x_min,
|
||||
self.K[5] - y_min
|
||||
)
|
||||
|
||||
x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, depth_crop, intrinsics, hand_eye_mat)
|
||||
|
||||
if (x, y, z) != (0.0, 0.0, 0.0):
|
||||
pose = Pose()
|
||||
pose.position = Point(x=x, y=y, z=z)
|
||||
pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
|
||||
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
|
||||
|
||||
time4 = time.time()
|
||||
|
||||
self.get_logger().info(f'start')
|
||||
self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
|
||||
self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')
|
||||
self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
|
||||
self.get_logger().info(f'{(time4 - time1) * 1000} ms, completing a picture entire process')
|
||||
self.get_logger().info(f'end')
|
||||
|
||||
'''mask_img and box_img is or not output'''
|
||||
if self.output_boxes and not self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif self.output_boxes and self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
elif not self.output_boxes and self.output_masks:
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
|
||||
else:
|
||||
return None, pose_dict
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DetectNode('detect')
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,429 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import cv2
|
||||
import open3d as o3d
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from message_filters import ApproximateTimeSynchronizer, Subscriber
|
||||
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
|
||||
def get_map(K, D, camera_size):
|
||||
h, w = camera_size[::-1]
|
||||
K = np.array(K).reshape(3, 3)
|
||||
D = np.array(D)
|
||||
new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
|
||||
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
|
||||
|
||||
return map1, map2, new_K.flatten()
|
||||
|
||||
|
||||
def pca(data, sort=True):
|
||||
"""主成分分析 """
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
def calculate_pose_cpu(mask, rgb_img, depth_img: np.ndarray, intrinsics):
|
||||
"""计算位态"""
|
||||
depth_img_mask = np.zeros_like(depth_img)
|
||||
depth_img_mask[mask > 0] = depth_img[mask > 0]
|
||||
|
||||
depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
|
||||
|
||||
point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
|
||||
depth=depth_o3d,
|
||||
intrinsic=intrinsics,
|
||||
depth_scale=1000.0,
|
||||
depth_trunc=8.0,
|
||||
)
|
||||
|
||||
point_cloud = point_cloud.remove_non_finite_points()
|
||||
|
||||
down_pcd = point_cloud.voxel_down_sample(voxel_size=0.022)
|
||||
clean_pcd, _ = down_pcd.remove_radius_outlier(nb_points=10, radius=0.1)
|
||||
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
|
||||
|
||||
if len(clean_pcd.points) == 0:
|
||||
return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
|
||||
center = clean_pcd.get_center()
|
||||
x, y, z = center
|
||||
|
||||
w, v = pca(np.asarray(clean_pcd.points))
|
||||
|
||||
if w is not None and v is not None:
|
||||
vx, vy, vz = v[:,0], v[:,1], v[:,2]
|
||||
|
||||
if vx[0] < 0:
|
||||
vx = -vx
|
||||
if vy[1] < 0:
|
||||
vy = -vy
|
||||
if not np.allclose(np.cross(vx, vy), vz):
|
||||
vz = -vz
|
||||
|
||||
R = np.column_stack((vx, vy, vz))
|
||||
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
|
||||
|
||||
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
|
||||
x, y, z = rmat[0:3, 3].flatten()
|
||||
|
||||
# point = [
|
||||
# [x, y, z], [x, y, z] + vx, [x, y, z] + vy, [x, y, z] + vz,
|
||||
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ] # 画点:原点、第一主成分、第二主成分
|
||||
# lines = [
|
||||
# [0, 1], [0, 2], [0, 3], [4, 5], [4, 6], [4, 7]
|
||||
# ] # 画出三点之间两两连线
|
||||
# colors = [
|
||||
# [1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0], [0, 0, 1]
|
||||
# ]
|
||||
# # 构造open3d中的LineSet对象,用于主成分显示
|
||||
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(point), lines=o3d.utility.Vector2iVector(lines))
|
||||
# line_set.colors = o3d.utility.Vector3dVector(colors)
|
||||
# o3d.visualization.draw_geometries([point_cloud, line_set])
|
||||
# o3d.visualization.draw_geometries([clean_pcd, line_set])
|
||||
|
||||
return x, y, z, rw, rx, ry, rz
|
||||
|
||||
|
||||
def draw_box(set_confidence, rgb_img, result):
|
||||
"""绘制目标检测边界框"""
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
if confidences[i] >= set_confidence:
|
||||
x_center, y_center, width, height = box[:4]
|
||||
|
||||
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
|
||||
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
|
||||
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
|
||||
|
||||
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
|
||||
|
||||
def draw_mask(set_confidence, rgb_img, result):
|
||||
"""绘制实例分割mask"""
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
|
||||
for i, mask in enumerate(masks):
|
||||
if confidences[i] >= set_confidence:
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
else:
|
||||
continue
|
||||
|
||||
|
||||
def distortion_correction(color_image, depth_image, map1, map2):
|
||||
"""畸变矫正"""
|
||||
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
|
||||
undistorted_color = undistorted_color.astype(color_image.dtype)
|
||||
|
||||
undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
|
||||
undistorted_depth = undistorted_depth.astype(depth_image.dtype)
|
||||
|
||||
return undistorted_color, undistorted_depth
|
||||
|
||||
|
||||
def crop_mask_bbox(rgb_img, depth_img, mask, box):
|
||||
"""
|
||||
输入:
|
||||
depth_img: H x W
|
||||
mask: H x W (0/1 或 bool)
|
||||
输出:
|
||||
depth_crop, mask_crop
|
||||
"""
|
||||
high, width = depth_img.shape
|
||||
x_center, y_center, w, h = box[:4]
|
||||
|
||||
x_min, x_max = int(round(x_center - w/2)), int(round(x_center + w/2))
|
||||
y_min, y_max = int(round(y_center - h/2)), int(round(y_center + h/2))
|
||||
|
||||
rgb_crop = rgb_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
|
||||
|
||||
return rgb_crop, depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
|
||||
|
||||
class DetectNode(Node):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.checkpoint_path = None
|
||||
self.checkpoint_name = None
|
||||
self.function = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.K = self.D = None
|
||||
self.map1 = self.map2 = None
|
||||
self.eye_in_hand_mat = None
|
||||
self.eye_to_hand_mat = None
|
||||
|
||||
self.calculate_function = calculate_pose_cpu
|
||||
self.fx = self.fy = 1.0
|
||||
self.cv_bridge = CvBridge()
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
self._init_model()
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('checkpoint_name', 'hivecorebox-seg.pt')
|
||||
self.checkpoint_name = self.get_parameter('checkpoint_name').value
|
||||
self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
|
||||
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
|
||||
self.declare_parameter('output_masks', True)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('set_confidence', 0.60)
|
||||
self.set_confidence = self.get_parameter('set_confidence').value
|
||||
|
||||
self.declare_parameter('color_image_topic', '/camera/color/image_raw')
|
||||
self.color_image_topic = self.get_parameter('color_image_topic').value
|
||||
|
||||
self.declare_parameter('depth_image_topic', '/camera/depth/image_raw')
|
||||
self.depth_image_topic = self.get_parameter('depth_image_topic').value
|
||||
|
||||
self.declare_parameter('camera_info_topic', '/camera/color/camera_info')
|
||||
self.camera_info_topic = self.get_parameter('camera_info_topic').value
|
||||
|
||||
self.declare_parameter('eye_in_hand', [1.0, 0.0, 0.0, 0.0,
|
||||
0.0, 1.0, 0.0, 0.0,
|
||||
0.0, 0.0, 1.0, 0.0,
|
||||
0.0, 0.0, 0.0, 1.0])
|
||||
self.eye_in_hand_mat = np.array(self.get_parameter('eye_in_hand').value).reshape(4, 4)
|
||||
|
||||
self.declare_parameter('eye_to_hand', [1.0, 0.0, 0.0, 0.0,
|
||||
0.0, 1.0, 0.0, 0.0,
|
||||
0.0, 0.0, 1.0, 0.0,
|
||||
0.0, 0.0, 0.0, 1.0])
|
||||
self.eye_to_hand_mat = np.array(self.get_parameter('eye_to_hand').value).reshape(4, 4)
|
||||
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
if self.checkpoint_name.endswith('-seg.pt'):
|
||||
self.function = self._seg_image
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init_publisher"""
|
||||
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
|
||||
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init_subscriber"""
|
||||
self.sub_camera_info = self.create_subscription(
|
||||
CameraInfo,
|
||||
self.camera_info_topic,
|
||||
self._camera_info_callback,
|
||||
10
|
||||
)
|
||||
|
||||
'''sync get color and depth img'''
|
||||
self.sub_color_image = Subscriber(self, Image, self.color_image_topic)
|
||||
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic)
|
||||
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_color_image, self.sub_depth_image],
|
||||
queue_size=10,
|
||||
slop=0.1
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self._sync_callback)
|
||||
|
||||
def _camera_info_callback(self, msg: CameraInfo):
|
||||
"""Get camera info"""
|
||||
self.K = msg.k
|
||||
self.D = msg.d
|
||||
|
||||
self.camera_size = [msg.width, msg.height]
|
||||
|
||||
if self.K is not None and self.D is not None:
|
||||
self.map1, self.map2, self.K = get_map(msg.k, msg.d, self.camera_size)
|
||||
|
||||
if len(self.D) != 0:
|
||||
self.destroy_subscription(self.sub_camera_info)
|
||||
else:
|
||||
self.D = [0, 0, 0, 0, 0, 0, 0, 0]
|
||||
self.destroy_subscription(self.sub_camera_info)
|
||||
else:
|
||||
raise "K and D are not defined"
|
||||
|
||||
def _sync_callback(self, color_img_ros, depth_img_ros):
|
||||
"""同步回调函数"""
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
|
||||
|
||||
img, pose_list = self.function(color_img_cv, depth_img_cv)
|
||||
|
||||
"""masks为空,结束这一帧"""
|
||||
if img is None:
|
||||
img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
|
||||
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image.publish(img)
|
||||
|
||||
if pose_list:
|
||||
pose_list_all = PoseArrayClassAndID()
|
||||
for item in pose_list:
|
||||
pose_list_all.objects.append(
|
||||
PoseClassAndID(
|
||||
class_name = item["class_name"],
|
||||
class_id = item["class_id"],
|
||||
pose = item["pose"]
|
||||
)
|
||||
)
|
||||
pose_list_all.header.stamp = self.get_clock().now().to_msg()
|
||||
pose_list_all.header.frame_id = "pose_list"
|
||||
self.pub_pose_list.publish(pose_list_all)
|
||||
|
||||
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
|
||||
"""Use segmentation model"""
|
||||
pose_list = []
|
||||
|
||||
depth_filter_mask = np.zeros_like(depth_img, dtype=np.uint8)
|
||||
depth_filter_mask[(depth_img > 0) & (depth_img < 2000)] = 1
|
||||
|
||||
rgb_img[depth_filter_mask == 0] = 0
|
||||
|
||||
'''Get Predict Results'''
|
||||
time1 = time.time()
|
||||
results = self.model(rgb_img, retina_masks=True, conf=self.set_confidence)
|
||||
time2 = time.time()
|
||||
result = results[0]
|
||||
|
||||
'''Get masks'''
|
||||
if result.masks is None or len(result.masks) == 0:
|
||||
return None, None
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
orig_shape = result.masks.orig_shape
|
||||
|
||||
'''Get boxes'''
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy()
|
||||
labels = result.names
|
||||
|
||||
time3 = time.time()
|
||||
|
||||
for i, (mask, box) in enumerate(zip(masks, boxes)):
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_crop, depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(rgb_img, depth_img, mask, box)
|
||||
|
||||
if depth_crop is None:
|
||||
continue
|
||||
|
||||
intrinsics = o3d.camera.PinholeCameraIntrinsic(
|
||||
int(self.camera_size[0]),
|
||||
int(self.camera_size[1]),
|
||||
self.K[0],
|
||||
self.K[4],
|
||||
self.K[2] - x_min,
|
||||
self.K[5] - y_min
|
||||
)
|
||||
|
||||
x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, rgb_crop, depth_crop, intrinsics)
|
||||
print(x, y, z)
|
||||
|
||||
if (x, y, z) != (0.0, 0.0, 0.0):
|
||||
pose = Pose()
|
||||
pose.position = Point(x=x, y=y, z=z)
|
||||
pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
|
||||
pose_list.append(
|
||||
{
|
||||
"class_id": int(class_ids[i]),
|
||||
"class_name": labels[class_ids[i]],
|
||||
"pose": pose
|
||||
}
|
||||
)
|
||||
|
||||
time4 = time.time()
|
||||
|
||||
self.get_logger().info(f'start')
|
||||
self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
|
||||
self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')
|
||||
self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
|
||||
self.get_logger().info(f'{(time4 - time1) * 1000} ms, completing a picture entire process')
|
||||
self.get_logger().info(f'end')
|
||||
|
||||
'''mask_img and box_img is or not output'''
|
||||
if self.output_boxes and not self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list
|
||||
elif self.output_boxes and self.output_masks:
|
||||
draw_box(self.set_confidence, rgb_img, result)
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list
|
||||
elif not self.output_boxes and self.output_masks:
|
||||
draw_mask(self.set_confidence, rgb_img, result)
|
||||
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list
|
||||
else:
|
||||
return None, pose_list
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DetectNode('detect')
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,360 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# coding: utf-8
|
||||
import transforms3d as tfs
|
||||
import numpy as np
|
||||
import math
|
||||
import json
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.parameter import Parameter
|
||||
|
||||
from tf2_ros import Buffer
|
||||
from tf2_msgs.msg import TFMessage
|
||||
from interfaces.msg import PoseArrayClassAndID
|
||||
from message_filters import ApproximateTimeSynchronizer, Subscriber
|
||||
|
||||
|
||||
def get_matrix_quat(x, y, z, rw, rx, ry, rz):
|
||||
"""从单位四元数构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
q = [rw, rx, ry, rz]
|
||||
rmat = tfs.quaternions.quat2mat(q)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def get_matrix_eular_radu(x, y, z, rx, ry, rz):
|
||||
"""从欧拉角构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
rmat = tfs.euler.euler2mat(
|
||||
# math.radians(rx), math.radians(ry), math.radians(rz)
|
||||
rx, ry, rz
|
||||
)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def get_matrix_rotvector(x, y, z, rx, ry, rz):
|
||||
"""从旋转向量构建齐次变换矩阵"""
|
||||
'''构造旋转矩阵'''
|
||||
rvec = np.array([rx, ry, rz])
|
||||
theta = np.linalg.norm(rvec)
|
||||
if theta < 1e-8:
|
||||
rmat = np.eye(3) # 小角度直接返回单位矩阵
|
||||
else:
|
||||
axis = rvec / theta
|
||||
rmat = tfs.axangles.axangle2mat(axis, theta)
|
||||
|
||||
"""构造齐次变换矩阵"""
|
||||
rmat = tfs.affines.compose(
|
||||
np.squeeze(np.asarray((x, y, z))),
|
||||
rmat,
|
||||
[1, 1, 1]
|
||||
)
|
||||
return rmat
|
||||
|
||||
|
||||
def skew(v):
|
||||
return np.array([[0, -v[2], v[1]],
|
||||
[v[2], 0, -v[0]],
|
||||
[-v[1], v[0], 0]])
|
||||
|
||||
|
||||
def R2P(T):
|
||||
"""旋转矩阵 --> 修正罗德里格斯向量"""
|
||||
axis, angle= tfs.axangles.mat2axangle(T[0:3, 0:3])
|
||||
P = 2 * math.sin(angle / 2) * axis
|
||||
return P
|
||||
|
||||
|
||||
def P2R(P):
|
||||
"""修正罗德里格斯向量 --> 旋转矩阵"""
|
||||
P2 = np.dot(P.T, P)
|
||||
part_1 = (1 - P2 / 2) * np.eye(3)
|
||||
part_2 = np.add(np.dot(P, P.T), np.sqrt(4- P2) * skew(P.flatten().T))
|
||||
R = np.add(part_1, np.divide(part_2, 2))
|
||||
return R
|
||||
|
||||
|
||||
def calculate(Hgs, Hcs):
|
||||
"""计算标定矩阵"""
|
||||
# H代表矩阵、h代表标量
|
||||
Hgijs = []
|
||||
Hcijs = []
|
||||
A = []
|
||||
B = []
|
||||
size = 0
|
||||
for i in range(len(Hgs)):
|
||||
for j in range(i + 1, len(Hgs)):
|
||||
size += 1
|
||||
Hgij = np.dot(np.linalg.inv(Hgs[j]), Hgs[i])
|
||||
Hgijs.append(Hgij)
|
||||
Pgij = np.dot(2, R2P(Hgij))
|
||||
|
||||
Hcij = np.dot(Hcs[j], np.linalg.inv(Hcs[i]))
|
||||
Hcijs.append(Hcij)
|
||||
Pcij = np.dot(2, R2P(Hcij))
|
||||
|
||||
A.append(skew(np.add(Pgij, Pcij)))
|
||||
B.append(np.subtract(Pcij, Pgij).reshape(3, 1))
|
||||
|
||||
MA = np.vstack(A)
|
||||
MB = np.vstack(B)
|
||||
Pcg_ = np.dot(np.linalg.pinv(MA), MB)
|
||||
pcg = np.sqrt(np.add(1, np.dot(Pcg_.T, Pcg_)))
|
||||
Pcg = np.dot(np.dot(2, Pcg_), np.linalg.inv(pcg))
|
||||
Rcg = P2R(Pcg).reshape(3, 3)
|
||||
|
||||
A = []
|
||||
B = []
|
||||
id = 0
|
||||
for i in range(len(Hgs)):
|
||||
for j in range(i + 1, len(Hgs)):
|
||||
Hgij = Hgijs[id]
|
||||
Hcij = Hcijs[id]
|
||||
A.append(np.subtract(Hgij[0:3, 0:3], np.eye(3, 3)))
|
||||
B.append(np.subtract(np.dot(Rcg, Hcij[0:3, 3:4]), Hgij[0:3, 3:4]))
|
||||
id += 1
|
||||
|
||||
MA = np.asarray(A).reshape(size * 3, 3)
|
||||
MB = np.asarray(B).reshape(size * 3, 1)
|
||||
Tcg = np.dot(np.linalg.pinv(MA), MB).reshape(3, )
|
||||
|
||||
return tfs.affines.compose(Tcg, np.squeeze(Rcg), [1, 1, 1])
|
||||
|
||||
|
||||
class Calibration(Node):
|
||||
def __init__(self, name):
|
||||
super(Calibration, self).__init__(name)
|
||||
self.sync_subscriber = None
|
||||
self.sub_camera_pose = None
|
||||
self.sub_hand_pose = None
|
||||
self.Hgs, self.Hcs = [], []
|
||||
self.hand, self.camera = [], []
|
||||
self.calibration_matrix = None
|
||||
self.calculate = False
|
||||
self.collect = False
|
||||
self.base_name = 'base_link'
|
||||
|
||||
self.tf_buffer = Buffer()
|
||||
|
||||
self.declare_parameter('start_collect_once', False)
|
||||
self.declare_parameter('start_calculate', False)
|
||||
self.declare_parameter('base_name', 'base_link')
|
||||
|
||||
self.declare_parameter('end_name', 'Link6')
|
||||
self.declare_parameter('class_name', 'crossboard')
|
||||
self.declare_parameter('matrix_name', 'eye_in_hand')
|
||||
|
||||
self.end_name = self.get_parameter('end_name').value
|
||||
self.class_name = self.get_parameter('class_name').value
|
||||
self.matrix_name = self.get_parameter('matrix_name').value
|
||||
|
||||
self.declare_parameter('mode', 'eye_in_hand')
|
||||
self.mode = self.get_parameter('mode').value.lower()
|
||||
|
||||
if self.mode not in ['eye_in_hand', 'eye_to_hand']:
|
||||
raise ValueError("mode must be 'eye_in_hand' or 'eye_to_hand'")
|
||||
|
||||
self.declare_parameter('input', 'quat')
|
||||
self.input = self.get_parameter('input').value.lower()
|
||||
if self.input == 'eular':
|
||||
self.function = get_matrix_eular_radu
|
||||
elif self.input == 'rotvertor':
|
||||
self.function = get_matrix_rotvector
|
||||
elif self.input == 'quat':
|
||||
self.function = get_matrix_quat
|
||||
else:
|
||||
raise ValueError("INPUT ERROR")
|
||||
self.done = False
|
||||
|
||||
self._init_sub()
|
||||
|
||||
def _init_sub(self):
|
||||
self.sub_hand_pose = Subscriber(self, TFMessage, '/tf')
|
||||
self.sub_camera_pose = Subscriber(self, PoseArrayClassAndID, '/pose/cv_detect_pose')
|
||||
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_hand_pose, self.sub_camera_pose],
|
||||
queue_size=10,
|
||||
slop=0.1,
|
||||
allow_headerless = True
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self.get_pose_callback)
|
||||
|
||||
def get_pose_callback(self, hand_tf, camera_pose):
|
||||
self.collect = self.get_parameter('start_collect_once').value
|
||||
self.calculate = self.get_parameter('start_calculate').value
|
||||
self.base_name = self.get_parameter('base_name').value
|
||||
self.end_name = self.get_parameter('end_name').value
|
||||
self.class_name = self.get_parameter('class_name').value
|
||||
self.matrix_name = self.get_parameter('matrix_name').value
|
||||
self.mode = self.get_parameter('mode').value.lower()
|
||||
|
||||
if self.collect:
|
||||
_hand, _camera = None, None
|
||||
|
||||
for transform in hand_tf.transforms:
|
||||
self.tf_buffer.set_transform(transform, "default_authority")
|
||||
|
||||
if self.base_name in self.tf_buffer.all_frames_as_string() and self.end_name in self.tf_buffer.all_frames_as_string():
|
||||
|
||||
trans = self.tf_buffer.lookup_transform(
|
||||
self.base_name,
|
||||
self.end_name,
|
||||
rclpy.time.Time()
|
||||
)
|
||||
|
||||
t = trans.transform.translation
|
||||
r = trans.transform.rotation
|
||||
_hand = [
|
||||
t.x, t.y, t.z,
|
||||
r.w, r.x, r.y, r.z
|
||||
]
|
||||
else: return
|
||||
|
||||
pose_dict = {}
|
||||
for object in camera_pose.objects:
|
||||
pose_dict[object.class_name] = object.pose_list
|
||||
|
||||
if self.class_name in pose_dict:
|
||||
_camera = [
|
||||
pose_dict[self.class_name][-1].position.x,
|
||||
pose_dict[self.class_name][-1].position.y,
|
||||
pose_dict[self.class_name][-1].position.z,
|
||||
pose_dict[self.class_name][-1].orientation.w,
|
||||
pose_dict[self.class_name][-1].orientation.x,
|
||||
pose_dict[self.class_name][-1].orientation.y,
|
||||
pose_dict[self.class_name][-1].orientation.z,
|
||||
]
|
||||
else:
|
||||
self.set_parameters([Parameter('start_collect_once', Parameter.Type.BOOL, False)])
|
||||
self.get_logger().info(f"Have not camera data")
|
||||
|
||||
if _hand is None or _camera is None:
|
||||
_hand, _camera = None, None
|
||||
self.get_logger().info("Have not camera data or end data")
|
||||
return
|
||||
|
||||
self.get_logger().info(f"add hand: {_hand}")
|
||||
self.get_logger().info(f"add camera: {_camera}")
|
||||
|
||||
self.hand.extend(_hand)
|
||||
self.camera.extend(_camera)
|
||||
self.set_parameters([Parameter('start_collect_once', Parameter.Type.BOOL, False)])
|
||||
|
||||
if self.calculate:
|
||||
self.calculate_calibration()
|
||||
|
||||
print(self.hand)
|
||||
print(self.camera)
|
||||
|
||||
self.get_logger().info(f"{self.calibration_matrix}")
|
||||
hand_eye_result = {
|
||||
"T": self.calibration_matrix.tolist()
|
||||
}
|
||||
with open(f"{self.matrix_name}_matrix.json", "w") as f:
|
||||
json.dump(hand_eye_result, f, indent=4)
|
||||
self.get_logger().info(f"Save as {self.matrix_name}_matrix.json")
|
||||
|
||||
with open(f"hand_pose_data.json", "w") as f:
|
||||
json.dump(self.hand, f, indent=4)
|
||||
|
||||
with open(f"camera_pose_data.json", "w") as f:
|
||||
json.dump(self.camera, f, indent=4)
|
||||
|
||||
self.done = True
|
||||
|
||||
def calculate_data(self):
|
||||
if self.input == 'quat':
|
||||
for i in range(0, len(self.hand), 7):
|
||||
self.Hgs.append(
|
||||
np.linalg.inv(
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5], self.hand[i + 6]
|
||||
)
|
||||
)
|
||||
if self.mode == 'eye_to_hand' else
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5], self.hand[i + 6]
|
||||
)
|
||||
)
|
||||
self.Hcs.append(
|
||||
self.function(
|
||||
self.camera[i], self.camera[i + 1], self.camera[i + 2],
|
||||
self.camera[i + 3], self.camera[i + 4], self.camera[i + 5], self.camera[i + 6]
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
for i in range(0, len(self.hand), 6):
|
||||
self.Hgs.append(
|
||||
np.linalg.inv(
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5]
|
||||
)
|
||||
)
|
||||
if self.mode == 'eye_to_hand' else
|
||||
self.function(
|
||||
self.hand[i], self.hand[i + 1], self.hand[i + 2],
|
||||
self.hand[i + 3], self.hand[i + 4], self.hand[i + 5]
|
||||
)
|
||||
)
|
||||
self.Hcs.append(
|
||||
self.function(
|
||||
self.camera[i], self.camera[i + 1], self.camera[i + 2],
|
||||
self.camera[i + 3], self.camera[i + 4], self.camera[i + 5]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def calculate_calibration(self):
|
||||
self.calculate_data()
|
||||
self.calibration_matrix = calculate(self.Hgs, self.Hcs)
|
||||
|
||||
def get_data_test(self, hand, camera):
|
||||
self.hand, self.camera = hand, camera
|
||||
self.calculate_calibration()
|
||||
print(self.calibration_matrix)
|
||||
|
||||
hand_eye_result = {
|
||||
"T": self.calibration_matrix.tolist()
|
||||
}
|
||||
with open(f"{self.matrix_name}_matrix.json", "w") as f:
|
||||
json.dump(hand_eye_result, f, indent=4)
|
||||
self.get_logger().info(f"Save as {self.matrix_name}_matrix.json")
|
||||
|
||||
self.done = True
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = Calibration('calibration')
|
||||
try:
|
||||
while rclpy.ok() and not node.done:
|
||||
rclpy.spin_once(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user