24 Commits

Author SHA1 Message Date
NuoDaJia02
f60a86b0fd update @huiyu 2026-01-05 09:40:30 +08:00
liangyuxuan
727b191a2a 删除物体自身深度估计 2025-12-17 10:50:23 +08:00
liangyuxuan
f2f93deb4a 1 2025-12-17 09:46:09 +08:00
liangyuxuan
46ac61a62a 删除物体自身深度估计 2025-12-16 17:24:08 +08:00
liangyuxuan
3b8112deab 1 2025-12-12 15:53:46 +08:00
liangyuxuan
fbe4d80807 1 2025-12-12 15:14:53 +08:00
liangyuxuan
c1b93468fa 1 2025-12-12 09:58:57 +08:00
liangyuxuan
151726eedb add tools 2025-12-11 17:40:31 +08:00
liangyuxuan
2df01a41b1 update checkpoint 2025-12-11 16:08:44 +08:00
liangyuxuan
47e14725bc fix 2025-12-11 10:34:53 +08:00
liangyuxuan
1232fd5ce9 fix 2025-12-11 10:34:33 +08:00
liangyuxuan
bce708ed05 抓取宽度设置为物体y轴方向 2025-12-10 14:42:12 +08:00
liangyuxuan
1702ed2fcb add medical sense file 2025-12-10 14:08:38 +08:00
liangyuxuan
3e78f31a84 优化参数 2025-12-10 09:51:51 +08:00
liangyuxuan
b593cc628d 优化参数 2025-12-10 09:51:38 +08:00
liangyuxuan
f6a9b6c8fb 修改抓取宽度计算方法 2025-12-09 17:51:43 +08:00
liangyuxuan
e27acec63e add tools 2025-12-08 17:23:26 +08:00
liangyuxuan
efcd598565 结构变更 2025-12-05 14:28:10 +08:00
liangyuxuan
927f3c728c 简化配置方式,修正部分错误 2025-12-04 16:19:40 +08:00
liangyuxuan
f433d4428f 1 2025-12-04 11:14:03 +08:00
liangyuxuan
c0a1b4e6e0 滤波参数优化,添加对不同尺寸点云的自适应处理 2025-12-04 11:10:56 +08:00
liangyuxuan
ef093e2813 .py导出为.onnx 2025-12-03 18:15:38 +08:00
liangyuxuan
fe71dab13a .py导出为.onnx 2025-12-03 18:06:55 +08:00
liangyuxuan
3ed1182472 点云滤波参数优化 2025-12-03 17:33:41 +08:00
48 changed files with 1944 additions and 633 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@
**/build/
**/install/
**/log/
imgs*/*

View File

@@ -0,0 +1,28 @@
{
"T": [
[
0.013635791720580838,
0.9992765703636767,
-0.03550212819481636,
-0.08645650535173793
],
[
-0.9998865276218899,
0.013399556063222661,
-0.006883587549256321,
0.036196137264974
],
[
-0.006402895000908794,
0.03559196285001416,
0.999345893630474,
0.014407883180676354
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

View File

@@ -0,0 +1,28 @@
{
"T": [
[
0.01868075138315295,
0.8881359544145228,
0.45920099738999437,
-0.19284748723617093
],
[
0.002854261818715398,
-0.4593266423892484,
0.8882628489252997,
0.057154511710361816
],
[
0.9998214254141742,
-0.01528273756969839,
-0.011115539354645598,
-0.04187423675125192
],
[
0.0,
0.0,
0.0,
1.0
]
]
}

21
tools/add_n_txt.py Normal file
View File

@@ -0,0 +1,21 @@
import os

# ======== tunable parameters ========
FIRST_ID = 1        # starting index
LAST_ID = 200       # ending index (inclusive)
OUT_DIR = "labels"  # output directory
PREFIX = "neg_"     # filename prefix
PAD = 5             # zero-pad width (e.g. 00001)
# ====================================

os.makedirs(OUT_DIR, exist_ok=True)

# Touch one empty .txt label file per id, zero-padded to PAD digits.
for idx in range(FIRST_ID, LAST_ID + 1):
    path = os.path.join(OUT_DIR, f"{PREFIX}{idx:0{PAD}d}.txt")
    open(path, "w").close()
    print("created:", path)

39
tools/cap_video.py Normal file
View File

@@ -0,0 +1,39 @@
import cv2
# import numpy as np
import os
import time

# Record 1280x720 video from camera index 6 to video/output_720p.mp4
# until the user presses 'q' in the preview window.
cap = cv2.VideoCapture(6)
if not cap.isOpened():
    # BUG FIX: previously a failed open was only detected inside the loop,
    # after the writer had already been created.
    print("读取摄像头画面失败")
    raise SystemExit(1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)   # width
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)   # height
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'MP4V')  # alternatives: 'XVID', 'MJPG'
# BUG FIX: VideoWriter fails silently when the target directory is missing.
os.makedirs('video', exist_ok=True)
out = cv2.VideoWriter('video/output_720p.mp4', fourcc, 20.0, (frame_width, frame_height))

i = 0
time.sleep(1)  # give the sensor time to start streaming
while True:
    # Grab frames one at a time.
    ret, frame = cap.read()
    if not ret:
        print("读取摄像头画面失败")
        break
    # Drop the first few frames: exposure/white balance are still settling.
    if i <= 5:
        i = i + 1
        continue
    cv2.imshow('camera', frame)
    out.write(frame)
    if cv2.waitKey(5) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()

31
tools/crop_from_video.py Normal file
View File

@@ -0,0 +1,31 @@
import cv2
import os

# Sample one frame every `frame_interval` frames from a video file and save
# them as numbered JPEGs, continuing the numbering of an existing dataset.
video_path = "video/video_5.mp4"
save_dir = "/home/lyx/Images/hivecore_box_datasets"
os.makedirs(save_dir, exist_ok=True)

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    print("无法打开视频")
    exit()

frame_interval = 15  # keep 1 frame out of every 15
frame_count = 0      # frames read from the video
start_index = 448    # filenames continue after existing images
saved_count = 0      # frames actually written in this run

while True:
    ret, frame = cap.read()
    if not ret:
        break
    if frame_count % frame_interval == 0:
        save_path = os.path.join(save_dir, f"frame_{start_index + saved_count:06d}.jpg")
        cv2.imwrite(save_path, frame)
        saved_count += 1
    frame_count += 1

cap.release()
# BUG FIX: previously this printed the filename counter (which started at
# 448), over-reporting the number of images written by the start offset.
print(f"共保存 {saved_count} 张图像到 {save_dir}")

View File

@@ -0,0 +1,16 @@
import os
file_dir = "/home/lyx/Datasets/hivecore_box_datasets/"
with open(os.path.join(file_dir, "train.txt"), "w") as f:
for file in os.listdir(os.path.join(file_dir, 'images/train/')):
if file.endswith(".jpg"):
labels_dir = os.path.join(os.path.join('./images/train/', file))
f.write(labels_dir + "\n")
with open(os.path.join(file_dir, "val.txt"), "w") as f:
for file in os.listdir(os.path.join(file_dir, 'images/val/')):
if file.endswith(".jpg"):
labels_dir = os.path.join(os.path.join('./images/val/', file))
f.write(labels_dir + "\n")

View File

@@ -0,0 +1,37 @@
import os
import shutil
def check_dir(dirname):
if not os.path.exists(dirname):
os.makedirs(dirname)
file_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"
imgs_dir = "/home/lyx/Datasets/hivecore_box_datasets/images"
labels_dir = "/home/lyx/Datasets/hivecore_box_datasets/labels"
check_dir(imgs_dir)
check_dir(os.path.join(imgs_dir, "train"))
check_dir(os.path.join(imgs_dir, "val"))
check_dir(labels_dir)
check_dir(os.path.join(labels_dir, "train"))
check_dir(os.path.join(labels_dir, "val"))
i = 0
for file in os.listdir(file_dir):
if file.endswith(".jpg"):
dot_index = file.rfind(".")
file_name = file[:dot_index]
label = file_name + '.txt'
if i % 10 == 9:
shutil.copy(os.path.join(file_dir, file), os.path.join(imgs_dir, "val",file))
shutil.copy(os.path.join(file_dir, label), os.path.join(labels_dir, "val", label))
else:
shutil.copy(os.path.join(file_dir, file), os.path.join(imgs_dir, "train", file))
shutil.copy(os.path.join(file_dir, label), os.path.join(labels_dir, "train", label))
i = i + 1

56
tools/json2yolo.py Normal file
View File

@@ -0,0 +1,56 @@
import json
import os

# Map labelme label names to YOLO class ids.
label_to_class_id = {
    "box": 0,  # ids start at 0
    # more classes...
}


def convert_labelme_json_to_yolo(json_file, output_dir):
    """Convert one labelme polygon annotation file to a YOLO-seg .txt file.

    Vertex coordinates are normalized by the image size; open polygons are
    closed by repeating their first vertex. Shapes with unknown labels or no
    points are skipped. Any failure is reported and swallowed so a batch run
    can continue past a single bad file.
    """
    try:
        with open(json_file, 'r') as src:
            annotation = json.load(src)

        width = annotation["imageWidth"]
        height = annotation["imageHeight"]
        stem = os.path.splitext(os.path.basename(json_file))[0]

        out_path = os.path.join(output_dir, f"{stem}.txt")
        with open(out_path, 'w') as dst:
            for shape in annotation['shapes']:
                label = shape['label']
                vertices = shape['points']
                if not vertices:
                    continue

                class_id = label_to_class_id.get(label)
                if class_id is None:
                    print(f"Warning: 跳过未定义标签 '{label}'")
                    continue

                # Close the polygon if the last vertex differs from the first.
                if vertices[0] != vertices[-1]:
                    vertices = vertices + [vertices[0]]

                coords = " ".join(
                    f"{x / width:.6f} {y / height:.6f}" for x, y in vertices
                )
                dst.write(f"{class_id} {coords}\n")
    except Exception as e:
        print(f"处理文件 {json_file} 时出错: {str(e)}")


if __name__ == "__main__":
    json_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"    # labelme annotation directory
    output_dir = "/home/lyx/Datasets/hivecore_box_datasets/data"  # output directory
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for json_file in os.listdir(json_dir):
        if json_file.endswith(".json"):
            convert_labelme_json_to_yolo(os.path.join(json_dir, json_file), output_dir)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,22 +1,22 @@
{
"T": [
[
1.0,
0.0,
0.0,
0.0
0.013635791720580838,
0.9992765703636767,
-0.03550212819481636,
-0.08645650535173793
],
[
0.0,
1.0,
0.0,
0.0
-0.9998865276218899,
0.013399556063222661,
-0.006883587549256321,
0.036196137264974
],
[
0.0,
0.0,
1.0,
0.0
-0.006402895000908794,
0.03559196285001416,
0.999345893630474,
0.014407883180676354
],
[
0.0,

View File

@@ -1,22 +1,22 @@
{
"T": [
[
1.0,
0.0,
0.0,
0.0
0.01868075138315295,
0.8881359544145228,
0.45920099738999437,
-0.19284748723617093
],
[
0.0,
1.0,
0.0,
0.0
0.002854261818715398,
-0.4593266423892484,
0.8882628489252997,
0.057154511710361816
],
[
0.0,
0.0,
1.0,
0.0
0.9998214254141742,
-0.01528273756969839,
-0.011115539354645598,
-0.04187423675125192
],
[
0.0,
@@ -25,4 +25,4 @@
1.0
]
]
}
}

View File

@@ -2,28 +2,31 @@
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.50,
"classes": [39]
},
"calculate_mode": "PCA",
"ICP_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0,
"voxel_size": 0.010,
"ransac_voxel_size": 0.005,
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]

View File

@@ -2,27 +2,29 @@
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.50,
"classes": [39]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
"voxel_size": 0.010
}
}

View File

@@ -2,9 +2,13 @@
"node_name": "crossboard_topic",
"output_boxes": "True",
"output_masks": "False",
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Topic",
"Topic_configs": {
"position": "right",
@@ -12,10 +16,12 @@
"depth_image_topic_name": "/camera/depth/image_raw",
"camera_info_topic_name": "/camera/color/camera_info"
},
"detect_mode": "Crossboard",
"Crossboard_configs": {
"pattern_size": [8, 5]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,

View File

@@ -2,9 +2,13 @@
"node_name": "default_config_detect_service",
"output_boxes": "True",
"output_masks": "False",
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
@@ -15,6 +19,7 @@
"depth_image_topic_name": "/camera/depth/image_raw",
"camera_info_topic_name": "/camera/color/camera_info"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
@@ -28,24 +33,18 @@
"Crossboard_configs": {
"pattern_size": [8, 5]
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
"voxel_size": 0.010
},
"ICP_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0,
"voxel_size": 0.010,
"ransac_voxel_size": 0.005,
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]

View File

@@ -2,27 +2,29 @@
"node_name": "detect_service",
"output_boxes": "True",
"output_masks": "False",
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.50,
"classes": []
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.020,
"nb_points": 10,
"radius": 0.1,
"nb_neighbors": 20,
"std_ratio": 3.0
"voxel_size": 0.010
}
}

View File

@@ -0,0 +1,30 @@
{
"node_name": "bottle_detect_service",
"output_boxes": "True",
"output_masks": "False",
"calibration": {
"left_hand": "configs/hand_eye_mat/eye_in_left_hand.json",
"right_hand": "configs/hand_eye_mat/eye_in_right_hand.json",
"head": "configs/hand_eye_mat/eye_to_hand.json"
},
"get_camera_mode": "Service",
"Service_configs": {
"service_name": "/vision_object_recognition"
},
"detect_mode": "Detect",
"Detect_configs": {
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
"confidence": 0.70,
"classes": []
},
"calculate_mode": "PCA",
"PCA_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.005
}
}

View File

@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
DeclareLaunchArgument('head', default_value=configs['eye_to_handhead_mat_path']),
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
DeclareLaunchArgument("ICP_configs", default_value=json.dumps(configs['ICP_configs'])),
DeclareLaunchArgument('configs_path', default_value=config_dir),
]
def create_detect_node(context):
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
left_hand = LaunchConfiguration('left_hand').perform(context)
right_hand = LaunchConfiguration('right_hand').perform(context)
head = LaunchConfiguration('head').perform(context)
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
detect_mode = LaunchConfiguration('detect_mode').perform(context)
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
Service_configs = LaunchConfiguration('Service_configs').perform(context)
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
ICP_configs = LaunchConfiguration('ICP_configs').perform(context)
configs_path = LaunchConfiguration('configs_path').perform(context)
return [
Node(
@@ -51,19 +27,7 @@ def generate_launch_description():
executable='detect_node',
name=configs['node_name'],
parameters=[{
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'left_hand': left_hand,
'right_hand': right_hand,
'head': head,
'get_camera_mode': get_camera_mode,
'detect_mode': detect_mode,
'calculate_mode': calculate_mode,
'Service_configs': Service_configs,
'Detect_configs': Detect_configs,
'ICP_configs': ICP_configs,
'configs_path': configs_path,
}]
)
]

View File

@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
DeclareLaunchArgument('head', default_value=configs['head']),
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
DeclareLaunchArgument('configs_path', default_value=config_dir),
]
def create_detect_node(context):
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
left_hand = LaunchConfiguration('left_hand').perform(context)
right_hand = LaunchConfiguration('right_hand').perform(context)
head = LaunchConfiguration('head').perform(context)
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
detect_mode = LaunchConfiguration('detect_mode').perform(context)
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
Service_configs = LaunchConfiguration('Service_configs').perform(context)
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
configs_path = LaunchConfiguration('configs_path').perform(context)
return [
Node(
@@ -51,19 +27,7 @@ def generate_launch_description():
executable='detect_node',
name=configs['node_name'],
parameters=[{
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'left_hand': left_hand,
'right_hand': right_hand,
'head': head,
'get_camera_mode': get_camera_mode,
'detect_mode': detect_mode,
'calculate_mode': calculate_mode,
'Service_configs': Service_configs,
'Detect_configs': Detect_configs,
'PCA_configs': PCA_configs,
'configs_path': configs_path,
}]
)
]

View File

@@ -15,36 +15,11 @@ with open(config_dir, "r") as f:
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
DeclareLaunchArgument('head', default_value=configs['head']),
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
DeclareLaunchArgument("Topic_configs", default_value=json.dumps(configs['Topic_configs'])),
DeclareLaunchArgument("Crossboard_configs", default_value=json.dumps(configs['Crossboard_configs'])),
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
DeclareLaunchArgument('configs_path', default_value=config_dir),
]
def create_detect_node(context):
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
left_hand = LaunchConfiguration('left_hand').perform(context)
right_hand = LaunchConfiguration('right_hand').perform(context)
head = LaunchConfiguration('head').perform(context)
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
detect_mode = LaunchConfiguration('detect_mode').perform(context)
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
Topic_configs = LaunchConfiguration('Topic_configs').perform(context)
Crossboard_configs = LaunchConfiguration('Crossboard_configs').perform(context)
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
configs_path = LaunchConfiguration('configs_path').perform(context)
return [
Node(
@@ -52,19 +27,7 @@ def generate_launch_description():
executable='detect_node',
name=configs['node_name'],
parameters=[{
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'left_hand': left_hand,
'right_hand': right_hand,
'head': head,
'get_camera_mode': get_camera_mode,
'detect_mode': detect_mode,
'calculate_mode': calculate_mode,
'Topic_configs': Topic_configs,
'Crossboard_configs': Crossboard_configs,
'PCA_configs': PCA_configs,
'configs_path': configs_path,
}]
)
]

View File

@@ -15,35 +15,11 @@ with open(config_dir, "r") as f:
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
DeclareLaunchArgument('head', default_value=configs['head']),
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
DeclareLaunchArgument('configs_path', default_value=config_dir),
]
def create_detect_node(context):
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
left_hand = LaunchConfiguration('left_hand').perform(context)
right_hand = LaunchConfiguration('right_hand').perform(context)
head = LaunchConfiguration('head').perform(context)
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
detect_mode = LaunchConfiguration('detect_mode').perform(context)
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
Service_configs = LaunchConfiguration('Service_configs').perform(context)
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
configs_path = LaunchConfiguration('configs_path').perform(context)
return [
Node(
@@ -51,19 +27,7 @@ def generate_launch_description():
executable='detect_node',
name=configs['node_name'],
parameters=[{
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'left_hand': left_hand,
'right_hand': right_hand,
'head': head,
'get_camera_mode': get_camera_mode,
'detect_mode': detect_mode,
'calculate_mode': calculate_mode,
'Service_configs': Service_configs,
'Detect_configs': Detect_configs,
'PCA_configs': PCA_configs,
'configs_path': configs_path,
}]
)
]

View File

@@ -13,74 +13,14 @@ config_dir = os.path.join(share_dir, 'configs/launch_configs/default_config.json
with open(config_dir, "r") as f:
configs = json.load(f)
# def get_param_configs():
# param_configs = {}
# if configs["get_camera_mode"] == "Service":
# param_configs.update(configs["Service_configs"])
# elif configs["get_camera_mode"] == "Topic":
# param_configs.update(configs["Topic_configs"])
# else:
# param_configs.update(configs["Service_configs"])
#
# if configs["detect_mode"] == "Detect":
# param_configs.update(configs["Detect_configs"])
# elif configs["detect_mode"] == "Color":
# param_configs.update(configs["Color_configs"])
# elif configs["detect_mode"] == "Crossboard":
# param_configs.update(configs["Crossboard_configs"])
# else:
# param_configs.update(configs["Detect_configs"])
#
# if configs["calculate_mode"] == "PCA":
# param_configs.update(configs["PCA_configs"])
# elif configs["calculate_mode"] == "ICP":
# param_configs.update(configs["ICP_configs"])
# else:
# param_configs.update(configs["PCA_configs"])
# return param_configs
def generate_launch_description():
args_detect = [
DeclareLaunchArgument('output_boxes', default_value=configs['output_boxes']),
DeclareLaunchArgument('output_masks', default_value=configs['output_masks']),
DeclareLaunchArgument('left_hand', default_value=configs['left_hand']),
DeclareLaunchArgument('right_hand', default_value=configs['right_hand']),
DeclareLaunchArgument('head', default_value=configs['head']),
DeclareLaunchArgument('get_camera_mode', default_value=configs['get_camera_mode']),
DeclareLaunchArgument('detect_mode', default_value=configs['detect_mode']),
DeclareLaunchArgument('calculate_mode', default_value=configs['calculate_mode']),
# DeclareLaunchArgument("param_configs", default_value=json.dumps(get_param_configs())),
DeclareLaunchArgument("Service_configs", default_value=json.dumps(configs['Service_configs'])),
DeclareLaunchArgument("Topic_configs", default_value=json.dumps(configs['Topic_configs'])),
DeclareLaunchArgument("Detect_configs", default_value=json.dumps(configs['Detect_configs'])),
DeclareLaunchArgument("Color_configs", default_value=json.dumps(configs['Color_configs'])),
DeclareLaunchArgument("Crossboard_configs", default_value=json.dumps(configs['Crossboard_configs'])),
DeclareLaunchArgument("PCA_configs", default_value=json.dumps(configs['PCA_configs'])),
DeclareLaunchArgument("ICP_configs", default_value=json.dumps(configs['ICP_configs'])),
DeclareLaunchArgument('configs_path', default_value=config_dir),
]
def create_detect_node(context):
output_boxes = LaunchConfiguration('output_boxes').perform(context)
output_masks = LaunchConfiguration('output_masks').perform(context)
left_hand = LaunchConfiguration('left_hand').perform(context)
right_hand = LaunchConfiguration('right_hand').perform(context)
head = LaunchConfiguration('head').perform(context)
get_camera_mode = LaunchConfiguration('get_camera_mode').perform(context)
detect_mode = LaunchConfiguration('detect_mode').perform(context)
calculate_mode = LaunchConfiguration('calculate_mode').perform(context)
# param_configs = LaunchConfiguration('param_configs').perform(context)
Service_configs = LaunchConfiguration('Service_configs').perform(context)
Topic_configs = LaunchConfiguration('Topic_configs').perform(context)
Detect_configs = LaunchConfiguration('Detect_configs').perform(context)
Color_configs = LaunchConfiguration('Color_configs').perform(context)
Crossboard_configs = LaunchConfiguration('Crossboard_configs').perform(context)
PCA_configs = LaunchConfiguration('PCA_configs').perform(context)
ICP_configs = LaunchConfiguration('ICP_configs').perform(context)
configs_path = LaunchConfiguration('configs_path').perform(context)
return [
Node(
@@ -88,24 +28,7 @@ def generate_launch_description():
executable='detect_node',
name=configs['node_name'],
parameters=[{
'output_boxes': output_boxes.lower() == 'true',
'output_masks': output_masks.lower() == 'true',
'left_hand': left_hand,
'right_hand': right_hand,
'head': head,
'get_camera_mode': get_camera_mode,
'detect_mode': detect_mode,
'calculate_mode': calculate_mode,
# 'param_configs': param_configs,
'Service_configs': Service_configs,
'Topic_configs': Topic_configs,
'Detect_configs': Detect_configs,
'Color_configs': Color_configs,
'Crossboard_configs': Crossboard_configs,
'PCA_configs': PCA_configs,
'ICP_configs': ICP_configs,
'configs_path': configs_path,
}]
)
]

View File

@@ -0,0 +1,37 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
import os
import json
from ament_index_python.packages import get_package_share_directory

# Load the medical_sense node configuration once, at launch-file import time.
share_dir = get_package_share_directory('vision_detect')
config_dir = os.path.join(share_dir, 'configs/launch_configs/medical_sense.json')
with open(config_dir, "r") as f:
    configs = json.load(f)


def generate_launch_description():
    """Launch the vision_detect detect_node with the medical_sense config."""
    declared_args = [
        # Only the config path is exposed as a launch argument; every other
        # setting is read by the node from the JSON file itself.
        DeclareLaunchArgument('configs_path', default_value=config_dir),
    ]

    def make_detect_node(context):
        # Resolve the launch argument to a concrete string at launch time.
        resolved_path = LaunchConfiguration('configs_path').perform(context)
        detect_node = Node(
            package='vision_detect',
            executable='detect_node',
            name=configs['node_name'],
            parameters=[{
                'configs_path': resolved_path,
            }],
        )
        return [detect_node]

    return LaunchDescription(declared_args + [
        OpaqueFunction(function=make_detect_node),
    ])

View File

@@ -31,22 +31,22 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'detect_service_node = vision_detect.detect_service:main',
'detect_topic_node = vision_detect.detect_topic:main',
# 'detect_service_node = vision_detect.detect_service:main',
# 'detect_topic_node = vision_detect.detect_topic:main',
'box_detect_service_node = vision_detect.detect_box_service:main',
'red_detect_topic_node = vision_detect.detect_red_topic:main',
'red_detect_service_node = vision_detect.detect_red_service:main',
# 'box_detect_service_node = vision_detect.detect_box_service:main',
# 'red_detect_topic_node = vision_detect.detect_red_topic:main',
# 'red_detect_service_node = vision_detect.detect_red_service:main',
'flexiv_detect_topic_node = vision_detect.flexivaidk_detect_topic:main',
'flexiv_detect_service_node = vision_detect.flexivaidk_detect_service:main',
'sub_pose_node = vision_detect.sub_pose:main',
'calibration_node = vision_detect.hand_eye_calibration:main',
'crossboard_detect_node = vision_detect.crossboard_detect:main',
# 'calibration_node = vision_detect.hand_eye_calibration:main',
# 'crossboard_detect_node = vision_detect.crossboard_detect:main',
'service_client_node = vision_detect.service_client:main',
'get_camera_pose_node = vision_detect.get_camera_pose:main',
'calculate_node = vision_detect.calculate:main',
# 'calculate_node = vision_detect.calculate:main',
'detect_node = vision_detect.detect_node:main',
],

View File

@@ -0,0 +1,154 @@
import os
import json
from ament_index_python.packages import get_package_share_directory
from rclpy.node import Node
import numpy as np
import open3d as o3d
from ..utils import read_calibration_mat
share_dir = get_package_share_directory('vision_detect')
__all__ = [
"ConfigBase"
]
class ConfigBase(Node):
    """Base node that loads all vision-detect configuration from a JSON file.

    The JSON file path comes from the ``configs_path`` ROS parameter
    (default: ``configs/launch_configs/default_config.json`` under the
    package share directory).  The configuration is organized along three
    axes, each stored on the instance for subclasses to use:

    * ``get_camera_mode`` -- how images arrive (``Service`` or ``Topic``),
    * ``detect_mode``     -- how objects are segmented
      (``Detect`` / ``Color`` / ``Crossboard``),
    * ``calculate_mode``  -- how the pose is computed (``PCA`` or ``ICP``).

    The three hand-eye calibration matrices (left hand, right hand, head)
    are loaded as well; subclasses pick the one matching their camera.
    """

    def __init__(self, name):
        super().__init__(name)
        # --- detection settings ---
        self.confidence = None
        self.classes = None  # detector class filter; None means all classes
        self.checkpoint_path = None
        self.pattern_size = None  # chessboard corner grid (Crossboard mode)
        # --- image-source settings ---
        self.depth_image_topic_name = None
        self.color_image_topic_name = None
        self.camera_info_topic_name = None
        self.service_name = None
        self.position = None  # camera mount: 'left', 'right' or head
        # --- runtime objects filled in by subclasses ---
        self.device = None
        self.configs = None
        self.calculate_configs = None
        self.output_boxes = None
        self.output_masks = None
        self.function = None
        self.source = None
        self.server = None
        self.model = None
        self.k = self.d = None  # camera intrinsics / distortion coefficients
        # --- hand-eye calibration matrices ---
        self.eye_in_left_hand_mat = None
        self.eye_in_right_hand_mat = None
        self.eye_to_hand_mat = None
        self.hand_eye_mat = None
        # --- mode selectors ---
        self.get_camera_mode = None
        self.detect_mode = None
        self.calculate_mode = None
        self.camera_size = None
        self.map1 = self.map2 = None
        self.calculate_function = None
        self.fx = self.fy = 0.5  # image down-scale factors
        self.camera_data = {}
        self.distance = 1500  # depth cut-off used by Color mode
        # Default HSV ranges for red (hue wraps around 0/180 in OpenCV,
        # hence the two intervals).
        self.color_range = [
            [[0, 120, 70], [10, 255, 255]],
            [[170, 120, 70], [180, 255, 255]]
        ]
        self._get_param()

    def _get_param(self):
        """Load the JSON config file and populate all mode-specific settings."""
        self.declare_parameter(
            'configs_path',
            os.path.join(share_dir, "configs/launch_configs/default_config.json")
        )
        configs_path = self.get_parameter('configs_path').value
        with open(configs_path, 'r') as f:
            self.configs = json.load(f)
        # Booleans are stored as strings ("true"/"false") in the JSON file.
        self.output_boxes = self.configs['output_boxes'].lower() == "true"
        self.output_masks = self.configs['output_masks'].lower() == "true"
        self.get_camera_mode = self.configs['get_camera_mode']
        self.detect_mode = self.configs['detect_mode']
        self.calculate_mode = self.configs['calculate_mode']
        self._read_calibration_mat()
        if self.get_camera_mode == "Topic":
            topic_configs = self.configs['Topic_configs']
            self.color_image_topic_name = topic_configs["color_image_topic_name"]
            self.depth_image_topic_name = topic_configs["depth_image_topic_name"]
            self.camera_info_topic_name = topic_configs["camera_info_topic_name"]
            self.position = topic_configs["position"]
        else:
            # "Service" and any unknown mode both use the service interface.
            # The unknown-mode branch previously read a non-existent
            # "Service" key; both paths now share "Service_configs".
            self.service_name = self.configs["Service_configs"]["service_name"]
        if self.detect_mode == 'Detect':
            self._load_detect_configs()
        elif self.detect_mode == 'Color':
            self.color_range = self.configs["Color_configs"]["color_range"]
            self.distance = self.configs["Color_configs"]["distance"]
            self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in
                                self.color_range]
        elif self.detect_mode == 'Crossboard':
            self.pattern_size = self.configs["Crossboard_configs"]["pattern_size"]
        else:
            self.get_logger().warning("Unknown mode, use detect")
            self._load_detect_configs()
        if self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
            self.calculate_configs = self.configs['ICA_configs']
            # Pre-load the reference model that ICP registers against.
            source = o3d.io.read_point_cloud(
                os.path.join(share_dir, self.calculate_configs['complete_model_path'])
            )
            self.calculate_configs["source"] = source
        else:
            if self.calculate_mode != "PCA":
                self.get_logger().warning("Unknown calculate_mode, use PCA")
            self.calculate_configs = self.configs['PCA_configs']
        self.get_logger().info("Get parameters done")

    def _load_detect_configs(self):
        """Read the ``Detect_configs`` section: confidence, classes, checkpoint."""
        detect_configs = self.configs['Detect_configs']
        self.confidence = detect_configs["confidence"]
        self.classes = detect_configs["classes"]
        if not self.classes:
            self.classes = None
        # Resolve the checkpoint relative to the package share directory
        # (the unknown-mode fallback previously forgot to do this).
        self.checkpoint_path = os.path.join(share_dir, detect_configs["checkpoint_path"])

    def _read_calibration_mat(self):
        """Load the left-hand, right-hand and head hand-eye matrices."""
        eye_in_left_hand_path = os.path.join(share_dir, self.configs["calibration"]["left_hand"])
        eye_in_right_hand_path = os.path.join(share_dir, self.configs["calibration"]["right_hand"])
        eye_to_hand_path = os.path.join(share_dir, self.configs["calibration"]["head"])
        self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
        if not sign:
            self.get_logger().warning(f"left_mat: {info}")
        self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
        if not sign:
            self.get_logger().warning(f"right_mat: {info}")
        self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
        if not sign:
            self.get_logger().warning(f"head_mat: {info}")
        self.get_logger().info("Read calibration mat file done")

View File

@@ -1,285 +1,29 @@
"""Vision Detect Node"""
import os
import time
import threading
import json
from ament_index_python.packages import get_package_share_directory
import time
import cv2
import torch
import numpy as np
import open3d as o3d
from ultralytics import YOLO
import rclpy
from rclpy.node import Node
from rclpy.task import Future
from cv_bridge import CvBridge
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from sensor_msgs.msg import CameraInfo
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
from ..utils import *
from .init_node import InitBase
share_dir = get_package_share_directory('vision_detect')
class DetectNode(Node):
class DetectNode(InitBase):
"""Detect Node"""
def __init__(self, name):
super().__init__(name)
self.device = None
self.checkpoint_path = None
self.output_boxes = None
self.output_masks = None
self.function = None
self.source = None
self.server = None
self.model = None
self.k = self.d = None
self.eye_in_left_hand_mat = None
self.eye_in_right_hand_mat = None
self.eye_to_hand_mat = None
self.hand_eye_mat = None
self.eye_in_left_hand_path = None
self.eye_in_right_hand_path = None
self.eye_to_hand_path = None
self.get_camera_mode = None
self.detect_mode = None
self.calculate_mode = None
self.camera_size = None
self.map1 = self.map2 = None
self.calculate_function = calculate_pose_pca
self.fx = self.fy = 0.5
self.camera_data = {}
self.future = Future()
self.cv_bridge = CvBridge()
self.lock = threading.Lock()
self.distance = 1500
self.color_range = [
[[0, 120, 70], [10, 255, 255]],
[[170, 120, 70], [180, 255, 255]]
]
'''init'''
self._init_param()
self._init_publisher()
self._init_subscriber()
self.get_logger().info("Initialize done")
def _init_param(self):
"""init parameter"""
self.declare_parameter('output_boxes', True)
self.declare_parameter('output_masks', False)
self.output_boxes = self.get_parameter('output_boxes').value
self.output_masks = self.get_parameter('output_masks').value
self.declare_parameter('get_camera_mode', 'service')
self.declare_parameter('detect_mode', 'detect')
self.declare_parameter('calculate_mode', 'pca')
self.get_camera_mode = self.get_parameter('get_camera_mode').value
self.detect_mode = self.get_parameter('detect_mode').value
self.calculate_mode = self.get_parameter("calculate_mode").value
self.declare_parameter('left_hand', 'configs/hand_eye_mat/hand_in_left_hand.json')
self.declare_parameter('right_hand', 'configs/hand_eye_mat/hand_in_right_hand.json')
self.declare_parameter('head', 'configs/hand_eye_mat/hand_to_hand.json')
self.declare_parameter('Service_configs', '{}')
self.declare_parameter('Topic_configs', '{}')
self.declare_parameter('Detect_configs', '{}')
self.declare_parameter('Color_configs', '{}')
self.declare_parameter('Crossboard_configs', '{}')
self.declare_parameter('PCA_configs', '{}')
self.declare_parameter('ICA_configs', '{}')
self._init_json_file()
if self.get_camera_mode == "Service":
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
self._init_service()
elif self.get_camera_mode == "Topic":
self.declare_parameter('Topic_configs', '{}')
self.color_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["color_image_topic_name"]
self.depth_image_topic_name = json.loads(self.get_parameter('Topic_configs').value)["depth_image_topic_name"]
self.camera_info_topic_name = json.loads(self.get_parameter('Topic_configs').value)["camera_info_topic_name"]
self.position = json.loads(self.get_parameter('Topic_configs').value)["position"]
if self.position == 'left':
self.hand_eye_mat = self.eye_in_left_hand_mat
elif self.position == 'right':
self.hand_eye_mat = self.eye_in_right_hand_mat
else:
self.hand_eye_mat = self.eye_to_hand_mat
else:
self.service_name = json.loads(self.get_parameter('Service_configs').value)["service_name"]
if self.detect_mode == 'Detect':
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = os.path.join(share_dir, json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"])
self._init_model()
elif self.detect_mode == 'Color':
self.function = self._seg_color
self.color_range = json.loads(self.get_parameter('Color_configs').value)["color_range"]
self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in self.color_range]
elif self.detect_mode == 'Crossboard':
self.function = self._seg_crossboard
self.pattern_size = json.loads(self.get_parameter('Crossboard_configs').value)["pattern_size"]
else:
self.get_logger().warning("Unknown mode, use detect")
self.function = self._seg_image
self.confidence = json.loads(self.get_parameter('Detect_configs').value)["confidence"]
self.classes = json.loads(self.get_parameter('Detect_configs').value)["classes"]
if not self.classes:
self.classes = None
self.checkpoint_path = json.loads(self.get_parameter('Detect_configs').value)["checkpoint_path"]
self._init_model()
if self.calculate_mode == "PCA":
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
elif self.calculate_mode == "ICP":
self.configs = json.loads(self.get_parameter('ICA_configs').value)
source = o3d.io.read_point_cloud(
os.path.join(share_dir, self.configs['complete_model_path'])
)
self.configs["source"] = source
self.calculate_function = calculate_pose_icp
else:
self.get_logger().warning("Unknown calculate_mode, use PCA")
self.configs = json.loads(self.get_parameter('PCA_configs').value)
self.calculate_function = calculate_pose_pca
self.get_logger().info("Initialize parameters done")
def _init_json_file(self):
self.eye_in_left_hand_path = os.path.join(
share_dir, self.get_parameter('left_hand').value
)
self.eye_in_right_hand_path = os.path.join(
share_dir, self.get_parameter('right_hand').value
)
self.eye_to_hand_path = os.path.join(
share_dir, self.get_parameter('head').value
)
self.eye_in_left_hand_mat, info, sign = read_calibration_mat(self.eye_in_left_hand_path)
if not sign:
self.get_logger().warning(f"left_mat: {info}")
self.eye_in_right_hand_mat, info, sign = read_calibration_mat(self.eye_in_right_hand_path)
if not sign:
self.get_logger().warning(f"right_mat: {info}")
self.eye_to_hand_mat, info, sign = read_calibration_mat(self.eye_to_hand_path)
if not sign:
self.get_logger().warning(f"head_mat: {info}")
self.get_logger().info("Initialize read json file done")
def _init_model(self):
"""init model"""
if self.checkpoint_path.endswith('-seg.pt'):
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
try:
self.model = YOLO(self.checkpoint_path).to(device_model)
except Exception as e:
self.get_logger().error(f'Failed to load YOLO model: {e}')
raise
self.function = self._seg_image
elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
self.model = YOLO(self.checkpoint_path)
self.function = self._seg_image
else:
self.function = None
self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
self.get_logger().info("Initialize model done")
def _init_publisher(self):
"""init publisher"""
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
if self.get_camera_mode == "Topic":
self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose', 10)
self.get_logger().info("Initialize publisher done")
def _init_service(self):
"""init service server"""
self.server = self.create_service(
VisionObjectRecognition,
self.service_name,
self._service_callback
)
self.get_logger().info("Initialize service done")
def _init_subscriber(self):
"""init subscriber"""
if self.get_camera_mode == "Service":
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
elif self.get_camera_mode == "Topic":
self.sub_camera_info = self.create_subscription(
CameraInfo,
self.camera_info_topic_name,
self._camera_info_callback,
10
)
self.get_logger().info("Waiting for camera info...")
rclpy.spin_until_future_complete(self, self.future)
self.get_logger().info("Camera info received.")
# sync get color and depth img
self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
self.sync_subscriber = ApproximateTimeSynchronizer(
[self.sub_color_image, self.sub_depth_image],
queue_size=10,
slop=0.1
)
self.sync_subscriber.registerCallback(self._sync_sub_callback)
else:
self.get_logger().warning("get_camera_mode is wrong, use service")
self.sub_img = self.create_subscription(
ImgMsg,
"/img_msg",
self._service_sub_callback,
10
)
self.get_logger().info("Initialize subscriber done")
def _camera_info_callback(self, msg: CameraInfo):
"""Get camera info"""
@@ -407,6 +151,7 @@ class DetectNode(Node):
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
"""Use segmentation model"""
self.get_logger().info('start')
pose_list = []
# Get Predict Results
@@ -415,6 +160,10 @@ class DetectNode(Node):
time2 = time.time()
result = results[0]
home = os.path.expanduser("/home/demo/hivecore_robot_os1/hivecore_robot_vision/imgs")
save_path = os.path.join(home, f"origin_image_{int(time1*1000)}.png")
cv2.imwrite(save_path, rgb_img)
# Get masks
if result.masks is None or len(result.masks) == 0:
self.get_logger().info(f"Detect object num: 0")
@@ -425,6 +174,12 @@ class DetectNode(Node):
boxes = result.boxes.xywh.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
x_centers, y_centers = boxes[:, 0], boxes[:, 1]
sorted_index = np.lexsort((-y_centers, x_centers))
masks = masks[sorted_index]
boxes = boxes[sorted_index]
class_ids = class_ids[sorted_index]
time3 = time.time()
@@ -446,18 +201,21 @@ class DetectNode(Node):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, grab_width = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,
**self.configs
calculate_grab_width=False,
**self.calculate_configs
)
if rmat is None:
self.get_logger().warning("Object point cloud have too many noise")
continue
grab_width = calculate_grav_width(mask, self.k, rmat[2, 3])
rmat[2, 3] = rmat[2, 3] + grab_width * 0.38
if np.allclose(rmat, np.eye(4)):
continue
self.get_logger().info(f"grab_width: {grab_width}")
rmat = self.hand_eye_mat @ rmat
@@ -472,13 +230,13 @@ class DetectNode(Node):
"class_id": int(class_ids[i]),
"class_name": labels[class_ids[i]],
"pose": pose,
"grab_width": grab_width * 1.05
"grab_width": grab_width
}
)
time4 = time.time()
self.get_logger().info('start')
self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
# self.get_logger().info(f'{(time3 - time2) * 1000} ms, get mask and some param')
self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
@@ -487,9 +245,10 @@ class DetectNode(Node):
# mask_img and box_img is or not output
if self.output_boxes and not self.output_masks:
home = os.path.expanduser("~")
save_path = os.path.join(home, "detect_image.png")
home = os.path.expanduser("/home/demo/hivecore_robot_os1/hivecore_robot_vision/imgs")
save_path = os.path.join(home, f"detect_image_{int(time1*1000)}.png")
draw_box(rgb_img, result, save_path=save_path)
# draw_box(rgb_img, result, save_path=False)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list
elif self.output_boxes and self.output_masks:
draw_box(rgb_img, result)
@@ -534,12 +293,15 @@ class DetectNode(Node):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, _ = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,
**self.configs
**self.calculate_configs
)
if rmat is None:
self.get_logger().warning("Color Area point cloud have too many noise")
return None, None
rmat = self.hand_eye_mat @ rmat
@@ -554,7 +316,7 @@ class DetectNode(Node):
"class_id": int(98),
"class_name": "red",
"pose": pose,
"grab_width": 0.0
"grab_width": [0.0]
}
)
@@ -599,13 +361,17 @@ class DetectNode(Node):
self.k[5] - y_min
)
rmat = self.calculate_function(
rmat, _ = self.calculate_function(
mask_crop,
depth_crop,
intrinsics,
**self.configs
**self.calculate_configs
)
if rmat is None:
self.get_logger().warning("Corssboard point cloud have too many noise")
return None, None
x, y, z, rw, rx, ry, rz = rmat2quat(rmat)
self.get_logger().info(f"{x}, {y}, {z}, {rw}, {rx}, {ry}, {rz}")
@@ -619,7 +385,7 @@ class DetectNode(Node):
"class_id": int(99),
"class_name": 'crossboard',
"pose": pose,
"grab_width": 0.0
"grab_width": [0.0]
}
)

View File

@@ -0,0 +1,170 @@
from ament_index_python.packages import get_package_share_directory
import torch
import numpy as np
from ultralytics import YOLO
import rclpy
from rclpy.task import Future
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from interfaces.msg import PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from ..utils import calculate_pose_pca, calculate_pose_icp
from .config_node import ConfigBase
share_dir = get_package_share_directory('vision_detect')
class InitBase(ConfigBase):
    """Wires up the node according to the configuration loaded by ConfigBase.

    Chooses the segmentation routine, loads the model when needed, selects
    the pose-calculation function and the hand-eye matrix, and creates the
    publishers/subscribers or the service server.  The ``_seg_*`` and
    callback methods are stubs that subclasses override.
    """

    def __init__(self, name):
        super().__init__(name)
        self.future = Future()
        # Camera acquisition: Topic mode selects the hand-eye matrix for the
        # configured mount; every other mode serves images via a service.
        if self.get_camera_mode == "Topic":
            mount_to_mat = {
                'left': self.eye_in_left_hand_mat,
                'right': self.eye_in_right_hand_mat,
            }
            self.hand_eye_mat = mount_to_mat.get(self.position, self.eye_to_hand_mat)
        else:
            self._init_service()
        # Segmentation routine: Color and Crossboard are model-free; 'Detect'
        # and any unrecognized mode fall back to the model-based pipeline.
        if self.detect_mode == 'Color':
            self.function = self._seg_color
        elif self.detect_mode == 'Crossboard':
            self.function = self._seg_crossboard
        else:
            self.function = self._seg_image
            if not self.classes:
                self.classes = None
            self._init_model()
        # Pose computation: ICP only applies to model-based detection.
        use_icp = self.calculate_mode == "ICP" and self.detect_mode == 'Detect'
        self.calculate_function = calculate_pose_icp if use_icp else calculate_pose_pca
        self._init_publisher()
        self._init_subscriber()
        self.get_logger().info("Initialize done")

    def _init_model(self):
        """Load the YOLO checkpoint: ``-seg.pt`` on CUDA/CPU, or ONNX/TensorRT."""
        path = self.checkpoint_path
        if path.endswith('-seg.pt'):
            device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            try:
                self.model = YOLO(path).to(device_model)
            except Exception as e:
                self.get_logger().error(f'Failed to load YOLO model: {e}')
                raise
        elif path.endswith(('.onnx', '.engine')):
            # Exported models pick their runtime themselves.
            self.model = YOLO(path)
        else:
            self.function = None
            self.get_logger().fatal(f'Unknown checkpoint: {path}')
            raise ValueError(f'Unknown checkpoint: {path}')
        self.get_logger().info(f'Loading checkpoint from: {path}')
        self.get_logger().info("Initialize model done")

    def _init_publisher(self):
        """Create the annotated-image and (Topic mode) pose-list publishers."""
        if self.output_boxes or self.output_masks:
            self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
        if self.get_camera_mode == "Topic":
            self.pub_pose_list = self.create_publisher(
                PoseArrayClassAndID, '/pose/cv_detect_pose', 10
            )
        self.get_logger().info("Initialize publisher done")

    def _init_service(self):
        """Create the object-recognition service server."""
        self.server = self.create_service(
            VisionObjectRecognition,
            self.service_name,
            self._service_callback
        )
        self.get_logger().info("Initialize service done")

    def _init_subscriber(self):
        """Subscribe to the image source selected by ``get_camera_mode``."""
        if self.get_camera_mode == "Topic":
            self.sub_camera_info = self.create_subscription(
                CameraInfo,
                self.camera_info_topic_name,
                self._camera_info_callback,
                10
            )
            # Block until the intrinsics arrive before touching image data.
            self.get_logger().info("Waiting for camera info...")
            rclpy.spin_until_future_complete(self, self.future)
            self.get_logger().info("Camera info received.")
            # Time-synchronize the color and depth streams.
            self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
            self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
            self.sync_subscriber = ApproximateTimeSynchronizer(
                [self.sub_color_image, self.sub_depth_image],
                queue_size=10,
                slop=0.1
            )
            self.sync_subscriber.registerCallback(self._sync_sub_callback)
            self.get_logger().info("Initialize subscriber done")
            return
        if self.get_camera_mode != "Service":
            self.get_logger().warning("get_camera_mode is wrong, use service")
        self.sub_img = self.create_subscription(
            ImgMsg,
            "/img_msg",
            self._service_sub_callback,
            10
        )
        self.get_logger().info("Initialize subscriber done")

    def _camera_info_callback(self, msg: CameraInfo):
        """Overridden by subclasses: consume camera intrinsics."""
        pass

    def _service_sub_callback(self, msgs):
        """Overridden by subclasses: handle images pushed via /img_msg."""
        pass

    def _sync_sub_callback(self, color_img_ros, depth_img_ros):
        """Overridden by subclasses: handle a synchronized color/depth pair."""
        pass

    def _service_callback(self, request, response):
        """Overridden by subclasses: serve recognition requests."""
        pass

    def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        """Overridden by subclasses: model-based segmentation."""
        pass

    def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        """Overridden by subclasses: HSV color-range segmentation."""
        pass

    def _seg_crossboard(self, rgb_img, depth_img):
        """Overridden by subclasses: chessboard corner detection."""
        pass

View File

@@ -9,7 +9,7 @@ import transforms3d as tfs
from .object_icp import object_icp
__all__ = [
"calculate_pose_pca", "calculate_pose_icp", "calculate_grav_width",
"calculate_pose_pca", "calculate_pose_icp",
"rmat2quat", "quat2rmat",
]
@@ -37,6 +37,7 @@ def calculate_pose_pca(
mask,
depth_img: np.ndarray,
intrinsics,
calculate_grab_width: bool = False,
**kwargs
):
"""点云主成分分析法计算位态"""
@@ -52,21 +53,45 @@ def calculate_pose_pca(
depth_trunc=kwargs.get("depth_trunc", 3.0),
)
point_cloud = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.010))
point_cloud = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.005))
if point_cloud is None:
return None
return None, [0.0, 0.0, 0.0]
if len(point_cloud.points) == 0:
logging.warning("clean_pcd is empty")
return np.eye(4)
center = point_cloud.get_center()
x, y, z = center
return np.eye(4), [0.0, 0.0, 0.0]
w, v = pca(np.asarray(point_cloud.points))
if calculate_grab_width:
if np.asarray(point_cloud.points).shape[0] < 4:
logging.warning("点数不足,不能算 OBB")
return np.eye(4), [0.0, 0.0, 0.0]
obb = point_cloud.get_oriented_bounding_box()
x, y, z = obb.center
extent = obb.extent
order = np.argsort(-extent)
if w is None or v is None:
logging.warning("PCA output w or v is None")
return np.eye(4)
grab_width = extent[order]
# z = z + grab_width * 0.20
v = obb.R
v = v[:, order]
if v is None:
logging.warning("PCA output v is None")
return np.eye(4), [0.0, 0.0, 0.0]
grab_width = grab_width * 1.05
else:
center = point_cloud.get_center()
x, y, z = center
w, v = pca(np.asarray(point_cloud.points))
if w is None or v is None:
logging.warning("PCA output w or v is None")
return np.eye(4), 0.0
grab_width = [0.0, 0.0, 0.0]
vx, vy, vz = v[:,0], v[:,1], v[:,2]
@@ -80,16 +105,17 @@ def calculate_pose_pca(
R = np.column_stack((vx, vy, vz))
rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
# draw(point_cloud_u, rmat)
# draw(point_cloud, rmat)
# draw(point_cloud_1, rmat)
return rmat
return rmat, grab_width
def calculate_pose_icp(
mask,
depth_img: np.ndarray,
intrinsics,
calculate_grab_width: bool = False,
**kwargs
):
"""点云配准法计算位姿"""
@@ -107,11 +133,14 @@ def calculate_pose_icp(
point_cloud = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.010))
if point_cloud is None:
return None
return None, [0.0, 0.0, 0.0]
if len(point_cloud.points) == 0:
logging.warning("clean_pcd is empty")
return np.eye(4)
return np.eye(4), [0.0, 0.0, 0.0]
if calculate_grab_width:
pass
rmat = object_icp(
kwargs.get("source"),
@@ -121,23 +150,34 @@ def calculate_pose_icp(
icp_max_iter=kwargs.get("icp_max_iter", [50, 30, 14])
)
return rmat
grab_width = [0.0, 0.0, 0.0]
return rmat, grab_width
def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: float = 0.010):
def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: float = 0.005):
"""点云去噪"""
point_cloud = point_cloud.remove_non_finite_points()
# while True:
# down_pcd = point_cloud.voxel_down_sample(voxel_size=voxel_size)
# point_num = len(down_pcd.points)
# if point_num <= 1000:
# break
# voxel_size *= 4
down_pcd = point_cloud.voxel_down_sample(voxel_size=voxel_size)
point_num = len(down_pcd.points)
# logging.fatal("point_cloud_denoising: point_num={}".format(len(point_cloud.points)))
# logging.fatal("point_cloud_denoising: point_num={}".format(point_num))
# 半径滤波
clean_pcd, _ = down_pcd.remove_radius_outlier(
nb_points=10,
radius=voxel_size * 5
nb_points=int(round(10 * voxel_size / 0.005)),
radius=voxel_size * 10
)
# 统计滤波
clean_pcd, _ = clean_pcd.remove_statistical_outlier(
nb_neighbors=10,
nb_neighbors=int(round(10 * voxel_size / 0.005)),
std_ratio=2.0
)
@@ -151,7 +191,7 @@ def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: floa
# if largest_cluster_ratio < 0.5:
# return None
labels = np.array(clean_pcd.cluster_dbscan(eps=voxel_size * 5, min_points=10))
labels = np.array(clean_pcd.cluster_dbscan(eps=voxel_size * 10, min_points=int(round(10 * voxel_size / 0.005))))
if len(labels[labels >= 0]) == 0:
return clean_pcd
@@ -177,32 +217,32 @@ def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: floa
clean_pcd = point_cloud_clusters[0][0]
# 使用最近簇判断噪音强度
largest_cluster_ratio = len(clean_pcd.points) / len(points)
if largest_cluster_ratio < 0.2:
largest_cluster_ratio = len(clean_pcd.points) / point_num
if largest_cluster_ratio < 0.08:
return None
return clean_pcd
def calculate_grav_width(mask, k, depth):
"""计算重心宽度"""
mask = mask.astype(np.uint8) * 255
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return 0.0
c = max(contours, key=cv2.contourArea)
box = cv2.boxPoints(cv2.minAreaRect(c))
if np.linalg.norm(box[1] - box[0]) < np.linalg.norm(box[1] - box[2]):
point_diff = box[1] - box[0]
else:
point_diff = box[1] - box[2]
grab_width = depth * np.sqrt(
point_diff[0]**2 / k[0]**2 + point_diff[1]**2 / k[4]**2
)
return grab_width
# def calculate_grab_width(mask, k, depth):
# """计算重心宽度"""
# mask = mask.astype(np.uint8) * 255
# contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
#
# if not contours:
# return 0.0
#
# c = max(contours, key=cv2.contourArea)
# box = cv2.boxPoints(cv2.minAreaRect(c))
# if np.linalg.norm(box[1] - box[0]) < np.linalg.norm(box[1] - box[2]):
# point_diff = box[1] - box[0]
# else:
# point_diff = box[1] - box[2]
#
# grab_width = depth * np.sqrt(
# point_diff[0]**2 / k[0]**2 + point_diff[1]**2 / k[4]**2
# )
# return grab_width
def rmat2quat(rmat):

View File

@@ -25,6 +25,11 @@ def draw_box(
confidences = segmentation_result.boxes.conf.cpu().numpy()
class_ids = segmentation_result.boxes.cls.cpu().numpy()
labels = segmentation_result.names
x_centers, y_centers = boxes[:, 0], boxes[:, 1]
sorted_index = np.lexsort((-y_centers, x_centers))
boxes = boxes[sorted_index]
class_ids = class_ids[sorted_index]
for i, box in enumerate(boxes):
x_center, y_center, width, height = box[:4]
@@ -38,7 +43,7 @@ def draw_box(
(p1[0], p1[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 1,
(255, 255, 0), 2)
cv2.putText(rgb_img, f"{i}", (p1[0] + 5, p1[1] + 15),
cv2.putText(rgb_img, f"{i}", (p1[0] + 15, p1[1] + 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
if save_path:

View File

@@ -44,17 +44,17 @@ class GetCameraPose(Node):
pose_dict = {}
for object in camera_pose.objects:
pose_dict[object.class_name] = object.pose_list
pose_dict[object.class_name] = object.pose
if self.class_name in pose_dict:
_camera = [
pose_dict[self.class_name][-1].position.x,
pose_dict[self.class_name][-1].position.y,
pose_dict[self.class_name][-1].position.z,
pose_dict[self.class_name][-1].orientation.w,
pose_dict[self.class_name][-1].orientation.x,
pose_dict[self.class_name][-1].orientation.y,
pose_dict[self.class_name][-1].orientation.z,
pose_dict[self.class_name].position.x,
pose_dict[self.class_name].position.y,
pose_dict[self.class_name].position.z,
pose_dict[self.class_name].orientation.w,
pose_dict[self.class_name].orientation.x,
pose_dict[self.class_name].orientation.y,
pose_dict[self.class_name].orientation.z,
]
self.get_logger().info(f"add camera: {_camera}")

View File

@@ -0,0 +1,18 @@
{
"save_path": "collect_data",
"left": {
"color": "",
"depth": "",
"info": ""
},
"right": {
"color": "/camera/camera/color/image_raw",
"depth": "/camera/camera/aligned_depth_to_color/image_raw",
"info": "/camera/camera/color/camera_info"
},
"head": {
"color": "/camera/color/image_raw",
"depth": "/camera/depth/image_raw",
"info": "/camera/color/camera_info"
}
}

20
vision_tools/package.xml Normal file
View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>vision_tools</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="16126883+liangyuxuan123@user.noreply.gitee.com">lyx</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

4
vision_tools/setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/vision_tools
[install]
install_scripts=$base/lib/vision_tools

30
vision_tools/setup.py Normal file
View File

@@ -0,0 +1,30 @@
from glob import glob
from setuptools import find_packages, setup
# ROS 2 (ament_python) packaging manifest for the vision_tools helper package.
package_name = 'vision_tools'
setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    # Install the ament resource marker, package.xml and the JSON config
    # files into this package's share directory.
    data_files=[
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/configs', glob('configs/*.json')),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='lyx',
    maintainer_email='lyx@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    # Executables exposed as `ros2 run vision_tools <name>`.
    entry_points={
        'console_scripts': [
            'collect_data_node = vision_tools.collect_data_node:main',
            'collect_data_client = vision_tools.collect_data_client:main',
            'get_camera_pose_node = vision_tools.get_camera_pose:main',
        ],
    },
)

View File

@@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament copyright linter over the package and fail on findings."""
    exit_code = main(argv=['.', 'test'])
    assert exit_code == 0, 'Found errors'

View File

@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament flake8 linter and list every style finding on failure."""
    exit_code, findings = main_with_errors(argv=[])
    report = 'Found %d code style errors / warnings:\n' % len(findings)
    assert exit_code == 0, report + '\n'.join(findings)

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring-style linter."""
    exit_code = main(argv=['.', 'test'])
    assert exit_code == 0, 'Found code style errors / warnings'

View File

View File

@@ -0,0 +1,46 @@
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from std_srvs.srv import SetBool
def main(args=None):
    """Parameter-driven client for the /collect_data SetBool service.

    Polls two boolean parameters twice a second:

    * ``collect_sign`` -- when True, request data collection to start.
    * ``end_sign``     -- when True, request data collection to stop.

    Each flag is reset to False immediately after its request is sent, so
    setting a parameter triggers exactly one service call.
    """
    def timer_callback():
        start_sign = node.get_parameter("collect_sign").value
        end_sign = node.get_parameter("end_sign").value
        if start_sign:
            request = SetBool.Request()
            request.data = True
            client.call_async(request).add_done_callback(result_callback)
            # Reset the trigger so the request fires only once.
            node.set_parameters([Parameter('collect_sign', Parameter.Type.BOOL, False)])
        if end_sign:
            request = SetBool.Request()
            request.data = False
            client.call_async(request).add_done_callback(result_callback)
            node.set_parameters([Parameter('end_sign', Parameter.Type.BOOL, False)])

    def result_callback(result):
        # Log the service response once the async call completes.
        response = result.result()
        if response.success:
            node.get_logger().info(response.message)
        else:
            node.get_logger().error(response.message)

    rclpy.init(args=args)
    node = Node("collect_data_client")
    client = node.create_client(SetBool, "/collect_data")
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info('Service not available, waiting again...')
    node.get_logger().info("Service available")
    node.declare_parameter("collect_sign", False)
    node.declare_parameter("end_sign", False)
    node.create_timer(0.5, timer_callback)
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        # Fix: release rclpy resources on exit (shutdown was missing here,
        # unlike the other entry points in this package).
        rclpy.shutdown()

View File

@@ -0,0 +1,664 @@
import os
import json
import threading
import cv2
import rclpy
from rclpy.node import Node
from cv_bridge import CvBridge
from ament_index_python import get_package_share_directory
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from std_srvs.srv import SetBool
from vision_tools.utils import zip_folder
# Package share directory; the node's JSON config is resolved relative to this.
SHARE_DIR = get_package_share_directory('vision_tools')
class CollectDataNode(Node):
    """Record synchronized color/depth streams from up to three RGB-D cameras.

    Cameras whose color and depth topics exist at startup ("left", "right",
    "head") are subscribed through an approximate time synchronizer.  The
    /collect_data SetBool service starts (data=True) and stops (data=False)
    a recording session; each session produces per-camera preview videos,
    per-frame raw 16-bit depth PNGs (zipped on stop), timestamps, depth
    metadata and camera intrinsics under a shared episode index.
    """

    def __init__(self):
        super().__init__("collect_data_node")
        # True while a recording session is in progress.
        self.collect_sign = False
        # Set when a session has been stopped; not read elsewhere in this class.
        self.save_sign = False
        self.cv_bridge = CvBridge()
        # Which cameras were detected on the ROS graph at startup.
        self.left_sign = self.right_sign = self.head_sign = False
        # Whether the per-session depth meta file has been written yet.
        self.left_meta = self.right_meta = self.head_meta = False
        # Per-camera raw depth PNG frame counters.
        self.left_index = self.right_index = self.head_index = 0
        # Legacy raw-stream file handles; raw dumping is disabled, these stay None.
        self.left_depth_raw_file = None
        self.right_depth_raw_file = None
        self.head_depth_raw_file = None
        # Timestamp / camera-info file handles, open only while recording.
        self.left_timestamp_file = None
        self.right_timestamp_file = None
        self.head_timestamp_file = None
        self.left_camera_info_file = None
        self.right_camera_info_file = None
        self.head_camera_info_file = None
        # One-shot CameraInfo subscriptions, created per recording session.
        self.left_info_sub = None
        self.right_info_sub = None
        self.head_info_sub = None
        # Episode index; advanced until a free slot is found when recording starts.
        self.index = 0
        # Preview-video frame rate; assumes cameras stream at 30 FPS -- TODO confirm.
        self.fps = 30.0
        # Preview video writers, created lazily on the first synchronized frame.
        self.left_color_writer = None
        self.right_color_writer = None
        self.head_color_writer = None
        self.left_depth_writer = None
        self.right_depth_writer = None
        self.head_depth_writer = None
        self.right_writer_initialized = False
        self.left_writer_initialized = False
        self.head_writer_initialized = False
        # Topic names and the save location come from the package's JSON config.
        with open(os.path.join(SHARE_DIR, 'configs/collect_data_node.json'), 'r')as f:
            configs = json.load(f)
        home_path = os.path.expanduser("~")
        self.save_path = os.path.join(home_path, configs['save_path'])
        if self.save_path:
            os.makedirs(self.save_path, exist_ok=True)
        # left camera: subscribe only when both topics are already advertised.
        if self.topic_exists(configs["left"]["color"]) and self.topic_exists(
                configs["left"]["depth"]):
            self.left_sign = True
            # Color and depth, approximately synchronized (slop 0.1 s).
            self.sub_left_color = Subscriber(self, Image, configs["left"]["color"])
            self.sub_left_depth = Subscriber(self, Image, configs["left"]["depth"])
            self.left_sync_subscriber = ApproximateTimeSynchronizer(
                [self.sub_left_color, self.sub_left_depth],
                queue_size=10,
                slop=0.1
            )
            self.left_sync_subscriber.registerCallback(self.left_callback)
            # Camera info topic, subscribed one-shot per recording session.
            self.left_info_name = configs["left"]["info"]
            self.get_logger().info("left camera online")
        # right camera
        if self.topic_exists(configs["right"]["color"]) and self.topic_exists(
                configs["right"]["depth"]):
            self.right_sign = True
            # Color and depth, approximately synchronized (slop 0.1 s).
            self.sub_right_color = Subscriber(self, Image, configs["right"]["color"])
            self.sub_right_depth = Subscriber(self, Image, configs["right"]["depth"])
            self.right_sync_subscriber = ApproximateTimeSynchronizer(
                [self.sub_right_color, self.sub_right_depth],
                queue_size=10,
                slop=0.1
            )
            self.right_sync_subscriber.registerCallback(self.right_callback)
            # Camera info topic, subscribed one-shot per recording session.
            self.right_info_name = configs["right"]["info"]
            self.get_logger().info("right camera online")
        # head camera
        if self.topic_exists(configs["head"]["color"]) and self.topic_exists(
                configs["head"]["depth"]):
            self.head_sign = True
            # Color and depth, approximately synchronized (slop 0.1 s).
            self.sub_head_color = Subscriber(self, Image, configs["head"]["color"])
            self.sub_head_depth = Subscriber(self, Image, configs["head"]["depth"])
            self.head_sync_subscriber = ApproximateTimeSynchronizer(
                [self.sub_head_color, self.sub_head_depth],
                queue_size=10,
                slop=0.1
            )
            self.head_sync_subscriber.registerCallback(self.head_callback)
            # Camera info topic, subscribed one-shot per recording session.
            self.head_info_name = configs["head"]["info"]
            self.get_logger().info("head camera online")
        self.service = self.create_service(SetBool, "/collect_data", self.service_callback)
        self.get_logger().info("Success start")
def service_callback(self, request, response):
# Start collect data
if request.data:
self.left_meta = self.right_meta = self.head_meta = False
if self.save_path:
os.makedirs(self.save_path, exist_ok=True)
while (
os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_color_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_color_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_color_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_video.mp4"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_raw"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_raw"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_left_depth_raw.raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_right_depth_raw.raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_head_depth_raw.raw"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_raw.zip"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_raw.zip"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_raw.zip"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_timestamp.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_timestamp.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_timestamp.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_meta.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_meta.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_meta.txt"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_camera_info.json"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_camera_info.json"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_camera_info.json"))
):
self.index += 1
if self.left_sign:
self.left_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw"))
# self.left_depth_raw_file = open(
# os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.raw"), 'ab')
self.left_timestamp_file = open(
os.path.join(self.save_path, f"{self.index:06d}_left_timestamp.txt"), 'a')
self.left_camera_info_file = open(
os.path.join(self.save_path, f"{self.index:06d}_left_camera_info.json"), 'w')
self.left_info_sub = self.create_subscription(
CameraInfo,
self.left_info_name,
self.left_info_callback,
10
)
if self.right_sign:
self.right_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw"))
# self.right_depth_raw_file = open(
# os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.raw"), 'ab')
self.right_timestamp_file = open(
os.path.join(self.save_path, f"{self.index:06d}_right_timestamp.txt"), 'a')
self.right_camera_info_file = open(
os.path.join(self.save_path, f"{self.index:06d}_right_camera_info.json"), 'w')
self.right_info_sub = self.create_subscription(
CameraInfo,
self.right_info_name,
self.right_info_callback,
10
)
if self.head_sign:
self.head_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw"))
# self.head_depth_raw_file = open(
# os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.raw"), 'ab')
self.head_timestamp_file = open(
os.path.join(self.save_path, f"{self.index:06d}_head_timestamp.txt"), 'a')
self.head_camera_info_file = open(
os.path.join(self.save_path, f"{self.index:06d}_head_camera_info.json"), 'w')
self.head_info_sub = self.create_subscription(
CameraInfo,
self.head_info_name,
self.head_info_callback,
10
)
self.left_writer_initialized = False
self.right_writer_initialized = False
self.head_writer_initialized = False
self.collect_sign = True
response.success = True
response.message = "start collecting data"
self.get_logger().info("start collecting data")
return response
if self.left_sign:
if self.left_depth_raw_file is not None:
self.left_depth_raw_file.close()
self.left_depth_raw_file = None
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_left_depth_raw.zip",
f"{self.index:06d}_left_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'left depth raw saved to {self.save_path}, index: {self.index}')
if self.left_timestamp_file is not None:
self.left_timestamp_file.close()
self.left_timestamp_file = None
self.get_logger().info(
f'left timestamp saved to {self.save_path}, index: {self.index}')
if self.left_camera_info_file is not None:
self.left_camera_info_file.close()
self.left_camera_info_file = None
if self.left_info_sub is not None:
self.destroy_subscription(self.left_info_sub)
self.left_info_sub = None
if self.right_sign:
if self.right_depth_raw_file is not None:
self.right_depth_raw_file.close()
self.right_depth_raw_file = None
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_right_depth_raw.zip",
f"{self.index:06d}_right_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'right depth raw saved to {self.save_path}, index: {self.index}')
if self.right_timestamp_file is not None:
self.right_timestamp_file.close()
self.right_timestamp_file = None
self.get_logger().info(
f'right timrstamp saved to {self.save_path}, index: {self.index}')
if self.right_camera_info_file is not None:
self.right_camera_info_file.close()
self.right_camera_info_file = None
if self.right_info_sub is not None:
self.destroy_subscription(self.right_info_sub)
self.right_info_sub = None
if self.head_sign:
if self.head_depth_raw_file is not None:
self.head_depth_raw_file.close()
self.head_depth_raw_file = None
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_head_depth_raw.zip",
f"{self.index:06d}_head_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'head depth raw saved to {self.save_path}, index: {self.index}')
if self.head_timestamp_file is not None:
self.head_timestamp_file.close()
self.head_timestamp_file = None
self.get_logger().info(
f'head timestamp saved to {self.save_path}, index: {self.index}')
if self.head_camera_info_file is not None:
self.head_camera_info_file.close()
self.head_camera_info_file = None
if self.head_info_sub is not None:
self.destroy_subscription(self.head_info_sub)
self.head_info_sub = None
# End collect data
if self.left_color_writer is not None:
self.left_color_writer.release()
self.left_color_writer = None
self.get_logger().info(
f'left color video saved to {self.save_path}, index: {self.index}')
if self.left_depth_writer is not None:
self.left_depth_writer.release()
self.left_depth_writer = None
self.get_logger().info(
f'left depth video saved to {self.save_path}, index: {self.index}')
if self.right_color_writer is not None:
self.right_color_writer.release()
self.right_color_writer = None
self.get_logger().info(
f'right color video saved to {self.save_path}, index: {self.index}')
if self.right_depth_writer is not None:
self.right_depth_writer.release()
self.right_depth_writer = None
self.get_logger().info(
f'right depth video saved to {self.save_path}, index: {self.index}')
if self.head_color_writer is not None:
self.head_color_writer.release()
self.head_color_writer = None
self.get_logger().info(
f'head color video saved to {self.save_path}, index: {self.index}')
if self.head_depth_writer is not None:
self.head_depth_writer.release()
self.head_depth_writer = None
self.get_logger().info(
f'head depth video saved to {self.save_path}, index: {self.index}')
self.collect_sign = False
self.save_sign = True
response.success = True
response.message = "stop collecting data"
self.get_logger().info("stop collecting data")
return response
def left_callback(self, color_msg, depth_msg):
if self.collect_sign:
try:
color_frame = self.cv_bridge.imgmsg_to_cv2(color_msg, desired_encoding='bgr8')
depth_frame = self.cv_bridge.imgmsg_to_cv2(depth_msg, desired_encoding='16UC1')
except Exception as e:
self.get_logger().error(f'cv_bridge error: {e}')
return
if not self.left_writer_initialized:
ch, cw = color_frame.shape[:2]
dh, dw = depth_frame.shape[:2]
color_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
depth_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.left_color_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_left_color_video.mp4"),
color_fourcc,
self.fps,
(cw, ch)
)
self.left_depth_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_left_depth_video.mp4"),
depth_fourcc,
self.fps,
(dw, dh)
)
if not self.left_color_writer.isOpened() or not self.left_depth_writer.isOpened():
self.get_logger().error('Failed to open VideoWriter')
return
self.left_writer_initialized = True
self.get_logger().info('VideoWriter initialized')
# RAW
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.left_depth_raw_file is not None:
# self.left_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_left_depth_raw",
f"{self.left_index:08d}.png"
),
depth_frame
)
self.left_index += 1
if self.left_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.left_timestamp_file.write(f"{ts}\n")
if not self.left_meta:
meta = {
"width": int(depth_frame.shape[1]),
"height": int(depth_frame.shape[0]),
"dtype": "uint16",
"endianness": "little",
"unit": "mm",
"frame_order": "row-major",
"fps": self.fps
}
with open(os.path.join(self.save_path, f"{self.index:06d}_left_depth_meta.json"), "w") as f:
json.dump(meta, f, indent=2)
self.left_meta = True
# MP4V
self.left_color_writer.write(color_frame)
depth_vis = cv2.convertScaleAbs(
depth_frame,
alpha=255.0 / 10000.0 # 10m根据相机改
)
depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_GRAY2BGR)
self.left_depth_writer.write(depth_vis)
def right_callback(self, color_msg, depth_msg):
if self.collect_sign:
try:
color_frame = self.cv_bridge.imgmsg_to_cv2(color_msg, desired_encoding='bgr8')
depth_frame = self.cv_bridge.imgmsg_to_cv2(depth_msg, desired_encoding='16UC1')
except Exception as e:
self.get_logger().error(f'cv_bridge error: {e}')
return
if not self.right_writer_initialized:
ch, cw = color_frame.shape[:2]
dh, dw = depth_frame.shape[:2]
color_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
depth_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.right_color_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_right_color_video.mp4"),
color_fourcc,
self.fps,
(cw, ch)
)
self.right_depth_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_right_depth_video.mp4"),
depth_fourcc,
self.fps,
(dw, dh)
)
if not self.right_color_writer.isOpened() or not self.right_depth_writer.isOpened():
self.get_logger().error('Failed to open VideoWriter')
return
self.right_writer_initialized = True
self.get_logger().info('VideoWriter initialized')
# RAW
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.right_depth_raw_file is not None:
# self.right_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_right_depth_raw",
f"{self.right_index:08d}.png"
),
depth_frame
)
self.right_index += 1
if self.right_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.right_timestamp_file.write(f"{ts}\n")
if not self.right_meta:
meta = {
"width": int(depth_frame.shape[1]),
"height": int(depth_frame.shape[0]),
"dtype": "uint16",
"endianness": "little",
"unit": "mm",
"frame_order": "row-major",
"fps": self.fps
}
with open(os.path.join(self.save_path, f"{self.index:06d}_right_depth_meta.json"), "w") as f:
json.dump(meta, f, indent=2)
self.right_meta = True
# MP4V
self.right_color_writer.write(color_frame)
depth_vis = cv2.convertScaleAbs(
depth_frame,
alpha=255.0 / 10000.0 # 10m根据相机改
)
depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_GRAY2BGR)
self.right_depth_writer.write(depth_vis)
def head_callback(self, color_msg, depth_msg):
if self.collect_sign:
try:
color_frame = self.cv_bridge.imgmsg_to_cv2(color_msg, desired_encoding='bgr8')
depth_frame = self.cv_bridge.imgmsg_to_cv2(depth_msg, desired_encoding='16UC1')
except Exception as e:
self.get_logger().error(f'cv_bridge error: {e}')
return
if not self.head_writer_initialized:
ch, cw = color_frame.shape[:2]
dh, dw = depth_frame.shape[:2]
color_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
depth_fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.head_color_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_head_color_video.mp4"),
color_fourcc,
self.fps,
(cw, ch)
)
self.head_depth_writer = cv2.VideoWriter(
os.path.join(self.save_path, f"{self.index:06d}_head_depth_video.mp4"),
depth_fourcc,
self.fps,
(dw, dh)
)
if not self.head_color_writer.isOpened() or not self.head_depth_writer.isOpened():
self.get_logger().error('Failed to open VideoWriter')
return
self.head_writer_initialized = True
self.get_logger().info('VideoWriter initialized')
# RAW
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.head_depth_raw_file is not None:
# self.head_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_head_depth_raw",
f"{self.head_index:08d}.png"
),
depth_frame
)
self.head_index += 1
if self.head_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.head_timestamp_file.write(f"{ts}\n")
if not self.head_meta:
meta = {
"width": int(depth_frame.shape[1]),
"height": int(depth_frame.shape[0]),
"dtype": "uint16",
"endianness": "little",
"unit": "mm",
"frame_order": "row-major",
"fps": self.fps
}
with open(os.path.join(self.save_path, f"{self.index:06d}_head_depth_meta.json"), "w") as f:
json.dump(meta, f, indent=2)
self.head_meta = True
# MP4V
self.head_color_writer.write(color_frame)
depth_vis = cv2.convertScaleAbs(
depth_frame,
alpha=255.0 / 10000.0 # 10m根据相机改
)
depth_vis = cv2.cvtColor(depth_vis, cv2.COLOR_GRAY2BGR)
self.head_depth_writer.write(depth_vis)
def left_info_callback(self, msgs):
data = {
"K": msgs.k.tolist(),
"D": msgs.d.tolist(),
}
json.dump(data, self.left_camera_info_file, ensure_ascii=False, indent=4)
self.left_camera_info_file.close()
self.left_camera_info_file = None
self.get_logger().info(f'left camera info saved to {self.save_path}, index: {self.index}')
self.destroy_subscription(self.left_info_sub)
self.left_info_sub = None
def right_info_callback(self, msgs):
data = {
"K": msgs.k.tolist(),
"D": msgs.d.tolist(),
}
json.dump(data, self.right_camera_info_file, ensure_ascii=False, indent=4)
self.right_camera_info_file.close()
self.right_camera_info_file = None
self.get_logger().info(f'right camera info saved to {self.save_path}, index: {self.index}')
self.destroy_subscription(self.right_info_sub)
self.right_info_sub = None
def head_info_callback(self, msgs):
data = {
"K": msgs.k.tolist(),
"D": msgs.d.tolist(),
}
json.dump(data, self.head_camera_info_file, ensure_ascii=False, indent=4)
self.head_camera_info_file.close()
self.head_camera_info_file = None
self.get_logger().info(f'head camera info saved to {self.save_path}, index: {self.index}')
self.destroy_subscription(self.head_info_sub)
self.head_info_sub = None
def topic_exists(self, topic_name: str) -> bool:
topics = self.get_topic_names_and_types()
return any(name == topic_name for name, _ in topics)
# def destroy_node(self):
# self.get_logger().info('VideoWriter released')
# super().destroy_node()
def main(args=None):
    """Spin a CollectDataNode until interrupted, then clean up."""
    rclpy.init(args=args)
    collector = CollectDataNode()
    try:
        rclpy.spin(collector)
    except KeyboardInterrupt:
        pass
    finally:
        collector.destroy_node()
        rclpy.shutdown()

View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python
# coding: utf-8
import json
# import os
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from interfaces.msg import PoseArrayClassAndID
class GetCameraPose(Node):
    """Collect poses of one object class from /pose/cv_detect_pose.

    Driven by ROS parameters, polled once a second:
      * class_name         -- object class whose pose is recorded.
      * start_collect_once -- set True to capture a single pose sample.
      * save               -- set True to dump all samples to
                              camera_pose_data.json and finish.

    Samples accumulate in self.camera as a flat list of 7 floats each:
    (x, y, z, qw, qx, qy, qz).
    """
    def __init__(self, name):
        super(GetCameraPose, self).__init__(name)
        self.sync_subscriber = None
        self.sub_camera_pose = None
        self.sub_hand_pose = None
        # Flat list of collected samples (7 floats per sample).
        self.camera = []
        self.calibration_matrix = None
        self.save = False
        self.collect = False
        # Set when saving is done; signals main() to exit.
        self.done = False
        # Guards against re-entrant capture while one is in flight.
        self.done_collecting = False
        # self.filename = "camera_pose_data.json"
        self.declare_parameter('class_name', 'crossboard')
        self.declare_parameter('start_collect_once', False)
        self.declare_parameter('save', False)
        # if not os.path.exists(self.filename):
        #     with open(self.filename, "w", encoding="utf-8") as f:
        #         json.dump([], f, ensure_ascii=False, indent=2)
        self.sub_camera_pose = self.create_subscription(PoseArrayClassAndID, '/pose/cv_detect_pose', self.get_pose_callback, 10)
        self.timer_ = self.create_timer(1, self.timer_callback)

    def get_pose_callback(self, camera_pose):
        """Capture one pose sample of self.class_name when collection is armed.

        NOTE(review): self.class_name is first assigned in timer_callback;
        this relies on the timer firing before collection is armed -- confirm.
        """
        if self.collect and not self.done_collecting:
            self.done_collecting = True
            _camera = None
            # Map class name -> list of detected poses from this message.
            pose_dict = {}
            for object in camera_pose.objects:
                pose_dict[object.class_name] = object.pose_list
            if self.class_name in pose_dict:
                # Take the most recent pose of the requested class.
                _camera = [
                    pose_dict[self.class_name][-1].position.x,
                    pose_dict[self.class_name][-1].position.y,
                    pose_dict[self.class_name][-1].position.z,
                    pose_dict[self.class_name][-1].orientation.w,
                    pose_dict[self.class_name][-1].orientation.x,
                    pose_dict[self.class_name][-1].orientation.y,
                    pose_dict[self.class_name][-1].orientation.z,
                ]
                self.get_logger().info(f"add camera: {_camera}")
                self.done_collecting = False
                self.collect = False
                self.camera.extend(_camera)
                # Reset the one-shot trigger parameter.
                self.set_parameters([Parameter('start_collect_once', Parameter.Type.BOOL, False)])
            else:
                self.done_collecting = False
                self.collect = False
                self.set_parameters([Parameter('start_collect_once', Parameter.Type.BOOL, False)])
                self.get_logger().info(f"Have not camera data")
        return

    def timer_callback(self):
        """Poll control parameters; dump samples and finish when 'save' is set."""
        self.collect = self.get_parameter('start_collect_once').value
        self.save = self.get_parameter('save').value
        self.class_name = self.get_parameter('class_name').value
        if self.save:
            print(self.camera)
            self.get_logger().info(f'Save as camera_pose_data.json')
            with open(f"camera_pose_data.json", "w") as f:
                json.dump(self.camera, f, indent=4)
            self.done = True
def main(args=None):
    """Run the pose collector until it reports completion or is interrupted."""
    rclpy.init(args=args)
    pose_node = GetCameraPose('get_camera_pose')
    try:
        while rclpy.ok() and not pose_node.done:
            rclpy.spin_once(pose_node)
    except KeyboardInterrupt:
        pass
    finally:
        pose_node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1 @@
from .zip_tools import *

View File

@@ -0,0 +1,102 @@
import os
import threading
import time
import zipfile
import logging
import shutil
__all__ = ["zip_verification", "zip_folder"]
# def zip_worker(zip_name:str, raw_name:str, save_path):
# with zipfile.ZipFile(
# os.path.join(save_path, zip_name),
# "w",
# compression=zipfile.ZIP_DEFLATED
# ) as zf:
# zf.write(
# os.path.join(save_path, raw_name),
# arcname=os.path.basename(raw_name)
# )
#
# if zip_verification(zip_name, raw_name, save_path):
# os.remove(os.path.join(save_path, raw_name))
# else:
# logging.warning("zip file verification file, keep raw file")
# os.remove(os.path.join(save_path, zip_name))
def zip_folder(zip_name: str, folder_name: str, save_path: str):
    """Compress *folder_name* (under *save_path*) into *zip_name*.

    After writing the archive it is verified against the source folder:
    on success the folder is deleted; on failure the (incomplete) archive
    is removed and the folder is kept.
    """
    logging.info("zip folder start")
    folder_path = os.path.join(save_path, folder_name)
    with zipfile.ZipFile(
        os.path.join(save_path, zip_name),
        'w',
        compression=zipfile.ZIP_DEFLATED
    ) as zf:
        # Store paths relative to the folder root so the archive has no prefix.
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                abs_path = os.path.join(root, file)
                rel_path = os.path.relpath(abs_path, folder_path)
                zf.write(abs_path, rel_path)
    if zip_verification(zip_name, folder_name, save_path):
        shutil.rmtree(folder_path)
    else:
        # Fix: message previously read "verification file" instead of "failed".
        logging.warning("zip file verification failed, keep folder")
        os.remove(os.path.join(save_path, zip_name))
    logging.info("zip folder end")
def zip_verification(zip_name: str, folder_name: str, save_path: str):
    """Verify that *zip_name* contains every file of *folder_name*.

    The archive is extracted into a fresh scratch directory and every file
    of the original folder is checked for existence and identical size in
    the extracted copy.

    Returns True when the archive is complete, False otherwise.  The
    scratch directory is always removed.
    """
    # Pick a scratch directory name that does not collide with existing ones.
    # (The old code re-checked and removed this directory right after the
    # loop guaranteed it did not exist -- dead code, now dropped.)
    i = 0
    while os.path.exists(os.path.join(save_path, f"_tmp_{i}")):
        i += 1
    tmp_dir = os.path.join(save_path, f"_tmp_{i}")
    os.makedirs(tmp_dir, exist_ok=True)
    zip_path = os.path.join(save_path, zip_name)
    src_root = os.path.join(save_path, folder_name)
    try:
        # Extract everything, then compare file-by-file against the source.
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(tmp_dir)
        for root, dirs, files in os.walk(src_root):
            for file in files:
                abs_path = os.path.join(root, file)
                rel_path = os.path.relpath(abs_path, src_root)
                extracted_path = os.path.join(tmp_dir, rel_path)
                if not os.path.exists(extracted_path):
                    return False  # file missing from the archive
                if os.path.getsize(abs_path) != os.path.getsize(extracted_path):
                    return False  # file size mismatch
        return True
    finally:
        # Always clean up the scratch directory, even on early return.
        shutil.rmtree(tmp_dir, ignore_errors=True)
if __name__ == '__main__':
    # Manual smoke test: compress a sample recording folder.
    # NOTE(review): the path is machine-specific -- adjust before running.
    worker = threading.Thread(
        target=zip_folder,
        args=(
            "000000_right_depth_raw.zip",
            "000000_right_depth_raw",
            "/home/lyx/collect_data"
        ),
        daemon=True
    )
    worker.start()
    # Fix: wait for the worker deterministically instead of sleeping 10 s
    # and hoping the zip finished in time.
    worker.join()