Compare commits
6 Commits
feature_cp
...
260224
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d960da1192 | ||
|
|
6ebf159234 | ||
|
|
86e5ad76fc | ||
|
|
54981b3892 | ||
|
|
1602b1423d | ||
|
|
b04196de57 |
17
tools/get_checkpoint_name_map.py
Normal file
17
tools/get_checkpoint_name_map.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import os
|
||||
import json
|
||||
from ultralytics import YOLO
|
||||
|
||||
|
||||
checkpoint_path = "vision_detect/checkpoints/medical_sense-seg.pt"
|
||||
save_path = "vision_detect/map/label/medical_sense.json"
|
||||
|
||||
model = YOLO(os.path.expanduser(checkpoint_path))
|
||||
|
||||
# 反转:name -> id
|
||||
name_to_id = {v: k for k, v in model.names.items()}
|
||||
|
||||
print(name_to_id)
|
||||
|
||||
with open(os.path.expanduser(save_path), "w", encoding="utf-8") as f:
|
||||
json.dump(name_to_id, f, ensure_ascii=False, indent=2)
|
||||
0
vision_detect/__init__.py
Normal file
0
vision_detect/__init__.py
Normal file
@@ -1,29 +0,0 @@
|
||||
{
|
||||
"info": {},
|
||||
|
||||
"warring": {
|
||||
"0000": "Success",
|
||||
|
||||
"1000": "Detected object count is 0",
|
||||
"1001": "Depth crop is None",
|
||||
"1003": "Failed to detect a valid pose",
|
||||
|
||||
"1100": "Object point cloud contains excessive noise",
|
||||
"1101": "The point cloud is empty",
|
||||
|
||||
"1200": "The number of points is insufficient to compute an OBB",
|
||||
"1201": "PCA output vector is None",
|
||||
"1202": "This pose cannot be grab, and position refine fail",
|
||||
|
||||
"1300": "E2E model input data 'coors' are fewer than 128",
|
||||
"1301": "E2E model input data 'point_clouds' are fewer than 128",
|
||||
"1302": "The 'true num' of points is 0; No graspable points are available",
|
||||
"1303": "The model returned no predictions",
|
||||
"1304": "All rotation vector processing failed; no valid rotation matrix was generated"
|
||||
},
|
||||
|
||||
"error": {},
|
||||
|
||||
"fatal": {}
|
||||
}
|
||||
|
||||
77
vision_detect/configs/launch/default_action_config.json
Normal file
77
vision_detect/configs/launch/default_action_config.json
Normal file
@@ -0,0 +1,77 @@
|
||||
{
|
||||
"node_name": "default_config_detect_service",
|
||||
"output_boxes": false,
|
||||
"output_masks": false,
|
||||
"save_image": true,
|
||||
"image_save_dir": "~/images",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "calibration/eye_in_left_hand.json",
|
||||
"right_hand": "calibration/eye_in_right_hand.json",
|
||||
"head": "calibration/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"node_mode": "ACTION",
|
||||
"service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
"publisher_configs": {
|
||||
"publish_time": 0.1,
|
||||
"position": "right",
|
||||
"publisher_name": "/detect/pose"
|
||||
},
|
||||
"action_configs": {
|
||||
"action_name": "/vision_object_recognition"
|
||||
},
|
||||
|
||||
"image_source": "DRIVER",
|
||||
"driver_configs": {
|
||||
"subscription_name": "/img_msg"
|
||||
},
|
||||
"direct_configs": {
|
||||
"position": "right",
|
||||
"color_image_topic_name": "/camera/color/image_raw",
|
||||
"depth_image_topic_name": "/camera/depth/image_raw",
|
||||
"camera_info_topic_name": "/camera/color/camera_info"
|
||||
},
|
||||
|
||||
"detect_mode": "OBJECT",
|
||||
"object_configs": {
|
||||
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
|
||||
"confidence": 0.70,
|
||||
"label_map_path": "map/label/medical_sense.json",
|
||||
"classes": []
|
||||
},
|
||||
"color_configs": {
|
||||
"distance": 1500,
|
||||
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
|
||||
},
|
||||
"crossboard_configs": {
|
||||
"pattern_size": [8, 5]
|
||||
},
|
||||
|
||||
"estimate_mode": "PCA",
|
||||
"pca_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.004
|
||||
},
|
||||
"icp_configs": {
|
||||
"complete_model_path": "pointclouds/bottle_model.pcd",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 2.0,
|
||||
"voxel_size": 0.002,
|
||||
"ransac_voxel_size": 0.005,
|
||||
"icp_voxel_radius": [0.004, 0.002, 0.001],
|
||||
"icp_max_iter": [50, 30, 14]
|
||||
},
|
||||
"e2e_configs": {
|
||||
"checkpoint_path": "checkpoints/posenet.pt",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.010,
|
||||
"collision_thresh": 0.01
|
||||
},
|
||||
|
||||
"refine_mode": "FIXED"
|
||||
}
|
||||
77
vision_detect/configs/launch/default_service_config.json
Normal file
77
vision_detect/configs/launch/default_service_config.json
Normal file
@@ -0,0 +1,77 @@
|
||||
{
|
||||
"node_name": "default_config_detect_service",
|
||||
"output_boxes": false,
|
||||
"output_masks": false,
|
||||
"save_image": true,
|
||||
"image_save_dir": "~/images",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "calibration/eye_in_left_hand.json",
|
||||
"right_hand": "calibration/eye_in_right_hand.json",
|
||||
"head": "calibration/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"node_mode": "SERVICE",
|
||||
"service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
"publisher_configs": {
|
||||
"publish_time": 0.1,
|
||||
"position": "right",
|
||||
"publisher_name": "/detect/pose"
|
||||
},
|
||||
"action_configs": {
|
||||
"action_name": "/vision_object_recognition"
|
||||
},
|
||||
|
||||
"image_source": "DRIVER",
|
||||
"driver_configs": {
|
||||
"subscription_name": "/img_msg"
|
||||
},
|
||||
"direct_configs": {
|
||||
"position": "right",
|
||||
"color_image_topic_name": "/camera/color/image_raw",
|
||||
"depth_image_topic_name": "/camera/depth/image_raw",
|
||||
"camera_info_topic_name": "/camera/color/camera_info"
|
||||
},
|
||||
|
||||
"detect_mode": "OBJECT",
|
||||
"object_configs": {
|
||||
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
|
||||
"confidence": 0.70,
|
||||
"label_map_path": "map/label/medical_sense.json",
|
||||
"classes": []
|
||||
},
|
||||
"color_configs": {
|
||||
"distance": 1500,
|
||||
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
|
||||
},
|
||||
"crossboard_configs": {
|
||||
"pattern_size": [8, 5]
|
||||
},
|
||||
|
||||
"estimate_mode": "PCA",
|
||||
"pca_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.004
|
||||
},
|
||||
"icp_configs": {
|
||||
"complete_model_path": "pointclouds/bottle_model.pcd",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 2.0,
|
||||
"voxel_size": 0.002,
|
||||
"ransac_voxel_size": 0.005,
|
||||
"icp_voxel_radius": [0.004, 0.002, 0.001],
|
||||
"icp_max_iter": [50, 30, 14]
|
||||
},
|
||||
"e2e_configs": {
|
||||
"checkpoint_path": "checkpoints/posenet.pt",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.010,
|
||||
"collision_thresh": 0.01
|
||||
},
|
||||
|
||||
"refine_mode": "FIXED"
|
||||
}
|
||||
@@ -25,7 +25,7 @@
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.005
|
||||
"voxel_size": 0.004
|
||||
},
|
||||
"E2E_configs": {
|
||||
"checkpoint_path": "checkpoints/posenet.pt",
|
||||
@@ -1,59 +0,0 @@
|
||||
{
|
||||
"node_name": "default_config_detect_service",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Service",
|
||||
"Service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
"Topic_configs": {
|
||||
"position": "right",
|
||||
"color_image_topic_name": "/camera/color/image_raw",
|
||||
"depth_image_topic_name": "/camera/depth/image_raw",
|
||||
"camera_info_topic_name": "/camera/color/camera_info"
|
||||
},
|
||||
|
||||
"detect_mode": "Detect",
|
||||
"Detect_configs": {
|
||||
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
|
||||
"confidence": 0.50,
|
||||
"classes": []
|
||||
},
|
||||
"Color_configs": {
|
||||
"distance": 1500,
|
||||
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
|
||||
},
|
||||
"Crossboard_configs": {
|
||||
"pattern_size": [8, 5]
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.010
|
||||
},
|
||||
"ICP_configs": {
|
||||
"complete_model_path": "pointclouds/bottle_model.pcd",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 2.0,
|
||||
"voxel_size": 0.010,
|
||||
"ransac_voxel_size": 0.005,
|
||||
"icp_voxel_radius": [0.004, 0.002, 0.001],
|
||||
"icp_max_iter": [50, 30, 14]
|
||||
},
|
||||
"E2E_configs": {
|
||||
"checkpoint_path": "checkpoints/posenet.pt",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 3.0,
|
||||
"voxel_size": 0.010,
|
||||
"collision_thresh": 0.01
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/bottle_detect_service_icp.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/bottle_detect_service_icp.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/bottle_detect_service_pca.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/bottle_detect_service_pca.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/crossboard_topic_pca.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/crossboard_topic_pca.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/detect_service_pca.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/detect_service_pca.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/default_config.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/default_service_config.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch_configs/medical_sense.json')
|
||||
config_dir = os.path.join(share_dir, 'configs/launch/medical_sense.json')
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
|
||||
18
vision_detect/launch/test_action.launch.py
Normal file
18
vision_detect/launch/test_action.launch.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess, TimerAction
|
||||
|
||||
|
||||
def cmd(_cmd, period=None, output="screen"):
|
||||
command = ExecuteProcess(cmd=_cmd, output=output, sigterm_timeout='5', sigkill_timeout='5')
|
||||
if period is None:
|
||||
return command
|
||||
else:
|
||||
return TimerAction(period=float(period), actions=[command])
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
return LaunchDescription([
|
||||
cmd(['ros2', 'launch', 'img_dev', 'img_dev.launch.py']),
|
||||
cmd(['ros2', 'launch', 'vision_detect', 'test_action_medical_sense.launch.py'], period=2.0),
|
||||
cmd(['ros2', 'run', 'vision_detect', 'test_action_client'], period=2.0)
|
||||
])
|
||||
47
vision_detect/launch/test_action_medical_sense.launch.py
Normal file
47
vision_detect/launch/test_action_medical_sense.launch.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
|
||||
import os
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
SHARED_DIR = get_package_share_directory('vision_detect')
|
||||
CONFIGS_PATH = os.path.join(SHARED_DIR, 'configs/launch/default_action_config.json')
|
||||
LOGGING_MAP_PATH = os.path.join(SHARED_DIR, 'map/logging/report_logging_define.json')
|
||||
|
||||
|
||||
def get_name(path):
|
||||
with open(path, "r") as f:
|
||||
name = json.load(f)["node_name"]
|
||||
return name
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('logging_map_path', default_value=LOGGING_MAP_PATH),
|
||||
DeclareLaunchArgument('configs_path', default_value=CONFIGS_PATH)
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
logging_map_path = LaunchConfiguration('logging_map_path').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
package='vision_detect',
|
||||
executable='detect_node_test',
|
||||
name=get_name(CONFIGS_PATH),
|
||||
output="screen",
|
||||
parameters=[{
|
||||
'logging_map_path': logging_map_path,
|
||||
'configs_path': configs_path
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
return LaunchDescription(args_detect + [
|
||||
OpaqueFunction(function=create_detect_node),
|
||||
])
|
||||
18
vision_detect/launch/test_service.launch.py
Normal file
18
vision_detect/launch/test_service.launch.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess, TimerAction
|
||||
|
||||
|
||||
def cmd(_cmd, period=None, output="screen"):
|
||||
command = ExecuteProcess(cmd=_cmd, output=output, sigterm_timeout='5', sigkill_timeout='5')
|
||||
if period is None:
|
||||
return command
|
||||
else:
|
||||
return TimerAction(period=float(period), actions=[command])
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
return LaunchDescription([
|
||||
cmd(['ros2', 'launch', 'img_dev', 'img_dev.launch.py']),
|
||||
cmd(['ros2', 'launch', 'vision_detect', 'test_service_medical_sense.launch.py'], period=2.0),
|
||||
cmd(['ros2', 'run', 'vision_detect', 'service_client_node'], period=2.0)
|
||||
])
|
||||
47
vision_detect/launch/test_service_medical_sense.launch.py
Normal file
47
vision_detect/launch/test_service_medical_sense.launch.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.actions import DeclareLaunchArgument, OpaqueFunction
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
|
||||
import os
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
SHARED_DIR = get_package_share_directory('vision_detect')
|
||||
CONFIGS_PATH = os.path.join(SHARED_DIR, 'configs/launch/default_service_config.json')
|
||||
LOGGING_MAP_PATH = os.path.join(SHARED_DIR, 'map/logging/report_logging_define.json')
|
||||
|
||||
|
||||
def get_name(path):
|
||||
with open(path, "r") as f:
|
||||
name = json.load(f)["node_name"]
|
||||
return name
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('logging_map_path', default_value=LOGGING_MAP_PATH),
|
||||
DeclareLaunchArgument('configs_path', default_value=CONFIGS_PATH)
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
logging_map_path = LaunchConfiguration('logging_map_path').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
package='vision_detect',
|
||||
executable='detect_node_test',
|
||||
name=get_name(CONFIGS_PATH),
|
||||
output="screen",
|
||||
parameters=[{
|
||||
'logging_map_path': logging_map_path,
|
||||
'configs_path': configs_path
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
return LaunchDescription(args_detect + [
|
||||
OpaqueFunction(function=create_detect_node),
|
||||
])
|
||||
5
vision_detect/map/label/medical_sense.json
Normal file
5
vision_detect/map/label/medical_sense.json
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"bottle_plate": 0,
|
||||
"medicine_box": 1,
|
||||
"paper": 2
|
||||
}
|
||||
52
vision_detect/map/logging/report_logging_define.json
Normal file
52
vision_detect/map/logging/report_logging_define.json
Normal file
@@ -0,0 +1,52 @@
|
||||
{
|
||||
"info": {},
|
||||
|
||||
"warning": {
|
||||
"0000": "Success",
|
||||
|
||||
"0100": "Fail to load config file: File is not open or JSON parse error",
|
||||
"0101": "config file have no attribute 'node_name'",
|
||||
"0102": "config file have no attribute 'output_boxe' or 'output_mask'",
|
||||
"0103": "Fail to load source part",
|
||||
"0104": "Fail to load detect part",
|
||||
"0105": "Fail to load estimate part",
|
||||
"0110": "Cannot load calibration file, use E(4, 4)",
|
||||
"0111": "Calibration file have not KEY: 'T', use E(4, 4)",
|
||||
"0112": "Calibration file has wrong shape of mat, use E(4, 4)",
|
||||
|
||||
"0200": "Have not receive any camera data",
|
||||
"0201": "Receive wrong position, or this position have no camera data",
|
||||
"0202": "All input position have no camera data",
|
||||
|
||||
"0300": "Worker thread is not alive",
|
||||
"0301": "Can't submit task, task executor is already stop",
|
||||
"0302": "Task is aborted",
|
||||
"0303": "Worker time out",
|
||||
|
||||
"1000": "Detected object count is 0",
|
||||
"1001": "Depth crop is None",
|
||||
"1003": "Failed to detect a valid pose",
|
||||
"1010": "No specified color within the designated distance.",
|
||||
"1020": "Didn't detect Crossboard",
|
||||
|
||||
"1100": "Object point cloud contains excessive noise",
|
||||
"1101": "The point cloud is empty",
|
||||
"1102": "Points is too closely",
|
||||
|
||||
"1200": "The number of points is insufficient to compute an OBB",
|
||||
"1201": "PCA output vector is None",
|
||||
"1202": "This pose cannot be grab, and position refine fail",
|
||||
"1203": "All pose refine failed",
|
||||
"1210": "No object can be estimate",
|
||||
|
||||
"1300": "E2E model input data 'coors' are fewer than 128",
|
||||
"1301": "E2E model input data 'point_clouds' are fewer than 128",
|
||||
"1302": "The 'true num' of points is 0; No graspable points are available",
|
||||
"1303": "The model returned no predictions",
|
||||
"1304": "All rotation vector processing failed; no valid rotation matrix was generated"
|
||||
},
|
||||
|
||||
"error": {},
|
||||
|
||||
"fatal": {}
|
||||
}
|
||||
@@ -17,12 +17,12 @@ data_files = [
|
||||
('share/' + package_name, ['package.xml']),
|
||||
('share/' + package_name + '/launch', glob('launch/*.launch.py')),
|
||||
('share/' + package_name + '/configs', glob('configs/*.json')),
|
||||
('share/' + package_name + '/configs/flexiv_configs',
|
||||
glob('configs/flexiv_configs/*.json')),
|
||||
('share/' + package_name + '/configs/hand_eye_mat', glob('configs/hand_eye_mat/*.json')),
|
||||
('share/' + package_name + '/configs/launch_configs',
|
||||
glob('configs/launch_configs/*.json')),
|
||||
('share/' + package_name + '/configs/error_configs', glob('configs/error_configs/*.json')),
|
||||
('share/' + package_name + '/configs/flexiv', glob('configs/flexiv/*.json')),
|
||||
('share/' + package_name + '/configs/launch', glob('configs/launch/*.json')),
|
||||
|
||||
('share/' + package_name + '/calibration', glob('calibration/*.json')),
|
||||
('share/' + package_name + '/map/logging', glob('map/logging/*.json')),
|
||||
('share/' + package_name + '/map/label', glob('map/label/*.json')),
|
||||
|
||||
('share/' + package_name + '/checkpoints', glob('checkpoints/*.pt')),
|
||||
('share/' + package_name + '/checkpoints', glob('checkpoints/*.onnx')),
|
||||
@@ -63,6 +63,8 @@ setup(
|
||||
'get_camera_pose_node = vision_detect.get_camera_pose:main',
|
||||
|
||||
'detect_node = vision_detect.detect_node:main',
|
||||
'detect_node_test = vision_detect.detect_node_test:main',
|
||||
'test_action_client = vision_detect.action_client_node:main'
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
@@ -21,5 +21,5 @@ import pytest
|
||||
def test_flake8():
|
||||
rc, errors = main_with_errors(argv=[])
|
||||
assert rc == 0, \
|
||||
'Found %d code style errors / warnings:\n' % len(errors) + \
|
||||
'Found %d enum style errors / warnings:\n' % len(errors) + \
|
||||
'\n'.join(errors)
|
||||
|
||||
@@ -20,4 +20,4 @@ import pytest
|
||||
@pytest.mark.pep257
|
||||
def test_pep257():
|
||||
rc = main(argv=['.', 'test'])
|
||||
assert rc == 0, 'Found code style errors / warnings'
|
||||
assert rc == 0, 'Found enum style errors / warnings'
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
# This source code is licensed under the MIT license found in the
|
||||
# This source enum is licensed under the MIT license found in the
|
||||
# LICENSE file in the root directory of this source tree.
|
||||
|
||||
''' Pointnet2 layers.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
# This source code is licensed under the MIT license found in the
|
||||
# This source enum is licensed under the MIT license found in the
|
||||
# LICENSE file in the root directory of this source tree.
|
||||
|
||||
''' Modified based on: https://github.com/erikwijmans/Pointnet2_PyTorch '''
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
# This source code is licensed under the MIT license found in the
|
||||
# This source enum is licensed under the MIT license found in the
|
||||
# LICENSE file in the root directory of this source tree.
|
||||
|
||||
''' Modified based on Ref: https://github.com/erikwijmans/Pointnet2_PyTorch '''
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
# This source code is licensed under the MIT license found in the
|
||||
# This source enum is licensed under the MIT license found in the
|
||||
# LICENSE file in the root directory of this source tree.
|
||||
|
||||
from setuptools import setup
|
||||
|
||||
@@ -20,7 +20,7 @@ class ConfigBase(Node):
|
||||
with open(os.path.join(
|
||||
SHARE_DIR, "configs/error_configs/report_logging_define.json"), "r"
|
||||
) as f:
|
||||
WARNING_LOG_MAP = json.load(f)["warring"]
|
||||
WARNING_LOG_MAP = json.load(f)["warning"]
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
@@ -78,7 +78,7 @@ class ConfigBase(Node):
|
||||
"""init parameter"""
|
||||
self.declare_parameter(
|
||||
'configs_path',
|
||||
os.path.join(share_dir, "configs/launch_configs/default_config.json")
|
||||
os.path.join(share_dir, "configs/launch/default_service_config.json")
|
||||
)
|
||||
configs_path = self.get_parameter('configs_path').value
|
||||
with open(configs_path, 'r') as f:
|
||||
|
||||
@@ -94,6 +94,7 @@ class DetectNode(InitBase):
|
||||
def _service_callback(self, request, response):
|
||||
print(" \n ")
|
||||
print("========================== < start > ==========================")
|
||||
time_start = time.time()
|
||||
response.header.stamp = self.get_clock().now().to_msg()
|
||||
response.header.frame_id = "camera_detect"
|
||||
|
||||
@@ -125,13 +126,14 @@ class DetectNode(InitBase):
|
||||
self.p = "head"
|
||||
|
||||
self.camera_size = [color_img_ros.width, color_img_ros.height]
|
||||
|
||||
time1 = time.time()
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
|
||||
|
||||
map1, map2, self.k = get_map(self.k, d, self.camera_size)
|
||||
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
|
||||
|
||||
time2 = time.time()
|
||||
print(f"cv: {(time2 - time1) * 1000} ms")
|
||||
img, pose_list, sign = self.function(color_img_cv, depth_img_cv)
|
||||
if not sign:
|
||||
self.get_logger().warning(self.WARNING_LOG_MAP[str(pose_list)])
|
||||
@@ -158,6 +160,8 @@ class DetectNode(InitBase):
|
||||
if img is None:
|
||||
img = color_img_ros
|
||||
self.pub_detect_image.publish(img)
|
||||
time_end = time.time()
|
||||
print(f"full process: {(time_end - time_start) * 1000} ms")
|
||||
print("=========================== < end > ===========================")
|
||||
return response
|
||||
|
||||
|
||||
@@ -60,6 +60,8 @@ def collision_detector(
|
||||
return 2
|
||||
if hand_top_box.any():
|
||||
return 3
|
||||
else:
|
||||
iou *= 0.5
|
||||
|
||||
left_finger_box = (
|
||||
(points[2] < (z + left_size[2])) & (points[2] > z -0.05)
|
||||
@@ -213,8 +215,9 @@ def refine_grasp_pose(
|
||||
|
||||
if search_mode:
|
||||
right_num = left_num = 0
|
||||
# already in left side
|
||||
if left_last and not right_last:
|
||||
y_min = refine[1]
|
||||
y_min = refine[1] - 0.004
|
||||
while left_num < 10:
|
||||
left_num += 1
|
||||
refine[1] += 0.004
|
||||
@@ -225,11 +228,12 @@ def refine_grasp_pose(
|
||||
refine[1] = (refine[1] - 0.004 + y_min) / 2
|
||||
break
|
||||
else:
|
||||
refine[1] = y_min + 0.2
|
||||
refine[1] = y_min + 0.02
|
||||
print(f"left_num = {left_num}")
|
||||
|
||||
elif not left_last and right_last:
|
||||
y_max = refine[1]
|
||||
# already in right side
|
||||
y_max = refine[1] + 0.004
|
||||
while right_num < 10:
|
||||
right_num += 1
|
||||
refine[1] -= 0.004
|
||||
@@ -240,12 +244,13 @@ def refine_grasp_pose(
|
||||
refine[1] = (refine[1] + 0.004 + y_max) / 2
|
||||
break
|
||||
else:
|
||||
refine[1] = y_max - 0.2
|
||||
refine[1] = y_max - 0.02
|
||||
print(f"right_num = {right_num}")
|
||||
|
||||
elif not left_last and not right_last:
|
||||
# in middle
|
||||
y_cur = refine[1]
|
||||
while left_num < 6:
|
||||
while left_num < 10:
|
||||
left_num += 1
|
||||
refine[1] += 0.004
|
||||
collision_code = collision_detector(
|
||||
@@ -255,11 +260,11 @@ def refine_grasp_pose(
|
||||
y_max = refine[1] - 0.004
|
||||
break
|
||||
else:
|
||||
y_max = refine[1] + 0.024
|
||||
y_max = y_cur + 0.040
|
||||
print(f"left_num = {left_num}")
|
||||
|
||||
refine[1] = y_cur
|
||||
while right_num < 6:
|
||||
while right_num < 10:
|
||||
right_num += 1
|
||||
refine[1] -= 0.004
|
||||
collision_code = collision_detector(
|
||||
@@ -269,7 +274,7 @@ def refine_grasp_pose(
|
||||
refine[1] = (refine[1] + 0.004 + y_max) / 2
|
||||
break
|
||||
else:
|
||||
refine[1] = (y_cur - 0.024 + y_max) / 2
|
||||
refine[1] = (y_cur - 0.040 + y_max) / 2
|
||||
print(f"right_num = {right_num}")
|
||||
|
||||
position = target_position + (expect_orientation @ refine.T).T
|
||||
|
||||
72
vision_detect/vision_detect/action_client_node.py
Normal file
72
vision_detect/vision_detect/action_client_node.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.action import ActionClient
|
||||
from interfaces.action import VisionObjectRecognition # 修改为你的包名
|
||||
|
||||
class VisionObjectRecognitionClient(Node):
|
||||
def __init__(self):
|
||||
super().__init__('vision_object_recognition_client')
|
||||
self._action_client = ActionClient(
|
||||
self,
|
||||
VisionObjectRecognition,
|
||||
'/vision_object_recognition'
|
||||
)
|
||||
|
||||
# 示例目标参数
|
||||
self.camera_position = "left"
|
||||
self.classes = ["medical_box"]
|
||||
|
||||
# 创建 1 秒定时器
|
||||
self.timer = self.create_timer(1.0, self.timer_callback)
|
||||
|
||||
def timer_callback(self):
|
||||
if not self._action_client.server_is_ready():
|
||||
self.get_logger().info("Waiting for action server...")
|
||||
return
|
||||
|
||||
# 发送目标
|
||||
goal_msg = VisionObjectRecognition.Goal()
|
||||
goal_msg.camera_position = self.camera_position
|
||||
goal_msg.classes = self.classes
|
||||
|
||||
self.get_logger().info(f"Sending goal: camera_position={self.camera_position}, classes={self.classes}")
|
||||
future = self._action_client.send_goal_async(
|
||||
goal_msg,
|
||||
feedback_callback=self.feedback_callback
|
||||
)
|
||||
future.add_done_callback(self.goal_response_callback)
|
||||
|
||||
def feedback_callback(self, feedback_msg):
|
||||
feedback = feedback_msg.feedback
|
||||
self.get_logger().info(f"Feedback: status={feedback.status}, info={feedback.info}")
|
||||
|
||||
def goal_response_callback(self, future_response):
|
||||
goal_handle = future_response.result()
|
||||
if not goal_handle.accepted:
|
||||
self.get_logger().warn("Goal rejected by server")
|
||||
return
|
||||
|
||||
self.get_logger().info("Goal accepted by server")
|
||||
result_future = goal_handle.get_result_async()
|
||||
result_future.add_done_callback(self.result_callback)
|
||||
|
||||
def result_callback(self, future_result):
|
||||
result = future_result.result().result
|
||||
self.get_logger().info(f"Result received: success={result.success}, info={result.info}")
|
||||
for obj in result.objects:
|
||||
self.get_logger().info(
|
||||
f"Object: class={obj.class_name}, id={obj.class_id}, pose={obj.pose}, grab_width={obj.grab_width}"
|
||||
)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
client = VisionObjectRecognitionClient()
|
||||
rclpy.spin(client)
|
||||
client.destroy_node()
|
||||
if rclpy.ok():
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -11,4 +11,5 @@ def main(args=None):
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
if rclpy.ok():
|
||||
rclpy.shutdown()
|
||||
18
vision_detect/vision_detect/detect_node_test.py
Normal file
18
vision_detect/vision_detect/detect_node_test.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import rclpy
|
||||
from rclpy.executors import MultiThreadedExecutor
|
||||
|
||||
from vision_detect.vision_core import NodeManager
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = NodeManager('detect')
|
||||
try:
|
||||
executor = MultiThreadedExecutor()
|
||||
rclpy.spin(node, executor=executor)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
if rclpy.ok():
|
||||
rclpy.shutdown()
|
||||
@@ -116,9 +116,9 @@ class DetectNode(Node):
|
||||
self.declare_parameter('output_masks', False)
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('config_name', 'default_config.json')
|
||||
self.declare_parameter('config_name', 'default_service_config.json')
|
||||
self.config_name = self.get_parameter('config_name').value
|
||||
self.config_dir = os.path.join(share_dir, 'configs/flexiv_configs', self.config_name)
|
||||
self.config_dir = os.path.join(share_dir, 'configs/flexiv', self.config_name)
|
||||
|
||||
self.declare_parameter('set_confidence', 0.5)
|
||||
self.set_confidence = self.get_parameter('set_confidence').value
|
||||
|
||||
3
vision_detect/vision_detect/vision_core/__init__.py
Normal file
3
vision_detect/vision_detect/vision_core/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .node import NodeManager
|
||||
|
||||
__all__ = ["NodeManager"]
|
||||
@@ -0,0 +1 @@
|
||||
from . import io
|
||||
@@ -0,0 +1 @@
|
||||
from . import image
|
||||
@@ -0,0 +1,2 @@
|
||||
from .save import *
|
||||
from .draw import *
|
||||
@@ -0,0 +1,60 @@
|
||||
from typing import Union
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from .save import save_img
|
||||
|
||||
|
||||
__all__ = ["draw_boxes", "draw_masks"]
|
||||
|
||||
def draw_boxes(
|
||||
rgb_img: np.ndarray,
|
||||
segmentation_result,
|
||||
put_text: bool = True,
|
||||
save_dir: Union[bool, str] = False,
|
||||
mark_time: bool = False
|
||||
):
|
||||
"""
|
||||
绘制目标检测边界框
|
||||
"""
|
||||
boxes = segmentation_result.boxes.xywh.cpu().numpy()
|
||||
confidences = segmentation_result.boxes.conf.cpu().numpy()
|
||||
class_ids = segmentation_result.boxes.cls.cpu().numpy()
|
||||
labels = segmentation_result.names
|
||||
|
||||
x_centers, y_centers = boxes[:, 0], boxes[:, 1]
|
||||
sorted_index = np.lexsort((-y_centers, x_centers))
|
||||
boxes = boxes[sorted_index]
|
||||
class_ids = class_ids[sorted_index]
|
||||
confidences = confidences[sorted_index]
|
||||
|
||||
for i, box in enumerate(boxes):
|
||||
x_center, y_center, width, height = box[:4]
|
||||
|
||||
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
|
||||
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
|
||||
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
|
||||
|
||||
if put_text:
|
||||
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}',
|
||||
(p1[0], p1[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 1,
|
||||
(255, 255, 0), 2)
|
||||
|
||||
cv2.putText(rgb_img, f"{i}", (p1[0] + 15, p1[1] + 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
|
||||
|
||||
if save_dir:
|
||||
save_img(rgb_img, "detect_color_img.png", save_dir, mark_time)
|
||||
|
||||
|
||||
def draw_masks(rgb_img: np.ndarray, segmentation_result):
|
||||
"""
|
||||
绘制实例分割mask
|
||||
"""
|
||||
masks = segmentation_result.masks.data.cpu().numpy()
|
||||
orig_shape = segmentation_result.masks.orig_shape
|
||||
|
||||
for i, mask in enumerate(masks):
|
||||
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
|
||||
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
@@ -0,0 +1,34 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
__all__ = ["save_img"]
|
||||
|
||||
def save_img(img: np.ndarray, save_name, save_dir: str | None = None, mark_cur_time: bool = False):
|
||||
"""
|
||||
保存图像
|
||||
|
||||
input:
|
||||
img: np.ndarray
|
||||
- uint8, BGR (H, W, 3)
|
||||
- uint16 single-channel
|
||||
save_name: str
|
||||
save_path: str | None
|
||||
mark_cur_time: bool
|
||||
"""
|
||||
if isinstance(save_dir, str):
|
||||
os.makedirs(save_dir, exist_ok=True)
|
||||
save_path = os.path.join(save_dir, save_name)
|
||||
else:
|
||||
home_path = os.path.expanduser("~")
|
||||
save_path = os.path.join(home_path, save_name)
|
||||
|
||||
if mark_cur_time:
|
||||
cur_time = int(time.time() * 1000)
|
||||
path, ext = os.path.splitext(save_path)
|
||||
save_path = path + f'_{cur_time}' + ext
|
||||
|
||||
cv2.imwrite(save_path, img)
|
||||
3
vision_detect/vision_detect/vision_core/core/__init__.py
Normal file
3
vision_detect/vision_detect/vision_core/core/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .managers import *
|
||||
from .struct import *
|
||||
from .enum import NodeType
|
||||
@@ -0,0 +1,5 @@
|
||||
from .object_detector import *
|
||||
from .color_detector import *
|
||||
from .crossboard_detector import *
|
||||
|
||||
# from .detector_baseline import *
|
||||
@@ -0,0 +1,40 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from ..enum import Status
|
||||
from ..struct import ImageData, SegmentationData
|
||||
from .detector_baseline import DetectorBaseline
|
||||
|
||||
|
||||
__all__ = ["ColorDetector"]
|
||||
|
||||
class ColorDetector(DetectorBaseline):
|
||||
def __init__(self, config, _logger):
|
||||
super().__init__()
|
||||
self.distance = config["distance"]
|
||||
self.color_range = config["color_range"]
|
||||
|
||||
def _detect(self, classes_name, image_data: ImageData) -> tuple[SegmentationData | None, int]:
|
||||
if image_data.status != Status.SUCCESS:
|
||||
return None, image_data.status
|
||||
|
||||
color_image = image_data.color_image
|
||||
depth_image = image_data.depth_image
|
||||
|
||||
hsv_img = cv2.cvtColor(color_image, cv2.COLOR_BGR2HSV)
|
||||
|
||||
depth_filter_mask = np.zeros_like(depth_image, dtype=np.uint8)
|
||||
depth_filter_mask[(depth_image > 0) & (depth_image < self.distance)] = 1
|
||||
|
||||
hsv_img[depth_filter_mask == 0] = 0
|
||||
|
||||
mask_part_list = [cv2.inRange(hsv_img, color[0], color[1]) for color in self.color_range]
|
||||
mask = sum(mask_part_list[1:], mask_part_list[0])
|
||||
mask = mask // 255
|
||||
|
||||
if mask is None or not np.any(mask):
|
||||
return None, Status.NO_COLOR
|
||||
|
||||
color_image[mask > 0] = color_image[mask > 0] * 0.5 + np.array([255, 0, 0]) * 0.5
|
||||
|
||||
return SegmentationData.create_mask_only_data(masks=[mask]), Status.SUCCESS
|
||||
@@ -0,0 +1,45 @@
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
from ..enum import Status
|
||||
from ..struct import ImageData, SegmentationData
|
||||
from .detector_baseline import DetectorBaseline
|
||||
|
||||
|
||||
__all__ = ["CrossboardDetector"]
|
||||
|
||||
class CrossboardDetector(DetectorBaseline):
|
||||
def __init__(self, config, _logger):
|
||||
super().__init__()
|
||||
self.pattern_size = config["pattern_size"]
|
||||
|
||||
def _detect(self, classes_name, image_data: ImageData) -> tuple[SegmentationData | None, int]:
|
||||
color_image = image_data.color_image
|
||||
|
||||
rgb_img_gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
|
||||
ret, corners = cv2.findChessboardCorners(
|
||||
rgb_img_gray, self.pattern_size, cv2.CALIB_CB_FAST_CHECK
|
||||
)
|
||||
if not ret:
|
||||
return None, Status.NO_CROSSBOARD
|
||||
|
||||
# 角点亚像素精确化(提高标定精度)
|
||||
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
|
||||
corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
|
||||
|
||||
corners_subpix = corners_subpix.reshape(self.pattern_size[1], self.pattern_size[0], 2)
|
||||
mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
|
||||
|
||||
for i in range(0, self.pattern_size[1] - 1):
|
||||
for j in range(0, self.pattern_size[0] - 1):
|
||||
pts = np.array([
|
||||
corners_subpix[i, j],
|
||||
corners_subpix[i, j + 1],
|
||||
corners_subpix[i + 1, j + 1],
|
||||
corners_subpix[i + 1, j]
|
||||
], dtype=np.int32)
|
||||
cv2.fillConvexPoly(mask, pts, 1)
|
||||
|
||||
color_image[mask > 0] = color_image[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
|
||||
|
||||
return SegmentationData.create_mask_only_data(masks=[mask]), Status.SUCCESS
|
||||
@@ -0,0 +1,14 @@
|
||||
from ..struct import SegmentationData, ImageDataContainer
|
||||
|
||||
|
||||
__all__ = ["DetectorBaseline"]
|
||||
|
||||
class DetectorBaseline:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def _detect(self, classes_name, image_data) -> tuple[SegmentationData | None, int]:
|
||||
pass
|
||||
|
||||
def get_masks(self, position, classes_name, image_data: ImageDataContainer) -> tuple[SegmentationData | None, int]:
|
||||
return self._detect(classes_name, image_data[position])
|
||||
@@ -0,0 +1,65 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
from ..enum import Status
|
||||
from .detector_baseline import DetectorBaseline
|
||||
from ..struct import ImageData, SegmentationData
|
||||
|
||||
|
||||
__all__ = ["ObjectDetector"]
|
||||
|
||||
SHARE_DIR = get_package_share_directory('vision_detect')
|
||||
|
||||
class ObjectDetector(DetectorBaseline):
|
||||
|
||||
def __init__(self, config, _logger):
|
||||
super().__init__()
|
||||
self.logger = _logger
|
||||
self.confidence = config["confidence"]
|
||||
with open(os.path.join(SHARE_DIR, config["label_map_path"]), "r") as f:
|
||||
self.labels_map = json.load(f)
|
||||
|
||||
"""init model"""
|
||||
checkpoint_path = str(os.path.join(SHARE_DIR, config["checkpoint_path"]))
|
||||
self._device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(checkpoint_path)
|
||||
except Exception as e:
|
||||
raise ValueError(f'Failed to load YOLO model: {e}')
|
||||
|
||||
def _detect(
|
||||
self,
|
||||
classes_name: list[str],
|
||||
image_data: ImageData
|
||||
) -> tuple[SegmentationData | None, int]:
|
||||
|
||||
if image_data.status != Status.SUCCESS:
|
||||
return None, image_data.status
|
||||
|
||||
classes = []
|
||||
for _class in classes_name:
|
||||
if _class in self.labels_map:
|
||||
classes.append(self.labels_map[_class])
|
||||
if not classes:
|
||||
self.logger.warning("No legal classes name")
|
||||
classes = None
|
||||
|
||||
color_image = image_data.color_image
|
||||
with torch.no_grad():
|
||||
results = self.model.predict(
|
||||
color_image,
|
||||
device=self._device,
|
||||
retina_masks=True,
|
||||
conf=self.confidence,
|
||||
classes=classes,
|
||||
)
|
||||
|
||||
if results[0].masks is None or len(results[0].masks) == 0:
|
||||
return None, Status.NO_DETECT
|
||||
|
||||
return SegmentationData.create_data(results=results), Status.SUCCESS
|
||||
@@ -0,0 +1,2 @@
|
||||
from .mode_type import *
|
||||
from .logging_code import *
|
||||
@@ -0,0 +1,51 @@
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
__all__ = ["Status"]
|
||||
|
||||
class Status(IntEnum):
|
||||
SUCCESS = 0
|
||||
|
||||
|
||||
FAIL_LOAD_CONFIG = 100
|
||||
NO_NODE_NAME = 101
|
||||
NO_OUTPUT_ATTRIBUTE = 102
|
||||
FAIL_LOAD_SOURCE_CONFIG = 103
|
||||
FAIL_LOAD_DETECT_CONFIG = 104
|
||||
FAIL_LOAD_ESTIMATE_CONFIG = 105
|
||||
|
||||
FAIL_LOAD_CALIBRATION_FILE = 110
|
||||
WRONG_KEY = 111
|
||||
WRONG_SHAPE = 112
|
||||
|
||||
NO_CAMERA_DATA = 200
|
||||
NO_POSITION_DATA = 201
|
||||
NO_ALL_POSITION_DATA = 202
|
||||
|
||||
WORKER_NOT_ALIVE = 300
|
||||
EXECUTOR_ALREADY_STOP = 301
|
||||
TASK_ABORTED = 302
|
||||
WORKER_TIMEOUT = 303
|
||||
|
||||
NO_DETECT = 1000
|
||||
NO_DEPTH_CROP = 1001
|
||||
FAIL_DETECT_VALID_POSE = 1003
|
||||
|
||||
NO_COLOR = 1010
|
||||
|
||||
NO_CROSSBOARD = 1020
|
||||
|
||||
|
||||
TOO_FEW_POINTS_OBB = 1200
|
||||
PCA_NO_VECTOR = 1201
|
||||
REFINE_FAIL = 1202
|
||||
ALL_POSE_REFINE_FAILED = 1203
|
||||
CANNOT_ESTIMATE = 1210
|
||||
|
||||
|
||||
COORS_TOO_FEW = 1300
|
||||
POINT_CLOUDS_TOO_FEW = 1301
|
||||
ZERO_TRUE_NUM = 1302
|
||||
E2E_NO_PREDICTION = 1303
|
||||
E2E_NO_VALID_MATRIX = 1304
|
||||
|
||||
@@ -0,0 +1,33 @@
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
__all__ = [
|
||||
"NodeType", "SourceType", "DetectorType", "RefinerType", "EstimatorType"
|
||||
]
|
||||
|
||||
class NodeType(IntEnum):
|
||||
SERVICE = 0
|
||||
PUBLISHER = 1
|
||||
ACTION = 2
|
||||
|
||||
|
||||
class SourceType(IntEnum):
|
||||
DRIVER = 0
|
||||
DIRECT = 1
|
||||
|
||||
|
||||
class DetectorType(IntEnum):
|
||||
OBJECT = 0
|
||||
COLOR = 1
|
||||
CROSSBOARD = 2
|
||||
|
||||
|
||||
class EstimatorType(IntEnum):
|
||||
PCA = 0
|
||||
ICP = 1
|
||||
E2E = 2
|
||||
|
||||
|
||||
class RefinerType(IntEnum):
|
||||
NO = 0
|
||||
FIXED = 1
|
||||
@@ -0,0 +1,5 @@
|
||||
from .pca_estimator import *
|
||||
from .icp_estimator import *
|
||||
from .e2e_estimator import *
|
||||
|
||||
# from .estimator_baseline import *
|
||||
@@ -0,0 +1,7 @@
|
||||
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .estimator_baseline import EstimatorBaseline
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
from ..struct import ImageData, ImageDataContainer, SegmentationData, PoseData
|
||||
|
||||
__all__ = ["EstimatorBaseline"]
|
||||
|
||||
class EstimatorBaseline:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def _estimate(
|
||||
self,
|
||||
image_data: ImageData,
|
||||
detect_data: SegmentationData,
|
||||
get_grab_width: bool = True
|
||||
) -> tuple[PoseData | None, int]:
|
||||
pass
|
||||
|
||||
def get_poses(
|
||||
self,
|
||||
position: str,
|
||||
image_data: ImageDataContainer,
|
||||
detect_data: SegmentationData,
|
||||
get_grab_width: bool = True
|
||||
) -> tuple[PoseData | None, int]:
|
||||
return self._estimate(
|
||||
image_data=image_data[position],
|
||||
detect_data=detect_data,
|
||||
get_grab_width=get_grab_width
|
||||
)
|
||||
@@ -0,0 +1,7 @@
|
||||
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .estimator_baseline import EstimatorBaseline
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
|
||||
from .. enum import Status
|
||||
from ..utils import pointcloud, image
|
||||
from .estimator_baseline import EstimatorBaseline
|
||||
from ..struct import ImageData, SegmentationData, PoseData
|
||||
|
||||
|
||||
__all__ = ["PCAEstimator"]
|
||||
|
||||
def pca(data: np.ndarray, sort=True):
|
||||
"""主成分分析 """
|
||||
center = np.mean(data, axis=0)
|
||||
centered_points = data - center # 去中心化
|
||||
|
||||
try:
|
||||
cov_matrix = np.cov(centered_points.T) # 转置
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
|
||||
|
||||
except np.linalg.LinAlgError:
|
||||
return None, None
|
||||
|
||||
if sort:
|
||||
sort = eigenvalues.argsort()[::-1] # 降序排列
|
||||
eigenvalues = eigenvalues[sort] # 索引
|
||||
eigenvectors = eigenvectors[:, sort]
|
||||
|
||||
return eigenvalues, eigenvectors
|
||||
|
||||
|
||||
class PCAEstimator(EstimatorBaseline):
|
||||
def __init__(self, config):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
|
||||
def _estimate(
|
||||
self,
|
||||
image_data: ImageData,
|
||||
detect_data: SegmentationData,
|
||||
get_grab_width: bool = True,
|
||||
) -> tuple[PoseData | None, int]:
|
||||
|
||||
depth_image = image_data.depth_image
|
||||
karr = image_data.karr
|
||||
h, w = depth_image.shape[:2]
|
||||
image_size = [w, h]
|
||||
masks = detect_data.masks
|
||||
|
||||
# check boxes data
|
||||
boxes = detect_data.boxes
|
||||
if boxes is None:
|
||||
box_sign = False
|
||||
else:
|
||||
box_sign = True
|
||||
|
||||
n = 0
|
||||
pose_data = PoseData()
|
||||
for i, mask in enumerate(masks):
|
||||
karr_mask = karr.copy()
|
||||
|
||||
if box_sign:
|
||||
crops, p = image.crop_imgs_xywh([depth_image, mask], boxes[i], same_sign=True)
|
||||
else:
|
||||
crops, p = image.crop_imgs_mask([depth_image], mask, same_sign=True)
|
||||
|
||||
depth_img_mask = np.zeros_like(crops[0])
|
||||
depth_img_mask[crops[1] > 0] = crops[0][crops[1] > 0]
|
||||
karr_mask[2] -= p[0]
|
||||
karr_mask[5] -= p[1]
|
||||
|
||||
pcd, CODE = pointcloud.create_o3d_denoised_pcd(
|
||||
depth_img_mask, image_size, karr_mask, **self.config
|
||||
)
|
||||
if CODE != 0:
|
||||
pose_data.add_data(CODE)
|
||||
continue
|
||||
|
||||
x, y, z = pcd.get_center()
|
||||
if get_grab_width:
|
||||
if np.asarray(pcd.points).shape[0] < 4:
|
||||
pose_data.add_data(Status.TOO_FEW_POINTS_OBB)
|
||||
continue
|
||||
|
||||
obb = pcd.get_oriented_bounding_box()
|
||||
extent = obb.extent
|
||||
order = np.argsort(-extent)
|
||||
grab_width = extent[order]
|
||||
|
||||
v = obb.R
|
||||
v = v[:, order]
|
||||
if v is None:
|
||||
pose_data.add_data(Status.PCA_NO_VECTOR)
|
||||
continue
|
||||
grab_width = grab_width * 1.05
|
||||
|
||||
else:
|
||||
w, v = pca(np.asarray(pcd.points))
|
||||
|
||||
if w is None or v is None:
|
||||
pose_data.add_data(Status.PCA_NO_VECTOR)
|
||||
continue
|
||||
|
||||
grab_width = (0.0, 0.0, 0.0)
|
||||
|
||||
vx, vy, vz = v[:, 0], v[:, 1], v[:, 2]
|
||||
|
||||
if vx[0] < 0:
|
||||
vx = -vx
|
||||
if vy[1] < 0:
|
||||
vy = -vy
|
||||
if not np.allclose(np.cross(vx, vy), vz):
|
||||
vz = -vz
|
||||
|
||||
R = np.column_stack((vx, vy, vz))
|
||||
pose_mat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
|
||||
pose_data.add_data(Status.SUCCESS, pose_mat, tuple(grab_width))
|
||||
n += 1
|
||||
|
||||
if n == 0:
|
||||
return pose_data, Status.CANNOT_ESTIMATE
|
||||
return pose_data, Status.SUCCESS
|
||||
@@ -0,0 +1,4 @@
|
||||
from .driver_source import *
|
||||
from .topic_source import *
|
||||
|
||||
# from .source_baseline import *
|
||||
@@ -0,0 +1,28 @@
|
||||
from rclpy.node import Node
|
||||
|
||||
from interfaces.msg import ImgMsg
|
||||
|
||||
from .source_baseline import SourceBaseline
|
||||
|
||||
|
||||
__all__ = ["DriverSource"]
|
||||
|
||||
class DriverSource(SourceBaseline):
|
||||
def __init__(self, config, node: Node):
|
||||
super().__init__()
|
||||
self.sub = node.create_subscription(
|
||||
ImgMsg,
|
||||
config["subscription_name"],
|
||||
self._subscription_callback,
|
||||
10
|
||||
)
|
||||
|
||||
def _subscription_callback(self, msg):
|
||||
with self.lock:
|
||||
self.images_buffer.save_data(
|
||||
position=msg.position,
|
||||
color=msg.image_color,
|
||||
depth=msg.image_depth,
|
||||
karr=msg.karr,
|
||||
darr=msg.darr
|
||||
)
|
||||
@@ -0,0 +1,120 @@
|
||||
import time
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
from ..enum import Status
|
||||
from ..struct import ImageDataContainer
|
||||
from ..utils import image
|
||||
|
||||
|
||||
__all__ = ['SourceBaseline']
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _BufferData:
|
||||
image_color: NDArray | None
|
||||
image_depth: NDArray | None
|
||||
karr: NDArray | list[float] | None
|
||||
darr: NDArray | list[float] | None
|
||||
|
||||
def is_empty(self):
|
||||
return (self.image_color is None or self.image_depth is None
|
||||
or self.karr is None or self.darr is None)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _ImageBuffer:
|
||||
data_dict: dict[str, _BufferData] = field(default_factory=dict)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
raise AttributeError
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.data_dict[key]
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.data_dict
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data_dict)
|
||||
|
||||
def save_data(
|
||||
self,
|
||||
position: str,
|
||||
*,
|
||||
color: NDArray | None,
|
||||
depth: NDArray | None,
|
||||
karr: list[float] | None,
|
||||
darr: list[float] | None,
|
||||
):
|
||||
self.data_dict[position] = _BufferData(color, depth, karr, darr)
|
||||
|
||||
|
||||
class SourceBaseline:
|
||||
def __init__(self):
|
||||
self.images_buffer = _ImageBuffer()
|
||||
self.cv_bridge = CvBridge()
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def get_images(self, positions: tuple[str, ...]) -> tuple[ImageDataContainer | None, int]:
|
||||
time_start = time.time()
|
||||
with self.lock:
|
||||
image_data = ImageDataContainer()
|
||||
if len(self.images_buffer) == 0:
|
||||
return None, Status.NO_CAMERA_DATA
|
||||
|
||||
buffer_data_list = []
|
||||
for position in positions:
|
||||
if not (position in self.images_buffer):
|
||||
# image_data.add_data(position, Status.NO_POSITION_DATA)
|
||||
buffer_data_list.append(Status.NO_POSITION_DATA)
|
||||
continue
|
||||
|
||||
if self.images_buffer[position].is_empty():
|
||||
# image_data.add_data(position, Status.NO_POSITION_DATA)
|
||||
buffer_data_list.append(Status.NO_POSITION_DATA)
|
||||
continue
|
||||
|
||||
buffer_data_list.append(self.images_buffer[position])
|
||||
|
||||
time_1 = time.time()
|
||||
valid_positions = 0
|
||||
for data in buffer_data_list:
|
||||
if data == Status.NO_POSITION_DATA:
|
||||
image_data.add_data(position, Status.NO_POSITION_DATA)
|
||||
continue
|
||||
|
||||
color_img_cv = self.cv_bridge.imgmsg_to_cv2(
|
||||
self.images_buffer[position].image_color, "bgr8")
|
||||
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(
|
||||
self.images_buffer[position].image_depth, '16UC1')
|
||||
|
||||
camera_size = color_img_cv.shape[:2][::-1]
|
||||
color_img_cv, depth_img_cv, k = image.distortion_correction(
|
||||
color_img_cv,
|
||||
depth_img_cv,
|
||||
self.images_buffer[position].karr,
|
||||
self.images_buffer[position].darr,
|
||||
camera_size
|
||||
)
|
||||
|
||||
image_data.add_data(
|
||||
position=position,
|
||||
status=Status.SUCCESS,
|
||||
color_image=color_img_cv,
|
||||
depth_image=depth_img_cv,
|
||||
karr=list(k),
|
||||
darr=tuple(self.images_buffer[position].darr)
|
||||
)
|
||||
valid_positions += 1
|
||||
|
||||
time_end = time.time()
|
||||
|
||||
print(f"get_data: {(time_1 - time_start) * 1000} ms")
|
||||
print(f"img_cv_process: {(time_end - time_1) * 1000} ms")
|
||||
if valid_positions == 0:
|
||||
return None, Status.NO_ALL_POSITION_DATA
|
||||
else:
|
||||
return image_data, Status.SUCCESS
|
||||
@@ -0,0 +1,57 @@
|
||||
import rclpy
|
||||
from cv_bridge import CvBridge
|
||||
from rclpy.task import Future
|
||||
from rclpy.node import Node
|
||||
from message_filters import Subscriber, ApproximateTimeSynchronizer
|
||||
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
|
||||
from .source_baseline import SourceBaseline
|
||||
|
||||
|
||||
__all__ = ["TopicSource"]
|
||||
|
||||
class TopicSource(SourceBaseline):
|
||||
def __init__(self, config, node: Node):
|
||||
super().__init__()
|
||||
self.position = config["position"]
|
||||
self.future = Future()
|
||||
self.camera_info = []
|
||||
|
||||
self.sub_camera_info = node.create_subscription(
|
||||
CameraInfo,
|
||||
config["camera_info_topic_name"],
|
||||
self._camera_info_callback,
|
||||
10
|
||||
)
|
||||
|
||||
node.get_logger().info("Waiting for camera info...")
|
||||
rclpy.spin_until_future_complete(node, self.future)
|
||||
node.destroy_subscription(self.sub_camera_info)
|
||||
node.get_logger().info("Camera info received.")
|
||||
|
||||
self.sub_color_image = Subscriber(self, Image, config["color_image_topic_name"])
|
||||
self.sub_depth_image = Subscriber(self, Image, config["depth_image_topic_name"])
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_color_image, self.sub_depth_image],
|
||||
queue_size=10,
|
||||
slop=0.1
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self._sync_sub_callback)
|
||||
|
||||
def _camera_info_callback(self, msg):
|
||||
self.camera_info = [msg.k, msg.d]
|
||||
if (self.camera_info[0] is not None and len(self.camera_info[0]) > 0 and
|
||||
self.camera_info[1] is not None and len(self.camera_info[1]) > 0):
|
||||
if not self.future.done():
|
||||
self.future.set_result(True)
|
||||
|
||||
def _sync_sub_callback(self, color, depth):
|
||||
with self.lock:
|
||||
self.images_buffer.save_data(
|
||||
position=self.position,
|
||||
color=color,
|
||||
depth=depth,
|
||||
karr=self.camera_info[0],
|
||||
darr=self.camera_info[1]
|
||||
)
|
||||
@@ -0,0 +1,6 @@
|
||||
from .config_manager import *
|
||||
from .source_manager import *
|
||||
from .detector_manager import *
|
||||
from .estimator_manager import *
|
||||
from .refiner_manager import *
|
||||
from .resource_manager import *
|
||||
@@ -0,0 +1,142 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
|
||||
from ..utils import io, format
|
||||
from ..map import CONFIG_MAP
|
||||
from ..enum import SourceType, DetectorType, EstimatorType, RefinerType, NodeType
|
||||
|
||||
|
||||
__all__ = ["ConfigManager"]
|
||||
|
||||
class ConfigManager:
|
||||
def __init__(self, logging_map_path, logger=None):
|
||||
self.logger = logger or logging
|
||||
|
||||
with open(logging_map_path, 'r') as f:
|
||||
self.logging_map = json.load(f)["warning"]
|
||||
|
||||
self.left = None
|
||||
self.right = None
|
||||
self.top = None
|
||||
|
||||
self.node_name = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
self.save_image = None
|
||||
self.image_save_dir = None
|
||||
|
||||
# Didn't need storage
|
||||
self.node_mode = None
|
||||
self.source_mode = None
|
||||
self.detector_mode = None
|
||||
self.estimator_mode = None
|
||||
self.refiner_mode = None
|
||||
|
||||
self.node_config = None
|
||||
self.source_config = None
|
||||
self.detector_config = None
|
||||
self.estimator_config = None
|
||||
self.refiner_config = None
|
||||
|
||||
self.calibration = None
|
||||
|
||||
def load_config(self, config_path):
|
||||
with open(config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
self.calibration = config["calibration"]
|
||||
|
||||
self.node_name = config["node_name"]
|
||||
self.output_boxes = config.get("output_boxes", False)
|
||||
self.output_masks = config.get("output_masks", False)
|
||||
|
||||
self.save_image = config.get("save_image", True)
|
||||
if self.save_image:
|
||||
self.image_save_dir = os.path.expanduser(config.get("image_save_dir", "~/images"))
|
||||
if not os.path.exists(self.image_save_dir):
|
||||
os.mkdir(self.image_save_dir)
|
||||
|
||||
node_mode = config["node_mode"]
|
||||
image_source = config["image_source"]
|
||||
detect_mode = config["detect_mode"]
|
||||
estimate_mode = config["estimate_mode"]
|
||||
refine_mode = config["refine_mode"]
|
||||
|
||||
try:
|
||||
self.node_mode = NodeType[node_mode]
|
||||
self.node_config = config.get(CONFIG_MAP["node"].get(self.node_mode))
|
||||
if self.node_config is None:
|
||||
raise KeyError
|
||||
except (KeyError, ValueError):
|
||||
self.node_mode = NodeType.SERVICE
|
||||
self.node_config = config["service_configs"]
|
||||
|
||||
try:
|
||||
self.source_mode = SourceType[image_source]
|
||||
self.source_config = config.get(CONFIG_MAP["source"].get(self.source_mode))
|
||||
if self.source_config is None:
|
||||
raise KeyError
|
||||
except (KeyError, ValueError):
|
||||
self.source_mode = SourceType.DRIVER
|
||||
self.source_config = config["driver_configs"]
|
||||
|
||||
try:
|
||||
self.detector_mode = DetectorType[detect_mode]
|
||||
self.detector_config = config.get(CONFIG_MAP["detector"].get(self.detector_mode))
|
||||
if self.detector_config is None:
|
||||
raise KeyError
|
||||
except (KeyError, ValueError):
|
||||
self.detector_mode = DetectorType.OBJECT
|
||||
self.detector_config = config["object_configs"]
|
||||
|
||||
try:
|
||||
self.estimator_mode = EstimatorType[estimate_mode]
|
||||
self.estimator_config = config.get(CONFIG_MAP["estimator"].get(self.estimator_mode))
|
||||
if self.estimator_config is None:
|
||||
raise KeyError
|
||||
except (KeyError, ValueError):
|
||||
self.estimator_mode = EstimatorType.PCA
|
||||
self.estimator_config = config["pca_configs"]
|
||||
|
||||
try:
|
||||
self.refiner_mode = RefinerType[refine_mode]
|
||||
if self.refiner_mode == RefinerType.NO:
|
||||
self.refiner_config = {}
|
||||
elif (self.refiner_mode == RefinerType.FIXED
|
||||
and self.detector_mode == DetectorType.OBJECT):
|
||||
self.refiner_config = {
|
||||
"depth_scale": self.estimator_config.get("depth_scale", 1000.0),
|
||||
"depth_trunc": self.estimator_config.get("depth_trunc", 3.0),
|
||||
"voxel_size": self.estimator_config.get("voxel_size", 0.002)
|
||||
}
|
||||
else: raise KeyError
|
||||
except KeyError or ValueError:
|
||||
self.refiner_mode = RefinerType.NO
|
||||
self.refiner_config = {}
|
||||
|
||||
# self.logger.info(f"node: {self.node_mode}")
|
||||
# self.logger.info(f"source: {self.source_mode}")
|
||||
# self.logger.info(f"detector: {self.detector_mode}")
|
||||
# self.logger.info(f"estimator: {self.estimator_mode}")
|
||||
# self.logger.info(f"refiner: {self.refiner_mode}")
|
||||
|
||||
def load_calibration(self, shared_dir):
|
||||
eye_in_left_hand_path = os.path.join(shared_dir, self.calibration["left_hand"])
|
||||
eye_in_right_hand_path = os.path.join(shared_dir, self.calibration["right_hand"])
|
||||
eye_to_hand_path = os.path.join(shared_dir, self.calibration["head"])
|
||||
|
||||
self.left, CODE = io.load_calibration_mat(eye_in_left_hand_path)
|
||||
self.logger.info(f"\nleft_hand_mat: \n{format.np_mat2str(self.left)}")
|
||||
if CODE != 0:
|
||||
self.logger.warning(f"left_hand: {self.logging_map[f'{CODE:04d}']}")
|
||||
|
||||
self.right, CODE = io.load_calibration_mat(eye_in_right_hand_path)
|
||||
self.logger.info(f"\nright_hand_mat: \n{format.np_mat2str(self.right)}")
|
||||
if CODE != 0:
|
||||
self.logger.warning(f"right_hand: {self.logging_map[f'{CODE:04d}']}")
|
||||
|
||||
self.top, CODE = io.load_calibration_mat(eye_to_hand_path)
|
||||
self.logger.info(f"\ntop_mat: \n{format.np_mat2str(self.top)}")
|
||||
if CODE != 0:
|
||||
self.logger.warning(f"top: {self.logging_map[f'{CODE:04d}']}")
|
||||
@@ -0,0 +1,21 @@
|
||||
import logging
|
||||
|
||||
from ..map import DETECTOR_MAP
|
||||
from ..struct import ImageDataContainer, SegmentationData
|
||||
|
||||
|
||||
__all__ = ["DetectorManager"]
|
||||
|
||||
class DetectorManager:
|
||||
def __init__(self, mode, config, logger=None):
|
||||
_logger = logger or logging
|
||||
self.detector = DETECTOR_MAP.get(mode)(config, _logger)
|
||||
|
||||
def get_masks(
|
||||
self,
|
||||
position,
|
||||
classes_name,
|
||||
image_data: ImageDataContainer
|
||||
) -> tuple[SegmentationData | None, int]:
|
||||
return self.detector.get_masks(
|
||||
position=position, classes_name=classes_name, image_data=image_data)
|
||||
@@ -0,0 +1,23 @@
|
||||
from ..map import ESTIMATOR_MAP
|
||||
from ..struct import ImageDataContainer, SegmentationData
|
||||
|
||||
|
||||
__all__ = ['EstimatorManager']
|
||||
|
||||
class EstimatorManager:
|
||||
def __init__(self, mode, config):
|
||||
self.estimator = ESTIMATOR_MAP[mode](config)
|
||||
|
||||
def get_poses(
|
||||
self,
|
||||
position: str,
|
||||
image_data:ImageDataContainer,
|
||||
detect_data: SegmentationData,
|
||||
get_grab_width: bool = True,
|
||||
):
|
||||
return self.estimator.get_poses(
|
||||
position=position,
|
||||
image_data=image_data,
|
||||
detect_data=detect_data,
|
||||
get_grab_width=get_grab_width
|
||||
)
|
||||
@@ -0,0 +1,27 @@
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from ..map import REFINER_MAP
|
||||
from ..struct import ImageDataContainer, PoseData
|
||||
|
||||
|
||||
__all__ = ["RefinerManager"]
|
||||
|
||||
class RefinerManager:
|
||||
def __init__(self, mode, config):
|
||||
self.refiner = REFINER_MAP[mode](config)
|
||||
|
||||
def get_refine(
|
||||
self,
|
||||
position: str,
|
||||
image_data_container: ImageDataContainer,
|
||||
pose_data: PoseData,
|
||||
calibration_mat_dict: dict[str, NDArray],
|
||||
**kwargs
|
||||
):
|
||||
return self.refiner.get_refine(
|
||||
position=position,
|
||||
image_data_container=image_data_container,
|
||||
pose_data=pose_data,
|
||||
calibration_mat_dict=calibration_mat_dict,
|
||||
**kwargs
|
||||
)
|
||||
@@ -0,0 +1,22 @@
|
||||
from .config_manager import ConfigManager
|
||||
|
||||
|
||||
__all__ = ["ResourceManager"]
|
||||
|
||||
class ResourceManager:
|
||||
def __init__(self, config_manager: ConfigManager):
|
||||
self.logging_map = config_manager.logging_map
|
||||
self.node_name = config_manager.node_name
|
||||
|
||||
self.calibration_matrix = {
|
||||
"left": config_manager.left,
|
||||
"right": config_manager.right,
|
||||
"top": config_manager.top
|
||||
}
|
||||
|
||||
self.save_image = config_manager.save_image
|
||||
if self.save_image:
|
||||
self.image_save_dir = config_manager.image_save_dir
|
||||
|
||||
self.output_boxes = config_manager.output_boxes
|
||||
self.output_masks = config_manager.output_masks
|
||||
@@ -0,0 +1,17 @@
|
||||
from typing import Tuple
|
||||
|
||||
from rclpy.node import Node
|
||||
|
||||
from ..map import SOURCE_MAP
|
||||
from ..struct import ImageDataContainer
|
||||
|
||||
|
||||
class SourceManager:
|
||||
"""
|
||||
register source
|
||||
"""
|
||||
def __init__(self, mode, config, node: Node):
|
||||
self.source = SOURCE_MAP.get(mode)(config, node)
|
||||
|
||||
def get_images(self, positions) -> Tuple[ImageDataContainer | None, int]:
|
||||
return self.source.get_images(positions=positions)
|
||||
@@ -0,0 +1 @@
|
||||
from .manager_map import *
|
||||
@@ -0,0 +1,58 @@
|
||||
from .. import image_sources, detectors, estimators, refiners
|
||||
from ..enum import NodeType, SourceType, DetectorType, EstimatorType, RefinerType
|
||||
|
||||
|
||||
__all__ = [
    "SOURCE_MAP", "DETECTOR_MAP",
    "ESTIMATOR_MAP", "REFINER_MAP",
    "CONFIG_MAP"
]


# DetectorType -> concrete detector implementation.
DETECTOR_MAP = {
    DetectorType.OBJECT: detectors.ObjectDetector,
    DetectorType.COLOR: detectors.ColorDetector,
    DetectorType.CROSSBOARD: detectors.CrossboardDetector
}


# SourceType -> image source implementation.
SOURCE_MAP = {
    SourceType.DRIVER: image_sources.DriverSource,
    SourceType.DIRECT: image_sources.TopicSource
}


# EstimatorType -> pose estimator implementation.
# NOTE(review): only PCA is currently enabled; ICP and E2E are stubbed out.
ESTIMATOR_MAP = {
    EstimatorType.PCA: estimators.PCAEstimator,
    # EstimatorType.ICP: estimators.ICPEstimator,
    # EstimatorType.E2E: estimators.E2EEstimator,
}


# RefinerType -> pose refiner implementation.
REFINER_MAP = {
    RefinerType.NO: refiners.NoRefiner,
    RefinerType.FIXED: refiners.FixedOrientationRefiner
}

# Component kind -> (type -> config-section name).
# The "estimator" table is keyed by (EstimatorType, DetectorType | None)
# pairs; "refiner" currently has no per-type config sections.
CONFIG_MAP = {
    "node": {
        NodeType.PUBLISHER: "publisher_configs",
        NodeType.SERVICE: "service_configs",
        NodeType.ACTION: "action_configs",
    },
    "source": {
        SourceType.DRIVER: "driver_configs",
        SourceType.DIRECT: "direct_configs"
    },
    "detector": {
        DetectorType.OBJECT: "object_configs",
        DetectorType.COLOR: "color_configs",
        DetectorType.CROSSBOARD: "crossboard_configs"
    },
    "estimator": {
        (EstimatorType.PCA, None): "pca_configs",
        (EstimatorType.ICP, DetectorType.OBJECT): "icp_configs",
        (EstimatorType.E2E, DetectorType.OBJECT): "e2e_configs"
    },
    "refiner": {}
}
|
||||
@@ -0,0 +1,2 @@
|
||||
from .fixed_orientation_refiner import *
|
||||
from .no_refiner import *
|
||||
@@ -0,0 +1,426 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
|
||||
from ..enum import Status
|
||||
from ..utils import pointcloud, algorithm
|
||||
from ..struct import ImageData, PoseData
|
||||
from .refiner_baseline import RefinerBaseline
|
||||
|
||||
|
||||
__all__ = ['FixedOrientationRefiner']
|
||||
|
||||
def collision_detector(
    points: np.ndarray,
    refine: np.ndarray,
    volume: list[float],
    iou: float = 0.001,
    search_mode: bool = False,
    **kwargs
) -> int:
    """
    Collision check of a simplified box-model gripper against the scene cloud.

    The cloud is assumed to already be expressed in the grasp frame
    (x across the palm, y along the closing direction, z along the
    approach — inferred from the box tests below; TODO confirm axes).
    -----
    input:
        points: np.ndarray (3, n)
        refine: np.ndarray (3, ), candidate grasp position in the grasp frame
        volume: list [left, right], expected voxel counts of each finger
                box, used to normalise the finger-overlap ratio
        iou : float, overlap ratio above which a finger counts as colliding
        search_mode: bool, Default False. When True the palm boxes are
                     skipped and the finger threshold is halved.
        **kwargs:
            "grab_width": float
            "hand_size": list [width, height, length]
            "left_size": list [thick, width, length]
            "right_size": list [thick, width, length]
            thick of gripper finger, width of gripper finger, length of gripper finger
    output:
        collision_code: int
            0: no collision
            1: palm center (or both palm ends) collides
            2: only palm bottom end collides
            3: only palm top end collides
            4: only left finger collides
            5: only right finger collides
            6: both fingers collide
    """
    hand_size = kwargs.get('hand_size', [0.113, 0.063, 0.13])
    left_size = kwargs.get('left_size', [0.006, 0.037, 0.086])
    right_size = kwargs.get('right_size', [0.006, 0.037, 0.086])
    # Shrink the opening by 5% so grazing contacts do not count as collisions.
    grab_width = kwargs.get('grab_width', 0.10) * 0.95
    x, y, z = refine

    if not search_mode:
        # The palm occupies z in [z - length, z]; it is split into three
        # slabs along x so the caller can tell which end is hitting.
        hand_top_box = (
            (points[2] < z) & (points[2] > (z - hand_size[2]))
            & (points[0] < (x - hand_size[1]*1/4)) & (points[0] > (x - hand_size[1]/2))
            & (points[1] < (y + hand_size[0]/2)) & (points[1] > (y - hand_size[0]/2))
        )
        hand_center_box = (
            (points[2] < z) & (points[2] > (z - hand_size[2]))
            & (points[0] < (x + hand_size[1]*1/4)) & (points[0] > (x - hand_size[1]*1/4))
            & (points[1] < (y + hand_size[0]/2)) & (points[1] > (y - hand_size[0]/2))
        )
        hand_bottom_box = (
            (points[2] < z) & (points[2] > (z - hand_size[2]))
            & (points[0] < (x + hand_size[1]/2)) & (points[0] > (x + hand_size[1]*1/4))
            & (points[1] < (y + hand_size[0]/2)) & (points[1] > (y - hand_size[0]/2))
        )

        # A center hit, or hits at both ends, means the palm is blocked.
        if (hand_top_box.any() and hand_bottom_box.any()) or hand_center_box.any():
            return 1
        if hand_bottom_box.any():
            return 2
        if hand_top_box.any():
            return 3
    else:
        # In search mode, be stricter about finger contact.
        iou *= 0.5

    # Finger boxes flank the opening along y; the 0.05 margin below z lets
    # the fingers reach slightly past the grasp point along the approach.
    left_finger_box = (
        (points[2] < (z + left_size[2])) & (points[2] > z -0.05)
        & (points[0] < (x + left_size[1]/2)) & (points[0] > (x - left_size[1]/2))
        & (points[1] < (y + grab_width/2 + left_size[0])) & (points[1] > (y + grab_width/2))
    )
    right_finger_box = (
        (points[2] < (z + right_size[2])) & (points[2] > z - 0.05)
        & (points[0] < (x + right_size[1]/2)) & (points[0] > (x - right_size[1]/2))
        & (points[1] < (y - grab_width/2)) & (points[1] > (y-(grab_width/2 + right_size[0])))
    )

    # Normalise the hit counts by each finger's expected voxel volume so the
    # threshold is independent of point density.
    left_iou = left_finger_box.sum() / (volume[0]+1e-6)
    right_iou = right_finger_box.sum() / (volume[1]+1e-6)

    if left_iou > iou and right_iou > iou:
        return 6

    if left_iou > iou :
        return 4
    if right_iou > iou :
        return 5

    return 0
|
||||
|
||||
|
||||
def refine_grasp_pose(
    points: np.ndarray,
    voxel_size: float,
    target_position: np.ndarray,
    expect_orientation: np.ndarray | None = None,
    search_mode: bool = False,
    **kwargs
):
    """
    Iteratively shift a grasp position until the gripper model is
    collision-free, keeping the requested orientation fixed.
    -----
    input:
        points: np.ndarray (n, 3), already complete voxel downsample
        voxel_size: float
        target_position: np.ndarray (3, ), grab target position
        expect_orientation: np.ndarray (3, 3), expect grab target orientation
        search_mode: bool, Default False. When True, after a collision-free
            position is found the free span along y is probed in both
            directions and the grasp is centred inside it.
        **kwargs:
            "grab_width": float
            "hand_size": list [width, height, length]
            "left_size": list [thick, width, length]
            "right_size": list [thick, width, length]
            thick of gripper finger, width of gripper finger, length of gripper finger
    output:
        (rmat, status): 4x4 pose matrix and Status.SUCCESS on success, or
        (None, Status.REFINE_FAIL) when no collision-free pose was found.

    NOTE(review): several spots look buggy and should be confirmed:
      - `frist_sign` (sic) is never set to True outside the `if frist_sign:`
        branch, so the one-off z retreat for codes 2/3 can never run;
        `if not frist_sign:` was presumably intended.
      - `step` is re-initialised to 0.004 on every loop iteration, so the
        `step /= 2` bisection for codes 4/5 never accumulates, and the inner
        `if not y_p and y_n:` / `if y_p and not y_n:` guards are unreachable
        right after y_p/y_n are assigned above them.
      - the printed directions ("z + 0.008") do not match the signs applied.
    """
    if expect_orientation is None:
        # Default: z stays up, x/y swapped — a fixed top-down style grasp.
        expect_orientation = np.asarray([[0, -1, 0], [1, 0, 0], [0, 0, 1]])

    refine = np.zeros(3)
    grab_width = kwargs.get('grab_width', 0.10)
    hand_size = kwargs.get('hand_size', [0.113, 0.063, 0.13])
    left_size = kwargs.get('left_size', [0.006, 0.037, 0.086])
    right_size = kwargs.get('right_size', [0.006, 0.037, 0.086])

    # Expected voxel counts of each finger volume, used by the detector to
    # normalise overlap ratios.
    left_volume = left_size[0] * left_size[1] * left_size[2] / (voxel_size**3)
    right_volume = right_size[0] * right_size[1] * right_size[2] / (voxel_size**3)

    # Express the cloud in the grasp frame centred on the target.
    points = points - target_position[None, :]  # (n, 3) - (1, 3) = (n, 3)
    points = expect_orientation.T @ points.T  # (3, 3) @ (3, n) = (3, n)

    # Keep only points near the gripper workspace to speed up the search.
    points = points[:,
        (points[2] < left_size[2]) & (points[2] > -(0.1 + hand_size[2]))
        & (points[0] < (0.05 + hand_size[1]/2)) & (points[0] > -(0.05 + hand_size[1]/2))
        & (points[1] < (0.05 + grab_width/2 + left_size[0]))
        & (points[1] > -(0.05 + grab_width/2 + right_size[0]))
    ]

    frist_sign = False
    y_n = y_p = False
    left_last = right_last = False
    hand_num = left_num = right_num = 0
    # Bounded local search: nudge the grasp along x/y/z depending on which
    # part of the gripper collides, until collision-free or out of budget.
    while hand_num < 20 and left_num < 10 and right_num < 10:
        collision_code = collision_detector(
            points, refine, volume=[left_volume, right_volume], **kwargs
        )

        if collision_code == 0:
            # If the previous step was a one-sided finger escape, take one
            # extra step in the same direction and remember which side.
            if y_p and not y_n:
                refine[1] += 0.004
                y_p = y_n = False
                left_last = True
                right_last = False
                continue
            if not y_p and y_n:
                refine[1] -= 0.004
                y_p = y_n = False
                left_last = False
                right_last = True
                continue
            if search_mode:
                break
            # Convert the refined offset back to the camera frame pose.
            position = target_position + (expect_orientation @ refine.T).T
            rmat = tfs.affines.compose(
                np.squeeze(np.asarray(position)), expect_orientation, [1, 1, 1]
            )
            return rmat, Status.SUCCESS
        if collision_code == 6:
            # Both fingers blocked: no sideways escape exists.
            return None, Status.REFINE_FAIL

        if collision_code == 1:
            # Palm blocked: back off along the approach axis.
            refine[2] -= 0.008
            hand_num += 1
            print("z + 0.008")
            continue

        if collision_code == 2:
            # Palm bottom end blocked: retreat once in z, then slide in -x.
            # NOTE(review): condition likely inverted, see function docstring.
            if frist_sign:
                print("z + 0.004")
                refine[2] -= 0.004
                frist_sign = True
                continue
            refine[0] -= 0.002
            hand_num += 1
            print("x - 0.002")
            continue

        if collision_code == 3:
            # Palm top end blocked: mirror of code 2, slide in +x.
            if frist_sign:
                print("z + 0.004")
                refine[2] -= 0.004
                frist_sign = True
                continue
            refine[0] += 0.002
            hand_num += 1
            print("x + 0.002")
            continue

        # NOTE(review): resetting step here defeats the bisection below.
        step = 0.004
        if collision_code == 4:
            # Left finger blocked: move towards +y.
            y_p = True
            y_n = False
            if not y_p and y_n:
                refine[1] -= step
                step /= 2
                if step <= 0.001:
                    return None, Status.REFINE_FAIL
            refine[1] += step
            left_num += 1
            print("y + 0.004")
            continue

        if collision_code == 5:
            # Right finger blocked: move towards -y.
            y_p = False
            y_n = True
            if y_p and not y_n:
                refine[1] += step
                step /= 2
                if step <= 0.001:
                    return None, Status.REFINE_FAIL
            refine[1] -= step
            right_num += 1
            print("y - 0.004")
            continue
        else:
            # Unknown collision code.
            return None, Status.REFINE_FAIL

    if search_mode:
        # Probe the free span along y around the collision-free position and
        # centre the grasp inside it.
        right_num = left_num = 0
        # already in left side
        if left_last and not right_last:
            y_min = refine[1] - 0.004
            while left_num < 10:
                left_num += 1
                refine[1] += 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    # Centre between the last free y and the known lower bound.
                    refine[1] = (refine[1] - 0.004 + y_min) / 2
                    break
            else:
                # No obstacle found within the probe range.
                refine[1] = y_min + 0.02
            print(f"left_num = {left_num}")

        elif not left_last and right_last:
            # already in right side
            y_max = refine[1] + 0.004
            while right_num < 10:
                right_num += 1
                refine[1] -= 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    refine[1] = (refine[1] + 0.004 + y_max) / 2
                    break
            else:
                refine[1] = y_max - 0.02
            print(f"right_num = {right_num}")

        elif not left_last and not right_last:
            # in middle: probe upwards first to find y_max, then downwards.
            y_cur = refine[1]
            while left_num < 10:
                left_num += 1
                refine[1] += 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    y_max = refine[1] - 0.004
                    break
            else:
                y_max = y_cur + 0.040
            print(f"left_num = {left_num}")

            refine[1] = y_cur
            while right_num < 10:
                right_num += 1
                refine[1] -= 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    refine[1] = (refine[1] + 0.004 + y_max) / 2
                    break
            else:
                refine[1] = (y_cur - 0.040 + y_max) / 2
            print(f"right_num = {right_num}")

        position = target_position + (expect_orientation @ refine.T).T
        rmat = tfs.affines.compose(
            np.squeeze(np.asarray(position)),
            expect_orientation,
            [1, 1, 1]
        )
        return rmat, Status.SUCCESS
    return None, Status.REFINE_FAIL
|
||||
|
||||
|
||||
class FixedOrientationRefiner(RefinerBaseline):
    """Refiner that keeps each pose's orientation fixed and searches for a
    nearby collision-free position against the full depth point cloud."""

    def __init__(self, config):
        super().__init__()
        # Expected to behave like a mapping: unpacked with ** below and
        # queried with .get("voxel_size") — TODO confirm it is a dict.
        self.config = config

    def _refine(self, image_data: ImageData, pose_data: PoseData, calibration_mat: np.ndarray,
                **kwargs) -> tuple[PoseData, int]:
        """Refine every successful pose in *pose_data* and transform it into
        the robot frame via *calibration_mat*.

        Returns (pose_data, Status.SUCCESS), or
        (pose_data, Status.ALL_POSE_REFINE_FAILED) when no pose survived.
        """
        # depth shape is (H, W); the helper expects (W, H).
        image_size = image_data.depth_image.shape[:2][::-1]
        # Scene cloud built from the whole depth image.
        # NOTE(review): refine_grasp_pose indexes this as an (n, 3) ndarray —
        # confirm create_o3d_pcd returns raw points, not an o3d PointCloud.
        full_points = pointcloud.create_o3d_pcd(
            image_data.depth_image, image_size, image_data.karr, **self.config
        )

        n = 0  # number of poses successfully refined
        for i, (status, pose_mat, grasp_width) in enumerate(pose_data):
            if status != 0:
                # Skip candidates that already failed upstream.
                continue

            pose_mat, CODE = refine_grasp_pose(
                full_points, self.config.get("voxel_size"), pose_mat[0:3, 3], search_mode=True
            )
            # NOTE(review): CODE is a Status value compared against the int 0;
            # assumes Status is an IntEnum with SUCCESS == 0 — confirm.
            if CODE != 0:
                pose_data.set_data(i, CODE)
                continue

            # Camera frame -> robot frame, then pack as position+quaternion.
            pose_mat = calibration_mat @ pose_mat
            quat = algorithm.rmat2quat(pose_mat)
            pose_data.set_data(i, Status.SUCCESS, quat, grasp_width)
            n += 1

        if n == 0:
            return pose_data, Status.ALL_POSE_REFINE_FAILED

        return pose_data, Status.SUCCESS
|
||||
@@ -0,0 +1,29 @@
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..enum import Status
|
||||
from .refiner_baseline import RefinerBaseline
|
||||
from ..struct import ImageDataContainer, PoseData
|
||||
from ..utils import algorithm
|
||||
|
||||
__all__ = ["NoRefiner"]
|
||||
|
||||
SUCCESS = 0
|
||||
|
||||
class NoRefiner(RefinerBaseline):
|
||||
def __init__(self, config):
|
||||
super().__init__()
|
||||
|
||||
def _refine(self, image_data: Optional[ImageDataContainer], pose_data: Optional[PoseData],
|
||||
calibration_mat: Optional[np.ndarray], **kwargs) -> Tuple[Optional[PoseData], int]:
|
||||
for i, (status, pose_mat, grasp_width) in enumerate(pose_data):
|
||||
if status != 0:
|
||||
continue
|
||||
|
||||
pose_mat = calibration_mat @ pose_mat
|
||||
quat = algorithm.rmat2quat(pose_mat)
|
||||
pose_data.set_data(i, Status.SUCCESS, quat, grasp_width)
|
||||
|
||||
return pose_data, Status.SUCCESS
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
from numpy.typing import NDArray
|
||||
|
||||
from ..struct import ImageData, ImageDataContainer, PoseData
|
||||
|
||||
|
||||
__all__ = ['RefinerBaseline']


class RefinerBaseline:
    """Base class for pose refiners; subclasses override _refine."""

    def __init__(self):
        pass

    def _refine(
        self,
        image_data: ImageData,
        pose_data: PoseData,
        calibration_mat: NDArray,
        **kwargs
    ) -> tuple[PoseData | None, int]:
        """Refine poses for a single camera view. No-op in the base class."""
        pass

    def get_refine(
        self,
        position: str,
        image_data_container: ImageDataContainer,
        pose_data: PoseData,
        calibration_mat_dict: dict[str, NDArray],
        **kwargs
    ) -> tuple[PoseData | None, int]:
        """Pick the view and calibration for *position*, then run _refine."""
        return self._refine(
            image_data=image_data_container[position],
            pose_data=pose_data,
            calibration_mat=calibration_mat_dict[position],
            **kwargs
        )
|
||||
@@ -0,0 +1,3 @@
|
||||
from .image_data import *
|
||||
from .detect_data import *
|
||||
from .pose_data import *
|
||||
@@ -0,0 +1,88 @@
|
||||
from dataclasses import dataclass, field
|
||||
from numpy.typing import NDArray
|
||||
|
||||
import numpy as np
|
||||
|
||||
__all__ = ["SegmentationData"]
|
||||
|
||||
SUCCESS = 0
|
||||
|
||||
@dataclass(slots=True)
|
||||
class SegmentationData:
|
||||
status: int
|
||||
results: list | None = None
|
||||
|
||||
masks: list[NDArray] | None = field(init=False, default=None)
|
||||
boxes: list[NDArray] | None = field(init=False, default=None)
|
||||
class_ids: list[int] | None = field(init=False, default=None)
|
||||
confidences: list[float] | None = field(init=False, default=None)
|
||||
labels_map: list[str] | None = field(init=False, default=None)
|
||||
|
||||
def __iter__(self):
|
||||
"""遍历获得id和标签名"""
|
||||
if (self.masks is None or self.boxes is None or self.confidences is None
|
||||
or self.class_ids is None or self.labels_map is None or len(self.class_ids) == 0):
|
||||
return iter(())
|
||||
|
||||
return (
|
||||
(mask, box, conf, cid, self.labels_map[int(cid)])
|
||||
for mask, box, cid, conf in zip(self.masks, self.boxes,
|
||||
self.class_ids, self.confidences)
|
||||
)
|
||||
|
||||
def __getitem__(self, index: int) -> tuple:
|
||||
return (
|
||||
self.masks[index], self.boxes[index],
|
||||
self.confidences[index], self.class_ids[index],
|
||||
self.labels_map[int(self.class_ids[index])]
|
||||
)
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
raise AttributeError
|
||||
|
||||
def __len__(self) -> int:
|
||||
return 0 if self.masks is None else len(self.masks)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.status == SUCCESS and self.results is not None:
|
||||
if len(self.results) != 1:
|
||||
raise ValueError("results must only contain exactly one element")
|
||||
self._analysis()
|
||||
|
||||
def _analysis(self):
|
||||
result = self.results[0]
|
||||
masks = result.masks.data.cpu().numpy()
|
||||
|
||||
# Get boxes
|
||||
boxes = result.boxes.xywh.cpu().numpy()
|
||||
class_ids = result.boxes.cls.cpu().numpy().astype(np.int32)
|
||||
confidences = result.boxes.conf.cpu().numpy()
|
||||
self.labels_map = result.names
|
||||
|
||||
# Sort
|
||||
x_centers, y_centers = boxes[:, 0], boxes[:, 1]
|
||||
sorted_index = np.lexsort((-y_centers, x_centers))
|
||||
self.masks = masks[sorted_index]
|
||||
self.boxes = boxes[sorted_index]
|
||||
self.class_ids = class_ids[sorted_index].tolist()
|
||||
self.confidences = confidences[sorted_index].tolist()
|
||||
|
||||
@classmethod
|
||||
def create_mask_only_data(cls, masks: list[NDArray]):
|
||||
obj = cls(status=SUCCESS, results=None)
|
||||
obj.masks = masks
|
||||
if obj.masks is None or len(obj.masks) == 0:
|
||||
raise ValueError
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
def create_data(cls, results):
|
||||
obj = cls(status=SUCCESS, results=results)
|
||||
if obj.masks is None or len(obj.masks) == 0:
|
||||
raise ValueError
|
||||
return obj
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class SegmentationDataContainer:
|
||||
pass
|
||||
@@ -0,0 +1,57 @@
|
||||
from dataclasses import dataclass, field
|
||||
from numpy.typing import NDArray
|
||||
|
||||
|
||||
__all__ = ["ImageDataContainer", "ImageData"]
|
||||
|
||||
SUCCESS = 0
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ImageData:
|
||||
status: int
|
||||
color_image: NDArray | None = None
|
||||
depth_image: NDArray | None = None
|
||||
karr: tuple[float, ...] | NDArray | None = None
|
||||
darr: tuple[float, ...] | NDArray | None = None
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ImageDataContainer:
|
||||
_data_dict: dict[str, ImageData] = field(default_factory=dict)
|
||||
|
||||
def __getitem__(self, position: str) -> ImageData:
|
||||
item = self._data_dict.get(position)
|
||||
if item is None:
|
||||
raise KeyError(f"Position '{position}' not found in ImageDataContainer")
|
||||
return item
|
||||
|
||||
def __setitem__(self, position: str, value):
|
||||
raise AttributeError
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data_dict)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._data_dict.items())
|
||||
|
||||
def add_data(
|
||||
self,
|
||||
position: str,
|
||||
status: int,
|
||||
color_image: NDArray | None = None,
|
||||
depth_image: NDArray | None = None,
|
||||
karr: list[float] | NDArray | None = None,
|
||||
darr: tuple[float, ...] | NDArray | None = None
|
||||
):
|
||||
if status == SUCCESS:
|
||||
self._data_dict[position] = (ImageData(status, color_image, depth_image, karr, darr))
|
||||
else:
|
||||
self._data_dict[position] = (ImageData(status, None, None, None, None))
|
||||
|
||||
def check_data_status(self, logger, logging_map):
|
||||
for position, data in self._data_dict.items():
|
||||
if data.status == 0:
|
||||
continue
|
||||
logger.warning(
|
||||
f"{position}-Image: {logging_map.get(f'{data.status:04d}', f'unknown code: {data.status:04d}')}"
|
||||
)
|
||||
@@ -0,0 +1,68 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterator
|
||||
from numpy.typing import NDArray
|
||||
|
||||
|
||||
__all__ = ["PoseData"]
|
||||
|
||||
SUCCESS = 0
|
||||
|
||||
PoseDataItemType = tuple[int, tuple[float, ...] | NDArray | None, tuple[float, ...] | None]
|
||||
|
||||
@dataclass(slots=True)
|
||||
class _PoseDataItem:
|
||||
status: int
|
||||
pose: tuple[float, ...] | NDArray | None
|
||||
grasp_width: tuple[float, ...] | None
|
||||
|
||||
def as_tuple(self) -> PoseDataItemType:
|
||||
return self.status, self.pose, self.grasp_width
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PoseData:
|
||||
# _data_list: [[status, pose, grasp_width]]
|
||||
_data_list: list[_PoseDataItem] = field(default_factory=list)
|
||||
|
||||
def __iter__(self) -> Iterator[PoseDataItemType]:
|
||||
for item in self._data_list:
|
||||
yield item.status, item.pose, item.grasp_width
|
||||
|
||||
def __getitem__(self, index: int) -> PoseDataItemType:
|
||||
return self._data_list[index].as_tuple()
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data_list)
|
||||
|
||||
def add_data(
|
||||
self,
|
||||
status: int,
|
||||
pose: tuple[float, ...] | NDArray | None = None,
|
||||
grasp_width: tuple[float, ...] | None = None
|
||||
):
|
||||
|
||||
if status == SUCCESS:
|
||||
if pose is None or grasp_width is None:
|
||||
raise ValueError("pose and grasp_width cannot be None when status is SUCCESS")
|
||||
self._data_list.append(_PoseDataItem(status, pose, grasp_width))
|
||||
else:
|
||||
self._data_list.append(_PoseDataItem(status, None, None))
|
||||
|
||||
def set_data(
|
||||
self,
|
||||
index: int,
|
||||
status: int,
|
||||
pose: tuple[float, ...] | NDArray | None = None,
|
||||
grasp_width: tuple[float, ...] | None = None
|
||||
):
|
||||
if status == SUCCESS:
|
||||
if pose is None or grasp_width is None:
|
||||
raise ValueError("pose and grasp_width cannot be None when status is SUCCESS")
|
||||
self._data_list[index] = _PoseDataItem(status, pose, grasp_width)
|
||||
else:
|
||||
self._data_list[index] = _PoseDataItem(status, None, None)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PoseDataContainer:
|
||||
pass
|
||||
@@ -0,0 +1,3 @@
|
||||
from . import pointcloud, image, io, format
|
||||
|
||||
__all__ = ['pointcloud', 'image', "io", "format"]
|
||||
@@ -0,0 +1 @@
|
||||
from .transforms import *
|
||||
@@ -0,0 +1,20 @@
|
||||
import numpy as np
|
||||
import transforms3d as tfs
|
||||
|
||||
|
||||
__all__ = ['rmat2quat', 'quat2rmat']
|
||||
|
||||
def rmat2quat(rmat):
    """Convert a 4x4 homogeneous matrix to [x, y, z, qw, qx, qy, qz].

    Despite the name, the translation is included: the result is a
    7-element pose — position first, then a w-first quaternion.
    """
    translation = rmat[0:3, 3:4].flatten()
    rw, rx, ry, rz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return [translation[0], translation[1], translation[2], rw, rx, ry, rz]
|
||||
|
||||
|
||||
def quat2rmat(quat):
    """Convert a 7-element pose [x, y, z, qw, qx, qy, qz] (w-first
    quaternion) back to a 4x4 homogeneous matrix."""
    x, y, z, rw, rx, ry, rz = quat
    rotation = tfs.quaternions.quat2mat([rw, rx, ry, rz])
    return tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), rotation, [1, 1, 1])
|
||||
@@ -0,0 +1 @@
|
||||
from .np import *
|
||||
@@ -0,0 +1,17 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
__all__ = ["np_mat2str"]
|
||||
|
||||
|
||||
def np_mat2str(mat, indent=4):
    """Format a numpy matrix as an indented multi-line string.

    input:
        mat: np.ndarray to format
        indent: int, number of spaces prepended to every line
    output:
        str - the array2string rendering with each line indented
    """
    rendered = np.array2string(
        mat,
        precision=6,
        separator=', ',
        suppress_small=True,
        max_line_width=1000
    )
    pad = " " * indent
    # Indent every line (including the first); the redundant f-string
    # wrapper around the result has been removed.
    return "\n".join(pad + line for line in rendered.splitlines())
|
||||
@@ -0,0 +1,2 @@
|
||||
from .crop import *
|
||||
from .refine_map import *
|
||||
107
vision_detect/vision_detect/vision_core/core/utils/image/crop.py
Normal file
107
vision_detect/vision_detect/vision_core/core/utils/image/crop.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import numpy as np
|
||||
import logging
|
||||
|
||||
|
||||
__all__ = [
|
||||
"crop_imgs_xywh",
|
||||
"crop_imgs_xyxy",
|
||||
"crop_imgs_mask",
|
||||
]
|
||||
|
||||
def crop_imgs_xywh(imgs: list, box, same_sign: bool = False):
    """
    Crop every image in *imgs* to a center-format bounding box.
    -----------------------------
    input:
        imgs: list, Each img in imgs has the same Width and High.
        box: YOLO-style bounding box [x, y, w, h, confidence, class_id],
             where (x, y) is the box center.
        same_sign: bool, Set True to skip the size check when all imgs are
                   known to share the same shape.
    output:
        crop_imgs: list of cropped images;
        (x_min, y_min): top-left corner of the crop in the original image;
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)

    # Unless the caller vouches for it, verify all images share one shape.
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")

    high, width = imgs[0].shape[:2]
    x_center, y_center, w, h = box[:4]

    # Convert the center-format box to clamped, inclusive pixel bounds.
    x_min = max(0, int(round(x_center - w/2)))
    x_max = min(int(round(x_center + w/2)), width - 1)
    y_min = max(0, int(round(y_center - h/2)))
    y_max = min(int(round(y_center + h/2)), high - 1)

    window = np.s_[y_min:y_max + 1, x_min:x_max + 1]
    return [img[window] for img in imgs], (x_min, y_min)
|
||||
|
||||
|
||||
def crop_imgs_xyxy(imgs: list, box, same_sign: bool = False):
    """
    Crop every image in *imgs* to a corner-format bounding box.
    -----------------------------
    input:
        imgs: list, Each img in imgs has the same Width and High.
        box: YOLO-style bounding box [x1, y1, x2, y2, confidence, class_id],
             with (x1, y1) / (x2, y2) the top-left / bottom-right corners.
        same_sign: bool, Set True to skip the size check when all imgs are
                   known to share the same shape.
    output:
        crop_imgs: list of cropped images
        (x_min, y_min): top-left corner of the crop in the original image
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)

    # Unless the caller vouches for it, verify all images share one shape.
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")

    high, width = imgs[0].shape[:2]
    x1, y1, x2, y2 = box[:4]

    # Round and clamp the corners to valid, inclusive pixel bounds.
    x_min = max(0, int(round(x1)))
    x_max = min(int(round(x2)), width - 1)
    y_min = max(0, int(round(y1)))
    y_max = min(int(round(y2)), high - 1)

    window = np.s_[y_min:y_max + 1, x_min:x_max + 1]
    return [img[window] for img in imgs], (x_min, y_min)
|
||||
|
||||
|
||||
def crop_imgs_mask(imgs: list, mask: np.ndarray, same_sign: bool = False):
    """
    Crop every image in *imgs* to the bounding box of a binary mask, and
    append the cropped mask itself as the last element of the result.
    -----------------------------
    input:
        imgs: list, Each img in imgs has the same Width and High.
        mask: np.ndarray, non-zero pixels define the region of interest
        same_sign: bool, Set True to skip the size check when all imgs are
                   known to share the same shape.
    output:
        crop_imgs: list of cropped images, with the cropped mask appended
        (x_min, y_min): top-left corner of the crop in the original image
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)

    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")

    high, width = imgs[0].shape[:2]
    ys, xs = np.where(mask > 0)

    if xs.size == 0 or ys.size == 0:
        # No valid pixels in the mask.
        # Bug fix: this path used to `return None, None, (None, None)` —
        # three values, while every other path (and the docstring) returns
        # two; callers unpacking two values would crash. Mirror the
        # empty-`imgs` path instead.
        return [], (0, 0)

    # Clamp the mask's bounding box to valid, inclusive pixel bounds.
    x_min, x_max = max(0, int(round(xs.min()))), min(int(round(xs.max())), width - 1)
    y_min, y_max = max(0, int(round(ys.min()))), min(int(round(ys.max())), high - 1)

    crop_imgs = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    crop_imgs.append(mask[y_min:y_max + 1, x_min:x_max + 1])

    return crop_imgs, (x_min, y_min)
|
||||
@@ -0,0 +1,40 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
__all__ = ['distortion_correction']


def distortion_correction(
    color_image: np.ndarray,
    depth_image: np.ndarray,
    k: list,
    d: list,
    camera_size: list
):
    """
    Undistort a color/depth image pair with a shared camera model.

    input:
        color_image: np.ndarray
        depth_image: np.ndarray
        k: list, flattened 3x3 intrinsic matrix (length 9)
        d: list, distortion coefficients
        camera_size: list, [w, h]
    output:
        undistorted_color: np.ndarray
        undistorted_depth: np.ndarray
        new_k: np.ndarray, flattened optimal 3x3 intrinsic matrix
    """
    width, height = camera_size
    intrinsics = np.array(k).reshape(3, 3)
    distortion = np.array(d)

    # Compute an optimal rectified camera matrix (alpha=1 keeps all source
    # pixels), then build the undistortion remap once for both images.
    new_k, _ = cv2.getOptimalNewCameraMatrix(
        intrinsics, distortion, (width, height), 1, (width, height)
    )
    map1, map2 = cv2.initUndistortRectifyMap(
        intrinsics, distortion, None, new_k, (width, height), cv2.CV_32FC1
    )

    # Bilinear for color; nearest-neighbour for depth so interpolation does
    # not invent depth values between surfaces.
    undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR).astype(color_image.dtype)
    undistorted_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST).astype(depth_image.dtype)

    return undistorted_color, undistorted_depth, new_k.flatten()
|
||||
@@ -0,0 +1 @@
|
||||
from .load_calibration import *
|
||||
@@ -0,0 +1,30 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
import numpy as np
|
||||
|
||||
__all__ = ["load_calibration_mat"]


# Status codes returned alongside the calibration matrix.
SUCCESS = 0
FAIL_LOAD_FILE = 110
WRONG_KEY = 111
WRONG_SHAPE = 112


def load_calibration_mat(mat_path):
    """Load a 4x4 calibration matrix stored under key "T" in a JSON file.

    Returns:
        ``(mat, code)`` — the parsed matrix with ``SUCCESS``, or a 4x4
        identity with ``FAIL_LOAD_FILE`` (missing file), ``WRONG_KEY``
        (unreadable JSON / missing "T"), or ``WRONG_SHAPE`` (not 4x4).
    """
    if not os.path.exists(mat_path):
        return np.eye(4), FAIL_LOAD_FILE

    with open(mat_path, "r", encoding="utf-8") as f:
        try:
            candidate = np.array(json.load(f)["T"])
        except Exception:
            # Any parse/key/conversion failure falls back to identity.
            return np.eye(4), WRONG_KEY

    if candidate.shape != (4, 4):
        return np.eye(4), WRONG_SHAPE
    return candidate, SUCCESS
|
||||
@@ -0,0 +1 @@
|
||||
from .o3d import *
|
||||
@@ -0,0 +1,150 @@
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
|
||||
|
||||
__all__ = [
|
||||
"create_o3d_pcd",
|
||||
"create_o3d_denoised_pcd"
|
||||
]
|
||||
|
||||
# Status codes paired with returned point clouds. They follow the 11xx
# warning-code convention used by the project's warning table
# ("excessive noise" / "point cloud empty" / ...).
SUCCESS = 0
# BUG FIX: was 1010, which breaks the 11xx series and matches no entry in
# the warning table; 1100 is "Object point cloud contains excessive noise".
# (The name keeps the existing "MACH" typo so external references stay valid.)
TOO_MACH_NOISE = 1100
POINTS_EMPTY = 1101         # denoising left no points
POINTS_TOO_CLOSELY = 1102   # every cluster is closer than the 0.2 m minimum
|
||||
|
||||
|
||||
def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: float = 0.005):
    """Denoise a point cloud and keep the cluster nearest to the camera.

    Pipeline: drop non-finite points -> voxel down-sample -> radius filter ->
    statistical filter -> discard points with z < 0.2 m -> DBSCAN clustering ->
    keep the cluster with the smallest mean z.

    Args:
        point_cloud: input cloud.
        voxel_size: down-sampling voxel edge length in meters; also scales
            the neighbor count and search radius of the filters.

    Returns:
        ``(pcd, SUCCESS)`` on success (the filtered cloud is passed through
        unchanged when DBSCAN finds no cluster), or ``(None, code)`` when
        every cluster is too close (``POINTS_TOO_CLOSELY``) or the nearest
        cluster is too small a share of the cloud (``TOO_MACH_NOISE``).
    """
    # Filter parameters scale with the voxel size (reference: 0.005 m -> 10).
    neighbor_count = max(int(round(10 * voxel_size / 0.005)), 3)
    search_radius = voxel_size * 10

    finite_cloud = point_cloud.remove_non_finite_points()
    sampled = finite_cloud.voxel_down_sample(voxel_size=voxel_size)
    sampled_count = len(sampled.points)

    # Radius filter, then statistical filter.
    filtered, _ = sampled.remove_radius_outlier(
        nb_points=neighbor_count, radius=search_radius
    )
    filtered, _ = filtered.remove_statistical_outlier(
        nb_neighbors=neighbor_count, std_ratio=2.0
    )

    # Discard points closer than 0.2 m along z.
    xyz = np.asarray(filtered.points)
    filtered.points = o3d.utility.Vector3dVector(xyz[xyz[:, 2] >= 0.2])

    labels = np.array(
        filtered.cluster_dbscan(eps=search_radius, min_points=neighbor_count)
    )
    if not np.any(labels >= 0):
        # No cluster at all: return the filtered cloud as-is.
        return filtered, SUCCESS

    # Collect clusters whose mean z is at least 0.2 m, keyed by mean z.
    xyz = np.asarray(filtered.points)
    candidates = []
    for label in set(labels):
        if label == -1:
            continue
        member_idx = np.where(labels == label)[0]
        mean_z = np.mean(xyz[member_idx, 2])
        if mean_z >= 0.2:
            candidates.append((mean_z, filtered.select_by_index(member_idx)))

    if not candidates:
        return None, POINTS_TOO_CLOSELY

    # The nearest cluster (smallest mean z) is taken as the object.
    _, nearest = min(candidates, key=lambda item: item[0])

    # The nearest cluster must retain a minimum share of the down-sampled
    # cloud, otherwise the crop is considered dominated by noise.
    if len(nearest.points) / sampled_count < 0.05:
        return None, TOO_MACH_NOISE

    return nearest, SUCCESS
|
||||
|
||||
|
||||
def create_o3d_denoised_pcd(depth_img_mask, camera_size, k, **kwargs):
    """Back-project a masked depth image and return a denoised point cloud.

    Args:
        depth_img_mask: depth image (object pixels only), cast to uint16.
        camera_size: ``[width, height]`` in pixels.
        k: flat 3x3 intrinsics (row-major): fx=k[0], fy=k[4], cx=k[2], cy=k[5].
        **kwargs:
            "depth_scale": float, default 1000.0 (depth units per meter).
            "depth_trunc": float, default 3.0 (meters, far clip).
            "voxel_size": float, default 0.002 (denoising voxel size).

    Returns:
        ``(point_cloud, SUCCESS)`` or ``(None, error_code)`` when denoising
        fails or the denoised cloud ends up empty.
    """
    pinhole = o3d.camera.PinholeCameraIntrinsic(
        int(camera_size[0]), int(camera_size[1]), k[0], k[4], k[2], k[5]
    )

    raw_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=o3d.geometry.Image(depth_img_mask.astype(np.uint16)),
        intrinsic=pinhole,
        depth_scale=kwargs.get("depth_scale", 1000.0),
        depth_trunc=kwargs.get("depth_trunc", 3.0),
    )

    denoised, code = point_cloud_denoising(
        raw_cloud, kwargs.get("voxel_size", 0.002)
    )
    if code != SUCCESS:
        return None, code
    if len(denoised.points) == 0:
        return None, POINTS_EMPTY

    return denoised, SUCCESS
|
||||
|
||||
|
||||
def create_o3d_pcd(depth_img, camera_size, k, **kwargs):
    """Back-project a depth image into a down-sampled array of 3-D points.

    Args:
        depth_img: depth image, cast to uint16 before conversion.
        camera_size: ``[width, height]`` in pixels.
        k: flat 3x3 intrinsics (row-major): fx=k[0], fy=k[4], cx=k[2], cy=k[5].
        **kwargs:
            "depth_scale": float, default 1000.0 (depth units per meter).
            "depth_trunc": float, default 3.0 (meters, far clip).
            "voxel_size": float, default 0.01 (down-sampling voxel size).

    Returns:
        np.ndarray of shape (N, 3) holding the down-sampled points — note
        that despite the function name, a NumPy array is returned, not an
        ``o3d.geometry.PointCloud``.
    """
    pinhole = o3d.camera.PinholeCameraIntrinsic(
        int(camera_size[0]), int(camera_size[1]), k[0], k[4], k[2], k[5]
    )
    cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=o3d.geometry.Image(depth_img.astype(np.uint16)),
        intrinsic=pinhole,
        depth_scale=kwargs.get("depth_scale", 1000.0),
        depth_trunc=kwargs.get("depth_trunc", 3.0),
    )
    cloud = cloud.voxel_down_sample(kwargs.get("voxel_size", 0.01))
    return np.asarray(cloud.points)
|
||||
@@ -0,0 +1 @@
|
||||
from .task_executer import *
|
||||
@@ -0,0 +1,88 @@
|
||||
import queue
|
||||
import threading
|
||||
|
||||
from ..core.enum import Status
|
||||
|
||||
|
||||
__all__ = ["TaskExecutor"]
|
||||
|
||||
class TaskExecutor:
    """Runs submitted callables sequentially on a single daemon worker thread.

    Submitted callables are expected to return a ``(result, code)`` pair.
    ``submit_and_wait`` blocks the calling thread until its task has run
    (or a timeout / shutdown occurs) and hands that pair back.
    """

    def __init__(self):
        self._queue = queue.Queue()                          # pending (func, args, kwargs, event, results) tuples
        self._worker_running_sign: bool = False              # worker loop keeps running while True
        self._is_stopped_sign: bool = False                  # stop() has already completed
        self._submit_refuse_sign: bool = False               # new submissions are rejected
        self._state_lock: threading.Lock = threading.Lock()  # guards the stop() critical section
        self._worker = threading.Thread(
            target=self._worker_loop,
            daemon=True,
            name="TaskExecutor"
        )

        self.start()

    def start(self):
        """Mark the loop as running and start the worker thread.

        Called once from ``__init__``; a second call would raise
        ``RuntimeError`` because a thread can only be started once.
        """
        self._worker_running_sign = True
        self._worker.start()

    def submit_and_wait(self, func, args=None, kwargs=None):
        """Enqueue ``func(*args, **kwargs)`` and block until it completes.

        Returns:
            The callable's ``(result, code)`` pair, or ``(None, Status.*)``
            when the worker is dead, already stopped, timed out, or the
            task was aborted.
        """
        if not self._worker.is_alive():
            return None, Status.WORKER_NOT_ALIVE

        if self._submit_refuse_sign:
            return None, Status.EXECUTOR_ALREADY_STOP

        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}

        event = threading.Event()
        results = []
        self._queue.put((func, args, kwargs, event, results))
        if not event.wait(timeout=10):
            return None, Status.WORKER_TIMEOUT

        if self._submit_refuse_sign:
            return None, Status.TASK_ABORTED

        # The worker always appends exactly (result, code) before setting the
        # event, so hand back a consistent 2-tuple like the error paths above.
        return results[0], results[1]

    def _worker_loop(self):
        """Consume the queue until stopped; never dies on a failing task."""
        while self._worker_running_sign:
            try:
                func, args, kwargs, event, results = self._queue.get(timeout=1)
            except queue.Empty:
                if not self._worker_running_sign and self._queue.empty():
                    break
                continue

            try:
                result, code = func(*args, **kwargs)
                results.append(result)
                results.append(code)
            except Exception:
                # BUG FIX: previously an exception escaping ``func`` killed
                # the worker thread and left the waiter with an empty result
                # list (so the caller's 2-tuple unpacking would fail).
                # Record a failure pair and keep serving subsequent tasks.
                # NOTE(review): Status has no dedicated "task raised" code,
                # so TASK_ABORTED is reused here — consider adding one.
                results.append(None)
                results.append(Status.TASK_ABORTED)
            finally:
                event.set()  # wake the submitting thread
                self._queue.task_done()

    def stop(self):
        """Reject new work, abort queued tasks, and join the worker thread."""
        self._submit_refuse_sign = True
        with self._state_lock:
            if self._is_stopped_sign:
                return
            self._worker_running_sign = False

        # Drain pending tasks so their waiters are released with TASK_ABORTED.
        while True:
            try:
                func, args, kwargs, event, results = self._queue.get_nowait()
            except queue.Empty:
                break
            results.append(None)
            results.append(Status.TASK_ABORTED)
            event.set()
            self._queue.task_done()

        # Wait for the worker thread to exit cleanly.
        self._worker.join(timeout=2)
        with self._state_lock:
            self._is_stopped_sign = True
|
||||
1
vision_detect/vision_detect/vision_core/node/__init__.py
Normal file
1
vision_detect/vision_detect/vision_core/node/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .node_manager import NodeManager
|
||||
@@ -0,0 +1,30 @@
|
||||
from ..core import (ConfigManager, SourceManager, DetectorManager,
|
||||
EstimatorManager, RefinerManager, ResourceManager)
|
||||
|
||||
|
||||
__all__ = ["NodeBaseline"]
|
||||
|
||||
class NodeBaseline:
    """Wires together the processing-pipeline managers for a vision node.

    Construction order is significant: the image source comes up first,
    then the detector, estimator, and refiner stages, and finally the
    shared resource manager. Each stage logs a "ready" message through the
    supplied ROS node so a startup stall is attributable to a stage.
    """

    def __init__(self, config_manager: ConfigManager, ros_node):
        """Build every pipeline stage from ``config_manager``.

        Args:
            config_manager: supplies a (mode, config) pair per stage plus
                the full configuration for the resource manager.
            ros_node: ROS node handle; passed to the image source and used
                for progress logging.
        """
        # Image acquisition (the only stage that needs the ROS node itself).
        self._source_manager = SourceManager(
            config_manager.source_mode, config_manager.source_config, ros_node
        )
        ros_node.get_logger().info("Image source ready")

        # Object detection stage.
        self._detector_manager = DetectorManager(
            config_manager.detector_mode, config_manager.detector_config
        )
        ros_node.get_logger().info("Detector ready")

        # Pose estimation stage.
        self._estimator_manager = EstimatorManager(
            config_manager.estimator_mode, config_manager.estimator_config
        )
        ros_node.get_logger().info("Estimator ready")

        # Pose refinement stage.
        self._refiner_manager = RefinerManager(
            config_manager.refiner_mode, config_manager.refiner_config
        )
        ros_node.get_logger().info("Refiner ready")

        # Shared resources built from the full configuration.
        self._resource_manager = ResourceManager(config_manager)
        ros_node.get_logger().info("ResourceManager ready")
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user