Merge branch 'feature' into feature_cpp_test

This commit is contained in:
liangyuxuan
2025-12-10 14:50:48 +08:00
11 changed files with 323 additions and 21 deletions

View File

@@ -0,0 +1,28 @@
{
"T": [
[
0.013635791720580838,
0.9992765703636767,
-0.03550212819481636,
-0.08645650535173793
],
[
-0.9998865276218899,
0.013399556063222661,
-0.006883587549256321,
0.036196137264974
],
[
-0.006402895000908794,
0.03559196285001416,
0.999345893630474,
0.014407883180676354
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

39
tools/cap_video.py Normal file
View File

@@ -0,0 +1,39 @@
import cv2
# import numpy as np
import time

# Record 720p video from camera index 6 into video/output_720p.mp4,
# discarding the first few frames so auto-exposure can settle.
cap = cv2.VideoCapture(6)
if not cap.isOpened():
    # Fail fast instead of looping forever on failed reads.
    raise SystemExit("failed to open camera device 6")
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)   # requested width
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)   # requested height
# Read back the actual resolution the driver granted.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'MP4V')  # alternatives: 'XVID', 'MJPG', 'MP4V'
out = cv2.VideoWriter('video/output_720p.mp4', fourcc, 20.0, (frame_width, frame_height))
i = 0
time.sleep(1)  # give the camera a moment to initialize
while True:
    # Capture frame by frame.
    ret, frame = cap.read()
    if not ret:
        print("读取摄像头画面失败")
        break
    if i <= 5:
        # Skip the first 6 frames (sensor warm-up).
        i = i + 1
        continue
    cv2.imshow('camera', frame)
    out.write(frame)
    # Press 'q' to stop recording.
    if cv2.waitKey(5) & 0xFF == ord('q'):
        break
cap.release()
out.release()
cv2.destroyAllWindows()

31
tools/crop_from_video.py Normal file
View File

@@ -0,0 +1,31 @@
import cv2
import os

# Extract one frame out of every `frame_interval` frames from a video and
# save them as JPEGs, continuing the file numbering after an existing set.
video_path = "video/video_5.mp4"
save_dir = "/home/lyx/Images/hivecore_box_datasets"
os.makedirs(save_dir, exist_ok=True)

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    print("无法打开视频")
    exit()

frame_interval = 15     # keep 1 frame out of every 15
frame_count = 0
start_index = 448       # continue numbering after previously exported images
saved_count = start_index
while True:
    ret, frame = cap.read()
    if not ret:
        break
    if frame_count % frame_interval == 0:
        save_path = os.path.join(save_dir, f"frame_{saved_count:06d}.jpg")
        cv2.imwrite(save_path, frame)
        saved_count += 1
    frame_count += 1
cap.release()
# Bug fix: report only the frames written by this run, not the numbering
# offset (the old message counted the 448 pre-existing files as saved).
print(f"共保存 {saved_count - start_index} 张图像到 {save_dir}")

View File

@@ -0,0 +1,16 @@
import os

# Write YOLO-style train.txt / val.txt index files listing the relative
# paths of every .jpg image under images/train and images/val.
file_dir = "/home/lyx/Datasets/hivecore_box_datasets/"

with open(os.path.join(file_dir, "train.txt"), "w") as f:
    for file in os.listdir(os.path.join(file_dir, 'images/train/')):
        if file.endswith(".jpg"):
            # Paths are written relative to the dataset root.
            image_path = os.path.join('./images/train/', file)
            f.write(image_path + "\n")

with open(os.path.join(file_dir, "val.txt"), "w") as f:
    for file in os.listdir(os.path.join(file_dir, 'images/val/')):
        if file.endswith(".jpg"):
            image_path = os.path.join('./images/val/', file)
            f.write(image_path + "\n")

View File

@@ -0,0 +1,37 @@
import os
import shutil
def check_dir(dirname):
    """Create *dirname* (including missing parents) if it does not exist.

    Uses ``exist_ok=True`` to avoid the race between the old
    ``os.path.exists`` check and ``os.makedirs``.
    """
    os.makedirs(dirname, exist_ok=True)
# Split the raw dataset (images + YOLO label .txt files) into train/val
# directories; every 10th image goes to the validation split.
file_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"
imgs_dir = "/home/lyx/Datasets/hivecore_box_datasets/images"
labels_dir = "/home/lyx/Datasets/hivecore_box_datasets/labels"

# Ensure the full YOLO directory layout exists before copying anything.
for base in (imgs_dir, labels_dir):
    check_dir(base)
    check_dir(os.path.join(base, "train"))
    check_dir(os.path.join(base, "val"))

index = 0
for file in os.listdir(file_dir):
    if not file.endswith(".jpg"):
        continue
    # The label file shares the image's stem with a .txt extension.
    stem = file[:file.rfind(".")]
    label = stem + '.txt'
    split = "val" if index % 10 == 9 else "train"
    shutil.copy(os.path.join(file_dir, file), os.path.join(imgs_dir, split, file))
    shutil.copy(os.path.join(file_dir, label), os.path.join(labels_dir, split, label))
    index = index + 1

56
tools/json2yolo.py Normal file
View File

@@ -0,0 +1,56 @@
import json
import os
# Map labelme label names to YOLO class ids (ids start at 0).
label_to_class_id = {
    "box": 0,
    # other classes...
}


def convert_labelme_json_to_yolo(json_file, output_dir):
    """Convert one labelme polygon annotation file to YOLO-seg format.

    Writes ``<stem>.txt`` into *output_dir*; each line is
    ``<class_id> x1 y1 x2 y2 ...`` with coordinates normalized by the
    image width/height. Shapes with no points or with labels missing
    from ``label_to_class_id`` are skipped. Any error aborts this file
    with a message (batch processing continues in the caller).
    """
    try:
        with open(json_file, 'r') as fp:
            data = json.load(fp)

        width = data["imageWidth"]
        height = data["imageHeight"]
        stem = os.path.splitext(os.path.basename(json_file))[0]
        out_path = os.path.join(output_dir, f"{stem}.txt")

        with open(out_path, 'w') as out_fp:
            for shape in data['shapes']:
                pts = shape['points']
                if not pts:
                    continue
                label = shape['label']
                class_id = label_to_class_id.get(label)
                if class_id is None:
                    print(f"Warning: 跳过未定义标签 '{label}'")
                    continue
                # Close the polygon if labelme left it open.
                if pts[0] != pts[-1]:
                    pts.append(pts[0])
                coords = " ".join(
                    f"{x / width:.6f} {y / height:.6f}" for x, y in pts
                )
                out_fp.write(f"{class_id} {coords}\n")
    except Exception as e:
        print(f"处理文件 {json_file} 时出错: {str(e)}")
if __name__ == "__main__":
    # Directory holding the labelme annotations; converted .txt files are
    # written back into the same directory.
    json_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"
    output_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for name in os.listdir(json_dir):
        if not name.endswith(".json"):
            continue
        convert_labelme_json_to_yolo(os.path.join(json_dir, name), output_dir)

Binary file not shown.

View File

@@ -0,0 +1,30 @@
{
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
"confidence": 0.50,
"classes": [0, 1]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.010
}
}

