删除多余的checkpoint,添加手眼标定节点文件,更改坐标计算方式

This commit is contained in:
liangyuxuan
2025-08-15 15:59:00 +08:00
parent f4b83db868
commit e7ce19ab8a
6 changed files with 326 additions and 118 deletions

View File

@@ -21,6 +21,7 @@ import detect_part
def iqr(depths, threshold: float = 1.5):
"""iqr去除异常点"""
if len(depths) < 7:
return depths
q1 = np.percentile(depths, 25)
@@ -32,64 +33,63 @@ def iqr(depths, threshold: float = 1.5):
return iqr_depths
def get_nine_patch_samples(depth_img, u, v, width, height):
    """Gather depth samples from nine 25x25 windows around (u, v).

    The window centers form a 3x3 grid offset by +/- width/4 horizontally
    and +/- height/4 vertically, so samples cover the object interior
    rather than a single (possibly invalid) patch. Each window is clipped
    to the image and subsampled with stride 2 in both axes.

    Returns a 1-D array of the collected depth values.
    """
    half = 25 // 2
    col_offsets = (int(round(-width / 4)), 0, int(round(width / 4)))
    row_offsets = (int(round(-height / 4)), 0, int(round(height / 4)))
    n_rows, n_cols = depth_img.shape[0], depth_img.shape[1]
    samples = []
    for dx in col_offsets:
        for dy in row_offsets:
            top = int(max(0, v + dy - half))
            bottom = int(min(v + dy + half + 1, n_rows))
            left = int(max(0, u + dx - half))
            right = int(min(u + dx + half + 1, n_cols))
            samples.append(depth_img[top:bottom:2, left:right:2].flatten())
    return np.concatenate(samples)
