修改为服务

This commit is contained in:
liangyuxuan
2025-09-19 09:41:17 +08:00
parent f2db4d4570
commit 4ee7f22fd3
4 changed files with 543 additions and 184 deletions

View File

@@ -0,0 +1,392 @@
import os
from collections import defaultdict
import time
import cv2
import open3d as o3d
import numpy as np
import transforms3d as tfs
from cv_bridge import CvBridge
import torch
from detect_part.ultralytics import YOLO
import rclpy
from rclpy.node import Node
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseWithID, PoseWithIDArray
import detect_part
def get_map(K, D, camera_size):
    """Build undistortion remap tables and the rectified camera matrix.

    Args:
        K: flat 3x3 intrinsic matrix (row-major, 9 values).
        D: distortion coefficients.
        camera_size: [width, height] of the camera image.

    Returns:
        (map1, map2, new_K_flat): remap tables for cv2.remap and the
        flattened optimal new camera matrix.
    """
    height, width = camera_size[1], camera_size[0]
    intrinsic = np.array(K).reshape(3, 3)
    dist = np.array(D)
    new_intrinsic, _roi = cv2.getOptimalNewCameraMatrix(
        intrinsic, dist, (width, height), 1, (width, height)
    )
    map1, map2 = cv2.initUndistortRectifyMap(
        intrinsic, dist, None, new_intrinsic, (width, height), cv2.CV_32FC1
    )
    return map1, map2, new_intrinsic.flatten()
def pca(data, sort=True):
    """Principal component analysis of an (N, D) point array.

    Args:
        data: (N, D) array of points.
        sort: when True, order components by decreasing variance.

    Returns:
        (eigenvalues, eigenvectors) of the covariance matrix, with
        eigenvectors as columns; (None, None) if the decomposition fails.
    """
    center = np.mean(data, axis=0)
    centered_points = data - center  # remove the mean
    try:
        cov_matrix = np.cov(centered_points.T)
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
    except np.linalg.LinAlgError:
        return None, None
    if sort:
        # descending variance; use a separate name instead of shadowing `sort`
        order = eigenvalues.argsort()[::-1]
        eigenvalues = eigenvalues[order]
        eigenvectors = eigenvectors[:, order]
    return eigenvalues, eigenvectors
def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics):
    """Estimate an object's 6-DoF pose from a masked depth image.

    Depth pixels under the mask are back-projected to a point cloud, which
    is voxel-down-sampled and de-noised; the position is the cloud centroid
    and the orientation is a PCA frame over the remaining points.

    Args:
        mask: H x W mask; pixels > 0 select the object.
        depth_img: H x W depth image in millimetres (uint16 range).
        intrinsics: open3d.camera.PinholeCameraIntrinsic for back-projection.

    Returns:
        (x, y, z, roll, pitch, yaw) in metres/radians, or
        (0.0, 0.0, 0.0, None, None, None) when no valid points remain
        or PCA fails.
    """
    # keep depth only where the mask is set
    depth_img_mask = np.zeros_like(depth_img)
    depth_img_mask[mask > 0] = depth_img[mask > 0]
    depth_o3d = o3d.geometry.Image(depth_img_mask.astype(np.uint16))
    point_cloud = o3d.geometry.PointCloud.create_from_depth_image(
        depth=depth_o3d,
        intrinsic=intrinsics,
        depth_scale=1000.0,  # depth is in millimetres
        depth_trunc=8.0,     # discard points beyond 8 m
    )
    point_cloud = point_cloud.remove_non_finite_points()
    down_pcd = point_cloud.voxel_down_sample(voxel_size=0.02)
    clean_pcd, _ = down_pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
    if len(clean_pcd.points) == 0:
        return 0.0, 0.0, 0.0, None, None, None
    center = clean_pcd.get_center()
    x, y, z = center
    w, v = pca(np.asarray(clean_pcd.points))
    if w is not None and v is not None:
        vx, vy, vz = v[:, 0], v[:, 1], v[:, 2]
        # resolve PCA sign ambiguity so the frame is repeatable and right-handed
        if vx[0] < 0:
            vx = -vx
        if vy[1] < 0:
            vy = -vy
        if not np.allclose(np.cross(vx, vy), vz):
            vz = -vz
        R = np.column_stack((vx, vy, vz))
        rmat = tfs.affines.compose(np.squeeze(np.asarray((x, y, z))), R, [1, 1, 1])
        roll, pitch, yaw = tfs.euler.mat2euler(rmat)
        return x, y, z, roll, pitch, yaw
    else:
        return 0.0, 0.0, 0.0, None, None, None
