结构变更

This commit is contained in:
liangyuxuan
2025-12-05 14:28:10 +08:00
parent 927f3c728c
commit efcd598565
3 changed files with 329 additions and 248 deletions

View File

@@ -0,0 +1,154 @@
import os
import json
from ament_index_python.packages import get_package_share_directory
from rclpy.node import Node
import numpy as np
import open3d as o3d
from ..utils import read_calibration_mat
share_dir = get_package_share_directory('vision_detect')
__all__ = [
"ConfigBase"
]
class ConfigBase(Node):
def __init__(self, name):
super().__init__(name)
"""init parameter"""
self.confidence = None
self.depth_image_topic_name = None
self.color_image_topic_name = None
self.camera_info_topic_name = None
self.service_name = None
self.pattern_size = None
self.device = None
self.configs = None
self.calculate_configs = None
self.checkpoint_path = None
self.output_boxes = None
self.output_masks = None
self.function = None
self.source = None
self.server = None
self.model = None
self.k = self.d = None
self.eye_in_left_hand_mat = None
self.eye_in_right_hand_mat = None
self.eye_to_hand_mat = None
self.hand_eye_mat = None
self.get_camera_mode = None
self.detect_mode = None
self.calculate_mode = None
self.camera_size = None
self.position = None
self.map1 = self.map2 = None
self.calculate_function = None
self.fx = self.fy = 0.5
self.camera_data = {}
self.distance = 1500
self.color_range = [
[[0, 120, 70], [10, 255, 255]],
[[170, 120, 70], [180, 255, 255]]
]
self._get_param()
def _get_param(self):
"""init parameter"""
self.declare_parameter(
'configs_path',
os.path.join(share_dir, "configs/launch_configs/default_config.json")
)
configs_path = self.get_parameter('configs_path').value
with open(configs_path, 'r') as f:
self.configs = json.load(f)
self.output_boxes = self.configs['output_boxes'].lower() == "true"
self.output_masks = self.configs['output_masks'].lower() == "true"
self.get_camera_mode = self.configs['get_camera_mode']
self.detect_mode = self.configs['detect_mode']
self.calculate_mode = self.configs['calculate_mode']
self._read_calibration_mat()
if self.get_camera_mode == "Service":
self.service_name = self.configs["Service_configs"]["service_name"]
elif self.get_camera_mode == "Topic":
topic_configs = self.configs['Topic_configs']
self.color_image_topic_name = topic_configs["color_image_topic_name"]
self.depth_image_topic_name = topic_configs["depth_image_topic_name"]
self.camera_info_topic_name = topic_configs["camera_info_topic_name"]
self.position = topic_configs["position"]
else:
self.service_name = self.configs["Service"]["service_name"]
if self.detect_mode == 'Detect':
detect_configs = self.configs['Detect_configs']
self.confidence = detect_configs["confidence"]
self.classes = detect_configs["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = os.path.join(share_dir, detect_configs["checkpoint_path"])
elif self.detect_mode == 'Color':
self.color_range = self.configs["Color_configs"]["color_range"]
self.distance = self.configs["Color_configs"]["distance"]
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in
self.color_range]
elif self.detect_mode == 'Crossboard':
self.pattern_size = self.configs["Crossboard_configs"]["pattern_size"]
else:
self.get_logger().warning("Unknown mode, use detect")
detect_configs = self.configs['Detect_configs']
self.confidence = detect_configs["confidence"]
self.classes = detect_configs["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = detect_configs["checkpoint_path"]
if self.calculate_mode == "PCA":
self.calculate_configs = self.configs['PCA_configs']
elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
self.calculate_configs = self.configs['ICA_configs']
source = o3d.io.read_point_cloud(
os.path.join(share_dir, self.calculate_configs['complete_model_path'])
)
self.calculate_configs["source"] = source
else:
self.get_logger().warning("Unknown calculate_mode, use PCA")
self.calculate_configs = self.configs['PCA_configs']
self.get_logger().info("Get parameters done")
def _read_calibration_mat(self):
eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
if not sign:
self.get_logger().warning(f"left_mat: {info}")
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
if not sign:
self.get_logger().warning(f"right_mat: {info}")
self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
if not sign:
self.get_logger().warning(f"head_mat: {info}")
self.get_logger().info("Read calibration mat file done")

View File

@@ -1,272 +1,29 @@
"""Vision Detect Node"""
import os
import time
import threading
import json
from ament_index_python.packages import get_package_share_directory
import time
import cv2
import torch
import numpy as np
import open3d as o3d
from ultralytics import YOLO
import rclpy
from rclpy.node import Node
from rclpy.task import Future
from cv_bridge import CvBridge
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from sensor_msgs.msg import CameraInfo
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
from ..utils import *
from .init_node import InitBase
share_dir = get_package_share_directory('vision_detect')
class DetectNode(Node):
class DetectNode(InitBase):
"""Detect Node"""
def __init__(self, name):
super().__init__(name)
self.device = None
self.configs = None
self.calculate_configs = None
self.checkpoint_path = None
self.output_boxes = None
self.output_masks = None
self.function = None
self.source = None
self.server = None
self.model = None
self.k = self.d = None
self.eye_in_left_hand_mat = None
self.eye_in_right_hand_mat = None
self.eye_to_hand_mat = None
self.hand_eye_mat = None
self.eye_in_left_hand_path = None
self.eye_in_right_hand_path = None
self.eye_to_hand_path = None
self.get_camera_mode = None
self.detect_mode = None
self.calculate_mode = None
self.camera_size = None
self.map1 = self.map2 = None
self.calculate_function = calculate_pose_pca
self.fx = self.fy = 0.5
self.camera_data = {}
self.future = Future()
self.cv_bridge = CvBridge()
self.lock = threading.Lock()
self.distance = 1500
self.color_range = [
[[0, 120, 70], [10, 255, 255]],
[[170, 120, 70], [180, 255, 255]]
]
'''init'''
self._init_param()
self._init_publisher()
self._init_subscriber()
self.get_logger().info("Initialize done")
def _init_param(self):
"""init parameter"""
self.declare_parameter(
'configs_path',
os.path.join(share_dir, "configs/launch_configs/default_config.json")
)
configs_path = self.get_parameter('configs_path').value
with open(configs_path, 'r') as f:
self.configs = json.load(f)
self.output_boxes = self.configs['output_boxes'].lower() == "true"
self.output_masks = self.configs['output_masks'].lower() == "true"
self.get_camera_mode = self.configs['get_camera_mode']
self.detect_mode = self.configs['detect_mode']
self.calculate_mode = self.configs['calculate_mode']
self._init_json_file()
if self.get_camera_mode == "Service":
self.service_name = self.configs["Service_configs"]["service_name"]
self._init_service()
elif self.get_camera_mode == "Topic":
topic_configs = self.configs['Topic_configs']
self.color_image_topic_name = topic_configs["color_image_topic_name"]
self.depth_image_topic_name = topic_configs["depth_image_topic_name"]
self.camera_info_topic_name = topic_configs["camera_info_topic_name"]
self.position = topic_configs["position"]
if self.position == 'left':
self.hand_eye_mat = self.eye_in_left_hand_mat
elif self.position == 'right':
self.hand_eye_mat = self.eye_in_right_hand_mat
else:
self.hand_eye_mat = self.eye_to_hand_mat
else:
self.service_name = self.configs["Service"]["service_name"]
if self.detect_mode == 'Detect':
self.function = self._seg_image
detect_configs = self.configs['Detect_configs']
self.confidence = detect_configs["confidence"]
self.classes = detect_configs["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = os.path.join(share_dir, detect_configs["checkpoint_path"])
self._init_model()
elif self.detect_mode == 'Color':
self.function = self._seg_color
self.color_range = self.configs["Color_configs"]["color_range"]
self.distance = self.configs["Color_configs"]["distance"]
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in self.color_range]
elif self.detect_mode == 'Crossboard':
self.function = self._seg_crossboard
self.pattern_size = self.configs["Crossboard_configs"]["pattern_size"]
else:
self.get_logger().warning("Unknown mode, use detect")
self.function = self._seg_image
detect_configs = self.configs['Detect_configs']
self.confidence = detect_configs["confidence"]
self.classes = detect_configs["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = detect_configs["checkpoint_path"]
self._init_model()
if self.calculate_mode == "PCA":
self.calculate_configs = self.configs['PCA_configs']
self.calculate_function = calculate_pose_pca
elif self.calculate_mode == "ICP":
self.calculate_configs = self.configs['ICA_configs']
source = o3d.io.read_point_cloud(
os.path.join(share_dir, self.calculate_configs['complete_model_path'])
)
self.calculate_configs["source"] = source
self.calculate_function = calculate_pose_icp
else:
self.get_logger().warning("Unknown calculate_mode, use PCA")
self.calculate_configs = self.configs['PCA_configs']
self.calculate_function = calculate_pose_pca
self.get_logger().info("Initialize parameters done")
def _init_json_file(self):
self.eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
self.eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
self.eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(self.eye_in_left_hand_path)
if not sign:
self.get_logger().warning(f"left_mat: {info}")
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(self.eye_in_right_hand_path)
if not sign:
self.get_logger().warning(f"right_mat: {info}")
self.eye_to_hand_mat, info, sign = read_calibration_mat(self.eye_to_hand_path)
if not sign:
self.get_logger().warning(f"head_mat: {info}")
self.get_logger().info("Initialize read json file done")
def _init_model(self):
"""init model"""
if self.checkpoint_path.endswith('-seg.pt'):
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(device_model)
except Exception as e:
self.get_logger().error(f'Failed to load YOLO model: {e}')
raise
self.function = self._seg_image
elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
self.model = YOLO(self.checkpoint_path)
self.function = self._seg_image
else:
self.function = None
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
self.get_logger().info("Initialize model done")
def _init_publisher(self):
"""init publisher"""
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
if self.get_camera_mode == "Topic":
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
self.get_logger().info("Initialize publisher done")
def _init_service(self):
"""init service server"""
self.server = self.create_service(
VisionObjectRecognition,
self.service_name,
self._service_callback
)
self.get_logger().info("Initialize service done")
def _init_subscriber(self):
"""init subscriber"""
if self.get_camera_mode == "Service":
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
elif self.get_camera_mode == "Topic":
self.sub_camera_info = self.create_subscription(
CameraInfo,
self.camera_info_topic_name,
self._camera_info_callback,
10
)
self.get_logger().info("Waiting for camera info...")
rclpy.spin_until_future_complete(self, self.future)
self.get_logger().info("Camera info received.")
# sync get color and depth img
self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
self.sync_subscriber = ApproximateTimeSynchronizer(
[self.sub_color_image, self.sub_depth_image],
queue_size=10,
slop=0.1
)
self.sync_subscriber.registerCallback(self._sync_sub_callback)
else:
self.get_logger().warning("get_camera_mode is wrong, use service")
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
self.get_logger().info("Initialize subscriber done")
def _camera_info_callback(self, msg: CameraInfo):
"""Get camera info"""

View File

@@ -0,0 +1,170 @@
from ament_index_python.packages import get_package_share_directory
import torch
import numpy as np
from ultralytics import YOLO
import rclpy
from rclpy.task import Future
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from interfaces.msg import PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from ..utils import calculate_pose_pca, calculate_pose_icp
from .config_node import ConfigBase
share_dir = get_package_share_directory('vision_detect')
class InitBase(ConfigBase):
def __init__(self, name):
super().__init__(name)
self.future = Future()
if self.get_camera_mode == "Service":
self._init_service()
elif self.get_camera_mode == "Topic":
if self.position == 'left':
self.hand_eye_mat = self.eye_in_left_hand_mat
elif self.position == 'right':
self.hand_eye_mat = self.eye_in_right_hand_mat
else:
self.hand_eye_mat = self.eye_to_hand_mat
else:
self._init_service()
if self.detect_mode == 'Detect':
self.function = self._seg_image
if not self.classes:
self.classes = None
self._init_model()
elif self.detect_mode == 'Color':
self.function = self._seg_color
elif self.detect_mode == 'Crossboard':
self.function = self._seg_crossboard
else:
self.function = self._seg_image
if not self.classes:
self.classes = None
self._init_model()
if self.calculate_mode == "PCA":
self.calculate_function = calculate_pose_pca
elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
self.calculate_function = calculate_pose_icp
else:
self.calculate_function = calculate_pose_pca
self._init_publisher()
self._init_subscriber()
self.get_logger().info("Initialize done")
def _init_model(self):
"""init model"""
if self.checkpoint_path.endswith('-seg.pt'):
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(device_model)
except Exception as e:
self.get_logger().error(f'Failed to load YOLO model: {e}')
raise
elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
self.model = YOLO(self.checkpoint_path)
else:
self.function = None
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
self.get_logger().info("Initialize model done")
def _init_publisher(self):
"""init publisher"""
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
if self.get_camera_mode == "Topic":
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose',
10)
self.get_logger().info("Initialize publisher done")
def _init_service(self):
"""init service server"""
self.server = self.create_service(
VisionObjectRecognition,
self.service_name,
self._service_callback
)
self.get_logger().info("Initialize service done")
def _init_subscriber(self):
"""init subscriber"""
if self.get_camera_mode == "Service":
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
elif self.get_camera_mode == "Topic":
self.sub_camera_info = self.create_subscription(
CameraInfo,
self.camera_info_topic_name,
self._camera_info_callback,
10
)
self.get_logger().info("Waiting for camera info...")
rclpy.spin_until_future_complete(self, self.future)
self.get_logger().info("Camera info received.")
# sync get color and depth img
self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
self.sync_subscriber = ApproximateTimeSynchronizer(
[self.sub_color_image, self.sub_depth_image],
queue_size=10,
slop=0.1
)
self.sync_subscriber.registerCallback(self._sync_sub_callback)
else:
self.get_logger().warning("get_camera_mode is wrong, use service")
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
self.get_logger().info("Initialize subscriber done")
def _camera_info_callback(self, msg: CameraInfo):
pass
def _service_sub_callback(self, msgs):
pass
def _sync_sub_callback(self, color_img_ros, depth_img_ros):
pass
def _service_callback(self, request, response):
pass
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
pass
def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
pass
def _seg_crossboard(self, rgb_img, depth_img):
pass