# (extraction metadata: 286 lines, 8.1 KiB, Python)
import json
|
|
import time
|
|
|
|
import cv2
|
|
import open3d as o3d
|
|
import numpy as np
|
|
import torch
|
|
import transforms3d as tfs
|
|
|
|
from ultralytics import YOLO
|
|
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
|
|
|
|
# Recorded reference pose (position):
# x=0.08685893262849026, y=0.03733333467952511, z=0.5707736129272428
# Recorded reference orientation (quaternion):
# x=-0.04385456738166652, y=-0.6302417335103124, z=-0.07781808527160532, w=0.7712434634188767
|
|
|
|
|
|
def main():
    """Detect objects in the test image with YOLO segmentation and estimate
    a grasp pose for each detection via PCA on the masked depth point cloud.

    Returns:
        list[dict] of detections ({"class_id", "class_name", "pose",
        "grab_width"}), or (None, None) if the model found no masks.
    """
    color_image = cv2.imread("test/color_image.png")
    depth_image = cv2.imread("test/depth_image.png", cv2.IMREAD_UNCHANGED)
    with open("test/K.json", 'r') as f:
        k = json.load(f)  # flat 3x3 intrinsics: [fx, 0, cx, 0, fy, cy, 0, 0, 1]
    with open("test/D.json", 'r') as f:
        d = json.load(f)  # distortion coefficients (loaded but currently unused)
    with open("test/eye_in_right_hand.json", 'r') as f:
        hand_eye_mat = np.array(json.load(f)["T"])  # 4x4 camera->hand transform

    camera_size = [color_image.shape[1], color_image.shape[0]]  # (width, height)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = YOLO("test/yolo11s-seg.pt").to(device)

    # classes=[39] restricts detection to one COCO class;
    # retina_masks=True yields full-resolution masks.
    results = model(color_image, retina_masks=True, conf=0.5, classes=[39])
    result = results[0]

    # Get masks
    if result.masks is None or len(result.masks) == 0:
        return None, None
    masks = result.masks.data.cpu().numpy()

    # Get boxes
    boxes = result.boxes.xywh.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy()
    labels = result.names
    print(f"Detect object num: {len(masks)}")

    # BUG FIX: pose_list used to be re-created inside the loop body, so every
    # iteration discarded the previous detections and the list was never
    # returned. Accumulate across all detections and return the result.
    pose_list = []
    for i, (mask, box) in enumerate(zip(masks, boxes)):
        imgs_crop, (x_min, y_min) = crop_imgs_box_xywh([depth_image, mask], box, True)
        depth_crop, mask_crop = imgs_crop

        if depth_crop is None:
            continue

        # Shift the principal point so the intrinsics match the cropped image.
        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            int(camera_size[0]),
            int(camera_size[1]),
            k[0],
            k[4],
            k[2] - x_min,
            k[5] - y_min
        )
        time1 = time.time()
        rmat = calculate_pose_pca(
            mask_crop,
            depth_crop,
            intrinsics,
        )
        time2 = time.time()
        print(f"PCA: {(time2 - time1) * 1000} ms")

        grab_width = calculate_grav_width(mask.astype(np.uint8) * 255, k, rmat[2, 3])
        # Push the grasp point deeper along camera z, proportional to width.
        rmat[2, 3] = rmat[2, 3] + grab_width * 0.22

        rmat = hand_eye_mat @ rmat  # camera frame -> robot hand frame

        x, y, z, rw, rx, ry, rz = rmat2quat(rmat)

        # An all-zero translation marks a failed pose estimate; skip it.
        if (x, y, z) != (0.0, 0.0, 0.0):
            pose = [x, y, z, rx, ry, rz, rw]
            pose_list.append(
                {
                    # class ids come back as floats from the tensor; the
                    # names dict is keyed by int, so cast explicitly.
                    "class_id": int(class_ids[i]),
                    "class_name": labels[int(class_ids[i])],
                    "pose": pose,
                    "grab_width": grab_width * 1.05  # 5% margin for the gripper
                }
            )
    return pose_list
