重构检测节点文件,消息接口文件,更改坐标系设置为z为图像深度,图像成像于xy平面,添加姿态中yaw计算,九点采样更改为固定九点采样,检测模型更改为yolov11-seg模型

This commit is contained in:
liangyuxuan
2025-08-12 11:02:33 +08:00
parent 2524b7db9a
commit 6a66efa528
40 changed files with 307 additions and 130 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/softwaresystem.iml" filepath="$PROJECT_DIR$/.idea/softwaresystem.iml" />
</modules>
</component>
</project>

15
.idea/softwaresystem.iml generated Normal file
View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.10" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="GOOGLE" />
<option name="myDocStringFormat" value="Google" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@@ -47,7 +47,9 @@
ros2 launch detect_part camera_detect.launch.py
> camera_detect.launch.py 中可以修改部分参数:set_confidence, output_detect_image, checkpoint_name
> camera_detect.launch.py 中可以修改部分参数:set_confidence, output_boxes, output_masks, checkpoint_name
> 坐标系设置为Z为图像深度,XY平面为图像成像平面
2. 接收位置话题节点:
@@ -55,20 +57,19 @@
3. 位置话题接口:
interface/msg/PointWithID.msg, PointWithIDArray.msg
interface/msg/PoseWithID.msg, PoseWithIDArray.msg
4. 话题名称:
/pose/cv_detect_pose # 位置
/image/detect_image # 目标识别图片
5. PointWithID.msg
5. PoseWithID.msg
string class_name
int32 class_id
geometry_msgs/Point[] points
geometry_msgs/Pose[] pose_list
6. PointWithIDArray.msg
PointWithID[] objects
6. PoseWithIDArray.msg
PoseWithID[] objects

View File

