整合功能

This commit is contained in:
liangyuxuan
2025-11-17 13:46:10 +08:00
parent 2740a751f5
commit f19cdf6535
43 changed files with 2550 additions and 273 deletions

View File

@@ -1,7 +1,9 @@
<component name="ProjectDictionaryState">
<dictionary name="project">
<words>
<w>crossboard</w>
<w>pointclouds</w>
<w>xywh</w>
</words>
</dictionary>
</component>

View File

@@ -0,0 +1,28 @@
{
"T": [
[
1.0,
0.0,
0.0,
0.0
],
[
0.0,
1.0,
0.0,
0.0
],
[
0.0,
0.0,
1.0,
0.0
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

View File

@@ -0,0 +1,28 @@
{
"T": [
[
1.0,
0.0,
0.0,
0.0
],
[
0.0,
1.0,
0.0,
0.0
],
[
0.0,
0.0,
1.0,
0.0
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

View File

@@ -0,0 +1,28 @@
{
"T": [
[
1.0,
0.0,
0.0,
0.0
],
[
0.0,
1.0,
0.0,
0.0
],
[
0.0,
0.0,
1.0,
0.0
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

View File

@@ -0,0 +1,31 @@
{
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"eye_in_left_hand_mat_path": "configs/hand_eye_mat/eye_in_left_hand.json",
"eye_in_right_hand_mat_path": "configs/hand_eye_mat/eye_in_right_hand.json",
"eye_to_hand_mat_path": "configs/hand_eye_mat/eye_to_hand.json",
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.25,
"classes": [39]
},
"calculate_mode": "PCA",
"ICP_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0,
"ransac_voxel_size": 0.005,
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]
}
}

View File

@@ -0,0 +1,28 @@
{
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"eye_in_left_hand_mat_path": "configs/hand_eye_mat/eye_in_left_hand.json",
"eye_in_right_hand_mat_path": "configs/hand_eye_mat/eye_in_right_hand.json",
"eye_to_hand_mat_path": "configs/hand_eye_mat/eye_to_hand.json",
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.25,
"classes": [39]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
}
}

View File

@@ -0,0 +1,29 @@
{
"node_name": "crossboard_topic",
"output_boxes": "True",
"output_masks": "False",
"eye_in_left_hand_mat_path": "configs/hand_eye_mat/eye_in_left_hand.json",
"eye_in_right_hand_mat_path": "configs/hand_eye_mat/eye_in_right_hand.json",
"eye_to_hand_mat_path": "configs/hand_eye_mat/eye_to_hand.json",
"get_camera_mode": "Topic",
"Topic_configs": {
"position": "right",
"color_image_topic_name": "/camera/color/image_raw",
"depth_image_topic_name": "/camera/depth/image_raw",
"camera_info_topic_name": "/camera/color/camera_info"
},
"detect_mode": "Crossboard",
"Crossboard_configs": {
"pattern_size": [8, 5]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
}
}

View File

@@ -0,0 +1,53 @@
{
"node_name": "default_config_detect_service",
"output_boxes": "True",
"output_masks": "False",
"eye_in_left_hand_mat_path": "configs/hand_eye_mat/eye_in_left_hand.json",
"eye_in_right_hand_mat_path": "configs/hand_eye_mat/eye_in_right_hand.json",
"eye_to_hand_mat_path": "configs/hand_eye_mat/eye_to_hand.json",
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"Topic_configs": {
"position": "right",
"color_image_topic_name": "/camera/color/image_raw",
"depth_image_topic_name": "/camera/depth/image_raw",
"camera_info_topic_name": "/camera/color/camera_info"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.25,
"classes": []
},
"Color_configs": {
"distance": 1500,
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
},
"Crossboard_configs": {
"pattern_size": [8, 5]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
},
"ICP_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0,
"ransac_voxel_size": 0.005,
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]
}
}

View File

@@ -1,7 +0,0 @@
{
"checkpoint_name": "yolo11s-seg.pt",
"output_boxes": "True",
"output_masks": "False",
"set_confidence": "0.25",
"classes": "None"
}

View File

@@ -0,0 +1,28 @@
{
"node_name": "detect_service",
"output_boxes": "True",
"output_masks": "False",
"eye_in_left_hand_mat_path": "configs/hand_eye_mat/eye_in_left_hand.json",
"eye_in_right_hand_mat_path": "configs/hand_eye_mat/eye_in_right_hand.json",
"eye_to_hand_mat_path": "configs/hand_eye_mat/eye_to_hand.json",
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.25,
"classes": []
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
}
}

View File

@@ -0,0 +1,73 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/bottle_detect_service_ica.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the bottle-detect service node configured for the ICP pose pipeline."""
    # Scalar settings copied verbatim from the JSON config as plain string arguments.
    plain_keys = [
        'output_boxes', 'output_masks',
        'eye_in_left_hand_mat_path', 'eye_in_right_hand_mat_path', 'eye_to_hand_mat_path',
        'get_camera_mode', 'detect_mode', 'calculate_mode',
    ]
    # Nested config sections are forwarded as JSON-encoded strings.
    json_keys = ['Service_configs', 'Detect_configs', 'ICP_configs']
    args_detect = [DeclareLaunchArgument(key, default_value=configs[key]) for key in plain_keys]
    args_detect += [DeclareLaunchArgument(key, default_value=json.dumps(configs[key])) for key in json_keys]

    def create_detect_node(context):
        # Resolve every declared launch argument to its concrete string value.
        params = {key: LaunchConfiguration(key).perform(context) for key in plain_keys + json_keys}
        # Boolean flags arrive as strings such as "True"; convert explicitly.
        params['output_boxes'] = params['output_boxes'].lower() == 'true'
        params['output_masks'] = params['output_masks'].lower() == 'true'
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[params],
        )
        return [detect_node]

    return LaunchDescription(args_detect + [OpaqueFunction(function=create_detect_node)])

View File

@@ -0,0 +1,73 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/bottle_detect_service_pca.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the bottle-detect service node configured for the PCA pose pipeline."""
    # Scalar settings copied verbatim from the JSON config as plain string arguments.
    plain_keys = [
        'output_boxes', 'output_masks',
        'eye_in_left_hand_mat_path', 'eye_in_right_hand_mat_path', 'eye_to_hand_mat_path',
        'get_camera_mode', 'detect_mode', 'calculate_mode',
    ]
    # Nested config sections are forwarded as JSON-encoded strings.
    json_keys = ['Service_configs', 'Detect_configs', 'PCA_configs']
    args_detect = [DeclareLaunchArgument(key, default_value=configs[key]) for key in plain_keys]
    args_detect += [DeclareLaunchArgument(key, default_value=json.dumps(configs[key])) for key in json_keys]

    def create_detect_node(context):
        # Resolve every declared launch argument to its concrete string value.
        params = {key: LaunchConfiguration(key).perform(context) for key in plain_keys + json_keys}
        # Boolean flags arrive as strings such as "True"; convert explicitly.
        params['output_boxes'] = params['output_boxes'].lower() == 'true'
        params['output_masks'] = params['output_masks'].lower() == 'true'
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[params],
        )
        return [detect_node]

    return LaunchDescription(args_detect + [OpaqueFunction(function=create_detect_node)])

View File

@@ -0,0 +1,73 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/crossboard_topic_pca.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the chessboard (crossboard) detector reading camera topics, PCA pipeline."""
    # Scalar settings copied verbatim from the JSON config as plain string arguments.
    plain_keys = [
        'output_boxes', 'output_masks',
        'eye_in_left_hand_mat_path', 'eye_in_right_hand_mat_path', 'eye_to_hand_mat_path',
        'get_camera_mode', 'detect_mode', 'calculate_mode',
    ]
    # Nested config sections are forwarded as JSON-encoded strings.
    json_keys = ['Topic_configs', 'Crossboard_configs', 'PCA_configs']
    args_detect = [DeclareLaunchArgument(key, default_value=configs[key]) for key in plain_keys]
    args_detect += [DeclareLaunchArgument(key, default_value=json.dumps(configs[key])) for key in json_keys]

    def create_detect_node(context):
        # Resolve every declared launch argument to its concrete string value.
        params = {key: LaunchConfiguration(key).perform(context) for key in plain_keys + json_keys}
        # Boolean flags arrive as strings such as "True"; convert explicitly.
        params['output_boxes'] = params['output_boxes'].lower() == 'true'
        params['output_masks'] = params['output_masks'].lower() == 'true'
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[params],
        )
        return [detect_node]

    return LaunchDescription(args_detect + [OpaqueFunction(function=create_detect_node)])

View File

@@ -0,0 +1,73 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/detect_service_pca.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the general YOLO detect service node with the PCA pose pipeline."""
    # Scalar settings copied verbatim from the JSON config as plain string arguments.
    plain_keys = [
        'output_boxes', 'output_masks',
        'eye_in_left_hand_mat_path', 'eye_in_right_hand_mat_path', 'eye_to_hand_mat_path',
        'get_camera_mode', 'detect_mode', 'calculate_mode',
    ]
    # Nested config sections are forwarded as JSON-encoded strings.
    json_keys = ['Service_configs', 'Detect_configs', 'PCA_configs']
    args_detect = [DeclareLaunchArgument(key, default_value=configs[key]) for key in plain_keys]
    args_detect += [DeclareLaunchArgument(key, default_value=json.dumps(configs[key])) for key in json_keys]

    def create_detect_node(context):
        # Resolve every declared launch argument to its concrete string value.
        params = {key: LaunchConfiguration(key).perform(context) for key in plain_keys + json_keys}
        # Boolean flags arrive as strings such as "True"; convert explicitly.
        params['output_boxes'] = params['output_boxes'].lower() == 'true'
        params['output_masks'] = params['output_masks'].lower() == 'true'
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[params],
        )
        return [detect_node]

    return LaunchDescription(args_detect + [OpaqueFunction(function=create_detect_node)])

View File