View File

@@ -0,0 +1,37 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory
# Resolve the packaged JSON config for the detect node.
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/bottle_detect_service_pca.json')
# NOTE(review): the config is parsed at import time, so a missing or invalid
# file fails when the launch file is loaded, not when the node starts.
with open(config_dir, "r") as f:
    configs = json.load(f)
def generate_launch_description():
    """Build the launch description for the vision_detect detect node.

    Exposes one launch argument, ``configs_path`` (defaulting to the
    packaged JSON config), and spawns ``detect_node`` with it.
    """
    launch_args = [
        DeclareLaunchArgument('configs_path', default_value=config_dir),
    ]

    def create_detect_node(context):
        # Resolve the launch argument to a concrete string at launch time.
        resolved_path = LaunchConfiguration('configs_path').perform(context)
        node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[{'configs_path': resolved_path}],
        )
        return [node]

    return LaunchDescription(launch_args + [OpaqueFunction(function=create_detect_node)])

View File

@@ -151,6 +151,7 @@ class DetectNode(InitBase):
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
"""Use segmentation model"""
self.get_logger().info('start')
pose_list = []
# Get Predict Results
@@ -190,18 +191,18 @@ class DetectNode(InitBase):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, grab_width = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,
calculate_grab_width=True,
**self.calculate_configs
)
if rmat is None:
self.get_logger().warning("Object point cloud have too many noise")
continue
grab_width = calculate_grav_width(mask, self.k, rmat[2, 3])
rmat[2, 3] = rmat[2, 3] + grab_width * 0.30
self.get_logger().info(f"grab_width: {grab_width}")
rmat = self.hand_eye_mat @ rmat
@@ -216,13 +217,13 @@ class DetectNode(InitBase):
"class_id": int(class_ids[i]),
"class_name": labels[class_ids[i]],
"pose": pose,
"grab_width": grab_width * 1.05
"grab_width": grab_width
}
)
time4 = time.time()
self.get_logger().info('start')
self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
# self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')
self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
@@ -279,7 +280,7 @@ class DetectNode(InitBase):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, _ = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,
@@ -347,7 +348,7 @@ class DetectNode(InitBase):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, _ = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,

