增加相机控制接口,图像预处理部分功能

This commit is contained in:
liangyuxuan
2026-03-06 10:07:31 +08:00
parent d960da1192
commit 0bcf24e0ca
181 changed files with 1721 additions and 4114 deletions

3
.gitignore vendored
View File

@@ -3,4 +3,5 @@
**/build/
**/install/
**/log/
**/VisionDetect/**/**.so
**/vision_core/**/**.so
**/__pycache__

View File

@@ -4,7 +4,7 @@ from ultralytics import YOLO
checkpoint_path = "vision_detect/checkpoints/medical_sense-seg.pt"
save_path = "vision_detect/map/label/medical_sense.json"
save_path = "vision_detect/manager_map/label/medical_sense.json"
model = YOLO(os.path.expanduser(checkpoint_path))

View File

@@ -0,0 +1,56 @@
cmake_minimum_required(VERSION 3.8)
project(vision_control)

# Enable strict warnings on GCC/Clang builds.
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_srvs REQUIRED)
find_package(interfaces REQUIRED)
find_package(orbbec_camera_msgs REQUIRED)

add_executable(camera_control_node src/CameraRawControlNode.cpp)
# nlohmann/json is consumed as a system single-header include (no package).
target_include_directories(camera_control_node PRIVATE /usr/include/nlohmann) # json
target_include_directories(camera_control_node
  PRIVATE
  ${CMAKE_CURRENT_SOURCE_DIR}/include
)
ament_target_dependencies(
  camera_control_node
  rclcpp std_srvs interfaces orbbec_camera_msgs
)

# Install launch files and the JSON configs read at node startup.
install(
  DIRECTORY
  launch
  configs
  DESTINATION share/${PROJECT_NAME}
)
install(
  TARGETS camera_control_node
  DESTINATION lib/${PROJECT_NAME}
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

View File

@@ -0,0 +1,8 @@
{
"left_name": "/camera1",
"left_namespace": "/camera1",
"right_name": "/camera2",
"right_namespace": "/camera2",
"head_name": "/camera",
"head_namespace": "/camera"
}

View File

@@ -0,0 +1,50 @@
#pragma once
#include <string>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp/parameter_client.hpp>
#include <std_msgs/msg/string.hpp>
#include "interfaces/srv/set_camera_raw_params.hpp"
#include "interfaces/srv/set_camera_raw_status.hpp"
#include "orbbec_camera_msgs/srv/set_int32.hpp"
#include "std_srvs/srv/set_bool.hpp"

// Node exposing two services ("set_camera_raw_params" and
// "set_camera_raw_status") that forward exposure/gain and stream on/off
// requests to three cameras: left/right (controlled through their rclcpp
// parameter services) and head (controlled through dedicated Orbbec
// set_*_exposure / set_*_gain / toggle_* services).
class CameraRawControlNode : public rclcpp::Node {
public:
    // name: the ROS node name this instance registers under.
    CameraRawControlNode(std::string name);
private:
    // Availability flags filled at startup: true when the corresponding
    // camera's parameter service answered within the probe window.
    bool left_sign, right_sign, head_sign;
    // Node names / namespaces, loaded from configs/raw_control_configs.json.
    std::string left_name, right_name, head_name;
    std::string left_namespace, right_namespace, head_namespace;
    // Separate callback group so client futures can be waited on from
    // inside our own service callbacks under a multi-threaded executor.
    rclcpp::CallbackGroup::SharedPtr client_cb_group_;
    rclcpp::Service<interfaces::srv::SetCameraRawParams>::SharedPtr raw_params_service;
    rclcpp::Service<interfaces::srv::SetCameraRawStatus>::SharedPtr raw_status_service;
    // rclcpp::Client<orbbec_camera_msgs::srv::SetInt32> ::SharedPtr
    // Synchronous parameter clients for the left/right/head cameras.
    std::shared_ptr<rclcpp::SyncParametersClient> camera_left;
    std::shared_ptr<rclcpp::SyncParametersClient> camera_right;
    std::shared_ptr<rclcpp::SyncParametersClient> camera_head;
    // Head-camera service clients (only created when the head camera is up).
    rclcpp::Client<orbbec_camera_msgs::srv::SetInt32>::SharedPtr color_exposure, depth_exposure;
    rclcpp::Client<orbbec_camera_msgs::srv::SetInt32>::SharedPtr color_gain, depth_gain;
    rclcpp::Client<std_srvs::srv::SetBool>::SharedPtr color_raw_control, depth_raw_control;
    // Handler for "set_camera_raw_params": applies exposure/gain.
    void raw_params_callback(
        const std::shared_ptr<interfaces::srv::SetCameraRawParams::Request> &request,
        const std::shared_ptr<interfaces::srv::SetCameraRawParams::Response> &response
    );
    // Handler for "set_camera_raw_status": enables/disables raw streams.
    void raw_status_callback(
        const std::shared_ptr<interfaces::srv::SetCameraRawStatus::Request> &request,
        const std::shared_ptr<interfaces::srv::SetCameraRawStatus::Response> &response
    );
};

View File

@@ -1,21 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>vision_pose_msgs</name>
<name>vision_control</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="lyx@todo.todo">lyx</maintainer>
<license>Apache-2.0</license>
<maintainer email="16126883+liangyuxuan123@user.noreply.gitee.com">lyx</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>geometry_msgs</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<depend>rclpy</depend>
<depend>rclcpp</depend>
<depend>std_srvs</depend>
<depend>orbbec_camera_msgs</depend>
<depend>interfaces</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

View File

@@ -0,0 +1,379 @@
#include <iostream>
#include <fstream>
#include <nlohmann/json.hpp>
#include <rclcpp/parameter.hpp>
#include <rclcpp/executors/multi_threaded_executor.hpp>
#include "ament_index_cpp/get_package_share_directory.hpp"
#include "vision_control/CameraRawControlNode.hpp"
using json = nlohmann::json;
using string = std::string;
using raw_params_type = orbbec_camera_msgs::srv::SetInt32;
const string SHARED_DIR = ament_index_cpp::get_package_share_directory("vision_control");
const string CONFIGS_PATH = "configs/raw_control_configs.json";
// Create a SyncParametersClient for one camera and probe its parameter
// service (up to 3 attempts, 1 s each).
//
// @param camera_client  out: the freshly created parameter client.
// @param node           owning node, used for construction and logging.
// @param params_service_name  fully-qualified remote node name.
// @param position       human-readable camera label ("left"/"right"/"head").
// @param sign           out: true when the service responded in time.
void create_params_client(
    std::shared_ptr<rclcpp::SyncParametersClient> &camera_client,
    rclcpp::Node *node,
    const string &params_service_name,
    const string &position,
    bool &sign
) {
    camera_client = std::make_shared<rclcpp::SyncParametersClient>(node, params_service_name);
    int attempt = 0;
    while (!camera_client->wait_for_service(std::chrono::seconds(1)) && attempt < 3) {
        // Always log through "%s": the RCLCPP_* macros treat their first
        // string as a printf format, so a '%' in the message would be UB.
        RCLCPP_INFO(
            node->get_logger(), "%s",
            ("Waiting for " + position + " camera parameter service...").c_str()
        );
        attempt++;
    }
    sign = (attempt != 3);
    if (!sign) {
        RCLCPP_WARN(
            node->get_logger(), "%s",
            (position + " camera params service is unavailable").c_str()
        );
    } else {
        RCLCPP_INFO(
            node->get_logger(), "%s",
            (position + " camera params service is already available").c_str()
        );
    }
}
// Construct the node: load the camera config JSON, probe the three cameras'
// parameter services, create the head-camera service clients, and register
// the two control services.
CameraRawControlNode::CameraRawControlNode(std::string name) : Node(name) {
    // Dedicated callback group so the head-camera client futures can be
    // awaited (future.get()) from inside our own service callbacks when the
    // node is spun by a multi-threaded executor.
    client_cb_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
    // Load camera names/namespaces from the packaged JSON config file; on
    // any failure fall back to the defaults below.
    try {
        std::ifstream file(SHARED_DIR + "/" + CONFIGS_PATH);
        // Log through "%s": the RCLCPP_* macros treat the first string as a
        // printf-style format, so a '%' inside the path would be UB.
        RCLCPP_INFO(this->get_logger(), "%s", (SHARED_DIR + "/" + CONFIGS_PATH).c_str());
        if (!file.is_open()) {
            throw std::runtime_error("Can't open json file");
        }
        json configs;
        file >> configs;
        left_name = configs["left_name"];
        right_name = configs["right_name"];
        head_name = configs["head_name"];
        left_namespace = configs["left_namespace"];
        right_namespace = configs["right_namespace"];
        head_namespace = configs["head_namespace"];
    } catch (const std::exception &e) {
        // Catch std::exception (not just runtime_error) so nlohmann::json
        // parse/type errors also land here and the defaults apply.
        RCLCPP_ERROR(this->get_logger(), "%s", e.what());
        left_name = "/camera1";
        right_name = "/camera2";
        head_name = "/camera";
        left_namespace = "/camera1";
        right_namespace = "/camera2";
        head_namespace = "/camera";
    }
    // Probe each camera's parameter service and record availability.
    create_params_client(camera_left, this, left_namespace + left_name, "left", left_sign);
    create_params_client(camera_right, this, right_namespace + right_name, "right", right_sign);
    create_params_client(camera_head, this, head_namespace + head_name, "head", head_sign);
    if (head_sign) {
        // The head (Orbbec) camera is driven through dedicated services, not
        // the parameter client, so drop the client created above.
        camera_head.reset();
        color_exposure = this->create_client<raw_params_type>(
            head_name + "/set_color_exposure", rmw_qos_profile_services_default, client_cb_group_);
        depth_exposure = this->create_client<raw_params_type>(
            head_name + "/set_depth_exposure", rmw_qos_profile_services_default, client_cb_group_);
        color_gain = this->create_client<raw_params_type>(
            head_name + "/set_color_gain", rmw_qos_profile_services_default, client_cb_group_);
        depth_gain = this->create_client<raw_params_type>(
            head_name + "/set_depth_gain", rmw_qos_profile_services_default, client_cb_group_);
        color_raw_control = this->create_client<std_srvs::srv::SetBool>(
            head_name + "/toggle_color", rmw_qos_profile_services_default, client_cb_group_);
        depth_raw_control = this->create_client<std_srvs::srv::SetBool>(
            head_name + "/toggle_depth", rmw_qos_profile_services_default, client_cb_group_);
    }
    // Exposure/gain control service.
    raw_params_service = this->create_service<interfaces::srv::SetCameraRawParams>(
        "set_camera_raw_params",
        std::bind(
            &CameraRawControlNode::raw_params_callback,
            this,
            std::placeholders::_1,
            std::placeholders::_2
        )
    );
    // Stream enable/disable control service.
    raw_status_service = this->create_service<interfaces::srv::SetCameraRawStatus>(
        "set_camera_raw_status",
        [this](
            const std::shared_ptr<interfaces::srv::SetCameraRawStatus::Request> &request,
            const std::shared_ptr<interfaces::srv::SetCameraRawStatus::Response> &response
        ) {
            raw_status_callback(request, response);
        }
    );
}
void CameraRawControlNode::raw_params_callback(
const std::shared_ptr<interfaces::srv::SetCameraRawParams::Request> &request,
const std::shared_ptr<interfaces::srv::SetCameraRawParams::Response> &response
) {
if (request->camera_position == "left") {
string prefix;
if (!left_sign) {
response->success = false;
response->info = "Left camera params service is unavailable";
return;
} else if (request->raw == "color") {
prefix = "rgb_camera";
} else if (request->raw == "depth") {
prefix = "depth_module";
} else {
response->success = false;
response->info = "raw is wrong: " + request->raw;
return;
}
rclcpp::Parameter exposure(prefix + ".exposure", request->exposure);
rclcpp::Parameter gain(prefix + ".gain", request->gain);
auto results = camera_left->set_parameters({exposure, gain});
response->success = true;
for (auto &result : results) {
if (!result.successful) {
response->success = false;
response->info += result.reason + " | ";
}
}
} else if (request->camera_position == "right") {
string prefix;
if (!right_sign) {
response->success = false;
response->info = "Right camera params service is unavailable";
return;
}
if (request->raw == "color") {
prefix = "rgb_camera";
} else if (request->raw == "depth") {
prefix = "depth_module";
} else {
response->success = false;
response->info = "raw is wrong: " + request->raw;
return;
}
rclcpp::Parameter exposure(prefix + ".exposure", request->exposure);
rclcpp::Parameter gain(prefix + ".gain", request->gain);
auto results = camera_right->set_parameters({exposure, gain});
response->success = true;
for (auto &result : results) {
if (!result.successful) {
response->success = false;
response->info += result.reason + " | ";
}
}
} else if (request->camera_position == "head") {
string prefix;
if (!head_sign) {
response->success = false;
response->info = "Head camera params service is unavailable";
return;
}
if (request->raw == "color") {
// /camera/set_color_exposure
// /camera/set_color_gain
auto exposure_request = std::make_shared<raw_params_type::Request>();
auto gain_request = std::make_shared<raw_params_type::Request>();
exposure_request->data = request->exposure;
gain_request->data = request->gain;
auto future_exposure = color_exposure->async_send_request(exposure_request);
auto future_gain = color_gain->async_send_request(gain_request);
response->success = true;
auto result_exposure = future_exposure.get();
auto result_gain = future_gain.get();
response->success = response->success && result_exposure->success;
if (!response->success) {
response->info += result_exposure->message;
}
response->success = response->success && result_gain->success;
if (!response->success) {
response->info += result_gain->message;
}
return;
} else if (request->raw == "depth") {
// /camera/set_depth_exposure
// /camera/set_depth_gain
auto exposure_request = std::make_shared<raw_params_type::Request>();
auto gain_request = std::make_shared<raw_params_type::Request>();
exposure_request->data = request->exposure;
gain_request->data = request->gain;
auto future_exposure = depth_exposure->async_send_request(exposure_request);
auto future_gain = depth_gain->async_send_request(gain_request);
response->success = true;
auto result_exposure = future_exposure.get();
auto result_gain = future_gain.get();
response->success = response->success && result_exposure->success;
if (!response->success) {
response->info += result_exposure->message;
}
response->success = response->success && result_gain->success;
if (!response->success) {
response->info += result_gain->message;
}
return;
} else {
response->success = false;
response->info = "raw is wrong: " + request->raw;
return;
}
} else {
response->success = false;
response->info = ("camera position is wrong: " + request->camera_position);
}
}
// Handler for "set_camera_raw_status": enables/disables the color/depth/ir
// streams. Left/right cameras are switched via the enable_* node parameters;
// the head camera via its toggle_color / toggle_depth services.
void CameraRawControlNode::raw_status_callback(
    const std::shared_ptr<interfaces::srv::SetCameraRawStatus::Request> &request,
    std::shared_ptr<interfaces::srv::SetCameraRawStatus::Response> const &response
) {
    // Parameter client for the selected wrist camera; stays null for "head"
    // (that branch returns before the parameter path below).
    std::shared_ptr<rclcpp::SyncParametersClient> camera = nullptr;
    if (request->camera_position == "left") {
        if (left_sign) {
            camera = camera_left;
        } else {
            response->success = false;
            response->info = "Left camera params service is unavailable.";
            return;
        }
    } else if (request->camera_position == "right") {
        if (right_sign) {
            camera = camera_right;
        } else {
            response->success = false;
            response->info = "Right camera params service is unavailable.";
            return;
        }
    } else if (request->camera_position == "head") {
        if (head_sign) {
            // Head camera: toggle color/depth through its SetBool services
            // and merge both results into the response.
            auto request_c = std::make_shared<std_srvs::srv::SetBool::Request>();
            auto request_d = std::make_shared<std_srvs::srv::SetBool::Request>();
            request_c->data = request->color_raw;
            request_d->data = request->depth_raw;
            auto future_c = color_raw_control->async_send_request(request_c);
            auto future_d = depth_raw_control->async_send_request(request_d);
            // Blocking .get() is safe: the clients run on client_cb_group_.
            auto result_c = future_c.get();
            auto result_d = future_d.get();
            response->success = true;
            response->success = response->success && result_c->success;
            if (!response->success) {
                response->info += result_c->message;
            }
            response->success = response->success && result_d->success;
            if (!response->success) {
                response->info += " | ";
                response->info += result_d->message;
            }
            return;
        } else {
            response->success = false;
            response->info = "Head camera params service is unavailable.";
            return;
        }
    } else {
        response->success = false;
        response->info = ("camera position is wrong: " + request->camera_position);
        return;
    }
    // Wrist cameras: flip the three enable_* parameters in one call.
    rclcpp::Parameter color_raw("enable_color", request->color_raw);
    rclcpp::Parameter depth_raw("enable_depth", request->depth_raw);
    rclcpp::Parameter ir_raw("enable_ir", request->ir_raw);
    auto results = camera->set_parameters({color_raw, depth_raw, ir_raw});
    // NOTE(review): success is OR-accumulated below (true when ANY parameter
    // was set), whereas raw_params_callback AND-accumulates (false when any
    // fails) — confirm this asymmetry is intended.
    response->success = false;
    for (auto &result : results) {
        if (!result.successful) {
            response->success = response->success || false;
            response->info += result.reason + " | ";
        } else {
            response->success = response->success || true;
        }
    }
    return;
}
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
auto node = std::make_shared<CameraRawControlNode>("camera_raw_control_node");
auto executor = rclcpp::executors::MultiThreadedExecutor();
try {
executor.add_node(node);
executor.spin();
executor.remove_node(node);
} catch (...) {
}
node.reset();
if (rclcpp::ok()) {
rclcpp::shutdown();
}
return 0;
}

Binary file not shown.

View File