@@ -0,0 +1,115 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/default_config.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the detect node with every config section forwarded (default profile).

    Unlike the specialized launch files, this one passes all mode sections to the
    node; the node itself selects the relevant ones from get_camera_mode /
    detect_mode / calculate_mode.
    """
    # Scalar settings copied verbatim from the JSON config as plain string arguments.
    plain_keys = [
        'output_boxes', 'output_masks',
        'eye_in_left_hand_mat_path', 'eye_in_right_hand_mat_path', 'eye_to_hand_mat_path',
        'get_camera_mode', 'detect_mode', 'calculate_mode',
    ]
    # All nested config sections, each forwarded as a JSON-encoded string.
    json_keys = [
        'Service_configs', 'Topic_configs', 'Detect_configs', 'Color_configs',
        'Crossboard_configs', 'PCA_configs', 'ICP_configs',
    ]
    args_detect = [DeclareLaunchArgument(key, default_value=configs[key]) for key in plain_keys]
    args_detect += [DeclareLaunchArgument(key, default_value=json.dumps(configs[key])) for key in json_keys]

    def create_detect_node(context):
        # Resolve every declared launch argument to its concrete string value.
        params = {key: LaunchConfiguration(key).perform(context) for key in plain_keys + json_keys}
        # Boolean flags arrive as strings such as "True"; convert explicitly.
        params['output_boxes'] = params['output_boxes'].lower() == 'true'
        params['output_masks'] = params['output_masks'].lower() == 'true'
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[params],
        )
        return [detect_node]

    return LaunchDescription(args_detect + [OpaqueFunction(function=create_detect_node)])

View File

@@ -13,10 +13,11 @@ setup(
('share/' + package_name + '/launch', glob('launch/*.launch.py')),
('share/' + package_name + '/configs', glob('configs/*.json')),
('share/' + package_name + '/configs/flexiv_configs', glob('configs/flexiv_configs/*.json')),
('share/' + package_name + '/configs/hand_eye_mat', glob('configs/hand_eye_mat/*.json')),
('share/' + package_name + '/configs/launch_configs', glob('configs/launch_configs/*.json')),
('share/' + package_name + '/checkpoints', glob('vision_detect/checkpoints/*.pt')),
('share/' + package_name + '/pointclouds', glob('vision_detect/pointclouds/*.pcd')),
('share/' + package_name + '/checkpoints', glob('checkpoints/*.pt')),
('share/' + package_name + '/pointclouds', glob('pointclouds/*.pcd')),
],
install_requires=['setuptools'],
zip_safe=True,
@@ -42,7 +43,9 @@ setup(
'crossboard_detect_node = vision_detect.crossboard_detect:main',
'service_client_node = vision_detect.service_client:main',
'get_camera_pose_node = vision_detect.get_camera_pose:main',
'calculate_node = vision_detect.calculate:main'
'calculate_node = vision_detect.calculate:main',
'detect_node = vision_detect.detect_node:main',
],
},
)

View File

@@ -0,0 +1,3 @@
# Package marker: nothing is re-exported at the package level.
__all__ = []

View File

@@ -0,0 +1,3 @@
# Re-export the node class as this package's only public name.
from .detect_node import DetectNode
__all__ = ['DetectNode']

View File

@@ -0,0 +1,594 @@
import os
import time
import threading
import json
from collections import defaultdict
from ament_index_python.packages import get_package_share_directory
import cv2
import open3d as o3d
import numpy as np
import torch
from message_filters import Subscriber, ApproximateTimeSynchronizer
from ultralytics import YOLO
import rclpy
from rclpy.node import Node
from rclpy.task import Future
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
from img_dev.msg import ImgMsg
from interfaces.srv import VisionObjectRecognition
from ..utils import calculate_pose_pca, calculate_pose_icp, get_map, distortion_correction, crop_imgs_box_xywh, \
draw_box, draw_mask, crop_imgs_mask
share_dir = get_package_share_directory('vision_detect')
class DetectNode(Node):
    def __init__(self, name):
        """Detection node: acquires camera data, segments targets and outputs 6-DoF poses.

        :param name: ROS node name, forwarded to ``rclpy.node.Node``.
        """
        super().__init__(name)
        # Placeholders resolved later by the _init_* helpers.
        self.device = None
        self.checkpoint_path = None
        self.output_boxes = None       # bool: publish images with detection boxes drawn
        self.output_masks = None       # bool: publish images with segmentation masks drawn
        self.function = None           # segmentation callable chosen by detect_mode
        self.source = None             # reference point cloud, only used in ICP mode
        self.K = None                  # camera intrinsics (filled from CameraInfo)
        self.eye_in_hand_mat = None
        self.eye_to_hand_mat = None
        self.get_camera_mode = None
        self.detect_mode = None
        self.calculate_mode = None
        # Default pose estimator; _init_param may swap it for ICP.
        self.calculate_function = calculate_pose_pca
        self.fx = self.fy = 0.5
        self.camera_data = {}
        # Future completed once the first CameraInfo message arrives (Topic mode).
        self.future = Future()
        self.cv_bridge = CvBridge()
        self.lock = threading.Lock()
        # Fallback color-segmentation defaults; overridden by Color_configs.
        self.distance = 1500
        self.color_range = [
            [[0, 120, 70], [10, 255, 255]],
            [[170, 120, 70], [180, 255, 255]]
        ]
        '''init'''
        self._init_param()
        self._init_publisher()
        self._init_subscriber()
        self.get_logger().info("Initialize done")
def _init_param(self):
"""init parameter"""
self.declare_parameter('output_boxes', True)
self.declare_parameter('output_masks', False)
self.declare_parameter('get_camera_mode', 'service')
self.declare_parameter('detect_mode', 'detect')
self.declare_parameter('calculate_mode', 'pca')
self.declare_parameter('eye_in_left_hand_mat_path', 'configs/hand_eye_mat/hand_in_left_hand.json')
self.declare_parameter('eye_in_right_hand_mat_path', 'configs/hand_eye_mat/hand_in_right_hand.json')
self.declare_parameter('eye_to_hand_mat_path', 'configs/hand_eye_mat/hand_to_hand.json')
self.output_boxes = self.get_parameter('output_boxes').value
self.output_masks = self.get_parameter('output_masks').value
self.get_camera_mode = self.get_parameter('get_camera_mode').value
self.detect_mode = self.get_parameter('detect_mode').value
self.calculate_mode = self.get_parameter("calculate_mode").value
self._init_json_file()
if self.get_camera_mode == "Service":
self.declare_parameter('Service_configs', '{}')
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
self._init_service()
elif self.get_camera_mode == "Topic":
self.declare_parameter('Topic_configs', '{}')
self.color_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["color_image_topic_name"]
self.depth_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["depth_image_topic_name"]
self.camera_info_topic_name = json.loads(self.get_parameter('Topic_configs').value)["camera_info_topic_name"]
self.position = json.loads(self.get_parameter('Topic_configs').value)["position"]
if self.position == 'left':
self.hand_eye_mat = self.eye_in_left_hand_mat
elif self.position == 'right':
self.hand_eye_mat = self.eye_in_right_hand_mat
else:
self.hand_eye_mat = self.eye_to_hand_mat
else:
self.declare_parameter('Service_configs', '{}')
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
if self.detect_mode == 'Detect':
self.declare_parameter('Detect_configs', '{}')
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = os.path.join(share_dir, json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"])
self._init_model()
elif self.detect_mode == 'Color':
self.declare_parameter('Color_configs', '{}')
self.function = self._seg_color
self.color_range = json.loads(self.get_parameter('Color_configs').value)["color_range"]
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in self.color_range]
elif self.detect_mode == 'Crossboard':
self.declare_parameter('Crossboard_configs', '{}')
self.function = self._seg_crossboard
self.pattern_size = json.loads(self.get_parameter('Crossboard_configs').value)["pattern_size"]
else:
self.get_logger().warning("Unknown mode, use detect")
self.declare_parameter('Detect_configs', '{}')
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"]
self._init_model()
if self.calculate_mode == "PCA":
self.declare_parameter('PCA_configs', '{}')
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
elif self.calculate_mode == "ICP":
self.declare_parameter('ICA_configs', '{}')
self.configs = json.loads(self.get_parameter('ICA_configs').value)
self.source = o3d.io.read_point_cloud(os.path.join(share_dir, self.configs['complete_model_path']))
self.calculate_function = calculate_pose_icp
else:
self.get_logger().warning("Unknown calculate_mode, use PCA")
self.declare_parameter('PCA_configs', '{}')
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
self.get_logger().info("Initialize parameters done")
def _init_json_file(self):
self.eye_in_left_hand_path = os.path.join(share_dir, self.get_parameter('eye_in_left_hand_mat_path').value)
self.eye_in_right_hand_path = os.path.join(share_dir, self.get_parameter('eye_in_right_hand_mat_path').value)
self.eye_to_hand_path = os.path.join(share_dir, self.get_parameter('eye_to_hand_mat_path').value)
if not os.path.exists(self.eye_in_left_hand_path):
self.get_logger().warning(f"{self.eye_in_left_hand_path} not found, use E(4, 4)")
self.eye_in_left_hand_mat = np.eye(4)
else:
with open(self.eye_in_left_hand_path, "r") as f:
self.eye_in_left_hand_mat = np.array(json.load(f)["T"])
if self.eye_in_left_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_in_right_hand_mat is wrong, use E(4, 4)")
self.eye_in_left_hand_mat = np.eye(4)
if not os.path.exists(self.eye_in_right_hand_path):
self.get_logger().warning(f"{self.eye_in_right_hand_path} not found, use E(4, 4)")
self.eye_in_right_hand_mat = np.eye(4)
else:
with open(self.eye_in_right_hand_path, "r") as f:
self.eye_in_right_hand_mat = np.array(json.load(f)["T"])
if self.eye_in_right_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_in_right_hand_mat is wrong, use E(4, 4)")
self.eye_in_right_hand_mat = np.eye(4)
if not os.path.exists(self.eye_to_hand_path):
self.get_logger().warning(f"{self.eye_to_hand_path} not found, use E(4, 4)")
self.eye_to_hand_mat = np.eye(4)
else:
with open(self.eye_to_hand_path, "r") as f:
self.eye_to_hand_mat = np.array(json.load(f)["T"])
if self.eye_to_hand_mat.shape != (4, 4):
self.get_logger().warning("The shape of eye_to_hand_mat is wrong, use E(4, 4)")
self.eye_to_hand_mat = np.eye(4)
self.get_logger().info("Initialize read json file done")
def _init_model(self):
"""init model"""
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(device_model)
except Exception as e:
self.get_logger().error(f'Failed to load YOLO model: {e}')
raise
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
if self.checkpoint_path.endswith('-seg.pt'):
self.function = self._seg_image
else:
self.function = None
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
raise
self.get_logger().info("Initialize model done")
def _init_publisher(self):
    """Create the annotated-image publisher and, in Topic mode, the pose publisher."""
    need_image_output = self.output_boxes or self.output_masks
    if need_image_output:
        self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
    if self.get_camera_mode == "Topic":
        self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
    self.get_logger().info("Initialize publisher done")
def _init_service(self):
    """Expose the detection entry point as a ROS service server."""
    self.server = self.create_service(
        VisionObjectRecognition, self.service_name, self._service_callback
    )
    self.get_logger().info("Initialize service done")
def _init_subscriber(self):
    """Create camera subscribers for the configured acquisition mode.

    "Topic" mode first waits for CameraInfo, then time-synchronizes the
    color and depth image topics. "Service" mode — which is also the
    fallback for any unknown mode value — listens on /img_msg.
    """
    if self.get_camera_mode == "Topic":
        self.sub_camera_info = self.create_subscription(
            CameraInfo,
            self.camera_info_topic_name,
            self._camera_info_callback,
            10
        )
        self.get_logger().info("Waiting for camera info...")
        rclpy.spin_until_future_complete(self, self.future)
        self.get_logger().info("Camera info received.")
        # Synchronize color and depth frames (timestamps within 0.1 s).
        self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
        self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
        self.sync_subscriber = ApproximateTimeSynchronizer(
            [self.sub_color_image, self.sub_depth_image],
            queue_size=10,
            slop=0.1
        )
        self.sync_subscriber.registerCallback(self._sync_sub_callback)
    else:
        # The original duplicated this subscription verbatim in the
        # Service branch and the unknown-mode fallback; merged here.
        if self.get_camera_mode != "Service":
            self.get_logger().warning("get_camera_mode is wrong, use service")
        self.sub_img = self.create_subscription(
            ImgMsg,
            "/img_msg",
            self._service_sub_callback,
            10
        )
    self.get_logger().info("Initialize subscriber done")
def _camera_info_callback(self, msg: CameraInfo):
    """Cache intrinsics from the first valid CameraInfo and stop listening."""
    self.K = msg.k
    self.D = msg.d
    self.camera_size = [msg.width, msg.height]
    have_intrinsics = (
        self.K is not None and len(self.K) > 0
        and self.D is not None and len(self.D) > 0
    )
    if not have_intrinsics:
        self.get_logger().warning("K and D are not defined")
        return
    # Build the undistortion maps once; also replaces K with the new matrix.
    self.map1, self.map2, self.K = get_map(msg.k, msg.d, self.camera_size)
    if not self.future.done():
        self.future.set_result(True)
    self.destroy_subscription(self.sub_camera_info)
def _service_sub_callback(self, msgs):
"""同步回调函数"""
with self.lock:
self.camera_data[msgs.position] = [
msgs.image_color,
msgs.image_depth,
msgs.karr,
msgs.darr
]
def _sync_sub_callback(self, color_img_ros, depth_img_ros):
    """Synchronized color/depth callback: run detection, then publish the
    annotated image and the per-class pose list (Topic mode)."""
    color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
    depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
    # Undistort both frames using the maps built from CameraInfo.
    color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
    img, pose_dict = self.function(color_img_cv, depth_img_cv)
    """masks为空结束这一帧"""
    # No annotated image (e.g. no masks found) -> republish the raw frame.
    if img is None:
        img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
    if self.output_boxes or self.output_masks:
        self.pub_detect_image.publish(img)
    if pose_dict:
        # pose_dict maps (class_id, class_name) -> list of Pose.
        pose_list_all = PoseArrayClassAndID()
        for (class_id, class_name), pose_list in pose_dict.items():
            pose_list_all.objects.append(
                PoseClassAndID(
                    class_name = class_name,
                    class_id = class_id,
                    pose_list = pose_list
                )
            )
        pose_list_all.header.stamp = self.get_clock().now().to_msg()
        pose_list_all.header.frame_id = "pose_list"
        self.pub_pose_list.publish(pose_list_all)
def _service_callback(self, request, response):
    """Service handler: detect objects in the cached frame of the requested camera.

    Selects the hand-eye matrix by ``request.camera_position`` ('left' and
    'right' use the eye-in-hand matrices, anything else eye-to-hand),
    undistorts the cached frames, runs detection and fills
    ``response.objects`` with per-class pose lists.
    """
    response.header.stamp = self.get_clock().now().to_msg()
    response.header.frame_id = "camera_detect"
    with self.lock:
        if request.camera_position in self.camera_data:
            # NOTE(review): this rebinds self.K (shared with other callbacks)
            # while only this read is under the lock — confirm the node runs
            # on a single-threaded executor.
            color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
        else:
            response.success = False
            response.info = f"{request.camera_position} Camera data is empty or name is wrong"
            response.objects = []
            return response
    if request.camera_position == 'left':
        self.hand_eye_mat = self.eye_in_left_hand_mat
    elif request.camera_position == 'right':
        self.hand_eye_mat = self.eye_in_right_hand_mat
    else:
        self.hand_eye_mat = self.eye_to_hand_mat
    self.camera_size = [color_img_ros.width, color_img_ros.height]
    color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
    depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
    # Per-request undistortion maps (each camera has its own K/D).
    map1, map2, self.K = get_map(self.K, D, self.camera_size)
    color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
    img, pose_dict = self.function(color_img_cv, depth_img_cv)
    """masks为空结束这一帧"""
    if self.output_boxes or self.output_masks:
        # Publish the annotated image, falling back to the raw frame.
        if img is None:
            img = color_img_ros
        self.pub_detect_image.publish(img)
    if pose_dict:
        response.info = "Success get pose"
        response.success = True
        for (class_id, class_name), pose_list in pose_dict.items():
            response.objects.append(
                PoseClassAndID(
                    class_name=class_name,
                    class_id=class_id,
                    pose_list=pose_list
                )
            )
        return response
    else:
        response.info = "pose dict is empty"
        response.success = False
        response.objects = []
        return response
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
    """Run YOLO segmentation and estimate one pose per detected instance.

    Args:
        rgb_img: undistorted BGR image (annotated in place when output is enabled).
        depth_img: undistorted 16-bit depth image aligned with ``rgb_img``.

    Returns:
        (annotated ROS Image or None, dict mapping (class_id, class_name)
        to a list of Pose); (None, None) when no masks were detected.
    """
    pose_dict = defaultdict(list)
    time1 = time.time()
    results = self.model(rgb_img, retina_masks=True, conf=self.confidence, classes=self.classes)
    time2 = time.time()
    result = results[0]
    # Abort this frame early when segmentation produced no masks.
    if result.masks is None or len(result.masks) == 0:
        return None, None
    masks = result.masks.data.cpu().numpy()
    boxes = result.boxes.xywh.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy()
    labels = result.names
    time3 = time.time()
    for i, (mask, box) in enumerate(zip(masks, boxes)):
        imgs_crop, (x_min, y_min) = crop_imgs_box_xywh([depth_img, mask], box, True)
        depth_crop, mask_crop = imgs_crop
        if depth_crop is None:
            continue
        # Shift the principal point so the intrinsics match the cropped ROI.
        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            int(self.camera_size[0]),
            int(self.camera_size[1]),
            self.K[0],
            self.K[4],
            self.K[2] - x_min,
            self.K[5] - y_min
        )
        x, y, z, rw, rx, ry, rz = self.calculate_function(
            mask_crop,
            depth_crop,
            intrinsics,
            self.source,
            self.hand_eye_mat,
            self.configs
        )
        # An all-zero translation is the pose calculators' failure sentinel.
        if (x, y, z) != (0.0, 0.0, 0.0):
            pose = Pose()
            pose.position = Point(x=x, y=y, z=z)
            pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
            pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
    time4 = time.time()
    self.get_logger().info(f'start')
    self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
    self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')
    self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
    self.get_logger().info(f'{(time4 - time1) * 1000} ms, completing a picture entire process')
    self.get_logger().info(f'end')
    # Draw the requested overlays; this replaces the original four-way
    # if/elif chain with the same observable behavior.
    if self.output_boxes:
        draw_box(rgb_img, result)
    if self.output_masks:
        draw_mask(rgb_img, result)
    if self.output_boxes or self.output_masks:
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
    return None, pose_dict
def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
    """Segment by HSV color ranges (within a depth limit) and estimate one pose."""
    pose_dict = defaultdict(list)
    hsv_img = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2HSV)
    # Keep only pixels with valid depth closer than self.distance.
    depth_filter_mask = np.zeros_like(depth_img, dtype=np.uint8)
    depth_filter_mask[(depth_img > 0) & (depth_img < self.distance)] = 1
    hsv_img[depth_filter_mask == 0] = 0
    # Union of all configured HSV ranges (each color is a (low, high) pair).
    mask_part_list = [cv2.inRange(hsv_img, color[0], color[1]) for color in self.color_range]
    # NOTE(review): summing uint8 masks can wrap where ranges overlap —
    # confirm the configured ranges are disjoint.
    mask = sum(mask_part_list[1:], mask_part_list[0])
    mask = mask // 255
    # Blue overlay on the matched region (image is BGR).
    rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([255, 0, 0]) * 0.5
    imgs_crop, mask_crop, (x_min, y_min) = crop_imgs_mask([depth_img], mask, True)
    depth_crop = imgs_crop[0]
    if depth_crop is None:
        self.get_logger().warning("depth_crop is None")
        return None, None
    # Shift the principal point so the intrinsics match the cropped ROI.
    intrinsics = o3d.camera.PinholeCameraIntrinsic(
        int(self.camera_size[0]),
        int(self.camera_size[1]),
        self.K[0],
        self.K[4],
        self.K[2] - x_min,
        self.K[5] - y_min
    )
    x, y, z, rw, rx, ry, rz = self.calculate_function(
        mask_crop,
        depth_crop,
        intrinsics,
        self.source,
        self.hand_eye_mat,
        self.configs
    )
    # An all-zero translation is the pose calculators' failure sentinel.
    if (x, y, z) != (0.0, 0.0, 0.0):
        pose = Pose()
        pose.position = Point(x=x, y=y, z=z)
        pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
        # 98 is the fixed pseudo class id for the color-segmented object.
        pose_dict[int(98), "red"].append(pose)
    return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
def _seg_crossboard(self, rgb_img, depth_img):
    """Detect a chessboard, mask its inner quads and estimate the board pose."""
    pose_dict = defaultdict(list)
    rgb_img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
    ret, corners = cv2.findChessboardCorners(rgb_img_gray, self.pattern_size, cv2.CALIB_CB_FAST_CHECK)
    if ret:
        # Refine corners to sub-pixel accuracy (improves calibration precision).
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
        corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
        corners_subpix = corners_subpix.reshape(self.pattern_size[1], self.pattern_size[0], 2)
        # Fill every quad between adjacent corners to build the board mask.
        mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
        for i in range(0, self.pattern_size[1] - 1):
            for j in range(0, self.pattern_size[0] - 1):
                pts = np.array([
                    corners_subpix[i, j],
                    corners_subpix[i, j + 1],
                    corners_subpix[i + 1, j + 1],
                    corners_subpix[i + 1, j]
                ], dtype=np.int32)
                cv2.fillConvexPoly(mask, pts, 1)
        # Red overlay on the detected board (image is BGR).
        rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
        img_crop, mask_crop, (x_min, y_min) = crop_imgs_mask([depth_img], mask)
        depth_crop = img_crop[0]
        if depth_crop is None:
            return None, None
        # Shift the principal point so the intrinsics match the cropped ROI.
        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            int(self.camera_size[0]),
            int(self.camera_size[1]),
            self.K[0],
            self.K[4],
            self.K[2] - x_min,
            self.K[5] - y_min
        )
        x, y, z, rw, rx, ry, rz = self.calculate_function(
            mask_crop,
            depth_crop,
            intrinsics,
            self.source,
            self.hand_eye_mat,
            self.configs
        )
        self.get_logger().info(f"{x}, {y}, {z}, {rw}, {rx}, {ry}, {rz}")
        # An all-zero translation is the pose calculators' failure sentinel.
        if (x, y, z) != (0.0, 0.0, 0.0):
            pose = Pose()
            pose.position = Point(x=x, y=y, z=z)
            pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
            # 99 is the fixed pseudo class id for the calibration board.
            pose_dict[int(99), 'crossboard'].append(pose)
        # Burn the numeric pose into the image for quick visual checks.
        cv2.putText(
            rgb_img,
            f'cs: x: {x:.3f}, y: {y:.3f}, z: {z:.3f}',
            (0, 0 + 30),cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 0),
            2
        )
        cv2.putText(
            rgb_img,
            f'quat: rw: {rw:.3f}, rx: {rx:.3f}, ry: {ry:.3f}, rz: {rz:.3f}',
            (0, 0 + 70), cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (255, 255, 0),
            2
        )
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
    else:
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), None
if __name__ == '__main__':
    # Spin the detection node until Ctrl-C, then shut ROS down cleanly.
    rclpy.init(args=None)
    detect_node = DetectNode('detect')
    try:
        rclpy.spin(detect_node)
    except KeyboardInterrupt:
        pass
    finally:
        detect_node.destroy_node()
        rclpy.shutdown()

View File

@@ -0,0 +1,9 @@
from .object_icp import *
from .image_tools import *
from .draw_tools import *
from .calculate_tools import *
__all__ = [
"crop_imgs_box_xywh", "crop_imgs_box_xyxy", "crop_imgs_mask", "get_map", "distortion_correction", "draw_box",
"draw_mask", "draw_pointcloud", "calculate_hand_eye_matrix", "pca", "calculate_pose_pca", "calculate_pose_icp"
]

View File

@@ -0,0 +1,401 @@
import math
import json
import logging
import numpy as np
import open3d as o3d
import transforms3d as tfs
from .object_icp import object_icp
def _get_matrix_quat(x, y, z, rw, rx, ry, rz):
    """Build a 4x4 homogeneous transform from a translation and a quaternion (w, x, y, z)."""
    rotation = tfs.quaternions.quat2mat([rw, rx, ry, rz])
    translation = np.squeeze(np.asarray((x, y, z)))
    return tfs.affines.compose(translation, rotation, [1, 1, 1])
def _get_matrix_eular_radu(x, y, z, rx, ry, rz):
    """Build a 4x4 homogeneous transform from a translation and Euler angles (radians)."""
    rotation = tfs.euler.euler2mat(rx, ry, rz)
    translation = np.squeeze(np.asarray((x, y, z)))
    return tfs.affines.compose(translation, rotation, [1, 1, 1])
def _get_matrix_rotvector(x, y, z, rx, ry, rz):
    """Build a 4x4 homogeneous transform from a translation and a rotation vector."""
    rvec = np.array([rx, ry, rz])
    angle = np.linalg.norm(rvec)
    if angle < 1e-8:
        # Near-zero rotation: the axis is undefined, fall back to identity.
        rotation = np.eye(3)
    else:
        rotation = tfs.axangles.axangle2mat(rvec / angle, angle)
    translation = np.squeeze(np.asarray((x, y, z)))
    return tfs.affines.compose(translation, rotation, [1, 1, 1])
def _skew(v):
return np.array([[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]])
def _R2P(T):
    """Rotation matrix -> modified Rodrigues vector (2*sin(theta/2)*axis)."""
    axis, angle = tfs.axangles.mat2axangle(T[0:3, 0:3])
    return axis * (2 * math.sin(angle / 2))
def _P2R(P):
    """Modified Rodrigues vector -> rotation matrix (inverse of _R2P)."""
    norm_sq = np.dot(P.T, P)
    identity_part = (1 - norm_sq / 2) * np.eye(3)
    cross_part = np.dot(P, P.T) + np.sqrt(4 - norm_sq) * _skew(P.flatten().T)
    return identity_part + cross_part / 2
def _calculate_(Hgs, Hcs):
    """Solve the AX = XB hand-eye equation (Tsai-Lenz style) from pose pairs.

    Args:
        Hgs: list of 4x4 gripper-side homogeneous transforms.
        Hcs: list of 4x4 camera-side homogeneous transforms (same order).

    Returns:
        The 4x4 homogeneous calibration matrix.
    """
    Hgijs = []
    Hcijs = []
    A = []
    B = []
    size = 0
    # First pass: solve for the rotation via modified Rodrigues vectors,
    # using every ordered pair (i, j) of input poses.
    for i in range(len(Hgs)):
        for j in range(i + 1, len(Hgs)):
            size += 1
            Hgij = np.dot(np.linalg.inv(Hgs[j]), Hgs[i])
            Hgijs.append(Hgij)
            Pgij = np.dot(2, _R2P(Hgij))
            Hcij = np.dot(Hcs[j], np.linalg.inv(Hcs[i]))
            Hcijs.append(Hcij)
            Pcij = np.dot(2, _R2P(Hcij))
            A.append(_skew(np.add(Pgij, Pcij)))
            B.append(np.subtract(Pcij, Pgij).reshape(3, 1))
    MA = np.vstack(A)
    MB = np.vstack(B)
    # Least-squares solution of the stacked skew system.
    Pcg_ = np.dot(np.linalg.pinv(MA), MB)
    pcg = np.sqrt(np.add(1, np.dot(Pcg_.T, Pcg_)))
    Pcg = np.dot(np.dot(2, Pcg_), np.linalg.inv(pcg))
    Rcg = _P2R(Pcg).reshape(3, 3)
    # Second pass: with the rotation fixed, solve for the translation.
    A = []
    B = []
    pair_index = 0  # renamed from ``id`` to avoid shadowing the builtin
    for i in range(len(Hgs)):
        for j in range(i + 1, len(Hgs)):
            Hgij = Hgijs[pair_index]
            Hcij = Hcijs[pair_index]
            A.append(np.subtract(Hgij[0:3, 0:3], np.eye(3, 3)))
            B.append(np.subtract(np.dot(Rcg, Hcij[0:3, 3:4]), Hgij[0:3, 3:4]))
            pair_index += 1
    MA = np.asarray(A).reshape(size * 3, 3)
    MB = np.asarray(B).reshape(size * 3, 1)
    Tcg = np.dot(np.linalg.pinv(MA), MB).reshape(3, )
    return tfs.affines.compose(Tcg, np.squeeze(Rcg), [1, 1, 1])
class _Calibration:
    """One-shot hand-eye calibration: load pose files, solve, save the result.

    The constructor runs the whole pipeline (load -> solve -> save); after
    it returns, ``self._done`` is True and ``self._calibration_matrix``
    holds the 4x4 result.
    """

    def __init__(self, mode, input_format, hand_pose_path, camera_pose_path, save_path):
        """
        mode: "eye_in_hand" or "eye_to_hand"
        input_format: "eular", "rotvertor" or "quat"
        hand_pose_path: JSON file with a flat list of robot (gripper) poses
        camera_pose_path: JSON file with a flat list of camera (target) poses
        save_path: output path for the {"T": 4x4} result JSON
        """
        self._Hgs, self._Hcs = [], []
        self._hand, self._camera = [], []
        self._calibration_matrix = None
        self._done = False
        self._mode = mode
        if self._mode not in ['eye_in_hand', 'eye_to_hand']:
            raise ValueError("mode must be 'eye_in_hand' or 'eye_to_hand'")
        self._input = input_format
        if self._input == 'eular':
            self._function = _get_matrix_eular_radu
        elif self._input == 'rotvertor':
            self._function = _get_matrix_rotvector
        elif self._input == 'quat':
            self._function = _get_matrix_quat
        else:
            raise ValueError("INPUT ERROR")
        self._camera_pose_path = camera_pose_path
        self._hand_pose_path = hand_pose_path
        self._save_path = save_path
        self._get_pose()

    def _get_pose(self):
        """Load both pose files, run the solver and save the result JSON."""
        with open(f'{self._camera_pose_path}', 'r', encoding='utf-8') as f:
            self._camera = json.load(f)
        with open(f'{self._hand_pose_path}', 'r', encoding='utf-8') as f:
            self._hand = json.load(f)
        self._calculate_calibration()
        # Debug prints replaced with logging, for consistency with the rest
        # of this module.
        logging.debug(f"{self._hand}")
        logging.debug(f"{self._camera}")
        logging.info(f"{self._calibration_matrix}")
        hand_eye_result = {
            "T": self._calibration_matrix.tolist()
        }
        with open(f"{self._save_path}", "w") as f:
            json.dump(hand_eye_result, f, indent=4)
        logging.info(f"Save as {self._save_path}")
        self._done = True

    def _calculate_data(self):
        """Convert the flat pose lists into 4x4 matrix lists (Hgs / Hcs).

        Quaternion input consumes 7 numbers per pose, the other formats 6.
        For eye-to-hand calibration the gripper poses are inverted, exactly
        as in the original duplicated branches.
        """
        stride = 7 if self._input == 'quat' else 6
        for i in range(0, len(self._hand), stride):
            hand_mat = self._function(*self._hand[i:i + stride])
            if self._mode == 'eye_to_hand':
                hand_mat = np.linalg.inv(hand_mat)
            self._Hgs.append(hand_mat)
            self._Hcs.append(self._function(*self._camera[i:i + stride]))

    def _calculate_calibration(self):
        """Build the matrix lists and run the solver."""
        self._calculate_data()
        self._calibration_matrix = _calculate_(self._Hgs, self._Hcs)
def calculate_hand_eye_matrix(
    mode,
    input_format,
    hand_pose_path,
    camera_pose_path,
    save_path
):
    """Run hand-eye calibration and write the result matrix to ``save_path``.

    Args:
        mode: "eye_in_hand" or "eye_to_hand".
        input_format: "eular", "rotvertor" or "quat".
        hand_pose_path: JSON file with the robot (gripper) poses.
        camera_pose_path: JSON file with the camera (target) poses.
        save_path: output JSON path for the {"T": 4x4} result.
    """
    # The calibration runs entirely inside the constructor, so there is no
    # need to keep the instance (the original bound it to an unused name).
    _Calibration(mode, input_format, hand_pose_path, camera_pose_path, save_path)
def pca(data: np.ndarray, sort=True):
    """Principal component analysis of an (N, D) point set.

    Returns (eigenvalues, eigenvectors) of the covariance matrix — the
    columns of the eigenvector matrix are the components — or (None, None)
    when the eigendecomposition fails. With ``sort=True`` the components
    are ordered by decreasing eigenvalue.
    """
    centered = data - np.mean(data, axis=0)
    try:
        eigenvalues, eigenvectors = np.linalg.eigh(np.cov(centered.T))
    except np.linalg.LinAlgError:
        return None, None
    if sort:
        order = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[order]
        eigenvectors = eigenvectors[:, order]
    return eigenvalues, eigenvectors
def calculate_pose_pca(
    mask,
    depth_img: np.ndarray,
    intrinsics,
    source=None,
    hand_eye_mat=np.eye(4),
    configs=None
):
    """Estimate an object pose from a masked depth image via PCA.

    The masked depth pixels are back-projected to a point cloud, filtered,
    and the principal axes of the cleaned cloud give the orientation; the
    resulting camera-frame pose is transformed by ``hand_eye_mat``.

    Args:
        mask: 2-D array selecting the object pixels.
        depth_img: 16-bit depth image.
        intrinsics: o3d.camera.PinholeCameraIntrinsic for the (cropped) ROI.
        source: unused; kept so the signature matches calculate_pose_icp.
        hand_eye_mat: 4x4 transform applied to the camera-frame pose.
        configs: filtering parameters; defaults below when None.

    Returns:
        (x, y, z, rw, rx, ry, rz); all zeros on any failure so callers can
        use ``(x, y, z) != (0.0, 0.0, 0.0)`` as the success check.
    """
    if configs is None:
        configs = {
            "depth_scale": 1000.0,
            "depth_trunc": 3.0,
            "voxel_size": 0.020,
            "nb_points": 10,
            "radius": 0.1,
            "nb_neighbors": 20,
            "std_ratio": 3.0
        }
    # Keep only depth pixels under the mask, then back-project to 3-D.
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=configs["depth_scale"],
        depth_trunc=configs["depth_trunc"],
    )
    point_cloud = point_cloud.remove_non_finite_points()
    down_pcd = point_cloud.voxel_down_sample(voxel_size=configs["voxel_size"])
    clean_pcd, _ = down_pcd.remove_radius_outlier(nb_points=configs["nb_points"], radius=configs["radius"])
    clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=configs["nb_neighbors"], std_ratio=configs["std_ratio"])
    if len(clean_pcd.points) == 0:
        logging.warning("clean_pcd is empty")
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    center = clean_pcd.get_center()
    x, y, z = center
    w, v = pca(np.asarray(clean_pcd.points))
    if w is None or v is None:
        logging.warning("PCA output w or v is None")
        # BUG FIX: the original returned None quaternion parts together
        # with a non-zero (x, y, z) here, which both defeated the callers'
        # all-zero success check and crashed Quaternion(w=None, ...).
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    vx, vy, vz = v[:, 0], v[:, 1], v[:, 2]
    # Fix axis signs so the frame is stable and right-handed.
    if vx[0] < 0:
        vx = -vx
    if vy[1] < 0:
        vy = -vy
    if not np.allclose(np.cross(vx, vy), vz):
        vz = -vz
    R = np.column_stack((vx, vy, vz))
    rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
    rmat = hand_eye_mat @ rmat
    # BUG FIX: take the translation from the hand-eye-transformed matrix,
    # as calculate_pose_icp does; the original returned the camera-frame
    # centre together with a hand-eye-transformed orientation.
    x, y, z = rmat[0:3, 3:4].flatten()
    rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return x, y, z, rw, rx, ry, rz
def calculate_pose_icp(
    mask,
    depth_img: np.ndarray,
    intrinsics,
    source,
    hand_eye_mat=np.eye(4),
    configs = None,
):
    """Estimate an object pose by registering the model cloud ``source``
    onto the point cloud back-projected from the masked depth image.

    Args:
        mask: 2-D array selecting the object pixels.
        depth_img: 16-bit depth image.
        intrinsics: o3d.camera.PinholeCameraIntrinsic for the (cropped) ROI.
        source: complete object model (o3d PointCloud, or a file path).
        hand_eye_mat: 4x4 transform applied to the camera-frame result.
        configs: filtering/registration parameters; defaults below when None.

    Returns:
        (x, y, z, rw, rx, ry, rz); all zeros when the cleaned cloud is empty.
    """
    if configs is None:
        configs = {
            "depth_scale": 1000.0,
            "depth_trunc": 2.0,
            "nb_points": 10,
            "radius": 0.1,
            "nb_neighbors": 20,
            "std_ratio": 3.0,
            "ransac_voxel_size": 0.005,
            "icp_voxel_radius": [0.004, 0.002, 0.001],
            "icp_max_iter": [50, 30, 14]
        }
    """计算位态"""
    # Keep only depth pixels under the mask, then back-project to 3-D.
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=configs["depth_scale"],
        depth_trunc=configs["depth_trunc"],
    )
    point_cloud = point_cloud.remove_non_finite_points()
    # Outlier removal: radius filter first, then statistical filter.
    clean_pcd, _ = point_cloud.remove_radius_outlier(nb_points=configs["nb_points"], radius=configs["radius"])
    clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=configs["nb_neighbors"], std_ratio=configs["std_ratio"])
    if len(clean_pcd.points) == 0:
        logging.warning("clean_pcd is empty")
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    # NOTE(review): object_icp's first parameter is named ``target`` —
    # confirm the intended argument order here.
    rmat = object_icp(
        source,
        clean_pcd,
        ransac_voxel_size=configs["ransac_voxel_size"],
        icp_voxel_radius=configs["icp_voxel_radius"],
        icp_max_iter=configs["icp_max_iter"]
    )
    rmat = hand_eye_mat @ rmat
    x, y, z = rmat[0:3, 3:4].flatten()
    rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return x, y, z, rw, rx, ry, rz

View File

@@ -0,0 +1,60 @@
import logging
import cv2
import numpy as np
import open3d as o3d
def draw_box(rgb_img: np.ndarray, segmentation_result, put_text: bool = True):
    """Draw detection rectangles (and optional class/confidence labels) in place."""
    boxes = segmentation_result.boxes.xywh.cpu().numpy()
    confidences = segmentation_result.boxes.conf.cpu().numpy()
    class_ids = segmentation_result.boxes.cls.cpu().numpy()
    labels = segmentation_result.names
    for box, conf, cls in zip(boxes, confidences, class_ids):
        cx, cy, w, h = box[:4]
        # Convert the centre-based box to corner points.
        top_left = [int(cx - w / 2), int(cy - h / 2)]
        bottom_right = [int(cx + w / 2), int(cy + h / 2)]
        cv2.rectangle(rgb_img, top_left, bottom_right, (255, 255, 0), 2)
        if put_text:
            caption = f'{labels[cls]}: {conf*100:.2f}'
            cv2.putText(rgb_img, caption, (top_left[0], top_left[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
def draw_mask(rgb_img: np.ndarray, segmentation_result):
    """Alpha-blend each instance mask in red onto ``rgb_img`` (in place, BGR)."""
    masks = segmentation_result.masks.data.cpu().numpy()
    orig_shape = segmentation_result.masks.orig_shape
    overlay_color = np.array([0, 0, 255])
    for instance_mask in masks:
        # Resize the model-resolution mask back to the original frame size.
        resized = cv2.resize(instance_mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
        rgb_img[resized > 0] = rgb_img[resized > 0] * 0.5 + overlay_color * 0.5
def draw_pointcloud(pcd, axis: bool = True):
    """Visualize a list of Open3D geometries, optionally with an RGB origin frame."""
    if not pcd:
        logging.warning("pcd is empty")
    if axis:
        # Origin plus the three unit-axis endpoints, connected and coloured RGB.
        frame_points = [[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]]
        frame_lines = [[0, 1], [0, 2], [0, 3]]
        frame_colors = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
        frame = o3d.geometry.LineSet(
            points=o3d.utility.Vector3dVector(frame_points),
            lines=o3d.utility.Vector2iVector(frame_lines)
        )
        frame.colors = o3d.utility.Vector3dVector(frame_colors)
        pcd.append(frame)
    o3d.visualization.draw_geometries(pcd)

View File

@@ -0,0 +1,151 @@
import cv2
import logging
import numpy as np
def crop_imgs_box_xywh(imgs: list, box, same_sign: bool = False):
    """Crop every image in ``imgs`` to a YOLO-style [cx, cy, w, h] box.

    Args:
        imgs: images sharing the same width and height.
        box: [x_center, y_center, w, h, ...]; extra entries are ignored.
        same_sign: set True to skip the shape-consistency check.

    Returns:
        (list of crops, (x_min, y_min)); ([], (0, 0)) when ``imgs`` is empty.
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")
    high, width = imgs[0].shape[:2]
    cx, cy, w, h = box[:4]
    # Clamp the box to the image bounds (crop is inclusive of both edges).
    x_min = max(0, int(round(cx - w/2)))
    x_max = min(int(round(cx + w/2)), width-1)
    y_min = max(0, int(round(cy - h/2)))
    y_max = min(int(round(cy + h/2)), high-1)
    crops = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    return crops, (x_min, y_min)
