采集的深度数据更改为.png存储

This commit is contained in:
liangyuxuan
2025-12-28 13:57:37 +08:00
parent 93f3a8d45e
commit 3025b5a23a
3 changed files with 193 additions and 136 deletions

View File

@@ -1,10 +1,8 @@
import os
import json
import zipfile
import threading
import cv2
import numpy as np
import rclpy
from rclpy.node import Node
@@ -15,12 +13,10 @@ from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from std_srvs.srv import SetBool
from vision_tools.utils import zip_tools
share_dir = get_package_share_directory('vision_tools')
from vision_tools.utils import zip_folder
SHARE_DIR = get_package_share_directory('vision_tools')
class CollectDataNode(Node):
def __init__(self):
@@ -32,6 +28,8 @@ class CollectDataNode(Node):
self.left_sign = self.right_sign = self.head_sign = False
self.left_meta = self.right_meta = self.head_meta = False
self.left_index = self.right_index = self.head_index = 0
self.left_depth_raw_file = None
self.right_depth_raw_file = None
self.head_depth_raw_file = None
@@ -63,7 +61,7 @@ class CollectDataNode(Node):
self.left_writer_initialized = False
self.head_writer_initialized = False
with open(os.path.join(share_dir, 'configs/collect_data_node.json'), 'r')as f:
with open(os.path.join(SHARE_DIR, 'configs/collect_data_node.json'), 'r')as f:
configs = json.load(f)
home_path = os.path.expanduser("~")
@@ -147,12 +145,18 @@ class CollectDataNode(Node):
self.save_path, f"{self.index:06d}_right_depth_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_video.mp4"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_raw.raw"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_raw.raw"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_raw.raw"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_raw"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_right_depth_raw"))
or os.path.isdir(os.path.join(
self.save_path, f"{self.index:06d}_head_depth_raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_left_depth_raw.raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_right_depth_raw.raw"))
# or os.path.exists(os.path.join(
# self.save_path, f"{self.index:06d}_head_depth_raw.raw"))
or os.path.exists(os.path.join(
self.save_path, f"{self.index:06d}_left_depth_raw.zip"))
or os.path.exists(os.path.join(
@@ -181,8 +185,10 @@ class CollectDataNode(Node):
self.index += 1
if self.left_sign:
self.left_depth_raw_file = open(
os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.raw"), 'ab')
self.left_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw"))
# self.left_depth_raw_file = open(
# os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.raw"), 'ab')
self.left_timestamp_file = open(
os.path.join(self.save_path, f"{self.index:06d}_left_timestamp.txt"), 'a')
self.left_camera_info_file = open(
@@ -194,8 +200,10 @@ class CollectDataNode(Node):
10
)
if self.right_sign:
self.right_depth_raw_file = open(
os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.raw"), 'ab')
self.right_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw"))
# self.right_depth_raw_file = open(
# os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.raw"), 'ab')
self.right_timestamp_file = open(
os.path.join(self.save_path, f"{self.index:06d}_right_timestamp.txt"), 'a')
self.right_camera_info_file = open(
@@ -207,6 +215,8 @@ class CollectDataNode(Node):
10
)
if self.head_sign:
self.head_index = 0
os.mkdir(os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw"))
self.head_depth_raw_file = open(
os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.raw"), 'ab')
self.head_timestamp_file = open(
@@ -234,135 +244,91 @@ class CollectDataNode(Node):
if self.left_depth_raw_file is not None:
self.left_depth_raw_file.close()
self.left_depth_raw_file = None
threading.Thread(
target=zip_tools.zip_worker,
args=(
f"{self.index:06d}_left_depth_raw.zip",
f"{self.index:06d}_left_depth_raw.raw",
self.save_path
),
daemon=True
).start()
# with zipfile.ZipFile(
# os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.zip"),
# "w",
# compression=zipfile.ZIP_DEFLATED
# ) as zf:
# zf.write(
# os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.raw"),
# arcname=os.path.basename(f"{self.index:06d}_left_depth_raw.raw")
# )
# if zip_tools.zip_verification(
# f"{self.index:06d}_left_depth_raw.zip",
# f"{self.index:06d}_left_depth_raw.raw",
# self.save_path
# ):
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.raw"))
# else:
# self.get_logger().warning("zip file verification file, keep raw file")
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_left_depth_raw.zip"))
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_left_depth_raw.zip",
f"{self.index:06d}_left_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'left depth raw saved to {self.save_path}, index: {self.index}')
self.get_logger().info(
f'left depth raw saved to {self.save_path}, index: {self.index}')
if self.left_timestamp_file is not None:
self.left_timestamp_file.close()
self.left_timestamp_file = None
self.get_logger().info(
f'left timestamp saved to {self.save_path}, index: {self.index}')
if self.left_camera_info_file is not None:
self.left_camera_info_file.close()
self.left_camera_info_file = None
if self.left_info_sub is not None:
self.destroy_subscription(self.left_info_sub)
self.left_info_sub = None
if self.right_sign:
if self.right_depth_raw_file is not None:
self.right_depth_raw_file.close()
self.right_depth_raw_file = None
threading.Thread(
target=zip_tools.zip_worker,
args=(
f"{self.index:06d}_right_depth_raw.zip",
f"{self.index:06d}_right_depth_raw.raw",
self.save_path
),
daemon=True
).start()
# with zipfile.ZipFile(
# os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.zip"),
# "w",
# compression=zipfile.ZIP_DEFLATED
# ) as zf:
# zf.write(
# os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.raw"),
# arcname=os.path.basename(f"{self.index:06d}_right_depth_raw.raw")
# )
# if zip_tools.zip_verification(
# f"{self.index:06d}_right_depth_raw.zip",
# f"{self.index:06d}_right_depth_raw.raw",
# self.save_path
# ):
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.raw"))
# else:
# self.get_logger().warning("zip file verification file, keep raw file")
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_right_depth_raw.zip"))
self.get_logger().info(
f'right depth raw saved to {self.save_path}, index: {self.index}')
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_right_depth_raw.zip",
f"{self.index:06d}_right_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'right depth raw saved to {self.save_path}, index: {self.index}')
if self.right_timestamp_file is not None:
self.right_timestamp_file.close()
self.right_timestamp_file = None
self.get_logger().info(
f'right timrstamp saved to {self.save_path}, index: {self.index}')
if self.right_camera_info_file is not None:
self.right_camera_info_file.close()
self.right_camera_info_file = None
if self.right_info_sub is not None:
self.destroy_subscription(self.right_info_sub)
self.right_info_sub = None
if self.head_sign:
if self.head_depth_raw_file is not None:
self.head_depth_raw_file.close()
self.head_depth_raw_file = None
threading.Thread(
target=zip_tools.zip_worker,
args=(
f"{self.index:06d}_head_depth_raw.zip",
f"{self.index:06d}_head_depth_raw.raw",
self.save_path
),
daemon=True
).start()
# with zipfile.ZipFile(
# os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.zip"),
# "w",
# compression=zipfile.ZIP_DEFLATED
# ) as zf:
# zf.write(
# os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.raw"),
# arcname=os.path.basename(f"{self.index:06d}_head_depth_raw.raw")
# )
# if zip_tools.zip_verification(
# f"{self.index:06d}_head_depth_raw.zip",
# f"{self.index:06d}_head_depth_raw.raw",
# self.save_path
# ):
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.raw"))
# else:
# self.get_logger().warning("zip file verification file, keep raw file")
# os.remove(os.path.join(self.save_path, f"{self.index:06d}_head_depth_raw.zip"))
self.get_logger().info(
f'head depth raw saved to {self.save_path}, index: {self.index}')
threading.Thread(
target=zip_folder,
args=(
f"{self.index:06d}_head_depth_raw.zip",
f"{self.index:06d}_head_depth_raw",
self.save_path
),
daemon=True
).start()
self.get_logger().info(
f'head depth raw saved to {self.save_path}, index: {self.index}')
if self.head_timestamp_file is not None:
self.head_timestamp_file.close()
self.head_timestamp_file = None
self.get_logger().info(
f'head timestamp saved to {self.save_path}, index: {self.index}')
if self.head_camera_info_file is not None:
self.head_camera_info_file.close()
self.head_camera_info_file = None
if self.head_info_sub is not None:
self.destroy_subscription(self.head_info_sub)
self.head_info_sub = None
@@ -443,10 +409,20 @@ class CollectDataNode(Node):
self.get_logger().info('VideoWriter initialized')
# RAW
if not depth_frame.flags['C_CONTIGUOUS']:
depth_frame = depth_frame.copy()
if self.left_depth_raw_file is not None:
self.left_depth_raw_file.write(depth_frame.tobytes())
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.left_depth_raw_file is not None:
# self.left_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_left_depth_raw",
f"{self.left_index:08d}.png"
),
depth_frame
)
self.left_index += 1
if self.left_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.left_timestamp_file.write(f"{ts}\n")
@@ -511,10 +487,20 @@ class CollectDataNode(Node):
self.get_logger().info('VideoWriter initialized')
# RAW
if not depth_frame.flags['C_CONTIGUOUS']:
depth_frame = depth_frame.copy()
if self.right_depth_raw_file is not None:
self.right_depth_raw_file.write(depth_frame.tobytes())
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.right_depth_raw_file is not None:
# self.right_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_right_depth_raw",
f"{self.right_index:08d}.png"
),
depth_frame
)
self.right_index += 1
if self.right_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.right_timestamp_file.write(f"{ts}\n")
@@ -579,10 +565,20 @@ class CollectDataNode(Node):
self.get_logger().info('VideoWriter initialized')
# RAW
if not depth_frame.flags['C_CONTIGUOUS']:
depth_frame = depth_frame.copy()
if self.head_depth_raw_file is not None:
self.head_depth_raw_file.write(depth_frame.tobytes())
# if not depth_frame.flags['C_CONTIGUOUS']:
# depth_frame = depth_frame.copy()
# if self.head_depth_raw_file is not None:
# self.head_depth_raw_file.write(depth_frame.tobytes())
cv2.imwrite(
os.path.join(
self.save_path,
f"{self.index:06d}_head_depth_raw",
f"{self.head_index:08d}.png"
),
depth_frame
)
self.head_index += 1
if self.head_timestamp_file is not None:
ts = depth_msg.header.stamp.sec * 1_000_000_000 + depth_msg.header.stamp.nanosec
self.head_timestamp_file.write(f"{ts}\n")

View File

@@ -0,0 +1 @@
from .zip_tools import *

View File

@@ -1,38 +1,98 @@
import os
import threading
import time
import zipfile
import logging
import shutil
__all__ = ["zip_worker", "zip_verification"]
__all__ = ["zip_verification", "zip_folder"]
def zip_worker(zip_name:str, raw_name:str, save_path):
# def zip_worker(zip_name:str, raw_name:str, save_path):
# with zipfile.ZipFile(
# os.path.join(save_path, zip_name),
# "w",
# compression=zipfile.ZIP_DEFLATED
# ) as zf:
# zf.write(
# os.path.join(save_path, raw_name),
# arcname=os.path.basename(raw_name)
# )
#
# if zip_verification(zip_name, raw_name, save_path):
# os.remove(os.path.join(save_path, raw_name))
# else:
# logging.warning("zip file verification file, keep raw file")
# os.remove(os.path.join(save_path, zip_name))
def zip_folder(zip_name:str, folder_name:str, save_path:str):
logging.info("zip folder start")
with zipfile.ZipFile(
os.path.join(save_path, zip_name),
"w",
'w',
compression=zipfile.ZIP_DEFLATED
) as zf:
zf.write(
os.path.join(save_path, raw_name),
arcname=os.path.basename(raw_name)
)
for root, dirs, files in os.walk(os.path.join(save_path, folder_name)):
for file in files:
abs_path = os.path.join(root, file)
rel_path = os.path.relpath(abs_path, os.path.join(save_path, folder_name))
zf.write(abs_path, rel_path)
if zip_verification(zip_name, raw_name, save_path):
os.remove(os.path.join(save_path, raw_name))
if zip_verification(zip_name, folder_name, save_path):
shutil.rmtree(os.path.join(save_path, folder_name))
else:
logging.warning("zip file verification file, keep raw file")
os.remove(os.path.join(save_path, raw_name))
logging.warning("zip file verification file, keep folder")
os.remove(os.path.join(save_path, zip_name))
logging.info("zip folder end")
def zip_verification(zip_name:str, raw_name, save_path):
with zipfile.ZipFile(os.path.join(save_path, zip_name), "r") as zf:
zf.extract(raw_name, path="./_tmp")
def zip_verification(zip_name:str, folder_name:str, save_path:str):
tmp_dir = os.path.join(save_path, "_tmp")
# 清理旧的临时目录
if os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir)
os.makedirs(tmp_dir, exist_ok=True)
zip_path = os.path.join(save_path, zip_name)
try:
# 解压所有内容到临时目录
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(tmp_dir)
# 遍历原始文件夹,逐个文件比对大小
for root, dirs, files in os.walk(os.path.join(save_path, folder_name)):
for file in files:
# 原文件完整路径
abs_path = os.path.join(root, file)
# 计算文件相对 folder_name 的路径
rel_path = os.path.relpath(abs_path, os.path.join(save_path, folder_name))
# 解压后的文件路径
extracted_path = os.path.join(tmp_dir, rel_path)
if not os.path.exists(extracted_path):
return False # 文件不存在
if os.path.getsize(abs_path) != os.path.getsize(extracted_path):
return False # 文件大小不一致
if os.path.getsize(os.path.join(save_path, raw_name)) == os.path.getsize(f"./_tmp/{raw_name}"):
os.remove(os.path.join(f"./_tmp/{raw_name}"))
# os.rmdir("./_tmp")
return True
os.remove(os.path.join(f"./_tmp/{raw_name}"))
# os.rmdir("./_tmp")
return False
finally:
# 清理临时目录
if os.path.exists(tmp_dir):
shutil.rmtree(tmp_dir)
if __name__ == '__main__':
threading.Thread(
target=zip_folder,
args=(
f"000000_right_depth_raw.zip",
f"000000_right_depth_raw",
"/home/lyx/collect_data"
),
daemon=True
).start()
time.sleep(10)