View File

@@ -9,7 +9,7 @@ import transforms3d as tfs
from .object_icp import object_icp
__all__ = [
"calculate_pose_pca", "calculate_pose_icp", "calculate_grav_width",
"calculate_pose_pca", "calculate_pose_icp", "calculate_grab_width",
"rmat2quat", "quat2rmat",
]
@@ -37,6 +37,7 @@ def calculate_pose_pca(
mask,
depth_img: np.ndarray,
intrinsics,
calculate_grab_width: bool = False,
**kwargs
):
"""点云主成分分析法计算位态"""
@@ -54,19 +55,40 @@ def calculate_pose_pca(
point_cloud = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.005))
if point_cloud is None:
return None
return None, 0.0
if len(point_cloud.points) == 0:
logging.warning("clean_pcd is empty")
return np.eye(4)
center = point_cloud.get_center()
x, y, z = center
return np.eye(4), 0.0
w, v = pca(np.asarray(point_cloud.points))
if calculate_grab_width:
obb = point_cloud.get_oriented_bounding_box()
x, y, z = obb.center
extent = obb.extent
order = np.argsort(-extent)
if w is None or v is None:
logging.warning("PCA output w or v is None")
return np.eye(4)
grab_width = extent[order][-2]
z = z + grab_width * 0.20
v = obb.R
v = v[:, order]
if v is None:
logging.warning("PCA output v is None")
return np.eye(4), 0.0
grab_width = grab_width * 1.05
else:
center = point_cloud.get_center()
x, y, z = center
w, v = pca(np.asarray(point_cloud.points))
if w is None or v is None:
logging.warning("PCA output w or v is None")
return np.eye(4), 0.0
grab_width = 0.0
vx, vy, vz = v[:,0], v[:,1], v[:,2]
@@ -83,13 +105,14 @@ def calculate_pose_pca(
# draw(point_cloud_u, rmat)
# draw(point_cloud, rmat)
return rmat
return rmat, grab_width
def calculate_pose_icp(
mask,
depth_img: np.ndarray,
intrinsics,
calculate_grab_width: bool = False,
**kwargs
):
"""点云配准法计算位姿"""
@@ -107,11 +130,14 @@ def calculate_pose_icp(
point_cloud = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.010))
if point_cloud is None:
return None
return None, 0.0
if len(point_cloud.points) == 0:
logging.warning("clean_pcd is empty")
return np.eye(4)
return np.eye(4), 0.0
if calculate_grab_width:
pass
rmat = object_icp(
kwargs.get("source"),
@@ -121,7 +147,8 @@ def calculate_pose_icp(
icp_max_iter=kwargs.get("icp_max_iter", [50, 30, 14])
)
return rmat
grab_width = 0.0
return rmat, grab_width
def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: float = 0.005):
@@ -192,7 +219,7 @@ def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: floa
return clean_pcd
def calculate_grav_width(mask, k, depth):
def calculate_grab_width(mask, k, depth):
"""计算重心宽度"""
mask = mask.astype(np.uint8) * 255
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)