修正错误,格式优化

This commit is contained in:
liangyuxuan
2025-11-28 14:50:45 +08:00
parent 57f7aacff0
commit 77bb8d2fa3
12 changed files with 194 additions and 94 deletions

View File

@@ -1,3 +1,3 @@
from .detect_node import DetectNode
__all__ = ['DetectNode']
__all__ = ['detect_node']

View File

@@ -25,9 +25,7 @@ from interfaces.msg import PoseClassAndID, PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from ..utils import calculate_pose_pca, calculate_pose_icp, get_map, \
distortion_correction, crop_imgs_box_xywh, \
draw_box, draw_mask, crop_imgs_mask, calculate_grav_width
from ..utils import *
share_dir = get_package_share_directory('vision_detect')
@@ -88,22 +86,34 @@ class DetectNode(Node):
"""init parameter"""
self.declare_parameter('output_boxes', True)
self.declare_parameter('output_masks', False)
self.declare_parameter('get_camera_mode', 'service')
self.declare_parameter('detect_mode', 'detect')
self.declare_parameter('calculate_mode', 'pca')
self.declare_parameter('left_hand', 'configs/hand_eye_mat/hand_in_left_hand.json')
self.declare_parameter('right_hand', 'configs/hand_eye_mat/hand_in_right_hand.json')
self.declare_parameter('head', 'configs/hand_eye_mat/hand_to_hand.json')
self.output_boxes = self.get_parameter('output_boxes').value
self.output_masks = self.get_parameter('output_masks').value
self.declare_parameter('get_camera_mode', 'service')
self.declare_parameter('detect_mode', 'detect')
self.declare_parameter('calculate_mode', 'pca')
self.get_camera_mode = self.get_parameter('get_camera_mode').value
self.detect_mode = self.get_parameter('detect_mode').value
self.calculate_mode = self.get_parameter("calculate_mode").value
self.declare_parameter('left_hand', 'configs/hand_eye_mat/hand_in_left_hand.json')
self.declare_parameter('right_hand', 'configs/hand_eye_mat/hand_in_right_hand.json')
self.declare_parameter('head', 'configs/hand_eye_mat/hand_to_hand.json')
self.declare_parameter('Service_configs', '{}')
self.declare_parameter('Topic_configs', '{}')
self.declare_parameter('Detect_configs', '{}')
self.declare_parameter('Color_configs', '{}')
self.declare_parameter('Crossboard_configs', '{}')
self.declare_parameter('PCA_configs', '{}')
self.declare_parameter('ICA_configs', '{}')
self._init_json_file()
if self.get_camera_mode == "Service":
self.declare_parameter('Service_configs', '{}')
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
self._init_service()
elif self.get_camera_mode == "Topic":
@@ -119,11 +129,9 @@ class DetectNode(Node):
else:
self.hand_eye_mat = self.eye_to_hand_mat
else:
self.declare_parameter('Service_configs', '{}')
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
if self.detect_mode == 'Detect':
self.declare_parameter('Detect_configs', '{}')
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
@@ -132,17 +140,14 @@ class DetectNode(Node):
self.checkpoint_path = os.path.join(share_dir, json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"])
self._init_model()
elif self.detect_mode == 'Color':
self.declare_parameter('Color_configs', '{}')
self.function = self._seg_color
self.color_range = json.loads(self.get_parameter('Color_configs').value)["color_range"]
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in self.color_range]
elif self.detect_mode == 'Crossboard':
self.declare_parameter('Crossboard_configs', '{}')
self.function = self._seg_crossboard
self.pattern_size = json.loads(self.get_parameter('Crossboard_configs').value)["pattern_size"]
else:
self.get_logger().warning("Unknown mode, use detect")
self.declare_parameter('Detect_configs', '{}')
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
@@ -152,56 +157,43 @@ class DetectNode(Node):
self._init_model()
if self.calculate_mode == "PCA":
self.declare_parameter('PCA_configs', '{}')
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
elif self.calculate_mode == "ICP":
self.declare_parameter('ICA_configs', '{}')
self.configs = json.loads(self.get_parameter('ICA_configs').value)
self.source = o3d.io.read_point_cloud(os.path.join(share_dir, self.configs['complete_model_path']))
self.source = o3d.io.read_point_cloud(
os.path.join(share_dir, self.configs['complete_model_path'])
)
self.calculate_function = calculate_pose_icp
else:
self.get_logger().warning("Unknown calculate_mode, use PCA")
self.declare_parameter('PCA_configs', '{}')
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
self.get_logger().info("Initialize parameters done")
def _init_json_file(self):
self.eye_in_left_hand_path = os.path.join(share_dir, self.get_parameter('left_hand').value)
self.eye_in_right_hand_path = os.path.join(share_dir, self.get_parameter('right_hand').value)
self.eye_to_hand_path = os.path.join(share_dir, self.get_parameter('head').value)
self.eye_in_left_hand_path = os.path.join(
share_dir, self.get_parameter('left_hand').value
)
self.eye_in_right_hand_path = os.path.join(
share_dir, self.get_parameter('right_hand').value
)
self.eye_to_hand_path = os.path.join(
share_dir, self.get_parameter('head').value
)
if not os.path.exists(self.eye_in_left_hand_path):
self.get_logger().warning(f"{self.eye_in_left_hand_path} not found, use E(4, 4)")
self.eye_in_left_hand_mat = np.eye(4)
else:
with open(self.eye_in_left_hand_path, "r", encoding="utf-8") as f:
self.eye_in_left_hand_mat = np.array(json.load(f)["T"])
if self.eye_in_left_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_in_right_hand_mat is wrong, use E(4, 4)")
self.eye_in_left_hand_mat = np.eye(4)
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(self.eye_in_left_hand_path)
if not sign:
self.get_logger().warning(f"left_mat: {info}")
if not os.path.exists(self.eye_in_right_hand_path):
self.get_logger().warning(f"{self.eye_in_right_hand_path} not found, use E(4, 4)")
self.eye_in_right_hand_mat = np.eye(4)
else:
with open(self.eye_in_right_hand_path, "r", encoding="utf-8") as f:
self.eye_in_right_hand_mat = np.array(json.load(f)["T"])
if self.eye_in_right_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_in_right_hand_mat is wrong, use E(4, 4)")
self.eye_in_right_hand_mat = np.eye(4)
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(self.eye_in_right_hand_path)
if not sign:
self.get_logger().warning(f"right_mat: {info}")
if not os.path.exists(self.eye_to_hand_path):
self.get_logger().warning(f"{self.eye_to_hand_path} not found, use E(4, 4)")
self.eye_to_hand_mat = np.eye(4)
else:
with open(self.eye_to_hand_path, "r", encoding="utf-8") as f:
self.eye_to_hand_mat = np.array(json.load(f)["T"])
if self.eye_to_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_to_hand_mat is wrong, use E(4, 4)")
self.eye_to_hand_mat = np.eye(4)
self.eye_to_hand_mat, info, sign = read_calibration_mat(self.eye_to_hand_path)
if not sign:
self.get_logger().warning(f"head_mat: {info}")
self.get_logger().info("Initialize read json file done")
@@ -455,7 +447,7 @@ class DetectNode(Node):
"class_id": int(class_ids[i]),
"class_name": labels[class_ids[i]],
"pose": pose,
"grap_width": grab_width
"grab_width": grab_width
}
)

