修改声纹验证失败仍然执行,增加接口解析提示词

This commit is contained in:
lxy
2026-01-21 15:13:31 +08:00
parent dd6ccf77bb
commit ab1fb4f3f8
4 changed files with 262 additions and 36 deletions

View File

@@ -52,13 +52,16 @@ system:
shutup_keywords: "bi zui" # 闭嘴指令关键词(拼音,逗号分隔)
interrupt_command_queue_depth: 10 # 中断命令订阅的队列深度QoS
sv_enabled: true # 是否启用声纹识别
sv_model_path: "~/hivecore_robot_os1/voice_model" # 声纹模型路径
# sv_model_path: "~/ros_learn/speech_campplus_sv_zh-cn_16k-common" # 声纹模型路径
# sv_model_path: "~/hivecore_robot_os1/voice_model" # 声纹模型路径
sv_model_path: "~/ros_learn/speech_campplus_sv_zh-cn_16k-common" # 声纹模型路径
sv_threshold: 0.45 # 声纹识别阈值(0.0-1.0,值越小越宽松,值越大越严格)
sv_speaker_db_path: "~/hivecore_robot_os1/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
# sv_speaker_db_path: "~/ros_learn/hivecore_robot_voice/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
# sv_speaker_db_path: "~/hivecore_robot_os1/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
sv_speaker_db_path: "~/ros_learn/hivecore_robot_voice/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
sv_buffer_size: 240000 # 声纹验证录音缓冲区大小(样本数,48kHz下5秒=240000)
camera:
image:
jpeg_quality: 85 # JPEG压缩质量(0-100,85是质量和大小平衡点)
interfaces:
root_path: "~/ros_learn/hivecore_robot_interfaces/src" # 接口文件根目录,支持 ~ 展开和相对路径

View File

@@ -6,6 +6,7 @@ import json
from ament_index_python.packages import get_package_share_directory
from pypinyin import pinyin, Style
from robot_speaker.core.skill_interface_parser import SkillInterfaceParser
@dataclass
@@ -41,21 +42,46 @@ class IntentRouter:
]
self._cached_skill_names: list[str] | None = None
self._cached_kb_data: list[dict] | None = None
interfaces_root = self._get_interfaces_root()
self.interface_parser = SkillInterfaceParser(interfaces_root)
def _get_interfaces_root(self) -> str:
    """Return the absolute interface root directory configured in voice.yaml.

    Reads ``config/voice.yaml`` from the ``robot_speaker`` package share
    directory and resolves ``interfaces.root_path``: a leading ``~`` is
    expanded, and a relative path is joined onto the parent directory of
    the package share directory.

    Returns:
        The absolute path of the interface root directory.

    Raises:
        ValueError: If the config cannot be read, ``interfaces.root_path``
            is missing or empty, or the resolved path is not an existing
            directory. The original exception is chained as ``__cause__``.
    """
    try:
        robot_speaker_share = get_package_share_directory("robot_speaker")
        config_path = os.path.join(robot_speaker_share, "config", "voice.yaml")
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f) or {}

        # `interfaces:` or `root_path:` with an empty value parses to None;
        # treat both the same as "not configured".
        interfaces_config = config.get("interfaces") or {}
        root_path = interfaces_config.get("root_path") or ""
        if not root_path:
            raise ValueError("interfaces.root_path 未在配置文件中配置")

        # expanduser() only touches a leading "~", so no guard is needed.
        root_path = os.path.expanduser(root_path)
        if not os.path.isabs(root_path):
            base_dir = os.path.dirname(robot_speaker_share)
            root_path = os.path.join(base_dir, root_path)

        abs_path = os.path.abspath(root_path)
        # The root must be a directory; a regular file at this path is
        # just as unusable as a missing one.
        if not os.path.isdir(abs_path):
            raise ValueError(f"接口文件根目录不存在: {abs_path}")
        return abs_path
    except Exception as e:
        # Callers only need to handle ValueError; keep the root cause chained.
        raise ValueError(f"读取接口文件根目录失败: {e}") from e
