support parallel action

This commit is contained in:
2025-11-01 11:39:20 +08:00
parent d7dea1b47e
commit 4da85d16dc
6 changed files with 342 additions and 181 deletions

View File

@@ -11,9 +11,11 @@
<Arm_H name="s4_arm_grasp_box" />
<Arm_H name="s5_arm_pickup_box" />
<MoveWheel_H name="s6_wheel_move_back_to_second_position" />
<MoveWaist_H name="s7_waist_turn_around" />
<MoveWaist_H name="s8_waist_bend_down" />
<Arm_H name="s9_arm_put_down_box" />
<!-- <MoveWaist_H name="s7_waist_turn_around" /> -->
<Parallel name="move_waist">
<MoveWaist_H name="s8_waist_bend_down" />
<Arm_H name="s9_arm_put_down_box" />
</Parallel>
<Arm_H name="s10_arm_release" />
<Arm_H name="s10_arm_raise_up" />
<MoveWaist_H name="s11_waist_bend_up_and_turn_around" />

View File

@@ -406,6 +406,87 @@ public:
return wrapped_result;
}
template<typename ActionT>
std::optional<typename rclcpp_action::ClientGoalHandle<ActionT>::WrappedResult> send_and_wait_with_goal(
const std::string & name,
rclcpp::Logger logger,
const typename ActionT::Goal & goal,
std::chrono::nanoseconds timeout = std::chrono::seconds(30))
{
auto it = entries_.find(name);
if (it == entries_.end()) {
RCLCPP_ERROR(logger, "No action client registered for '%s'", name.c_str());
return std::nullopt;
}
auto * e = dynamic_cast<Entry<ActionT> *>(it->second.get());
if (!e) {
RCLCPP_ERROR(logger, "Action client type mismatch for '%s'", name.c_str());
return std::nullopt;
}
if (!e->client) {return std::nullopt;}
if (!e->client->wait_for_action_server(std::chrono::seconds(10))) {
RCLCPP_ERROR(logger, "Action server '%s' not available", name.c_str());
return std::nullopt;
}
auto opts = typename rclcpp_action::Client<ActionT>::SendGoalOptions{};
if (e->on_feedback) {opts.feedback_callback = e->on_feedback;}
if (e->on_result) {opts.result_callback = e->on_result;}
if (e->on_goal_response) {opts.goal_response_callback = e->on_goal_response;}
auto goal_future = e->client->async_send_goal(goal, opts);
auto goal_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
if (goal_future.wait_until(goal_deadline) != std::future_status::ready) {
RCLCPP_ERROR(logger, "Timed out waiting for goal response on '%s'", name.c_str());
return std::nullopt;
}
typename rclcpp_action::ClientGoalHandle<ActionT>::SharedPtr goal_handle;
try {
goal_handle = goal_future.get();
} catch (const std::exception & ex) {
RCLCPP_ERROR(logger, "Failed to get goal handle for '%s': %s", name.c_str(), ex.what());
return std::nullopt;
}
if (!goal_handle) {
RCLCPP_WARN(logger, "Goal rejected for '%s'", name.c_str());
return std::nullopt;
}
e->last_goal_handle = goal_handle;
if (e->on_goal_response) {
try {
e->on_goal_response(goal_handle);
} catch (const std::exception & ex) {
RCLCPP_WARN(logger, "Goal response callback threw for '%s': %s", name.c_str(), ex.what());
}
}
auto result_future = e->client->async_get_result(goal_handle);
auto deadline = std::chrono::steady_clock::now() + timeout;
if (result_future.wait_until(deadline) != std::future_status::ready) {
RCLCPP_ERROR(logger, "Timed out waiting for result on '%s', sending cancel", name.c_str());
try {
e->client->async_cancel_goal(goal_handle);
} catch (const std::exception & ex) {
RCLCPP_WARN(logger, "Failed cancelling goal for '%s': %s", name.c_str(), ex.what());
}
return std::nullopt;
}
typename rclcpp_action::ClientGoalHandle<ActionT>::WrappedResult wrapped_result;
try {
wrapped_result = result_future.get();
} catch (const std::exception & ex) {
RCLCPP_ERROR(logger, "Failed to get result for '%s': %s", name.c_str(), ex.what());
return std::nullopt;
}
if (e->on_result) {
try {
e->on_result(wrapped_result);
} catch (const std::exception & ex) {
RCLCPP_WARN(logger, "Result callback threw for '%s': %s", name.c_str(), ex.what());
}
}
return wrapped_result;
}
private:
rclcpp::Node * node_;
std::unordered_map<std::string, std::unique_ptr<EntryBase>> entries_;

