transfer instance&timeout to cerebellum, disable action timeout

This commit is contained in:
NuoDaJia02
2025-11-12 16:41:53 +08:00
parent 7753f2f569
commit 114ba598d0
4 changed files with 103 additions and 29 deletions

View File

@@ -379,16 +379,16 @@ public:
}
auto result_future = e->client->async_get_result(goal_handle);
auto deadline = std::chrono::steady_clock::now() + timeout;
if (result_future.wait_until(deadline) != std::future_status::ready) {
RCLCPP_ERROR(logger, "Timed out waiting for result on '%s', sending cancel", name.c_str());
try {
e->client->async_cancel_goal(goal_handle);
} catch (const std::exception & ex) {
RCLCPP_WARN(logger, "Failed cancelling goal for '%s': %s", name.c_str(), ex.what());
}
return std::nullopt;
}
// auto deadline = std::chrono::steady_clock::now() + timeout;
// if (result_future.wait_until(deadline) != std::future_status::ready) {
// RCLCPP_ERROR(logger, "Timed out waiting for result on '%s', sending cancel", name.c_str());
// try {
// e->client->async_cancel_goal(goal_handle);
// } catch (const std::exception & ex) {
// RCLCPP_WARN(logger, "Failed cancelling goal for '%s': %s", name.c_str(), ex.what());
// }
// return std::nullopt;
// }
typename rclcpp_action::ClientGoalHandle<ActionT>::WrappedResult wrapped_result;
try {
wrapped_result = result_future.get();
@@ -460,16 +460,16 @@ public:
}
auto result_future = e->client->async_get_result(goal_handle);
auto deadline = std::chrono::steady_clock::now() + timeout;
if (result_future.wait_until(deadline) != std::future_status::ready) {
RCLCPP_ERROR(logger, "Timed out waiting for result on '%s', sending cancel", name.c_str());
try {
e->client->async_cancel_goal(goal_handle);
} catch (const std::exception & ex) {
RCLCPP_WARN(logger, "Failed cancelling goal for '%s': %s", name.c_str(), ex.what());
}
return std::nullopt;
}
// auto deadline = std::chrono::steady_clock::now() + timeout;
// if (result_future.wait_until(deadline) != std::future_status::ready) {
// RCLCPP_ERROR(logger, "Timed out waiting for result on '%s', sending cancel", name.c_str());
// try {
// e->client->async_cancel_goal(goal_handle);
// } catch (const std::exception & ex) {
// RCLCPP_WARN(logger, "Failed cancelling goal for '%s': %s", name.c_str(), ex.what());
// }
// return std::nullopt;
// }
typename rclcpp_action::ClientGoalHandle<ActionT>::WrappedResult wrapped_result;
try {
wrapped_result = result_future.get();

View File

@@ -275,6 +275,8 @@ private:
const std::string & skill,
const std::string & topic,
std::chrono::nanoseconds timeout_ns,
const std::string & instance_name,
uint32_t timeout_ms_override,
int index,
int total_skills,
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,
@@ -296,6 +298,8 @@ private:
const std::string & skill,
const std::string & topic,
std::chrono::nanoseconds timeout,
const std::string & instance_name,
uint32_t timeout_ms_override,
int index,
int total_skills,
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,

View File

@@ -854,8 +854,8 @@ void CerebellumNode::SetupExecuteBtServer()
goal->epoch, goal->action_name.c_str(), goal->calls.size());
for (size_t i = 0; i < goal->calls.size(); ++i) {
const auto & c = goal->calls[i];
RCLCPP_INFO(this->get_logger(), "call[%zu]: name=%s interface=%s kind=%s topic=%s payload_len=%zu",
i, c.name.c_str(), c.interface_type.c_str(), c.call_kind.c_str(), c.topic.c_str(), c.payload_yaml.size());
RCLCPP_INFO(this->get_logger(), "call[%zu]: name=%s instance=%s interface=%s kind=%s topic=%s timeout_ms=%u payload_len=%zu",
i, c.name.c_str(), c.instance_name.c_str(), c.interface_type.c_str(), c.call_kind.c_str(), c.topic.c_str(), c.timeout_ms, c.payload_yaml.size());
if (!c.payload_yaml.empty()) {
// Log payload content (trim if very long)
const std::string payload = c.payload_yaml.size() > 512 ? c.payload_yaml.substr(0, 512) + "..." : c.payload_yaml;
@@ -1153,17 +1153,35 @@ bool CerebellumNode::ExecuteActionSkill(
RCLCPP_ERROR(this->get_logger(), "[%s] %s, TOPIC: %s", skill.c_str(), detail.c_str(), topic.c_str());
return false;
}
RCLCPP_INFO(this->get_logger(), "SKILL: [%s], DETAIL:[%s], TOPIC:[%s]", skill.c_str(), detail.c_str(), topic.c_str());
const float base_progress =
(total_skills > 0) ? (static_cast<float>(index) / static_cast<float>(total_skills)) : 0.f;
PublishFeedbackStage(goal_handle, "START", skill, base_progress, "");
if (!WaitForActionServer(topic, skill, base_progress, goal_handle, detail)) {
return false;
}
auto timeout = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::string instance_name;
uint32_t timeout_ms_override = 0;
if (goal_handle && goal_handle->get_goal()) {
const auto & calls = goal_handle->get_goal()->calls;
for (const auto & c : calls) {
if (c.name == skill) {
instance_name = c.instance_name;
timeout_ms_override = c.timeout_ms;
break;
}
}
}
auto effective_timeout = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(GetTimeoutForSkill(skill)));
if (timeout_ms_override > 0) {
effective_timeout = std::chrono::milliseconds(timeout_ms_override);
}
return (skill == "Arm") ?
ExecuteBilateralArmAction(skill, topic, timeout, index, total_skills, goal_handle, detail) :
RunActionSkillWithProgress(skill, topic, timeout, index, total_skills, goal_handle, detail);
ExecuteBilateralArmAction(skill, topic, effective_timeout, instance_name, timeout_ms_override, index, total_skills, goal_handle, detail) :
RunActionSkillWithProgress(skill, topic, effective_timeout, instance_name, timeout_ms_override, index, total_skills, goal_handle, detail);
}
/**
@@ -1185,6 +1203,8 @@ bool CerebellumNode::RunActionSkillWithProgress(
const std::string & skill,
const std::string & topic,
std::chrono::nanoseconds timeout_ns,
const std::string & instance_name,
uint32_t timeout_ms_override,
int index,
int total_skills,
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,
@@ -1194,9 +1214,11 @@ bool CerebellumNode::RunActionSkillWithProgress(
bool ok = false;
auto logger = this->get_logger();
std::string local_detail;
(void)timeout_ms_override; // currently encoded into timeout_ns already
std::thread worker([&]() {
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
RCLCPP_INFO(this->get_logger(), "[%s] instance='%s' timeout=%.3fs", skill.c_str(), instance_name.c_str(), timeout_ns.count()/1e9);
ok = brain::dispatch_skill_action<0, brain::SkillActionTypes>(
skill, topic, action_clients_.get(), logger, timeout_ns, local_detail);
CerebellumNode::tls_current_calls_ = prev_calls;
@@ -1243,19 +1265,25 @@ bool CerebellumNode::ExecuteBilateralArmAction(
const std::string & skill,
const std::string & topic,
std::chrono::nanoseconds timeout,
const std::string & instance_name,
uint32_t timeout_ms_override,
int index,
int total_skills,
const std::shared_ptr<rclcpp_action::ServerGoalHandle<interfaces::action::ExecuteBtAction>> & goal_handle,
std::string & detail)
{
RCLCPP_INFO(this->get_logger(), "[Arm] send two goals: body_id=1 and body_id=0, index=%d, total_skills=%d", index, total_skills);
RCLCPP_INFO(this->get_logger(), "[Arm] instance='%s' timeout=%ld ms", instance_name.c_str(), static_cast<long>(timeout_ms_override));
std::atomic<bool> finished1{false}, finished2{false};
bool ok1 = false, ok2 = false;
std::string d1, d2;
auto left_arm_time_steady = std::chrono::steady_clock::now();
auto right_arm_time_steady = std::chrono::steady_clock::now();
auto worker = [this, &ok_out = ok1, &done_flag = finished1, &d_out = d1,
skill, topic, timeout, &goal_handle](int body_id) {
skill, topic, timeout, &goal_handle, &time_steady = left_arm_time_steady](int body_id) {
tls_arm_body_id = body_id;
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
@@ -1263,11 +1291,13 @@ bool CerebellumNode::ExecuteBilateralArmAction(
skill, topic, action_clients_.get(), this->get_logger(), timeout, d_out);
CerebellumNode::tls_current_calls_ = prev_calls;
done_flag.store(true, std::memory_order_release);
time_steady = std::chrono::steady_clock::now();
RCLCPP_WARN(this->get_logger(), "left_arm_time_steady2");
};
std::thread t1(worker, RIGHT_ARM); // RIGHT_ARM=1
auto worker2 = [this, &ok_out = ok2, &done_flag = finished2, &d_out = d2,
skill, topic, timeout, &goal_handle](int body_id) {
skill, topic, timeout, &goal_handle, &time_steady = right_arm_time_steady](int body_id) {
tls_arm_body_id = body_id;
const auto * prev_calls = CerebellumNode::tls_current_calls_;
CerebellumNode::tls_current_calls_ = &goal_handle->get_goal()->calls;
@@ -1275,14 +1305,26 @@ bool CerebellumNode::ExecuteBilateralArmAction(
skill, topic, action_clients_.get(), this->get_logger(), timeout, d_out);
CerebellumNode::tls_current_calls_ = prev_calls;
done_flag.store(true, std::memory_order_release);
time_steady = std::chrono::steady_clock::now();
RCLCPP_WARN(this->get_logger(), "right_arm_time_steady2");
};
std::thread t2(worker2, LEFT_ARM); // LEFT_ARM=0
const auto start_steady = std::chrono::steady_clock::now();
while (!(finished1.load(std::memory_order_acquire) && finished2.load(std::memory_order_acquire))) {
double elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_steady).count();
PublishFeedbackStage(goal_handle, "RUN", skill, elapsed_ms, "");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
double arm_exe_bias = std::chrono::duration_cast<std::chrono::milliseconds>(left_arm_time_steady - right_arm_time_steady).count();
RCLCPP_WARN(this->get_logger(), "arm_exe_bias: %f", arm_exe_bias);
if (timeout_ms_override > 0 && std::abs(arm_exe_bias) > timeout_ms_override) {
ok1 = false;
ok2 = false;
RCLCPP_ERROR(this->get_logger(), "Fail in Arm arm_exe_bias: %f, timeout_ms_override=%ld", arm_exe_bias, static_cast<long>(timeout_ms_override));
}
if (t1.joinable()) {

View File

@@ -654,7 +654,7 @@ void CerebrumNode::SetupDynamicParameterHandling()
break;
}
const std::string field = name.substr(last_dot + 1);
if (field != "topic" && field != "payload_yaml") {
if (field != "topic" && field != "payload_yaml" && field != "timeout_ms") {
// Reject unknown fields under this namespace to avoid typos
result.successful = false;
result.reason = "unsupported field for cerebrum.bt.*";
@@ -684,6 +684,8 @@ interfaces::msg::SkillCall CerebrumNode::BuildSkillCallForSkill(const std::strin
{
interfaces::msg::SkillCall call;
call.name = skill_name;
call.instance_name = "";
call.timeout_ms = 0;
// Interface type/kind inference from SkillSpec
if (!skill_manager_) {return call;}
auto it = skill_manager_->skills().find(skill_name);
@@ -701,6 +703,7 @@ interfaces::msg::SkillCall CerebrumNode::BuildSkillCallForSkill(const std::strin
const std::string ns = std::string("cerebrum.bt.") + skill_name + ".";
const std::string p_topic = ns + "topic";
const std::string p_payload = ns + "payload_yaml";
const std::string p_timeout = ns + "timeout_ms";
try {
if (this->has_parameter(p_topic)) {
call.topic = this->get_parameter(p_topic).as_string();
@@ -708,6 +711,15 @@ interfaces::msg::SkillCall CerebrumNode::BuildSkillCallForSkill(const std::strin
if (this->has_parameter(p_payload)) {
call.payload_yaml = this->get_parameter(p_payload).as_string();
}
if (this->has_parameter(p_timeout)) {
auto v = this->get_parameter(p_timeout).as_string();
try {
long parsed = std::stol(v);
if (parsed > 0) call.timeout_ms = static_cast<uint32_t>(parsed);
} catch (...) {
RCLCPP_WARN(this->get_logger(), "[%s] invalid timeout_ms value '%s'", skill_name.c_str(), v.c_str());
}
}
} catch (const std::exception & e) {
RCLCPP_WARN(this->get_logger(), "[%s] read params failed: %s", skill_name.c_str(), e.what());
}
@@ -733,6 +745,7 @@ interfaces::msg::SkillCall CerebrumNode::BuildSkillCallForSkill(
const std::string ns = std::string("cerebrum.bt.") + skill_name + "." + *instance_name + ".";
const std::string p_topic = ns + "topic";
const std::string p_payload = ns + "payload_yaml";
const std::string p_timeout = ns + "timeout_ms";
try {
// topic override
if (this->has_parameter(p_topic)) {
@@ -744,6 +757,21 @@ interfaces::msg::SkillCall CerebrumNode::BuildSkillCallForSkill(
auto v = this->get_parameter(p_payload).as_string();
if (!v.empty()) base.payload_yaml = v;
}
base.instance_name = *instance_name;
if (this->has_parameter(p_timeout)) {
auto v = this->get_parameter(p_timeout).as_string();
try {
long parsed = std::stol(v);
if (parsed > 0) base.timeout_ms = static_cast<uint32_t>(parsed);
} catch (...) {
RCLCPP_WARN(this->get_logger(), "[%s/%s] invalid timeout_ms value '%s'", skill_name.c_str(), instance_name->c_str(), v.c_str());
}
}
if (base.instance_name == "s4_arm_grasp_box" || base.instance_name == "s5_arm_pickup_box" ||
base.instance_name == "s16_arm_grasp_box" || base.instance_name == "s17_arm_pickup_box") {
base.timeout_ms = 2000;
}
} catch (const std::exception & e) {
RCLCPP_WARN(this->get_logger(), "[%s/%s] read instance params failed: %s", skill_name.c_str(), instance_name->c_str(), e.what());
}