@@ -1,5 +1,5 @@
import os
import random
import math
from collections import defaultdict
import cv2
@@ -14,61 +14,195 @@ from rclpy.node import Node
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import Point
from interfaces.msg import PointWithID, PointWithIDArray
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseWithID, PoseWithIDArray
import detect_part
def iqr(depths, threshold: float = 1.5):
if len(depths) < 7:
return depths
q1 = np.percentile(depths, 25)
q3 = np.percentile(depths, 75)
iqr_ = q3 - q1
lower_bound = q1 - iqr_ * threshold
upper_bound = q3 + iqr_ * threshold
iqr_depths = depths[(depths >= lower_bound) & (depths <= upper_bound)]
return iqr_depths
def get_nine_patch_samples(depth_img, u, v, width, height):
ws = [int(round(-width/4)), 0, int(round(width/4))]
hs = [int(round(-height/4)), 0, int(round(height/4))]
window_size = 25
half = window_size // 2
patch = []
for w in ws:
for h in hs:
patch.append(
depth_img[int(max(0, v + h - half)):int(min(v + h + half + 1, depth_img.shape[0])):2,
int(max(0, u + w - half)):int(min(u + w + half + 1, depth_img.shape[1])):2].flatten())
return np.concatenate(patch)
def calculate_coordinate(self, depth_img: np.ndarray, camera_info, u, v, width: int, height: int) -> tuple[float, float, float]:
u = int(round(u))
v = int(round(v))
_cx, _cy, _fx, _fy = camera_info
if not (0 <= v < depth_img.shape[0] and 0 <= u < depth_img.shape[1]):
self.get_logger().warning(f'Calculate coordinate error: u={u}, v={v}')
return 0.0, 0.0, 0.0
window_size = 39
half = window_size // 2
patch = depth_img[max(0, v - half):v + half + 1, max(0, u - half):u + half + 1:2].flatten()
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
patch = get_nine_patch_samples(depth_img, u, v, width, height)
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
self.get_logger().warning(f'No valid depth in window at ({u}, {v})')
return 0.0, 0.0, 0.0
valid_depths = iqr(valid_depths)
depth = np.median(valid_depths) / 1e3
z = depth
x = -(u - _cx) * z / _fx
y = -(v - _cy) * z / _fy
return x, y, z
def calculate_rpy(mask, rgb_img):
rect = cv2.minAreaRect(max(cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0], key=cv2.contourArea))
box = cv2.boxPoints(rect)
len1 = np.linalg.norm(box[1] - box[0])
len2 = np.linalg.norm(box[1] - box[2])
if len1 >= len2:
vec = box[1] - box[0]
else:
vec = box[1] - box[2]
if vec[0] < 0:
vec = -vec
angle = math.degrees(math.atan2(vec[1], vec[0]))
if angle <= -90.0:
angle += 180
y = angle/180.0 * math.pi
p = 0.0
r = 0.0
return r, p, y
def draw_box(set_confidence, rgb_img, boxes, confidences, class_ids, labels):
for i, box in enumerate(boxes):
if confidences[i] < set_confidence:
continue
else:
x_center, y_center, width, height = box[:4]
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
def draw_mask(set_confidence, rgb_img, masks, confidences, orig_shape):
for i, mask in enumerate(masks):
if confidences[i] < set_confidence:
continue
else:
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_LINEAR)
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
class DetectNode(Node):
def __init__(self, name):
super().__init__(name)
self.depth_image = None
self.fx = self.fy = self.cx = self.cy = None
self.device = None
self.checkpoint_path = None
self.checkpoint_name = None
self.function = None
self.output_boxes = None
self.output_masks = None
self.camera_info = []
self.camera_size = []
'''Init'''
self._init_param()
self._init_model()
self._init_publisher()
self._init_subscriber()
self.cv_bridge = CvBridge()
self.pub_pose_list = self.create_publisher(PointWithIDArray, '/pose/cv_detect_pose', 10)
if self.output_detect_image:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
def _init_param(self):
pkg_dir = os.path.dirname(detect_part.__file__)
self.declare_parameter('checkpoint_name', 'yolo11s.pt')
checkpoint_name = self.get_parameter('checkpoint_name').value
self.checkpoint_path = os.path.join(pkg_dir, 'checkpoint', checkpoint_name)
self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
self.checkpoint_name = self.get_parameter('checkpoint_name').value
self.checkpoint_path = os.path.join(pkg_dir, 'checkpoint', self.checkpoint_name)
self.declare_parameter('output_detect_image', False)
self.output_detect_image = self.get_parameter('output_detect_image').value
self.declare_parameter('output_boxes', True)
self.output_boxes = self.get_parameter('output_boxes').value
self.declare_parameter('set_confidence', 0.6)
self.declare_parameter('output_masks', False)
self.output_masks = self.get_parameter('output_masks').value
self.declare_parameter('set_confidence', 0.5)
self.set_confidence = self.get_parameter('set_confidence').value
def _init_model(self):
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(device)
self.model = YOLO(self.checkpoint_path).to(self.device)
except Exception as e:
self.get_logger().error(f'Failed to load YOLO model: {e}')
raise
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
if self.checkpoint_name.endswith('-seg.pt'):
self.function = self._seg_image
elif self.checkpoint_name.endswith('.pt'):
self.function = self._detect_image
else:
self.function = None
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
def _init_publisher(self):
self.pub_pose_list = self.create_publisher(PoseWithIDArray, '/pose/cv_detect_pose', 10)
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
def _init_subscriber(self):
self.sub_camera_info = self.create_subscription(
CameraInfo,
'/camera/color/camera_info',
'/camera/color/camera_info',
self._camera_info_callback,
10
)
# sync get color and depth img
'''Sync get color and depth img'''
self.sub_color_image = Subscriber(self, Image, '/camera/color/image_raw')
self.sub_depth_image = Subscriber(self, Image, '/camera/depth/image_raw')
@@ -80,47 +214,51 @@ class DetectNode(Node):
self.sync_subscriber.registerCallback(self._sync_callback)
def _sync_callback(self, color_img_ros, depth_img_ros):
if None in (self.fx, self.fy, self.cx, self.cy):
if not self.camera_info:
self.get_logger().warn('Camera intrinsics not yet received. Waiting...')
return
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
# self.get_logger().info(f'Color image: {color_img_cv.shape}')
# self.get_logger().info(f'Depth image: {depth_img_cv.shape}')
if self.output_detect_image:
detect_img, pose_dict = self._detect_image(color_img_cv, depth_img_cv)
self.pub_detect_image.publish(detect_img)
if self.output_boxes:
img, pose_dict = self.function(color_img_cv, depth_img_cv)
self.pub_detect_image.publish(img)
else:
pose_dict = self._detect_image(color_img_cv)
pose_dict = self.function(color_img_cv, depth_img_cv)
pose_list_all = PointWithIDArray()
for (class_id, class_name), points in pose_dict.items():
pose_list_all = PoseWithIDArray()
for (class_id, class_name), pose_list in pose_dict.items():
pose_list_all.objects.append(
PointWithID(
PoseWithID(
class_name = class_name,
class_id = class_id,
points = points
pose_list = pose_list
)
)
self.pub_pose_list.publish(pose_list_all)
'''Get camera info'''
def _camera_info_callback(self, msg: CameraInfo):
self.fx = msg.k[0]
self.fy = msg.k[4]
self.cx = msg.k[2]
self.cy = msg.k[5]
self.camera_size.append(msg.width)
self.camera_size.append(msg.height)
self.camera_info.append(msg.k[2])
self.camera_info.append(msg.k[5])
self.camera_info.append(msg.k[0])
self.camera_info.append(msg.k[4])
self.destroy_subscription(self.sub_camera_info)
'''Use detection model'''
def _detect_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
pose_dict = defaultdict(list)
img_h, img_w = rgb_img.shape[:2]
border_x = img_w * 0.12
border_y = img_h * 0.10
border_x = img_w * 0.10
border_y = img_h * 0.08
results = self.model(rgb_img)
result = results[0]
@@ -140,97 +278,82 @@ class DetectNode(Node):
# self.get_logger().info(f'Skipping object near edge at ({x_center},{y_center})')
continue
x, y, z = self._calculate_coordinate(depth_img, x_center, y_center, width, height)
x, y, z = calculate_coordinate(self, depth_img, self.camera_info, x_center, y_center, width, height)
if (x, y, z) != (0.0, 0.0, 0.0):
pose = Pose()
pose.position = Point(x=x, y=y, z=z)
pose.orientation = Quaternion(x=0, y=0, z=0)
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(Point(x=x, y=y, z=z))
if self.output_detect_image:
self._draw_box(rgb_img, boxes, confidences, class_ids, labels)
if self.output_boxes:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
else:
return pose_dict
def _draw_box(self, rgb_img, boxes, confidences, class_ids, labels):
for i, box in enumerate(boxes):
'''Use segmentation model'''
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
pose_dict = defaultdict(list)
'''Define border'''
border_x, border_y = rgb_img.shape[:2][0] * 0.10, rgb_img.shape[:2][1] * 0.08
'''Get Predict Results'''
results = self.model(rgb_img)
result = results[0]
'''Get masks'''
masks = result.masks.data.cpu().numpy()
orig_shape = result.masks.orig_shape
'''Get boxes'''
boxes = result.boxes.xywh.cpu().numpy()
confidences = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
for i, (mask, box) in enumerate(zip(masks, boxes)):
'''Confidence Filter'''
if confidences[i] < self.set_confidence:
continue
else:
x_center, y_center, width, height = box[:4]
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
'''Border Filter'''
if not (border_x <= x_center <= rgb_img.shape[:2][1] - border_x
and border_y <= y_center <= rgb_img.shape[:2][0] - border_y):
# self.get_logger().info(f'Skipping object near edge at ({x_center},{y_center})')
continue
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
# cv2.putText(rgb_img, f'cs:{x:.2f} {y:.2f} {z:.2f}', (p1[0], p1[1] - 35),
# cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
mask = cv2.resize(mask, orig_shape[::-1], cv2.INTER_LINEAR)
rgb_masked = rgb_img.copy()
rgb_masked[mask == 0] = 0
def _calculate_coordinate(self, depth_img: np.ndarray, u, v, width: int, height: int) -> tuple[float, float, float]:
u = int(round(u))
v = int(round(v))
roll, pitch, yaw = calculate_rpy(mask, rgb_img)
if not (0 <= v < depth_img.shape[0] and 0 <= u < depth_img.shape[1]):
self.get_logger().warning(f'Calculate coordinate error: u={u}, v={v}')
return 0.0, 0.0, 0.0
window_size = 39
half = window_size // 2
patch = depth_img[max(0, v - half):v + half + 1, max(0, u - half):u + half + 1:2].flatten()
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
patch = self._get_nine_patch_samples(depth_img, u, v, width, height)
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
self.get_logger().warning(f'No valid depth in window at ({u}, {v})')
return 0.0, 0.0, 0.0
valid_depths = self.iqr(valid_depths)
# x = depth_img[v, u] / 1e3
depth = np.median(valid_depths) / 1e3
x = depth
y = -(u - self.cx) * x / self.fx
z = -(v - self.cy) * x / self.fy
return x, y, z
def _get_nine_patch_samples(self, depth_img, u, v, width, height):
ws = [int(round(random.uniform(-0.05*width, 0.05*width)-width/4)), 0, int(round(random.uniform(-0.05*width, 0.05*width)+width/4))]
hs = [int(round(random.uniform(-0.05*height, 0.05*height)-height/4)), 0, int(round(random.uniform(-0.05*height, 0.05*height)+height/4))]
window_size = 25
half = window_size // 2
patch = []
for w in ws:
for h in hs:
patch.append(
depth_img[int(max(0, v + h - half)):int(min(v + h + half + 1, depth_img.shape[0])):2,
int(max(0, u + w - half)):int(min(u + w + half + 1, depth_img.shape[1])):2].flatten())
return np.concatenate(patch)
def iqr(self, depths, threshold: float = 1.5):
if len(depths) < 7:
return depths
q1 = np.percentile(depths, 25)
q3 = np.percentile(depths, 75)
iqr = q3 - q1
lower_bound = q1 - iqr * threshold
upper_bound = q3 + iqr * threshold
iqr_depths = depths[(depths >= lower_bound) & (depths <= upper_bound)]
return iqr_depths
x, y, z = calculate_coordinate(self, depth_img, self.camera_info, x_center, y_center, width, height)
if (x, y, z) != (0.0, 0.0, 0.0):
pose = Pose()
pose.position = Point(x=x, y=y, z=z)
pose.orientation = Quaternion(x=roll, y=pitch, z=yaw)
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
'''mask_img and box_img is or not output'''
if self.output_boxes and self.output_masks:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
draw_mask(self.set_confidence, rgb_img, masks, confidences, orig_shape)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
elif self.output_boxes and not self.output_masks:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
elif not self.output_boxes and self.output_masks:
draw_mask(self.set_confidence, rgb_img, masks, confidences, orig_shape)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
else:
return pose_dict
def main(args=None):
@@ -247,3 +370,4 @@ def main(args=None):
if __name__ == '__main__':
main()

View File

@@ -1,20 +1,20 @@
import rclpy
from rclpy.node import Node
from interfaces.msg import PointWithIDArray
from interfaces.msg import PoseWithIDArray
class SubPose(Node):
def __init__(self, name):
super().__init__(name)
self.sub = self.create_subscription(PointWithIDArray, '/pose/cv_detect_pose', self.pose_callback, 10)
self.sub = self.create_subscription(PoseWithIDArray, '/pose/cv_detect_pose', self.pose_callback, 10)
def pose_callback(self, msg):
pose_dict = {}
for object in msg.objects:
pose_dict[object.class_id] = {
'name' : object.class_name,
'points' : object.points,
'pose_list' : object.pose_list,
}
self.get_logger().info(f'{pose_dict}')

View File

@@ -1456,7 +1456,7 @@ def torch_safe_load(weight, safe_only=False):
f"with https://github.com/ultralytics/yolov5.\nThis model is NOT forwards compatible with "
f"YOLOv8 at https://github.com/ultralytics/ultralytics."
f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
f"run a command with an official Ultralytics model, i.e. 'yolo predict model=yolo11n.pt'"
f"run a command with an official Ultralytics model, i.e. 'yolo predict model=yolo11n.det.pt'"
)
) from e
elif e.name == "numpy._core":
@@ -1469,7 +1469,7 @@ def torch_safe_load(weight, safe_only=False):
f"{weight} appears to require '{e.name}', which is not in Ultralytics requirements."
f"\nAutoInstall will run now for '{e.name}' but this feature will be removed in the future."
f"\nRecommend fixes are to train a new model using the latest 'ultralytics' package or to "
f"run a command with an official Ultralytics model, i.e. 'yolo predict model=yolo11n.pt'"
f"run a command with an official Ultralytics model, i.e. 'yolo predict model=yolo11n.det.pt'"
)
check_requirements(e.name) # install missing module
ckpt = torch_load(file, map_location="cpu")