|
|
|
|
|
|
def crop_imgs_box_xywh(imgs: list, box, same_sign: bool = False):
    """
    Crop every image in *imgs* to the same YOLO bounding box.

    Args:
        imgs: list of images; all must share the same width and height.
        box: YOLO box data in [x_center, y_center, w, h, ...] format.
        same_sign: set True to skip the shape-consistency check when the
            caller already guarantees identical shapes.

    Returns:
        (cropped_images, (x_min, y_min)) where (x_min, y_min) is the crop's
        top-left corner in the original image coordinates.
    """
    if not imgs:
        print("imgs is empty")
        return [], (0, 0)

    if not same_sign and len(imgs) != 1:
        reference = imgs[0].shape
        for img in imgs:
            if img.shape != reference:
                raise ValueError(f"Img shape are different: {reference} - {img.shape}")

    height, width = imgs[0].shape[:2]
    cx, cy, bw, bh = box[:4]

    # Clamp the rounded box corners to the image bounds.
    x_lo = max(0, int(round(cx - bw / 2)))
    x_hi = min(int(round(cx + bw / 2)), width - 1)
    y_lo = max(0, int(round(cy - bh / 2)))
    y_hi = min(int(round(cy + bh / 2)), height - 1)

    cropped = [img[y_lo:y_hi + 1, x_lo:x_hi + 1] for img in imgs]

    return cropped, (x_lo, y_lo)
|
|
|
|
|
|
def pca(data: np.ndarray, sort=True):
    """Principal component analysis.

    Args:
        data: (N, D) array; rows are observations.
        sort: when True, order components by decreasing eigenvalue.

    Returns:
        (eigenvalues, eigenvectors) of the covariance matrix, or
        (None, None) if the eigendecomposition fails.
    """
    centroid = data.mean(axis=0)
    centered = data - centroid  # remove the mean before covariance

    try:
        covariance = np.cov(centered.T)  # rows become variables via transpose
        eigenvalues, eigenvectors = np.linalg.eigh(covariance)
    except np.linalg.LinAlgError:
        return None, None

    if sort:
        order = np.argsort(eigenvalues)[::-1]  # descending eigenvalue order
        eigenvalues = eigenvalues[order]
        eigenvectors = eigenvectors[:, order]

    return eigenvalues, eigenvectors
|
|
|
|
|
|
def calculate_pose_pca(
        mask,
        depth_img: np.ndarray,
        intrinsics,
        source=None,
        configs=None,
        **kwargs
):
    """Estimate a 6-DoF pose for the masked object from a depth image.

    The masked depth pixels are back-projected into a point cloud, cleaned
    (voxel down-sample, statistical + radius outlier removal, largest DBSCAN
    cluster), and the PCA axes of the cluster define the orientation.

    Args:
        mask: segmentation mask (nonzero = object pixels), same HxW as depth_img.
        depth_img: depth image; values are divided by configs["depth_scale"].
        intrinsics: o3d.camera.PinholeCameraIntrinsic for the (cropped) image.
        source: unused, kept for interface compatibility.
        configs: optional dict overriding the filtering parameters.

    Returns:
        4x4 homogeneous transform (camera frame). On any failure returns the
        identity matrix so callers can still index rows/columns safely.
    """
    if configs is None:
        configs = {
            "depth_scale": 1000.0,   # depth units per meter (mm -> m)
            "depth_trunc": 3.0,      # discard points farther than 3 m
            "voxel_size": 0.010,
            "nb_points": 16,
            "radius": 0.03,
            "nb_neighbors": 10,
            "std_ratio": 3.0
        }
    # Keep only depth pixels under the mask; everything else becomes 0 (invalid).
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]

    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))

    point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=configs["depth_scale"],
        depth_trunc=configs["depth_trunc"],
    )

    point_cloud = point_cloud.remove_non_finite_points()

    down_pcd = point_cloud.voxel_down_sample(voxel_size=configs["voxel_size"])
    clean_pcd, _ = down_pcd.remove_statistical_outlier(
        nb_neighbors=configs["nb_neighbors"],
        std_ratio=configs["std_ratio"]
    )
    clean_pcd, _ = clean_pcd.remove_radius_outlier(
        nb_points=configs["nb_points"],
        radius=configs["radius"]
    )
    # Keep only the largest DBSCAN cluster; noise points are labelled -1.
    labels = np.array(clean_pcd.cluster_dbscan(eps=0.03, min_points=16))
    valid_labels = labels[labels >= 0]
    # BUG FIX: np.argmax over an empty np.bincount raised ValueError whenever
    # DBSCAN found no cluster at all (every point labelled -1). Bail out with
    # the identity transform instead of crashing.
    if valid_labels.size == 0:
        print("clean_pcd is empty")
        return np.eye(4)
    largest_cluster_idx = np.argmax(np.bincount(valid_labels))
    clean_pcd = clean_pcd.select_by_index(np.where(labels == largest_cluster_idx)[0])

    if len(clean_pcd.points) == 0:
        print("clean_pcd is empty")
        # BUG FIX: this path used to return a 7-tuple of floats, which crashed
        # callers that index the result as a 4x4 matrix (rmat[2, 3]).
        return np.eye(4)
    center = clean_pcd.get_center()
    x, y, z = center

    w, v = pca(np.asarray(clean_pcd.points))

    if w is None or v is None:
        print("PCA output w or v is None")
        return np.eye(4)

    vx, vy, vz = v[:, 0], v[:, 1], v[:, 2]

    # Fix the axis signs so the resulting frame is deterministic and
    # right-handed (vz must equal cross(vx, vy)).
    if vx[0] < 0:
        vx = -vx
    if vy[1] < 0:
        vy = -vy
    if not np.allclose(np.cross(vx, vy), vz):
        vz = -vz

    R = np.column_stack((vx, vy, vz))
    # z + 0.06*0.25: empirical offset along camera z toward the object center.
    rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z + 0.06 * 0.25))), R, [1, 1, 1])

    draw(point_cloud, rmat)
    # draw(down_pcd, rmat)
    draw(clean_pcd, rmat)

    return rmat