View File

@@ -2,8 +2,4 @@ from .object_icp import *
from .image_tools import *
from .draw_tools import *
from .calculate_tools import *
__all__ = [
"crop_imgs_box_xywh", "crop_imgs_box_xyxy", "crop_imgs_mask", "get_map", "distortion_correction", "draw_box",
"draw_mask", "draw_pointcloud", "pca", "calculate_pose_pca", "calculate_pose_icp", "calculate_grav_width"
]
from .file_tools import *

View File

@@ -1,5 +1,4 @@
import math
import json
"""计算工具"""
import logging
import cv2
@@ -9,6 +8,10 @@ import transforms3d as tfs
from .object_icp import object_icp
__all__ = [
"calculate_pose_pca", "calculate_pose_icp", "calculate_grav_width",
"rmat2quat", "quat2rmat",
]
def pca(data: np.ndarray, sort=True):
"""主成分分析 """
@@ -64,8 +67,14 @@ def calculate_pose_pca(
point_cloud = point_cloud.remove_non_finite_points()
down_pcd = point_cloud.voxel_down_sample(voxel_size=configs["voxel_size"])
clean_pcd, _ = down_pcd.remove_radius_outlier(nb_points=configs["nb_points"], radius=configs["radius"])
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=configs["nb_neighbors"], std_ratio=configs["std_ratio"])
clean_pcd, _ = down_pcd.remove_radius_outlier(
nb_points=configs["nb_points"],
radius=configs["radius"]
)
clean_pcd, _ = clean_pcd.remove_statistical_outlier(
nb_neighbors=configs["nb_neighbors"],
std_ratio=configs["std_ratio"]
)
if len(clean_pcd.points) == 0:
logging.warning("clean_pcd is empty")
@@ -75,26 +84,26 @@ def calculate_pose_pca(
w, v = pca(np.asarray(clean_pcd.points))
if w is not None and v is not None:
vx, vy, vz = v[:,0], v[:,1], v[:,2]
if vx[0] < 0:
vx = -vx
if vy[1] < 0:
vy = -vy
if not np.allclose(np.cross(vx, vy), vz):
vz = -vz
R = np.column_stack((vx, vy, vz))
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
rmat = hand_eye_mat @ rmat
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
x, y, z = rmat[0:3, 3].flatten()
return x, y, z, rw, rx, ry, rz
else:
if w is None or v is None:
logging.warning("PCA output w or v is None")
return 0.0, 0.0, 0.0, None, None, None, None
vx, vy, vz = v[:,0], v[:,1], v[:,2]
if vx[0] < 0:
vx = -vx
if vy[1] < 0:
vy = -vy
if not np.allclose(np.cross(vx, vy), vz):
vz = -vz
R = np.column_stack((vx, vy, vz))
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
rmat = hand_eye_mat @ rmat
rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
x, y, z = rmat[0:3, 3].flatten()
return x, y, z, rw, rx, ry, rz
def calculate_pose_icp(
@@ -132,8 +141,14 @@ def calculate_pose_icp(
point_cloud = point_cloud.remove_non_finite_points()
clean_pcd, _ = point_cloud.remove_radius_outlier(nb_points=configs["nb_points"], radius=configs["radius"])
clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=configs["nb_neighbors"], std_ratio=configs["std_ratio"])
clean_pcd, _ = point_cloud.remove_radius_outlier(
nb_points=configs["nb_points"],
radius=configs["radius"]
)
clean_pcd, _ = clean_pcd.remove_statistical_outlier(
nb_neighbors=configs["nb_neighbors"],
std_ratio=configs["std_ratio"]
)
if len(clean_pcd.points) == 0:
logging.warning("clean_pcd is empty")
@@ -155,7 +170,7 @@ def calculate_pose_icp(
return x, y, z, rw, rx, ry, rz
def calculate_grav_width(mask, K, depth):
def calculate_grav_width(mask, k, depth):
"""计算重心宽度"""
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
@@ -165,9 +180,25 @@ def calculate_grav_width(mask, K, depth):
else:
point_diff = box[1] - box[2]
grab_width = depth * np.sqrt(point_diff[0]**2 / K[0]**2 + point_diff[1]**2 / K[4]**2)
grab_width = depth * np.sqrt(
point_diff[0]**2 / k[0]**2 + point_diff[1]**2 / k[4]**2
)
return grab_width
else:
return 0.0
return 0.0
def rmat2quat(rmat):
    """Convert a 4x4 homogeneous transform to a translation + quaternion pose.

    Note: despite the name, this returns the full pose (translation and
    rotation), not only the rotation quaternion.

    Args:
        rmat: 4x4 homogeneous transformation matrix (np.ndarray).

    Returns:
        list: [x, y, z, rw, rx, ry, rz] — translation followed by the
        rotation quaternion in (w, x, y, z) order.
    """
    # Translation is the last column of the homogeneous transform.
    x, y, z = rmat[0:3, 3:4].flatten()
    # transforms3d returns quaternions in (w, x, y, z) order.
    rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    quat = [x, y, z, rw, rx, ry, rz]
    return quat
def quat2rmat(quat):
    """Convert a translation + quaternion pose to a 4x4 homogeneous transform.

    Note: despite the name, the input is a full pose (translation and
    rotation), not only a quaternion. Inverse of ``rmat2quat``.

    Args:
        quat: sequence [x, y, z, rw, rx, ry, rz] — translation followed by
            the rotation quaternion in (w, x, y, z) order.

    Returns:
        np.ndarray: 4x4 homogeneous transformation matrix.
    """
    x, y, z, rw, rx, ry, rz = quat
    # Rebuild the 3x3 rotation matrix from the (w, x, y, z) quaternion.
    r = tfs.quaternions.quat2mat([rw, rx, ry, rz])
    # Compose translation + rotation with unit scale into a 4x4 transform.
    rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), r, [1, 1, 1])
    return rmat

View File

@@ -5,6 +5,10 @@ import numpy as np
import open3d as o3d
__all__ = [
"draw_box", "draw_mask", "draw_pointcloud",
]
def draw_box(rgb_img: np.ndarray, segmentation_result, put_text: bool = True):
"""
绘制目标检测边界框

View File

@@ -0,0 +1,26 @@
"""Utility functions for file operations."""
import os
import json
import numpy as np
__all__ = [
"read_calibration_mat",
]
def read_calibration_mat(mat_path):
    """Load a 4x4 calibration matrix ("T" entry) from a JSON file.

    Falls back to the 4x4 identity matrix when the file is missing or the
    stored matrix does not have shape (4, 4).

    Args:
        mat_path: path to a JSON file containing a "T" entry.

    Returns:
        tuple: (mat, info, sign) where ``mat`` is a (4, 4) np.ndarray,
        ``info`` is a human-readable status message, and ``sign`` is
        True on success, False when the identity fallback was used.
    """
    if not os.path.exists(mat_path):
        # Missing file: report failure and fall back to identity.
        return np.eye(4), f"{mat_path} not found, use E(4, 4)", False
    with open(mat_path, "r", encoding="utf-8") as f:
        mat = np.array(json.load(f)["T"])
    if mat.shape != (4, 4):
        # Malformed matrix: report failure and fall back to identity.
        return np.eye(4), "The shape is wrong, use E(4, 4)", False
    return mat, "Success", True

View File

@@ -3,6 +3,11 @@ import logging
import numpy as np
__all__ = [
"crop_imgs_box_xywh", "crop_imgs_box_xyxy", "crop_imgs_mask", "get_map",
"distortion_correction",
]
def crop_imgs_box_xywh(imgs: list, box, same_sign: bool = False):
"""
Crop imgs

View File

@@ -3,6 +3,10 @@ import open3d as o3d
from typing import Union, List, Tuple
__all__ = [
"object_icp",
]
def _draw_pointcloud(pcd, T):
R = T[0:3, 0:3]

View File

@@ -120,7 +120,7 @@ class DetectNode(Node):
self.config_name = self.get_parameter('config_name').value
self.config_dir = os.path.join(share_dir, 'configs/flexiv_configs', self.config_name)
self.declare_parameter('set_confidence', 0.25)
self.declare_parameter('set_confidence', 0.5)
self.set_confidence = self.get_parameter('set_confidence').value
self.declare_parameter('classes', 'None')
@@ -301,9 +301,11 @@ class DetectNode(Node):
"""Use segmentation model"""
pose_list = []
img = rgb_img.copy()
'''Get Predict Results'''
time1 = time.time()
results = self.model(rgb_img, retina_masks=True, classes=[39], conf=self.set_confidence)
results = self.model(rgb_img.copy(), retina_masks=True, classes=[39], conf=self.set_confidence)
time2 = time.time()
result = results[0]
@@ -325,6 +327,13 @@ class DetectNode(Node):
for i, (mask, box) in enumerate(zip(masks, boxes)):
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
x_center, y_center, width, height = box[:4]
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
cv2.rectangle(img, p1, p2, (255, 255, 0), 2)
# rgb_crop, depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(rgb_img, depth_img, mask, box)
# if depth_crop is None:
@@ -444,6 +453,8 @@ class DetectNode(Node):
time4 = time.time()
cv2.imwrite("/home/nvidia/detect_result.png", img)
self.get_logger().info(f'start')
self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')

View File

@@ -3,4 +3,4 @@
#include <Eigen/Dense>
Eigen::Matrix4d read_data_from_json(const std::string& file_path, std::string& info);
Eigen::Matrix4d read_data_from_json(const std::string &file_path, std::string &info, bool &sign);

View File

@@ -95,18 +95,47 @@ void VisionDetectNode::init_model() {
void VisionDetectNode::init_calibration_mat() {
/*Init Calibration Mat*/
std::string info;
bool sign;
info = "left";
this->left_hand_mat = read_data_from_json(this->calibrition_path["left_hand"].get<std::string>(), info);
RCLCPP_INFO(this->get_logger(), info.c_str());
this->left_hand_mat = read_data_from_json(
this->calibrition_path["left_hand"].get<std::string>(),
info,
sign
);
if (sign) {
RCLCPP_INFO(this->get_logger(), info.c_str());
}
else {
RCLCPP_WARN(this->get_logger(), info.c_str());
}
info = "right";
this->right_hand_mat = read_data_from_json(this->calibrition_path["right_hand"].get<std::string>(), info);
RCLCPP_INFO(this->get_logger(), info.c_str());
this->right_hand_mat = read_data_from_json(
this->calibrition_path["right_hand"].get<std::string>(),
info,
sign
);
if (sign) {
RCLCPP_INFO(this->get_logger(), info.c_str());
}
else {
RCLCPP_WARN(this->get_logger(), info.c_str());
}
info = "head";
this->head_mat = read_data_from_json(this->calibrition_path["head"].get<std::string>(), info);
RCLCPP_INFO(this->get_logger(), info.c_str());
this->head_mat = read_data_from_json(
this->calibrition_path["head"].get<std::string>(),
info,
sign
);
if (sign) {
RCLCPP_INFO(this->get_logger(), info.c_str());
}
else {
RCLCPP_WARN(this->get_logger(), info.c_str());
}
RCLCPP_INFO(this->get_logger(), "Init calibration mat done");
// Done

View File

@@ -4,7 +4,7 @@
#include <nlohmann/json.hpp>
Eigen::Matrix4d read_data_from_json(const std::string& file_path, std::string& info) {
Eigen::Matrix4d read_data_from_json(const std::string &file_path, std::string &info, bool &sign) {
Eigen::Matrix4d mat;
nlohmann::json jfile;
std::ifstream file(file_path);
@@ -19,6 +19,7 @@ Eigen::Matrix4d read_data_from_json(const std::string& file_path, std::string& i
info = "Cannot open head file! use E(4, 4)";
}
mat = Eigen::Matrix4d::Identity();
sign = false;
}
else {
file >> jfile;
@@ -37,6 +38,7 @@ Eigen::Matrix4d read_data_from_json(const std::string& file_path, std::string& i
else {
info = "Success read head data";
}
sign = true;
}
return mat;
}