View File

@@ -51,8 +51,9 @@ def load_parameters(context, args):
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('checkpoint_name', default_value='yolo11s.pt'),
DeclareLaunchArgument('output_detect_image', default_value='True'),
DeclareLaunchArgument('checkpoint_name', default_value='yolo11s-seg.pt'),
DeclareLaunchArgument('output_boxes', default_value='True'),
DeclareLaunchArgument('output_masks', default_value='False'),
DeclareLaunchArgument('set_confidence', default_value='0.6'),
]
@@ -311,7 +312,8 @@ def generate_launch_description():
def create_detect_node(context):
checkpoint = LaunchConfiguration('checkpoint_name').perform(context)
output = LaunchConfiguration('output_detect_image').perform(context)
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
conf = LaunchConfiguration('set_confidence').perform(context)
return [
@@ -320,7 +322,8 @@ def generate_launch_description():
executable='detect_node',
parameters=[{
'checkpoint_name': checkpoint,
'output_detect_image': output.lower() == 'true',
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'set_confidence': float(conf)
}]
)

View File

@@ -14,8 +14,8 @@ find_package(geometry_msgs REQUIRED)
# find_package(<dependency> REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/PointWithID.msg"
"msg/PointWithIDArray.msg"
"msg/PoseWithID.msg"
"msg/PoseWithIDArray.msg"
DEPENDENCIES geometry_msgs
)

View File

@@ -1 +0,0 @@
PointWithID[] objects

View File

@@ -1,4 +1,4 @@
string class_name
int32 class_id
geometry_msgs/Point[] points
geometry_msgs/Pose[] pose_list

View File

@@ -0,0 +1 @@
PoseWithID[] objects