View File

@@ -42,6 +42,10 @@ public:
explicit CerebellumNode(const rclcpp::NodeOptions & options = rclcpp::NodeOptions());
private:
struct GoalExecutionContext { std::atomic<bool> cancel{false}; };
std::unordered_map<std::uintptr_t, std::shared_ptr<GoalExecutionContext>> goal_ctxs_;
std::mutex goal_ctxs_mutex_;
static thread_local const std::vector<interfaces::msg::SkillCall>* tls_current_calls_;
// Generic action registry to support multiple actions via registration
std::unique_ptr<ActionServerRegistry> action_registry_;
std::unique_ptr<ActionClientRegistry> action_clients_;
@@ -114,8 +118,9 @@ private:
template<typename T>
bool TryParseCallPayload(const std::string & skill_name, T & out) const
{
if (last_calls_.empty()) {return false;}
for (const auto & c : last_calls_) {
const auto * calls = tls_current_calls_ ? tls_current_calls_ : &last_calls_;
if (!calls || calls->empty()) {return false;}
for (const auto & c : *calls) {
if (c.name == skill_name && !c.payload_yaml.empty()) {
try {
auto node = YAML::Load(c.payload_yaml);
@@ -355,6 +360,9 @@ private:
* @return Topic / action client key.
*/
std::string ResolveTopicForSkill(const std::string & skill) const;
std::string ResolveTopicForSkillFromCalls(
const std::vector<interfaces::msg::SkillCall> & calls,
const std::string & skill) const;
/**
* @brief Finalize goal with summary result (success or abort) and log message.
* @param goal_handle Goal handle.
@@ -379,6 +387,14 @@ private:
/** @brief Register ExecuteBtAction server with goal/execute handlers. */
void SetupExecuteBtServer();
void RunExecuteBtGoalConcurrent(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle);
brain::CerebellumData::ExecResult ExecuteSequenceConcurrent(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,
const std::vector<std::string> & skills,
const std::shared_ptr<GoalExecutionContext> & ctx);
/** @brief 将检测到的姿态转换到机械臂参考坐标系。 */
bool TransformPoseToArmFrame(
const geometry_msgs::msg::PoseStamped & source,

View File

@@ -237,6 +237,17 @@ private:
std::optional<std::string> current_instance;
};
std::unordered_map<std::string, PerNodeExecInfo> per_node_exec_;
struct PerInstanceExecInfo
{
std::atomic<bool> in_flight {false};
std::atomic<bool> finished {false};
bool success {false};
ExecResultCode result_code {ExecResultCode::UNKNOWN};
rclcpp::Time start_time;
double last_progress {0.0};
};
std::unordered_map<std::string, PerInstanceExecInfo> per_instance_exec_;
// Set of already registered action client names (prevents duplicate registration & duplicate feedback).
std::unordered_set<std::string> registered_actions_;
// Per-BT-node execution timeout (seconds).

View File

@@ -14,6 +14,7 @@
#include <iostream>
#include <limits>
#include <memory>
#include <cstdint>
#include <sstream>
#include <string>
#include <thread>
@@ -54,6 +55,8 @@
namespace brain
{
static thread_local int tls_arm_body_id = -1;
// 线程局部当前ExecuteBtAction goal的calls用于payload解析与topic解析
thread_local const std::vector<interfaces::msg::SkillCall>* CerebellumNode::tls_current_calls_ = nullptr;
/**
* @brief Construct the CerebellumNode.
@@ -861,18 +864,21 @@ void CerebellumNode::SetupExecuteBtServer()
}
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
};
// Accept cancel requests: set runtime preempt flag; the execute loop will notice and stop
// Accept cancel requests per-goal
h.on_cancel = [this](const std::shared_ptr<rclcpp_action::ServerGoalHandle<EBA>> gh) {
auto & d = brain::runtime();
d.preempt.store(true, std::memory_order_release);
d.preempt_reason = "Cancelled";
std::lock_guard<std::mutex> lk(goal_ctxs_mutex_);
auto key = reinterpret_cast<std::uintptr_t>(gh.get());
auto it = goal_ctxs_.find(key);
if (it != goal_ctxs_.end() && it->second) {
it->second->cancel.store(true, std::memory_order_release);
}
RCLCPP_WARN(this->get_logger(), "[ExecuteBtAction] cancellation requested (epoch=%u)", gh->get_goal()->epoch);
return rclcpp_action::CancelResponse::ACCEPT;
};
// Each goal executes in its own detached thread (provided by ActionServerRegistry)
h.execute = [this](const std::shared_ptr<rclcpp_action::ServerGoalHandle<EBA>> goal_handle) {
// Instead of executing inline, install execute_fn & complete_cb for state machine.
active_goal_wptr = goal_handle;
RunExecuteBtGoal(goal_handle);
RunExecuteBtGoalConcurrent(goal_handle);
};
action_registry_->register_action_server<EBA>(brain::kExecuteBtActionName, h);
}
@@ -967,6 +973,18 @@ std::string CerebellumNode::ToSnake(const std::string & s) const
return out;
}
std::string CerebellumNode::ResolveTopicForSkillFromCalls(
const std::vector<interfaces::msg::SkillCall> & calls,
const std::string & skill) const
{
for (const auto & c : calls) {
if (c.name == skill && !c.topic.empty()) {
return c.topic;
}
}
return ResolveTopicForSkill(skill);
}
/**
* @brief Resolve timeout value for a specific skill.
*
@@ -1177,8 +1195,11 @@ bool CerebellumNode::RunActionSkillWithProgress(
auto logger = this->get_logger();
std::string local_detail;
std::thread worker([&]() {
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
ok = brain::dispatch_skill_action<0, brain::SkillActionTypes>(
skill, topic, action_clients_.get(), logger, timeout_ns, local_detail);
CerebellumNode::tls_current_calls_ = prev_calls;
finished.store(true, std::memory_order_release);
});
const auto start_steady = std::chrono::steady_clock::now();
@@ -1234,19 +1255,25 @@ bool CerebellumNode::ExecuteBilateralArmAction(
std::string d1, d2;
auto worker = [this, &ok_out = ok1, &done_flag = finished1, &d_out = d1,
skill, topic, timeout](int body_id) {
skill, topic, timeout, &goal_handle](int body_id) {
tls_arm_body_id = body_id;
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
ok_out = brain::dispatch_skill_action<0, brain::SkillActionTypes>(
skill, topic, action_clients_.get(), this->get_logger(), timeout, d_out);
CerebellumNode::tls_current_calls_ = prev_calls;
done_flag.store(true, std::memory_order_release);
};
std::thread t1(worker, RIGHT_ARM); // RIGHT_ARM=1
auto worker2 = [this, &ok_out = ok2, &done_flag = finished2, &d_out = d2,
skill, topic, timeout](int body_id) {
skill, topic, timeout, &goal_handle](int body_id) {
tls_arm_body_id = body_id;
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
ok_out = brain::dispatch_skill_action<0, brain::SkillActionTypes>(
skill, topic, action_clients_.get(), this->get_logger(), timeout, d_out);
CerebellumNode::tls_current_calls_ = prev_calls;
done_flag.store(true, std::memory_order_release);
};
std::thread t2(worker2, LEFT_ARM); // LEFT_ARM=0
@@ -1395,41 +1422,16 @@ void CerebellumNode::RunExecuteBtGoal(
return; // goal aborted
}
// Cache calls for this goal (Plan A minimal support)
last_calls_.clear();
for (const auto & c : goal_handle->get_goal()->calls) { last_calls_.push_back(c); }
// Log cached calls for verification
if (!last_calls_.empty()) {
std::ostringstream oss;
oss << "Cached last_calls_ (count=" << last_calls_.size() << "): ";
for (size_t i = 0; i < last_calls_.size(); ++i) {
if (i) oss << ", ";
oss << last_calls_[i].name;
}
RCLCPP_INFO(this->get_logger(), "%s", oss.str().c_str());
}
// Install execute_fn & complete_cb into runtime so state machine owns lifecycle.
auto & d = brain::runtime();
d.execute_fn = [this, goal_handle, skills](rclcpp::Logger logger) -> brain::CerebellumData::ExecResult {
(void)logger; // we already have this->get_logger() inside helpers
return ExecuteSequence(goal_handle, skills);
};
d.complete_cb = [this, goal_handle](bool /*success*/, const std::string & message) {
// Build result message from runtime latched fields
auto & d = brain::runtime();
using EBA = interfaces::action::ExecuteBtAction;
auto res = std::make_shared<EBA::Result>();
res->success = d.last_success.load();
res->total_skills = d.last_total_skills;
res->succeeded_skills = d.last_succeeded_skills;
res->message = message.empty() ? std::string("(no message)") : message;
if (res->success) {
goal_handle->succeed(res);
RCLCPP_INFO(this->get_logger(), "%s", res->message.c_str());
} else {
goal_handle->abort(res);
RCLCPP_ERROR(this->get_logger(), "%s", res->message.c_str());
}
};
// Legacy SMACC path replaced by direct per-goal execution; keep for backward compatibility if needed
auto result = ExecuteSequence(goal_handle, skills);
using EBA = interfaces::action::ExecuteBtAction;
auto res = std::make_shared<EBA::Result>();
res->success = result.success;
res->total_skills = result.total_skills;
res->succeeded_skills = result.succeeded_skills;
res->message = result.message;
if (res->success) { goal_handle->succeed(res); }
else { goal_handle->abort(res); }
}
/**
@@ -1645,4 +1647,126 @@ void CerebellumNode::FinalizeExecuteBtResult(
}
}
void CerebellumNode::RunExecuteBtGoalConcurrent(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle)
{
using EBA = interfaces::action::ExecuteBtAction;
// Register per-goal context
auto ctx = std::make_shared<GoalExecutionContext>();
{
std::lock_guard<std::mutex> lk(goal_ctxs_mutex_);
goal_ctxs_[reinterpret_cast<std::uintptr_t>(goal_handle.get())] = ctx;
}
// Set thread-local calls for payload parsing
struct CallsTlsGuard {
const std::vector<interfaces::msg::SkillCall>* prev;
CallsTlsGuard(const std::vector<interfaces::msg::SkillCall>* calls) {
prev = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = calls;
}
~CallsTlsGuard() { CerebellumNode::tls_current_calls_ = prev; }
} tls_guard(&goal_handle->get_goal()->calls);
std::vector<std::string> skills;
if (!ValidateAndParseGoal(goal_handle, skills)) {
// cleanup
std::lock_guard<std::mutex> lk(goal_ctxs_mutex_);
goal_ctxs_.erase(reinterpret_cast<std::uintptr_t>(goal_handle.get()));
return;
}
auto result = ExecuteSequenceConcurrent(goal_handle, skills, ctx);
auto res = std::make_shared<EBA::Result>();
res->success = result.success;
res->total_skills = result.total_skills;
res->succeeded_skills = result.succeeded_skills;
res->message = result.message;
if (res->success) { goal_handle->succeed(res); }
else { goal_handle->abort(res); }
// Remove context after completion
std::lock_guard<std::mutex> lk(goal_ctxs_mutex_);
goal_ctxs_.erase(reinterpret_cast<std::uintptr_t>(goal_handle.get()));
}
brain::CerebellumData::ExecResult CerebellumNode::ExecuteSequenceConcurrent(
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,
const std::vector<std::string> & skills,
const std::shared_ptr<GoalExecutionContext> & ctx)
{
brain::CerebellumData::ExecResult result;
result.total_skills = static_cast<int32_t>(skills.size());
if (skills.empty()) {
result.success = false;
result.message = "Empty skill list";
return result;
}
// Optional overall sequence timeout parameter (seconds); default 0 = disabled
double overall_timeout_sec = 0.0;
(void)this->get_parameter("overall_sequence_timeout_sec", overall_timeout_sec);
const bool timeout_enabled = overall_timeout_sec > 0.0;
auto seq_start = std::chrono::steady_clock::now();
std::ostringstream summary;
summary << "Seq{" << goal_handle->get_goal()->action_name << "}: ";
bool overall_ok = true;
size_t loop_index = 0;
const auto & calls = goal_handle->get_goal()->calls;
for (size_t seq_index = 0; seq_index < skills.size(); ++seq_index) {
// Per-goal cancel
if (ctx && ctx->cancel.load(std::memory_order_acquire)) {
overall_ok = false;
summary << "(Cancelled)";
break;
}
// Overall timeout
if (timeout_enabled) {
double elapsed = std::chrono::duration<double>(std::chrono::steady_clock::now() - seq_start).count();
if (elapsed > overall_timeout_sec) {
overall_ok = false;
summary << "(Timeout)";
break;
}
}
const auto & skill = skills[seq_index];
const std::string topic = ResolveTopicForSkillFromCalls(calls, skill);
RCLCPP_INFO(this->get_logger(), "[ExecuteBtAction] Dispatch skill=%s topic=%s, total_skills:%d",
skill.c_str(), topic.c_str(), static_cast<int>(skills.size()));
bool ok = false; std::string detail;
auto it = skill_manager_->skills().find(skill);
if (it == skill_manager_->skills().end()) {
detail = "Skill not defined";
PublishFeedbackStage(
goal_handle, "END", skill,
static_cast<float>(seq_index + 1) / static_cast<float>(skills.size()),
"FAIL:undefined");
} else {
ok = ExecuteSingleSkill(
skill, topic, it->second, detail, static_cast<int>(seq_index), static_cast<int>(skills.size()), goal_handle);
PublishFeedbackStage(
goal_handle, "END", skill,
static_cast<float>(seq_index + 1) / static_cast<float>(skills.size()),
ok ? (detail.empty() ? "OK" : std::string("OK:") + detail)
: (detail.empty() ? "FAIL" : std::string("FAIL:") + detail));
}
RecordSkillResult(skill, ok);
if (ok) { ++result.succeeded_skills; }
overall_ok = overall_ok && ok;
summary << skill << '(' << (ok ? "OK" : "FAIL") << ')';
if (!detail.empty()) summary << '[' << detail << ']';
if (loop_index + 1 < skills.size()) summary << ", ";
++loop_index;
if (!ok) {
RCLCPP_ERROR(this->get_logger(), "Skill %s failed%s, detail: %s", skill.c_str(),
abort_on_failure_ ? ", abort sequence" : " (continue)", detail.c_str());
if (abort_on_failure_) break;
}
}
{
std::lock_guard<std::mutex> lk(stats_mutex_);
if (overall_ok) ++total_sequence_success_; else ++total_sequence_failure_;
}
result.success = overall_ok;
result.message = summary.str();
return result;
}
} // namespace brain

View File

@@ -207,66 +207,26 @@ void CerebrumNode::RegisterActionClient()
RCLCPP_INFO(this->get_logger(), " out_call[%zu]: name=%s interface=%s kind=%s topic=%s payload_len=%zu",
i, c.name.c_str(), c.interface_type.c_str(), c.call_kind.c_str(), c.topic.c_str(), c.payload_yaml.size());
}
RCLCPP_INFO(this->get_logger(), "[ExecuteBtAction]Sending goal epoch=%u skills=%s", g.epoch, g.action_name.c_str());
RCLCPP_INFO(this->get_logger(), "[ExecuteBtAction] Sending goal epoch=%u skills=%s", g.epoch, g.action_name.c_str());
return g;
},
// Feedback
[this](
typename ActionClientRegistry::GoalHandle<EBA>::SharedPtr,
const std::shared_ptr<const EBA::Feedback> feedback) {
if (!feedback) {
if (!feedback) { return; }
if (!epoch_filter_.accept(feedback->epoch, feedback->current_skill, bt_pending_run_)) {
return;
}
if (!epoch_filter_.accept(
feedback->epoch, feedback->current_skill, bt_pending_run_))
{
return;
}
{
std::lock_guard<std::mutex> lk(exec_mutex_);
// Match per-skill BT node by skill_name.
const std::string bt_type = feedback->current_skill + std::string("_H");
auto it = per_node_exec_.find(bt_type);
if (it != per_node_exec_.end()) {
auto & info = it->second;
if (!info.in_flight && !info.result) {
// Late feedback for an already closed node; ignore.
} else {
if (feedback->stage == "RUN" && feedback->progress + 1e-4 < info.last_progress) {
return; // non-monotonic
}
if (feedback->stage == "RUN") {
info.last_progress = feedback->progress;
}
if (feedback->stage == "END") {
info.result_code = ParseResultDetail(feedback->detail);
info.result = (info.result_code == ExecResultCode::SUCCESS);
if (info.result_code == ExecResultCode::SUCCESS) {
info.awaiting_result = true;
info.end_feedback_time = this->now();
} else {
info.in_flight = false;
info.awaiting_result = false;
}
}
}
}
}
RCLCPP_INFO(
this->get_logger(),
// For parallel per-instance leaves, we only log feedback; mapping to instance is ambiguous.
RCLCPP_INFO(this->get_logger(),
"[ExecuteBtAction][fb][epoch=%u] stage=%s skill=%s progress=%.2f detail=%s",
feedback->epoch,
feedback->stage.c_str(),
feedback->current_skill.c_str(),
feedback->progress,
feedback->detail.c_str());
feedback->epoch, feedback->stage.c_str(), feedback->current_skill.c_str(),
feedback->progress, feedback->detail.c_str());
},
// Result
[this](const ActionClientRegistry::GoalHandle<EBA>::WrappedResult & res) {
bool ok = (
res.code == rclcpp_action::ResultCode::SUCCEEDED &&
res.result &&
res.result->success);
const bool ok = (res.code == rclcpp_action::ResultCode::SUCCEEDED && res.result && res.result->success);
if (ok) {
RCLCPP_INFO(this->get_logger(), "[ExecuteBtAction][result] OK: %s",
res.result ? res.result->message.c_str() : "<null>");
@@ -274,17 +234,7 @@ void CerebrumNode::RegisterActionClient()
RCLCPP_ERROR(this->get_logger(), "[ExecuteBtAction][result] FAIL code=%d msg=%s",
static_cast<int>(res.code), res.result ? res.result->message.c_str() : "<null>");
}
for (auto & kv : per_node_exec_) {
auto & info = kv.second;
if (info.in_flight || info.awaiting_result) {
info.in_flight = false;
info.awaiting_result = false;
info.result = ok; // authoritative final
info.result_code = ok ? ExecResultCode::SUCCESS : ExecResultCode::FAILURE;
RCLCPP_INFO(this->get_logger(), "[ExecuteBtAction][result] Finalizing node %s result=%s",
kv.first.c_str(), info.result ? "SUCCESS" : "FAILURE");
}
}
// Per-instance worker threads handle updating node status; avoid mutating shared state here.
});
}
@@ -474,94 +424,71 @@ void CerebrumNode::RegisterSkillBtActions()
const std::string bt_type = skill.name + std::string("_H");
BTActionHandlers handlers;
handlers.on_start = [this, bt_type, skill](rclcpp::Node *, BT::TreeNode & node) {
auto & info = per_node_exec_[bt_type];
const std::string instance_name = node.name();
// If this specific BT node instance already finished successfully earlier in this tree run,
// short-circuit to SUCCESS so that retries only re-execute the failed node.
if (info.done_instances.find(instance_name) != info.done_instances.end()) {
RCLCPP_INFO(this->get_logger(), "[%s/%s] already succeeded, skip", bt_type.c_str(), instance_name.c_str());
return BT::NodeStatus::SUCCESS;
}
// If a previous instance for this skill type is still in flight, keep running.
if (info.in_flight) {
const std::string key = bt_type + std::string("|") + instance_name;
auto & inst = per_instance_exec_[key];
if (inst.in_flight.load(std::memory_order_acquire) && !inst.finished.load(std::memory_order_acquire)) {
return BT::NodeStatus::RUNNING;
}
info.in_flight = true;
info.result = false;
info.result_code = ExecResultCode::UNKNOWN;
info.start_time = this->now();
info.skill_name = skill.name;
info.last_progress = 0.0;
info.cancel_state = PerNodeExecInfo::CancelState::NONE;
info.current_instance = instance_name;
if (!DeclareBtActionParamsForSkillInstance(skill.name, instance_name) ||
!action_registry_->wait_for_server(brain::kExecuteBtActionName, 1500ms)) {
// Init per-instance state and launch worker
inst.in_flight = true;
inst.finished = false;
inst.success = false;
inst.result_code = ExecResultCode::UNKNOWN;
inst.start_time = this->now();
if (!DeclareBtActionParamsForSkillInstance(skill.name, instance_name) ||
!action_registry_->wait_for_server(brain::kExecuteBtActionName, 1500ms)) {
RCLCPP_ERROR(this->get_logger(), "ExecuteBtAction server unavailable");
info.in_flight = false;
info.result = false;
info.result_code = ExecResultCode::FAILURE;
single_skill_goal_override_.reset();
info.current_instance.reset();
inst.in_flight = false;
inst.finished = true;
inst.success = false;
inst.result_code = ExecResultCode::FAILURE;
return BT::NodeStatus::FAILURE;
}
// Override goal builder to send only this skill.
single_skill_goal_override_ = skill.name;
single_skill_instance_override_ = instance_name;
action_registry_->send(brain::kExecuteBtActionName, this->get_logger());
RCLCPP_INFO(this->get_logger(), "[%s/%s] BTAction on_start", bt_type.c_str(), instance_name.c_str());
// Clear override after send to avoid affecting other triggers.
single_skill_goal_override_.reset();
single_skill_instance_override_.reset();
interfaces::action::ExecuteBtAction::Goal goal;
goal.epoch = static_cast<uint32_t>(epoch_filter_.epoch());
goal.action_name = skill.name;
goal.calls.clear();
goal.calls.push_back(BuildSkillCallForSkill(skill.name, instance_name));
auto logger = this->get_logger();
std::thread([this, key, goal, logger]() {
using EBA = interfaces::action::ExecuteBtAction;
auto res = action_registry_->send_and_wait_with_goal<EBA>(
brain::kExecuteBtActionName, logger, goal,
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>(node_timeout_sec_)));
auto it = per_instance_exec_.find(key);
if (it == per_instance_exec_.end()) { return; }
auto & inst = it->second;
if (!res.has_value()) {
inst.success = false;
inst.result_code = ExecResultCode::FAILURE;
} else {
const auto & wrapped = res.value();
const bool ok = (wrapped.code == rclcpp_action::ResultCode::SUCCEEDED && wrapped.result && wrapped.result->success);
inst.success = ok;
inst.result_code = ok ? ExecResultCode::SUCCESS : ExecResultCode::FAILURE;
}
inst.in_flight = false;
inst.finished = true;
}).detach();
RCLCPP_INFO(this->get_logger(), "[%s/%s] launched per-instance ExecuteBtAction goal", bt_type.c_str(), instance_name.c_str());
return BT::NodeStatus::RUNNING;
};
handlers.on_running = [this, bt_type](rclcpp::Node *, BT::TreeNode & node) {
auto it = per_node_exec_.find(bt_type);
if (it == per_node_exec_.end()) {return BT::NodeStatus::FAILURE;}
auto & info = it->second;
const std::string instance_name = node.name();
// If this instance is already marked done (from a previous retry cycle), succeed immediately.
if (info.done_instances.find(instance_name) != info.done_instances.end()) {
return BT::NodeStatus::SUCCESS;
}
if (info.in_flight) {
double elapsed = (this->now() - info.start_time).seconds();
if (elapsed > node_timeout_sec_) {
info.in_flight = false;
info.result = false;
info.result_code = ExecResultCode::FAILURE;
RCLCPP_WARN(this->get_logger(), "[%s/%s] BTAction on_running timeout after %.2f sec", bt_type.c_str(), instance_name.c_str(), elapsed);
action_registry_->cancel(brain::kExecuteBtActionName, this->get_logger());
// return BT::NodeStatus::FAILURE;
}
const std::string key = bt_type + std::string("|") + instance_name;
auto it = per_instance_exec_.find(key);
if (it == per_instance_exec_.end()) { return BT::NodeStatus::FAILURE; }
auto & inst = it->second;
if (!inst.finished.load(std::memory_order_acquire)) {
return BT::NodeStatus::RUNNING;
}
if (info.awaiting_result) {
double wait_elapsed = (this->now() - info.end_feedback_time).seconds();
if (wait_elapsed > result_wait_warn_sec_) {
RCLCPP_WARN_THROTTLE(
this->get_logger(), *this->get_clock(), 2000,
"Skill %s awaiting final result %.2fs after END feedback",
bt_type.c_str(), wait_elapsed);
}
return BT::NodeStatus::RUNNING;
}
// Terminal state for this instance
if (info.result) {
// Mark this instance as done so future retries skip it
if (!instance_name.empty()) {
info.done_instances.insert(instance_name);
}
info.current_instance.reset();
// Set a fixed cooldown between actions on SUCCESS
g_bt_resume_time_steady = std::chrono::steady_clock::now() + kInterActionDelayMs;
return BT::NodeStatus::SUCCESS;
}
info.current_instance.reset();
// Set a fixed cooldown between actions on FAILURE as well
g_bt_resume_time_steady = std::chrono::steady_clock::now() + kInterActionDelayMs;
return BT::NodeStatus::FAILURE;
return inst.success ? BT::NodeStatus::SUCCESS : BT::NodeStatus::FAILURE;
};
handlers.on_halted = [this, bt_type](rclcpp::Node *, BT::TreeNode &) {
RCLCPP_INFO(this->get_logger(), "[%s] BTAction on_halted", bt_type.c_str());