def crop_imgs_box_xyxy(imgs: list, box, same_sign: bool = False):
    """Crop every image in ``imgs`` to a corner-style [x1, y1, x2, y2] box.

    Args:
        imgs: images sharing the same width and height.
        box: [x1, y1, x2, y2, ...]; extra entries are ignored.
        same_sign: set True to skip the shape-consistency check.

    Returns:
        (list of crops, (x_min, y_min)); ([], (0, 0)) when ``imgs`` is empty.
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")
    high, width = imgs[0].shape[:2]
    x1, y1, x2, y2 = box[:4]
    # Clamp the box to the image bounds (crop is inclusive of both edges).
    x_min = max(0, int(round(x1)))
    x_max = min(int(round(x2)), width - 1)
    y_min = max(0, int(round(y1)))
    y_max = min(int(round(y2)), high - 1)
    crops = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    return crops, (x_min, y_min)
def crop_imgs_mask(imgs: list, mask: np.ndarray, same_sign: bool = False):
    """Crop every image (and the mask itself) to the mask's bounding box.

    Args:
        imgs: images sharing the same width and height.
        mask: 2-D array; pixels > 0 define the region of interest.
        same_sign: set True to skip the shape-consistency check.

    Returns:
        (crop_imgs, mask_crop, (x_min, y_min)). When ``imgs`` is empty the
        result is ([], None, (0, 0)); when the mask has no positive pixel
        it is ([None] * len(imgs), None, (None, None)), so that callers
        indexing ``crop_imgs[0]`` see None rather than crashing.
    """
    if not imgs:
        logging.warning("imgs is empty")
        # BUG FIX: the original returned a 2-tuple here while every caller
        # unpacks three values.
        return [], None, (0, 0)
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")
    high, width = imgs[0].shape[:2]
    ys, xs = np.where(mask > 0)
    if xs.size == 0 or ys.size == 0:
        # BUG FIX: the original returned crop_imgs=None, which broke
        # callers doing ``crop_imgs[0]`` (TypeError on None).
        return [None] * len(imgs), None, (None, None)
    x_min, x_max = max(0, int(round(xs.min()))), min(int(round(xs.max())), width - 1)
    y_min, y_max = max(0, int(round(ys.min()))), min(int(round(ys.max())), high - 1)
    crop_imgs = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    mask_crop = mask[y_min:y_max + 1, x_min:x_max + 1]
    return crop_imgs, mask_crop, (x_min, y_min)
def get_map(K: list, D: list, camera_size: list):
    """Build undistort-rectify maps plus the optimal new intrinsics.

    Args:
        K: flat 3x3 intrinsics (length 9).
        D: distortion coefficients.
        camera_size: [width, height].

    Returns:
        (map1, map2, new_K_flat) for use with cv2.remap; new_K_flat is the
        flattened optimal camera matrix.
    """
    w, h = camera_size
    K_mat = np.array(K).reshape(3, 3)
    D_arr = np.array(D)
    new_K, _ = cv2.getOptimalNewCameraMatrix(K_mat, D_arr, (w, h), 1, (w, h))
    map1, map2 = cv2.initUndistortRectifyMap(K_mat, D_arr, None, new_K, (w, h), cv2.CV_32FC1)
    return map1, map2, new_K.flatten()
def distortion_correction(
    color_image: np.ndarray,
    depth_image: np.ndarray,
    map1: np.ndarray,
    map2: np.ndarray
):
    """Undistort a color/depth pair with precomputed remap tables.

    Color uses bilinear interpolation; depth uses nearest-neighbour so
    blending cannot invent invalid depth values. Input dtypes are preserved.

    Returns:
        (undistorted_color, undistorted_depth)
    """
    undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR).astype(color_image.dtype)
    undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST).astype(depth_image.dtype)
    return undistorted_color, undistorted_depth

View File

@@ -1,10 +1,8 @@
import numpy as np
import open3d as o3d
import copy
import json
def draw_pointcloud(pcd, T):
def _draw_pointcloud(pcd, T):
R = T[0:3, 0:3]
point = T[0:3, 3:4].flatten()
@@ -31,7 +29,7 @@ def draw_pointcloud(pcd, T):
o3d.visualization.draw_geometries(pcd)
def preprocess_point_cloud(pcd, voxel_size):
def _preprocess_point_cloud(pcd, voxel_size):
pcd_down = pcd.voxel_down_sample(voxel_size)
radius_normal = voxel_size * 2
@@ -47,18 +45,18 @@ def preprocess_point_cloud(pcd, voxel_size):
return pcd_down, pcd_fpfh
def prepare_dataset(source, target, voxel_size):
def _prepare_dataset(source, target, voxel_size):
trans_init = np.identity(4)
source.transform(trans_init)
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
source_down, source_fpfh = _preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = _preprocess_point_cloud(target, voxel_size)
return source_down, target_down, source_fpfh, target_fpfh
def execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
def _execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
distance_threshold = voxel_size * 1.5
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
@@ -83,16 +81,9 @@ def object_icp(
target: o3d.geometry.PointCloud,
source: o3d.geometry.PointCloud | str,
ransac_voxel_size: float = 0.005,
icp_voxel_radius: list[float] | None = None,
icp_max_iter: list[int] | None = None,
icp_voxel_radius: list[float] | tuple[float] | None = (0.004, 0.002, 0.001),
icp_max_iter: list[int] | tuple[int] | None = (50, 30, 14),
):
if icp_voxel_radius is None:
icp_voxel_radius = [0.004, 0.002, 0.001]
if icp_max_iter is None:
icp_max_iter = [50, 30, 14]
if isinstance(source, str):
source = o3d.io.read_point_cloud(source)
elif isinstance(source, o3d.geometry.PointCloud):
@@ -101,15 +92,9 @@ def object_icp(
raise TypeError(f"Unsupported Type {type(source)}")
voxel_size = 0.005 # means 5mm for this dataset
source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(source, target, voxel_size)
source_down, target_down, source_fpfh, target_fpfh = _prepare_dataset(source, target, voxel_size)
T = execute_global_registration(
source_down,
target_down,
source_fpfh,
target_fpfh,
ransac_voxel_size
)
T = _execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, ransac_voxel_size)
for scale in range(3):
iter = icp_max_iter[scale]
@@ -137,18 +122,14 @@ def object_icp(
T = result_icp.transformation
draw_pointcloud([source.transform(T), target], T)
_draw_pointcloud([source.transform(T), target], T)
return T
def main():
if __name__ == "__main__":
target = o3d.io.read_point_cloud("pointcloud/pointcloud_0.pcd")
source = o3d.io.read_point_cloud("pointcloud/bottle_model.pcd")
Rmat = object_icp(target, source)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,50 @@
import logging
import numpy as np
import open3d as o3d
def read_stl(stl_path: str, save_path: str, view_pointcloud: bool = False) -> None:
    """Convert an STL mesh to a centred, re-oriented point cloud and save it.

    Steps: scale by 0.001 (mm -> m), sample 50k surface points, translate
    the centroid to the origin, apply a fixed axis-swapping rotation,
    optionally visualize, then write the cloud to ``save_path``.
    """
    mesh = o3d.io.read_triangle_mesh(stl_path)
    mesh.scale(0.001, center=[0, 0, 0])
    mesh.compute_vertex_normals()
    pcd_model = mesh.sample_points_uniformly(number_of_points=50000)
    # pcd_model = pcd_model.voxel_down_sample(voxel_size=0.01)
    # Translate the centroid to the origin.
    centering = np.eye(4)
    centering[0:3, 3] = -pcd_model.get_center().flatten()
    pcd_model.transform(centering)
    # Fixed reorientation (90-degree rotation about the Y axis).
    reorient = np.array(
        [[0, 0, 1, 0],
         [0, 1, 0, 0],
         [-1, 0, 0, 0],
         [0, 0, 0, 1]],
    )
    pcd_model.transform(reorient)
    if view_pointcloud:
        # Show the model together with an RGB-coloured origin frame.
        frame_points = [[0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]]
        frame_lines = [[0, 1], [0, 2], [0, 3]]
        frame_colors = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
        line_set = o3d.geometry.LineSet(
            points=o3d.utility.Vector3dVector(frame_points),
            lines=o3d.utility.Vector2iVector(frame_lines)
        )
        line_set.colors = o3d.utility.Vector3dVector(frame_colors)
        o3d.visualization.draw_geometries([pcd_model, line_set])
    success = o3d.io.write_point_cloud(save_path, pcd_model)
    logging.info(f"PCD_0, 写入结果: {success}")
if __name__ == "__main__":
    # BUG FIX: the original called ``read_stl()`` with no arguments, but
    # stl_path and save_path are required — it always raised TypeError.
    import sys
    if len(sys.argv) < 3:
        raise SystemExit("usage: python read_stl.py <stl_path> <save_pcd_path> [--view]")
    read_stl(sys.argv[1], sys.argv[2], view_pointcloud="--view" in sys.argv[3:])

View File

@@ -0,0 +1,557 @@
import os
import time
import threading
import ast
from collections import defaultdict
from ament_index_python.packages import get_package_share_directory
import cv2
import open3d as o3d
import numpy as np
import transforms3d as tfs
from cv_bridge import CvBridge
import torch
from ultralytics import YOLO
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseClassAndID
from img_dev.msg import ImgMsg
from interfaces.srv import VisionObjectRecognition
share_dir = get_package_share_directory('vision_detect')
def draw_pointcloud(pcd, T):
    """Visualize geometries plus the origin frame and the frame of transform T."""
    rotation = T[0:3, 0:3]
    origin = T[0:3, 3:4].flatten()
    ax, ay, az = rotation[:, 0], rotation[:, 1], rotation[:, 2]
    # Two frames: the world origin and the frame described by T.
    frame_points = [
        [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
        origin, origin + ax, origin + ay, origin + az
    ]
    frame_lines = [
        [0, 1], [0, 2], [0, 3],
        [4, 5], [4, 6], [4, 7]
    ]
    frame_colors = [
        [1, 0, 0], [0, 1, 0], [0, 0, 1],
        [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]
    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(frame_points),
        lines=o3d.utility.Vector2iVector(frame_lines)
    )
    line_set.colors = o3d.utility.Vector3dVector(frame_colors)
    pcd.append(line_set)
    o3d.visualization.draw_geometries(pcd)
def preprocess_point_cloud(pcd, voxel_size):
    """Downsample, estimate normals and compute FPFH features for registration."""
    pcd_down = pcd.voxel_down_sample(voxel_size)
    # Normal estimation radius: 2x voxel; FPFH feature radius: 5x voxel.
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30)
    )
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )
    return pcd_down, pcd_fpfh
def prepare_dataset(source, target, voxel_size):
    """Preprocess both clouds for global registration.

    Returns (source_down, target_down, source_fpfh, target_fpfh).
    """
    # Identity initial alignment (a no-op transform, kept from the original).
    source.transform(np.identity(4))
    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
    return source_down, target_down, source_fpfh, target_fpfh
def execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
    """Coarse RANSAC registration based on FPFH feature matching.

    Returns:
        The 4x4 transformation aligning source_down onto target_down,
        suitable as the initial guess for ICP refinement.
    """
    distance_threshold = voxel_size * 1.5  # RANSAC inlier distance (Open3D tutorial convention)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down,
        target_down,
        source_fpfh,
        target_fpfh,
        True,  # mutual_filter: keep only mutually-nearest feature matches
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3,  # ransac_n: points sampled per hypothesis
        [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
        ],
        o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
    return result.transformation
def object_icp(
    target: o3d.geometry.PointCloud,
    source: o3d.geometry.PointCloud | str,
    ransac_voxel_size: float = 0.005,
    icp_voxel_radius: list[float] | None = None,
    icp_max_iter: list[int] | None = None,
    visualize: bool = False,
):
    """Align ``source`` onto ``target`` and return the 4x4 transform.

    A coarse RANSAC feature-matching registration provides the initial
    guess, which is then refined by multi-scale point-to-point ICP.

    Args:
        target: scene point cloud to align against.
        source: model point cloud, or a path to one readable by Open3D.
        ransac_voxel_size: voxel size used by the global RANSAC stage.
        icp_voxel_radius: per-scale down-sampling radii for ICP refinement.
        icp_max_iter: per-scale ICP iteration limits (same length as radii).
        visualize: when True, open a blocking Open3D window showing the
            aligned clouds.  Keep False in service / realtime contexts.

    Returns:
        4x4 homogeneous transformation mapping source into the target frame.

    Raises:
        TypeError: if ``source`` is neither a point cloud nor a path.
    """
    if icp_voxel_radius is None:
        icp_voxel_radius = [0.004, 0.002, 0.001]
    if icp_max_iter is None:
        icp_max_iter = [50, 30, 14]
    if isinstance(source, str):
        source = o3d.io.read_point_cloud(source)
    elif not isinstance(source, o3d.geometry.PointCloud):
        raise TypeError(f"Unsupported Type {type(source)}")
    voxel_size = 0.005  # 5 mm working resolution for the RANSAC stage
    source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(source, target, voxel_size)
    T = execute_global_registration(
        source_down,
        target_down,
        source_fpfh,
        target_fpfh,
        ransac_voxel_size
    )
    # Coarse-to-fine ICP: each scale re-samples both clouds and refines T.
    for radius, max_it in zip(icp_voxel_radius, icp_max_iter):
        source_down = source.voxel_down_sample(radius)
        target_down = target.voxel_down_sample(radius)
        source_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        target_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        result_icp = o3d.pipelines.registration.registration_icp(
            source_down,
            target_down,
            radius * 5,
            T,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            o3d.pipelines.registration.ICPConvergenceCriteria(
                relative_fitness=1e-6,
                relative_rmse=1e-6,
                max_iteration=max_it
            )
        )
        T = result_icp.transformation
    if visualize:
        # Bug fix: the previous code ALWAYS opened a blocking viewer here
        # and mutated the caller's model cloud via source.transform(T),
        # corrupting every subsequent registration that reuses the model.
        # Visualize a transformed *copy* instead, and only on request.
        aligned = o3d.geometry.PointCloud(source)
        aligned.transform(T)
        draw_pointcloud([aligned, target], T)
    return T
def get_map(K, D, camera_size):
    """Build undistortion remap tables from intrinsics K and distortion D.

    Args:
        K: 9 intrinsic values (row-major 3x3).
        D: distortion coefficients.
        camera_size: (width, height) in pixels.

    Returns:
        (map1, map2, new_K_flat): cv2.remap lookup tables plus the
        rectified 3x3 intrinsic matrix flattened to 9 values.
    """
    h, w = camera_size[::-1]  # camera_size is (w, h); unpack as (h, w)
    K = np.array(K).reshape(3, 3)
    D = np.array(D)
    new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
    map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
    return map1, map2, new_K.flatten()
def pca(data, sort=True):
    """Principal component analysis of an (N, D) point set.

    Computes the eigendecomposition of the sample covariance matrix of
    the mean-centered data.

    Args:
        data: (N, D) array of points.
        sort: when True, order components by decreasing eigenvalue.

    Returns:
        (eigenvalues, eigenvectors) with eigenvectors as matrix columns,
        or (None, None) if the decomposition fails.
    """
    centroid = np.mean(data, axis=0)
    deviations = data - centroid  # remove the mean before computing covariance
    try:
        evals, evecs = np.linalg.eigh(np.cov(deviations.T))
    except np.linalg.LinAlgError:
        return None, None
    if sort:
        order = np.argsort(evals)[::-1]  # largest component first
        evals = evals[order]
        evecs = evecs[:, order]
    return evals, evecs
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics, model_pcd):
    """Estimate an object's 6-DoF pose from a masked depth image.

    Back-projects the depth pixels selected by ``mask`` into a point
    cloud, removes outliers, then registers ``model_pcd`` onto the
    observed cloud with ``object_icp``.

    Args:
        mask: H x W instance mask (positive pixels belong to the object).
        depth_img: H x W depth image; values appear to be millimetres
            (depth_scale=1000) — confirm against the camera driver.
        intrinsics: o3d.camera.PinholeCameraIntrinsic of the crop.
        model_pcd: reference model point cloud to register.

    Returns:
        (x, y, z, rw, rx, ry, rz) — translation plus w-first quaternion;
        all zeros if no valid points survive filtering.
    """
    # Keep only the depth pixels covered by the instance mask.
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=1000.0,  # mm -> m
        depth_trunc=2.0,  # ignore returns beyond 2 m
    )
    point_cloud = point_cloud.remove_non_finite_points()
    # Two-stage outlier rejection: radius filter, then statistical filter.
    clean_pcd, _ = point_cloud.remove_radius_outlier(nb_points=10, radius=0.1)
    clean_pcd, _ = clean_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=3.0)
    if len(clean_pcd.points) == 0:
        return 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
    rmat = object_icp(model_pcd, clean_pcd)
    x, y, z = rmat[0:3, 3:4].flatten()
    rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return x, y, z, rw, rx, ry, rz
def draw_box(set_confidence, rgb_img, result):
    """Draw YOLO detection boxes and class labels onto rgb_img in place.

    Only detections whose confidence reaches set_confidence are drawn.
    """
    xywh = result.boxes.xywh.cpu().numpy()
    scores = result.boxes.conf.cpu().numpy()
    cls_ids = result.boxes.cls.cpu().numpy()
    names = result.names
    cyan = (255, 255, 0)
    for det, score, cls_id in zip(xywh, scores, cls_ids):
        if score < set_confidence:
            continue
        cx, cy, bw, bh = det[:4]
        # Center-based xywh -> corner points for cv2.rectangle.
        top_left = [int(cx - bw / 2), int(cy - bh / 2)]
        bottom_right = [int(cx + bw / 2), int(cy + bh / 2)]
        cv2.rectangle(rgb_img, top_left, bottom_right, cyan, 2)
        cv2.putText(rgb_img, f'{names[cls_id]}: {score*100:.2f}', (top_left[0], top_left[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, cyan, 2)
def draw_mask(set_confidence, rgb_img, result):
    """Overlay predicted instance masks onto rgb_img in place.

    Masks whose confidence reaches set_confidence are blended in at
    50% opacity with pure red (BGR).
    """
    mask_stack = result.masks.data.cpu().numpy()
    full_size = result.masks.orig_shape
    scores = result.boxes.conf.cpu().numpy()
    overlay = np.array([0, 0, 255])  # BGR red
    for score, raw_mask in zip(scores, mask_stack):
        if score < set_confidence:
            continue
        # Model masks come at model resolution; rescale to the image size.
        resized = cv2.resize(raw_mask.astype(np.uint8), full_size[::-1], interpolation=cv2.INTER_NEAREST)
        rgb_img[resized > 0] = rgb_img[resized > 0] * 0.5 + overlay * 0.5
def distortion_correction(color_image, depth_image, map1, map2):
    """Undistort a color/depth image pair with precomputed remap tables.

    Linear interpolation for color; nearest-neighbor for depth so that
    blending cannot invent invalid depth values.
    """
    color_out = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR).astype(color_image.dtype)
    depth_out = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST).astype(depth_image.dtype)
    return color_out, depth_out
def crop_mask_bbox(depth_img, mask, box):
    """Crop a depth image and mask to a (clipped) xywh bounding box.

    Args:
        depth_img: H x W depth array.
        mask: H x W mask (0/1 or bool).
        box: at least (x_center, y_center, w, h) in pixels.

    Returns:
        (depth_crop, mask_crop, (x_offset, y_offset)); the offsets give
        the crop's top-left corner in the original image.
    """
    img_h, img_w = depth_img.shape
    cx, cy, bw, bh = box[:4]
    # Center-based box -> pixel bounds, clipped to the image; the +1 makes
    # the rounded max bound inclusive when slicing.
    left = max(0, int(round(cx - bw / 2)))
    right = min(int(round(cx + bw / 2)), img_w) + 1
    top = max(0, int(round(cy - bh / 2)))
    bottom = min(int(round(cy + bh / 2)), img_h) + 1
    return depth_img[top:bottom, left:right], mask[top:bottom, left:right], (left, top)
class DetectNode(Node):
    """YOLO instance-segmentation + ICP pose-estimation service node.

    Caches the latest frame per camera position from ``/img_msg`` and, on
    each ``/vision_object_recognition`` request, undistorts the cached
    frame, segments object instances with YOLO and returns one 6-DoF pose
    per instance, computed from the masked depth image.
    """

    def __init__(self, name, mode):
        """Create the node.

        Args:
            name: ROS node name.
            mode: only 'detect' is supported; anything else disables
                frame processing and logs an error.

        Raises:
            NotImplementedError: if the 'device' parameter is 'gpu'.
            ValueError: if 'device' is neither 'cpu' nor 'gpu'.
        """
        super().__init__(name)
        self.mode = mode
        self.device = None
        self.checkpoint_path = None
        self.checkpoint_name = None
        self.output_boxes = None
        self.output_masks = None
        self.function = None            # per-frame processing callable, chosen from checkpoint type
        self.calculate_function = None  # pose solver implementation (cpu today, gpu reserved)
        self.K = None                   # flattened 3x3 intrinsics of the undistorted image
        self.fx = self.fy = 0.5         # down-scale factor applied to large crops before ICP
        self.camera_data = {}           # camera position -> [color, depth, K, D]
        self.cv_bridge = CvBridge()
        self.lock = threading.Lock()    # camera_data is shared between subscriber and service callbacks
        '''init'''
        self._init_param()
        self.source_model = o3d.io.read_point_cloud(self.source_model_path)
        if mode == 'detect':
            self._init_model()
        else:
            self.function = None
            self.get_logger().error('Error: Mode Unknown')
        if self.device == 'cpu':
            self.calculate_function = calculate_pose_cpu
        elif self.device == 'gpu':
            raise NotImplementedError('Function: calculate_pose_gpu not implemented')
        else:
            raise ValueError(f"device must be cpu or gpu, now {self.device}")
        self._init_publisher()
        self._init_subscriber()
        self._init_service()

    def _init_param(self):
        """Declare and read all ROS parameters."""
        self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
        self.checkpoint_name = self.get_parameter('checkpoint_name').value
        self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
        self.source_model_path = os.path.join(share_dir, 'pointclouds/bottle_model.pcd')
        self.declare_parameter('output_boxes', True)
        self.output_boxes = self.get_parameter('output_boxes').value
        self.declare_parameter('output_masks', False)
        self.output_masks = self.get_parameter('output_masks').value
        self.declare_parameter('set_confidence', 0.25)
        self.set_confidence = self.get_parameter('set_confidence').value
        self.declare_parameter('device', 'cpu')
        self.device = self.get_parameter('device').value
        # Class filter, given as a Python-literal string ('[39]', 'None', ...).
        # Default '[39]' (COCO bottle) preserves the node's previous behavior,
        # where the id was hard-coded inside _seg_image.
        self.declare_parameter('classes', '[39]')
        self.classes = ast.literal_eval(self.get_parameter('classes').value)

    def _init_model(self):
        """Load the YOLO checkpoint and pick the frame-processing function."""
        device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        try:
            self.model = YOLO(self.checkpoint_path).to(device_model)
        except Exception as e:
            self.get_logger().error(f'Failed to load YOLO model: {e}')
            raise
        self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
        # Only segmentation checkpoints are supported: pose estimation needs masks.
        if self.checkpoint_name.endswith('-seg.pt'):
            self.function = self._seg_image
        else:
            self.function = None
            self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')

    def _init_publisher(self):
        """Create the annotated-image publisher (only when visualization is on)."""
        if self.output_boxes or self.output_masks:
            self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)

    def _init_service(self):
        """Create the object-recognition service server."""
        self.server = self.create_service(
            VisionObjectRecognition,
            "/vision_object_recognition",
            self._service_callback
        )

    def _init_subscriber(self):
        """Subscribe to the combined color/depth/intrinsics camera message."""
        self.sub_img = self.create_subscription(
            ImgMsg,
            "/img_msg",
            self._sub_callback,
            10
        )

    def _sub_callback(self, msg):
        """Cache the latest frame for the camera position carried by the message."""
        with self.lock:
            self.camera_data[msg.position] = [
                msg.image_color,
                msg.image_depth,
                msg.karr,
                msg.darr
            ]

    def _service_callback(self, request, response):
        """Answer a recognition request using the latest cached frame."""
        response.header.stamp = self.get_clock().now().to_msg()
        response.header.frame_id = "camera_detect"
        # Robustness fix: with no processing function (bad mode/checkpoint)
        # the old code crashed with a TypeError below; fail gracefully.
        if self.function is None:
            response.success = False
            response.info = "Detect function is not initialized"
            response.objects = []
            return response
        with self.lock:
            if request.camera_position in self.camera_data:
                color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
            else:
                response.success = False
                response.info = f"{request.camera_position} Camera data is empty or name is wrong"
                response.objects = []
                return response
        self.camera_size = [color_img_ros.width, color_img_ros.height]
        color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
        depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
        # Undistort both images and switch to the rectified intrinsics.
        map1, map2, self.K = get_map(self.K, D, self.camera_size)
        color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
        img, pose_dict = self.function(color_img_cv, depth_img_cv)
        # No segmentation result -> publish the raw frame so the stream stays alive.
        if self.output_boxes or self.output_masks:
            if img is None:
                img = color_img_ros
            self.pub_detect_image.publish(img)
        if pose_dict:
            response.info = "Success get pose"
            response.success = True
            for (class_id, class_name), pose_list in pose_dict.items():
                response.objects.append(
                    PoseClassAndID(
                        class_name = class_name,
                        class_id = class_id,
                        pose_list = pose_list
                    )
                )
        else:
            response.info = "pose dict is empty"
            response.success = False
            response.objects = []
        return response

    def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        """Segment instances in rgb_img and compute a pose for each one.

        Returns:
            (annotated_image_msg_or_None, pose_dict) where pose_dict maps
            (class_id, class_name) to a list of geometry_msgs Poses; both
            are None when the model produced no masks.
        """
        pose_dict = defaultdict(list)
        '''Get Predict Results'''
        # Consistency fix: honor the 'classes' ROS parameter instead of the
        # previously hard-coded classes=[39].
        results = self.model(rgb_img, retina_masks=True, conf=self.set_confidence, classes=self.classes)
        result = results[0]
        '''Get masks'''
        if result.masks is None or len(result.masks) == 0:
            return None, None
        masks = result.masks.data.cpu().numpy()
        orig_shape = result.masks.orig_shape
        '''Get boxes'''
        boxes = result.boxes.data.cpu().numpy()
        class_ids = result.boxes.cls.cpu().numpy()
        labels = result.names
        for i, (mask, box) in enumerate(zip(masks, boxes)):
            mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
            depth_crop, mask_crop, (x_min, y_min) = crop_mask_bbox(depth_img, mask, box)
            if depth_crop is None:
                continue
            # NOTE(review): `mask` was just resized to orig_shape, so this
            # condition is always true and every crop is down-scaled by
            # (fx, fy); `mask_crop.shape` may have been intended — confirm.
            if mask.shape[0] >= (orig_shape[0] * 0.5) and mask.shape[1] >= (orig_shape[1] * 0.5):
                mask_crop = cv2.resize(mask_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
                depth_crop = cv2.resize(depth_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
                # Scale intrinsics along with the image; shift the principal
                # point into the crop's coordinate frame.
                intrinsics = o3d.camera.PinholeCameraIntrinsic(
                    int(self.camera_size[0] * self.fx),
                    int(self.camera_size[1] * self.fy),
                    self.K[0] * self.fx,
                    self.K[4] * self.fy,
                    (self.K[2] - x_min) * self.fx,
                    (self.K[5] - y_min) * self.fy
                )
            else:
                intrinsics = o3d.camera.PinholeCameraIntrinsic(
                    int(self.camera_size[0]),
                    int(self.camera_size[1]),
                    self.K[0],
                    self.K[4],
                    self.K[2] - x_min,
                    self.K[5] - y_min
                )
            x, y, z, rw, rx, ry, rz = self.calculate_function(mask_crop, depth_crop, intrinsics, self.source_model)
            if (x, y, z) == (None, None, None):
                self.get_logger().error("have wrong pose")
                continue
            # (0, 0, 0) is the pose solver's "no valid points" sentinel.
            if (x, y, z) != (0.0, 0.0, 0.0):
                pose = Pose()
                pose.position = Point(x=x, y=y, z=z)
                pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
                pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
        '''mask_img and box_img is or not output'''
        if self.output_boxes and not self.output_masks:
            draw_box(self.set_confidence, rgb_img, result)
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
        elif self.output_boxes and self.output_masks:
            draw_box(self.set_confidence, rgb_img, result)
            draw_mask(self.set_confidence, rgb_img, result)
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
        elif not self.output_boxes and self.output_masks:
            draw_mask(self.set_confidence, rgb_img, result)
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
        else:
            return None, pose_dict
def main(args=None):
    """Entry point: start the detect node and spin until interrupted."""
    rclpy.init(args=args)
    detect_node = DetectNode('detect', 'detect')
    try:
        rclpy.spin(detect_node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        detect_node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@@ -382,7 +382,7 @@ class DetectNode(Node):
orig_shape = result.masks.orig_shape
'''Get boxes'''
boxes = result.boxes.data.cpu().numpy()
boxes = result.boxes.xywh.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names

View File

@@ -0,0 +1,14 @@
import rclpy
from vision_detect.VisionDetect.node import DetectNode
def main(args=None):
    """Entry point: create the DetectNode and spin until shutdown."""
    rclpy.init(args=args)
    node = DetectNode('detect')
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the node
    finally:
        node.destroy_node()
        rclpy.shutdown()

View File

@@ -345,7 +345,7 @@ class DetectNode(Node):
orig_shape = result.masks.orig_shape
'''Get boxes'''
boxes = result.boxes.data.cpu().numpy()
boxes = result.boxes.xywh.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names

View File

@@ -352,7 +352,7 @@ class DetectNode(Node):
orig_shape = result.masks.orig_shape
'''Get boxes'''
boxes = result.boxes.data.cpu().numpy()
boxes = result.boxes.xywh.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names

View File

@@ -84,7 +84,7 @@ class DetectNode(Node):
self.fx = self.fy = 1.0
self.cv_bridge = CvBridge()
self.aidk_client = aidk.AIDKClient(ip='127.0.0.1', detect_timeout=10)
self.aidk_client = aidk.AIDKClient('127.0.0.1', 10)
while not self.aidk_client.is_ready():
time.sleep(0.5)
@@ -97,11 +97,11 @@ class DetectNode(Node):
def _init_param(self):
"""init parameter"""
self.declare_parameter('checkpoint_name', 'yolo11m-seg.pt')
self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
self.checkpoint_name = self.get_parameter('checkpoint_name').value
self.checkpoint_path = os.path.join(share_dir, 'checkpoint', self.checkpoint_name)
self.checkpoint_path = os.path.join(share_dir, 'checkpoints', self.checkpoint_name)
self.declare_parameter('config_name', True)
self.declare_parameter('config_name', 'default_config.json')
self.config_name = self.get_parameter('config_name').value
self.config_dir = os.path.join(share_dir, 'configs/flexiv_configs', self.config_name)
@@ -114,13 +114,13 @@ class DetectNode(Node):
self.declare_parameter('set_confidence', 0.25)
self.set_confidence = self.get_parameter('set_confidence').value
self.declare_parameter('color_image_topic', '/camera/color/image_raw')
self.declare_parameter('color_image_topic', '/camera/camera/color/image_raw')
self.color_image_topic = self.get_parameter('color_image_topic').value
self.declare_parameter('depth_image_topic', '/camera/depth/image_raw')
self.declare_parameter('depth_image_topic', '/camera/camera/aligned_depth_to_color/image_raw')
self.depth_image_topic = self.get_parameter('depth_image_topic').value
self.declare_parameter('camera_info_topic', '/camera/color/camera_info')
self.declare_parameter('camera_info_topic', '/camera/camera/color/camera_info')
self.camera_info_topic = self.get_parameter('camera_info_topic').value
@@ -240,7 +240,7 @@ class DetectNode(Node):
orig_shape = result.masks.orig_shape
'''Get boxes'''
boxes = result.boxes.data.cpu().numpy()
boxes = result.boxes.xywh.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
@@ -254,10 +254,14 @@ class DetectNode(Node):
continue
depth_img_crop_mask = np.zeros_like(depth_crop)
depth_img_crop_mask[mask > 0] = depth_img[mask > 0]
depth_img_crop_mask[mask_crop > 0] = depth_crop[mask_crop > 0]
rgb_bytes = cv2.imencode('png', rgb_crop)[1]
depth_bytes = cv2.imencode('png', depth_img_crop_mask)[1]
print(rgb_crop.shape)
print(rgb_crop.dtype)
rgb_bytes = cv2.imencode('.png', rgb_crop)[1]
depth_bytes = cv2.imencode('.png', depth_img_crop_mask)[1]
intrinsics = [
int(self.camera_size[0]),
@@ -269,25 +273,23 @@ class DetectNode(Node):
]
state = self.aidk_client.detect_with_image(
obj_name=self.config["obj_name"],
camera_id=self.config["camera_id"],
coordinate_id=self.config["coordinate_id"],
camera_pose=self.config["camera_pose"],
obj_name=self.config["command"]["obj_name"],
camera_id=self.config["command"]["camera_id"],
coordinate_id=self.config["command"]["coordinate_id"],
camera_pose=self.config["command"]["camera_pose"],
camera_intrinsic=intrinsics,
rgb_input=aidk.ImageBuffer(rgb_bytes),
depth_input=aidk.ImageBuffer(depth_bytes),
custom=self.config["custom"],
custom=self.config["command"]["custom"],
)
self.get_logger().info("state: %s", state)
self.get_logger().info(f"state: {state}")
self.get_logger().info(
"current detected object names: %s, current detected object nums: %s",
self.aidk_client.get_detected_obj_names(),
self.aidk_client.get_detected_obj_nums(),
f"current detected object names: {self.aidk_client.get_detected_obj_names()}, current detected object nums: {self.aidk_client.get_detected_obj_nums()}"
)
for key in self.config["keys"]:
parse_state, result_list = self.aidk_client.parse_result(self.config["obj_name"], key, -1)
parse_state, result_list = self.aidk_client.parse_result(self.config["command"]["obj_name"], key, -1)
self.get_logger().info(
"detected time stamp: {}".format(
datetime.fromtimestamp(self.aidk_client.get_detected_time())
@@ -300,7 +302,7 @@ class DetectNode(Node):
if key in ["bbox", "keypoints", "positions", "obj_pose"]:
for result in result_list:
for vec in result.vect:
self.get_logger().info(vec)
self.get_logger().info(f"vec: {vec}")
x, y, z, rw, rx, ry, rz = vec
pose = Pose()
@@ -310,7 +312,7 @@ class DetectNode(Node):
elif key in ["valid", "double_value", "int_value", "name"]:
for result in result_list:
self.get_logger().info(getattr(result, key))
self.get_logger().info(f"{key}: {getattr(result, key)}")
time4 = time.time()

View File

@@ -1,55 +0,0 @@
import cv2
import numpy as np
import open3d as o3d
def draw_box(rgb_img, result, set_confidence: float=0.0):
    """Draw YOLO detection boxes and class labels onto rgb_img in place.

    Only detections with confidence >= set_confidence are drawn.
    """
    boxes = result.boxes.xywh.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy()
    labels = result.names
    for i, box in enumerate(boxes):
        if confidences[i] >= set_confidence:
            x_center, y_center, width, height = box[:4]
            # Center-based xywh -> corner points for cv2.rectangle.
            p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
            p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
            cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
            cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
def draw_mask(set_confidence, rgb_img, result):
    """Overlay instance-segmentation masks onto rgb_img in place.

    Masks whose confidence reaches set_confidence are blended in at
    50% opacity with pure red (BGR).
    """
    masks = result.masks.data.cpu().numpy()
    orig_shape = result.masks.orig_shape
    confidences = result.boxes.conf.cpu().numpy()
    for i, mask in enumerate(masks):
        if confidences[i] >= set_confidence:
            # Model masks come at model resolution; rescale to the image size.
            mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
            rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
        else:
            continue
def draw_pointcloud(pcd: list, axis=True):
    """Visualize a list of geometries together with the world axis triad.

    NOTE(review): the ``axis`` parameter is immediately shadowed by the
    local segment list below, so passing axis=False has no effect —
    confirm whether the triad was meant to be optional.
    """
    axis_point = [
        [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]  # vertices: origin plus unit X/Y/Z endpoints
    axis = [
        [0, 1], [0, 2], [0, 3]
    ]  # segments from the origin to each axis endpoint
    axis_colors = [
        [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]  # X red, Y green, Z blue
    # Build an Open3D LineSet for the axis display.
    axis_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(axis_point), lines=o3d.utility.Vector2iVector(axis))
    axis_set.colors = o3d.utility.Vector3dVector(axis_colors)
    pcd.append(axis_set)
    o3d.visualization.draw_geometries(pcd)

View File

@@ -1,103 +0,0 @@
import cv2
import numpy as np
def crop_mask_bbox(depth_img, mask, box):
    """Crop a depth image and mask to a clipped xywh bounding box.

    Args:
        depth_img: H x W depth array.
        mask: H x W mask (0/1).
        box: at least (x_center, y_center, w, h) in pixels.

    Returns:
        (depth_crop, mask_crop, (x_offset, y_offset)); the offsets give
        the crop's top-left corner in the original image.
    """
    high, width = depth_img.shape
    x_center, y_center, w, h = box[:4]
    # Center-based box -> pixel bounds, rounded to nearest int.
    x_min, x_max = int(round(x_center - w/2)), int(round(x_center + w/2))
    y_min, y_max = int(round(y_center - h/2)), int(round(y_center + h/2))
    # Clip to the image; the +1 makes the max bound inclusive when slicing.
    depth_crop = depth_img[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
    mask_crop = mask[max(0, y_min):min(y_max, high) + 1, max(0, x_min):min(x_max, width) + 1]
    return depth_crop, mask_crop, (max(0, x_min), max(0, y_min))
def crop_mask_numpy(depth_img, mask):
    """Crop a depth image and mask to the tight bounding box of the mask.

    Args:
        depth_img: H x W depth array.
        mask: H x W mask (0/1).

    Returns:
        (depth_crop, mask_crop, (x_offset, y_offset)), or
        (None, None, (None, None)) when the mask has no positive pixels.
    """
    img_h, img_w = depth_img.shape
    ys, xs = np.nonzero(mask > 0)
    if xs.size == 0 or ys.size == 0:
        # Empty mask: nothing to crop.
        return None, None, (None, None)
    # Tight bounds of the mask, clipped; +1 makes the max bound inclusive.
    left, right = int(xs.min()), min(int(xs.max()), img_w) + 1
    top, bottom = int(ys.min()), min(int(ys.max()), img_h) + 1
    return depth_img[top:bottom, left:right], mask[top:bottom, left:right], (left, top)
def draw_box(set_confidence, rgb_img, result):
    """Draw YOLO detection boxes and class labels onto rgb_img in place.

    Only detections with confidence >= set_confidence are drawn.
    """
    boxes = result.boxes.xywh.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy()
    labels = result.names
    for i, box in enumerate(boxes):
        if confidences[i] >= set_confidence:
            x_center, y_center, width, height = box[:4]
            # Center-based xywh -> corner points for cv2.rectangle.
            p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
            p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
            cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
            cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
def draw_mask(set_confidence, rgb_img, result):
    """Overlay instance-segmentation masks onto rgb_img in place.

    Masks whose confidence reaches set_confidence are blended in at
    50% opacity with pure red (BGR).
    """
    masks = result.masks.data.cpu().numpy()
    orig_shape = result.masks.orig_shape
    confidences = result.boxes.conf.cpu().numpy()
    for i, mask in enumerate(masks):
        if confidences[i] >= set_confidence:
            # Model masks come at model resolution; rescale to the image size.
            mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
            rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
        else:
            continue
def get_map(K, D, camera_size):
    """Build undistortion remap tables from intrinsics K and distortion D.

    camera_size is (width, height). Returns (map1, map2, new_K_flat):
    the cv2.remap lookup tables plus the rectified 3x3 intrinsic matrix
    flattened to 9 values.
    """
    h, w = camera_size[::-1]  # camera_size is (w, h); unpack as (h, w)
    K = np.array(K).reshape(3, 3)
    D = np.array(D)
    new_K, _ = cv2.getOptimalNewCameraMatrix(K, D, (w, h), 1, (w, h))
    map1, map2 = cv2.initUndistortRectifyMap(K, D, None, new_K, (w, h), cv2.CV_32FC1)
    return map1, map2, new_K.flatten()
def distortion_correction(color_image, depth_image, map1, map2):
    """Undistort a color/depth image pair with precomputed remap tables.

    Linear interpolation for color; nearest-neighbor for depth so that
    blending cannot invent invalid depth values.
    """
    undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
    undistorted_color = undistorted_color.astype(color_image.dtype)
    undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
    undistorted_depth = undistorted_depth.astype(depth_image.dtype)
    return undistorted_color, undistorted_depth

View File

@@ -1,48 +0,0 @@
import numpy as np
import open3d as o3d
def read_stl():
    """Convert model/bottle.stl into the detection node's PCD model.

    Loads the STL mesh, scales mm -> m, samples a uniform point cloud,
    centers it on its centroid, rotates it to the node's axis convention,
    shows a preview, then writes pointclouds/bottle_model.pcd.
    """
    import os  # local import: only needed to create the output directory

    mesh = o3d.io.read_triangle_mesh("model/bottle.stl")
    mesh.scale(0.001, center=[0, 0, 0])  # STL units appear to be mm; convert to m
    mesh.compute_vertex_normals()
    pcd_model = mesh.sample_points_uniformly(number_of_points=50000)
    # Translate the cloud so its centroid sits at the origin.
    center = pcd_model.get_center().flatten()
    R = np.eye(4)
    R[0, 3], R[1, 3], R[2, 3] = -center
    pcd_model.transform(R)
    # Rotate 90 degrees about Y to match the axis convention used downstream.
    R = np.array(
        [[0, 0, 1, 0],
         [0, 1, 0, 0],
         [-1, 0, 0, 0],
         [0, 0, 0, 1]],
    )
    pcd_model.transform(R)
    # Axis triad (origin + unit X/Y/Z) for the preview window.
    point = [
        [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]
    lines = [
        [0, 1], [0, 2], [0, 3]
    ]
    colors = [
        [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]
    line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(point), lines=o3d.utility.Vector2iVector(lines))
    line_set.colors = o3d.utility.Vector3dVector(colors)
    o3d.visualization.draw_geometries([pcd_model, line_set])
    # Bug fix: write into 'pointclouds/' — the directory the detect node and
    # configs reference — not the mismatched 'pointcloud/'; create it first
    # so write_point_cloud cannot fail on a missing directory.
    os.makedirs('pointclouds', exist_ok=True)
    success = o3d.io.write_point_cloud('pointclouds/bottle_model.pcd', pcd_model)
    print("PCD_0, 写入结果:", success)


if __name__ == "__main__":
    read_stl()