add test scripts
This commit is contained in:
@@ -122,4 +122,12 @@ install(DIRECTORY launch
|
||||
DESTINATION share/${PROJECT_NAME}/
|
||||
)
|
||||
|
||||
install(PROGRAMS
|
||||
scripts/dual_arm_action_client.py
|
||||
scripts/mock_dual_arm_hardware.py
|
||||
scripts/run_dual_arm_mock_tests.py
|
||||
scripts/analyze_trajectory_metrics.py
|
||||
DESTINATION lib/${PROJECT_NAME}
|
||||
)
|
||||
|
||||
ament_package()
|
||||
|
||||
Binary file not shown.
36
src/robot_control/launch/dual_arm_mock_test.launch.py
Normal file
36
src/robot_control/launch/dual_arm_mock_test.launch.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Launch robot_control + mock dual-arm hardware for no-hardware tests."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import ExecuteProcess
|
||||
from launch.actions import IncludeLaunchDescription
|
||||
from launch.launch_description_sources import PythonLaunchDescriptionSource
|
||||
from ament_index_python.packages import get_package_prefix, get_package_share_directory
|
||||
import os
|
||||
|
||||
|
||||
def generate_launch_description() -> LaunchDescription:
|
||||
"""Generate launch description for DualArm no-hardware tests."""
|
||||
robot_control_share = get_package_share_directory("robot_control")
|
||||
robot_control_launch = IncludeLaunchDescription(
|
||||
PythonLaunchDescriptionSource(
|
||||
os.path.join(robot_control_share, "launch", "robot_control.launch.py")
|
||||
)
|
||||
)
|
||||
|
||||
mock_script = os.path.join(
|
||||
get_package_prefix("robot_control"),
|
||||
"lib",
|
||||
"robot_control",
|
||||
"mock_dual_arm_hardware.py",
|
||||
)
|
||||
mock_hw = ExecuteProcess(
|
||||
cmd=["python3", mock_script],
|
||||
output="screen",
|
||||
)
|
||||
|
||||
return LaunchDescription(
|
||||
[
|
||||
robot_control_launch,
|
||||
mock_hw,
|
||||
]
|
||||
)
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
151
src/robot_control/scripts/analyze_trajectory_metrics.py
Normal file
151
src/robot_control/scripts/analyze_trajectory_metrics.py
Normal file
@@ -0,0 +1,151 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2026 HiveCore. All rights reserved.
|
||||
#
|
||||
# @file analyze_trajectory_metrics.py
|
||||
# @brief 轨迹平稳性/平滑性分析工具(基于 DualArm 导出的 CSV)
|
||||
|
||||
"""Analyze trajectory smoothness metrics from robot_control CSV outputs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import glob
|
||||
import math
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Sequence, Tuple
|
||||
|
||||
|
||||
@dataclass
|
||||
class JointMetrics:
|
||||
"""Per-joint smoothness metrics."""
|
||||
|
||||
max_abs_vel: float
|
||||
max_abs_acc: float
|
||||
max_abs_jerk: float
|
||||
rms_jerk: float
|
||||
|
||||
|
||||
def finite_diff(values: Sequence[float], times: Sequence[float]) -> List[float]:
|
||||
"""Compute first-order finite difference with non-uniform dt."""
|
||||
if len(values) < 2:
|
||||
return []
|
||||
out: List[float] = []
|
||||
for i in range(len(values) - 1):
|
||||
dt = max(times[i + 1] - times[i], 1e-9)
|
||||
out.append((values[i + 1] - values[i]) / dt)
|
||||
return out
|
||||
|
||||
|
||||
def rms(values: Sequence[float]) -> float:
|
||||
"""Root-mean-square value."""
|
||||
if not values:
|
||||
return 0.0
|
||||
return math.sqrt(sum(v * v for v in values) / float(len(values)))
|
||||
|
||||
|
||||
def load_csv(path: str) -> Tuple[List[float], Dict[str, List[float]]]:
|
||||
"""Load trajectory csv: time + joint_position columns."""
|
||||
times: List[float] = []
|
||||
columns: Dict[str, List[float]] = {}
|
||||
|
||||
with open(path, "r", encoding="utf-8") as file:
|
||||
reader = csv.DictReader(file)
|
||||
if reader.fieldnames is None:
|
||||
raise ValueError(f"Invalid CSV header: {path}")
|
||||
if "time" not in reader.fieldnames:
|
||||
raise ValueError(f"CSV missing 'time' column: {path}")
|
||||
|
||||
joint_cols = [name for name in reader.fieldnames if name != "time"]
|
||||
for joint in joint_cols:
|
||||
columns[joint] = []
|
||||
|
||||
for row in reader:
|
||||
times.append(float(row["time"]))
|
||||
for joint in joint_cols:
|
||||
columns[joint].append(float(row[joint]))
|
||||
|
||||
return times, columns
|
||||
|
||||
|
||||
def analyze_joint(times: Sequence[float], positions: Sequence[float]) -> JointMetrics:
|
||||
"""Analyze one joint trajectory."""
|
||||
vel = finite_diff(positions, times)
|
||||
if len(vel) < 2:
|
||||
return JointMetrics(0.0, 0.0, 0.0, 0.0)
|
||||
t_vel = list(times[1:])
|
||||
|
||||
acc = finite_diff(vel, t_vel)
|
||||
if len(acc) < 2:
|
||||
return JointMetrics(max(abs(v) for v in vel), 0.0, 0.0, 0.0)
|
||||
t_acc = t_vel[1:]
|
||||
|
||||
jerk = finite_diff(acc, t_acc)
|
||||
max_vel = max(abs(v) for v in vel) if vel else 0.0
|
||||
max_acc = max(abs(v) for v in acc) if acc else 0.0
|
||||
max_jerk = max(abs(v) for v in jerk) if jerk else 0.0
|
||||
return JointMetrics(max_vel, max_acc, max_jerk, rms(jerk))
|
||||
|
||||
|
||||
def analyze_file(path: str) -> Dict[str, JointMetrics]:
|
||||
"""Analyze all joints from one file."""
|
||||
times, columns = load_csv(path)
|
||||
if len(times) < 4:
|
||||
raise ValueError(f"Not enough points in {path}")
|
||||
return {joint: analyze_joint(times, values) for joint, values in columns.items()}
|
||||
|
||||
|
||||
def pick_latest_csvs(directory: str, limit: int) -> List[str]:
|
||||
"""Pick latest trajectory_*.csv files by modified time."""
|
||||
pattern = os.path.join(directory, "trajectory_*.csv")
|
||||
candidates = glob.glob(pattern)
|
||||
candidates.sort(key=os.path.getmtime, reverse=True)
|
||||
return candidates[:limit]
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""CLI entry point."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Analyze smoothness metrics from trajectory csv files."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--csv",
|
||||
nargs="*",
|
||||
default=[],
|
||||
help="CSV files to analyze. If empty, use latest files in --dir.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dir",
|
||||
default=".",
|
||||
help="Directory for auto-discovery (default: current directory).",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--latest",
|
||||
type=int,
|
||||
default=4,
|
||||
help="Number of latest csv files when --csv not provided.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
files = list(args.csv)
|
||||
if not files:
|
||||
files = pick_latest_csvs(args.dir, args.latest)
|
||||
if not files:
|
||||
raise SystemExit("No trajectory csv files found.")
|
||||
|
||||
for path in files:
|
||||
metrics = analyze_file(path)
|
||||
print(f"\n=== {path} ===")
|
||||
for joint_name, item in metrics.items():
|
||||
print(
|
||||
f"{joint_name:32s} "
|
||||
f"max|v|={item.max_abs_vel:8.4f} "
|
||||
f"max|a|={item.max_abs_acc:8.4f} "
|
||||
f"max|j|={item.max_abs_jerk:9.4f} "
|
||||
f"rms(j)={item.rms_jerk:9.4f}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
334
src/robot_control/scripts/mock_dual_arm_hardware.py
Normal file
334
src/robot_control/scripts/mock_dual_arm_hardware.py
Normal file
@@ -0,0 +1,334 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2026 HiveCore. All rights reserved.
|
||||
#
|
||||
# @file mock_dual_arm_hardware.py
|
||||
# @brief Mock 双臂硬件节点:提供左右 FollowJointTrajectory 并发布 /joint_states
|
||||
|
||||
"""Mock dual-arm hardware node for no-hardware integration tests.
|
||||
|
||||
This node provides:
|
||||
1) Two FollowJointTrajectory action servers:
|
||||
- /left_arm_controller/follow_joint_trajectory
|
||||
- /right_arm_controller/follow_joint_trajectory
|
||||
2) A continuous /joint_states publisher to satisfy robot_control init logic.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from builtin_interfaces.msg import Duration
|
||||
from control_msgs.action import FollowJointTrajectory
|
||||
from rclpy.action import ActionServer, CancelResponse, GoalResponse
|
||||
from rclpy.action.server import ServerGoalHandle
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import JointState
|
||||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||||
|
||||
|
||||
def duration_to_sec(duration: Duration) -> float:
|
||||
"""Convert ROS Duration to seconds."""
|
||||
return float(duration.sec) + float(duration.nanosec) * 1e-9
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArmExecutionState:
|
||||
"""Runtime state for one arm trajectory playback."""
|
||||
|
||||
active: bool
|
||||
start_time_sec: float
|
||||
joint_names: List[str]
|
||||
points: List[JointTrajectoryPoint]
|
||||
current_positions: List[float]
|
||||
current_velocities: List[float]
|
||||
finished: bool
|
||||
|
||||
|
||||
class MockDualArmHardware(Node):
|
||||
"""Mock dual-arm hardware for robot_control integration tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("mock_dual_arm_hardware")
|
||||
|
||||
default_joint_names = [
|
||||
"left_arm_joint_1",
|
||||
"left_arm_joint_2",
|
||||
"left_arm_joint_3",
|
||||
"left_arm_joint_4",
|
||||
"left_arm_joint_5",
|
||||
"left_arm_joint_6",
|
||||
"right_arm_joint_1",
|
||||
"right_arm_joint_2",
|
||||
"right_arm_joint_3",
|
||||
"right_arm_joint_4",
|
||||
"right_arm_joint_5",
|
||||
"right_arm_joint_6",
|
||||
]
|
||||
|
||||
self.declare_parameter("joint_names", default_joint_names)
|
||||
self.declare_parameter("publish_rate_hz", 100.0)
|
||||
self.declare_parameter("goal_start_delay_sec", 0.02)
|
||||
|
||||
self._joint_names: List[str] = (
|
||||
self.get_parameter("joint_names").get_parameter_value().string_array_value
|
||||
)
|
||||
self._publish_rate_hz: float = (
|
||||
self.get_parameter("publish_rate_hz").get_parameter_value().double_value
|
||||
)
|
||||
self._goal_start_delay_sec: float = (
|
||||
self.get_parameter("goal_start_delay_sec").get_parameter_value().double_value
|
||||
)
|
||||
self._publish_rate_hz = max(self._publish_rate_hz, 20.0)
|
||||
self._goal_start_delay_sec = max(self._goal_start_delay_sec, 0.0)
|
||||
|
||||
self._left_joint_names = self._joint_names[:6]
|
||||
self._right_joint_names = self._joint_names[6:12]
|
||||
|
||||
self._lock = threading.Lock()
|
||||
self._arm_states: Dict[str, ArmExecutionState] = {
|
||||
"left": ArmExecutionState(
|
||||
active=False,
|
||||
start_time_sec=0.0,
|
||||
joint_names=self._left_joint_names.copy(),
|
||||
points=[],
|
||||
current_positions=[0.0] * len(self._left_joint_names),
|
||||
current_velocities=[0.0] * len(self._left_joint_names),
|
||||
finished=False,
|
||||
),
|
||||
"right": ArmExecutionState(
|
||||
active=False,
|
||||
start_time_sec=0.0,
|
||||
joint_names=self._right_joint_names.copy(),
|
||||
points=[],
|
||||
current_positions=[0.0] * len(self._right_joint_names),
|
||||
current_velocities=[0.0] * len(self._right_joint_names),
|
||||
finished=False,
|
||||
),
|
||||
}
|
||||
|
||||
self._joint_pub = self.create_publisher(JointState, "/joint_states", 10)
|
||||
self._pub_timer = self.create_timer(
|
||||
1.0 / self._publish_rate_hz, self._on_publish_joint_states
|
||||
)
|
||||
|
||||
self._left_server = ActionServer(
|
||||
self,
|
||||
FollowJointTrajectory,
|
||||
"/left_arm_controller/follow_joint_trajectory",
|
||||
goal_callback=self._make_goal_callback("left"),
|
||||
cancel_callback=self._make_cancel_callback("left"),
|
||||
execute_callback=self._make_execute_callback("left"),
|
||||
)
|
||||
self._right_server = ActionServer(
|
||||
self,
|
||||
FollowJointTrajectory,
|
||||
"/right_arm_controller/follow_joint_trajectory",
|
||||
goal_callback=self._make_goal_callback("right"),
|
||||
cancel_callback=self._make_cancel_callback("right"),
|
||||
execute_callback=self._make_execute_callback("right"),
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Mock dual-arm hardware started. publish_rate={self._publish_rate_hz:.1f} Hz"
|
||||
)
|
||||
|
||||
def _make_goal_callback(self, arm: str):
|
||||
"""Build per-arm goal callback."""
|
||||
|
||||
def _goal_callback(goal_request: FollowJointTrajectory.Goal) -> GoalResponse:
|
||||
if not goal_request.trajectory.points:
|
||||
self.get_logger().error(f"[{arm}] Reject goal: empty trajectory")
|
||||
return GoalResponse.REJECT
|
||||
if not self._validate_joint_names(arm, goal_request.trajectory.joint_names):
|
||||
self.get_logger().error(
|
||||
f"[{arm}] Reject goal: joint names mismatch {goal_request.trajectory.joint_names}"
|
||||
)
|
||||
return GoalResponse.REJECT
|
||||
return GoalResponse.ACCEPT
|
||||
|
||||
return _goal_callback
|
||||
|
||||
def _make_cancel_callback(self, arm: str):
|
||||
"""Build per-arm cancel callback."""
|
||||
|
||||
def _cancel_callback(_: ServerGoalHandle) -> CancelResponse:
|
||||
self.get_logger().info(f"[{arm}] Cancel request accepted")
|
||||
with self._lock:
|
||||
state = self._arm_states[arm]
|
||||
state.active = False
|
||||
state.finished = True
|
||||
state.current_velocities = [0.0] * len(state.current_velocities)
|
||||
return CancelResponse.ACCEPT
|
||||
|
||||
return _cancel_callback
|
||||
|
||||
def _make_execute_callback(self, arm: str):
|
||||
"""Build per-arm execute callback."""
|
||||
|
||||
def _execute_callback(
|
||||
goal_handle: ServerGoalHandle,
|
||||
) -> FollowJointTrajectory.Result:
|
||||
goal = goal_handle.request
|
||||
trajectory = goal.trajectory
|
||||
with self._lock:
|
||||
state = self._arm_states[arm]
|
||||
state.active = True
|
||||
state.finished = False
|
||||
state.joint_names = list(trajectory.joint_names)
|
||||
state.points = list(trajectory.points)
|
||||
state.start_time_sec = time.time() + self._goal_start_delay_sec
|
||||
|
||||
first_point = trajectory.points[0]
|
||||
if first_point.positions:
|
||||
state.current_positions = list(first_point.positions)
|
||||
if first_point.velocities:
|
||||
state.current_velocities = list(first_point.velocities)
|
||||
else:
|
||||
state.current_velocities = [0.0] * len(state.current_positions)
|
||||
|
||||
while rclpy.ok():
|
||||
if goal_handle.is_cancel_requested:
|
||||
with self._lock:
|
||||
state = self._arm_states[arm]
|
||||
state.active = False
|
||||
state.finished = True
|
||||
state.current_velocities = [0.0] * len(state.current_velocities)
|
||||
goal_handle.canceled()
|
||||
result = FollowJointTrajectory.Result()
|
||||
result.error_code = FollowJointTrajectory.Result.SUCCESSFUL
|
||||
result.error_string = "Goal canceled"
|
||||
return result
|
||||
|
||||
with self._lock:
|
||||
finished = self._arm_states[arm].finished
|
||||
if finished:
|
||||
break
|
||||
time.sleep(0.005)
|
||||
|
||||
goal_handle.succeed()
|
||||
result = FollowJointTrajectory.Result()
|
||||
result.error_code = FollowJointTrajectory.Result.SUCCESSFUL
|
||||
result.error_string = "Mock trajectory completed"
|
||||
return result
|
||||
|
||||
return _execute_callback
|
||||
|
||||
def _validate_joint_names(self, arm: str, names: List[str]) -> bool:
|
||||
"""Validate incoming trajectory joint names with expected arm names."""
|
||||
expected = self._left_joint_names if arm == "left" else self._right_joint_names
|
||||
if len(names) != len(expected):
|
||||
return False
|
||||
return set(names) == set(expected)
|
||||
|
||||
def _sample_trajectory(
|
||||
self, points: List[JointTrajectoryPoint], elapsed_sec: float
|
||||
) -> Tuple[List[float], List[float], bool]:
|
||||
"""Sample trajectory by linear interpolation at elapsed time."""
|
||||
if not points:
|
||||
return [], [], True
|
||||
|
||||
if len(points) == 1:
|
||||
pos = list(points[0].positions)
|
||||
vel = (
|
||||
list(points[0].velocities)
|
||||
if points[0].velocities
|
||||
else [0.0] * len(points[0].positions)
|
||||
)
|
||||
return pos, vel, True
|
||||
|
||||
times = [duration_to_sec(point.time_from_start) for point in points]
|
||||
total = times[-1]
|
||||
if elapsed_sec <= times[0]:
|
||||
p = points[0]
|
||||
pos = list(p.positions)
|
||||
vel = list(p.velocities) if p.velocities else [0.0] * len(pos)
|
||||
return pos, vel, False
|
||||
if elapsed_sec >= total:
|
||||
p = points[-1]
|
||||
pos = list(p.positions)
|
||||
vel = [0.0] * len(pos)
|
||||
return pos, vel, True
|
||||
|
||||
index = 0
|
||||
for i in range(len(times) - 1):
|
||||
if times[i] <= elapsed_sec <= times[i + 1]:
|
||||
index = i
|
||||
break
|
||||
|
||||
p0 = points[index]
|
||||
p1 = points[index + 1]
|
||||
t0 = times[index]
|
||||
t1 = times[index + 1]
|
||||
dt = max(t1 - t0, 1e-6)
|
||||
alpha = (elapsed_sec - t0) / dt
|
||||
|
||||
pos: List[float] = []
|
||||
vel: List[float] = []
|
||||
for j in range(len(p0.positions)):
|
||||
q0 = p0.positions[j]
|
||||
q1 = p1.positions[j]
|
||||
pos.append(q0 + alpha * (q1 - q0))
|
||||
|
||||
if p0.velocities and p1.velocities:
|
||||
v0 = p0.velocities[j]
|
||||
v1 = p1.velocities[j]
|
||||
vel.append(v0 + alpha * (v1 - v0))
|
||||
else:
|
||||
vel.append((q1 - q0) / dt)
|
||||
|
||||
return pos, vel, False
|
||||
|
||||
def _on_publish_joint_states(self) -> None:
|
||||
"""Publish /joint_states and progress active trajectories."""
|
||||
now_sec = time.time()
|
||||
with self._lock:
|
||||
for arm in ("left", "right"):
|
||||
state = self._arm_states[arm]
|
||||
if not state.active:
|
||||
continue
|
||||
elapsed = max(0.0, now_sec - state.start_time_sec)
|
||||
pos, vel, done = self._sample_trajectory(state.points, elapsed)
|
||||
if pos:
|
||||
state.current_positions = pos
|
||||
if vel:
|
||||
state.current_velocities = vel
|
||||
if done:
|
||||
state.active = False
|
||||
state.finished = True
|
||||
state.current_velocities = [0.0] * len(state.current_velocities)
|
||||
|
||||
msg = JointState()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.name = self._joint_names.copy()
|
||||
msg.position = (
|
||||
self._arm_states["left"].current_positions
|
||||
+ self._arm_states["right"].current_positions
|
||||
)
|
||||
msg.velocity = (
|
||||
self._arm_states["left"].current_velocities
|
||||
+ self._arm_states["right"].current_velocities
|
||||
)
|
||||
msg.effort = [0.0] * len(msg.position)
|
||||
|
||||
self._joint_pub.publish(msg)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Entry point."""
|
||||
rclpy.init()
|
||||
node = MockDualArmHardware()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
232
src/robot_control/scripts/run_dual_arm_mock_tests.py
Normal file
232
src/robot_control/scripts/run_dual_arm_mock_tests.py
Normal file
@@ -0,0 +1,232 @@
|
||||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2026 HiveCore. All rights reserved.
|
||||
#
|
||||
# @file run_dual_arm_mock_tests.py
|
||||
# @brief 无硬件场景下的 DualArm MOVEJ/MOVEL 回归测试脚本
|
||||
|
||||
"""Run dual-arm action tests against mock hardware.
|
||||
|
||||
Cases:
|
||||
1) Left arm MOVEJ
|
||||
2) Right arm MOVEJ
|
||||
3) Dual-arm MOVEJ
|
||||
4) Dual-arm MOVEL (pose offset from current status pose)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from geometry_msgs.msg import Pose
|
||||
from interfaces.action import DualArm
|
||||
from interfaces.msg import ArmMotionParams, RobotArmStatus
|
||||
from interfaces.srv import InverseKinematics
|
||||
from rclpy.action import ActionClient
|
||||
from rclpy.node import Node
|
||||
from rclpy.task import Future
|
||||
|
||||
|
||||
class DualArmMockTestRunner(Node):
|
||||
"""Integration test runner for DualArm action."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("dual_arm_mock_test_runner")
|
||||
self._action_client = ActionClient(self, DualArm, "DualArm")
|
||||
self._ik_client = self.create_client(
|
||||
InverseKinematics, "/robot_control/inverse_kinematics"
|
||||
)
|
||||
self._last_status_msg: Optional[RobotArmStatus] = None
|
||||
self._status_sub = self.create_subscription(
|
||||
RobotArmStatus,
|
||||
"/robot_control/arm_status",
|
||||
self._on_status,
|
||||
10,
|
||||
)
|
||||
|
||||
def _on_status(self, msg: RobotArmStatus) -> None:
|
||||
self._last_status_msg = msg
|
||||
|
||||
def wait_ready(self, timeout_sec: float = 20.0) -> bool:
|
||||
"""Wait for action server and arm status topic."""
|
||||
self.get_logger().info("Waiting for DualArm action server...")
|
||||
if not self._action_client.wait_for_server(timeout_sec=timeout_sec):
|
||||
self.get_logger().error("DualArm action server timeout")
|
||||
return False
|
||||
self.get_logger().info("DualArm action server is ready")
|
||||
|
||||
self.get_logger().info("Waiting for /robot_control/inverse_kinematics service...")
|
||||
if not self._ik_client.wait_for_service(timeout_sec=timeout_sec):
|
||||
self.get_logger().error("InverseKinematics service timeout")
|
||||
return False
|
||||
self.get_logger().info("InverseKinematics service is ready")
|
||||
return True
|
||||
|
||||
def send_goal_and_wait(
|
||||
self,
|
||||
arm_motion_params: List[ArmMotionParams],
|
||||
velocity_scaling: int = 40,
|
||||
timeout_sec: float = 40.0,
|
||||
) -> Tuple[bool, str]:
|
||||
"""Send DualArm goal and wait for result."""
|
||||
goal = DualArm.Goal()
|
||||
goal.arm_motion_params = arm_motion_params
|
||||
goal.velocity_scaling = velocity_scaling
|
||||
|
||||
future_send = self._action_client.send_goal_async(
|
||||
goal, feedback_callback=self._on_feedback
|
||||
)
|
||||
rclpy.spin_until_future_complete(self, future_send, timeout_sec=5.0)
|
||||
if not future_send.done():
|
||||
return False, "send goal timeout"
|
||||
|
||||
goal_handle = future_send.result()
|
||||
if goal_handle is None or not goal_handle.accepted:
|
||||
return False, "goal rejected"
|
||||
|
||||
future_res: Future = goal_handle.get_result_async()
|
||||
rclpy.spin_until_future_complete(self, future_res, timeout_sec=timeout_sec)
|
||||
if not future_res.done():
|
||||
return False, "result timeout"
|
||||
|
||||
wrapped = future_res.result()
|
||||
result = wrapped.result
|
||||
return bool(result.success), str(result.message)
|
||||
|
||||
def _on_feedback(self, feedback_msg: DualArm.Impl.FeedbackMessage) -> None:
|
||||
feedback = feedback_msg.feedback
|
||||
self.get_logger().info(
|
||||
f"feedback status={int(feedback.status)} progress={float(feedback.progress):.3f}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_movej(arm_id: int, joints: List[float]) -> ArmMotionParams:
|
||||
param = ArmMotionParams()
|
||||
param.arm_id = arm_id
|
||||
param.motion_type = ArmMotionParams.MOVEJ
|
||||
param.target_joints = joints
|
||||
param.blend_radius = 0
|
||||
return param
|
||||
|
||||
@staticmethod
|
||||
def _make_movel(arm_id: int, pose: Pose) -> ArmMotionParams:
|
||||
param = ArmMotionParams()
|
||||
param.arm_id = arm_id
|
||||
param.motion_type = ArmMotionParams.MOVEL
|
||||
param.target_pose = pose
|
||||
param.blend_radius = 0
|
||||
return param
|
||||
|
||||
@staticmethod
|
||||
def _offset_pose(src: Pose, dx: float, dy: float, dz: float) -> Pose:
|
||||
pose = Pose()
|
||||
pose.position.x = src.position.x + dx
|
||||
pose.position.y = src.position.y + dy
|
||||
pose.position.z = src.position.z + dz
|
||||
pose.orientation = src.orientation
|
||||
return pose
|
||||
|
||||
def _check_ik(self, arm_id: int, pose: Pose, timeout_sec: float = 3.0) -> bool:
|
||||
request = InverseKinematics.Request()
|
||||
request.arm_id = int(arm_id)
|
||||
request.pose = pose
|
||||
future = self._ik_client.call_async(request)
|
||||
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
|
||||
if not future.done():
|
||||
return False
|
||||
response = future.result()
|
||||
if response is None:
|
||||
return False
|
||||
return int(response.result) == 0 and len(response.joint_angles) > 0
|
||||
|
||||
def _find_reachable_movel_pose(self, arm_id: int) -> Optional[Pose]:
|
||||
"""Find one reachable pose candidate for MOVEL test."""
|
||||
y_value = 0.25 if arm_id == ArmMotionParams.ARM_LEFT else -0.25
|
||||
x_candidates = [0.40, 0.35, 0.30, 0.25]
|
||||
z_candidates = [0.35, 0.30, 0.25]
|
||||
|
||||
for x in x_candidates:
|
||||
for z in z_candidates:
|
||||
pose = Pose()
|
||||
pose.position.x = x
|
||||
pose.position.y = y_value
|
||||
pose.position.z = z
|
||||
pose.orientation.x = 0.0
|
||||
pose.orientation.y = 0.0
|
||||
pose.orientation.z = 0.0
|
||||
pose.orientation.w = 1.0
|
||||
if self._check_ik(arm_id, pose):
|
||||
return pose
|
||||
return None
|
||||
|
||||
def run(self) -> int:
|
||||
"""Run all test cases and return process exit code."""
|
||||
if not self.wait_ready():
|
||||
return 2
|
||||
|
||||
summary: Dict[str, Tuple[bool, str]] = {}
|
||||
|
||||
left_target = [0.40, -0.30, 0.45, -0.25, 0.20, 0.0]
|
||||
right_target = [-0.40, 0.30, -0.45, 0.25, -0.20, 0.0]
|
||||
|
||||
summary["single_left_movej"] = self.send_goal_and_wait(
|
||||
[self._make_movej(ArmMotionParams.ARM_LEFT, left_target)]
|
||||
)
|
||||
time.sleep(0.3)
|
||||
|
||||
summary["single_right_movej"] = self.send_goal_and_wait(
|
||||
[self._make_movej(ArmMotionParams.ARM_RIGHT, right_target)]
|
||||
)
|
||||
time.sleep(0.3)
|
||||
|
||||
summary["dual_movej"] = self.send_goal_and_wait(
|
||||
[
|
||||
self._make_movej(
|
||||
ArmMotionParams.ARM_LEFT, [0.25, 0.15, -0.25, 0.35, -0.10, 0.05]
|
||||
),
|
||||
self._make_movej(
|
||||
ArmMotionParams.ARM_RIGHT,
|
||||
[-0.25, -0.15, 0.25, -0.35, 0.10, -0.05],
|
||||
),
|
||||
]
|
||||
)
|
||||
time.sleep(0.3)
|
||||
|
||||
left_pose = self._find_reachable_movel_pose(ArmMotionParams.ARM_LEFT)
|
||||
right_pose = self._find_reachable_movel_pose(ArmMotionParams.ARM_RIGHT)
|
||||
if left_pose is None or right_pose is None:
|
||||
summary["dual_movel"] = (False, "failed to find reachable poses by IK probe")
|
||||
else:
|
||||
summary["dual_movel"] = self.send_goal_and_wait(
|
||||
[
|
||||
self._make_movel(ArmMotionParams.ARM_LEFT, left_pose),
|
||||
self._make_movel(ArmMotionParams.ARM_RIGHT, right_pose),
|
||||
],
|
||||
velocity_scaling=25,
|
||||
timeout_sec=60.0,
|
||||
)
|
||||
|
||||
self.get_logger().info("========== Test Summary ==========")
|
||||
all_ok = True
|
||||
for name, (ok, message) in summary.items():
|
||||
self.get_logger().info(f"{name}: ok={ok} message={message}")
|
||||
all_ok = all_ok and ok
|
||||
|
||||
return 0 if all_ok else 1
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Entry point."""
|
||||
rclpy.init()
|
||||
node = DualArmMockTestRunner()
|
||||
try:
|
||||
code = node.run()
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
raise SystemExit(code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user