|
|
|
|
|
|
def rmat2quat(rmat):
    """Convert a 4x4 homogeneous matrix to [x, y, z, rw, rx, ry, rz]."""
    # Translation column, flattened to three scalars.
    translation = rmat[0:3, 3:4].flatten()
    # transforms3d returns the quaternion in (w, x, y, z) order.
    rotation_quat = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return [*translation, *rotation_quat]
|
|
|
|
|
|
def calculate_grav_width(mask, k, depth):
    """Estimate the object's grasp width in meters.

    Finds the largest contour in *mask*, fits a minimum-area rectangle,
    takes its shorter side in pixels, and converts that span to metric
    width at distance *depth* using focal lengths k[0] (fx) and k[4] (fy).

    Args:
        mask: uint8 binary mask (nonzero = object).
        k: flat 3x3 camera intrinsics list.
        depth: object distance along camera z, in meters.

    Returns:
        grasp width in meters; 0.0 when no contour is found.
    """
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if not contours:
        return 0.0

    largest = max(contours, key=cv2.contourArea)
    corners = cv2.boxPoints(cv2.minAreaRect(largest))

    # Pick the shorter rectangle side as the graspable extent.
    side_a = corners[1] - corners[0]
    side_b = corners[1] - corners[2]
    if np.linalg.norm(side_a) < np.linalg.norm(side_b):
        point_diff = side_a
    else:
        point_diff = side_b

    grab_width = depth * np.sqrt(
        point_diff[0] ** 2 / k[0] ** 2 + point_diff[1] ** 2 / k[4] ** 2
    )
    return grab_width
|
|
|
|
def draw(pcd, mat):
    """Visualize *pcd* together with the world axes and the frame in *mat*."""
    rot = mat[0:3, 0:3]
    origin = mat[0:3, 3:4].flatten()
    axis_x, axis_y, axis_z = rot[:, 0], rot[:, 1], rot[:, 2]

    # Eight vertices: world origin + unit axes, then the pose frame's
    # origin + its three axis tips.
    points = [
        [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
        origin, origin + axis_x, origin + axis_y, origin + axis_z
    ]
    # Connect each origin to its three axis endpoints.
    lines = [
        [0, 1], [0, 2], [0, 3],
        [4, 5], [4, 6], [4, 7]
    ]
    # Red/green/blue for x/y/z on both frames.
    colors = [
        [1, 0, 0], [0, 1, 0], [0, 0, 1],
        [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]
    frame = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(points),
        lines=o3d.utility.Vector2iVector(lines),
    )
    frame.colors = o3d.utility.Vector3dVector(colors)

    o3d.visualization.draw_geometries([pcd, frame])
|
|
|
|
# Script entry point: run the detection/pose pipeline on the test images.
if __name__ == '__main__':
    main()
|