Merge branch 'feature' into feature_cpp_test
This commit is contained in:
@@ -2,28 +2,31 @@
|
||||
"node_name": "bottle_detect_service",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Service",
|
||||
"Service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
|
||||
"detect_mode": "Detect",
|
||||
"Detect_configs": {
|
||||
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
|
||||
"confidence": 0.50,
|
||||
"classes": [39]
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"ICP_configs": {
|
||||
"complete_model_path": "pointclouds/bottle_model.pcd",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 2.0,
|
||||
"nb_points": 10,
|
||||
"radius": 0.1,
|
||||
"nb_neighbors": 20,
|
||||
"std_ratio": 3.0,
|
||||
"voxel_size": 0.010,
|
||||
"ransac_voxel_size": 0.005,
|
||||
"icp_voxel_radius": [0.004, 0.002, 0.001],
|
||||
"icp_max_iter": [50, 30, 14]
|
||||
|
||||
@@ -2,19 +2,25 @@
|
||||
"node_name": "bottle_detect_service",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Service",
|
||||
"Service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
|
||||
"detect_mode": "Detect",
|
||||
"Detect_configs": {
|
||||
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
|
||||
"confidence": 0.50,
|
||||
"classes": [39]
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
|
||||
@@ -2,9 +2,13 @@
|
||||
"node_name": "crossboard_topic",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Topic",
|
||||
"Topic_configs": {
|
||||
"position": "right",
|
||||
@@ -12,10 +16,12 @@
|
||||
"depth_image_topic_name": "/camera/depth/image_raw",
|
||||
"camera_info_topic_name": "/camera/color/camera_info"
|
||||
},
|
||||
|
||||
"detect_mode": "Crossboard",
|
||||
"Crossboard_configs": {
|
||||
"pattern_size": [8, 5]
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
|
||||
@@ -2,9 +2,13 @@
|
||||
"node_name": "default_config_detect_service",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Service",
|
||||
"Service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
@@ -15,6 +19,7 @@
|
||||
"depth_image_topic_name": "/camera/depth/image_raw",
|
||||
"camera_info_topic_name": "/camera/color/camera_info"
|
||||
},
|
||||
|
||||
"detect_mode": "Detect",
|
||||
"Detect_configs": {
|
||||
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
|
||||
@@ -28,6 +33,7 @@
|
||||
"Crossboard_configs": {
|
||||
"pattern_size": [8, 5]
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
@@ -38,10 +44,7 @@
|
||||
"complete_model_path": "pointclouds/bottle_model.pcd",
|
||||
"depth_scale": 1000.0,
|
||||
"depth_trunc": 2.0,
|
||||
"nb_points": 10,
|
||||
"radius": 0.1,
|
||||
"nb_neighbors": 20,
|
||||
"std_ratio": 3.0,
|
||||
"voxel_size": 0.010,
|
||||
"ransac_voxel_size": 0.005,
|
||||
"icp_voxel_radius": [0.004, 0.002, 0.001],
|
||||
"icp_max_iter": [50, 30, 14]
|
||||
|
||||
@@ -2,19 +2,25 @@
|
||||
"node_name": "detect_service",
|
||||
"output_boxes": "True",
|
||||
"output_masks": "False",
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json",
|
||||
|
||||
"calibration": {
|
||||
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
|
||||
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
|
||||
"head": "configs/hand_eye_mat/eye_to_hand.json"
|
||||
},
|
||||
|
||||
"get_camera_mode": "Service",
|
||||
"Service_configs": {
|
||||
"service_name": "/vision_object_recognition"
|
||||
},
|
||||
|
||||
"detect_mode": "Detect",
|
||||
"Detect_configs": {
|
||||
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
|
||||
"confidence": 0.50,
|
||||
"classes": []
|
||||
},
|
||||
|
||||
"calculate_mode": "PCA",
|
||||
"PCA_configs": {
|
||||
"depth_scale": 1000.0,
|
||||
|
||||
@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
|
||||
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
|
||||
DeclareLaunchArgument('head', default_value=configs['eye_to_handhead_mat_path']),
|
||||
|
||||
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
|
||||
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
|
||||
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
|
||||
|
||||
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
|
||||
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
|
||||
DeclareLaunchArgument("ICP_configs", default_value=json.dumps(configs['ICP_configs'])),
|
||||
DeclareLaunchArgument('configs_path', default_value=config_dir),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
left_hand = LaunchConfiguration('left_hand').perform(context)
|
||||
right_hand = LaunchConfiguration('right_hand').perform(context)
|
||||
head = LaunchConfiguration('head').perform(context)
|
||||
|
||||
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
|
||||
detect_mode = LaunchConfiguration('detect_mode').perform(context)
|
||||
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
|
||||
|
||||
Service_configs = LaunchConfiguration('Service_configs').perform(context)
|
||||
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
|
||||
ICP_configs = LaunchConfiguration('ICP_configs').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
@@ -51,19 +27,7 @@ def generate_launch_description():
|
||||
executable='detect_node',
|
||||
name=configs['node_name'],
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'left_hand': left_hand,
|
||||
'right_hand': right_hand,
|
||||
'head': head,
|
||||
|
||||
'get_camera_mode': get_camera_mode,
|
||||
'detect_mode': detect_mode,
|
||||
'calculate_mode': calculate_mode,
|
||||
|
||||
'Service_configs': Service_configs,
|
||||
'Detect_configs': Detect_configs,
|
||||
'ICP_configs': ICP_configs,
|
||||
'configs_path': configs_path,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
|
||||
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
|
||||
DeclareLaunchArgument('head', default_value=configs['head']),
|
||||
|
||||
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
|
||||
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
|
||||
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
|
||||
|
||||
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
|
||||
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
|
||||
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
|
||||
DeclareLaunchArgument('configs_path', default_value=config_dir),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
left_hand = LaunchConfiguration('left_hand').perform(context)
|
||||
right_hand = LaunchConfiguration('right_hand').perform(context)
|
||||
head = LaunchConfiguration('head').perform(context)
|
||||
|
||||
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
|
||||
detect_mode = LaunchConfiguration('detect_mode').perform(context)
|
||||
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
|
||||
|
||||
Service_configs = LaunchConfiguration('Service_configs').perform(context)
|
||||
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
|
||||
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
@@ -51,19 +27,7 @@ def generate_launch_description():
|
||||
executable='detect_node',
|
||||
name=configs['node_name'],
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'left_hand': left_hand,
|
||||
'right_hand': right_hand,
|
||||
'head': head,
|
||||
|
||||
'get_camera_mode': get_camera_mode,
|
||||
'detect_mode': detect_mode,
|
||||
'calculate_mode': calculate_mode,
|
||||
|
||||
'Service_configs': Service_configs,
|
||||
'Detect_configs': Detect_configs,
|
||||
'PCA_configs': PCA_configs,
|
||||
'configs_path': configs_path,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
@@ -15,36 +15,11 @@ with open(config_dir, "r") as f:
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
|
||||
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
|
||||
DeclareLaunchArgument('head', default_value=configs['head']),
|
||||
|
||||
|
||||
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
|
||||
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
|
||||
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
|
||||
|
||||
DeclareLaunchArgument("Topic_configs", default_value=json.dumps(configs['Topic_configs'])),
|
||||
DeclareLaunchArgument("Crossboard_configs", default_value=json.dumps(configs['Crossboard_configs'])),
|
||||
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
|
||||
DeclareLaunchArgument('configs_path', default_value=config_dir),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
left_hand = LaunchConfiguration('left_hand').perform(context)
|
||||
right_hand = LaunchConfiguration('right_hand').perform(context)
|
||||
head = LaunchConfiguration('head').perform(context)
|
||||
|
||||
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
|
||||
detect_mode = LaunchConfiguration('detect_mode').perform(context)
|
||||
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
|
||||
|
||||
Topic_configs = LaunchConfiguration('Topic_configs').perform(context)
|
||||
Crossboard_configs = LaunchConfiguration('Crossboard_configs').perform(context)
|
||||
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
@@ -52,19 +27,7 @@ def generate_launch_description():
|
||||
executable='detect_node',
|
||||
name=configs['node_name'],
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'left_hand': left_hand,
|
||||
'right_hand': right_hand,
|
||||
'head': head,
|
||||
|
||||
'get_camera_mode': get_camera_mode,
|
||||
'detect_mode': detect_mode,
|
||||
'calculate_mode': calculate_mode,
|
||||
|
||||
'Topic_configs': Topic_configs,
|
||||
'Crossboard_configs': Crossboard_configs,
|
||||
'PCA_configs': PCA_configs,
|
||||
'configs_path': configs_path,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
|
||||
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
|
||||
DeclareLaunchArgument('head', default_value=configs['head']),
|
||||
|
||||
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
|
||||
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
|
||||
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
|
||||
|
||||
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
|
||||
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
|
||||
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
|
||||
DeclareLaunchArgument('configs_path', default_value=config_dir),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
left_hand = LaunchConfiguration('left_hand').perform(context)
|
||||
right_hand = LaunchConfiguration('right_hand').perform(context)
|
||||
head = LaunchConfiguration('head').perform(context)
|
||||
|
||||
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
|
||||
detect_mode = LaunchConfiguration('detect_mode').perform(context)
|
||||
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
|
||||
|
||||
Service_configs = LaunchConfiguration('Service_configs').perform(context)
|
||||
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
|
||||
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
@@ -51,19 +27,7 @@ def generate_launch_description():
|
||||
executable='detect_node',
|
||||
name=configs['node_name'],
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'left_hand': left_hand,
|
||||
'right_hand': right_hand,
|
||||
'head': head,
|
||||
|
||||
'get_camera_mode': get_camera_mode,
|
||||
'detect_mode': detect_mode,
|
||||
'calculate_mode': calculate_mode,
|
||||
|
||||
'Service_configs': Service_configs,
|
||||
'Detect_configs': Detect_configs,
|
||||
'PCA_configs': PCA_configs,
|
||||
'configs_path': configs_path,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
@@ -13,74 +13,14 @@ config_dir = os.path.join(share_dir, 'configs/launch_configs/default_config.json
|
||||
with open(config_dir, "r") as f:
|
||||
configs = json.load(f)
|
||||
|
||||
# def get_param_configs():
|
||||
# param_configs = {}
|
||||
# if configs["get_camera_mode"] == "Service":
|
||||
# param_configs.update(configs["Service_configs"])
|
||||
# elif configs["get_camera_mode"] == "Topic":
|
||||
# param_configs.update(configs["Topic_configs"])
|
||||
# else:
|
||||
# param_configs.update(configs["Service_configs"])
|
||||
#
|
||||
# if configs["detect_mode"] == "Detect":
|
||||
# param_configs.update(configs["Detect_configs"])
|
||||
# elif configs["detect_mode"] == "Color":
|
||||
# param_configs.update(configs["Color_configs"])
|
||||
# elif configs["detect_mode"] == "Crossboard":
|
||||
# param_configs.update(configs["Crossboard_configs"])
|
||||
# else:
|
||||
# param_configs.update(configs["Detect_configs"])
|
||||
#
|
||||
# if configs["calculate_mode"] == "PCA":
|
||||
# param_configs.update(configs["PCA_configs"])
|
||||
# elif configs["calculate_mode"] == "ICP":
|
||||
# param_configs.update(configs["ICP_configs"])
|
||||
# else:
|
||||
# param_configs.update(configs["PCA_configs"])
|
||||
# return param_configs
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
args_detect = [
|
||||
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
|
||||
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
|
||||
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
|
||||
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
|
||||
DeclareLaunchArgument('head', default_value=configs['head']),
|
||||
|
||||
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
|
||||
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
|
||||
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
|
||||
|
||||
# DeclareLaunchArgument("param_configs", default_value=json.dumps(get_param_configs())),
|
||||
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
|
||||
DeclareLaunchArgument("Topic_configs", default_value=json.dumps(configs['Topic_configs'])),
|
||||
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
|
||||
DeclareLaunchArgument("Color_configs", default_value=json.dumps(configs['Color_configs'])),
|
||||
DeclareLaunchArgument("Crossboard_configs", default_value=json.dumps(configs['Crossboard_configs'])),
|
||||
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
|
||||
DeclareLaunchArgument("ICP_configs", default_value=json.dumps(configs['ICP_configs'])),
|
||||
DeclareLaunchArgument('configs_path', default_value=config_dir),
|
||||
]
|
||||
|
||||
def create_detect_node(context):
|
||||
output_boxes = LaunchConfiguration('output_boxes').perform(context)
|
||||
output_masks = LaunchConfiguration('output_masks').perform(context)
|
||||
left_hand = LaunchConfiguration('left_hand').perform(context)
|
||||
right_hand = LaunchConfiguration('right_hand').perform(context)
|
||||
head = LaunchConfiguration('head').perform(context)
|
||||
|
||||
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
|
||||
detect_mode = LaunchConfiguration('detect_mode').perform(context)
|
||||
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
|
||||
|
||||
# param_configs = LaunchConfiguration('param_configs').perform(context)
|
||||
Service_configs = LaunchConfiguration('Service_configs').perform(context)
|
||||
Topic_configs = LaunchConfiguration('Topic_configs').perform(context)
|
||||
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
|
||||
Color_configs = LaunchConfiguration('Color_configs').perform(context)
|
||||
Crossboard_configs = LaunchConfiguration('Crossboard_configs').perform(context)
|
||||
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
|
||||
ICP_configs = LaunchConfiguration('ICP_configs').perform(context)
|
||||
configs_path = LaunchConfiguration('configs_path').perform(context)
|
||||
|
||||
return [
|
||||
Node(
|
||||
@@ -88,24 +28,7 @@ def generate_launch_description():
|
||||
executable='detect_node',
|
||||
name=configs['node_name'],
|
||||
parameters=[{
|
||||
'output_boxes': output_boxes.lower() == 'true',
|
||||
'output_masks': output_masks.lower() == 'true',
|
||||
'left_hand': left_hand,
|
||||
'right_hand': right_hand,
|
||||
'head': head,
|
||||
|
||||
'get_camera_mode': get_camera_mode,
|
||||
'detect_mode': detect_mode,
|
||||
'calculate_mode': calculate_mode,
|
||||
|
||||
# 'param_configs': param_configs,
|
||||
'Service_configs': Service_configs,
|
||||
'Topic_configs': Topic_configs,
|
||||
'Detect_configs': Detect_configs,
|
||||
'Color_configs': Color_configs,
|
||||
'Crossboard_configs': Crossboard_configs,
|
||||
'PCA_configs': PCA_configs,
|
||||
'ICP_configs': ICP_configs,
|
||||
'configs_path': configs_path,
|
||||
}]
|
||||
)
|
||||
]
|
||||
|
||||
154
vision_detect/vision_detect/VisionDetect/node/config_node.py
Normal file
154
vision_detect/vision_detect/VisionDetect/node/config_node.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import os
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
from rclpy.node import Node
|
||||
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
|
||||
from ..utils import read_calibration_mat
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
__all__ = [
|
||||
"ConfigBase"
|
||||
]
|
||||
|
||||
class ConfigBase(Node):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
"""init parameter"""
|
||||
self.confidence = None
|
||||
self.depth_image_topic_name = None
|
||||
self.color_image_topic_name = None
|
||||
self.camera_info_topic_name = None
|
||||
self.service_name = None
|
||||
|
||||
self.pattern_size = None
|
||||
self.device = None
|
||||
self.configs = None
|
||||
self.calculate_configs = None
|
||||
self.checkpoint_path = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
|
||||
self.function = None
|
||||
self.source = None
|
||||
|
||||
self.server = None
|
||||
self.model = None
|
||||
|
||||
self.k = self.d = None
|
||||
self.eye_in_left_hand_mat = None
|
||||
self.eye_in_right_hand_mat = None
|
||||
self.eye_to_hand_mat = None
|
||||
self.hand_eye_mat = None
|
||||
|
||||
self.get_camera_mode = None
|
||||
self.detect_mode = None
|
||||
self.calculate_mode = None
|
||||
|
||||
self.camera_size = None
|
||||
self.position = None
|
||||
|
||||
self.map1 = self.map2 = None
|
||||
|
||||
self.calculate_function = None
|
||||
self.fx = self.fy = 0.5
|
||||
self.camera_data = {}
|
||||
self.distance = 1500
|
||||
self.color_range = [
|
||||
[[0, 120, 70], [10, 255, 255]],
|
||||
[[170, 120, 70], [180, 255, 255]]
|
||||
]
|
||||
self._get_param()
|
||||
|
||||
def _get_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter(
|
||||
'configs_path',
|
||||
os.path.join(share_dir, "configs/launch_configs/default_config.json")
|
||||
)
|
||||
configs_path = self.get_parameter('configs_path').value
|
||||
with open(configs_path, 'r') as f:
|
||||
self.configs = json.load(f)
|
||||
|
||||
self.output_boxes = self.configs['output_boxes'].lower() == "true"
|
||||
self.output_masks = self.configs['output_masks'].lower() == "true"
|
||||
|
||||
self.get_camera_mode = self.configs['get_camera_mode']
|
||||
self.detect_mode = self.configs['detect_mode']
|
||||
self.calculate_mode = self.configs['calculate_mode']
|
||||
|
||||
self._read_calibration_mat()
|
||||
|
||||
if self.get_camera_mode == "Service":
|
||||
self.service_name = self.configs["Service_configs"]["service_name"]
|
||||
elif self.get_camera_mode == "Topic":
|
||||
topic_configs = self.configs['Topic_configs']
|
||||
self.color_image_topic_name = topic_configs["color_image_topic_name"]
|
||||
self.depth_image_topic_name = topic_configs["depth_image_topic_name"]
|
||||
self.camera_info_topic_name = topic_configs["camera_info_topic_name"]
|
||||
self.position = topic_configs["position"]
|
||||
else:
|
||||
self.service_name = self.configs["Service"]["service_name"]
|
||||
|
||||
if self.detect_mode == 'Detect':
|
||||
detect_configs = self.configs['Detect_configs']
|
||||
self.confidence = detect_configs["confidence"]
|
||||
self.classes = detect_configs["classes"]
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self.checkpoint_path = os.path.join(share_dir, detect_configs["checkpoint_path"])
|
||||
elif self.detect_mode == 'Color':
|
||||
self.color_range = self.configs["Color_configs"]["color_range"]
|
||||
self.distance = self.configs["Color_configs"]["distance"]
|
||||
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in
|
||||
self.color_range]
|
||||
elif self.detect_mode == 'Crossboard':
|
||||
self.pattern_size = self.configs["Crossboard_configs"]["pattern_size"]
|
||||
else:
|
||||
self.get_logger().warning("Unknown mode, use detect")
|
||||
detect_configs = self.configs['Detect_configs']
|
||||
self.confidence = detect_configs["confidence"]
|
||||
self.classes = detect_configs["classes"]
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self.checkpoint_path = detect_configs["checkpoint_path"]
|
||||
|
||||
if self.calculate_mode == "PCA":
|
||||
self.calculate_configs = self.configs['PCA_configs']
|
||||
elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
|
||||
self.calculate_configs = self.configs['ICA_configs']
|
||||
source = o3d.io.read_point_cloud(
|
||||
os.path.join(share_dir, self.calculate_configs['complete_model_path'])
|
||||
)
|
||||
self.calculate_configs["source"] = source
|
||||
|
||||
else:
|
||||
self.get_logger().warning("Unknown calculate_mode, use PCA")
|
||||
self.calculate_configs = self.configs['PCA_configs']
|
||||
|
||||
self.get_logger().info("Get parameters done")
|
||||
|
||||
def _read_calibration_mat(self):
|
||||
eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
|
||||
eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
|
||||
eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
|
||||
|
||||
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"left_mat: {info}")
|
||||
|
||||
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"right_mat: {info}")
|
||||
|
||||
self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"head_mat: {info}")
|
||||
|
||||
self.get_logger().info("Read calibration mat file done")
|
||||
|
||||
@@ -1,285 +1,29 @@
|
||||
"""Vision Detect Node"""
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import json
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
import time
|
||||
|
||||
import cv2
|
||||
import torch
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.task import Future
|
||||
from cv_bridge import CvBridge
|
||||
from message_filters import Subscriber, ApproximateTimeSynchronizer
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
from sensor_msgs.msg import CameraInfo
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion
|
||||
|
||||
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
|
||||
|
||||
from ..utils import *
|
||||
from .init_node import InitBase
|
||||
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
class DetectNode(Node):
|
||||
class DetectNode(InitBase):
|
||||
"""Detect Node"""
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.device = None
|
||||
self.checkpoint_path = None
|
||||
self.output_boxes = None
|
||||
self.output_masks = None
|
||||
|
||||
self.function = None
|
||||
self.source = None
|
||||
|
||||
self.server = None
|
||||
self.model = None
|
||||
|
||||
self.k = self.d = None
|
||||
self.eye_in_left_hand_mat = None
|
||||
self.eye_in_right_hand_mat = None
|
||||
self.eye_to_hand_mat = None
|
||||
self.hand_eye_mat = None
|
||||
|
||||
self.eye_in_left_hand_path = None
|
||||
self.eye_in_right_hand_path = None
|
||||
self.eye_to_hand_path = None
|
||||
|
||||
self.get_camera_mode = None
|
||||
self.detect_mode = None
|
||||
self.calculate_mode = None
|
||||
|
||||
self.camera_size = None
|
||||
|
||||
self.map1 = self.map2 = None
|
||||
|
||||
self.calculate_function = calculate_pose_pca
|
||||
self.fx = self.fy = 0.5
|
||||
self.camera_data = {}
|
||||
self.future = Future()
|
||||
self.cv_bridge = CvBridge()
|
||||
self.lock = threading.Lock()
|
||||
self.distance = 1500
|
||||
self.color_range = [
|
||||
[[0, 120, 70], [10, 255, 255]],
|
||||
[[170, 120, 70], [180, 255, 255]]
|
||||
]
|
||||
|
||||
'''init'''
|
||||
self._init_param()
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
|
||||
self.get_logger().info("Initialize done")
|
||||
|
||||
def _init_param(self):
|
||||
"""init parameter"""
|
||||
self.declare_parameter('output_boxes', True)
|
||||
self.declare_parameter('output_masks', False)
|
||||
|
||||
self.output_boxes = self.get_parameter('output_boxes').value
|
||||
self.output_masks = self.get_parameter('output_masks').value
|
||||
|
||||
self.declare_parameter('get_camera_mode', 'service')
|
||||
self.declare_parameter('detect_mode', 'detect')
|
||||
self.declare_parameter('calculate_mode', 'pca')
|
||||
|
||||
self.get_camera_mode = self.get_parameter('get_camera_mode').value
|
||||
self.detect_mode = self.get_parameter('detect_mode').value
|
||||
self.calculate_mode = self.get_parameter("calculate_mode").value
|
||||
|
||||
self.declare_parameter('left_hand', 'configs/hand_eye_mat/hand_in_left_hand.json')
|
||||
self.declare_parameter('right_hand', 'configs/hand_eye_mat/hand_in_right_hand.json')
|
||||
self.declare_parameter('head', 'configs/hand_eye_mat/hand_to_hand.json')
|
||||
|
||||
self.declare_parameter('Service_configs', '{}')
|
||||
self.declare_parameter('Topic_configs', '{}')
|
||||
|
||||
self.declare_parameter('Detect_configs', '{}')
|
||||
self.declare_parameter('Color_configs', '{}')
|
||||
self.declare_parameter('Crossboard_configs', '{}')
|
||||
|
||||
self.declare_parameter('PCA_configs', '{}')
|
||||
self.declare_parameter('ICA_configs', '{}')
|
||||
|
||||
self._init_json_file()
|
||||
if self.get_camera_mode == "Service":
|
||||
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
|
||||
self._init_service()
|
||||
elif self.get_camera_mode == "Topic":
|
||||
self.declare_parameter('Topic_configs', '{}')
|
||||
self.color_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["color_image_topic_name"]
|
||||
self.depth_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["depth_image_topic_name"]
|
||||
self.camera_info_topic_name = json.loads(self.get_parameter('Topic_configs').value)["camera_info_topic_name"]
|
||||
self.position = json.loads(self.get_parameter('Topic_configs').value)["position"]
|
||||
if self.position == 'left':
|
||||
self.hand_eye_mat = self.eye_in_left_hand_mat
|
||||
elif self.position == 'right':
|
||||
self.hand_eye_mat = self.eye_in_right_hand_mat
|
||||
else:
|
||||
self.hand_eye_mat = self.eye_to_hand_mat
|
||||
else:
|
||||
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
|
||||
|
||||
if self.detect_mode == 'Detect':
|
||||
self.function = self._seg_image
|
||||
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
|
||||
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self.checkpoint_path = os.path.join(share_dir, json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"])
|
||||
self._init_model()
|
||||
elif self.detect_mode == 'Color':
|
||||
self.function = self._seg_color
|
||||
self.color_range = json.loads(self.get_parameter('Color_configs').value)["color_range"]
|
||||
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in self.color_range]
|
||||
elif self.detect_mode == 'Crossboard':
|
||||
self.function = self._seg_crossboard
|
||||
self.pattern_size = json.loads(self.get_parameter('Crossboard_configs').value)["pattern_size"]
|
||||
else:
|
||||
self.get_logger().warning("Unknown mode, use detect")
|
||||
self.function = self._seg_image
|
||||
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
|
||||
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self.checkpoint_path = json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"]
|
||||
self._init_model()
|
||||
|
||||
if self.calculate_mode == "PCA":
|
||||
self.configs = json.loads(self.get_parameter('PCA_configs').value)
|
||||
self.calculate_function = calculate_pose_pca
|
||||
elif self.calculate_mode == "ICP":
|
||||
self.configs = json.loads(self.get_parameter('ICA_configs').value)
|
||||
source = o3d.io.read_point_cloud(
|
||||
os.path.join(share_dir, self.configs['complete_model_path'])
|
||||
)
|
||||
self.configs["source"] = source
|
||||
self.calculate_function = calculate_pose_icp
|
||||
|
||||
else:
|
||||
self.get_logger().warning("Unknown calculate_mode, use PCA")
|
||||
self.configs = json.loads(self.get_parameter('PCA_configs').value)
|
||||
self.calculate_function = calculate_pose_pca
|
||||
|
||||
self.get_logger().info("Initialize parameters done")
|
||||
|
||||
def _init_json_file(self):
|
||||
self.eye_in_left_hand_path = os.path.join(
|
||||
share_dir, self.get_parameter('left_hand').value
|
||||
)
|
||||
self.eye_in_right_hand_path = os.path.join(
|
||||
share_dir, self.get_parameter('right_hand').value
|
||||
)
|
||||
self.eye_to_hand_path = os.path.join(
|
||||
share_dir, self.get_parameter('head').value
|
||||
)
|
||||
|
||||
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(self.eye_in_left_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"left_mat: {info}")
|
||||
|
||||
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(self.eye_in_right_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"right_mat: {info}")
|
||||
|
||||
self.eye_to_hand_mat, info, sign = read_calibration_mat(self.eye_to_hand_path)
|
||||
if not sign:
|
||||
self.get_logger().warning(f"head_mat: {info}")
|
||||
|
||||
self.get_logger().info("Initialize read json file done")
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
if self.checkpoint_path.endswith('-seg.pt'):
|
||||
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device_model)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
self.function = self._seg_image
|
||||
|
||||
elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
|
||||
self.model = YOLO(self.checkpoint_path)
|
||||
self.function = self._seg_image
|
||||
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
|
||||
raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
|
||||
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
self.get_logger().info("Initialize model done")
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init publisher"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
if self.get_camera_mode == "Topic":
|
||||
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
|
||||
|
||||
self.get_logger().info("Initialize publisher done")
|
||||
|
||||
def _init_service(self):
|
||||
"""init service server"""
|
||||
self.server = self.create_service(
|
||||
VisionObjectRecognition,
|
||||
self.service_name,
|
||||
self._service_callback
|
||||
)
|
||||
self.get_logger().info("Initialize service done")
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init subscriber"""
|
||||
if self.get_camera_mode == "Service":
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._service_sub_callback,
|
||||
10
|
||||
)
|
||||
elif self.get_camera_mode == "Topic":
|
||||
self.sub_camera_info = self.create_subscription(
|
||||
CameraInfo,
|
||||
self.camera_info_topic_name,
|
||||
self._camera_info_callback,
|
||||
10
|
||||
)
|
||||
|
||||
self.get_logger().info("Waiting for camera info...")
|
||||
rclpy.spin_until_future_complete(self, self.future)
|
||||
self.get_logger().info("Camera info received.")
|
||||
|
||||
# sync get color and depth img
|
||||
self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
|
||||
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
|
||||
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_color_image, self.sub_depth_image],
|
||||
queue_size=10,
|
||||
slop=0.1
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self._sync_sub_callback)
|
||||
else:
|
||||
self.get_logger().warning("get_camera_mode is wrong, use service")
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._service_sub_callback,
|
||||
10
|
||||
)
|
||||
|
||||
self.get_logger().info("Initialize subscriber done")
|
||||
|
||||
def _camera_info_callback(self, msg: CameraInfo):
|
||||
"""Get camera info"""
|
||||
@@ -450,7 +194,7 @@ class DetectNode(Node):
|
||||
mask_crop,
|
||||
depth_crop,
|
||||
intrinsics,
|
||||
**self.configs
|
||||
**self.calculate_configs
|
||||
)
|
||||
if rmat is None:
|
||||
self.get_logger().warning("Object point cloud have too many noise")
|
||||
@@ -539,8 +283,11 @@ class DetectNode(Node):
|
||||
mask_crop,
|
||||
depth_crop,
|
||||
intrinsics,
|
||||
**self.configs
|
||||
**self.calculate_configs
|
||||
)
|
||||
if rmat is None:
|
||||
self.get_logger().warning("Color Area point cloud have too many noise")
|
||||
return None, None
|
||||
|
||||
rmat = self.hand_eye_mat @ rmat
|
||||
|
||||
@@ -604,9 +351,13 @@ class DetectNode(Node):
|
||||
mask_crop,
|
||||
depth_crop,
|
||||
intrinsics,
|
||||
**self.configs
|
||||
**self.calculate_configs
|
||||
)
|
||||
|
||||
if rmat is None:
|
||||
self.get_logger().warning("Corssboard point cloud have too many noise")
|
||||
return None, None
|
||||
|
||||
x, y, z, rw, rx, ry, rz = rmat2quat(rmat)
|
||||
|
||||
self.get_logger().info(f"{x}, {y}, {z}, {rw}, {rx}, {ry}, {rz}")
|
||||
|
||||
170
vision_detect/vision_detect/VisionDetect/node/init_node.py
Normal file
170
vision_detect/vision_detect/VisionDetect/node/init_node.py
Normal file
@@ -0,0 +1,170 @@
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
import torch
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
|
||||
import rclpy
|
||||
from rclpy.task import Future
|
||||
from message_filters import Subscriber, ApproximateTimeSynchronizer
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
|
||||
from interfaces.msg import PoseArrayClassAndID, ImgMsg
|
||||
from interfaces.srv import VisionObjectRecognition
|
||||
|
||||
from ..utils import calculate_pose_pca, calculate_pose_icp
|
||||
from .config_node import ConfigBase
|
||||
|
||||
share_dir = get_package_share_directory('vision_detect')
|
||||
|
||||
class InitBase(ConfigBase):
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
self.future = Future()
|
||||
|
||||
if self.get_camera_mode == "Service":
|
||||
self._init_service()
|
||||
elif self.get_camera_mode == "Topic":
|
||||
if self.position == 'left':
|
||||
self.hand_eye_mat = self.eye_in_left_hand_mat
|
||||
elif self.position == 'right':
|
||||
self.hand_eye_mat = self.eye_in_right_hand_mat
|
||||
else:
|
||||
self.hand_eye_mat = self.eye_to_hand_mat
|
||||
else:
|
||||
self._init_service()
|
||||
|
||||
if self.detect_mode == 'Detect':
|
||||
self.function = self._seg_image
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self._init_model()
|
||||
elif self.detect_mode == 'Color':
|
||||
self.function = self._seg_color
|
||||
elif self.detect_mode == 'Crossboard':
|
||||
self.function = self._seg_crossboard
|
||||
else:
|
||||
self.function = self._seg_image
|
||||
if not self.classes:
|
||||
self.classes = None
|
||||
self._init_model()
|
||||
|
||||
if self.calculate_mode == "PCA":
|
||||
self.calculate_function = calculate_pose_pca
|
||||
elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
|
||||
self.calculate_function = calculate_pose_icp
|
||||
else:
|
||||
self.calculate_function = calculate_pose_pca
|
||||
|
||||
self._init_publisher()
|
||||
self._init_subscriber()
|
||||
|
||||
self.get_logger().info("Initialize done")
|
||||
|
||||
def _init_model(self):
|
||||
"""init model"""
|
||||
if self.checkpoint_path.endswith('-seg.pt'):
|
||||
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
try:
|
||||
self.model = YOLO(self.checkpoint_path).to(device_model)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to load YOLO model: {e}')
|
||||
raise
|
||||
|
||||
elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
|
||||
self.model = YOLO(self.checkpoint_path)
|
||||
|
||||
else:
|
||||
self.function = None
|
||||
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
|
||||
raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
|
||||
|
||||
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
|
||||
|
||||
self.get_logger().info("Initialize model done")
|
||||
|
||||
def _init_publisher(self):
|
||||
"""init publisher"""
|
||||
if self.output_boxes or self.output_masks:
|
||||
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
|
||||
|
||||
if self.get_camera_mode == "Topic":
|
||||
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose',
|
||||
10)
|
||||
|
||||
self.get_logger().info("Initialize publisher done")
|
||||
|
||||
def _init_service(self):
|
||||
"""init service server"""
|
||||
self.server = self.create_service(
|
||||
VisionObjectRecognition,
|
||||
self.service_name,
|
||||
self._service_callback
|
||||
)
|
||||
self.get_logger().info("Initialize service done")
|
||||
|
||||
def _init_subscriber(self):
|
||||
"""init subscriber"""
|
||||
if self.get_camera_mode == "Service":
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._service_sub_callback,
|
||||
10
|
||||
)
|
||||
elif self.get_camera_mode == "Topic":
|
||||
self.sub_camera_info = self.create_subscription(
|
||||
CameraInfo,
|
||||
self.camera_info_topic_name,
|
||||
self._camera_info_callback,
|
||||
10
|
||||
)
|
||||
|
||||
self.get_logger().info("Waiting for camera info...")
|
||||
rclpy.spin_until_future_complete(self, self.future)
|
||||
self.get_logger().info("Camera info received.")
|
||||
|
||||
# sync get color and depth img
|
||||
self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
|
||||
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
|
||||
|
||||
self.sync_subscriber = ApproximateTimeSynchronizer(
|
||||
[self.sub_color_image, self.sub_depth_image],
|
||||
queue_size=10,
|
||||
slop=0.1
|
||||
)
|
||||
self.sync_subscriber.registerCallback(self._sync_sub_callback)
|
||||
else:
|
||||
self.get_logger().warning("get_camera_mode is wrong, use service")
|
||||
self.sub_img = self.create_subscription(
|
||||
ImgMsg,
|
||||
"/img_msg",
|
||||
self._service_sub_callback,
|
||||
10
|
||||
)
|
||||
|
||||
self.get_logger().info("Initialize subscriber done")
|
||||
|
||||
def _camera_info_callback(self, msg: CameraInfo):
|
||||
pass
|
||||
|
||||
def _service_sub_callback(self, msgs):
|
||||
pass
|
||||
|
||||
def _sync_sub_callback(self, color_img_ros, depth_img_ros):
|
||||
pass
|
||||
|
||||
def _service_callback(self, request, response):
|
||||
pass
|
||||
|
||||
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
|
||||
pass
|
||||
|
||||
def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
|
||||
pass
|
||||
|
||||
def _seg_crossboard(self, rgb_img, depth_img):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user