def calculate_coordinate(self, depth_img: np.ndarray, camera_info, u, v, width: int, height: int) -> tuple[float, float, float]:
def calculate_coordinate(mask, depth_img: np.ndarray, camera_info, u, v):
"""计算坐标"""
u = int(round(u))
v = int(round(v))
_cx, _cy, _fx, _fy = camera_info
if not (0 <= v < depth_img.shape[0] and 0 <= u < depth_img.shape[1]):
self.get_logger().warning(f'Calculate coordinate error: u={u}, v={v}')
print(f'Calculate coordinate error: u={u}, v={v}')
return 0.0, 0.0, 0.0
window_size = 39
window_size = 25
half = window_size // 2
"""获取mask质心patch"""
patch = depth_img[max(0, v - half):v + half + 1, max(0, u - half):u + half + 1:2].flatten()
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
patch = get_nine_patch_samples(depth_img, u, v, width, height)
valid_depths = patch[patch > 0]
if len(valid_depths) == 0:
self.get_logger().warning(f'No valid depth in window at ({u}, {v})')
return 0.0, 0.0, 0.0
print(f'No valid depth in window at ({u}, {v})')
return 0.0, 0.0, 0.0
"""过滤异常值取中位数计算xy"""
valid_depths = iqr(valid_depths)
depth = np.median(valid_depths) / 1e3
z = depth
x = -(u - _cx) * z / _fx
y = -(v - _cy) * z / _fy
x = -(u - _cx) * depth / _fx
y = -(v - _cy) * depth / _fy
"""取mask中深度的中位数作为z"""
depth_masked = depth_img[(mask > 0) & (depth_img > 0)]
median_depth = np.median(depth_masked)
z = median_depth / 1e3
return x, y, z
def calculate_rpy(mask, rgb_img):
def calculate_pose(mask, orig_shape, depth_img: np.ndarray, camera_info):
"""计算位态"""
rect = cv2.minAreaRect(max(cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[0], key=cv2.contourArea))
box = cv2.boxPoints(rect)
len1 = np.linalg.norm(box[1] - box[0])
len2 = np.linalg.norm(box[1] - box[2])
M = cv2.moments(mask.astype(np.uint8))
if M["m00"] != 0:
center_x = int(M["m10"] / M["m00"])
center_y = int(M["m01"] / M["m00"])
else:
raise ValueError("Mask Is Empty")
# center = (box[0]+box[1]+box[2]+box[3]) / 4
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_LINEAR)
x, y, z = calculate_coordinate(mask, depth_img, camera_info, center_x, center_y)
if len1 >= len2:
vec = box[1] - box[0]
else:
@@ -102,17 +102,26 @@ def calculate_rpy(mask, rgb_img):
if angle <= -90.0:
angle += 180
y = angle/180.0 * math.pi
p = 0.0
r = 0.0
return r, p, y
yaw = angle/180.0 * math.pi
pitch = 0.0
roll = 0.0
return x, y, z, roll, pitch, yaw
def draw_box(set_confidence, rgb_img, boxes, confidences, class_ids, labels):
def draw_box(set_confidence, rgb_img, result, post_list):
"""绘制目标检测边界框"""
boxes = result.boxes.xywh.cpu().numpy()
confidences = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
for i, box in enumerate(boxes):
j = 0
if confidences[i] < set_confidence:
continue
else:
x_center, y_center, width, height = box[:4]
p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
@@ -121,9 +130,19 @@ def draw_box(set_confidence, rgb_img, boxes, confidences, class_ids, labels):
cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}', (p1[0], p1[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
cv2.putText(rgb_img, f'cs: {post_list[j][0]:.3f} {post_list[j][1]:.3f} {post_list[j][2]:.3f}', (p1[0], p1[1] - 35)
,cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
cv2.putText(rgb_img, f'rpy: {post_list[j][3]:.3f} {post_list[j][4]:.3f} {post_list[j][5]:.3f}', (p1[0], p1[1] - 60)
,cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
j += 1
def draw_mask(set_confidence, rgb_img, masks, confidences, orig_shape):
def draw_mask(set_confidence, rgb_img, result):
"""绘制实例分割mask"""
masks = result.masks.data.cpu().numpy()
orig_shape = result.masks.orig_shape
confidences = result.boxes.conf.cpu().numpy()
for i, mask in enumerate(masks):
if confidences[i] < set_confidence:
continue
@@ -145,7 +164,7 @@ class DetectNode(Node):
self.camera_info = []
self.camera_size = []
'''Init'''
'''init'''
self._init_param()
self._init_model()
self._init_publisher()
@@ -153,25 +172,25 @@ class DetectNode(Node):
self.cv_bridge = CvBridge()
def _init_param(self):
"""init parameter"""
pkg_dir = os.path.dirname(detect_part.__file__)
self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
self.checkpoint_name = self.get_parameter('checkpoint_name').value
self.checkpoint_path = os.path.join(pkg_dir, 'checkpoint', self.checkpoint_name)
self.declare_parameter('output_boxes', True)
self.output_boxes = self.get_parameter('output_boxes').value
self.declare_parameter('output_masks', False)
self.declare_parameter('output_masks', True)
self.output_masks = self.get_parameter('output_masks').value
self.declare_parameter('set_confidence', 0.5)
self.set_confidence = self.get_parameter('set_confidence').value
def _init_model(self):
"""init model"""
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(self.device)
@@ -182,27 +201,27 @@ class DetectNode(Node):
if self.checkpoint_name.endswith('-seg.pt'):
self.function = self._seg_image
elif self.checkpoint_name.endswith('.pt'):
self.function = self._detect_image
else:
self.function = None
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
def _init_publisher(self):
"""init_publisher"""
self.pub_pose_list = self.create_publisher(PoseWithIDArray, '/pose/cv_detect_pose', 10)
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
def _init_subscriber(self):
"""init_subscriber"""
self.sub_camera_info = self.create_subscription(
CameraInfo,
'/camera # x = depth_img[v, u] / 1e3/color/camera_info',
'/camera/color/camera_info',
self._camera_info_callback,
10
)
'''Sync get color and depth img'''
'''sync get color and depth img'''
self.sub_color_image = Subscriber(self, Image, '/camera/color/image_raw')
self.sub_depth_image = Subscriber(self, Image, '/camera/depth/image_raw')
@@ -213,7 +232,20 @@ class DetectNode(Node):
)
self.sync_subscriber.registerCallback(self._sync_callback)
def _camera_info_callback(self, msg: CameraInfo):
"""Get camera info"""
self.camera_size.append(msg.width)
self.camera_size.append(msg.height)
self.camera_info.append(msg.k[2])
self.camera_info.append(msg.k[5])
self.camera_info.append(msg.k[0])
self.camera_info.append(msg.k[4])
self.destroy_subscription(self.sub_camera_info)
def _sync_callback(self, color_img_ros, depth_img_ros):
"""同步回调函数"""
if not self.camera_info:
self.get_logger().warn('Camera intrinsics not yet received. Waiting...')
return
@@ -221,11 +253,13 @@ class DetectNode(Node):
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
if self.output_boxes:
img, pose_dict = self.function(color_img_cv, depth_img_cv)
img, pose_dict = self.function(color_img_cv, depth_img_cv)
if img is not None:
self.pub_detect_image.publish(img)
else:
pose_dict = self.function(color_img_cv, depth_img_cv)
"""masks为空结束这一帧"""
if img is None and pose_dict is None:
return
pose_list_all = PoseWithIDArray()
for (class_id, class_name), pose_list in pose_dict.items():
@@ -239,64 +273,10 @@ class DetectNode(Node):
self.pub_pose_list.publish(pose_list_all)
'''Get camera info'''
def _camera_info_callback(self, msg: CameraInfo):
self.camera_size.append(msg.width)
self.camera_size.append(msg.height)
self.camera_info.append(msg.k[2])
self.camera_info.append(msg.k[5])
self.camera_info.append(msg.k[0])
self.camera_info.append(msg.k[4])
self.destroy_subscription(self.sub_camera_info)
'''Use detection model'''
def _detect_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
pose_dict = defaultdict(list)
img_h, img_w = rgb_img.shape[:2]
border_x = img_w * 0.10
border_y = img_h * 0.08
results = self.model(rgb_img)
result = results[0]
boxes = result.boxes.xywh.cpu().numpy()
confidences = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
for i, box in enumerate(boxes):
if confidences[i] < self.set_confidence:
continue
else:
x_center, y_center, width, height = box[:4]
if not (border_x <= x_center <= img_w - border_x and border_y <= y_center <= img_h - border_y):
# self.get_logger().info(f'Skipping object near edge at ({x_center},{y_center})')
continue
x, y, z = calculate_coordinate(self, depth_img, self.camera_info, x_center, y_center, width, height)
if (x, y, z) != (0.0, 0.0, 0.0):
pose = Pose()
pose.position = Point(x=x, y=y, z=z)
pose.orientation = Quaternion(x=0, y=0, z=0)
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(Point(x=x, y=y, z=z))
if self.output_boxes:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
else:
return pose_dict
'''Use segmentation model'''
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
"""Use segmentation model"""
pose_dict = defaultdict(list)
'''Define border'''
border_x, border_y = rgb_img.shape[:2][0] * 0.10, rgb_img.shape[:2][1] * 0.08
@@ -305,6 +285,8 @@ class DetectNode(Node):
result = results[0]
'''Get masks'''
if result.masks is None or len(result.masks) == 0:
return None, None
masks = result.masks.data.cpu().numpy()
orig_shape = result.masks.orig_shape
@@ -314,46 +296,42 @@ class DetectNode(Node):
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
post_list = []
for i, (mask, box) in enumerate(zip(masks, boxes)):
'''Confidence Filter'''
if confidences[i] < self.set_confidence:
continue
else:
x_center, y_center, width, height = box[:4]
x_center, y_center= box[:2]
'''Border Filter'''
if not (border_x <= x_center <= rgb_img.shape[:2][1] - border_x
and border_y <= y_center <= rgb_img.shape[:2][0] - border_y):
# self.get_logger().info(f'Skipping object near edge at ({x_center},{y_center})')
continue
mask = cv2.resize(mask, orig_shape[::-1], cv2.INTER_LINEAR)
rgb_masked = rgb_img.copy()
rgb_masked[mask == 0] = 0
x, y, z, roll, pitch, yaw = calculate_pose(mask, orig_shape, depth_img, self.camera_info)
roll, pitch, yaw = calculate_rpy(mask, rgb_img)
x, y, z = calculate_coordinate(self, depth_img, self.camera_info, x_center, y_center, width, height)
if (x, y, z) != (0.0, 0.0, 0.0):
pose = Pose()
pose.position = Point(x=x, y=y, z=z)
pose.orientation = Quaternion(x=roll, y=pitch, z=yaw)
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
post_list.append([x, y, z, roll, pitch, yaw])
'''mask_img and box_img is or not output'''
if self.output_boxes and self.output_masks:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
draw_mask(self.set_confidence, rgb_img, masks, confidences, orig_shape)
draw_box(self.set_confidence, rgb_img, result, post_list)
draw_mask(self.set_confidence, rgb_img, result)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
elif self.output_boxes and not self.output_masks:
draw_box(self.set_confidence, rgb_img, boxes, confidences, class_ids, labels)
draw_box(self.set_confidence, rgb_img, result, post_list)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
elif not self.output_boxes and self.output_masks:
draw_mask(self.set_confidence, rgb_img, masks, confidences, orig_shape)
draw_mask(self.set_confidence, rgb_img, result)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
else:
return pose_dict
return None, pose_dict
def main(args=None):
@@ -370,4 +348,3 @@ def main(args=None):
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,230 @@
#!/usr/bin/env python
# coding: utf-8
import transforms3d as tfs
import numpy as np
import math
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Pose
from message_filters import ApproximateTimeSynchronizer, Subscriber
def get_matrix_eular_radu(x, y, z, rx, ry, rz):
    """Build a 4x4 homogeneous transform from a translation and Euler angles.

    (x, y, z) is the translation; (rx, ry, rz) are Euler angles in radians,
    interpreted with transforms3d's default axis convention.
    """
    # Rotation part from the Euler angles.
    rotation = tfs.euler.euler2mat(rx, ry, rz)
    translation = np.squeeze(np.asarray((x, y, z)))
    # Unit scale in every axis: this is a pure rigid-body transform.
    return tfs.affines.compose(translation, rotation, [1, 1, 1])
def skew(v):
    """Return the 3x3 skew-symmetric (cross-product) matrix of a 3-vector."""
    a, b, c = v[0], v[1], v[2]
    return np.array([
        [0, -c, b],
        [c, 0, -a],
        [-b, a, 0],
    ])
def rot2quat_minimal(m):
    """Rotation matrix -> minimal quaternion (vector part x, y, z only).

    The scalar part w is dropped; it can be recovered later from
    w = sqrt(1 - x^2 - y^2 - z^2) (see quat_minimal2rot).
    """
    full_quat = tfs.quaternions.mat2quat(m[0:3, 0:3])
    # Discard w, keep (x, y, z).
    return full_quat[1:]
def quat_minimal2rot(q):
    """Minimal quaternion (3x1 column of x, y, z) -> 3x3 rotation matrix.

    The scalar part is recovered as w = sqrt(1 - |q|^2). |q|^2 is clamped
    to 1.0 first: floating-point round-off in the upstream least-squares
    solve can push it marginally above 1, which previously made the sqrt
    return NaN and silently corrupted the whole calibration matrix.
    """
    norm_sq = float(np.dot(q.T, q))
    w = np.sqrt(max(0.0, 1.0 - norm_sq))
    return tfs.quaternions.quat2mat([w, q[0], q[1], q[2]])
def calculate(Hgs, Hcs):
    """Solve the hand-eye equation AX = XB (Tsai-Lenz-style closed form).

    Hgs: list of 4x4 homogeneous gripper transforms, one per station.
    Hcs: list of 4x4 homogeneous camera transforms, paired with Hgs.
    Returns the 4x4 calibration matrix relating the two frames.

    Rotation is solved first from a stacked linear system over quaternion
    vector parts, then translation from a second linear system that uses
    the recovered rotation.
    """
    # H denotes a matrix, h would denote a scalar (original author's note).
    Hgijs = []
    Hcijs = []
    A = []
    B = []
    size = 0
    for i in range(len(Hgs)):
        for j in range(i + 1, len(Hgs)):
            size += 1
            # Relative gripper motion between stations i and j.
            Hgij = np.dot(np.linalg.inv(Hgs[j]), Hgs[i])
            Hgijs.append(Hgij)
            Pgij = np.dot(2, rot2quat_minimal(Hgij))
            # Corresponding relative motion seen by the camera.
            Hcij = np.dot(Hcs[j], np.linalg.inv(Hcs[i]))
            Hcijs.append(Hcij)
            Pcij = np.dot(2, rot2quat_minimal(Hcij))
            # One 3x3 / 3x1 row-block of the rotation linear system.
            A.append(skew(np.add(Pgij, Pcij)))
            B.append(np.subtract(Pcij, Pgij))
    MA = np.asarray(A).reshape(size * 3, 3)
    MB = np.asarray(B).reshape(size * 3, 1)
    # Least-squares solution for the modified quaternion Pcg'.
    Pcg_ = np.dot(np.linalg.pinv(MA), MB)
    # pcg_norm = np.dot(np.conjugate(Pcg_).T, Pcg_)
    # Renormalize Pcg' back to a proper (doubled) quaternion vector part.
    pcg = np.sqrt(np.add(1, np.dot(Pcg_.T, Pcg_)))
    pcg = np.dot(np.dot(2, Pcg_), np.linalg.inv(pcg))
    # Recover the rotation Rcg from the halved quaternion vector part.
    Rcg = quat_minimal2rot(np.divide(pcg, 2)).reshape(3, 3)
    A = []
    B = []
    # NOTE: `id` shadows the builtin; kept as-is here (documentation-only pass).
    id = 0
    for i in range(len(Hgs)):
        for j in range(i + 1, len(Hgs)):
            Hgij = Hgijs[id]
            Hcij = Hcijs[id]
            # Row-blocks of the translation linear system using Rcg.
            A.append(np.subtract(Hgij[0:3, 0:3], np.eye(3, 3)))
            B.append(np.subtract(np.dot(Rcg, Hcij[0:3, 3:4]), Hgij[0:3, 3:4]))
            id += 1
    MA = np.asarray(A).reshape(size * 3, 3)
    MB = np.asarray(B).reshape(size * 3, 1)
    # Least-squares translation, then assemble the full 4x4 result.
    Tcg = np.dot(np.linalg.pinv(MA), MB).reshape(3, )
    return tfs.affines.compose(Tcg, np.squeeze(Rcg), [1, 1, 1])
class Calibration(Node):
    """ROS2 node that collects synchronized hand/camera pose pairs and
    solves the hand-eye calibration matrix once enough pairs are stored.

    Each incoming geometry_msgs/Pose packs xyz translation in `position`
    and roll/pitch/yaw (radians) in `orientation.x/y/z` — confirmed by how
    get_pose_callback and get_matrix_eular_radu consume the fields.
    """

    # Number of (hand, camera) pose pairs required before solving;
    # each pose contributes 6 scalars to the flat sample buffers.
    REQUIRED_SAMPLES = 25

    def __init__(self, name):
        super(Calibration, self).__init__(name)
        self.Hgs, self.Hcs = [], []        # homogeneous transforms built from samples
        self.hand, self.camera = [], []    # flat [x, y, z, rx, ry, rz, ...] buffers
        self.calibration_matrix = None
        self.declare_parameter('start_collect_once', False)
        self.collect = self.get_parameter('start_collect_once').value
        self.declare_parameter('mode', 'eye_in_hand')
        self.mode = self.get_parameter('mode').value.lower()
        if self.mode not in ['eye_in_hand', 'eye_to_hand']:
            raise ValueError("mode must be 'eye_in_hand' or 'eye_to_hand'")
        self.done = False
        self._init_sub()

    def _init_sub(self):
        """Subscribe to the two pose streams and time-synchronize them."""
        # Fixed: the hand subscriber previously listened on the camera topic
        # and vice versa; names and topics are now matched.
        self.sub_hand_pose = Subscriber(self, Pose, '/calibration/hand/pose')
        self.sub_camera_pose = Subscriber(self, Pose, '/calibration/camera/pose')
        # Pose carries no header/timestamp, so the synchronizer must be
        # allowed to pair messages by arrival time.
        self.sync_subscriber = ApproximateTimeSynchronizer(
            [self.sub_hand_pose, self.sub_camera_pose],
            queue_size=10,
            slop=0.1,
            allow_headerless=True
        )
        self.sync_subscriber.registerCallback(self.get_pose_callback)

    def get_pose_callback(self, hand_pose, camera_pose):
        """Buffer one synchronized pose pair; solve once enough are stored."""
        if self.done:
            # Calibration finished: simply ignore late messages. The old code
            # called registerCallback(None) and dropped the subscriber refs,
            # which made the synchronizer crash on the next delivery.
            return
        if self.collect:
            _hand = [
                hand_pose.position.x,
                hand_pose.position.y,
                hand_pose.position.z,
                hand_pose.orientation.x,
                hand_pose.orientation.y,
                hand_pose.orientation.z,
            ]
            _camera = [
                camera_pose.position.x,
                camera_pose.position.y,
                camera_pose.position.z,
                camera_pose.orientation.x,
                camera_pose.orientation.y,
                camera_pose.orientation.z,
            ]
            self.hand.extend(_hand)
            self.camera.extend(_camera)
            # NOTE(review): one-shot — 'start_collect_once' is only read at
            # startup and nothing re-arms self.collect afterwards; confirm an
            # external mechanism toggles it before 25 samples can accumulate.
            self.collect = False
        if (len(self.hand) >= self.REQUIRED_SAMPLES * 6
                and len(self.camera) >= self.REQUIRED_SAMPLES * 6):
            print("已停止订阅")
            self.calculate_calibration()
            # rclpy loggers require a str; passing the raw ndarray raised.
            self.get_logger().info(str(self.calibration_matrix))
            self.done = True

    def calculate_data(self):
        """Convert the flat sample buffers into homogeneous transform lists.

        In eye-to-hand mode the gripper transform is inverted so the same
        AX = XB solver applies to both configurations.
        """
        for i in range(0, len(self.hand), 6):
            hand_mat = get_matrix_eular_radu(
                self.hand[i], self.hand[i + 1], self.hand[i + 2],
                self.hand[i + 3], self.hand[i + 4], self.hand[i + 5]
            )
            if self.mode == 'eye_to_hand':
                # Previously the matrix was built twice inside a conditional
                # expression; build once and invert when needed.
                hand_mat = np.linalg.inv(hand_mat)
            self.Hgs.append(hand_mat)
            self.Hcs.append(
                get_matrix_eular_radu(
                    self.camera[i], self.camera[i + 1], self.camera[i + 2],
                    self.camera[i + 3], self.camera[i + 4], self.camera[i + 5]
                )
            )

    def calculate_calibration(self):
        """Build the transform lists and solve for the calibration matrix."""
        self.calculate_data()
        self.calibration_matrix = calculate(self.Hgs, self.Hcs)

    def get_data_test(self, hand, camera):
        """Offline entry point: run the solver on pre-recorded flat samples."""
        self.hand, self.camera = hand, camera
        self.calculate_calibration()
        print(self.calibration_matrix)
        self.done = True
def main(args=None):
    """Run the hand-eye calibration node until it finishes or is interrupted."""
    rclpy.init(args=args)
    calib_node = Calibration('Calibration')
    try:
        # Spin one callback at a time so the done flag is re-checked
        # between messages.
        while rclpy.ok() and not calib_node.done:
            rclpy.spin_once(calib_node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is a normal way to stop collection early.
    finally:
        calib_node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    # Offline self-test: three pre-recorded pose samples, each packed as
    # [x, y, z, rx, ry, rz] with translations in the recorded units and
    # rotations converted from degrees to radians.
    hand = [1.1988093940033604, -0.42405585264804424, 0.18828251788562061, math.radians(151.3390418721659), math.radians(-18.612399542280507),
            math.radians(153.05074895025035),
            1.1684831621733476, -0.183273375514656, 0.12744868246620855, math.radians(-161.57083804238462), math.radians(9.07159838346732),
            math.radians(89.1641128844487),
            1.1508343174145468, -0.22694301453461405, 0.26625166858469146, math.radians(177.8815855486261), math.radians(0.8991159570568988),
            math.radians(77.67286224959672)]
    camera = [-0.16249272227287292, -0.047310635447502136, 0.4077761471271515, math.radians(-56.98037030812389), math.radians(-6.16739631361851),
              math.radians(-115.84333735802369),
              0.03955405578017235, -0.013497642241418362, 0.33975949883461, math.radians(-100.87129330834215), math.radians(-17.192685528625265),
              math.radians(-173.07354634882094),
              -0.08517949283123016, 0.00957852229475975, 0.46546608209609985, math.radians(-90.85270962096058), math.radians(0.9315977976503153),
              math.radians(175.2059707654342)]
    rclpy.init(args=None)
    node = Calibration('Calibration')
    # Feed the canned data directly; no live topics are needed for this run.
    node.get_data_test(hand, camera)
    try:
        # get_data_test sets node.done, so this loop normally exits at once;
        # it is kept for parity with the live main() entry point.
        while rclpy.ok() and not node.done:
            rclpy.spin_once(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

View File

@@ -29,6 +29,7 @@ setup(
'console_scripts': [
'detect_node = detect_part.detect:main',
'sub_pose_node = detect_part.sub_pose:main',
'calibration_node = detect_part.hand_eye_calibration:main',
],
},
)