def draw_box(set_confidence, rgb_img, result):
    """Draw bounding boxes and class labels onto rgb_img in place.

    Only detections with confidence >= set_confidence are drawn.
    """
    boxes = result.boxes.xywh.cpu().numpy()
    confidences = result.boxes.conf.cpu().numpy()
    class_ids = result.boxes.cls.cpu().numpy()
    labels = result.names
    for i, box in enumerate(boxes):
        if confidences[i] < set_confidence:
            continue
        x_center, y_center, width, height = box[:4]
        top_left = [int(x_center - width / 2), int(y_center - height / 2)]
        bottom_right = [int(x_center + width / 2), int(y_center + height / 2)]
        cv2.rectangle(rgb_img, top_left, bottom_right, (255, 255, 0), 2)
        caption = f'{labels[class_ids[i]]}: {confidences[i]*100:.2f}'
        cv2.putText(rgb_img, caption, (top_left[0], top_left[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
def draw_mask(set_confidence, rgb_img, result):
    """Overlay instance-segmentation masks onto rgb_img in place (red tint).

    Only detections with confidence >= set_confidence are drawn.
    """
    masks = result.masks.data.cpu().numpy()
    orig_shape = result.masks.orig_shape
    confidences = result.boxes.conf.cpu().numpy()
    for idx, raw_mask in enumerate(masks):
        if confidences[idx] < set_confidence:
            continue
        # resize the model-resolution mask back to the original image size
        mask = cv2.resize(raw_mask.astype(np.uint8), orig_shape[::-1],
                          interpolation=cv2.INTER_NEAREST)
        # 50/50 blend of the original pixels with pure red (BGR order)
        rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
def distortion_correction(color_image, depth_image, map1, map2):
    """Undistort a color/depth image pair using precomputed remap tables.

    Returns (undistorted_color, undistorted_depth), each cast back to its
    input dtype.
    """
    color_out = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
    color_out = color_out.astype(color_image.dtype)
    # nearest-neighbour for depth so no interpolated (invalid) values appear
    depth_out = cv2.remap(depth_image, map1, map2, cv2.INTER_NEAREST)
    depth_out = depth_out.astype(depth_image.dtype)
    return color_out, depth_out
def crop_mask_bbox(depth_img, mask, box):
    """Crop a depth image and mask to a detection's bounding box.

    Args:
        depth_img: H x W depth image.
        mask: H x W mask (0/1 or bool), same shape as depth_img.
        box: (x_center, y_center, w, h) in pixels.

    Returns:
        (depth_crop, mask_crop, (x_min, y_min)) where (x_min, y_min) is the
        clamped top-left corner of the crop in the original image.
    """
    # numpy shape is (rows, cols) == (height, width); the original code
    # unpacked it as (width, height), mis-clamping boxes on non-square images
    height, width = depth_img.shape
    x_center, y_center, w, h = box[:4]
    x_min, x_max = int(round(x_center - w / 2)), int(round(x_center + w / 2))
    y_min, y_max = int(round(y_center - h / 2)), int(round(y_center + h / 2))
    x_min, x_max = max(0, x_min), min(x_max, width)
    y_min, y_max = max(0, y_min), min(y_max, height)
    depth_crop = depth_img[y_min:y_max, x_min:x_max]
    mask_crop = mask[y_min:y_max, x_min:x_max]
    return depth_crop, mask_crop, (x_min, y_min)
class DetectNode(Node):
    """ROS 2 node that detects a chessboard in synchronized color/depth
    frames and publishes its estimated 6-DoF pose.

    In 'test' mode each frame is searched for a chessboard pattern; the
    board pixels are masked, back-projected through the camera intrinsics,
    and the resulting pose is published on /pose/cv_detect_pose.
    """

    def __init__(self, name, mode):
        super().__init__(name)
        self.checkpoint_path = None
        self.checkpoint_name = None
        self.function = None            # per-frame handler, selected by `mode`
        self.output_boxes = None
        self.output_masks = None
        self.K = self.D = None          # intrinsic matrix / distortion coefficients
        self.map1 = self.map2 = None    # undistortion remap tables
        self.intrinsics = None          # open3d PinholeCameraIntrinsic for back-projection
        self.mode = mode
        self.device = None
        self.calculate_function = None  # pose solver (only the CPU path exists)
        self.fx = self.fy = 0.5         # downscale factors (not used in test mode)
        self.cv_bridge = CvBridge()
        # initialization
        self._init_param()
        if mode == 'test':
            self.function = self._test_image
        else:
            self.function = None
            self.get_logger().error('Error: Mode Unknown')
        if self.device == 'cpu':
            self.calculate_function = calculate_pose_cpu
        elif self.device == 'gpu':
            raise NotImplementedError('Function: calculate_pose_gpu not implemented')
        else:
            raise ValueError(f"device must be cpu or gpu, now {self.device}")
        self._init_publisher()
        self._init_subscriber()

    def _init_param(self):
        """Declare and read all node parameters."""
        pkg_dir = os.path.dirname(detect_part.__file__)
        self.declare_parameter('checkpoint_name', 'yolo11s-seg.pt')
        self.checkpoint_name = self.get_parameter('checkpoint_name').value
        self.checkpoint_path = os.path.join(pkg_dir, 'checkpoint', self.checkpoint_name)
        self.declare_parameter('output_boxes', True)
        self.output_boxes = self.get_parameter('output_boxes').value
        self.declare_parameter('output_masks', False)
        self.output_masks = self.get_parameter('output_masks').value
        self.declare_parameter('set_confidence', 0.6)
        self.set_confidence = self.get_parameter('set_confidence').value
        self.declare_parameter('color_image_topic', '/camera/color/image_raw')
        self.color_image_topic = self.get_parameter('color_image_topic').value
        self.declare_parameter('depth_image_topic', '/camera/depth/image_raw')
        self.depth_image_topic = self.get_parameter('depth_image_topic').value
        self.declare_parameter('camera_info_topic', '/camera/color/camera_info')
        self.camera_info_topic = self.get_parameter('camera_info_topic').value
        self.declare_parameter('device', 'cpu')
        self.device = self.get_parameter('device').value
        self.declare_parameter('width', 1280)
        self.declare_parameter('high', 720)
        # working image size [width, height]; incoming frames are resized to this
        self.expect_size = [self.get_parameter('width').value, self.get_parameter('high').value]

    def _init_publisher(self):
        """Create the pose publisher and, if enabled, the debug-image publisher."""
        self.pub_pose_list = self.create_publisher(PoseWithIDArray, '/pose/cv_detect_pose', 10)
        if self.output_boxes or self.output_masks:
            self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)

    def _init_subscriber(self):
        """Subscribe to camera info and to time-synchronized color/depth images."""
        self.sub_camera_info = self.create_subscription(
            CameraInfo,
            self.camera_info_topic,
            self._camera_info_callback,
            10
        )
        # synchronized color + depth stream
        self.sub_color_image = Subscriber(self, Image, self.color_image_topic)
        self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic)
        self.sync_subscriber = ApproximateTimeSynchronizer(
            [self.sub_color_image, self.sub_depth_image],
            queue_size=10,
            slop=0.1
        )
        self.sync_subscriber.registerCallback(self._sync_callback)

    def _camera_info_callback(self, msg: CameraInfo):
        """Cache camera intrinsics once, then unsubscribe from camera info."""
        self.K = msg.k
        self.D = msg.d
        self.camera_size = [msg.width, msg.height]
        if self.K is not None and self.D is not None:
            self.map1, self.map2, self.K = get_map(msg.k, msg.d, self.camera_size)
            # scale the rectified intrinsics from the sensor resolution to
            # the working resolution
            self.scale = [self.expect_size[0] / self.camera_size[0],
                          self.expect_size[1] / self.camera_size[1]]
            self.intrinsics = o3d.camera.PinholeCameraIntrinsic(
                self.expect_size[0],
                self.expect_size[1],
                self.K[0] * self.scale[0],  # fx
                self.K[4] * self.scale[1],  # fy
                self.K[2] * self.scale[0],  # cx
                self.K[5] * self.scale[1]   # cy
            )
            if len(self.D) != 0:
                self.destroy_subscription(self.sub_camera_info)
            else:
                # no distortion reported: keep an explicit zero vector
                self.D = [0, 0, 0, 0, 0, 0, 0, 0]
                self.destroy_subscription(self.sub_camera_info)
        else:
            # the original code raised a bare string, which is a TypeError
            # in Python 3; raise a proper exception instead
            raise ValueError("K and D are not defined")

    def _sync_callback(self, color_img_ros, depth_img_ros):
        """Synchronized color/depth callback: run detection, publish results."""
        # camera info may not have arrived yet; without it we can neither
        # undistort nor back-project, so skip the frame
        if self.map1 is None or self.intrinsics is None:
            self.get_logger().warn('camera info not received yet, skipping frame')
            return
        color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
        depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
        if self.camera_size != self.expect_size:
            color_img_cv = cv2.resize(color_img_cv, self.expect_size)
            depth_img_cv = cv2.resize(depth_img_cv, self.expect_size)
        color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
        img, pose_dict = self.function(color_img_cv, depth_img_cv)
        # fall back to the raw frame when the detector produced no overlay
        if img is None:
            img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
        if self.output_boxes or self.output_masks:
            self.pub_detect_image.publish(img)
        if pose_dict is not None:
            pose_list_all = PoseWithIDArray()
            for (class_id, class_name), pose_list in pose_dict.items():
                pose_list_all.objects.append(
                    PoseWithID(
                        class_name=class_name,
                        class_id=class_id,
                        pose_list=pose_list
                    )
                )
            pose_list_all.header.stamp = self.get_clock().now().to_msg()
            pose_list_all.header.frame_id = "pose_list"
            self.pub_pose_list.publish(pose_list_all)

    def _test_image(self, rgb_img, depth_img):
        """Find a chessboard, mask it, and estimate its pose from depth.

        Returns (annotated image message, pose dict) when a board is found,
        or (image message, None) otherwise.
        """
        pose_dict = defaultdict(list)
        rgb_img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
        pattern_size = (11, 8)  # inner corners (cols, rows)
        # pattern_size = (15, 7)
        ret, corners = cv2.findChessboardCorners(rgb_img_gray, pattern_size)
        if ret:
            # refine corner locations to sub-pixel accuracy
            criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
            corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
            corners_subpix = corners_subpix.reshape(pattern_size[1], pattern_size[0], 2)
            mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
            # fill each board cell quad so the mask covers the whole pattern
            for i in range(0, pattern_size[1] - 1):
                for j in range(0, pattern_size[0] - 1):
                    pts = np.array([
                        corners_subpix[i, j],
                        corners_subpix[i, j + 1],
                        corners_subpix[i + 1, j + 1],
                        corners_subpix[i + 1, j]
                    ], dtype=np.int32)
                    cv2.fillConvexPoly(mask, pts, 1)
            rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
            orig_shape = rgb_img_gray.shape
            mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
            x, y, z, roll, pitch, yaw = self.calculate_function(mask, depth_img, self.intrinsics)
            if (x, y, z) != (0.0, 0.0, 0.0):
                pose = Pose()
                pose.position = Point(x=x, y=y, z=z)
                # NOTE(review): euler angles are stored in the quaternion's
                # x/y/z fields (w unused) — consumers must decode accordingly
                pose.orientation = Quaternion(x=roll, y=pitch, z=yaw)
                pose_dict[int(99), 'crossboard'].append(pose)
                # draw the text only for a valid pose: the original formatted
                # roll/pitch/yaw unconditionally, which raised a TypeError
                # (f'{None:.3f}') whenever pose estimation failed
                cv2.putText(rgb_img, f'cs: x: {x:.3f}, y: {y:.3f}, z: {z:.3f}', (0, 0 + 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
                cv2.putText(rgb_img, f'rpy: r: {roll:.3f}, p: {pitch:.3f}, y: {yaw:.3f}', (0, 0 + 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
        else:
            return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), None
def main(args=None):
    """Entry point: spin a DetectNode in chessboard-test mode until interrupted."""
    rclpy.init(args=args)
    detect_node = DetectNode('detect', 'test')
    try:
        rclpy.spin(detect_node)
    except KeyboardInterrupt:
        pass
    finally:
        # always release the node and shut down the ROS context
        detect_node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@@ -1,6 +1,7 @@
import os
# import time
import threading
from collections import defaultdict
import time
import cv2
import open3d as o3d
@@ -13,11 +14,11 @@ from detect_part.ultralytics import YOLO
import rclpy
from rclpy.node import Node
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image, CameraInfo
from sensor_msgs.msg import Image
from geometry_msgs.msg import Pose, Point, Quaternion
from interfaces.msg import PoseWithID, PoseWithIDArray
from interfaces.msg import PoseWithID, CustomImg
from interfaces.srv import PoseArray
import detect_part
@@ -33,12 +34,14 @@ def get_map(K, D, camera_size):
def pca(data, sort=True):
"""主成分分析 """
center = np.mean(data, axis=0)
centered_points = data - center # 去中心化
try:
cov_matrix = np.cov(centered_points.T) # 转置
eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
except np.linalg.LinAlgError:
return None, None
@@ -90,23 +93,6 @@ def calculate_pose_cpu(mask, depth_img: np.ndarray, intrinsics):
roll, pitch, yaw = tfs.euler.mat2euler(rmat)
# point = [
# [x, y, z], [x, y, z] + vx, [x, y, z] + vy, [x, y, z] + vz,
# [0, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1]
# ] # 画点:原点、第一主成分、第二主成分
# lines = [
# [0, 1], [0, 2], [0, 3], [4, 5], [4, 6], [4, 7]
# ] # 画出三点之间两两连线
# colors = [
# [1, 0, 0], [0, 1, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0], [0, 0, 1]
# ]
#
# # 构造open3d中的LineSet对象用于主成分显示
# line_set = o3d.geometry.LineSet(points=o3d.utility.Vector3dVector(point), lines=o3d.utility.Vector2iVector(lines))
# line_set.colors = o3d.utility.Vector3dVector(colors)
# o3d.visualization.draw_geometries([point_cloud, line_set])
# # o3d.visualization.draw_geometries([clean_pcd, line_set])
return x, y, z, roll, pitch, yaw
else:
@@ -147,6 +133,7 @@ def draw_mask(set_confidence, rgb_img, result):
def distortion_correction(color_image, depth_image, map1, map2):
"""畸变矫正"""
undistorted_color = cv2.remap(color_image, map1, map2, cv2.INTER_LINEAR)
undistorted_color = undistorted_color.astype(color_image.dtype)
@@ -179,28 +166,28 @@ def crop_mask_bbox(depth_img, mask, box):
class DetectNode(Node):
def __init__(self, name, mode):
super().__init__(name)
self.checkpoint_path = None
self.checkpoint_name = None
self.function = None
self.output_boxes = None
self.output_masks = None
self.K = self.D = None
self.map1 = self.map2 = None
self.intrinsics = None
self.mode = mode
self.device = None
self.checkpoint_path = None
self.checkpoint_name = None
self.output_boxes = None
self.output_masks = None
self.function = None
self.calculate_function = None
self.K = None
self.fx = self.fy = 0.5
self.camera_data = {}
self.cv_bridge = CvBridge()
self.lock = threading.Lock()
'''init'''
self._init_param()
if mode == 'detect':
self._init_model()
elif mode == 'test':
self.function = self._test_image
else:
self.function = None
self.get_logger().error('Error: Mode Unknown')
@@ -214,6 +201,7 @@ class DetectNode(Node):
self._init_publisher()
self._init_subscriber()
self._init_service()
def _init_param(self):
"""init parameter"""
@@ -229,25 +217,12 @@ class DetectNode(Node):
self.declare_parameter('output_masks', False)
self.output_masks = self.get_parameter('output_masks').value
self.declare_parameter('set_confidence', 0.6)
self.declare_parameter('set_confidence', 0.5)
self.set_confidence = self.get_parameter('set_confidence').value
self.declare_parameter('color_image_topic', '/camera/color/image_raw')
self.color_image_topic = self.get_parameter('color_image_topic').value
self.declare_parameter('depth_image_topic', '/camera/depth/image_raw')
self.depth_image_topic = self.get_parameter('depth_image_topic').value
self.declare_parameter('camera_info_topic', '/camera/color/camera_info')
self.camera_info_topic = self.get_parameter('camera_info_topic').value
self.declare_parameter('device', 'cpu')
self.device = self.get_parameter('device').value
self.declare_parameter('width', 1280)
self.declare_parameter('high', 720)
self.expect_size = [self.get_parameter('width').value, self.get_parameter('high').value]
def _init_model(self):
"""init model"""
device_model = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
@@ -265,101 +240,93 @@ class DetectNode(Node):
self.get_logger().error(f'Unknown checkpoint: {self.checkpoint_name}')
def _init_publisher(self):
"""init_publisher"""
self.pub_pose_list = self.create_publisher(PoseWithIDArray, '/pose/cv_detect_pose', 10)
"""init publisher"""
if self.output_boxes or self.output_masks:
self.pub_detect_image = self.create_publisher(Image, '/image/detect_image', 10)
def _init_service(self):
"""init service server"""
self.server = self.create_service(
PoseArray,
"/get_object_pose_list",
self._service_callback
)
def _init_subscriber(self):
"""init_subscriber"""
self.sub_camera_info = self.create_subscription(
CameraInfo,
self.camera_info_topic,
self._camera_info_callback,
"""init subscriber"""
self.sub_img = self.create_subscription(
CustomImg,
"/get_img",
self._sub_callback,
10
)
'''sync get color and depth img'''
self.sub_color_image = Subscriber(self, Image, self.color_image_topic)
self.sub_depth_image = Subscriber(self, Image, self.depth_image_topic)
self.sync_subscriber = ApproximateTimeSynchronizer(
[self.sub_color_image, self.sub_depth_image],
queue_size=10,
slop=0.1
)
self.sync_subscriber.registerCallback(self._sync_callback)
def _camera_info_callback(self, msg: CameraInfo):
"""Get camera info"""
self.K = msg.k
self.D = msg.d
self.camera_size = [msg.width, msg.height]
if self.K is not None and self.D is not None:
self.map1, self.map2, self.K = get_map(msg.k, msg.d, self.camera_size)
self.scale = [self.expect_size[0] / self.camera_size[0], self.expect_size[1] / self.camera_size[1]]
self.intrinsics = o3d.camera.PinholeCameraIntrinsic(
self.expect_size[0],
self.expect_size[1],
self.K[0] * self.scale[0],
self.K[4] * self.scale[1],
self.K[2] * self.scale[0],
self.K[5] * self.scale[1]
)
if len(self.D) != 0:
self.destroy_subscription(self.sub_camera_info)
else:
self.D = [0, 0, 0, 0, 0, 0, 0, 0]
self.destroy_subscription(self.sub_camera_info)
else:
raise "K and D are not defined"
def _sync_callback(self, color_img_ros, depth_img_ros):
def _sub_callback(self, msg):
"""同步回调函数"""
with self.lock:
self.camera_data[msg.position] = [
msg.image_color,
msg.image_depth,
msg.karr,
msg.darr
]
def _service_callback(self, request, response):
with self.lock:
if request.camera_position in self.camera_data:
color_img_ros, depth_img_ros, self.K, D = self.camera_data[request.camera_position]
else:
response.success = False
response.info = f"{request.camera_position} Camera data is empty or name is wrong"
response.objects = []
return response
self.camera_size = [color_img_ros.width, color_img_ros.height]
color_img_cv = self.cv_bridge.imgmsg_to_cv2(color_img_ros, "bgr8")
depth_img_cv = self.cv_bridge.imgmsg_to_cv2(depth_img_ros, '16UC1')
if self.camera_size != self.expect_size:
color_img_cv = cv2.resize(color_img_cv, self.expect_size)
depth_img_cv = cv2.resize(depth_img_cv, self.expect_size)
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, self.map1, self.map2)
map1, map2, self.K = get_map(self.K, D, self.camera_size)
color_img_cv, depth_img_cv = distortion_correction(color_img_cv, depth_img_cv, map1, map2)
img, pose_dict = self.function(color_img_cv, depth_img_cv)
"""masks为空结束这一帧"""
if img is None:
img = self.cv_bridge.cv2_to_imgmsg(color_img_cv, "bgr8")
if self.output_boxes or self.output_masks:
if img is None:
img = color_img_ros
self.pub_detect_image.publish(img)
if pose_dict is not None:
pose_list_all = PoseWithIDArray()
response.info = "Success get pose"
response.success = True
for (class_id, class_name), pose_list in pose_dict.items():
pose_list_all.objects.append(
response.objects.append(
PoseWithID(
class_name = class_name,
class_id = class_id,
pose_list = pose_list
)
)
pose_list_all.header.stamp = self.get_clock().now().to_msg()
pose_list_all.header.frame_id = "pose_list"
self.pub_pose_list.publish(pose_list_all)
return response
else:
response.info = "pose dict is empty"
response.success = False
response.objects = []
return response
def _seg_image(self, rgb_img: np.ndarray, depth_img: np.ndarray):
"""Use segmentation model"""
pose_dict = defaultdict(list)
'''Get Predict Results'''
time1 = time.time()
# time1 = time.time()
results = self.model(rgb_img)
time2 = time.time()
# time2 = time.time()
result = results[0]
'''Get masks'''
@@ -374,7 +341,7 @@ class DetectNode(Node):
class_ids = result.boxes.cls.cpu().numpy()
labels = result.names
time3 = time.time()
# time3 = time.time()
for i, (mask, box) in enumerate(zip(masks, boxes)):
'''Confidence Filter'''
@@ -389,21 +356,21 @@ class DetectNode(Node):
mask_crop = cv2.resize(mask_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
depth_crop = cv2.resize(depth_crop, None, fx=self.fx, fy=self.fy, interpolation=cv2.INTER_NEAREST)
intrinsics = o3d.camera.PinholeCameraIntrinsic(
int(self.expect_size[0] * self.fx),
int(self.expect_size[1] * self.fy),
self.K[0] * self.scale[0] * self.fx,
self.K[4] * self.scale[1] * self.fy,
(self.K[2] - x_min) * self.scale[0] * self.fx,
(self.K[5] - y_min) * self.scale[1] * self.fy
int(self.camera_size[0] * self.fx),
int(self.camera_size[1] * self.fy),
self.K[0] * self.fx,
self.K[4] * self.fy,
(self.K[2] - x_min) * self.fx,
(self.K[5] - y_min) * self.fy
)
else:
intrinsics = o3d.camera.PinholeCameraIntrinsic(
int(self.expect_size[0]),
int(self.expect_size[1]),
self.K[0] * self.scale[0],
self.K[4] * self.scale[1],
(self.K[2] - x_min) * self.scale[0],
(self.K[5] - y_min) * self.scale[1]
int(self.camera_size[0]),
int(self.camera_size[1]),
self.K[0],
self.K[4],
self.K[2] - x_min,
self.K[5] - y_min
)
x, y, z, roll, pitch, yaw = self.calculate_function(mask_crop, depth_crop, intrinsics)
@@ -414,19 +381,19 @@ class DetectNode(Node):
pose.orientation = Quaternion(x=roll, y=pitch, z=yaw)
pose_dict[int(class_ids[i]), labels[class_ids[i]]].append(pose)
time4 = time.time()
# time4 = time.time()
'''mask_img and box_img is or not output'''
if self.output_boxes and not self.output_masks:
draw_box(self.set_confidence, rgb_img, result)
time5 = time.time()
self.get_logger().info(f'start')
self.get_logger().info(f'{(time2 - time1)*1000} ms, model predict')
self.get_logger().info(f'{(time3 - time2)*1000} ms, get mask and some param')
self.get_logger().info(f'{(time4 - time3)*1000} ms, calculate all mask PCA')
self.get_logger().info(f'{(time5 - time4)*1000} ms, draw box')
self.get_logger().info(f'{(time5 - time1)*1000} ms, completing a picture entire process')
self.get_logger().info(f'end')
# time5 = time.time()
# self.get_logger().info(f'start')
# self.get_logger().info(f'{(time2 - time1)*1000} ms, model predict')
# self.get_logger().info(f'{(time3 - time2)*1000} ms, get mask and some param')
# self.get_logger().info(f'{(time4 - time3)*1000} ms, calculate all mask PCA')
# self.get_logger().info(f'{(time5 - time4)*1000} ms, draw box')
# self.get_logger().info(f'{(time5 - time1)*1000} ms, completing a picture entire process')
# self.get_logger().info(f'end')
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
elif self.output_boxes and self.output_masks:
draw_box(self.set_confidence, rgb_img, result)
@@ -438,47 +405,6 @@ class DetectNode(Node):
else:
return None, pose_dict
def _test_image(self, rgb_img, depth_img):
pose_dict = defaultdict(list)
rgb_img_gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
pattern_size = (11, 8)
# pattern_size = (15, 7)
ret, corners = cv2.findChessboardCorners(rgb_img_gray, pattern_size)
if ret:
# 角点亚像素精确化(提高标定精度)
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
corners_subpix = cv2.cornerSubPix(rgb_img_gray, corners, (11, 11), (-1, -1), criteria)
corners_subpix = corners_subpix.reshape(pattern_size[1], pattern_size[0], 2)
mask = np.zeros(rgb_img_gray.shape, dtype=np.uint8)
for i in range(0, pattern_size[1] - 1):
for j in range(0, pattern_size[0] - 1):
pts = np.array([
corners_subpix[i, j],
corners_subpix[i, j + 1],
corners_subpix[i + 1, j + 1],
corners_subpix[i + 1, j]
], dtype=np.int32)
cv2.fillConvexPoly(mask, pts, 1)
rgb_img[mask > 0] = rgb_img[mask > 0] * 0.5 + np.array([0, 0, 255]) * 0.5
orig_shape = rgb_img_gray.shape
mask = cv2.resize(mask.astype(np.uint8), orig_shape[::-1], interpolation=cv2.INTER_NEAREST)
x, y, z, roll, pitch, yaw = self.calculate_function(mask, depth_img, self.intrinsics)
if (x, y, z) != (0.0, 0.0, 0.0):
pose = Pose()
pose.position = Point(x=x, y=y, z=z)
pose.orientation = Quaternion(x=roll, y=pitch, z=yaw)
pose_dict[int(99), 'crossboard'].append(pose)
cv2.putText(rgb_img, f'cs: x: {x:.3f}, y: {y:.3f}, z: {z:.3f}', (0, 0 + 30),cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
cv2.putText(rgb_img, f'rpy: r: {roll:.3f}, p: {pitch:.3f}, y: {yaw:.3f}', (0, 0 + 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), pose_dict
else:
return self.cv_bridge.cv2_to_imgmsg(rgb_img, "bgr8"), None
def main(args=None):
rclpy.init(args=args)
@@ -492,17 +418,5 @@ def main(args=None):
rclpy.shutdown()
def crossboard_detect(args=None):
rclpy.init(args=args)
node = DetectNode('detect', 'test')
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,52 @@
import rclpy
from rclpy.node import Node
from interfaces.srv import PoseArray
class Client(Node):
    """Periodically queries the pose service and logs the returned poses."""

    def __init__(self, name):
        super().__init__(name)
        self.client_ = self.create_client(PoseArray, "/get_object_pose_list",)
        # poll the service every 2 seconds
        self.timer_ = self.create_timer(2.0, self._timer_callback)

    def _timer_callback(self):
        # query the head camera on every timer tick
        self.send_request("head")

    def send_request(self, camera_position):
        """Send an asynchronous pose request for the given camera position."""
        while not self.client_.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        request = PoseArray.Request()
        request.camera_position = camera_position
        future = self.client_.call_async(request)
        future.add_done_callback(self._result_callback)

    def _result_callback(self, result):
        """Log the service response, keyed by object class name."""
        response = result.result()
        if not response.success:
            self.get_logger().error(f"{response.info}")
            return
        pose_dict = {
            obj.class_name: {
                "class_id": obj.class_id,
                "pose_list": obj.pose_list,
            }
            for obj in response.objects
        }
        self.get_logger().info(f"{pose_dict}")
def main(args=None):
    """Entry point: run the pose-service client node until interrupted."""
    rclpy.init(args=args)
    client_node = Client('get_camera_service_client')
    try:
        rclpy.spin(client_node)
    except KeyboardInterrupt:
        pass
    finally:
        # always release the node and shut down the ROS context
        client_node.destroy_node()
        rclpy.shutdown()

View File

@@ -30,7 +30,8 @@ setup(
'detect_node = detect_part.detect:main',
'sub_pose_node = detect_part.sub_pose:main',
'calibration_node = detect_part.hand_eye_calibration:main',
'crossboard_detect_node = detect_part.detect:crossboard_detect',
'crossboard_detect_node = detect_part.crossboard_detect:main',
'service_client_node = detect_part.service_client_node:main',
],
},
)