def _load_brain_skill_names(self) -> list[str]:
"""加载技能名称(使用接口解析器,避免重复读取)"""
if self._cached_skill_names is not None:
return self._cached_skill_names
skill_names: list[str] = []
try:
brain_share = get_package_share_directory("brain")
skill_path = os.path.join(brain_share, "config", "robot_skills.yaml")
with open(skill_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or []
for entry in data:
if isinstance(entry, dict) and entry.get("name"):
skill_names.append(str(entry["name"]))
except Exception:
skill_names = []
skill_names = self.interface_parser.get_skill_names()
self._cached_skill_names = skill_names
return skill_names
@@ -100,8 +126,6 @@ class IntentRouter:
return "top"
def build_skill_prompt(self, execution_status: Optional[str] = None) -> str:
#-----历史管理修改-----
# 支持传入执行状态,不依赖历史对话
skills = self._load_brain_skill_names()
skills_text = ", ".join(skills) if skills else ""
skill_guard = (
@@ -110,40 +134,47 @@ class IntentRouter:
else "【技能限制】技能列表不可用,请不要输出任何技能名称。"
)
#-----历史管理修改-----
# 根据执行状态构建提示词,不依赖历史对话
execution_hint = ""
if execution_status:
execution_hint = f"【上一轮执行状态】{execution_status}\n请参考上述执行状态,根据成功/失败信息调整本次技能序列。\n"
else:
execution_hint = "【注意】这是首次执行或没有上一轮执行状态,请根据当前图片和用户请求规划技能序列。\n"
skill_params_doc = self.interface_parser.generate_params_documentation()
return (
"你是机器人任务规划器。\n"
"本任务必须拍照。请根据用户请求选择使用哪个相机拍照,并结合当前环境信息生成简洁、可执行的技能序列。\n"
+ execution_hint
+ "\n"
"【规划要求】\n"
"1. body_id规划根据目标物在图片中的方位选择合适的手臂。观察图片中目标物的位置\n"
" - 如果目标物在图片左侧或机器人左侧使用body_id=0左臂\n"
" - 如果目标物在图片右侧或机器人右侧使用body_id=1右臂\n"
" - 如果目标物在图片中央或需要头部操作使用body_id=2头部\n"
" - 如果技能不需要特定身体部位如移动、拍照使用body_id=null\n"
"2. execution规划判断技能之间的执行关系\n"
" - serial串行技能必须按顺序执行前一个完成后再执行下一个。例如先移动再抓取\n"
" - parallel并行技能可以同时执行。例如左右臂同时操作\n"
"3. parameters规划根据目标物距离和任务需求规划具体参数值\n"
" - 对于MoveBase技能根据图片中目标物与机器人的距离估算移动距离。观察图片比例和物体大小合理估算。例如{\"distance\": 1.5, \"direction\": \"forward\"}\n"
" - 对于Arm技能根据目标物位置规划手臂姿态参数。例如{\"pose\": [0.1, 0.2, 0.3], \"speed\": 0.5}\n"
" - 对于其他技能根据任务需求填写相应参数如果不需要参数则使用null\n"
"1. execution规划判断技能之间的执行关系\n"
" - serial串行技能必须按顺序执行前一个完成后再执行下一个\n"
" - parallel并行技能可以同时执行\n"
"2. parameters规划根据目标物距离和任务需求规划具体参数值\n"
" - parameters字典必须包含该技能接口文件目标字段的所有字段\n"
" - 对于包含body_id字段的技能如Armbody_id值根据目标物在图片中的方位选择\n"
" * 目标物在图片左侧或机器人左侧使用body_id=0左臂\n"
" * 目标物在图片右侧或机器人右侧使用body_id=1右臂\n"
" * 目标物在图片中央或需要头部操作使用body_id=2头部\n"
"\n"
"【输出格式要求】\n"
"必须输出JSON格式包含sequence数组。每个技能对象包含3个一级字段\n"
"1. skill: 技能名称(字符串)\n"
"2. execution: 执行方式serial串行或 parallel并行\n"
"3. parameters: 参数字典包含该技能接口文件目标字段的所有字段并填入合理的预测值。如果技能无参数使用null。\n"
"\n"
"注意一级字段skill, execution, parameters是固定结构直接使用即可不需要预测。\n"
"\n"
"【技能参数说明】\n"
+ skill_params_doc +
"\n"
"【输出格式要求】必须输出JSON格式包含sequence数组。每个技能包含skill(技能名)、execution(serial/parallel)、body_id(0/1/2/null)、parameters(具体参数字典或null)。\n"
"示例格式:\n"
"{\n"
' "sequence": [\n'
' {"skill": "MoveBase", "execution": "serial", "body_id": null, "parameters": {"distance": 1.5, "direction": "forward"}},\n'
' {"skill": "Arm", "execution": "serial", "body_id": 0, "parameters": {"pose": [0.3, 0.2, 0.1], "speed": 0.5}},\n'
' {"skill": "GripperCmd0", "execution": "parallel", "body_id": 0, "parameters": {"action": "grasp"}}\n'
' {"skill": "MoveWheel", "execution": "serial", "parameters": {"move_distance": 1.5, "move_angle": 0.0}},\n'
' {"skill": "Arm", "execution": "serial", "parameters": {"body_id": 0, "data_type": 1, "data_length": 6, "command_id": 0, "frame_time_stamp": 0, "data_array": [0.1, 0.2, 0.3, 0.0, 0.0, 0.0]}},\n'
' {"skill": "GripperCmd0", "execution": "parallel", "parameters": {"loc": 128, "speed": 100, "torque": 80, "mode": 1}}\n'
" ]\n"
"}\n"
+ skill_guard

View File

@@ -366,7 +366,11 @@ class RobotSpeakerNode(Node):
self._start_sv_recording("继续录音用于声纹验证")
elif state == ConversationState.AUTHORIZED:
self._start_sv_recording("开始录音用于声纹验证")
if self.sv_enabled and self.sv_client:
self._start_sv_recording("开始录音用于声纹验证")
self._change_state(ConversationState.CHECK_VOICE, "检测到新人声,重新验证声纹")
else:
pass
def _on_audio_chunk_for_sv(self, audio_chunk: bytes):
"""录音线程音频chunk回调 - 仅在需要时录音到声纹缓冲区"""
@@ -545,6 +549,9 @@ class RobotSpeakerNode(Node):
if self._check_interrupt_and_cancel_turn():
continue
# 重新获取状态确保使用最新状态状态可能在CHECK_VOICE分支内已改变
current_state = self._get_state()
if current_state == ConversationState.CHECK_VOICE:
processed_text = text
else:

View File

@@ -0,0 +1,185 @@
"""技能接口文件解析器"""
import os
import yaml
import json
from typing import Optional
from ament_index_python.packages import get_package_share_directory
class SkillInterfaceParser:
    """Parse the interface files referenced by ``robot_skills.yaml``.

    The skill list is read from the ``brain`` package's
    ``config/robot_skills.yaml``. For each skill, the first listed
    ``.action`` / ``.srv`` file that exists under ``interfaces_root`` is
    parsed, and the fields declared before its first ``---`` separator are
    exposed as the skill's goal fields. Both results are cached per instance.
    """

    # Primitive type names that get an integer example value.
    _INTEGER_TYPES = frozenset({
        "int8", "int16", "int32", "int64",
        "uint8", "uint16", "uint32", "uint64",
        "byte", "char",
    })
    # Primitive type names that get a floating point example value.
    _FLOAT_TYPES = frozenset({"float32", "float64"})

    def __init__(self, interfaces_root: str):
        """Create a parser.

        Args:
            interfaces_root: Directory expected to contain the ``action``
                and ``srv`` sub-directories holding the interface files.
        """
        self.interfaces_root = interfaces_root
        self._cached_skill_config: list[dict] | None = None
        self._cached_skill_interfaces: dict[str, dict] | None = None

    def get_skill_names(self) -> list[str]:
        """Return the ``name`` of every dict entry in the skill config."""
        skill_config = self._load_skill_config()
        return [
            entry["name"]
            for entry in skill_config
            if isinstance(entry, dict) and entry.get("name")
        ]

    def _load_skill_config(self) -> list[dict]:
        """Load ``robot_skills.yaml`` once and cache the result.

        Returns:
            The parsed top-level list, or an empty list when the file cannot
            be read/parsed or its top level is not a list.
        """
        if self._cached_skill_config is not None:
            return self._cached_skill_config
        try:
            brain_share = get_package_share_directory("brain")
            skill_path = os.path.join(brain_share, "config", "robot_skills.yaml")
            with open(skill_path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or []
            self._cached_skill_config = data if isinstance(data, list) else []
            return self._cached_skill_config
        except Exception:
            # Best effort: an unavailable skill list is reported as "no
            # skills" and cached, so the file is not re-read on every call.
            self._cached_skill_config = []
            return []

    def parse_skill_interfaces(self) -> dict[str, dict]:
        """Parse the goal fields of every skill's interface file (cached).

        Returns:
            A mapping ``skill name -> {"type": "action" | "srv",
            "goal_fields": [...]}``. Skills whose interface files are not
            found under ``interfaces_root`` are omitted.
        """
        if self._cached_skill_interfaces is not None:
            return self._cached_skill_interfaces

        result: dict[str, dict] = {}
        for skill_entry in self._load_skill_config():
            # Same guard as get_skill_names(): ignore malformed entries.
            if not isinstance(skill_entry, dict):
                continue
            skill_name = skill_entry.get("name")
            if not skill_name:
                continue

            # `interfaces:` with an empty value parses to None.
            for iface in skill_entry.get("interfaces") or []:
                if isinstance(iface, dict):
                    iface_name = str(iface.get("name") or "")
                else:
                    iface_name = str(iface)

                if ".action" in iface_name:
                    iface_type = "action"
                elif ".srv" in iface_name:
                    iface_type = "srv"
                else:
                    continue

                # The sub-directory name equals the interface type.
                file_path = os.path.join(self.interfaces_root, iface_type, iface_name)
                if os.path.exists(file_path):
                    result[skill_name] = {
                        "type": iface_type,
                        "goal_fields": self._parse_goal_fields(file_path),
                    }
                    # Only the first interface file that exists is used.
                    break

        self._cached_skill_interfaces = result
        return result

    def _parse_goal_fields(self, file_path: str) -> list[dict]:
        """Parse the fields declared before the first ``---`` separator.

        Blank lines and comment-only lines are skipped, as are constant
        definitions (``TYPE NAME=value``), which are not fillable fields.

        Args:
            file_path: Path of the ``.action`` or ``.srv`` file.

        Returns:
            A list of ``{"name", "type", "comment"}`` dicts in file order,
            or an empty list when the file cannot be read.
        """
        goal_fields: list[dict] = []
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if line.startswith("---"):
                        break
                    # Split the comment off first so "name#note" does not
                    # leak the comment into the field name.
                    declaration, _, comment = line.partition("#")
                    tokens = declaration.split()
                    if len(tokens) < 2:
                        continue
                    field_type, field_name = tokens[0], tokens[1]
                    is_constant = "=" in field_name or (
                        len(tokens) > 2 and tokens[2].startswith("=")
                    )
                    if is_constant:
                        continue
                    goal_fields.append({
                        "name": field_name,
                        "type": field_type,
                        "comment": comment.strip(),
                    })
        except (OSError, UnicodeError):
            return []
        return goal_fields

    def generate_params_documentation(self) -> str:
        """Build the per-skill ``parameters`` description used in prompts.

        Returns:
            One text section per parsed skill, listing each goal field with
            its type, description and comment, followed by a JSON example.
            Sections are joined by newlines.
        """
        skill_interfaces = self.parse_skill_interfaces()
        doc_lines = []
        for skill_name, skill_info in skill_interfaces.items():
            doc_lines.append(f"{skill_name}技能的parameters字段")
            goal_fields = skill_info.get("goal_fields", [])
            if not goal_fields:
                doc_lines.append(" - 无参数,使用 null")
            else:
                doc_lines.append(" parameters字典必须包含以下字段")
                for field in goal_fields:
                    field_name = field["name"]
                    field_type = field["type"]
                    comment = field.get("comment", "")
                    if field_name == "body_id":
                        doc_lines.append(
                            f" - {field_name} ({field_type}): 身体部位ID0=左臂1=右臂2=头部。"
                            f"根据目标物在图片中的方位选择左侧用0右侧用1中央用2。"
                        )
                    else:
                        type_desc = self._get_type_description(field_type)
                        # rstrip avoids a dangling space when there is no comment.
                        doc_lines.append(
                            f" - {field_name} ({field_type}): {type_desc} {comment}".rstrip()
                        )
                example_params = {
                    field["name"]: self._get_example_value(field["name"], field["type"])
                    for field in goal_fields
                }
                doc_lines.append(f" 示例:{json.dumps(example_params, ensure_ascii=False)}")
            doc_lines.append("")
        return "\n".join(doc_lines)

    def _get_type_description(self, field_type: str) -> str:
        """Return a short description for a field type.

        An array suffix (``[]``, ``[N]``, ``[<=N]``) is ignored for the
        lookup. Types without an entry are returned unchanged.
        """
        type_map = {
            "int8": "整数,范围-128到127",
            "int16": "整数,范围-32768到32767",
            "int32": "整数",
            "int64": "整数",
            "uint8": "无符号整数范围0到255",
            "uint16": "无符号整数",
            "uint32": "无符号整数",
            "uint64": "无符号整数",
            "float32": "浮点数",
            "float64": "浮点数",
            "string": "字符串",
            "bool": "布尔值",
        }
        base_type = field_type.partition("[")[0]
        return type_map.get(base_type, field_type)

    def _get_example_value(self, field_name: str, field_type: str) -> object:
        """Return a JSON-serialisable example value for a field.

        Args:
            field_name: Field name; ``body_id`` and a ``float64[]``
                ``data_array`` have dedicated examples.
            field_type: Type as written in the interface file.

        Returns:
            ``0`` / ``0.0`` / ``""`` / ``False`` for primitive scalars and a
            list of those for arrays (declared length for ``T[N]``, otherwise
            three elements). Non-primitive types give ``None``, or ``[]``
            when they are arrays.
        """
        if field_name == "body_id":
            return 0
        if field_name == "data_array" and "float64[]" in field_type:
            return [0.1, 0.2, 0.3, 0.0, 0.0, 0.0]

        # Classify by exact base type: substring checks mistake message
        # types such as "geometry_msgs/Point" for integers.
        base_type, bracket, size_spec = field_type.partition("[")
        if base_type in self._INTEGER_TYPES:
            scalar = 0
        elif base_type in self._FLOAT_TYPES:
            scalar = 0.0
        elif base_type in ("string", "wstring") or base_type.startswith(
            ("string<=", "wstring<=")
        ):
            scalar = ""
        elif base_type == "bool":
            scalar = False
        else:
            scalar = None

        if not bracket:
            return scalar
        if scalar is None:
            return []
        size_spec = size_spec.rstrip("]")
        length = int(size_spec) if size_spec.isdigit() else 3
        return [scalar] * length