@@ -12,51 +12,73 @@
},
"node_mode": "ACTION",
"service_configs": {
"service_node_configs": {
"service_name": "/vision_object_recognition"
},
"publisher_configs": {
"publisher_node_configs": {
"publish_time": 0.1,
"position": "right",
"publisher_name": "/detect/pose"
},
"action_configs": {
"action_node_configs": {
"action_name": "/vision_object_recognition"
},
"image_source": "DRIVER",
"driver_configs": {
"preprocess_configs": {
"distortion": false,
"denoising": false,
"enhancement": false
},
"driver_source_configs": {
"subscription_name": "/img_msg"
},
"direct_configs": {
"direct_source_configs": {
"position": "right",
"color_image_topic_name": "/camera/color/image_raw",
"depth_image_topic_name": "/camera/depth/image_raw",
"camera_info_topic_name": "/camera/color/camera_info"
},
"topic_source_configs": {
"left": [
"/camera1/camera1/color/image_raw",
"/camera1/camera1/aligned_depth_to_color/image_raw",
"/camera1/camera1/color/camera_info"
],
"right": [
"/camera2/camera2/color/image_raw",
"/camera2/camera2/aligned_depth_to_color/image_raw",
"/camera2/camera2/color/camera_info"
],
"head": [
"/camera/color/image_raw",
"/camera/depth/image_raw",
"/camera/color/camera_info"
]
},
"detect_mode": "OBJECT",
"object_configs": {
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
"object_detector_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.pt",
"confidence": 0.70,
"label_map_path": "map/label/medical_sense.json",
"classes": []
},
"color_configs": {
"color_detector_configs": {
"distance": 1500,
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
},
"crossboard_configs": {
"crossboard_detector_configs": {
"pattern_size": [8, 5]
},
"estimate_mode": "PCA",
"pca_configs": {
"pca_estimator_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.004
},
"icp_configs": {
"icp_estimator_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
@@ -65,13 +87,13 @@
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]
},
"e2e_configs": {
"gsnet_estimator_configs": {
"checkpoint_path": "checkpoints/posenet.pt",
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.010,
"collision_thresh": 0.01
"collision_thresh": 0.00
},
"refine_mode": "FIXED"
"refine_mode": "NO"
}

View File

@@ -12,23 +12,23 @@
},
"node_mode": "SERVICE",
"service_configs": {
"service_node_configs": {
"service_name": "/vision_object_recognition"
},
"publisher_configs": {
"publisher_node_configs": {
"publish_time": 0.1,
"position": "right",
"publisher_name": "/detect/pose"
},
"action_configs": {
"action_node_configs": {
"action_name": "/vision_object_recognition"
},
"image_source": "DRIVER",
"driver_configs": {
"driver_source_configs": {
"subscription_name": "/img_msg"
},
"direct_configs": {
"direct_source_configs": {
"position": "right",
"color_image_topic_name": "/camera/color/image_raw",
"depth_image_topic_name": "/camera/depth/image_raw",
@@ -36,27 +36,27 @@
},
"detect_mode": "OBJECT",
"object_configs": {
"checkpoint_path": "checkpoints/medical_sense-seg.pt",
"object_detector_configs": {
"checkpoint_path": "checkpoints/yolo11s-seg.onnx",
"confidence": 0.70,
"label_map_path": "map/label/medical_sense.json",
"classes": []
},
"color_configs": {
"color_detector_configs": {
"distance": 1500,
"color_range": [[[0, 120, 70], [10, 255, 255]], [[170, 120, 70], [180, 255, 255]]]
},
"crossboard_configs": {
"crossboard_detector_configs": {
"pattern_size": [8, 5]
},
"estimate_mode": "PCA",
"pca_configs": {
"pca_estimator_configs": {
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.004
},
"icp_configs": {
"icp_estimator_configs": {
"complete_model_path": "pointclouds/bottle_model.pcd",
"depth_scale": 1000.0,
"depth_trunc": 2.0,
@@ -65,12 +65,12 @@
"icp_voxel_radius": [0.004, 0.002, 0.001],
"icp_max_iter": [50, 30, 14]
},
"e2e_configs": {
"gsnet_estimator_configs": {
"checkpoint_path": "checkpoints/posenet.pt",
"depth_scale": 1000.0,
"depth_trunc": 3.0,
"voxel_size": 0.010,
"collision_thresh": 0.01
"collision_thresh": 0.00
},
"refine_mode": "FIXED"

View File

@@ -23,6 +23,8 @@
"0302": "Task is aborted",
"0303": "Worker time out",
"0400": "task executor internal error",
"1000": "Detected object count is 0",
"1001": "Depth crop is None",
"1003": "Failed to detect a valid pose",

View File

@@ -27,13 +27,8 @@ data_files = [
('share/' + package_name + '/checkpoints', glob('checkpoints/*.pt')),
('share/' + package_name + '/checkpoints', glob('checkpoints/*.onnx')),
('share/' + package_name + '/checkpoints', glob('checkpoints/*.engine')),
('share/' + package_name + '/pointclouds', glob('pointclouds/*.pcd')),
('lib/python3.10/site-packages/' + package_name + '/VisionDetect/net/pointnet2/pointnet2',
glob('vision_detect/VisionDetect/net/pointnet2/pointnet2/*.so')),
('lib/python3.10/site-packages/' + package_name + '/VisionDetect/net/knn/knn_pytorch',
glob('vision_detect/VisionDetect/net/knn/knn_pytorch/*.so'))
]
('share/' + package_name + '/pointclouds', glob('pointclouds/*.pcd'))
]
data_files.extend(openvino_files)
@@ -41,9 +36,19 @@ setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
package_data={
'vision_detect': [
'vision_core/model/pointnet2/pointnet2/*.so',
'vision_core/model/knn/knn_pytorch/*.so'
]
},
data_files=data_files,
install_requires=['setuptools'],
zip_safe=True,
install_requires=[
'setuptools',
'numpy >= 1.23.0, < 2.0.0',
'opencv-python > 4.0.0, < 4.12.0',
],
zip_safe=False,
include_package_data=True,
maintainer='lyx',
maintainer_email='lyx@todo.todo',
@@ -59,12 +64,14 @@ setup(
'flexiv_detect_service_node = vision_detect.flexivaidk_detect_service:main',
'sub_pose_node = vision_detect.sub_pose:main',
'service_client_node = vision_detect.service_client:main',
'get_camera_pose_node = vision_detect.get_camera_pose:main',
'detect_node = vision_detect.detect_node:main',
'detect_node_test = vision_detect.detect_node_test:main',
'test_action_client = vision_detect.action_client_node:main'
'service_client_node = vision_detect.service_client:main',
'test_action_client = vision_detect.action_client_node:main',
'once_test_action_client = vision_detect.action_client_once:main',
'concurrent_test_action_client = vision_detect.action_client_once_concurrent:main'
],
},
)

View File

@@ -1,3 +0,0 @@
__all__ = []

View File

@@ -1 +0,0 @@
from .gsnet import *

View File

@@ -1,132 +0,0 @@
import os
import sys
import numpy as np
import argparse
from PIL import Image
import torch
import open3d as o3d
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(ROOT_DIR)
from models.graspnet import GraspNet, pred_decode
import collections.abc as container_abcs
import MinkowskiEngine as ME
from utils.collision_detector import ModelFreeCollisionDetector
parser = argparse.ArgumentParser()
parser.add_argument('--checkpoint_path', default='checkpoints/minkuresunet_realsense.tar')
parser.add_argument('--dump_dir', help='Dump dir to save outputs', default='results/')
parser.add_argument('--seed_feat_dim', default=512, type=int, help='Point wise feature dim')
parser.add_argument('--collision_thresh', type=float, default=0.01,
help='Collision Threshold in collision detection [default: 0.01]')
parser.add_argument('--voxel_size_cd', type=float, default=0.01,
help='Voxel Size for collision detection')
parser.add_argument('--infer', action='store_true', default=True)
parser.add_argument('--vis', action='store_true', default=True)
cfgs = parser.parse_args()
# ------------------------------------------------------------------------- GLOBAL CONFIG BEG
# if not os.path.exists(cfgs.dump_dir):
# os.mkdir(cfgs.dump_dir)
def minkowski_collate_fn(list_data):
    """Collate a list of sample dicts into a MinkowskiEngine sparse batch.

    'coors'/'feats' are batched and quantized via ME; `quantize2original`
    maps quantized points back to the originals. All remaining keys are
    converted to tensors by the recursive helper below, which writes into
    the same `res` dict.
    """
    coordinates_batch, features_batch = ME.utils.sparse_collate([d["coors"] for d in list_data],
                                                                [d["feats"] for d in list_data])
    # sparse_quantize expects integer, contiguous coordinates.
    coordinates_batch = np.ascontiguousarray(coordinates_batch, dtype=np.int32)
    coordinates_batch, features_batch, _, quantize2original = ME.utils.sparse_quantize(
        coordinates_batch, features_batch, return_index=True, return_inverse=True)
    res = {
        "coors": coordinates_batch,
        "feats": features_batch,
        "quantize2original": quantize2original
    }

    def collate_fn_(batch):
        # Recursively tensorize numpy arrays, sequences, and mappings.
        if type(batch[0]).__module__ == 'numpy':
            return torch.stack([torch.from_numpy(b) for b in batch], 0)
        elif isinstance(batch[0], container_abcs.Sequence):
            return [[torch.from_numpy(sample) for sample in b] for b in batch]
        elif isinstance(batch[0], container_abcs.Mapping):
            for key in batch[0]:
                if key == 'coors' or key == 'feats':
                    continue  # already handled by sparse_collate above
                res[key] = collate_fn_([d[key] for d in batch])
            return res

    res = collate_fn_(list_data)
    return res
def data_process(pcd: o3d.geometry.PointCloud, voxel_size: float = 0.005):
    """Transform the cloud into the table frame and build the network input.

    NOTE: `pcd` is transformed in place. Returns a dict with float32 points,
    voxel-grid coordinates, and constant all-ones features.
    """
    index = 0
    camera_poses = np.array([np.eye(4)])  # camera pose (identity here)
    align_mat = np.eye(4)  # pose of Camera_0 relative to the table top
    trans = np.dot(align_mat, camera_poses[int(index)])
    pcd.transform(trans)  # in-place: both matrices are identity, so a no-op here
    points = np.asarray(pcd.points)
    ret_dict = {
        'point_clouds': points.astype(np.float32),
        'coors': points.astype(np.float32) / voxel_size,  # voxel coordinates
        'feats': np.ones_like(points).astype(np.float32),  # constant features
    }
    return ret_dict
def get_grasp_dict(preds):
    """Split an (N, 17) grasp prediction array into named fields.

    Per-row layout: [score, width, height, depth, rotation(9), translation(3),
    object_id]. The rotation block is reshaped to (N, 3, 3) and the
    translation block to (N, 3); scalar fields stay as 1-D views.
    """
    rotations = preds[:, 4:13].reshape(-1, 3, 3)
    translations = preds[:, 13:16].reshape(-1, 3)
    return {
        "score": preds[:, 0],
        "width": preds[:, 1],
        "height": preds[:, 2],
        "depth": preds[:, 3],
        "rotation": rotations,
        "translation": translations,
        "object_id": preds[:, 16],
    }
def inference(pcd: o3d.geometry.PointCloud, voxel_size: float = 0.005):
    """Run GSNet grasp prediction on a point cloud.

    Returns the top (<=10) predictions sorted by score, optionally filtered
    by collision detection, as an (N, 17) array
    [score, width, height, depth, rotation(9), translation(3), object_id].
    """
    data_input = data_process(pcd, voxel_size)
    batch_data = minkowski_collate_fn([data_input])
    net = GraspNet(seed_feat_dim=512, is_training=False)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # device = torch.device("cpu")
    net.to(device)
    # Load checkpoint.
    # BUG FIX: the original passed the *path string* straight to
    # load_state_dict(); the file must be deserialized with torch.load first.
    checkpoint_path = os.path.join(ROOT_DIR, cfgs.checkpoint_path)
    checkpoint = torch.load(checkpoint_path, map_location=device)
    # GSNet checkpoints typically store weights under 'model_state_dict';
    # fall back to treating the file as a raw state dict.
    state_dict = checkpoint.get('model_state_dict', checkpoint) if isinstance(checkpoint, dict) else checkpoint
    net.load_state_dict(state_dict)
    net.eval()
    for key in batch_data:
        batch_data[key] = batch_data[key].to(device)
    # Forward pass
    with torch.no_grad():
        end_points = net(batch_data)
        grasp_preds = pred_decode(end_points)
    preds = grasp_preds[0].detach().cpu().numpy()
    # Keep the 10 highest-scoring grasps.
    sorted_index = np.argsort(-preds[:, 0])
    preds = preds[sorted_index]
    preds = preds[:10]
    # collision detection
    if cfgs.collision_thresh > 0:
        cloud = data_input['point_clouds']
        mfcdetector = ModelFreeCollisionDetector(cloud, voxel_size=cfgs.voxel_size_cd)
        collision_mask = mfcdetector.detect(get_grasp_dict(preds), approach_dist=0.05,
                                            collision_thresh=cfgs.collision_thresh)
        preds = preds[~collision_mask]
    return preds
if __name__ == '__main__':
    # NOTE(review): this demo entry point is broken — data_process() requires
    # a `pcd` argument (TypeError here), and inference() expects the point
    # cloud itself, not the processed data dict. Confirm intended usage and
    # pass an o3d point cloud to inference() directly.
    data_dict = data_process()
    inference(data_dict)

View File

@@ -1,3 +0,0 @@
from .detect_node import DetectNode
__all__ = ['detect_node']

View File

@@ -1,144 +0,0 @@
import os
import json
from ament_index_python.packages import get_package_share_directory
from rclpy.node import Node
import numpy as np
import open3d as o3d
import yaml
share_dir = get_package_share_directory('vision_detect')
__all__ = [
"ConfigBase"
]
class ConfigBase(Node):
    """Base node that loads the vision-detect configuration JSON and exposes
    every option as an attribute for subclasses to consume."""
    # Package share directory used to resolve config/checkpoint paths.
    SHARE_DIR = get_package_share_directory('vision_detect')
    # Warning-code -> message map, loaded once at class-definition time.
    with open(os.path.join(
        SHARE_DIR, "configs/error_configs/report_logging_define.json"), "r"
    ) as f:
        WARNING_LOG_MAP = json.load(f)["warning"]

    def __init__(self, name):
        """Initialize the node and pre-declare every config attribute,
        then populate them from the configs file via _get_param()."""
        super().__init__(name)
        """init parameter"""
        self.confidence = None
        self.depth_image_topic_name = None
        self.color_image_topic_name = None
        self.camera_info_topic_name = None
        self.service_name = None
        self.pattern_size = None
        self.device = None
        self.configs = None
        self.calculate_configs = None
        self.checkpoint_path = None
        self.output_boxes = None
        self.output_masks = None
        self.function = None
        self.source = None
        self.server = None
        self.model = None
        # Camera intrinsic matrix (k) and distortion coefficients (d).
        self.k = self.d = None
        # Hand-eye calibration matrices.
        self.eye_in_left_hand_mat = None
        self.eye_in_right_hand_mat = None
        self.eye_to_hand_mat = None
        self.hand_eye_mat = None
        self.get_camera_mode = None
        self.detect_mode = None
        self.calculate_mode = None
        self.camera_size = None
        self.position = None
        self.e2e_model = None
        # Undistortion remap tables (cv2.initUndistortRectifyMap outputs).
        self.map1 = self.map2 = None
        self.calculate_function = None
        self.fx = self.fy = 0.5
        self.camera_data = {}
        # Color-detector defaults: max distance and HSV ranges
        # (presumably red hue wrap-around — confirm against detector).
        self.distance = 1500
        self.color_range = [
            [[0, 120, 70], [10, 255, 255]],
            [[170, 120, 70], [180, 255, 255]]
        ]
        self._get_param()

    def _get_param(self):
        """Read the configs JSON (path taken from the 'configs_path' ROS
        parameter) and populate the mode-specific attributes."""
        self.declare_parameter(
            'configs_path',
            os.path.join(share_dir, "configs/launch/default_service_config.json")
        )
        configs_path = self.get_parameter('configs_path').value
        with open(configs_path, 'r') as f:
            self.configs = json.load(f)
        # Booleans are stored as strings in the JSON ("True"/"False").
        self.output_boxes = self.configs['output_boxes'].lower() == "true"
        self.output_masks = self.configs['output_masks'].lower() == "true"
        self.get_camera_mode = self.configs['get_camera_mode']
        self.detect_mode = self.configs['detect_mode']
        self.calculate_mode = self.configs['calculate_mode']
        # self._read_calibration_mat()
        if self.get_camera_mode == "Service":
            self.service_name = self.configs["Service_configs"]["service_name"]
        elif self.get_camera_mode == "Topic":
            topic_configs = self.configs['Topic_configs']
            self.color_image_topic_name = topic_configs["color_image_topic_name"]
            self.depth_image_topic_name = topic_configs["depth_image_topic_name"]
            self.camera_info_topic_name = topic_configs["camera_info_topic_name"]
            self.position = topic_configs["position"]
        else:
            # NOTE(review): fallback reads key "Service", while the branch
            # above reads "Service_configs" — confirm which key the schema
            # actually defines.
            self.service_name = self.configs["Service"]["service_name"]
        if self.detect_mode == 'Detect':
            detect_configs = self.configs['Detect_configs']
            self.confidence = detect_configs["confidence"]
            self.classes = detect_configs["classes"]
            if not self.classes:
                self.classes = None  # empty list means "all classes"
            self.checkpoint_path = os.path.join(share_dir, detect_configs["checkpoint_path"])
        elif self.detect_mode == 'Color':
            self.color_range = self.configs["Color_configs"]["color_range"]
            self.distance = self.configs["Color_configs"]["distance"]
            # Convert the JSON lists into numpy arrays for cv2.inRange use.
            self.color_range = [[np.array(lower), np.array(upper)] for lower, upper in
                                self.color_range]
        elif self.detect_mode == 'Crossboard':
            self.pattern_size = self.configs["Crossboard_configs"]["pattern_size"]
        else:
            self.get_logger().warning("Unknown mode, use detect")
            detect_configs = self.configs['Detect_configs']
            self.confidence = detect_configs["confidence"]
            self.classes = detect_configs["classes"]
            if not self.classes:
                self.classes = None
            # NOTE(review): unlike the 'Detect' branch, this fallback does not
            # join checkpoint_path with share_dir — confirm intended.
            self.checkpoint_path = detect_configs["checkpoint_path"]
        if self.calculate_mode == "PCA":
            self.calculate_configs = self.configs['PCA_configs']
        elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
            # NOTE(review): key is 'ICA_configs' (not 'ICP_configs') — confirm
            # against the shipped config files before renaming.
            self.calculate_configs = self.configs['ICA_configs']
            source = o3d.io.read_point_cloud(
                os.path.join(share_dir, self.calculate_configs['complete_model_path'])
            )
            self.calculate_configs["source"] = source
        elif self.calculate_mode == "E2E" and self.detect_mode == 'Detect':
            self.calculate_configs = self.configs['E2E_configs']
        else:
            self.get_logger().warning("Unknown calculate_mode, use PCA")
            self.calculate_configs = self.configs['PCA_configs']
        self.get_logger().info("Get parameters done")

View File

@@ -1,424 +0,0 @@
"""Vision Detect Node"""
import os
import threading
import time
import cv2
import numpy as np
import open3d as o3d
import rclpy
from cv_bridge import CvBridge
from sensor_msgs.msg import CameraInfo
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseClassAndID, PoseArrayClassAndID
from ..utils import distortion_correction, crop_imgs_box_xywh, draw_box, draw_mask, rmat2quat, \
crop_imgs_mask, get_map, create_o3d_pcd, save_img, refine_grasp_pose
from .init_node import InitBase
E2E_SIGN = True
try:
from ..utils import calculate_pose_e2e
except (ImportError, ModuleNotFoundError):
E2E_SIGN = False
class DetectNode(InitBase):
    """ROS2 node that detects objects in RGB-D frames and publishes/returns grasp poses.

    Inherits configuration, calibration, model setup and pub/sub wiring from
    InitBase and overrides its callback / segmentation hooks with real
    implementations. `self.function` (set by InitBase) is one of
    `_seg_image`, `_seg_color` or `_seg_crossboard`.
    """
    def __init__(self, name):
        super().__init__(name)
        # Converts between ROS Image messages and OpenCV ndarrays.
        self.cv_bridge = CvBridge()
        # Guards self.camera_data: written by the subscriber callback thread,
        # read inside the service callback.
        self.lock = threading.Lock()
    def _camera_info_callback(self, msg: CameraInfo):
        """Cache intrinsics (k), distortion (d) and image size from CameraInfo.

        Once valid parameters arrive, precompute the undistortion maps,
        complete self.future (unblocks InitBase's wait in topic mode) and
        drop the subscription -- the info is only needed once.
        """
        self.k = msg.k
        self.d = msg.d
        self.camera_size = [msg.width, msg.height]
        if self.k is not None and len(self.k) > 0 and self.d is not None and len(self.d) > 0:
            self.map1, self.map2, self.k = get_map(msg.k, msg.d, self.camera_size)
            if not self.future.done():
                self.future.set_result(True)
                self.destroy_subscription(self.sub_camera_info)
        else:
            self.get_logger().warning("K and d are not defined")
    def _service_sub_callback(self, msgs):
        """Cache the latest color/depth frame plus intrinsics, keyed by camera position."""
        with self.lock:
            # self.get_logger().info("get msgs")
            self.camera_data[msgs.position] = [
                msgs.image_color,
                msgs.image_depth,
                msgs.karr,
                msgs.darr
            ]
    def _sync_sub_callback(self, color_img_ros, depth_img_ros):
        """Process one time-synchronized color/depth pair (topic mode)."""
        color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
        depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
        color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
        # self.function is the configured segmentation pipeline.
        img, pose_list, sign = self.function(color_img_cv, depth_img_cv)
        if not sign:
            self.get_logger().warning(self.WARNING_LOG_MAP[str(pose_list)])
            # Masks empty: finish this frame by publishing the raw image.
            if img is None:
                img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
        if self.output_boxes or self.output_masks:
            self.pub_detect_image.publish(img)
        # NOTE(review): when sign is False, pose_list is an int error code and
        # therefore truthy -- iterating it below would raise TypeError; confirm
        # failure paths cannot reach this branch.
        if pose_list:
            pose_list_all = PoseArrayClassAndID()
            for item in pose_list:
                pose_list_all.objects.append(
                    PoseClassAndID(
                        class_name = item["class_name"],
                        class_id = item["class_id"],
                        pose = item["pose"],
                        grab_width = item["grab_width"]
                    )
                )
            pose_list_all.header.stamp = self.get_clock().now().to_msg()
            pose_list_all.header.frame_id = "pose_list"
            self.pub_pose_list.publish(pose_list_all)
    def _service_callback(self, request, response):
        """Service handler: run detection on the cached frame of the requested camera.

        Selects the matching hand-eye matrix, undistorts the cached RGB-D
        pair, runs the configured segmentation/pose pipeline and fills the
        response with PoseClassAndID objects.
        """
        print(" \n ")
        print("========================== < start > ==========================")
        time_start = time.time()
        response.header.stamp = self.get_clock().now().to_msg()
        response.header.frame_id = "camera_detect"
        with self.lock:
            if request.camera_position in self.camera_data:
                color_img_ros, depth_img_ros, self.k, d = self.camera_data[request.camera_position]
            else:
                # Distinguish "no frames cached yet" from "unknown camera name".
                if len(self.camera_data) == 0:
                    response.success = False
                    response.info = "Camera data have not objects"
                    response.objects = []
                    print("=========================== < end > ===========================")
                    return response
                response.success = False
                response.info = f"Name is wrong: {request.camera_position}"
                response.objects = []
                print("=========================== < end > ===========================")
                return response
        # Pick the hand-eye calibration matrix that matches the requesting camera.
        if request.camera_position == 'left':
            self.hand_eye_mat = self.eye_in_left_hand_mat
            self.p = "left"
        elif request.camera_position == 'right':
            self.hand_eye_mat = self.eye_in_right_hand_mat
            self.p = "right"
        else:
            self.hand_eye_mat = self.eye_to_hand_mat
            self.p = "head"
        self.camera_size = [color_img_ros.width, color_img_ros.height]
        time1 = time.time()
        color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
        depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
        map1, map2, self.k = get_map(self.k, d, self.camera_size)
        color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
        time2 = time.time()
        print(f"cv: {(time2 - time1) * 1000} ms")
        img, pose_list, sign = self.function(color_img_cv, depth_img_cv)
        if not sign:
            self.get_logger().warning(self.WARNING_LOG_MAP[str(pose_list)])
            response.info = "pose dict is empty"
            response.success = False
            response.objects = []
        else:
            response.info = "Success get pose"
            response.success = True
            for item in pose_list:
                response.objects.append(
                    PoseClassAndID(
                        class_name = item["class_name"],
                        class_id = item["class_id"],
                        pose = item["pose"],
                        grab_width = item["grab_width"]
                    )
                )
        # publish detect image
        if self.output_boxes or self.output_masks:
            if img is None:
                img = color_img_ros
            self.pub_detect_image.publish(img)
        time_end = time.time()
        print(f"full process: {(time_end - time_start) * 1000} ms")
        print("=========================== < end > ===========================")
        return response
    def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        """Segment objects with the YOLO model and compute one grasp pose per instance.

        Returns:
            (annotated ROS image or None, pose list or int error code, success flag)
        """
        pose_list = []
        home = os.path.expanduser("~")
        save_dir = os.path.join(home, "images")
        save_img(rgb_img.copy(), "orign_image.png", save_dir=save_dir, mark_cur_time=True)
        # Get Predict Results
        time1 = time.time()
        results = self.model(rgb_img, retina_masks=True, conf=self.confidence, classes=self.classes)
        time2 = time.time()
        result = results[0]
        # Get masks
        if result.masks is None or len(result.masks) == 0:
            return None, 1000, False
        masks = result.masks.data.cpu().numpy()
        # Get boxes
        boxes = result.boxes.xywh.cpu().numpy()
        class_ids = result.boxes.cls.cpu().numpy()
        labels = result.names
        # Order detections by x-center ascending, then y-center descending.
        x_centers, y_centers = boxes[:, 0], boxes[:, 1]
        sorted_index = np.lexsort((-y_centers, x_centers))
        masks = masks[sorted_index]
        boxes = boxes[sorted_index]
        class_ids = class_ids[sorted_index]
        time3 = time.time()
        self.get_logger().info(f"Detect object num: {len(masks)}")
        # Full-scene cloud, later used by refine_grasp_pose for collision checks.
        full_points = create_o3d_pcd(
            depth_img, self.camera_size, self.k, **self.calculate_configs
        )
        time_full_points = time.time()
        print(f"create full points: {(time_full_points - time3) * 1000}")
        if self.calculate_mode == "E2E" and self.detect_mode == 'Detect' and E2E_SIGN:
            self.calculate_configs["orign_point_clouds"] = create_o3d_pcd(
                depth_img, self.camera_size, self.k, **self.calculate_configs
            )
        for i, (mask, box) in enumerate(zip(masks, boxes)):
            # Crop depth and mask to the detection box to shrink the cloud.
            imgs_crop, (x_min, y_min) = crop_imgs_box_xywh([depth_img, mask], box, True)
            depth_crop, mask_crop = imgs_crop
            if depth_crop is None:
                self.get_logger().warning("Depth crop is None")
                continue
            # Shift the principal point so the intrinsics match the cropped image.
            intrinsics = o3d.camera.PinholeCameraIntrinsic(
                int(self.camera_size[0]), int(self.camera_size[1]),
                self.k[0], self.k[4],
                self.k[2] - x_min, self.k[5] - y_min
            )
            rmat, grab_width, sign = self.calculate_function(
                mask_crop, depth_crop, intrinsics,
                calculate_grab_width=True, **self.calculate_configs
            )
            if not sign:
                self.get_logger().warning(self.WARNING_LOG_MAP[str(rmat)])
                continue
            time_c = time.time()
            # Hand cameras get an extra collision-aware pose refinement step.
            if self.p == "left" or self.p == "right":
                position = rmat[0:3, 3]
                rmat, sign = refine_grasp_pose(
                    full_points, self.calculate_configs.get("voxel_size"), position,
                    search_mode=True
                )
                if not sign:
                    self.get_logger().warning(self.WARNING_LOG_MAP[str(rmat)])
                    continue
                time_e = time.time()
                print(f"Refine: {(time_e - time_c) * 1000} ms")
            self.get_logger().info(f"grab_width: {grab_width}")
            # Transform the camera-frame pose into the robot frame (hand-eye calib).
            x, y, z, rw, rx, ry, rz = rmat2quat(self.hand_eye_mat @ rmat)
            pose = Pose()
            pose.position = Point(x=x, y=y, z=z)
            pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
            self.get_logger().info(f"xyz, wxyz: {x, y, z, rw, rx, ry, rz}")
            pose_list.append(
                {
                    "class_id": int(class_ids[i]),
                    "class_name": labels[class_ids[i]],
                    "pose": pose,
                    "grab_width": grab_width
                }
            )
        time4 = time.time()
        if not pose_list:
            return None, 1003, False
        self.get_logger().info(f'{(time2 - time1) * 1000} ms, model predict')
        self.get_logger().info(f'{(time4 - time3) * 1000} ms, calculate all mask PCA')
        self.get_logger().info(f'{(time4 - time1) * 1000} ms, completing a picture entire process')
        # mask_img and box_img is or not output
        if not self.output_boxes and not self.output_masks:
            return None, pose_list, True
        if self.output_boxes:
            draw_box(rgb_img, result, save_dir=save_dir, mark_time = True)
        if self.output_masks:
            draw_mask(rgb_img, result)
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list, True
    def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        """Segment a target by HSV color range (within self.distance) and compute its pose."""
        pose_list = []
        hsv_img = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2HSV)
        # Keep only pixels with valid depth closer than the configured distance.
        depth_filter_mask = np.zeros_like(depth_img, dtype=np.uint8)
        depth_filter_mask[(depth_img > 0) & (depth_img < self.distance)] = 1
        hsv_img[depth_filter_mask == 0] = 0
        # Union of all configured HSV ranges, collapsed to a 0/1 mask.
        mask_part_list = [cv2.inRange(hsv_img, color[0], color[1]) for color in self.color_range]
        mask = sum(mask_part_list[1:], mask_part_list[0])
        mask = mask // 255
        # Tint the matched region blue (BGR) for the debug image.
        rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([255, 0, 0]) * 0.5
        imgs_crop, mask_crop, (x_min, y_min) = crop_imgs_mask([depth_img], mask, True)
        depth_crop = imgs_crop[0]
        if depth_crop is None:
            self.get_logger().warning(self.WARNING_LOG_MAP[str(1001)])
            return None, 1001, False
        # Shift the principal point so the intrinsics match the cropped image.
        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            int(self.camera_size[0]), int(self.camera_size[1]),
            self.k[0], self.k[4],
            self.k[2] - x_min, self.k[5] - y_min
        )
        rmat, _, sign = self.calculate_function(
            mask_crop, depth_crop, intrinsics, **self.calculate_configs
        )
        if not sign:
            # self.get_logger().warning("Color Area point cloud have too many noise")
            return None, 1100, False
        T = self.hand_eye_mat @ rmat
        x, y, z, rw, rx, ry, rz = rmat2quat(T)
        # An all-zero translation is treated as "no usable pose found".
        if (x, y, z) != (0.0, 0.0, 0.0):
            pose = Pose()
            pose.position = Point(x=x, y=y, z=z)
            pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
            pose_list.append(
                {
                    "class_id": int(98),
                    "class_name": "red",
                    "pose": pose,
                    "grab_width": 0.0
                }
            )
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list, True
    def _seg_crossboard(self, rgb_img, depth_img):
        """Detect a chessboard pattern and compute its pose from the covered depth region."""
        pose_list = []
        rgb_img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
        ret, corners = cv2.findChessboardCorners(rgb_img_gray,
                                                 self.pattern_size, cv2.CALIB_CB_FAST_CHECK)
        if ret:
            # Refine corners to sub-pixel accuracy (improves calibration precision).
            criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
            corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
            corners_subpix = corners_subpix.reshape(self.pattern_size[1], self.pattern_size[0], 2)
            # Fill each board square to build a mask covering the whole pattern.
            mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
            for i in range(0, self.pattern_size[1] - 1):
                for j in range(0, self.pattern_size[0] - 1):
                    pts = np.array([
                        corners_subpix[i, j],
                        corners_subpix[i, j + 1],
                        corners_subpix[i + 1, j + 1],
                        corners_subpix[i + 1, j]
                    ], dtype=np.int32)
                    cv2.fillConvexPoly(mask, pts, 1)
            rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
            img_crop, mask_crop, (x_min, y_min) = crop_imgs_mask([depth_img], mask)
            depth_crop = img_crop[0]
            if depth_crop is None:
                self.get_logger().warning(self.WARNING_LOG_MAP[str(1001)])
                return None, 1001, False
            intrinsics = o3d.camera.PinholeCameraIntrinsic(
                int(self.camera_size[0]), int(self.camera_size[1]),
                self.k[0], self.k[4],
                self.k[2] - x_min, self.k[5] - y_min
            )
            rmat, _, sign = self.calculate_function(
                mask_crop, depth_crop, intrinsics, **self.calculate_configs
            )
            if not sign:
                # self.get_logger().warning("Corssboard point cloud have too many noise")
                return None, 1100, False
            # NOTE: the board pose stays in the camera frame (no hand-eye transform).
            x, y, z, rw, rx, ry, rz = rmat2quat(rmat)
            self.get_logger().info(f"{x}, {y}, {z}, {rw}, {rx}, {ry}, {rz}")
            pose = Pose()
            pose.position = Point(x=x, y=y, z=z)
            pose.orientation = Quaternion(w=rw, x=rx, y=ry, z=rz)
            pose_list.append(
                {
                    "class_id": int(99),
                    "class_name": 'crossboard',
                    "pose": pose,
                    "grab_width": 0.0
                }
            )
            # Overlay the numeric pose on the debug image.
            cv2.putText(
                rgb_img, f'cs: x: {x:.3f}, y: {y:.3f}, z: {z:.3f}',
                (0, 0 + 30),cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2
            )
            cv2.putText(
                rgb_img, f'quat: rw: {rw:.3f}, rx: {rx:.3f}, ry: {ry:.3f}, rz: {rz:.3f}',
                (0, 0 + 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2
            )
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_list, True
        # No board found: return the unannotated image with no poses.
        return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), None, True
if __name__ == '__main__':
    # Stand-alone entry point: spin the detect node until Ctrl-C.
    rclpy.init(args=None)
    detect_node = DetectNode('detect')
    try:
        rclpy.spin(detect_node)
    except KeyboardInterrupt:
        pass
    finally:
        # Always release node resources and shut the ROS context down.
        detect_node.destroy_node()
        rclpy.shutdown()

View File

@@ -1,213 +0,0 @@
import os
from ament_index_python.packages import get_package_share_directory
import torch
import numpy as np
from ultralytics import YOLO
import rclpy
from rclpy.task import Future
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from interfaces.msg import PoseArrayClassAndID, ImgMsg
from interfaces.srv import VisionObjectRecognition
from ..utils import calculate_pose_pca, calculate_pose_icp, read_calibration_mat
from .config_node import ConfigBase
E2E_SIGN = True
try:
from ..net import GraspNet
from ..utils import calculate_pose_e2e
except (ImportError, ModuleNotFoundError):
E2E_SIGN = False
class InitBase(ConfigBase):
    """Initialization layer: wires calibration, model, pose backend, pub/sub.

    Reads the configuration prepared by ConfigBase and sets up everything the
    node needs; the *_callback / _seg_* hooks at the bottom are no-op stubs
    that concrete subclasses (e.g. DetectNode) override.
    """
    def __init__(self, name):
        super().__init__(name)
        # Completed by the camera-info callback once intrinsics arrive (topic mode).
        self.future = Future()
        self._read_calibration_mat()
        # Camera acquisition mode decides service-server vs topic wiring.
        if self.get_camera_mode == "Service":
            self._init_service()
        elif self.get_camera_mode == "Topic":
            # In topic mode the camera is fixed, so pick its hand-eye matrix now.
            if self.position == 'left':
                self.hand_eye_mat = self.eye_in_left_hand_mat
            elif self.position == 'right':
                self.hand_eye_mat = self.eye_in_right_hand_mat
            else:
                self.hand_eye_mat = self.eye_to_hand_mat
        else:
            # Unknown mode: fall back to service mode.
            self._init_service()
        # Detection mode selects which frame-processing function is used.
        if self.detect_mode == 'Detect':
            self.function = self._seg_image
            if not self.classes:
                self.classes = None
            self._init_model()
        elif self.detect_mode == 'Color':
            self.function = self._seg_color
        elif self.detect_mode == 'Crossboard':
            self.function = self._seg_crossboard
        else:
            # Unknown mode: default to model-based detection.
            self.function = self._seg_image
            if not self.classes:
                self.classes = None
            self._init_model()
        # Pose-calculation backend: PCA (default), ICP, or end-to-end network.
        if self.calculate_mode == "PCA":
            self.calculate_function = calculate_pose_pca
        elif self.calculate_mode == "ICP" and self.detect_mode == 'Detect':
            self.calculate_function = calculate_pose_icp
        elif self.calculate_mode == "E2E" and self.detect_mode == 'Detect':
            self.calculate_function = calculate_pose_e2e
            self.e2e_model = GraspNet(seed_feat_dim=512, is_training=False)
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            self.e2e_model.to(device)
            self.e2e_model.load_state_dict(
                torch.load(
                    os.path.join(self.SHARE_DIR, self.calculate_configs["checkpoint_path"]),
                    map_location=device
                )
            )
            self.e2e_model.eval()
            # The model instance travels to calculate_pose_e2e via kwargs.
            self.calculate_configs["model"] = self.e2e_model
        else:
            self.calculate_function = calculate_pose_pca
        self._init_publisher()
        self._init_subscriber()
        self.get_logger().info("Initialize done")
    def _init_model(self):
        """Load the YOLO segmentation checkpoint (.pt to GPU/CPU; .onnx/.engine as-is).

        Raises:
            ValueError: if the checkpoint extension is not recognized.
        """
        if self.checkpoint_path.endswith('-seg.pt'):
            device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            try:
                self.model = YOLO(self.checkpoint_path).to(device_model)
            except Exception as e:
                self.get_logger().error(f'Failed to load YOLO model: {e}')
                raise
        elif self.checkpoint_path.endswith('.onnx') or self.checkpoint_path.endswith('.engine'):
            # Exported models select their own runtime; no .to(device) here.
            self.model = YOLO(self.checkpoint_path)
        else:
            self.function = None
            self.get_logger().fatal(f'Unknown checkpoint: {self.checkpoint_path}')
            raise ValueError(f'Unknown checkpoint: {self.checkpoint_path}')
        self.get_logger().info(f'Loading checkpoint from: {self.checkpoint_path}')
        self.get_logger().info("Initialize model done")
    def _init_publisher(self):
        """Create the debug-image publisher and, in topic mode, the pose-list publisher."""
        if self.output_boxes or self.output_masks:
            self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
        if self.get_camera_mode == "Topic":
            self.pub_pose_list = self.create_publisher(PoseArrayClassAndID, '/pose/cv_detect_pose',
                                                       10)
        self.get_logger().info("Initialize publisher done")
    def _init_service(self):
        """Expose detection as a VisionObjectRecognition service server."""
        self.server = self.create_service(
            VisionObjectRecognition,
            self.service_name,
            self._service_callback
        )
        self.get_logger().info("Initialize service done")
    def _init_subscriber(self):
        """Create subscribers: image-cache topic (service mode) or synced RGB-D pair (topic mode)."""
        if self.get_camera_mode == "Service":
            self.sub_img = self.create_subscription(
                ImgMsg,
                "/img_msg",
                self._service_sub_callback,
                10
            )
        elif self.get_camera_mode == "Topic":
            self.sub_camera_info = self.create_subscription(
                CameraInfo,
                self.camera_info_topic_name,
                self._camera_info_callback,
                10
            )
            # Block until the camera-info callback resolves self.future.
            self.get_logger().info("Waiting for camera info...")
            rclpy.spin_until_future_complete(self, self.future)
            self.get_logger().info("Camera info received.")
            # sync get color and depth img
            self.sub_color_image = Subscriber(self, Image, self.color_image_topic_name)
            self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic_name)
            self.sync_subscriber = ApproximateTimeSynchronizer(
                [self.sub_color_image, self.sub_depth_image],
                queue_size=10,
                slop=0.1
            )
            self.sync_subscriber.registerCallback(self._sync_sub_callback)
        else:
            self.get_logger().warning("get_camera_mode is wrong, use service")
            self.sub_img = self.create_subscription(
                ImgMsg,
                "/img_msg",
                self._service_sub_callback,
                10
            )
        self.get_logger().info("Initialize subscriber done")
    def _read_calibration_mat(self):
        """Load the three hand-eye calibration matrices (left hand / right hand / head).

        Missing or malformed files fall back to identity inside
        read_calibration_mat; only a warning is logged here.
        """
        eye_in_left_hand_path = os.path.join(self.SHARE_DIR, self.configs["calibration"]["left_hand"])
        eye_in_right_hand_path = os.path.join(self.SHARE_DIR, self.configs["calibration"]["right_hand"])
        eye_to_hand_path = os.path.join(self.SHARE_DIR, self.configs["calibration"]["head"])
        self.eye_in_left_hand_mat, info, sign = read_calibration_mat(eye_in_left_hand_path)
        self.get_logger().info(f"left_hand_mat: {self.eye_in_left_hand_mat}")
        if not sign:
            self.get_logger().warning(f"left_mat: {info}")
        self.eye_in_right_hand_mat, info, sign = read_calibration_mat(eye_in_right_hand_path)
        self.get_logger().info(f"right_hand_mat: {self.eye_in_right_hand_mat}")
        if not sign:
            self.get_logger().warning(f"right_mat: {info}")
        self.eye_to_hand_mat, info, sign = read_calibration_mat(eye_to_hand_path)
        self.get_logger().info(f"head_mat: {self.eye_to_hand_mat}")
        if not sign:
            self.get_logger().warning(f"head_mat: {info}")
        self.get_logger().info("Read calibration mat file done")
    # --- Hooks below are no-op stubs overridden by subclasses (see DetectNode). ---
    def _camera_info_callback(self, msg: CameraInfo):
        pass
    def _service_sub_callback(self, msgs):
        pass
    def _sync_sub_callback(self, color_img_ros, depth_img_ros):
        pass
    def _service_callback(self, request, response):
        pass
    def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        pass
    def _seg_color(self, rgb_img: np.ndarray, depth_img: np.ndarray):
        pass
    def _seg_crossboard(self, rgb_img, depth_img):
        pass

View File

@@ -1,6 +0,0 @@
from .image_tools import *
from .draw_tools import *
from .calculate_tools import *
from .file_tools import *
from .pointclouds_tools import *
from .grasp_refine import *

View File

@@ -1,411 +0,0 @@
"""计算工具"""
import logging
# import cv2
import numpy as np
import open3d as o3d
import transforms3d as tfs
from .object_icp import *
from .pointclouds_tools import *
E2E_SIGN = True
try:
import torch
import MinkowskiEngine as ME
import collections.abc as container_abcs
from ..net import pred_decode, ModelFreeCollisionDetector
except (ImportError, OSError, RuntimeError):
logging.warning("ImportError or OSError or RuntimeError")
E2E_SIGN = False
# Public API of this module; calculate_pose_e2e is only exported when its
# optional heavy dependencies (torch / MinkowskiEngine) imported successfully.
__all__ = [
    "calculate_pose_pca", "calculate_pose_icp",
    "rmat2quat", "quat2rmat", "calculate_pose_e2e"
]
if not E2E_SIGN:
    __all__.remove("calculate_pose_e2e")
def pca(data: np.ndarray, sort=True):
    """Principal component analysis of an (n, d) point set.

    Computes the eigen-decomposition of the covariance matrix of the
    mean-centered data.

    Args:
        data: (n, d) array of samples.
        sort: when True, order components by decreasing eigenvalue.

    Returns:
        (eigenvalues, eigenvectors) -- eigenvectors are columns; or
        (None, None) if the decomposition fails.
    """
    centroid = np.mean(data, axis=0)
    deviations = data - centroid  # mean-center the samples
    try:
        # np.cov expects variables in rows, hence the transpose.
        eigvals, eigvecs = np.linalg.eigh(np.cov(deviations.T))
    except np.linalg.LinAlgError:
        return None, None
    if sort:
        order = eigvals.argsort()[::-1]  # descending eigenvalue order
        eigvals = eigvals[order]
        eigvecs = eigvecs[:, order]
    return eigvals, eigvecs
def calculate_pose_pca(
        mask,
        depth_img: np.ndarray,
        intrinsics,
        calculate_grab_width: bool = False,
        **kwargs
):
    """Estimate an object's grasp pose from its masked depth via PCA / OBB.

    The masked depth is back-projected to a denoised point cloud; the pose
    orientation comes from the cloud's principal axes (oriented bounding box
    when a grab width is requested, plain PCA otherwise).

    input:
        mask: np.ndarray -- instance mask (non-zero pixels belong to the object)
        depth_img: np.ndarray -- depth image, same size as mask
        intrinsics: o3d.camera.PinholeCameraIntrinsic
        calculate_grab_width: bool -- also estimate gripper opening from the OBB
        **kwargs: forwarded to create_o3d_denoised_pcd (depth_scale, voxel_size, ...)
    output:
        rmat: np.ndarray (4, 4) homogeneous pose, or an int error code on failure
        grab_width: list -- OBB extents (descending, +5%) or zeros
        sign: bool -- success flag
    """
    # Zero out depth outside the mask before back-projection.
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    point_cloud, sign = create_o3d_denoised_pcd(depth_img_mask, intrinsics, **kwargs)
    if not sign:
        # point_cloud carries the numeric error code in this case.
        return point_cloud, [], False
    # depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    #
    # point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
    #     depth=depth_o3d,
    #     intrinsic=intrinsics,
    #     depth_scale=kwargs.get("depth_scale", 1000.0),
    #     depth_trunc=kwargs.get("depth_trunc", 3.0),
    # )
    #
    # point_cloud, sign = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.002))
    # if not sign:
    #     return 1100, [], False
    #
    # if len(point_cloud.points) == 0:
    #     # logging.warning("point_cloud is empty")
    #     return 1101, [], False
    center = point_cloud.get_center()
    x, y, z = center
    if calculate_grab_width:
        # An oriented bounding box needs at least 4 non-degenerate points.
        if np.asarray(point_cloud.points).shape[0] < 4:
            return 1200, [0.0, 0.0, 0.0], False
        obb = point_cloud.get_oriented_bounding_box()
        extent = obb.extent
        # Sort OBB axes by descending extent; widths follow the same order.
        order = np.argsort(-extent)
        grab_width = extent[order]
        # z = z + grab_width * 0.20
        v = obb.R
        v = v[:, order]
        if v is None:
            return 1201, [0.0, 0.0, 0.0], False
        # 5% margin so the gripper opens slightly wider than the object.
        grab_width = grab_width * 1.05
    else:
        w, v = pca(np.asarray(point_cloud.points))
        if w is None or v is None:
            return 1201, 0.0, False
        grab_width = [0.0, 0.0, 0.0]
    # Fix axis signs to a consistent, right-handed convention.
    vx, vy, vz = v[:,0], v[:,1], v[:,2]
    if vx[0] < 0:
        vx = -vx
    if vy[1] < 0:
        vy = -vy
    if not np.allclose(np.cross(vx, vy), vz):
        vz = -vz
    R = np.column_stack((vx, vy, vz))
    rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
    # draw(point_cloud_u, rmat)
    # draw(point_cloud, rmat)
    return rmat, grab_width, True
def calculate_pose_icp(
        mask,
        depth_img: np.ndarray,
        intrinsics,
        calculate_grab_width: bool = False,
        **kwargs
):
    """Estimate object pose by registering the template cloud onto the masked depth.

    input:
        mask / depth_img / intrinsics: as in calculate_pose_pca
        calculate_grab_width: bool -- accepted for interface parity; not implemented here
        **kwargs:
            "source": o3d.geometry.PointCloud -- complete object model to register
            "ransac_voxel_size", "icp_voxel_radius", "icp_max_iter": registration params
    output:
        rmat: np.ndarray (4, 4) pose from object_icp, or an int error code
        grab_width: list -- always zeros in ICP mode
        sign: bool -- success flag
    """
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    point_cloud, sign = create_o3d_denoised_pcd(depth_img_mask, intrinsics, **kwargs)
    if not sign:
        # point_cloud carries the numeric error code in this case.
        return point_cloud, [], False
    # depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    #
    # point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
    #     depth=depth_o3d,
    #     intrinsic=intrinsics,
    #     depth_scale=kwargs.get("depth_scale", 1000.0),
    #     depth_trunc=kwargs.get("depth_trunc", 3.0)
    # )
    #
    # point_cloud, sign = point_cloud_denoising(point_cloud, kwargs.get("voxel_size", 0.002))
    # if not sign:
    #     return 1100, [], False
    #
    # if len(point_cloud.points) == 0:
    #     # logging.warning("clean_pcd is empty")
    #     return 1101, [0.0, 0.0, 0.0], False
    # Grasp-width estimation is not implemented for ICP mode.
    if calculate_grab_width:
        pass
    # Coarse RANSAC alignment followed by multi-resolution ICP refinement.
    rmat = object_icp(
        kwargs.get("source"),
        point_cloud,
        ransac_voxel_size=kwargs.get("ransac_voxel_size", 0.005),
        icp_voxel_radius=kwargs.get("icp_voxel_radius", [0.004, 0.002, 0.001]),
        icp_max_iter=kwargs.get("icp_max_iter", [50, 30, 14])
    )
    grab_width = [0.0, 0.0, 0.0]
    return rmat, grab_width, True
if E2E_SIGN:
    def minkowski_collate_fn(list_data):
        """Collate point-cloud sample dicts into one sparse MinkowskiEngine batch.

        Quantizes coordinates onto the sparse-tensor grid and keeps the
        inverse mapping ("quantize2original") so per-voxel outputs can be
        scattered back to the original points.
        """
        coordinates_batch, features_batch = ME.utils.sparse_collate([d["coors"] for d in list_data],
                                                                    [d["feats"] for d in list_data])
        coordinates_batch = np.ascontiguousarray(coordinates_batch, dtype=np.int32)
        coordinates_batch, features_batch, _, quantize2original = ME.utils.sparse_quantize(
            coordinates_batch, features_batch, return_index=True, return_inverse=True)
        res = {
            "coors": coordinates_batch,
            "feats": features_batch,
            "quantize2original": quantize2original,
            "point_clouds": torch.stack([torch.from_numpy(b) for b in [d["point_clouds"] for d in list_data]], 0)
        }
        return res
    def calculate_pose_e2e(
            mask,
            depth_img: np.ndarray,
            intrinsics,
            calculate_grab_width: bool = False,
            **kwargs
    ):
        """Predict a grasp pose with the end-to-end GraspNet model.

        input:
            mask: np.ndarray -- instance mask
            depth_img: np.ndarray -- depth image, same size as mask
            intrinsics: o3d.camera.PinholeCameraIntrinsic
            calculate_grab_width: bool (abandon -- kept for interface parity)
            **kwargs:
                "depth_scale": float
                "depth_trunc": float
                "voxel_size": float
                "model": the loaded GraspNet instance
                "collision_thresh": float -- >0 enables collision filtering
                "orign_point_clouds": full-scene cloud for collision checks
        output:
            rmat: np.ndarray (4, 4) best grasp pose, or an int error code
            grab_width: list with the predicted gripper width (or zeros)
            sign: bool -- success flag
        """
        depth_img_mask = np.zeros_like(depth_img)
        depth_img_mask[mask > 0] = depth_img[mask > 0]
        # Build the cloud from the masked depth image.
        depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
        point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
            depth=depth_o3d,
            intrinsic=intrinsics,
            depth_scale=kwargs.get("depth_scale", 1000.0),
            depth_trunc=kwargs.get("depth_trunc", 3.0),
        )
        point_cloud = point_cloud.remove_non_finite_points()
        # Denoise: voxel downsample, then radius and statistical outlier removal.
        down_pcd = point_cloud.voxel_down_sample(voxel_size=kwargs.get("voxel_size", 0.002))
        clean_pcd, _ = down_pcd.remove_radius_outlier(
            nb_points=max(int(round(10 * kwargs.get("voxel_size", 0.002) / 0.005)), 3),
            radius=kwargs.get("voxel_size", 0.002) * 10
        )
        clean_pcd, _ = clean_pcd.remove_statistical_outlier(
            nb_neighbors=max(int(round(10 * kwargs.get("voxel_size", 0.002) / 0.005)), 3),
            std_ratio=2.0
        )
        # Drop everything closer than 0.2 m along z.
        points = np.asarray(clean_pcd.points)
        clean_pcd.points = o3d.utility.Vector3dVector(points[points[:, 2] >= 0.2])
        labels = np.array(
            clean_pcd.cluster_dbscan(
                eps=kwargs.get("voxel_size", 0.002) * 10,
                min_points=max(int(round(10 * kwargs.get("voxel_size", 0.002) / 0.005)), 3)
            )
        )
        # Cluster filtering: keep clusters with mean z >= 0.2; label -1 is noise.
        points = np.asarray(clean_pcd.points)
        cluster_label = set(labels)
        point_cloud_clusters = []
        for label in cluster_label:
            if label == -1:
                continue
            idx = np.where(labels == label)[0]
            point_cloud_cluster = clean_pcd.select_by_index(idx)
            points_cluster_z = points[idx, 2]
            z_avg = np.mean(points_cluster_z)
            if z_avg < 0.2:
                continue
            point_cloud_clusters.append((point_cloud_cluster, z_avg))
        # Fix: guard against "no valid cluster" before indexing -- the previous
        # code raised IndexError when DBSCAN produced only noise / filtered
        # clusters. Reuse the existing "empty cloud" error code.
        if not point_cloud_clusters:
            return 1101, [0.0, 0.0, 0.0], False
        # Pick the nearest cluster (smallest mean z).
        point_cloud_clusters.sort(key=lambda x: x[1])
        point_cloud = point_cloud_clusters[0][0]
        # Empty selected cluster: nothing to predict on.
        if len(point_cloud.points) == 0:
            return 1101, [0.0, 0.0, 0.0], False
        # # 点云补齐15000个点
        # points = np.asarray(point_cloud.points)
        # if len(points) >= 15000:
        #     idxs = np.random.choice(len(points), 15000, replace=False)
        # else:
        #     idxs1 = np.arange(len(points))
        #     idxs2 = np.random.choice(len(points), 15000 - len(points), replace=True)
        #     idxs = np.concatenate([idxs1, idxs2], axis=0)
        # points = points[idxs]
        # Build the inference batch.
        # NOTE(review): `points` here is the full cleaned cloud, not the
        # selected cluster -- confirm this is intended.
        ret_dict = {
            'point_clouds': points.astype(np.float32),
            'coors': points.astype(np.float32) / kwargs.get("voxel_size", 0.002),
            'feats': np.ones_like(points).astype(np.float32),
        }
        batch_data = minkowski_collate_fn([ret_dict])
        # Move all tensors to the inference device.
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        for key in batch_data:
            batch_data[key] = batch_data[key].to(device)
        logging.warning(f'points num: {len(points)}')
        # Too few points for the sparse backbone: bail out early.
        if batch_data['coors'].shape[0] < 128:  # e.g. 128 / 256
            return 1300, [0.0], False
        if batch_data["point_clouds"].shape[1] < 128:  # e.g. 128 / 256
            return 1301, [0.0], False
        # Inference without gradients.
        with torch.no_grad():
            end_points = kwargs.get("model")(batch_data)
            if end_points is None:
                return 1302, [0.0, 0.0, 0.0], False
            grasp_preds = pred_decode(end_points)
        # Post-process: keep the 10 highest-score grasps.
        preds = grasp_preds[0].detach().cpu().numpy()
        sorted_index = np.argsort(-preds[:, 0])
        preds = preds[sorted_index]
        preds = preds[:10]
        # Optional collision filtering against the full original cloud.
        if kwargs.get("collision_thresh", 0.01) > 0:
            cloud = kwargs.get("orign_point_clouds")
            if cloud is not None and len(preds) > 0:
                mfcdetector = ModelFreeCollisionDetector(
                    cloud,
                    voxel_size=kwargs.get("voxel_size", 0.002)
                )
                collision_mask = mfcdetector.detect(
                    preds, approach_dist=0.05,
                    collision_thresh=kwargs.get("collision_thresh", 0.01)
                )
                preds = preds[~collision_mask]
        Rs = preds[:, 4:13].reshape(-1, 3, 3)
        centers = preds[:, 13:16].reshape(-1, 3)
        grab_width = preds[:, 1]
        if not len(Rs):
            return 1303, [0.0, 0.0, 0.0], False
        # Re-orient each predicted rotation into the right-handed convention
        # used by the downstream pose pipeline.
        rmat = []
        for r, center in zip(Rs, centers):
            vz, vx, vy = r[:, 0], r[:, 1], r[:, 2]
            if vx[0] < 0:
                vx = -vx
            if vz[2] < 0:
                vz = -vz
            if not np.allclose(np.cross(vx, vy), vz):
                vy = -vy
            R = np.column_stack((vx, vy, vz))
            rmat.append(tfs.affines.compose(np.squeeze(np.asarray(center)), R, [1, 1, 1]))
        if len(rmat) == 0:
            return 1304, [0.0, 0.0, 0.0], False
        # Best-scoring grasp wins.
        return rmat[0], [grab_width[0]], True
def rmat2quat(rmat):
    """Flatten a 4x4 homogeneous transform into [x, y, z, qw, qx, qy, qz]."""
    tx, ty, tz = rmat[0:3, 3:4].flatten()
    qw, qx, qy, qz = tfs.quaternions.mat2quat(rmat[0:3, 0:3])
    return [tx, ty, tz, qw, qx, qy, qz]
def quat2rmat(quat):
    """Build a 4x4 homogeneous transform from [x, y, z, qw, qx, qy, qz]."""
    tx, ty, tz, qw, qx, qy, qz = quat
    rot = tfs.quaternions.quat2mat([qw, qx, qy, qz])
    return tfs.affines.compose(np.squeeze(np.asarray((tx, ty, tz))), rot, [1, 1, 1])
# def draw(pcd, mat):
# R = mat[0:3, 0:3]
# point = mat[0:3, 3:4].flatten()
# x, y, z = R[:, 0], R[:, 1], R[:, 2]
#
# points = [
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
# point, point + x, point + y, point + z
#
# ] # 画点:原点、第一主成分、第二主成分
# lines = [
# [0, 1], [0, 2], [0, 3],
# [4, 5], [4, 6], [4, 7]
# ] # 画出三点之间两两连线
# colors = [
# [1, 0, 0], [0, 1, 0], [0, 0, 1],
# [1, 0, 0], [0, 1, 0], [0, 0, 1]
# ]
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(points), lines=o3d.utility.Vector2iVector(lines))
# line_set.colors = o3d.utility.Vector3dVector(colors)
#
# o3d.visualization.draw_geometries([pcd, line_set])

View File

@@ -1,91 +0,0 @@
import os
import logging
from typing import Union
import cv2
import numpy as np
import open3d as o3d
from .image_tools import save_img
__all__ = [
"draw_box", "draw_mask", "draw_pointcloud",
]
def draw_box(
        rgb_img: np.ndarray,
        segmentation_result,
        put_text: bool = True,
        save_dir: Union[bool, str] = False,
        mark_time: bool = False
):
    """Draw detection bounding boxes (and optional labels) onto rgb_img in place.

    Boxes are sorted left-to-right (x ascending, y descending as tiebreaker)
    so the drawn index matches the order used by the detection pipeline.

    Args:
        rgb_img: BGR image modified in place.
        segmentation_result: a single ultralytics result (has .boxes and .names).
        put_text: also draw "<label>: <confidence>" and the box index.
        save_dir: directory to save the annotated image into (False = don't save).
        mark_time: forwarded to save_img to timestamp the file name.
    """
    boxes = segmentation_result.boxes.xywh.cpu().numpy()
    confidences = segmentation_result.boxes.conf.cpu().numpy()
    class_ids = segmentation_result.boxes.cls.cpu().numpy()
    labels = segmentation_result.names
    # Same ordering as the detect pipeline so indices line up.
    x_centers, y_centers = boxes[:, 0], boxes[:, 1]
    sorted_index = np.lexsort((-y_centers, x_centers))
    boxes = boxes[sorted_index]
    class_ids = class_ids[sorted_index]
    confidences = confidences[sorted_index]
    for i, box in enumerate(boxes):
        # Convert center+size to the two corner points cv2 expects.
        x_center, y_center, width, height = box[:4]
        p1 = [int((x_center - width / 2)), int((y_center - height / 2))]
        p2 = [int((x_center + width / 2)), int((y_center + height / 2))]
        cv2.rectangle(rgb_img, p1, p2, (255, 255, 0), 2)
        if put_text:
            cv2.putText(rgb_img, f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}',
                        (p1[0], p1[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 1,
                        (255, 255, 0), 2)
            cv2.putText(rgb_img, f"{i}", (p1[0] + 15, p1[1] + 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
    if save_dir:
        save_img(rgb_img, "detect_color_img.png", save_dir, mark_time)
def draw_mask(rgb_img: np.ndarray, segmentation_result):
    """Blend every instance mask onto rgb_img (in place) in semi-transparent red (BGR)."""
    orig_shape = segmentation_result.masks.orig_shape
    for inst_mask in segmentation_result.masks.data.cpu().numpy():
        # Masks come back at model resolution; rescale to the original image.
        resized = cv2.resize(inst_mask.astype(np.uint8), orig_shape[::-1],
                             interpolation=cv2.INTER_NEAREST)
        rgb_img[resized > 0] = rgb_img[resized > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
def draw_pointcloud(pcd, axis: bool = True):
    """Visualize a list of open3d geometries, optionally with RGB coordinate axes.

    Args:
        pcd: list of geometries; mutated in place when axis is True
             (the axis LineSet is appended to it).
        axis: draw unit x/y/z axes at the origin colored red/green/blue.
    """
    if not pcd:
        # Empty input still opens the viewer; only a warning is logged.
        logging.warning("pcd is empty")
    if axis:
        axis_point = [
            [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
        ]  # origin plus the three unit-axis endpoints
        axis = [
            [0, 1], [0, 2], [0, 3]
        ]  # one line from the origin to each axis endpoint
        axis_colors = [
            [1, 0, 0], [0, 1, 0], [0, 0, 1]
        ]
        # Build the open3d LineSet that renders the axes.
        axis_set = o3d.geometry.LineSet(
            points=o3d.utility.Vector3dVector(axis_point),
            lines=o3d.utility.Vector2iVector(axis)
        )
        axis_set.colors = o3d.utility.Vector3dVector(axis_colors)
        pcd.append(axis_set)
    o3d.visualization.draw_geometries(pcd)

View File

@@ -1,26 +0,0 @@
"""Utility functions for file operations."""
import os
import json
import numpy as np
__all__ = [
"read_calibration_mat",
]
def read_calibration_mat(mat_path):
    """Load a 4x4 calibration matrix stored under key "T" in a JSON file.

    Falls back to the 4x4 identity when the file is missing or the stored
    matrix does not have shape (4, 4).

    Args:
        mat_path: path to the JSON calibration file.

    Returns:
        (mat, info, sign): the matrix, a status message, and a success flag.
    """
    if not os.path.exists(mat_path):
        return np.eye(4), f"{mat_path} not found, use E(4, 4)", False
    with open(mat_path, "r", encoding="utf-8") as f:
        mat = np.array(json.load(f)["T"])
    if mat.shape == (4, 4):
        return mat, "Success", True
    return np.eye(4), "The shape is wrong, use E(4, 4)", False

View File

@@ -1,287 +0,0 @@
import numpy as np
import transforms3d as tfs
# import open3d as o3d
__all__ = ["refine_grasp_pose"]
def collision_detector(
        points: np.ndarray,
        refine: np.ndarray,
        volume: list[float],
        iou: float = 0.001,
        search_mode: bool = False,
        **kwargs
) -> int:
    """Check a candidate grasp position against the scene cloud for collisions.

    input:
        points: np.ndarray (3, n) -- scene points in the grasp coordinate system
        refine: np.ndarray (3, ) -- candidate grasp position
        volume: list [left, right] -- expected voxel counts of each finger volume
        iou: float -- occupancy threshold for finger collision (halved in search mode)
        search_mode: bool -- skip the palm-body checks, only test the fingers
        **kwargs:
            "grab_width": float
            "hand_size": list [width, height, length]
            "left_size": list [thick, width, length]
            "right_size": list [thick, width, length]
            thick of gripper finger, width of gripper finger, length of gripper finger
    output:
        collision_code: int -- 0 none; 1 palm (center, or top+bottom);
        2 palm bottom; 3 palm top; 4 left finger; 5 right finger; 6 both fingers
    """
    hand_size = kwargs.get('hand_size', [0.113, 0.063, 0.13])
    left_size = kwargs.get('left_size', [0.006, 0.037, 0.086])
    right_size = kwargs.get('right_size', [0.006, 0.037, 0.086])
    # 5% tolerance on the requested opening.
    grab_width = kwargs.get('grab_width', 0.10) * 0.95
    x, y, z = refine
    px, py, pz = points[0], points[1], points[2]
    if not search_mode:
        # Palm body occupies z in (z - length, z); shared y slab, x split into
        # quarter-width top / half-width center / quarter-width bottom slabs.
        in_z = (pz < z) & (pz > (z - hand_size[2]))
        in_y = (py < (y + hand_size[0] / 2)) & (py > (y - hand_size[0] / 2))
        top_slab = in_z & in_y & (px < (x - hand_size[1] * 1 / 4)) & (px > (x - hand_size[1] / 2))
        center_slab = in_z & in_y & (px < (x + hand_size[1] * 1 / 4)) & (px > (x - hand_size[1] * 1 / 4))
        bottom_slab = in_z & in_y & (px < (x + hand_size[1] / 2)) & (px > (x + hand_size[1] * 1 / 4))
        if center_slab.any() or (top_slab.any() and bottom_slab.any()):
            return 1
        if bottom_slab.any():
            return 2
        if top_slab.any():
            return 3
    else:
        # Search mode is more permissive on the finger occupancy threshold.
        iou *= 0.5
    # Finger volumes flank the opening at y = +/- grab_width / 2.
    left_hits = (
        (pz < (z + left_size[2])) & (pz > z - 0.05)
        & (px < (x + left_size[1] / 2)) & (px > (x - left_size[1] / 2))
        & (py < (y + grab_width / 2 + left_size[0])) & (py > (y + grab_width / 2))
    )
    right_hits = (
        (pz < (z + right_size[2])) & (pz > z - 0.05)
        & (px < (x + right_size[1] / 2)) & (px > (x - right_size[1] / 2))
        & (py < (y - grab_width / 2)) & (py > (y - (grab_width / 2 + right_size[0])))
    )
    # Occupied fraction of each finger volume vs. its expected voxel count.
    left_occupied = left_hits.sum() / (volume[0] + 1e-6) > iou
    right_occupied = right_hits.sum() / (volume[1] + 1e-6) > iou
    if left_occupied and right_occupied:
        return 6
    if left_occupied:
        return 4
    if right_occupied:
        return 5
    return 0
def refine_grasp_pose(
    points: np.ndarray,
    voxel_size: float,
    target_position: np.ndarray,
    expect_orientation: np.ndarray | None = None,
    search_mode: bool = False,
    **kwargs
):
    """
    Iteratively nudge a grasp pose away from collisions with the scene cloud.

    The cloud is moved into the gripper frame (target at origin, axes given by
    expect_orientation), cropped to a region around the gripper, and then the
    candidate position `refine` is adjusted step by step according to the
    collision code returned by `collision_detector` until a collision-free
    pose is found or the step budgets are exhausted.
    -----
    input:
        points: np.ndarray (n, 3), already voxel-downsampled scene cloud.
        voxel_size: float, voxel edge used for the downsample (meters);
            used to convert finger volumes into expected point counts.
        target_position: np.ndarray (3,), initial grasp target position.
        expect_orientation: np.ndarray (3, 3), desired gripper orientation;
            defaults to a fixed 90-degree-about-z frame when None.
        search_mode: bool, when True also search along y for the widest
            collision-free corridor instead of returning the first free pose.
        **kwargs:
            "grab_width": float, gripper opening (meters).
            "hand_size": list [width, height, length] of the palm body.
            "left_size": list [thick, width, length] of the left finger.
            "right_size": list [thick, width, length] of the right finger.
    output:
        (rmat, True): rmat is a 4x4 homogeneous grasp pose on success, or
        (1202, False): numeric error code when no collision-free pose exists.
    """
    if expect_orientation is None:
        expect_orientation = np.asarray([[0, -1, 0], [1, 0, 0], [0, 0, 1]])
    refine = np.zeros(3)
    grab_width = kwargs.get('grab_width', 0.10)
    hand_size = kwargs.get('hand_size', [0.113, 0.063, 0.13])
    left_size = kwargs.get('left_size', [0.006, 0.037, 0.086])
    right_size = kwargs.get('right_size', [0.006, 0.037, 0.086])
    # Expected number of downsampled points filling one finger volume; used by
    # collision_detector to turn point counts into an occupancy ratio (iou).
    left_volume = left_size[0] * left_size[1] * left_size[2] / (voxel_size**3)
    right_volume = right_size[0] * right_size[1] * right_size[2] / (voxel_size**3)
    points = points - target_position[None, :]  # (n, 3) - (1, 3) = (n, 3)
    points = expect_orientation.T @ points.T  # (3, 3) @ (3, n) = (3, n): world -> gripper frame
    # Keep only points near the gripper envelope (with ~5 cm margins) so the
    # repeated collision checks below stay cheap. `points` is (3, n) here.
    points = points[:,
        (points[2] < left_size[2]) & (points[2] > -(0.1 + hand_size[2]))
        & (points[0] < (0.05 + hand_size[1]/2)) & (points[0] > -(0.05 + hand_size[1]/2))
        & (points[1] < (0.05 + grab_width/2 + left_size[0]))
        & (points[1] > -(0.05 + grab_width/2 + right_size[0]))
    ]
    # NOTE(review): frist_sign is only assigned True inside `if frist_sign:`
    # branches below, so it can never become True — those branches are dead
    # code. The intent was presumably `if not frist_sign:`; confirm.
    frist_sign = False
    # y_p / y_n remember whether the last lateral collision was on the left /
    # right finger, so a newly-free pose can be centered between both walls.
    y_n = y_p = False
    left_last = right_last = False
    # Step budgets: 20 palm adjustments, 10 per finger direction.
    hand_num = left_num = right_num = 0
    while hand_num < 20 and left_num < 10 and right_num < 10:
        collision_code = collision_detector(
            points, refine, volume=[left_volume, right_volume], **kwargs
        )
        if collision_code == 0:
            # Collision-free. If we only backed off from one side, step once
            # more toward the middle before accepting the pose.
            if y_p and not y_n:
                refine[1] += 0.004
                y_p = y_n = False
                left_last = True
                right_last = False
                continue
            if not y_p and y_n:
                refine[1] -= 0.004
                y_p = y_n = False
                left_last = False
                right_last = True
                continue
            if search_mode:
                break
            # Map the refined offset back to the world frame and compose the pose.
            position = target_position + (expect_orientation @ refine.T).T
            rmat = tfs.affines.compose(
                np.squeeze(np.asarray(position)), expect_orientation, [1, 1, 1]
            )
            return rmat, True
        if collision_code == 6:
            # Both fingers collide simultaneously: no lateral escape possible.
            return 1202, False
        if collision_code == 1:
            # Palm blocked across its whole width: retreat along the approach axis.
            # NOTE(review): message says "z + 0.008" but the code subtracts — confirm sign convention.
            refine[2] -= 0.008
            hand_num += 1
            print("z + 0.008")
            continue
        if collision_code == 2:
            # Collision on the bottom edge of the palm only: slide in -x.
            if frist_sign:
                print("z + 0.004")
                refine[2] -= 0.004
                frist_sign = True
                continue
            refine[0] -= 0.002
            hand_num += 1
            print("x - 0.002")
            continue
        if collision_code == 3:
            # Collision on the top edge of the palm only: slide in +x.
            if frist_sign:
                print("z + 0.004")
                refine[2] -= 0.004
                frist_sign = True
                continue
            refine[0] += 0.002
            hand_num += 1
            print("x + 0.002")
            continue
        if collision_code == 4:
            # Left finger collides: shift the grasp toward +y.
            y_p = True
            y_n = False
            refine[1] += 0.004
            left_num += 1
            print("y + 0.004")
            continue
        if collision_code == 5:
            # Right finger collides: shift the grasp toward -y.
            y_p = False
            y_n = True
            refine[1] -= 0.004
            right_num += 1
            print("y - 0.004")
            continue
        else:
            # Unknown collision code: give up.
            return 1202, False
    if search_mode:
        # Reached via `break` above (or budget exhaustion): expand along y to
        # find the free corridor and center the grasp inside it.
        right_num = left_num = 0
        # already in left side
        if left_last and not right_last:
            y_min = refine[1] - 0.004
            while left_num < 10:
                left_num += 1
                refine[1] += 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    # Hit the far wall: settle midway between both walls.
                    refine[1] = (refine[1] - 0.004 + y_min) / 2
                    break
            else:
                # No wall found within budget: offset a fixed 2 cm from y_min.
                refine[1] = y_min + 0.02
            print(f"left_num = {left_num}")
        elif not left_last and right_last:
            # already in right side
            y_max = refine[1] + 0.004
            while right_num < 10:
                right_num += 1
                refine[1] -= 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    refine[1] = (refine[1] + 0.004 + y_max) / 2
                    break
            else:
                refine[1] = y_max - 0.02
            print(f"right_num = {right_num}")
        elif not left_last and not right_last:
            # in middle: probe upward for the +y wall, then downward for the
            # -y wall, and center between them.
            y_cur = refine[1]
            while left_num < 10:
                left_num += 1
                refine[1] += 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    y_max = refine[1] - 0.004
                    break
            else:
                y_max = y_cur + 0.040
            print(f"left_num = {left_num}")
            refine[1] = y_cur
            while right_num < 10:
                right_num += 1
                refine[1] -= 0.004
                collision_code = collision_detector(
                    points, refine, volume=[left_volume, right_volume], search_mode=True, **kwargs
                )
                if collision_code:
                    refine[1] = (refine[1] + 0.004 + y_max) / 2
                    break
            else:
                refine[1] = (y_cur - 0.040 + y_max) / 2
            print(f"right_num = {right_num}")
        position = target_position + (expect_orientation @ refine.T).T
        rmat = tfs.affines.compose(
            np.squeeze(np.asarray(position)),
            expect_orientation,
            [1, 1, 1]
        )
        return rmat, True
    return 1202, False

View File

@@ -1,184 +0,0 @@
import os
import time
import cv2
import logging
import numpy as np
__all__ = [
"crop_imgs_box_xywh", "crop_imgs_box_xyxy", "crop_imgs_mask", "get_map",
"distortion_correction", "save_img"
]
def crop_imgs_box_xywh(imgs: list, box, same_sign: bool = False):
    """
    Crop every image in `imgs` to the same center-format bounding box.

    input:
        imgs: list, all images must share one Width and Height.
        box: YOLO-style box [x_center, y_center, w, h, confidence, class_id].
        same_sign: bool, pass True to skip the shape-consistency check when
            the images are known to match.
    output:
        crop_imgs: list of cropped views (box clamped to image bounds);
        (x_min, y_min): top-left corner of the crop in the source image.
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)
    if not same_sign and len(imgs) != 1:
        reference = imgs[0].shape
        for img in imgs:
            if reference != img.shape:
                raise ValueError(f"Img shape are different: {reference} - {img.shape}")
    high, width = imgs[0].shape[:2]
    x_center, y_center, w, h = box[:4]
    # Convert center+size to inclusive pixel bounds, clamped to the frame.
    x_min = max(0, int(round(x_center - w/2)))
    x_max = min(int(round(x_center + w/2)), width - 1)
    y_min = max(0, int(round(y_center - h/2)))
    y_max = min(int(round(y_center + h/2)), high - 1)
    crop_imgs = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    return crop_imgs, (x_min, y_min)
def crop_imgs_box_xyxy(imgs: list, box, same_sign: bool = False):
    """
    Crop every image in `imgs` to the same corner-format bounding box.

    input:
        imgs: list, all images must share one Width and Height.
        box: YOLO-style box [x1, y1, x2, y2, confidence, class_id].
        same_sign: bool, pass True to skip the shape-consistency check when
            the images are known to match.
    output:
        crop_imgs: list of cropped views (box clamped to image bounds);
        (x_min, y_min): top-left corner of the crop in the source image.
    """
    if not imgs:
        logging.warning("imgs is empty")
        return [], (0, 0)
    if not same_sign and len(imgs) != 1:
        reference = imgs[0].shape
        for img in imgs:
            if reference != img.shape:
                raise ValueError(f"Img shape are different: {reference} - {img.shape}")
    high, width = imgs[0].shape[:2]
    x1, y1, x2, y2 = box[:4]
    # Round the corners and clamp to valid pixel indices (inclusive bounds).
    x_min = max(0, int(round(x1)))
    x_max = min(int(round(x2)), width - 1)
    y_min = max(0, int(round(y1)))
    y_max = min(int(round(y2)), high - 1)
    crop_imgs = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    return crop_imgs, (x_min, y_min)
def crop_imgs_mask(imgs: list, mask: np.ndarray, same_sign: bool = False):
    """
    Crop every image in `imgs` (and the mask itself) to the bounding box of
    the non-zero mask region.

    input:
        imgs: list, Each img in imgs has the same Width and High.
        mask: np.ndarray, non-zero pixels mark the region of interest.
        same_sign: bool, Set True to skip size check if all img in imgs have
            the same Width and High.
    output:
        crop_imgs: list | None, cropped images ([] when imgs is empty,
            None when the mask has no valid pixels)
        mask_crop: np.ndarray | None, cropped mask
        (x_min, y_min): top-left corner of the crop ((None, None) when the
            mask has no valid pixels)
    """
    if not imgs:
        logging.warning("imgs is empty")
        # Bug fix: keep the three-element return shape used by every other
        # path — the old code returned a 2-tuple here, which broke callers
        # doing `crops, mask_crop, origin = crop_imgs_mask(...)`.
        return [], None, (0, 0)
    if not same_sign and len(imgs) != 1:
        for img in imgs:
            if imgs[0].shape != img.shape:
                raise ValueError(f"Img shape are different: {imgs[0].shape} - {img.shape}")
    high, width = imgs[0].shape[:2]
    ys, xs = np.where(mask > 0)
    if xs.size == 0 or ys.size == 0:
        # No valid pixels in the mask.
        return None, None, (None, None)
    # Inclusive bounding box of the mask, clamped to the image frame.
    x_min, x_max = max(0, int(round(xs.min()))), min(int(round(xs.max())), width - 1)
    y_min, y_max = max(0, int(round(ys.min()))), min(int(round(ys.max())), high - 1)
    crop_imgs = [img[y_min:y_max + 1, x_min:x_max + 1] for img in imgs]
    mask_crop = mask[y_min:y_max + 1, x_min:x_max + 1]
    return crop_imgs, mask_crop, (x_min, y_min)
def get_map(K: list, D: list, camera_size: list):
    """
    Build undistortion remap tables for a pinhole camera.

    input:
        K: list, shape (9), row-major flattened 3x3 intrinsic matrix
        D: list, distortion coefficients
        camera_size: list, [w, h] in pixels
    output:
        map1: np.ndarray, x remap table
        map2: np.ndarray, y remap table
        new_K: np.ndarray, shape (9), flattened optimal intrinsic matrix
    """
    w, h = camera_size
    intrinsic = np.array(K).reshape(3, 3)
    dist = np.array(D)
    # alpha=1 keeps all source pixels (black borders may appear).
    new_K, _ = cv2.getOptimalNewCameraMatrix(intrinsic, dist, (w, h), 1, (w, h))
    map1, map2 = cv2.initUndistortRectifyMap(intrinsic, dist, None, new_K, (w, h), cv2.CV_32FC1)
    return map1, map2, new_K.flatten()
def distortion_correction(
    color_image: np.ndarray,
    depth_image: np.ndarray,
    map1: np.ndarray,
    map2: np.ndarray
):
    """
    Undistort a color/depth image pair with precomputed remap tables
    (see get_map).

    input:
        color_image: np.ndarray
        depth_image: np.ndarray
        map1: np.ndarray, x remap table
        map2: np.ndarray, y remap table
    output:
        undistorted_color: np.ndarray (same dtype as color_image)
        undistorted_depth: np.ndarray (same dtype as depth_image)
    """
    # Bilinear interpolation is fine for color; depth uses nearest-neighbor
    # so distinct depth values are never blended into invalid ones.
    rect_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR).astype(color_image.dtype)
    rect_depth = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST).astype(depth_image.dtype)
    return rect_color, rect_depth
def save_img(img: np.ndarray, save_name, save_dir: str | None = None, mark_cur_time: bool = False):
    """
    Save an image to disk, optionally tagging the filename with a timestamp.

    input:
        img: np.ndarray
            - uint8, BGR (H, W, 3)
            - uint16 single-channel
        save_name: str, file name (with extension)
        save_dir: str | None, target directory (created if missing);
            falls back to the user's home directory when None.
        mark_cur_time: bool, append `_<epoch-millis>` before the extension.
    """
    if isinstance(save_dir, str):
        os.makedirs(save_dir, exist_ok=True)
        target_dir = save_dir
    else:
        target_dir = os.path.expanduser("~")
    save_path = os.path.join(target_dir, save_name)
    if mark_cur_time:
        stem, ext = os.path.splitext(save_path)
        save_path = f"{stem}_{int(time.time() * 1000)}{ext}"
    cv2.imwrite(save_path, img)

View File

@@ -1,140 +0,0 @@
import numpy as np
import open3d as o3d
from typing import Union, List, Tuple
__all__ = [
"object_icp",
]
def _draw_pointcloud(pcd, T):
    """Visualize geometries plus two RGB axis triads: the world frame at the
    origin and the frame encoded by the 4x4 transform T."""
    rot = T[0:3, 0:3]
    anchor = T[0:3, 3:4].flatten()
    ax, ay, az = rot[:, 0], rot[:, 1], rot[:, 2]
    # Vertices: world origin + unit axes, then pose origin + its axes.
    vertices = [
        [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
        anchor, anchor + ax, anchor + ay, anchor + az
    ]
    # One segment per axis for each triad.
    segments = [
        [0, 1], [0, 2], [0, 3],
        [4, 5], [4, 6], [4, 7]
    ]
    # RGB for x/y/z on both triads.
    segment_colors = [
        [1, 0, 0], [0, 1, 0], [0, 0, 1],
        [1, 0, 0], [0, 1, 0], [0, 0, 1]
    ]
    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(vertices),
        lines=o3d.utility.Vector2iVector(segments)
    )
    line_set.colors = o3d.utility.Vector3dVector(segment_colors)
    pcd.append(line_set)
    o3d.visualization.draw_geometries(pcd)
def _preprocess_point_cloud(pcd, voxel_size):
    """Downsample a cloud, then attach normals and FPFH features for
    feature-based registration."""
    downsampled = pcd.voxel_down_sample(voxel_size)
    # Normals from a neighborhood twice the voxel size.
    downsampled.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30)
    )
    # FPFH descriptors from a wider (5x) neighborhood.
    features = o3d.pipelines.registration.compute_fpfh_feature(
        downsampled,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )
    return downsampled, features
def _prepare_dataset(source, target, voxel_size):
    """Apply an identity initial transform to `source` (placeholder for a
    real initial guess) and preprocess both clouds for registration."""
    source.transform(np.identity(4))  # no-op; kept as the initial-guess hook
    source_down, source_fpfh = _preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = _preprocess_point_cloud(target, voxel_size)
    return source_down, target_down, source_fpfh, target_fpfh
def _execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
    """Coarse global alignment via RANSAC over FPFH feature correspondences;
    returns the estimated 4x4 transformation."""
    distance_threshold = voxel_size * 1.5
    checkers = [
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
    ]
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down,
        target_down,
        source_fpfh,
        target_fpfh,
        True,  # mutual_filter
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3,  # ransac_n: points per hypothesis
        checkers,
        o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
    return result.transformation
def object_icp(
    target: o3d.geometry.PointCloud,
    source: Union[o3d.geometry.PointCloud, str],
    ransac_voxel_size: float = 0.005,
    icp_voxel_radius: Union[List[float], Tuple[float, ...], None] = (0.004, 0.002, 0.001),
    icp_max_iter: Union[List[int], Tuple[int, ...], None] = (50, 30, 14),
):
    """
    Register `source` onto `target`: coarse RANSAC feature alignment followed
    by multi-scale point-to-point ICP refinement.

    input:
        target: scene point cloud.
        source: model point cloud, or a file path readable by
            o3d.io.read_point_cloud.
        ransac_voxel_size: voxel size used for both the downsampling/feature
            extraction and the RANSAC distance threshold.
        icp_voxel_radius: per-scale downsample radii for ICP refinement
            (coarse to fine).
        icp_max_iter: per-scale ICP iteration caps; must be at least as long
            as icp_voxel_radius.
    output:
        T: (4, 4) homogeneous transform mapping source into the target frame.
    raises:
        TypeError: if source is neither a PointCloud nor a path string.
    """
    if isinstance(source, str):
        source = o3d.io.read_point_cloud(source)
    elif isinstance(source, o3d.geometry.PointCloud):
        pass
    else:
        raise TypeError(f"Unsupported Type {type(source)}")
    # Bug fix: the dataset preparation previously used a hard-coded
    # voxel_size = 0.005, silently ignoring the ransac_voxel_size parameter.
    source_down, target_down, source_fpfh, target_fpfh = _prepare_dataset(
        source, target, ransac_voxel_size
    )
    T = _execute_global_registration(
        source_down, target_down, source_fpfh, target_fpfh, ransac_voxel_size
    )
    # Coarse-to-fine ICP: each scale refines T with a smaller voxel radius.
    # zip() also generalizes the old fixed range(3) to any schedule length.
    for radius, max_iteration in zip(icp_voxel_radius, icp_max_iter):
        source_down = source.voxel_down_sample(radius)
        target_down = target.voxel_down_sample(radius)
        source_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        target_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        result_icp = o3d.pipelines.registration.registration_icp(
            source_down,
            target_down,
            radius * 5,  # max correspondence distance for this scale
            T,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            o3d.pipelines.registration.ICPConvergenceCriteria(
                relative_fitness=1e-6,
                relative_rmse=1e-6,
                max_iteration=max_iteration
            )
        )
        T = result_icp.transformation
    # NOTE(review): opens a blocking visualization window and mutates `source`
    # in place via transform(); kept for behavioral parity — consider gating
    # behind a debug flag for library use.
    _draw_pointcloud([source.transform(T), target], T)
    return T
if __name__ == "__main__":
    # Ad-hoc manual test: register a bottle model cloud against a captured
    # scene cloud (paths are relative to the working directory).
    target = o3d.io.read_point_cloud("pointcloud/pointcloud_0.pcd")
    source = o3d.io.read_point_cloud("pointcloud/bottle_model.pcd")
    Rmat = object_icp(target, source)

View File

@@ -1,152 +0,0 @@
# import logging
#
# import cv2
import numpy as np
import open3d as o3d
__all__ = ["create_o3d_pcd", "create_o3d_denoised_pcd"]
def point_cloud_denoising(point_cloud: o3d.geometry.PointCloud, voxel_size: float = 0.005):
    """
    Denoise a point cloud and keep the DBSCAN cluster closest to the camera.

    input:
        point_cloud: o3d.geometry.PointCloud
        voxel_size: float, voxel edge for downsampling (meters); also scales
            the outlier-filter neighborhood sizes.
    output:
        (pcd, True): the cleaned cloud on success;
        (1100, False): when the nearest cluster is too small relative to the
            downsampled cloud (scene judged too noisy).
    """
    point_cloud = point_cloud.remove_non_finite_points()
    down_pcd = point_cloud.voxel_down_sample(voxel_size=voxel_size)
    point_num = len(down_pcd.points)
    # Radius filter: neighbor count scales with voxel size, at least 3.
    clean_pcd, _ = down_pcd.remove_radius_outlier(
        nb_points=max(int(round(10 * voxel_size / 0.005)), 3),
        radius=voxel_size * 10
    )
    # Statistical filter on the survivors.
    clean_pcd, _ = clean_pcd.remove_statistical_outlier(
        nb_neighbors=max(int(round(10 * voxel_size / 0.005)), 3),
        std_ratio=2.0
    )
    # Drop points closer than 0.2 m along z (too close to the camera).
    points = np.asarray(clean_pcd.points)
    clean_pcd.points = o3d.utility.Vector3dVector(points[points[:, 2] >= 0.2])
    labels = np.array(
        clean_pcd.cluster_dbscan(
            eps=voxel_size * 10,
            min_points=max(int(round(10 * voxel_size / 0.005)), 3)
        )
    )
    if len(labels[labels >= 0]) == 0:
        # No cluster found at all: return the filtered cloud as-is.
        return clean_pcd, True
    # Keep the cluster whose mean depth is smallest (the closest object).
    points = np.asarray(clean_pcd.points)
    cluster_label = set(labels)
    point_cloud_clusters = []
    for label in cluster_label:
        if label == -1:
            continue
        idx = np.where(labels == label)[0]
        point_cloud_cluster = clean_pcd.select_by_index(idx)
        points_cluster_z = points[idx, 2]
        z_avg = np.mean(points_cluster_z)
        if z_avg < 0.2:
            continue
        point_cloud_clusters.append((point_cloud_cluster, z_avg))
    if len(point_cloud_clusters) == 0:
        # Bug fix: return a (pcd, flag) pair like every other path — the old
        # code returned the bare cloud here, which broke callers that unpack
        # two values (e.g. create_o3d_denoised_pcd).
        return clean_pcd, True
    point_cloud_clusters.sort(key=lambda x: x[1])
    clean_pcd = point_cloud_clusters[0][0]
    # Use the nearest cluster's share of the downsampled cloud to judge noise.
    largest_cluster_ratio = len(clean_pcd.points) / point_num
    if largest_cluster_ratio < 0.05:
        return 1100, False
    return clean_pcd, True
def create_o3d_pcd(depth_img, camera_size, k, **kwargs):
    """
    create o3d pcd
    --------
    Back-project a depth image into a voxel-downsampled point array.
    input:
        depth_img: np.ndarray, depth values castable to uint16
        camera_size: list, [width, height] in pixels
        k: np.ndarray | list, flattened 3x3 intrinsics
            (fx=k[0], fy=k[4], cx=k[2], cy=k[5])
        **kwargs:
            "depth_scale": float, depth units per meter (default 1000.0)
            "depth_trunc": float, truncate depth beyond this distance in
                meters (default 3.0)
            "voxel_size": float, downsample voxel edge in meters (default 0.01)
    output:
        orign_points: np.ndarray (n, 3) — NOTE: returns the raw point array,
        not an o3d.geometry.PointCloud (the previous docstring was wrong).
    """
    intrinsics = o3d.camera.PinholeCameraIntrinsic(
        int(camera_size[0]),
        int(camera_size[1]),
        k[0], k[4], k[2], k[5]
    )
    depth_o3d = o3d.geometry.Image(depth_img.astype(np.uint16))
    orign_point_clouds = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=kwargs.get("depth_scale", 1000.0),
        depth_trunc=kwargs.get("depth_trunc", 3.0),
    )
    # Sparsify before returning; voxel size trades density for downstream speed.
    orign_point_clouds = orign_point_clouds.voxel_down_sample(
        kwargs.get("voxel_size", 0.01)
    )
    # # Radius outlier filter (disabled)
    # orign_point_clouds, _ = orign_point_clouds.remove_radius_outlier(
    #     nb_points=int(round(10 * kwargs.get("voxel_size", 0.01) / 0.005)),
    #     radius=kwargs.get("voxel_size", 0.01) * 10
    # )
    #
    # # Statistical outlier filter (disabled)
    # orign_point_clouds, _ = orign_point_clouds.remove_statistical_outlier(
    #     nb_neighbors=int(round(20 * kwargs.get("voxel_size", 0.01) / 0.005)),
    #     std_ratio=2.0
    # )
    orign_points = np.asarray(orign_point_clouds.points)
    return orign_points
def create_o3d_denoised_pcd(depth_img_mask, intrinsics, **kwargs):
    """
    create o3d denoised pcd
    --------
    Back-project a masked depth image and run the denoising pipeline on the
    resulting cloud.
    input:
        depth_img_mask: np.ndarray, depth image with non-object pixels zeroed
        intrinsics: o3d.camera.PinholeCameraIntrinsic
        **kwargs:
            "depth_scale": float (default 1000.0)
            "depth_trunc": float (default 3.0)
            "voxel_size": float (default 0.002)
    output:
        (point_cloud, True) on success;
        (1100, False) when denoising judges the cloud too noisy;
        (1101, False) when the denoised cloud ends up empty.
    """
    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    raw_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=kwargs.get("depth_scale", 1000.0),
        depth_trunc=kwargs.get("depth_trunc", 3.0),
    )
    denoised, ok = point_cloud_denoising(raw_cloud, kwargs.get("voxel_size", 0.002))
    if not ok:
        return 1100, False
    if len(denoised.points) == 0:
        return 1101, False
    return denoised, True

View File

@@ -13,7 +13,7 @@ class VisionObjectRecognitionClient(Node):
)
# 示例目标参数
self.camera_position = "left"
self.camera_position = ["left"]
self.classes = ["medical_box"]
# 创建 1 秒定时器

View File

@@ -0,0 +1,76 @@
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from interfaces.action import VisionObjectRecognition
class VisionObjectRecognitionClient(Node):
    """Demo client: waits for the VisionObjectRecognition action server,
    sends a single goal, and logs feedback, acceptance, and the result."""

    def __init__(self):
        super().__init__('vision_object_recognition_client')
        self._action_client = ActionClient(
            self,
            VisionObjectRecognition,
            '/vision_object_recognition'
        )
        # Example goal parameters
        self.camera_position = ["left"]
        self.classes = ["medicine_box"]
        # self.classes = []
        # Poll every 5 s until the action server becomes available
        self.timer = self.create_timer(5.0, self.timer_callback)

    def timer_callback(self):
        """Poll for the server; once ready, cancel the timer and send the goal."""
        if not self._action_client.server_is_ready():
            self.get_logger().info("Waiting for action server...")
            return
        # One-shot behavior: stop polling after the server is found.
        self.timer.cancel()
        # Send the goal
        goal_msg = VisionObjectRecognition.Goal()
        goal_msg.camera_position = self.camera_position
        goal_msg.classes = self.classes
        self.get_logger().info(f"Sending goal: camera_position={self.camera_position}, classes={self.classes}")
        future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_msg):
        """Log intermediate progress published by the action server."""
        feedback = feedback_msg.feedback
        self.get_logger().info(f"Feedback: status={feedback.status}, info={feedback.info}")

    def goal_response_callback(self, future_response):
        """Handle goal acceptance/rejection; on acceptance, request the result."""
        goal_handle = future_response.result()
        if not goal_handle.accepted:
            self.get_logger().warn("Goal rejected by server")
            return
        self.get_logger().info("Goal accepted by server")
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.result_callback)

    def result_callback(self, future_result):
        """Log the final result and each recognized object it contains."""
        result = future_result.result().result
        self.get_logger().info(f"Result received: success={result.success}, info={result.info}")
        for obj in result.objects:
            self.get_logger().info(
                f"Object: class={obj.class_name}, id={obj.class_id}, pose={obj.pose}, grab_width={obj.grab_width}"
            )
def main(args=None):
    """Entry point: spin the demo client until interrupted, then shut down."""
    rclpy.init(args=args)
    client = VisionObjectRecognitionClient()
    rclpy.spin(client)
    client.destroy_node()
    # Guard against double shutdown if the context was already torn down.
    if rclpy.ok():
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,82 @@
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from interfaces.action import VisionObjectRecognition
class VisionObjectRecognitionClient(Node):
    """Demo client: waits for the VisionObjectRecognition action server, then
    sends the same goal twice back-to-back and logs feedback and results for
    both goals."""

    def __init__(self):
        super().__init__('vision_object_recognition_client')
        self._action_client = ActionClient(
            self,
            VisionObjectRecognition,
            '/vision_object_recognition'
        )
        # Example goal parameters
        self.camera_position = ["left"]
        self.classes = ["medicine_box"]
        # self.classes = []
        # Poll every 5 s until the action server becomes available
        self.timer = self.create_timer(5.0, self.timer_callback)

    def timer_callback(self):
        """Poll for the server; once ready, cancel the timer and send two goals."""
        if not self._action_client.server_is_ready():
            self.get_logger().info("Waiting for action server...")
            return
        # One-shot behavior: stop polling after the server is found.
        self.timer.cancel()
        # Send the same goal twice in a row.
        for i in range(2):
            goal_msg = VisionObjectRecognition.Goal()
            goal_msg.camera_position = self.camera_position
            goal_msg.classes = self.classes
            self.get_logger().info(
                f"Sending goal {i}: "
                f"camera_position={self.camera_position}, "
                f"classes={self.classes}"
            )
            future = self._action_client.send_goal_async(
                goal_msg,
                feedback_callback=self.feedback_callback
            )
            future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_msg):
        """Log intermediate progress published by the action server."""
        feedback = feedback_msg.feedback
        self.get_logger().info(f"Feedback: status={feedback.status}, info={feedback.info}")

    def goal_response_callback(self, future_response):
        """Handle goal acceptance/rejection; on acceptance, request the result."""
        goal_handle = future_response.result()
        if not goal_handle.accepted:
            self.get_logger().warn("Goal rejected by server")
            return
        self.get_logger().info("Goal accepted by server")
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.result_callback)

    def result_callback(self, future_result):
        """Log the final result and each recognized object it contains."""
        result = future_result.result().result
        self.get_logger().info(f"Result received: success={result.success}, info={result.info}")
        for obj in result.objects:
            self.get_logger().info(
                f"Object: class={obj.class_name}, id={obj.class_id}, pose={obj.pose}, grab_width={obj.grab_width}"
            )
def main(args=None):
    """Entry point: spin the demo client until interrupted, then shut down."""
    rclpy.init(args=args)
    client = VisionObjectRecognitionClient()
    rclpy.spin(client)
    client.destroy_node()
    # Guard against double shutdown if the context was already torn down.
    if rclpy.ok():
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@@ -1,15 +1,18 @@
import rclpy
from rclpy.executors import MultiThreadedExecutor
from vision_detect.vision_core import NodeManager
from vision_detect.VisionDetect.node import DetectNode
def main(args=None):
rclpy.init(args=args)
node = DetectNode('detect')
node = NodeManager('detect')
try:
rclpy.spin(node)
executor = MultiThreadedExecutor()
rclpy.spin(node, executor=executor)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
rclpy.shutdown()

View File

@@ -1,18 +0,0 @@
import rclpy
from rclpy.executors import MultiThreadedExecutor
from vision_detect.vision_core import NodeManager
def main(args=None):
    """Entry point: run the 'detect' NodeManager on a multi-threaded executor
    so its callbacks (actions, services, timers) can run concurrently."""
    rclpy.init(args=args)
    node = NodeManager('detect')
    try:
        executor = MultiThreadedExecutor()
        rclpy.spin(node, executor=executor)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        # Guard: the context may already be shut down when spin returns.
        if rclpy.ok():
            rclpy.shutdown()

View File

@@ -1 +0,0 @@
from . import io

View File

@@ -1 +0,0 @@
from . import image

View File

@@ -1,2 +0,0 @@
from .save import *
from .draw import *

View File

@@ -1,3 +1,2 @@
from .managers import *
from .struct import *
from .enum import NodeType
from .runtime import *
from .enum import *

View File

@@ -0,0 +1,4 @@
from . import detectors
from . import estimators
from . import image_providers
from . import refiners

View File

@@ -1,8 +1,8 @@
import cv2
import numpy as np
from ..enum import Status
from ..struct import ImageData, SegmentationData
from ...enum import Status
from ...data_struct import ImageData, DetectData
from .detector_baseline import DetectorBaseline
@@ -14,7 +14,7 @@ class ColorDetector(DetectorBaseline):
self.distance = config["distance"]
self.color_range = config["color_range"]
def _detect(self, classes_name, image_data: ImageData) -> tuple[SegmentationData | None, int]:
def _detect(self, classes_name, image_data: ImageData) -> tuple[DetectData | None, int]:
if image_data.status != Status.SUCCESS:
return None, image_data.status
@@ -37,4 +37,4 @@ class ColorDetector(DetectorBaseline):
color_image[mask > 0] = color_image[mask > 0] * 0.5 + np.array([255, 0, 0]) * 0.5
return SegmentationData.create_mask_only_data(masks=[mask]), Status.SUCCESS
return DetectData.create_mask_only_data(masks=[mask]), Status.SUCCESS

View File

@@ -1,8 +1,8 @@
import numpy as np
import cv2
from ..enum import Status
from ..struct import ImageData, SegmentationData
from ...enum import Status
from ...data_struct import ImageData, DetectData
from .detector_baseline import DetectorBaseline
@@ -11,10 +11,11 @@ __all__ = ["CrossboardDetector"]
class CrossboardDetector(DetectorBaseline):
def __init__(self, config, _logger):
super().__init__()
self.pattern_size = config["pattern_size"]
self.pattern_size = config["pattern_size"] # (columns, rows)
def _detect(self, classes_name, image_data: ImageData) -> tuple[SegmentationData | None, int]:
def _detect(self, classes_name, image_data: ImageData) -> tuple[DetectData | None, int]:
color_image = image_data.color_image
depth_image = image_data.depth_image
rgb_img_gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
ret, corners = cv2.findChessboardCorners(
@@ -41,5 +42,5 @@ class CrossboardDetector(DetectorBaseline):
cv2.fillConvexPoly(mask, pts, 1)
color_image[mask > 0] = color_image[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
return SegmentationData.create_mask_only_data(masks=[mask]), Status.SUCCESS
corners = corners.reshape(-1, 2)
return DetectData.create_mask_only_data(masks=[mask]), Status.SUCCESS

View File

@@ -1,4 +1,4 @@
from ..struct import SegmentationData, ImageDataContainer
from ...data_struct import DetectData, ImageDataContainer
__all__ = ["DetectorBaseline"]
@@ -7,8 +7,8 @@ class DetectorBaseline:
def __init__(self):
pass
def _detect(self, classes_name, image_data) -> tuple[SegmentationData | None, int]:
def _detect(self, classes_name, image_data) -> tuple[DetectData | None, int]:
pass
def get_masks(self, position, classes_name, image_data: ImageDataContainer) -> tuple[SegmentationData | None, int]:
def get_masks(self, position, classes_name, image_data: ImageDataContainer) -> tuple[DetectData | None, int]:
return self._detect(classes_name, image_data[position])

View File

@@ -6,9 +6,9 @@ from ultralytics import YOLO
from ament_index_python.packages import get_package_share_directory
from ..enum import Status
from ...enum import Status
from .detector_baseline import DetectorBaseline
from ..struct import ImageData, SegmentationData
from ...data_struct import ImageData, DetectData
__all__ = ["ObjectDetector"]
@@ -36,7 +36,7 @@ class ObjectDetector(DetectorBaseline):
self,
classes_name: list[str],
image_data: ImageData
) -> tuple[SegmentationData | None, int]:
) -> tuple[DetectData | None, int]:
if image_data.status != Status.SUCCESS:
return None, image_data.status
@@ -62,4 +62,4 @@ class ObjectDetector(DetectorBaseline):
if results[0].masks is None or len(results[0].masks) == 0:
return None, Status.NO_DETECT
return SegmentationData.create_data(results=results), Status.SUCCESS
return DetectData.create_data(results=results), Status.SUCCESS

View File

@@ -1,5 +1,5 @@
from .pca_estimator import *
from .icp_estimator import *
from .e2e_estimator import *
from .gsnet_estimator import *
# from .estimator_baseline import *

View File

@@ -1,15 +1,15 @@
from ..struct import ImageData, ImageDataContainer, SegmentationData, PoseData
from ...data_struct import ImageData, ImageDataContainer, DetectData, PoseData
__all__ = ["EstimatorBaseline"]
class EstimatorBaseline:
def __init__(self):
pass
def __init__(self, config):
self._config = config
def _estimate(
self,
image_data: ImageData,
detect_data: SegmentationData,
detect_data: DetectData,
get_grab_width: bool = True
) -> tuple[PoseData | None, int]:
pass
@@ -18,7 +18,7 @@ class EstimatorBaseline:
self,
position: str,
image_data: ImageDataContainer,
detect_data: SegmentationData,
detect_data: DetectData,
get_grab_width: bool = True
) -> tuple[PoseData | None, int]:
return self._estimate(

View File

@@ -0,0 +1,158 @@
import torch
import numpy as np
import transforms3d as tfs
import MinkowskiEngine as ME
from ...enum import Status
from ...utils import pointcloud, image
from ...data_struct import ImageData, DetectData, PoseData
from .estimator_baseline import EstimatorBaseline
from ..model.gsnet import pred_decode, ModelFreeCollisionDetector
__all__ = ["GSNetEstimator"]
def minkowski_collate_fn(list_data):
    """Collate a list of per-sample dicts into one MinkowskiEngine sparse
    batch: collates and quantizes coordinates/features and stacks the dense
    point clouds into a single tensor."""
    coords = [sample["coors"] for sample in list_data]
    feats = [sample["feats"] for sample in list_data]
    coords_batch, feats_batch = ME.utils.sparse_collate(coords, feats)
    # Sparse quantization requires contiguous int32 coordinates.
    coords_batch = np.ascontiguousarray(coords_batch, dtype=np.int32)
    coords_batch, feats_batch, _, quantize2original = ME.utils.sparse_quantize(
        coords_batch, feats_batch, return_index=True, return_inverse=True)
    clouds = torch.stack(
        [torch.from_numpy(sample["point_clouds"]) for sample in list_data], 0)
    return {
        "coors": coords_batch,
        "feats": feats_batch,
        "quantize2original": quantize2original,
        "point_clouds": clouds,
    }
class GSNetEstimator(EstimatorBaseline):
    """Grasp-pose estimator backed by a GSNet model: builds a denoised point
    cloud per detection mask, runs sparse-conv inference, optionally filters
    predictions against the full scene cloud, and returns the best pose."""

    def __init__(self, config):
        super().__init__(config)
        # Enable scene-level collision filtering only when a positive
        # threshold is configured.
        self.collision_sign = True if self._config.get("collision_thresh", 0.00) > 0 else False

    def _estimate(
        self,
        image_data: ImageData,
        detect_data: DetectData,
        get_grab_width: bool = True
    ) -> tuple[PoseData | None, int]:
        """Estimate one grasp pose per detection mask.

        Returns (pose_data, Status.SUCCESS) when at least one mask produced a
        pose, otherwise (None, Status.CANNOT_ESTIMATE). Per-mask failures are
        recorded in pose_data via add_data(status).
        """
        depth_image = image_data.depth_image
        karr = image_data.karr
        image_size = depth_image.shape[:2][::-1]  # (width, height)
        masks = detect_data.masks
        # check boxes data
        boxes = detect_data.boxes
        box_sign = False if boxes is None else True
        if self.collision_sign:
            # Full-scene cloud used later by the model-free collision check.
            full_pcd = pointcloud.create_o3d_pcd(
                image_data.depth_image, image_size, image_data.karr, **self._config
            )
            full_points = np.asarray(full_pcd.points)
        n = 0
        pose_data = PoseData()
        for i, mask in enumerate(masks):
            karr_mask = karr.copy()
            # Crop depth (and mask) to the detection region to speed things up.
            if box_sign:
                crops, p = image.crop_imgs_xywh([depth_image, mask], boxes[i], same_sign=True)
            else:
                crops, p = image.crop_imgs_mask([depth_image], mask, same_sign=True)
            # NOTE(review): the mask branch passes only [depth_image] yet
            # crops[1] is indexed below — confirm crop_imgs_mask also returns
            # the cropped mask in `crops`, otherwise this raises IndexError.
            depth_img_mask = np.zeros_like(crops[0])
            depth_img_mask[crops[1] > 0] = crops[0][crops[1] > 0]
            # Shift the principal point (cx, cy) to the crop's coordinate frame.
            karr_mask[2] -= p[0]
            karr_mask[5] -= p[1]
            pcd, CODE = pointcloud.create_o3d_denoised_pcd(
                depth_img_mask, image_size, karr_mask, **self._config
            )
            # NOTE(review): treats 0 as success — confirm the denoiser's
            # status convention matches (an older version returned a bool).
            if CODE != 0:
                pose_data.add_data(CODE)
                continue
            # Build the inference input structure.
            points = np.asarray(pcd.points)
            ret_dict = {
                'point_clouds': points.astype(np.float32),
                'coors': points.astype(np.float32) / self._config.get("voxel_size", 0.002),
                'feats': np.ones_like(points).astype(np.float32),
            }
            batch_data = minkowski_collate_fn([ret_dict])
            # Move tensors to the target device.
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            for key in batch_data:
                batch_data[key] = batch_data[key].to(device)
            # Reject clouds too small for the network (e.g. 128 / 256).
            if batch_data['coors'].shape[0] < 128:
                pose_data.add_data(Status.COORS_TOO_FEW)
                continue
            if batch_data["point_clouds"].shape[1] < 128:
                pose_data.add_data(Status.POINT_CLOUDS_TOO_FEW)
                continue
            # Inference with gradients disabled.
            with torch.no_grad():
                end_points = self._config.get("model")(batch_data)
                if end_points is None:
                    pose_data.add_data(Status.ZERO_TRUE_NUM)
                    continue
                grasp_preds = pred_decode(end_points)
            # Post-process: sort by score (column 0) and keep the top 10.
            preds = grasp_preds[0].detach().cpu().numpy()
            sorted_index = np.argsort(-preds[:, 0])
            preds = preds[sorted_index]
            preds = preds[:10]
            if self.collision_sign:
                # Drop grasps that collide with the full scene cloud.
                mfcdetector = ModelFreeCollisionDetector(
                    full_points,
                    voxel_size=self._config.get("voxel_size", 0.002)
                )
                collision_mask = mfcdetector.detect(
                    preds, approach_dist=0.05,
                    collision_thresh=self._config.get("collision_thresh", 0.01)
                )
                preds = preds[~collision_mask]
            # Prediction layout: cols 4:13 rotation (row-major 3x3),
            # 13:16 center, col 1 gripper width.
            Rs = preds[:, 4:13].reshape(-1, 3, 3)
            centers = preds[:, 13:16].reshape(-1, 3)
            grab_width = preds[:, 1]
            if not len(Rs):
                pose_data.add_data(Status.E2E_NO_PREDICTION)
                continue
            pose_rmat = []
            for r, center in zip(Rs, centers):
                # Re-orient the predicted axes into a canonical right-handed
                # frame (x toward +x, z toward +z, y fixed by the cross product).
                vz, vx, vy = r[:, 0], r[:, 1], r[:, 2]
                if vx[0] < 0:
                    vx = -vx
                if vz[2] < 0:
                    vz = -vz
                if not np.allclose(np.cross(vx, vy), vz):
                    vy = -vy
                R = np.column_stack((vx, vy, vz))
                pose_rmat.append(tfs.affines.compose(np.squeeze(np.asarray(center)), R, [1, 1, 1]))
            if len(pose_rmat) == 0:
                pose_data.add_data(Status.E2E_NO_VALID_MATRIX)
                continue
            # Keep only the highest-scoring pose for this mask.
            pose_data.add_data(Status.SUCCESS, pose_rmat[0], [grab_width[0]])
            n += 1
        if n == 0:
            return None, Status.CANNOT_ESTIMATE
        return pose_data, Status.SUCCESS

View File

@@ -0,0 +1,215 @@
from pathlib import Path
import numpy as np
import open3d as o3d
from ament_index_python.packages import get_package_share_directory
from ...enum import Status
from ...utils import pointcloud, image
from .estimator_baseline import EstimatorBaseline
from ...data_struct import ImageData, DetectData, PoseData
__all__ = ["ICPEstimator"]
SHARE_DIR = Path(get_package_share_directory('vision_detect'))
# def _draw_pointcloud(pcd, T):
# R = T[0:3, 0:3]
# point = T[0:3, 3:4].flatten()
# x, y, z = R[:, 0], R[:, 1], R[:, 2]
#
# points = [
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
# point, point + x, point + y, point + z
#
# ] # 画点:原点、第一主成分、第二主成分
# lines = [
# [0, 1], [0, 2], [0, 3],
# [4, 5], [4, 6], [4, 7]
# ] # 画出三点之间两两连线
# colors = [
# [1, 0, 0], [0, 1, 0], [0, 0, 1],
# [1, 0, 0], [0, 1, 0], [0, 0, 1]
# ]
#
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(points),
# lines=o3d.utility.Vector2iVector(lines))
# line_set.colors = o3d.utility.Vector3dVector(colors)
#
# pcd.append(line_set)
# o3d.visualization.draw_geometries(pcd)
def _preprocess_point_cloud(
    pcd: o3d.geometry.PointCloud,
    voxel_size: float
):
    """Downsample a cloud and compute its FPFH descriptor for registration.

    Returns:
        (downsampled cloud with normals estimated, its FPFH feature).
    """
    down = pcd.voxel_down_sample(voxel_size)
    # Normal-estimation radius: 2x the voxel size (common Open3D heuristic).
    down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30)
    )
    # FPFH feature radius: 5x the voxel size.
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )
    return down, fpfh
def _prepare_dataset(source, target, voxel_size):
    """Downsample both clouds and compute FPFH features for global registration.

    NOTE(review): the identity transform applied to ``source`` is a no-op;
    presumably a leftover hook for supplying an initial-guess transform.
    """
    source.transform(np.identity(4))
    src_down, src_fpfh = _preprocess_point_cloud(source, voxel_size)
    tgt_down, tgt_fpfh = _preprocess_point_cloud(target, voxel_size)
    return src_down, tgt_down, src_fpfh, tgt_fpfh
def _execute_global_registration(
    source_down,
    target_down,
    source_fpfh,
    target_fpfh,
    voxel_size
):
    """RANSAC-based global registration on FPFH feature correspondences.

    Returns the estimated 4x4 source->target transformation matrix.
    """
    # Inlier distance threshold scales with the downsampling voxel size.
    distance_threshold = voxel_size * 1.5
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down,
        target_down,
        source_fpfh,
        target_fpfh,
        True,  # mutual_filter: keep only mutually-nearest feature matches
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3,  # ransac_n: points sampled per RANSAC hypothesis
        [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
        ],
        # Up to 100000 iterations, 0.999 confidence.
        o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
    return result.transformation
def _object_icp(
    target: o3d.geometry.PointCloud,
    source: o3d.geometry.PointCloud,
    ransac_voxel_size: float = 0.005,
    icp_voxel_radius: list[float] | tuple[float, ...] | None = (0.004, 0.002, 0.001),
    icp_max_iter: list[int] | tuple[int, ...] | None = (50, 30, 14),
):
    """Coarse-to-fine registration of ``source`` onto ``target``.

    Pipeline: RANSAC global registration on FPFH features, followed by a
    multi-scale point-to-point ICP refinement.

    Args:
        target: reference point cloud.
        source: cloud to be aligned onto ``target``.
        ransac_voxel_size: voxel size used to downsample both clouds for the
            global (RANSAC) stage; also scales its inlier threshold.
        icp_voxel_radius: per-level downsampling radii for ICP, coarse to fine.
        icp_max_iter: per-level ICP iteration caps; paired element-wise with
            ``icp_voxel_radius``.

    Returns:
        4x4 homogeneous transformation mapping ``source`` into ``target``.
    """
    # Bug fix: a hard-coded ``voxel_size = 0.005`` was previously used here,
    # silently ignoring the ``ransac_voxel_size`` argument (the default of
    # 0.005 keeps default-call behavior identical).
    source_down, target_down, source_fpfh, target_fpfh = _prepare_dataset(
        source, target, ransac_voxel_size
    )
    T = _execute_global_registration(
        source_down, target_down, source_fpfh, target_fpfh, ransac_voxel_size
    )
    # Bug fix: iterate over the supplied schedules instead of a fixed
    # range(3), so shorter/longer schedules no longer raise IndexError.
    for radius, max_iter in zip(icp_voxel_radius, icp_max_iter):
        source_down = source.voxel_down_sample(radius)
        target_down = target.voxel_down_sample(radius)
        source_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        target_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        result_icp = o3d.pipelines.registration.registration_icp(
            source_down,
            target_down,
            radius * 5,  # max correspondence distance at this level
            T,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            o3d.pipelines.registration.ICPConvergenceCriteria(
                relative_fitness=1e-6,
                relative_rmse=1e-6,
                max_iteration=max_iter
            )
        )
        # Feed this level's result into the next (finer) level.
        T = result_icp.transformation
    return T
class ICPEstimator(EstimatorBaseline):
    """Pose estimator that aligns a complete object model to the observed
    (masked) depth point cloud via RANSAC global registration + ICP."""

    def __init__(self, config):
        super().__init__(config)
        # Bug fix: read_point_cloud expects a filename string; a pathlib.Path
        # is not accepted by all Open3D versions.
        self._pcd_source_full = o3d.io.read_point_cloud(
            str(SHARE_DIR / config["complete_model_path"])
        )

    def _estimate(
        self,
        image_data: ImageData,
        detect_data: DetectData,
        get_grab_width: bool = True
    ) -> tuple[PoseData | None, int]:
        """Estimate one pose per detection mask.

        Returns (PoseData, Status.SUCCESS) when at least one mask yielded a
        pose, otherwise (None, Status.CANNOT_ESTIMATE).
        """
        depth_image = image_data.depth_image
        karr = image_data.karr
        image_size = depth_image.shape[:2][::-1]  # (w, h)
        masks = detect_data.masks
        boxes = detect_data.boxes
        box_sign = boxes is not None

        n = 0
        pose_data = PoseData()
        for i, mask in enumerate(masks):
            karr_mask = karr.copy()
            # Crop depth + mask to the detection region to limit the cloud.
            if box_sign:
                crops, p = image.crop_imgs_xywh([depth_image, mask], boxes[i], same_sign=True)
            else:
                # NOTE(review): assumes crop_imgs_mask also returns the cropped
                # mask as crops[1] even though only one image is passed — confirm.
                crops, p = image.crop_imgs_mask([depth_image], mask, same_sign=True)
            depth_img_mask = np.zeros_like(crops[0])
            depth_img_mask[crops[1] > 0] = crops[0][crops[1] > 0]
            # Shift the principal point to the crop origin.
            karr_mask[2] -= p[0]
            karr_mask[5] -= p[1]
            pcd, CODE = pointcloud.create_o3d_denoised_pcd(
                depth_img_mask, image_size, karr_mask, **self._config
            )
            if CODE != 0:
                pose_data.add_data(CODE)
                continue
            # Bug fix: the model cloud was previously passed as
            # ``self._pcd_source_full.get("source")`` — an Open3D PointCloud
            # has no ``get``; pass the cloud itself.
            pose_rmat = _object_icp(
                self._pcd_source_full,
                pcd,
                ransac_voxel_size=self._config.get("ransac_voxel_size", 0.005),
                icp_voxel_radius=self._config.get("icp_voxel_radius", [0.004, 0.002, 0.001]),
                icp_max_iter=self._config.get("icp_max_iter", [50, 30, 14])
            )
            # ICP provides no gripper opening; report zero widths.
            grab_width = [0.0, 0.0, 0.0]
            pose_data.add_data(Status.SUCCESS, pose_rmat, tuple(grab_width))
            n += 1
        if n == 0:
            # Consistency fix: sibling estimators return None when nothing
            # could be estimated; previously this leaked a PoseData here.
            return None, Status.CANNOT_ESTIMATE
        return pose_data, Status.SUCCESS

View File

@@ -1,10 +1,10 @@
import numpy as np
import transforms3d as tfs
from .. enum import Status
from ..utils import pointcloud, image
from ...enum import Status
from ...utils import pointcloud, image
from .estimator_baseline import EstimatorBaseline
from ..struct import ImageData, SegmentationData, PoseData
from ...data_struct import ImageData, DetectData, PoseData
__all__ = ["PCAEstimator"]
@@ -31,28 +31,23 @@ def pca(data: np.ndarray, sort=True):
class PCAEstimator(EstimatorBaseline):
def __init__(self, config):
super().__init__()
self.config = config
super().__init__(config)
def _estimate(
self,
image_data: ImageData,
detect_data: SegmentationData,
detect_data: DetectData,
get_grab_width: bool = True,
) -> tuple[PoseData | None, int]:
depth_image = image_data.depth_image
karr = image_data.karr
h, w = depth_image.shape[:2]
image_size = [w, h]
image_size = depth_image.shape[:2][::-1]
masks = detect_data.masks
# check boxes data
boxes = detect_data.boxes
if boxes is None:
box_sign = False
else:
box_sign = True
box_sign = False if boxes is None else True
n = 0
pose_data = PoseData()
@@ -70,7 +65,7 @@ class PCAEstimator(EstimatorBaseline):
karr_mask[5] -= p[1]
pcd, CODE = pointcloud.create_o3d_denoised_pcd(
depth_img_mask, image_size, karr_mask, **self.config
depth_img_mask, image_size, karr_mask, **self._config
)
if CODE != 0:
pose_data.add_data(CODE)
@@ -118,5 +113,5 @@ class PCAEstimator(EstimatorBaseline):
n += 1
if n == 0:
return pose_data, Status.CANNOT_ESTIMATE
return None, Status.CANNOT_ESTIMATE
return pose_data, Status.SUCCESS

View File

@@ -0,0 +1,5 @@
from .driver_source import *
from .single_topic_source import *
from .muli_topic_source import *
# from .source_baseline import *

View File

@@ -9,8 +9,8 @@ __all__ = ["DriverSource"]
class DriverSource(SourceBaseline):
def __init__(self, config, node: Node):
super().__init__()
self.sub = node.create_subscription(
super().__init__(config)
self._sub = node.create_subscription(
ImgMsg,
config["subscription_name"],
self._subscription_callback,
@@ -18,8 +18,8 @@ class DriverSource(SourceBaseline):
)
def _subscription_callback(self, msg):
with self.lock:
self.images_buffer.save_data(
with self._lock:
self._images_buffer.save_data(
position=msg.position,
color=msg.image_color,
depth=msg.image_depth,

View File

@@ -0,0 +1,80 @@
import threading
from rclpy.node import Node
from message_filters import Subscriber, ApproximateTimeSynchronizer
from sensor_msgs.msg import Image, CameraInfo
from .source_baseline import SourceBaseline
__all__ = ["MuliTopicSource"]
class TopicSubscriber:
    """Subscribes to one camera position's color/depth/camera-info topics.

    Blocks in ``__init__`` until a valid CameraInfo message arrives, then
    time-synchronizes color and depth images into the shared buffer.

    NOTE(review): ``self.event.wait()`` blocks the constructor; the node must
    already be spinning on another thread/executor for the camera-info
    callback to fire — confirm against the launch setup.
    """

    def __init__(
        self,
        position,
        topic_configs,
        node: Node,
        lock: threading.Lock,
        image_buffer
    ):
        self.lock = lock
        self.position = position
        self.image_buffer = image_buffer
        self.event = threading.Event()
        self.camera_info = []
        # topic_configs layout: [color_topic, depth_topic, camera_info_topic].
        self.sub_camera_info = node.create_subscription(
            CameraInfo,
            topic_configs[2],
            self._camera_info_callback,
            10
        )
        node.get_logger().info("Waiting for camera info...")
        self.event.wait()
        # Intrinsics are static; drop the subscription once received.
        node.destroy_subscription(self.sub_camera_info)
        node.get_logger().info("Camera info received.")
        self.sub_color = Subscriber(node, Image, topic_configs[0])
        self.sub_depth = Subscriber(node, Image, topic_configs[1])
        self.sync_subscriber = ApproximateTimeSynchronizer(
            [self.sub_color, self.sub_depth],
            queue_size=10,
            slop=0.1
        )
        self.sync_subscriber.registerCallback(self._sync_sub_callback)

    def _sync_sub_callback(self, color, depth):
        """Store a time-synchronized color/depth pair in the shared buffer."""
        with self.lock:
            self.image_buffer.save_data(
                position=self.position,
                color=color,
                depth=depth,
                karr=self.camera_info[0],
                darr=self.camera_info[1]
            )

    def _camera_info_callback(self, msg):
        """Capture intrinsics (k) and distortion (d) once, then signal."""
        if len(self.camera_info) == 2:
            return
        # Bug fix: the second condition previously re-checked msg.k; it must
        # validate the distortion array msg.d before caching both.
        if msg.k is not None and len(msg.k) > 0 and msg.d is not None and len(msg.d) > 0:
            self.camera_info = [msg.k, msg.d]
            self.event.set()
class MuliTopicSource(SourceBaseline):
    """Image source that spawns one TopicSubscriber per camera position.

    NOTE(review): the class name keeps the original (misspelled) ``Muli`` —
    renaming would break the ``__all__`` export and any registry entries.
    """

    def __init__(self, config, node: Node):
        super().__init__(config)
        # Bug fix: ``for key, value in config`` iterates dict keys only
        # (SourceBaseline treats config as a dict); use .items() for dicts
        # while still accepting an iterable of (position, topics) pairs.
        # NOTE(review): if the preprocess switches are merged into this same
        # dict, non-topic keys may need filtering here — confirm config shape.
        pairs = config.items() if isinstance(config, dict) else config
        self.subscribers = [
            TopicSubscriber(
                position,
                topic_configs,
                node,
                self._lock,
                self._images_buffer
            ) for position, topic_configs in pairs
        ]

View File

@@ -1,6 +1,5 @@
import rclpy
from cv_bridge import CvBridge
from rclpy.task import Future
import threading
from rclpy.node import Node
from message_filters import Subscriber, ApproximateTimeSynchronizer
@@ -9,13 +8,13 @@ from sensor_msgs.msg import Image, CameraInfo
from .source_baseline import SourceBaseline
__all__ = ["TopicSource"]
__all__ = ["SingleTopicSource"]
class TopicSource(SourceBaseline):
class SingleTopicSource(SourceBaseline):
def __init__(self, config, node: Node):
super().__init__()
super().__init__(config)
self.position = config["position"]
self.future = Future()
self.event = threading.Event()
self.camera_info = []
self.sub_camera_info = node.create_subscription(
@@ -26,12 +25,12 @@ class TopicSource(SourceBaseline):
)
node.get_logger().info("Waiting for camera info...")
rclpy.spin_until_future_complete(node, self.future)
self.event.wait()
node.destroy_subscription(self.sub_camera_info)
node.get_logger().info("Camera info received.")
self.sub_color_image = Subscriber(self, Image, config["color_image_topic_name"])
self.sub_depth_image = Subscriber(self, Image, config["depth_image_topic_name"])
self.sub_color_image = Subscriber(node, Image, config["color_image_topic_name"])
self.sub_depth_image = Subscriber(node, Image, config["depth_image_topic_name"])
self.sync_subscriber = ApproximateTimeSynchronizer(
[self.sub_color_image, self.sub_depth_image],
queue_size=10,
@@ -40,15 +39,15 @@ class TopicSource(SourceBaseline):
self.sync_subscriber.registerCallback(self._sync_sub_callback)
def _camera_info_callback(self, msg):
self.camera_info = [msg.k, msg.d]
if (self.camera_info[0] is not None and len(self.camera_info[0]) > 0 and
self.camera_info[1] is not None and len(self.camera_info[1]) > 0):
if not self.future.done():
self.future.set_result(True)
if len(self.camera_info) == 2:
return
if msg.k is not None and len(msg.k) > 0 and msg.k is not None and len(msg.k) > 0:
self.camera_info = [msg.k, msg.d]
self.event.set()
def _sync_sub_callback(self, color, depth):
with self.lock:
self.images_buffer.save_data(
with self._lock:
self._images_buffer.save_data(
position=self.position,
color=color,
depth=depth,

View File

@@ -1,13 +1,15 @@
import time
import threading
from dataclasses import dataclass, field
import cv2
from numpy.typing import NDArray
from cv_bridge import CvBridge
from ..enum import Status
from ..struct import ImageDataContainer
from ..utils import image
from ...enum import Status
from ...data_struct import ImageDataContainer
from ...utils import image
__all__ = ['SourceBaseline']
@@ -53,31 +55,36 @@ class _ImageBuffer:
class SourceBaseline:
def __init__(self):
self.images_buffer = _ImageBuffer()
def __init__(self, config):
self._images_buffer = _ImageBuffer()
self.cv_bridge = CvBridge()
self.lock = threading.Lock()
self._lock = threading.Lock()
self._clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
self.distortion_switch = config.get("distortion", False)
self.denoising_switch = config.get("denoising", False)
self.enhancement_switch = config.get("enhancement", False)
def get_images(self, positions: tuple[str, ...]) -> tuple[ImageDataContainer | None, int]:
time_start = time.time()
with self.lock:
with self._lock:
image_data = ImageDataContainer()
if len(self.images_buffer) == 0:
if len(self._images_buffer) == 0:
return None, Status.NO_CAMERA_DATA
buffer_data_list = []
for position in positions:
if not (position in self.images_buffer):
if not (position in self._images_buffer):
# image_data.add_data(position, Status.NO_POSITION_DATA)
buffer_data_list.append(Status.NO_POSITION_DATA)
continue
if self.images_buffer[position].is_empty():
if self._images_buffer[position].is_empty():
# image_data.add_data(position, Status.NO_POSITION_DATA)
buffer_data_list.append(Status.NO_POSITION_DATA)
continue
buffer_data_list.append(self.images_buffer[position])
buffer_data_list.append(self._images_buffer[position])
time_1 = time.time()
valid_positions = 0
@@ -86,19 +93,37 @@ class SourceBaseline:
image_data.add_data(position, Status.NO_POSITION_DATA)
continue
color_img_cv = self.cv_bridge.imgmsg_to_cv2(
self.images_buffer[position].image_color, "bgr8")
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(
self.images_buffer[position].image_depth, '16UC1')
color_img_cv = self.cv_bridge.imgmsg_to_cv2(data.image_color, "bgr8")
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(data.image_depth, '16UC1')
camera_size = color_img_cv.shape[:2][::-1]
color_img_cv, depth_img_cv, k = image.distortion_correction(
color_img_cv,
depth_img_cv,
self.images_buffer[position].karr,
self.images_buffer[position].darr,
camera_size
)
# 畸变矫正
if self.distortion_switch:
camera_size = color_img_cv.shape[:2][::-1]
color_img_cv, depth_img_cv, k = image.distortion_correction(
color_img_cv,
depth_img_cv,
data.karr,
data.darr,
camera_size
)
else:
k = data.karr
# 彩色图像双边滤波
if self.denoising_switch:
color_img_cv = cv2.bilateralFilter(color_img_cv, 9, 75, 75)
# 彩色图像增强
if self.enhancement_switch:
lab = cv2.cvtColor(color_img_cv, cv2.COLOR_BGR2LAB)
ch = cv2.split(lab)
avg_luminance = cv2.mean(ch[0])[0]
ch[0] = self._clahe.apply(ch[0])
current_avg_luminance = cv2.mean(ch[0])[0]
alpha = avg_luminance / current_avg_luminance
ch[0] = cv2.convertScaleAbs(ch[0], alpha=alpha, beta=0)
lab = cv2.merge(ch)
color_img_cv = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
image_data.add_data(
position=position,
@@ -106,7 +131,7 @@ class SourceBaseline:
color_image=color_img_cv,
depth_image=depth_img_cv,
karr=list(k),
darr=tuple(self.images_buffer[position].darr)
darr=tuple(self._images_buffer[position].darr)
)
valid_positions += 1

View File

@@ -0,0 +1,3 @@
from ...model import gsnet
__all__ = ['gsnet']

View File

@@ -1,11 +1,9 @@
import time
import numpy as np
import transforms3d as tfs
from ..enum import Status
from ..utils import pointcloud, algorithm
from ..struct import ImageData, PoseData
from ...enum import Status
from ...utils import pointcloud, transforms
from ...data_struct import ImageData, PoseData
from .refiner_baseline import RefinerBaseline
@@ -399,9 +397,10 @@ class FixedOrientationRefiner(RefinerBaseline):
**kwargs) -> tuple[PoseData, int]:
image_size = image_data.depth_image.shape[:2][::-1]
full_points = pointcloud.create_o3d_pcd(
full_pcd = pointcloud.create_o3d_pcd(
image_data.depth_image, image_size, image_data.karr, **self.config
)
full_points = np.asarray(full_pcd.points)
n = 0
for i, (status, pose_mat, grasp_width) in enumerate(pose_data):
@@ -416,7 +415,7 @@ class FixedOrientationRefiner(RefinerBaseline):
continue
pose_mat = calibration_mat @ pose_mat
quat = algorithm.rmat2quat(pose_mat)
quat = transforms.rmat2quat(pose_mat)
pose_data.set_data(i, Status.SUCCESS, quat, grasp_width)
n += 1

View File

@@ -2,10 +2,10 @@ from typing import Optional, Tuple
import numpy as np
from ..enum import Status
from ...enum import Status
from .refiner_baseline import RefinerBaseline
from ..struct import ImageDataContainer, PoseData
from ..utils import algorithm
from ...data_struct import ImageDataContainer, PoseData
from ...utils import transforms
__all__ = ["NoRefiner"]
@@ -22,7 +22,7 @@ class NoRefiner(RefinerBaseline):
continue
pose_mat = calibration_mat @ pose_mat
quat = algorithm.rmat2quat(pose_mat)
quat = transforms.rmat2quat(pose_mat)
pose_data.set_data(i, Status.SUCCESS, quat, grasp_width)
return pose_data, Status.SUCCESS

View File

@@ -1,6 +1,6 @@
from numpy.typing import NDArray
from ..struct import ImageData, ImageDataContainer, PoseData
from ...data_struct import ImageData, ImageDataContainer, PoseData
__all__ = ['RefinerBaseline']

View File

@@ -3,21 +3,26 @@ from numpy.typing import NDArray
import numpy as np
__all__ = ["SegmentationData"]
__all__ = ["DetectData"]
SUCCESS = 0
@dataclass(slots=True)
class SegmentationData:
class DetectData:
status: int
results: list | None = None
# General
masks: list[NDArray] | None = field(init=False, default=None)
boxes: list[NDArray] | None = field(init=False, default=None)
class_ids: list[int] | None = field(init=False, default=None)
confidences: list[float] | None = field(init=False, default=None)
labels_map: list[str] | None = field(init=False, default=None)
# Crossboard special
board_points: NDArray | None = field(init=False, default=None) # board model points
camera_points: NDArray | None = field(init=False, default=None) # camera get board points
def __iter__(self):
"""遍历获得id和标签名"""
if (self.masks is None or self.boxes is None or self.confidences is None
@@ -82,7 +87,16 @@ class SegmentationData:
raise ValueError
return obj
@classmethod
def create_crossboard_data(cls, detect_data, board_data):
# obj = cls(status=SUCCESS, results=None)
# obj.masks = masks
# if obj.masks is None or len(obj.masks) == 0:
# raise ValueError
# return obj
pass
@dataclass(slots=True)
class SegmentationDataContainer:
class DetectDataContainer:
pass

View File

@@ -1,19 +1,24 @@
from dataclasses import dataclass, field
from numpy.typing import NDArray
from ..enum import Status
from ..utils import LOGGING_MAP
__all__ = ["ImageDataContainer", "ImageData"]
SUCCESS = 0
@dataclass(slots=True)
class ImageData:
status: int
color_image: NDArray | None = None
depth_image: NDArray | None = None
karr: tuple[float, ...] | NDArray | None = None
karr: list[float] | NDArray | None = None
darr: tuple[float, ...] | NDArray | None = None
def status(self) -> str:
return f"{self.status:04d}"
@dataclass(slots=True)
class ImageDataContainer:
@@ -43,15 +48,15 @@ class ImageDataContainer:
karr: list[float] | NDArray | None = None,
darr: tuple[float, ...] | NDArray | None = None
):
if status == SUCCESS:
if status == Status.SUCCESS:
self._data_dict[position] = (ImageData(status, color_image, depth_image, karr, darr))
else:
self._data_dict[position] = (ImageData(status, None, None, None, None))
def check_data_status(self, logger, logging_map):
def check_data_status(self, logger):
for position, data in self._data_dict.items():
if data.status == 0:
if data.status == Status.SUCCESS:
continue
logger.warning(
f"{position}-Image: {logging_map.get(f'{data.status:04d}', f'unknown code: {data.status:04d}')}"
f"{position} Image: {LOGGING_MAP.get(data.status(), f'unknown code: {data.status()}')}"
)

View File

@@ -1,2 +1,2 @@
from .mode_type import *
from .logging_code import *
from .component_mode_enum import *
from .logging_code_enum import *

View File

@@ -14,6 +14,7 @@ class NodeType(IntEnum):
class SourceType(IntEnum):
DRIVER = 0
DIRECT = 1
MUILTOPIC = 2
class DetectorType(IntEnum):
@@ -25,7 +26,7 @@ class DetectorType(IntEnum):
class EstimatorType(IntEnum):
PCA = 0
ICP = 1
E2E = 2
GSNET = 2
class RefinerType(IntEnum):

View File

@@ -27,6 +27,8 @@ class Status(IntEnum):
TASK_ABORTED = 302
WORKER_TIMEOUT = 303
TASK_EXECUTOR_INTERNAL_ERROR = 400
NO_DETECT = 1000
NO_DEPTH_CROP = 1001
FAIL_DETECT_VALID_POSE = 1003
@@ -36,6 +38,11 @@ class Status(IntEnum):
NO_CROSSBOARD = 1020
TOO_MACH_NOISE = 1100
POINTS_EMPTY = 1101
POINTS_TOO_CLOSELY = 1102
TOO_FEW_POINTS_OBB = 1200
PCA_NO_VECTOR = 1201
REFINE_FAIL = 1202

View File

@@ -1,7 +0,0 @@
from typing import Optional, Tuple
from .estimator_baseline import EstimatorBaseline

View File

@@ -1,7 +0,0 @@
from typing import Optional, Tuple
from .estimator_baseline import EstimatorBaseline

View File

@@ -1,4 +0,0 @@
from .driver_source import *
from .topic_source import *
# from .source_baseline import *

View File

@@ -1,17 +0,0 @@
from typing import Tuple
from rclpy.node import Node
from ..map import SOURCE_MAP
from ..struct import ImageDataContainer
class SourceManager:
"""
register source
"""
def __init__(self, mode, config, node: Node):
self.source = SOURCE_MAP.get(mode)(config, node)
def get_images(self, positions) -> Tuple[ImageDataContainer | None, int]:
return self.source.get_images(positions=positions)

View File

@@ -1 +0,0 @@
from .manager_map import *

View File

@@ -1,58 +0,0 @@
from .. import image_sources, detectors, estimators, refiners
from ..enum import NodeType, SourceType, DetectorType, EstimatorType, RefinerType
__all__ = [
"SOURCE_MAP", "DETECTOR_MAP",
"ESTIMATOR_MAP", "REFINER_MAP",
"CONFIG_MAP"
]
DETECTOR_MAP = {
DetectorType.OBJECT: detectors.ObjectDetector,
DetectorType.COLOR: detectors.ColorDetector,
DetectorType.CROSSBOARD: detectors.CrossboardDetector
}
SOURCE_MAP = {
SourceType.DRIVER: image_sources.DriverSource,
SourceType.DIRECT: image_sources.TopicSource
}
ESTIMATOR_MAP = {
EstimatorType.PCA: estimators.PCAEstimator,
# EstimatorType.ICP: estimators.ICPEstimator,
# EstimatorType.E2E: estimators.E2EEstimator,
}
REFINER_MAP = {
RefinerType.NO: refiners.NoRefiner,
RefinerType.FIXED: refiners.FixedOrientationRefiner
}
CONFIG_MAP = {
"node": {
NodeType.PUBLISHER: "publisher_configs",
NodeType.SERVICE: "service_configs",
NodeType.ACTION: "action_configs",
},
"source":{
SourceType.DRIVER: "driver_configs",
SourceType.DIRECT: "direct_configs"
},
"detector":{
DetectorType.OBJECT: "object_configs",
DetectorType.COLOR: "color_configs",
DetectorType.CROSSBOARD: "crossboard_configs"
},
"estimator":{
(EstimatorType.PCA, None): "pca_configs",
(EstimatorType.ICP, DetectorType.OBJECT): "icp_configs",
(EstimatorType.E2E, DetectorType.OBJECT): "e2e_configs"
},
"refiner":{}
}

View File

@@ -0,0 +1 @@
from .managers import *

View File

@@ -0,0 +1,2 @@
from .configs_name_map import *
from .components_type_map import *

View File

@@ -0,0 +1,33 @@
from ...components import image_providers, detectors, estimators, refiners
from ...enum import SourceType, DetectorType, EstimatorType, RefinerType
__all__ = [
"SOURCE_MAP", "DETECTOR_MAP",
"ESTIMATOR_MAP", "REFINER_MAP"
]
# Registry tables mapping component-type enums to their implementations.
# The manager classes look these up to instantiate the configured component.

SOURCE_MAP = {
    SourceType.DRIVER: image_providers.DriverSource,
    SourceType.DIRECT: image_providers.SingleTopicSource
    # NOTE(review): SourceType.MUILTOPIC exists in the enum but has no entry
    # here — MuliTopicSource is not registered; confirm whether intentional.
}

DETECTOR_MAP = {
    DetectorType.OBJECT: detectors.ObjectDetector,
    DetectorType.COLOR: detectors.ColorDetector,
    DetectorType.CROSSBOARD: detectors.CrossboardDetector
}

ESTIMATOR_MAP = {
    EstimatorType.PCA: estimators.PCAEstimator,
    EstimatorType.ICP: estimators.ICPEstimator,
    # EstimatorType.GSNET: estimators.E2EEstimator,
}

REFINER_MAP = {
    RefinerType.NO: refiners.NoRefiner,
    RefinerType.FIXED: refiners.FixedOrientationRefiner
}

View File

@@ -0,0 +1,32 @@
from ...enum import NodeType, SourceType, DetectorType, EstimatorType
__all__ = [
"CONFIG_MAP"
]
# Maps each pipeline component mode to the name of its configuration section
# inside the loaded config file. Estimator entries are keyed by
# (estimator_mode, detector_mode) pairs because some estimators are only
# valid behind a specific detector.
CONFIG_MAP = {
    "mode":{},
    "node": {
        NodeType.PUBLISHER: "publisher_node_configs",
        NodeType.SERVICE: "service_node_configs",
        NodeType.ACTION: "action_node_configs",
    },
    "source":{
        SourceType.DRIVER: "driver_source_configs",
        SourceType.DIRECT: "direct_source_configs"
    },
    "detector":{
        DetectorType.OBJECT: "object_detector_configs",
        DetectorType.COLOR: "color_detector_configs",
        DetectorType.CROSSBOARD: "crossboard_detector_configs"
    },
    "estimator":{
        (EstimatorType.PCA, None): "pca_estimator_configs",
        (EstimatorType.ICP, DetectorType.OBJECT): "icp_estimator_configs",
        (EstimatorType.GSNET, DetectorType.OBJECT): "gsnet_estimator_configs"
    },
    "refiner":{}
}

View File

@@ -1,6 +1,6 @@
from .config_manager import *
from .source_manager import *
from .detector_manager import *
from .estimator_manager import *
from .refiner_manager import *
from .resource_manager import *
from .resource_manager import *
from .source_manager import *

View File

@@ -2,9 +2,11 @@ import os
import json
import logging
from ..utils import io, format
from ..map import CONFIG_MAP
from ..enum import SourceType, DetectorType, EstimatorType, RefinerType, NodeType
from ...utils import io
from ...utils import format
from ..manager_map import CONFIG_MAP
from ...utils import LOGGING_MAP
from ...enum import SourceType, DetectorType, EstimatorType, RefinerType, NodeType
__all__ = ["ConfigManager"]
@@ -67,59 +69,67 @@ class ConfigManager:
self.node_mode = NodeType[node_mode]
self.node_config = config.get(CONFIG_MAP["node"].get(self.node_mode))
if self.node_config is None:
raise KeyError
raise KeyError(f"node_mode: {node_mode}")
except (KeyError, ValueError):
self.node_mode = NodeType.SERVICE
self.node_config = config["service_configs"]
self.node_config = config.get(CONFIG_MAP["node"].get(self.node_mode))
try:
self.source_mode = SourceType[image_source]
self.source_config = config.get(CONFIG_MAP["source"].get(self.source_mode))
if self.source_config is None:
raise KeyError
raise KeyError(f"image_source: {image_source}")
except (KeyError, ValueError):
self.source_mode = SourceType.DRIVER
self.source_config = config["driver_configs"]
self.source_config = config.get(CONFIG_MAP["source"].get(self.source_mode))
finally:
preprocess_configs = config.get("preprocess_configs")
self.source_config.update(preprocess_configs)
try:
self.detector_mode = DetectorType[detect_mode]
self.detector_config = config.get(CONFIG_MAP["detector"].get(self.detector_mode))
if self.detector_config is None:
raise KeyError
raise KeyError(f"detect_mode: {detect_mode}")
except (KeyError, ValueError):
self.detector_mode = DetectorType.OBJECT
self.detector_config = config["object_configs"]
self.detector_config = config.get(CONFIG_MAP["detector"].get(self.detector_mode))
try:
self.estimator_mode = EstimatorType[estimate_mode]
self.estimator_config = config.get(CONFIG_MAP["estimator"].get(self.estimator_mode))
self.estimator_config = config.get(CONFIG_MAP["estimator"].get(
(self.estimator_mode, self.detector_mode)
))
if self.estimator_config is None:
raise KeyError
raise KeyError(f"estimate_mode: {estimate_mode}")
except (KeyError, ValueError):
self.estimator_mode = EstimatorType.PCA
self.estimator_config = config["pca_configs"]
self.estimator_config = config.get(CONFIG_MAP["estimator"].get(
(self.estimator_mode, None)
))
try:
self.refiner_mode = RefinerType[refine_mode]
if self.refiner_mode == RefinerType.NO:
self.refiner_config = {}
elif (self.refiner_mode == RefinerType.FIXED
and self.detector_mode == DetectorType.OBJECT):
and self.detector_mode == DetectorType.OBJECT
and self.estimator_mode == EstimatorType.PCA):
self.refiner_config = {
"depth_scale": self.estimator_config.get("depth_scale", 1000.0),
"depth_trunc": self.estimator_config.get("depth_trunc", 3.0),
"voxel_size": self.estimator_config.get("voxel_size", 0.002)
}
else: raise KeyError
else: raise KeyError(f"refine_mode: {refine_mode}")
except KeyError or ValueError:
self.refiner_mode = RefinerType.NO
self.refiner_config = {}
# self.logger.info(f"node: {self.node_mode}")
# self.logger.info(f"source: {self.source_mode}")
# self.logger.info(f"detector: {self.detector_mode}")
# self.logger.info(f"estimator: {self.estimator_mode}")
# self.logger.info(f"refiner: {self.refiner_mode}")
self.logger.info(f"node: {self.node_mode}")
self.logger.info(f"source: {self.source_mode}")
self.logger.info(f"detector: {self.detector_mode}")
self.logger.info(f"estimator: {self.estimator_mode}")
self.logger.info(f"refiner: {self.refiner_mode}")
def load_calibration(self, shared_dir):
eye_in_left_hand_path = os.path.join(shared_dir, self.calibration["left_hand"])
@@ -129,14 +139,14 @@ class ConfigManager:
self.left, CODE = io.load_calibration_mat(eye_in_left_hand_path)
self.logger.info(f"\nleft_hand_mat: \n{format.np_mat2str(self.left)}")
if CODE != 0:
self.logger.warning(f"left_hand: {self.logging_map[f'{CODE:04d}']}")
self.logger.warning(f"left_hand: {LOGGING_MAP[f'{CODE:04d}']}")
self.right, CODE = io.load_calibration_mat(eye_in_right_hand_path)
self.logger.info(f"\nright_hand_mat: \n{format.np_mat2str(self.right)}")
if CODE != 0:
self.logger.warning(f"right_hand: {self.logging_map[f'{CODE:04d}']}")
self.logger.warning(f"right_hand: {LOGGING_MAP[f'{CODE:04d}']}")
self.top, CODE = io.load_calibration_mat(eye_to_hand_path)
self.logger.info(f"\ntop_mat: \n{format.np_mat2str(self.top)}")
if CODE != 0:
self.logger.warning(f"top: {self.logging_map[f'{CODE:04d}']}")
self.logger.warning(f"top: {LOGGING_MAP[f'{CODE:04d}']}")

View File

@@ -1,7 +1,8 @@
import logging
from ..map import DETECTOR_MAP
from ..struct import ImageDataContainer, SegmentationData
from ...components import detectors
from ..manager_map import DETECTOR_MAP
from ...data_struct import ImageDataContainer, DetectData
__all__ = ["DetectorManager"]
@@ -9,13 +10,16 @@ __all__ = ["DetectorManager"]
class DetectorManager:
def __init__(self, mode, config, logger=None):
_logger = logger or logging
self.detector = DETECTOR_MAP.get(mode)(config, _logger)
try:
self.detector = DETECTOR_MAP.get(mode)(config, _logger)
except Exception as e:
self.detector = detectors.ObjectDetector(config, _logger)
def get_masks(
self,
position,
classes_name,
image_data: ImageDataContainer
) -> tuple[SegmentationData | None, int]:
) -> tuple[DetectData | None, int]:
return self.detector.get_masks(
position=position, classes_name=classes_name, image_data=image_data)

View File

@@ -1,18 +1,22 @@
from ..map import ESTIMATOR_MAP
from ..struct import ImageDataContainer, SegmentationData
from ...components import estimators
from ..manager_map import ESTIMATOR_MAP
from ...data_struct import ImageDataContainer, DetectData
__all__ = ['EstimatorManager']
class EstimatorManager:
def __init__(self, mode, config):
self.estimator = ESTIMATOR_MAP[mode](config)
try:
self.estimator = ESTIMATOR_MAP[mode](config)
except Exception as e:
self.estimator = estimators.PCAEstimator(config)
def get_poses(
self,
position: str,
image_data:ImageDataContainer,
detect_data: SegmentationData,
detect_data: DetectData,
get_grab_width: bool = True,
):
return self.estimator.get_poses(

View File

@@ -1,14 +1,18 @@
from numpy.typing import NDArray
from ..map import REFINER_MAP
from ..struct import ImageDataContainer, PoseData
from ...components import refiners
from ..manager_map import REFINER_MAP
from ...data_struct import ImageDataContainer, PoseData
__all__ = ["RefinerManager"]
class RefinerManager:
def __init__(self, mode, config):
self.refiner = REFINER_MAP[mode](config)
try:
self.refiner = REFINER_MAP[mode](config)
except Exception as e:
self.refiner = refiners.NoRefiner(config)
def get_refine(
self,

View File

@@ -1,4 +1,5 @@
from .config_manager import ConfigManager
from ...utils import LOGGING_MAP
__all__ = ["ResourceManager"]
@@ -7,6 +8,8 @@ class ResourceManager:
def __init__(self, config_manager: ConfigManager):
self.logging_map = config_manager.logging_map
self.node_name = config_manager.node_name
self.output_boxes = config_manager.output_boxes
self.output_masks = config_manager.output_masks
self.calibration_matrix = {
"left": config_manager.left,
@@ -17,6 +20,3 @@ class ResourceManager:
self.save_image = config_manager.save_image
if self.save_image:
self.image_save_dir = config_manager.image_save_dir
self.output_boxes = config_manager.output_boxes
self.output_masks = config_manager.output_masks

View File

@@ -0,0 +1,63 @@
from typing import Tuple
from rclpy.node import Node
from interfaces.srv import SaveCameraImages
from ... import Status
from ...components import image_providers
from ..manager_map import SOURCE_MAP
from ...data_struct import ImageDataContainer
from ...utils import LOGGING_MAP, io
class SourceManager:
    """Owns the image source for the detection pipeline and registers the
    ``/save_camera_images`` ROS service that dumps current camera images
    to disk on request.
    """

    def __init__(self, mode, config, node: Node):
        """Create the image source selected by ``mode``.

        Falls back to the driver-backed source when ``mode`` is not
        registered in SOURCE_MAP or its construction fails.
        """
        self.node = node
        try:
            # SOURCE_MAP.get(mode) yields None for unknown modes; calling
            # None raises and routes us into the fallback below.
            self.source = SOURCE_MAP.get(mode)(config, node)
        except Exception:
            self.source = image_providers.DriverSource(config, node)
        self._save_image_service = node.create_service(
            SaveCameraImages, "/save_camera_images", self.save_image_service_callback
        )

    def save_image_service_callback(self, request, response):
        """Service handler: save color and/or depth images for the requested
        camera positions.

        request fields:
            image_types      -- subset of {"color", "depth"}
            camera_positions -- camera positions to capture from
            save_dir         -- destination directory
            save_type        -- file extension ("png" or ".png" style)
        """
        image_types = request.image_types
        positions = request.camera_positions
        save_dir = request.save_dir
        save_type: str = request.save_type
        # Normalize so callers may pass either "png" or ".png".
        if not save_type.startswith("."):
            save_type = "." + save_type
        if save_type not in [".png", ".jpg", ".jpeg"]:
            response.success = False
            response.info = "Type error"
            return response
        # NOTE(review): get_images is annotated to return a
        # (container, status) tuple, yet the result is iterated directly
        # here — confirm the actual return shape of the configured source.
        image_container = self.get_images(positions)
        for position, data in image_container:
            if data.status != Status.SUCCESS:
                # `status` is an attribute (see the comparison above), so it
                # must not be invoked as a callable here.
                # NOTE(review): assumes LOGGING_MAP exposes a dict-style
                # .get() — verify against the LoggingMap implementation.
                self.node.get_logger().warn(
                    f"Failed save {position}: {LOGGING_MAP.get(data.status)}")
                continue
            if "color" in image_types:
                io.save_img(
                    data.color_image, "color_image"+save_type, save_dir, mark_cur_time=True)
            if "depth" in image_types:
                # Bug fix: previously wrote the COLOR image under the depth
                # filename. TODO(review): confirm the attribute is named
                # `depth_image` on the image data struct.
                io.save_img(
                    data.depth_image, "depth_image"+save_type, save_dir, mark_cur_time=True)
        response.success = True
        return response

    def get_images(self, positions) -> Tuple[ImageDataContainer | None, int]:
        """Delegate image acquisition to the configured source."""
        return self.source.get_images(positions=positions)

View File

@@ -0,0 +1,2 @@
from ..utils import *
from ..logging_map import *

View File

@@ -1,3 +0,0 @@
from . import pointcloud, image, io, format
__all__ = ['pointcloud', 'image', "io", "format"]

View File

@@ -1 +0,0 @@
from .task_executer import *

View File

@@ -0,0 +1 @@
from .logging_map import *

View File

@@ -0,0 +1,45 @@
import os
import json
from typing import Any
from ament_index_python.packages import get_package_share_directory
__all__ = ["LOGGING_MAP"]
def get_logging_map():
    """Load the report-logging code → message mapping shipped with the
    ``vision_detect`` package.

    Returns:
        dict: mapping of 4-digit string codes to log-message strings.
    """
    # NOTE(review): the path still says 'map/logging/...' while other
    # resources moved to 'manager_map' — confirm the installed layout.
    share_dir = get_package_share_directory('vision_detect')
    mapping_path = os.path.join(share_dir, 'map/logging/report_logging_define.json')
    with open(mapping_path, "r", encoding="utf-8") as fp:
        return json.load(fp)
# LOGGING_MAP = get_logging_map()
class LoggingMap:
    """Read-only lookup of report-logging messages keyed by 4-digit codes.

    Keys may be given as ints (zero-padded to 4 digits) or as 4-character
    strings. The underlying mapping is loaded once at class-definition time.
    """

    _LOGGING_MAP = get_logging_map()

    def __getitem__(self, item: str | int) -> str:
        """Return the message for ``item``.

        Raises:
            KeyError: if the (formatted) key is absent from the mapping.
            TypeError: if ``item`` is neither int nor str.
        """
        if isinstance(item, int):
            key = f"{item:04d}"
            if key in self._LOGGING_MAP:
                return self._LOGGING_MAP[key]
            # Bug fix: the message previously read "has this key" for a
            # MISSING key — inverted and misleading.
            raise KeyError(f"logging map does not have this key: {item}")
        if isinstance(item, str):
            if len(item) == 4 and item in self._LOGGING_MAP:
                return self._LOGGING_MAP[item]
            raise KeyError(f"logging map does not have this key: {item}")
        raise TypeError(f"logging map got unsupported key type: {type(item)}")

    def get(self, item, default=None):
        """Dict-style lookup: return the message for ``item`` or ``default``.

        Added for callers that use ``LOGGING_MAP.get(...)`` (e.g. the
        camera-source manager's save-image warning path) instead of indexing.
        """
        try:
            return self[item]
        except (KeyError, TypeError):
            return default

    def __setitem__(self, key: str, value: Any) -> None:
        # The mapping is fixed at load time; mutation is a programming error.
        raise TypeError("logging map does not support item assignment")


LOGGING_MAP = LoggingMap()

Some files were not shown because too many files have changed in this diff Show More