修复rclpy.spin() 单线程执行器导致异步回调死锁,增加ASR WebSocket 自动重连机制

This commit is contained in:
lxy
2026-01-29 17:24:49 +08:00
parent c65395c50f
commit aaa17c10f2
16 changed files with 948 additions and 807 deletions

View File

@@ -31,6 +31,12 @@ pip3 install -r requirements.txt --break-system-packages
cd ~/ros_learn/hivecore_robot_voice
colcon build
source install/setup.bash
```
```bash
# 终端1: 启动ASR节点
ros2 run robot_speaker asr_audio_node
# 终端2: 注册声纹
ros2 run robot_speaker register_speaker_node
```
@@ -66,6 +72,13 @@ ros2 service call /tts/synthesize robot_speaker/srv/TTSSynthesize \
"{command: 'interrupt', text: '', voice: ''}"
```
5. 完整运行
```bash
# 终端1启动 brain 节点
# 终端2启动 voice 节点
# 终端3启动 bridge 节点
# 终端4订阅相机
```
## 用到的命令
1. 音频设备

File diff suppressed because it is too large Load Diff

View File

@@ -56,20 +56,18 @@ system:
shutup_keywords: "bi zui" # 闭嘴指令关键词(拼音,逗号分隔)
interrupt_command_queue_depth: 10 # 中断命令订阅的队列深度QoS
sv_enabled: true # 是否启用声纹识别
sv_model_path: "~/hivecore_robot_os1/voice_model" # 声纹模型路径
# sv_model_path: "~/ros_learn/speech_campplus_sv_zh-cn_16k-common" # 声纹模型路径
sv_threshold: 0.45 # 声纹识别阈值0.0-1.0,值越小越宽松,值越大越严格)
sv_speaker_db_path: "~/hivecore_robot_os1/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
# sv_speaker_db_path: "~/ros_learn/hivecore_robot_voice/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
sv_buffer_size: 240000 # 声纹验证录音缓冲区大小样本数48kHz下5秒=240000
# sv_model_path: "~/hivecore_robot_os1/voice_model" # 声纹模型路径
sv_model_path: "~/ros_learn/speech_campplus_sv_zh-cn_16k-common" # 声纹模型路径
sv_threshold: 0.65 # 声纹识别阈值0.0-1.0,值越小越宽松,值越大越严格)
# sv_speaker_db_path: "~/hivecore_robot_os1/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
sv_speaker_db_path: "~/ros_learn/hivecore_robot_voice/config/speakers.json" # 声纹数据库保存路径JSON格式相对于ROS2包share目录
sv_buffer_size: 96000 # 声纹验证录音缓冲区大小样本数48kHz下2秒=96000
continue_without_image: true # 多模态意图skill_sequence/chat_camera未获取到图片时是否继续推理
skill_auto_retry: true
skill_max_retries: 5
camera:
image:
jpeg_quality: 85 # JPEG压缩质量0-10085是质量和大小平衡点
interfaces:
root_path: "~/hivecore_robot_os1/hivecore_robot_interfaces/src" # 接口文件根目录,支持 ~ 展开和相对路径
# root_path: "~/ros_learn/hivecore_robot_interfaces/src" # 接口文件根目录,支持 ~ 展开和相对路径
# root_path: "~/hivecore_robot_os1/hivecore_robot_interfaces/src" # 接口文件根目录,支持 ~ 展开和相对路径
root_path: "~/ros_learn/hivecore_robot_interfaces/src" # 接口文件根目录,支持 ~ 展开和相对路径

View File

@@ -1,18 +1,9 @@
dashscope>=1.20.0
openai>=1.0.0
pyaudio>=0.2.11
webrtcvad>=2.0.10
pypinyin>=0.49.0
rclpy>=3.0.0
pyrealsense2>=2.54.0
Pillow>=10.0.0
numpy>=1.24.0,<2.0.0 # cv_bridge需要NumPy 1.xNumPy 2.x会导致段错误
PyYAML>=6.0
aec-audio-processing
modelscope>=1.33.0
funasr>=1.0.0
datasets==3.6.0
websocket-client>=1.6.0

View File

@@ -12,6 +12,13 @@

View File

@@ -75,8 +75,10 @@ class SkillBridgeNode(Node):
self.get_logger().info(f"Sending skill parameters: {skill_params}")
# 将技能名和参数列表分别用单引号包括,并用逗号隔开
names_str = ", ".join([f"'{name}'" for name in skill_names])
params_str = ", ".join([f"'{param}'" for param in skill_params])
# names_str = ", ".join([f"'{name}'" for name in skill_names])
# params_str = ", ".join([f"'{param}'" for param in skill_params])
names_str = ", ".join(skill_names)
params_str = ", ".join(skill_params)
self.rebuild_now("Remote", names_str, params_str)
except Exception as e:

View File

@@ -12,6 +12,13 @@

View File

@@ -121,3 +121,10 @@ class ConversationHistory:

View File

@@ -1,6 +1,3 @@
"""
语音交互节点
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
@@ -43,11 +40,6 @@ class RobotSpeakerNode(Node):
self.interrupt_event = threading.Event()
self.stop_event = threading.Event()
self.current_skill_task = None
self.skill_retry_count = 0
self.waiting_for_skill_result = False
self.skill_task_lock = threading.Lock()
self.conversation_state = ConversationState.IDLE
self.state_lock = threading.Lock()
@@ -67,7 +59,7 @@ class RobotSpeakerNode(Node):
if self.sv_enabled and self.sv_client:
speaker_count = self.sv_client.get_speaker_count()
if speaker_count == 0:
self.get_logger().info("声纹数据库为空,请注册声纹")
self.get_logger().info("[Speaker] 声纹数据库为空,请注册声纹")
self.skill_sequence_pub = self.create_publisher(String, '/llm_skill_sequence', 10)
@@ -78,7 +70,7 @@ class RobotSpeakerNode(Node):
)
self._start_threads()
self.get_logger().info("语音节点已启动")
self.get_logger().info("[Speaker] 语音节点已启动")
def _load_config(self):
config_file = os.path.join(
@@ -136,8 +128,6 @@ class RobotSpeakerNode(Node):
self.sv_speaker_db_path = os.path.expanduser(system['sv_speaker_db_path'])
self.sv_buffer_size = system['sv_buffer_size']
self.continue_without_image = system['continue_without_image']
self.skill_auto_retry = system['skill_auto_retry']
self.skill_max_retries = system['skill_max_retries']
camera = config['camera']
self.camera_image_jpeg_quality = camera['image']['jpeg_quality']
@@ -153,12 +143,12 @@ class RobotSpeakerNode(Node):
self.tts_client = self.create_client(TTSSynthesize, '/tts/synthesize')
self.audio_data_client = self.create_client(AudioData, '/asr/audio_data')
self.get_logger().info("等待service节点启动...")
self.get_logger().info("[Speaker] 等待service节点启动...")
self.vad_client.wait_for_service(timeout_sec=5.0)
self.asr_client.wait_for_service(timeout_sec=5.0)
self.tts_client.wait_for_service(timeout_sec=5.0)
self.audio_data_client.wait_for_service(timeout_sec=5.0)
self.get_logger().info("所有service节点已就绪")
self.get_logger().info("[Speaker] 所有service节点已就绪")
self.llm_client = OpenAI(api_key=self.dashscope_api_key, base_url=self.llm_base_url)
self.history = ConversationHistory(
@@ -185,7 +175,7 @@ class RobotSpeakerNode(Node):
logger=self.get_logger()
)
except Exception as e:
self.get_logger().warning(f"声纹识别初始化失败: {e},声纹功能将不可用")
self.get_logger().warning(f"[Speaker] 声纹识别初始化失败: {e},声纹功能将不可用")
self.sv_client = None
self.sv_enabled = False
else:
@@ -227,10 +217,10 @@ class RobotSpeakerNode(Node):
with self.state_lock:
old_state = self.conversation_state
self.conversation_state = new_state
self.get_logger().info(f"[状态机] {old_state.value} -> {new_state.value}: {reason}")
self.get_logger().info(f"[Speaker-State] {old_state.value} -> {new_state.value}: {reason}")
def _on_speech_started(self):
self.get_logger().info("[VAD事件] 检测到人声开始")
self.get_logger().info("[Speaker-VAD] 检测到人声开始")
with self.state_lock:
state = self.conversation_state
if state == ConversationState.AUTHORIZED:
@@ -247,10 +237,13 @@ class RobotSpeakerNode(Node):
self._start_sv_recording()
def _on_speech_stopped(self):
self.get_logger().info("[VAD事件] 检测到说话结束")
import threading
self.get_logger().debug(f"[Speaker-VAD] speech_stopped 被调用 | 线程:{threading.current_thread().name} | 当前状态:{self.conversation_state.value}")
with self.state_lock:
state = self.conversation_state
self.get_logger().debug(f"[Speaker-VAD] 准备停止声纹录音 | sv_enabled:{self.sv_enabled} | state:{state}")
if self.sv_enabled and state in [ConversationState.CHECK_VOICE, ConversationState.AUTHORIZED]:
self.sv_speech_end_event.clear()
self._stop_sv_recording()
self._call_asr_service()
@@ -263,68 +256,82 @@ class RobotSpeakerNode(Node):
self.audio_data_client.call_async(request)
def _stop_sv_recording(self):
import threading
self.get_logger().debug(f"[Speaker-SV] _stop_sv_recording 开始 | 线程:{threading.current_thread().name} | 时间:{time.time()}")
request = AudioData.Request()
request.command = "stop"
future = self.audio_data_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=2.0)
future.add_done_callback(self._on_sv_audio_ready)
self.get_logger().debug(f"[Speaker-SV] _stop_sv_recording 已发送异步请求 | future_id:{id(future)}")
def _on_sv_audio_ready(self, future):
import threading
self.get_logger().debug(f"[Speaker-SV] _on_sv_audio_ready 回调触发 | 线程:{threading.current_thread().name} | future_id:{id(future)} | 时间:{time.time()}")
try:
response = future.result()
self.get_logger().debug(f"[Speaker-SV] 收到响应 | success:{response.success} | samples:{response.samples}")
if response.success and response.samples > 0:
audio_array = np.frombuffer(
response.audio_data, dtype=np.int16
)
audio_array = np.frombuffer(response.audio_data, dtype=np.int16)
with self.sv_lock:
self.get_logger().debug(f"[Speaker-SV] 准备写入buffer | 旧大小:{len(self.sv_audio_buffer)} | 新数据:{len(audio_array)}")
self.sv_audio_buffer.clear()
self.sv_audio_buffer.extend(audio_array)
self.get_logger().info(
f"[声纹] 获取音频数据: {response.samples}样本 "
f"({response.samples/response.sample_rate:.2f}秒)"
)
self.get_logger().debug(f"[Speaker-SV] buffer已更新 | 新大小:{len(self.sv_audio_buffer)}")
self.get_logger().debug(f"[Speaker-SV] 准备设置 sv_speech_end_event")
self.sv_speech_end_event.set()
except Exception as e:
self.get_logger().error(f"[声纹] 获取音频数据失败: {e}")
self.get_logger().error(f"[Speaker-SV] _on_sv_audio_ready 异常 | 错误:{e} | 类型:{type(e).__name__}")
def _call_asr_service(self):
self.get_logger().info("[主节点] 调用ASR服务获取识别结果")
self.get_logger().info("[Speaker] 调用ASR服务获取识别结果")
request = ASRRecognize.Request()
request.command = "start"
future = self.asr_client.call_async(request)
future.add_done_callback(self._asr_service_callback)
def _asr_service_callback(self, future):
import threading
self.get_logger().debug(f"[Speaker-ASR] ASR回调触发 | 线程:{threading.current_thread().name} | 时间:{time.time()}")
try:
response = future.result()
self.get_logger().debug(f"[Speaker-ASR] 收到响应 | success:{response.success} | text:{response.text if response.success else 'N/A'}")
if response.success and response.text:
self.text_queue.put(response.text)
self.get_logger().debug(f"[Speaker-ASR] 文本已放入队列 | queue_size:{self.text_queue.qsize()}")
else:
self.get_logger().warn(f"[ASR服务回调] 识别失败或为空: success={response.success}, message={response.message}")
self.get_logger().warn(f"[Speaker-ASR] 识别失败或为空: success={response.success}, message={response.message}")
except Exception as e:
self.get_logger().error(f"ASR服务调用失败: {e}")
self.get_logger().error(f"[Speaker-ASR] 异常 | 错误:{e} | 类型:{type(e).__name__}")
def _vad_event_worker(self):
self.get_logger().info("[VAD事件线程] 启动")
import threading
self.get_logger().info(f"[Speaker-VAD] 启动 | 线程ID:{threading.current_thread().ident}")
while not self.stop_event.is_set():
request = VADEvent.Request()
request.command = "wait"
request.timeout_ms = 500
future = self.vad_client.call_async(request)
rclpy.spin_until_future_complete(
self, future, timeout_sec=1.5
)
try:
response = future.result()
if response.success and response.event != "none":
self.get_logger().info(f"[VAD事件线程] 收到VAD事件: {response.event}")
if response.event == "speech_started":
self._on_speech_started()
elif response.event == "speech_stopped":
self._on_speech_stopped()
except Exception as e:
self.get_logger().debug(f"[VAD事件线程] 等待事件超时或错误: {e}")
future.add_done_callback(self._on_vad_event_response)
time.sleep(0.05)
def _on_vad_event_response(self, future):
    """Done-callback for the async VAD 'wait' service call.

    Runs on an executor thread (not the polling thread that issued the
    request). Dispatches ``speech_started`` / ``speech_stopped`` events to
    their handlers; failed responses and ``"none"`` (wait timed out with no
    event) are silently ignored.

    Args:
        future: rclpy Future carrying the VADEvent service response.
    """
    import threading
    self.get_logger().debug(f"[Speaker-VAD] 回调触发 | 线程:{threading.current_thread().name}")
    try:
        response = future.result()
        # "none" means the service wait elapsed without a VAD event — nothing to do.
        if not response.success or response.event == "none":
            return
        self.get_logger().debug(f"[Speaker-VAD] 收到事件 | event:{response.event} | 线程:{threading.current_thread().name} | 时间:{time.time()}")
        if response.event == "speech_started":
            self._on_speech_started()
        elif response.event == "speech_stopped":
            self._on_speech_stopped()
    except Exception as e:
        # future.result() re-raises any exception from the service call.
        self.get_logger().error(f"[Speaker-VAD] 异常 | 错误:{e} | 类型:{type(e).__name__}")
def _process_worker(self):
"""获取文本 → 状态转换 → 唤醒词处理 → 闭嘴指令检查 → 意图路由 → 处理请求"""
self.get_logger().info("[主线程] 启动")
self.get_logger().info("[Speaker] 主线程启动")
while not self.stop_event.is_set():
try:
text = self.text_queue.get(timeout=0.1)
@@ -346,7 +353,7 @@ class RobotSpeakerNode(Node):
self._handle_shutup_command()
continue
intent_result = self.intent_router.route(processed_text)
self.get_logger().info(f"[意图路由] intent={intent_result.intent}, need_camera={intent_result.need_camera}, camera_mode={intent_result.camera_mode}")
self.get_logger().info(f"[Speaker-Intent] intent={intent_result.intent}, need_camera={intent_result.need_camera}, camera_mode={intent_result.camera_mode}")
if intent_result.intent == "kb_qa":
self.interrupt_event.clear()
if self._handle_kb_qa(processed_text):
@@ -391,21 +398,29 @@ class RobotSpeakerNode(Node):
return self.conversation_state
def _handle_speaker_verification(self) -> bool:
import threading
self.get_logger().debug(f"[Speaker-SV] 开始声纹验证 | 线程:{threading.current_thread().name} | result_ready:{self.sv_result_ready_event.is_set()}")
if self.sv_result_ready_event.is_set():
self.get_logger().debug(f"[Speaker-SV] 结果已ready跳过等待")
pass
elif not self.sv_speech_end_event.wait(timeout=0.5):
elif not self.sv_speech_end_event.wait(timeout=2.0):
self.get_logger().warn(f"[Speaker-SV] speech_end_event 等待超时")
self._change_state(ConversationState.IDLE, "没有录音数据,无法验证")
return False
if not self.sv_result_ready_event.wait(timeout=2.0):
self.get_logger().debug(f"[Speaker-SV] speech_end_event 已触发等待result_ready_event")
if not self.sv_result_ready_event.wait(timeout=3.0):
self.get_logger().warn(f"[Speaker-SV] result_ready_event 等待超时")
with self.sv_lock:
self.sv_audio_buffer.clear()
self._change_state(ConversationState.IDLE, "声纹结果未ready")
return False
return False
self.get_logger().debug(f"[Speaker-SV] result_ready_event 已触发,读取结果")
self.sv_result_ready_event.clear()
with self.sv_lock:
speaker_id = self.current_speaker_id
speaker_state = self.current_speaker_state
score = self.current_speaker_score
self.get_logger().debug(f"[Speaker-SV] 验证结果 | speaker_id:{speaker_id} | state:{speaker_state.value} | score:{score:.4f}")
if not (speaker_id and speaker_state == SpeakerState.VERIFIED):
if self.sv_client.get_speaker_count() == 0:
self._change_state(ConversationState.IDLE, "声纹数据库为空")
@@ -439,23 +454,14 @@ class RobotSpeakerNode(Node):
self._put_tts_text(processed_text)
return
if is_skill_sequence:
self.get_logger().info(f"[Speaker-Skill] 任务: {processed_text}")
with self.execution_status_lock:
execution_status = self.last_execution_status
with self.skill_task_lock:
retry_count = self.skill_retry_count
current_task = self.current_skill_task
execution_attempt = retry_count + 1
if retry_count > 0:
system_prompt_with_status = self.intent_router.build_skill_prompt(execution_status)
self.get_logger().info(
f"[技能执行] 第{execution_attempt}次执行(第{retry_count}次重试),"
f"任务: {current_task},使用失败反馈调整策略"
)
else:
system_prompt_with_status = self.intent_router.build_skill_prompt(None)
self.get_logger().info(f"[技能执行] 第1次执行任务: {processed_text},不使用历史状态")
last_status = self.last_execution_status
self.get_logger().debug(f"[Speaker-Skill] 读取执行状态 | 线程:{threading.current_thread().name} | 时间:{time.time()} | 状态:{last_status}")
system_prompt_with_status = self.intent_router.build_skill_prompt(execution_status=last_status)
else:
system_prompt_with_status = intent_result.system_prompt
self.get_logger().debug(f"[Speaker-LLM] intent={intent_result.intent} | system_prompt前100字符: {system_prompt_with_status[:100] if system_prompt_with_status else 'None'}")
reply = self._llm_process_stream_with_camera(
intent_result.text,
intent_result.need_camera,
@@ -473,25 +479,20 @@ class RobotSpeakerNode(Node):
msg = String()
msg.data = reply.strip()
self.skill_sequence_pub.publish(msg)
with self.skill_task_lock:
if self.current_skill_task is None:
self.current_skill_task = processed_text
self.skill_retry_count = 0
self.get_logger().info(f"[技能任务] 开始新任务: {processed_text}")
self.waiting_for_skill_result = True
self.get_logger().info(f"[Speaker-Skill] 开始新任务: {processed_text}")
def _check_shutup_command(self, text: str) -> bool:
text_pinyin = self.intent_router.to_pinyin(text).lower().strip()
for keyword_pinyin in self.shutup_keywords:
keyword_pinyin_clean = keyword_pinyin.lower().strip()
if keyword_pinyin_clean in text_pinyin:
self.get_logger().info(f"[闭嘴指令] 匹配到关键词: {keyword_pinyin} (文本拼音: {text_pinyin})")
self.get_logger().info(f"[Speaker-Intent] 闭嘴指令匹配到关键词: {keyword_pinyin} (文本拼音: {text_pinyin})")
return True
return False
def _interrupt_tts(self):
self.interrupt_event.set()
while True:
while not self.tts_queue.empty():
try:
self.tts_queue.get_nowait()
except queue.Empty:
@@ -501,13 +502,7 @@ class RobotSpeakerNode(Node):
request.text = ""
request.voice = ""
future = self.tts_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=1.0)
try:
response = future.result()
if response.success:
self.get_logger().info("[中断] 已通过Service中断TTS播放")
except Exception as e:
self.get_logger().warn(f"[中断] 中断TTS Service调用失败: {e}")
future.add_done_callback(lambda f: self.get_logger().info("[Speaker-TTS] interrupt sent"))
def _on_skill_result_received(self, msg: String):
try:
@@ -524,37 +519,9 @@ class RobotSpeakerNode(Node):
status_text += f", 总技能数: {total_skills}, 成功: {succeeded_skills}, 失败: {total_skills - succeeded_skills}"
with self.execution_status_lock:
self.last_execution_status = status_text
self.get_logger().info(f"[执行状态] 已更新: {status_text}")
if self.skill_auto_retry:
with self.skill_task_lock:
if self.waiting_for_skill_result and self.current_skill_task:
current_attempt = self.skill_retry_count + 1
if not success and self.skill_retry_count < self.skill_max_retries:
self.skill_retry_count += 1
next_attempt = self.skill_retry_count + 1
self.get_logger().info(
f"[技能重试] 第{current_attempt}次执行失败,"
f"准备第{next_attempt}次执行(第{self.skill_retry_count}次重试),"
f"任务: {self.current_skill_task}"
)
self.text_queue.put(self.current_skill_task)
else:
if success:
self.get_logger().info(
f"[技能任务] 第{current_attempt}次执行成功,任务完成: {self.current_skill_task}"
)
else:
self.get_logger().warning(
f"[技能任务] 第{current_attempt}次执行失败,"
f"已达到最大重试次数({self.skill_max_retries}),放弃任务: {self.current_skill_task}"
)
self.current_skill_task = None
self.skill_retry_count = 0
self.skill_retry_count = 0
with self.execution_status_lock:
self.last_execution_status = None
self.get_logger().info(f"[Speaker-Skill] 执行状态已更新: {status_text}")
except Exception as e:
self.get_logger().warning(f"解析执行结果失败: {e}")
self.get_logger().warning(f"[Speaker-Skill] 解析执行结果失败: {e}")
def _capture_image_from_img_dev(self, camera_mode: Optional[str] = None) -> Optional[np.ndarray]:
@@ -565,17 +532,17 @@ class RobotSpeakerNode(Node):
if camera_mode and camera_mode in self.img_msg_cache:
msg = self.img_msg_cache[camera_mode]
cv_image = self.cv_bridge.imgmsg_to_cv2(msg.image_color, desired_encoding='rgb8')
self.get_logger().info(f"[相机] 使用{camera_mode}相机获取图像成功 (position={msg.position})")
self.get_logger().info(f"[Speaker-Camera] 使用{camera_mode}相机获取图像成功 (position={msg.position})")
return cv_image
elif camera_mode is None and len(self.img_msg_cache) > 0:
msg = next(iter(self.img_msg_cache.values()))
cv_image = self.cv_bridge.imgmsg_to_cv2(msg.image_color, desired_encoding='rgb8')
self.get_logger().info(f"[相机] 未指定相机位置,使用{msg.position}相机获取图像成功")
self.get_logger().info(f"[Speaker-Camera] 未指定相机位置,使用{msg.position}相机获取图像成功")
return cv_image
time.sleep(0.1)
with self.img_msg_lock:
available_positions = list(self.img_msg_cache.keys()) if self.img_msg_cache else []
self.get_logger().warning(f"[相机] 等待图像超时 (期望位置={camera_mode}, 可用位置={available_positions})")
self.get_logger().warning(f"[Speaker-Camera] 等待图像超时 (期望位置={camera_mode}, 可用位置={available_positions})")
return None
def _encode_image_to_base64(self, image_data: np.ndarray, quality: int = 85) -> str:
@@ -589,7 +556,7 @@ class RobotSpeakerNode(Node):
image_bytes = buffer.getvalue()
return base64.b64encode(image_bytes).decode('utf-8')
except Exception as e:
self.get_logger().error(f"图像编码失败: {e}")
self.get_logger().error(f"[Speaker-Camera] 图像编码失败: {e}")
return ""
def _llm_process_stream_with_camera(self, user_text: str, need_camera: bool, camera_mode: Optional[str] = None, system_prompt: Optional[str] = None, intent: str = "chat_text") -> str:
@@ -614,7 +581,7 @@ class RobotSpeakerNode(Node):
if image_base64:
image_base64_list.append(image_base64)
if not image_base64_list and not self.continue_without_image:
self.get_logger().warning(f"[LLM] 需要相机但未获取到图片,且配置为不继续推理,放弃请求")
self.get_logger().warning(f"[Speaker-LLM] 需要相机但未获取到图片,且配置为不继续推理,放弃请求")
return ""
if image_base64_list:
content_list = [{"type": "text", "text": user_text}]
@@ -649,19 +616,19 @@ class RobotSpeakerNode(Node):
content = chunk.choices[0].delta.content
full_reply += content
except Exception as e:
self.get_logger().error(f"[LLM] 调用失败: {e}")
self.get_logger().error(f"[Speaker-LLM] 调用失败: {e}")
return ""
if interrupted:
self.get_logger().info("[LLM] 流式处理被中断")
self.get_logger().info("[Speaker-LLM] 流式处理被中断")
return ""
reply = full_reply.strip() if full_reply else ""
self.get_logger().info(f"[LLM] 生成回复: {reply}")
self.get_logger().info(f"[Speaker-LLM] 生成回复: {reply}")
if reply and intent != "skill_sequence" and not self.interrupt_event.is_set():
self._put_tts_text(reply)
return reply
def _tts_worker(self):
self.get_logger().info("[TTS播放线程] 启动")
self.get_logger().info("[Speaker-TTS] TTS播放线程启动")
while not self.stop_event.is_set():
try:
text = self.tts_queue.get(timeout=0.5)
@@ -677,28 +644,26 @@ class RobotSpeakerNode(Node):
request.text = text_str
request.voice = ""
future = self.tts_client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=30.0)
try:
response = future.result()
if not response.success:
self.get_logger().warn(f"[TTS] 播放失败: {response.message}")
except Exception as e:
self.get_logger().error(f"[TTS] 服务调用失败: {e}")
if self.interrupt_event.is_set():
while True:
try:
self.tts_queue.get_nowait()
except queue.Empty:
break
self.interrupt_event.clear()
future.add_done_callback(self._on_tts_done)
def _on_tts_done(self, future):
    """Done-callback for the async TTS synthesize service call.

    Fire-and-forget result handling: logs a warning on an unsuccessful
    response and an error if the call itself raised; success is silent.

    Args:
        future: rclpy Future carrying the TTSSynthesize service response.
    """
    try:
        response = future.result()
        if not response.success:
            self.get_logger().warn(f"[Speaker-TTS] 播放失败: {response.message}")
    except Exception as e:
        self.get_logger().error(f"[Speaker-TTS] error: {e}")
def _sv_worker(self):
self.get_logger().info("[声纹识别线程] 启动")
self.get_logger().info("[Speaker-SV] 启动")
min_audio_samples = int(self.sample_rate * 0.5)
while not self.stop_event.is_set():
try:
self.get_logger().debug(f"[Speaker-SV] 等待 sv_speech_end_event...")
if not self.sv_speech_end_event.wait(timeout=0.1):
continue
self.get_logger().debug(f"[Speaker-SV] sv_speech_end_event 触发 | 时间:{time.time()}")
self.sv_speech_end_event.clear()
if not (self.sv_enabled and self.sv_client):
continue
@@ -711,15 +676,16 @@ class RobotSpeakerNode(Node):
self.current_speaker_score = 0.0
self.current_speaker_threshold = self.sv_client.threshold
self.sv_result_ready_event.set()
self.get_logger().info("[声纹识别] 数据库为空跳过验证直接设置UNKNOWN状态")
self.get_logger().info("[Speaker-SV] 数据库为空跳过验证直接设置UNKNOWN状态")
continue
with self.sv_lock:
audio_list = list(self.sv_audio_buffer)
buffer_size = len(audio_list)
self.get_logger().debug(f"[Speaker-SV] 读取buffer | 大小:{buffer_size} | 时间:{time.time()}")
self.sv_audio_buffer.clear()
self.get_logger().info(f"[声纹识别] 收到speech_end事件录音长度: {buffer_size} 样本({buffer_size/self.sample_rate:.2f}秒)")
self.get_logger().info(f"[Speaker-SV] 收到speech_end事件录音长度: {buffer_size} 样本({buffer_size/self.sample_rate:.2f}秒)")
if buffer_size < min_audio_samples:
self.get_logger().debug(f"[声纹识别] 录音太短: {buffer_size} < {min_audio_samples},跳过处理")
self.get_logger().debug(f"[Speaker-SV] 录音太短: {buffer_size} < {min_audio_samples},跳过处理")
with self.sv_lock:
self.current_speaker_id = None
self.current_speaker_state = SpeakerState.UNKNOWN
@@ -728,12 +694,13 @@ class RobotSpeakerNode(Node):
self.sv_result_ready_event.set()
continue
audio_array = np.array(audio_list, dtype=np.int16)
embedding, success = self.sv_client.extract_embedding(
audio_array,
sample_rate=self.sample_rate
)
if not success or embedding is None:
self.get_logger().debug("[声纹识别] 提取embedding失败")
self.get_logger().debug("[Speaker-SV] 提取embedding失败")
with self.sv_lock:
self.current_speaker_id = None
self.current_speaker_state = SpeakerState.ERROR
@@ -748,21 +715,21 @@ class RobotSpeakerNode(Node):
self.current_speaker_score = score
self.current_speaker_threshold = threshold
if match_state == SpeakerState.VERIFIED:
self.get_logger().info(f"[声纹识别] 识别到说话人: {speaker_id}, 相似度: {score:.4f}, 阈值: {threshold:.4f}")
self.get_logger().info(f"[Speaker-SV] 识别到说话人: {speaker_id}, 相似度: {score:.4f}, 阈值: {threshold:.4f}")
elif match_state == SpeakerState.REJECTED:
self.get_logger().info(f"[声纹识别] 未匹配到已知说话人(相似度不足), 相似度: {score:.4f}, 阈值: {threshold:.4f}")
self.get_logger().info(f"[Speaker-SV] 未匹配到已知说话人(相似度不足), 相似度: {score:.4f}, 阈值: {threshold:.4f}")
else:
self.get_logger().info(f"[声纹识别] 状态: {match_state.value}, 相似度: {score:.4f}, 阈值: {threshold:.4f}")
self.get_logger().info(f"[Speaker-SV] 状态: {match_state.value}, 相似度: {score:.4f}, 阈值: {threshold:.4f}")
self.sv_result_ready_event.set()
except Exception as e:
self.get_logger().error(f"[声纹识别线程] 错误: {e}")
self.get_logger().error(f"[Speaker-SV] 错误: {e}")
time.sleep(0.1)
def _put_tts_text(self, text: str):
try:
self.tts_queue.put(text, timeout=0.2)
except queue.Full:
self.get_logger().warning(f"[TTS] 队列已满,无法发送文本: {text[:50]}")
self.get_logger().warning(f"[Speaker-TTS] 队列已满,无法发送文本: {text[:50]}")
def _handle_wake_word(self, text: str, current_state: ConversationState = None) -> str:
"""处理唤醒词CHECK_VOICE状态下只检查存在性AUTHORIZED状态下移除唤醒词"""
@@ -795,10 +762,10 @@ class RobotSpeakerNode(Node):
return new_text.strip()
def destroy_node(self):
self.get_logger().info("语音节点正在关闭...")
self.get_logger().info("[Speaker] 语音节点正在关闭...")
self.stop_event.set()
self.interrupt_event.set()
self.get_logger().info("强制停止TTS播放...")
self.get_logger().info("[Speaker] 强制停止TTS播放...")
self._interrupt_tts()
threads_to_join = [self.vad_thread, self.process_thread, self.tts_thread]
if self.sv_thread:
@@ -812,16 +779,23 @@ class RobotSpeakerNode(Node):
self.sv_client.save_speakers()
self.sv_client.cleanup()
except Exception as e:
self.get_logger().warning(f"清理声纹识别资源时出错: {e}")
self.get_logger().warning(f"[Speaker] 清理声纹识别资源时出错: {e}")
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = RobotSpeakerNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
from rclpy.executors import MultiThreadedExecutor
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
node.get_logger().info("[Speaker] 收到中断信号,正在关闭节点")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':

View File

@@ -28,9 +28,9 @@ class SpeakerVerificationClient:
self.speaker_db = {} # {speaker_id: {"embedding": np.ndarray, "env": str, "registered_at": float}}
self._lock = threading.Lock()
# 优化CPU性能限制Torch使用的线程数防止多线程竞争导致性能骤降
import torch
torch.set_num_threads(1)
# # 优化CPU性能限制Torch使用的线程数防止多线程竞争导致性能骤降
# import torch
# torch.set_num_threads(1)
from funasr import AutoModel
model_path = os.path.expanduser(self.model_path)
@@ -196,3 +196,4 @@ class SpeakerVerificationClient:
del self.model
except Exception as e:
self._log("error", f"清理资源失败: {e}")

View File

@@ -13,3 +13,10 @@ Service节点模块

View File

@@ -35,21 +35,21 @@ class AudioRecorder:
if 'iFLYTEK' in device_info['name'] and device_info['maxInputChannels'] > 0:
self.device_index = i
if self.logger:
self.logger.info(f"已自动定位到麦克风设备: {device_info['name']} (Index: {i})")
self.logger.info(f"[ASR-Recorder] 已自动定位到麦克风设备: {device_info['name']} (Index: {i})")
break
except Exception as e:
if self.logger:
self.logger.error(f"设备自动检测过程出错: {e}")
self.logger.error(f"[ASR-Recorder] 设备自动检测过程出错: {e}")
if self.device_index == original_index and original_index == -1:
self.device_index = 0
if self.logger:
self.logger.info("未找到 iFLYTEK 设备,使用系统默认输入设备")
self.logger.info("[ASR-Recorder] 未找到 iFLYTEK 设备,使用系统默认输入设备")
self.format = pyaudio.paInt16
def record(self):
if self.logger:
self.logger.info(f"录音线程启动,设备索引: {self.device_index}")
self.logger.info(f"[ASR-Recorder] 录音线程启动,设备索引: {self.device_index}")
stream = None
try:
stream = self.audio.open(
@@ -61,10 +61,10 @@ class AudioRecorder:
frames_per_buffer=self.chunk
)
if self.logger:
self.logger.info("音频输入设备已打开")
self.logger.info("[ASR-Recorder] 音频输入设备已打开")
except Exception as e:
if self.logger:
self.logger.error(f"无法打开音频输入设备: {e}")
self.logger.error(f"[ASR-Recorder] 无法打开音频输入设备: {e}")
return
try:
while not self.stop_event.is_set():
@@ -76,11 +76,11 @@ class AudioRecorder:
self.audio_queue.put_nowait(data)
except OSError as e:
if self.logger:
self.logger.debug(f"录音设备错误: {e}")
self.logger.debug(f"[ASR-Recorder] 录音设备错误: {e}")
break
except KeyboardInterrupt:
if self.logger:
self.logger.info("录音线程收到中断信号")
self.logger.info("[ASR-Recorder] 录音线程收到中断信号")
finally:
if stream is not None:
try:
@@ -90,7 +90,7 @@ class AudioRecorder:
except Exception as e:
pass
if self.logger:
self.logger.info("录音线程已退出")
self.logger.info("[ASR-Recorder] 录音线程已退出")
class DashScopeASR:
@@ -110,6 +110,20 @@ class DashScopeASR:
self._stop_lock = threading.Lock()
self._final_result_event = threading.Event()
self._pending_commit = False
# ========== 连接生命周期管理: 解决 DashScope ASR WebSocket 连接超时导致的识别不稳定 ==========
self._connection_start_time = None # 连接创建时间
self._last_audio_time = None # 最后一次发送音频的时间
self._recognition_count = 0 # 识别次数计数
self._audio_send_count = 0 # 音频发送次数计数
self._last_audio_send_success = True # 最后一次音频发送是否成功
self._consecutive_send_failures = 0 # 连续发送失败次数
# 配置参数
self.MAX_CONNECTION_AGE = 300 # 连接最大存活时间5分钟
self.MAX_IDLE_TIME = 180 # 最大空闲时间3分钟
self.MAX_RECOGNITIONS = 30 # 最大识别次数30次后重建连接
self.MAX_CONSECUTIVE_FAILURES = 3 # 最大连续失败次数
def _log(self, level: str, msg: str):
if not self.logger:
@@ -125,6 +139,37 @@ class DashScopeASR:
self.logger.info(msg)
except Exception:
pass
def _should_reconnect(self) -> tuple[bool, str]:
    """Decide whether the ASR WebSocket connection should be torn down and rebuilt.

    Applies four staleness heuristics against the counters maintained by
    ``_reset_connection_stats`` / ``send_audio``.

    Returns:
        tuple[bool, str]: ``(True, reason)`` when any heuristic trips,
        otherwise ``(False, "")``. The reason string is human-readable
        (Chinese) and intended for logging only.
    """
    # Nothing to reconnect when stopped or never connected.
    if not self.running or not self.conversation:
        return False, ""
    current_time = time.time()
    # Check 1: total connection age exceeds MAX_CONNECTION_AGE seconds.
    if self._connection_start_time:
        connection_age = current_time - self._connection_start_time
        if connection_age > self.MAX_CONNECTION_AGE:
            return True, f"连接已存活{connection_age:.0f}秒,超过{self.MAX_CONNECTION_AGE}秒阈值"
    # Check 2: idle time since the last audio send exceeds MAX_IDLE_TIME seconds.
    if self._last_audio_time:
        idle_time = current_time - self._last_audio_time
        if idle_time > self.MAX_IDLE_TIME:
            return True, f"连接已空闲{idle_time:.0f}秒,超过{self.MAX_IDLE_TIME}秒阈值"
    # Check 3: completed-recognition count reached MAX_RECOGNITIONS.
    if self._recognition_count >= self.MAX_RECOGNITIONS:
        return True, f"已完成{self._recognition_count}次识别,达到重连阈值"
    # Check 4: too many consecutive audio-send failures.
    if self._consecutive_send_failures >= self.MAX_CONSECUTIVE_FAILURES:
        return True, f"连续{self._consecutive_send_failures}次音频发送失败"
    return False, ""
def _reset_connection_stats(self):
    """Reset all connection-health counters and timestamps.

    Called after a successful (re)connect so that the staleness checks in
    ``_should_reconnect`` measure the lifetime of the new connection.
    """
    self._connection_start_time = time.time()
    self._last_audio_time = time.time()
    self._recognition_count = 0
    self._audio_send_count = 0
    self._last_audio_send_success = True
    self._consecutive_send_failures = 0
def start(self):
if self.running:
@@ -153,16 +198,18 @@ class DashScopeASR:
transcription_params=transcription_params,
enable_turn_detection=True,
turn_detection_type='server_vad',
prefix_padding_ms=1000,
turn_detection_threshold=0.2,
turn_detection_silence_duration_ms=800
turn_detection_silence_duration_ms=800,
)
self.running = True
self._log("info", "ASR已启动")
self._reset_connection_stats()
self._log("info", f"[ASR] 已启动 | 连接ID:{id(self.conversation)}")
return True
except Exception as e:
self.running = False
self._log("error", f"ASR启动失败: {e}")
self._log("error", f"[ASR] 启动失败: {e}")
if self.conversation:
try:
self.conversation.close()
@@ -172,47 +219,105 @@ class DashScopeASR:
return False
def send_audio(self, audio_chunk: bytes):
should_reconnect, reason = self._should_reconnect()
if should_reconnect:
self._log("warning", f"[ASR] 检测到需要重连: {reason}")
self.running = False
try:
if self.conversation:
self.conversation.close()
except:
pass
self.conversation = None
time.sleep(1.0)
if not self.start():
self._log("error", "[ASR] 自动重连失败")
return False
self._log("info", "[ASR] 自动重连成功")
import threading
self._log("debug", f"[ASR] send_audio 被调用 | 线程:{threading.current_thread().name} | running:{self.running} | conversation:{self.conversation is not None}")
if not self.running or not self.conversation:
self._log("debug", f"[ASR] send_audio 跳过 | running:{self.running} | conversation:{self.conversation is not None}")
return False
try:
audio_b64 = base64.b64encode(audio_chunk).decode('ascii')
self.conversation.append_audio(audio_b64)
self._last_audio_time = time.time()
self._audio_send_count += 1
self._last_audio_send_success = True
self._consecutive_send_failures = 0
self._log("debug", f"[ASR] 音频发送成功 | 总计:{self._audio_send_count} | 连接年龄:{time.time() - self._connection_start_time:.1f}")
return True
except Exception:
except Exception as e:
self._last_audio_send_success = False
self._consecutive_send_failures += 1
error_msg = str(e)
error_type = type(e).__name__
if "Connection is already closed" in error_msg or "WebSocketConnectionClosedException" in error_type or "ConnectionClosed" in error_type or "websocket" in error_msg.lower():
self._log("warning", f"[ASR] WebSocket 连接已断开 | 错误:{error_msg} | 连续失败:{self._consecutive_send_failures}")
self.running = False
try:
if self.conversation:
self.conversation.close()
except:
pass
self.conversation = None
else:
self._log("error", f"[ASR] send_audio 异常 | 错误:{error_msg} | 类型:{error_type} | 连续失败:{self._consecutive_send_failures}")
return False
def stop_current_recognition(self):
import threading
self._log("debug", f"[ASR] stop_current_recognition 被调用 | 线程:{threading.current_thread().name} | running:{self.running}")
if not self._stop_lock.acquire(blocking=False):
self._log("warning", "stop_current_recognition 正在执行,跳过本次调用")
self._log("debug", f"[ASR] 锁获取失败,有其他线程正在执行 stop_current_recognition")
return False
self._final_result_event.clear()
self._pending_commit = True
try:
self._log("debug", f"[ASR] 获得锁,开始停止识别 | conversation:{self.conversation is not None}")
if not self.running or not self.conversation:
self._log("debug", f"[ASR] 无法停止 | running:{self.running} | conversation:{self.conversation is not None}")
return False
self._recognition_count += 1
should_reconnect, reason = self._should_reconnect()
if should_reconnect:
self._log("info", f"[ASR] 识别完成后检测到需要重连: {reason}")
self._final_result_event.clear()
self._pending_commit = True
self.conversation.commit()
self._final_result_event.wait(timeout=3.0)
try:
self.conversation.commit()
self._final_result_event.wait(timeout=3.0)
except Exception as e:
self._log("debug", f"[ASR] commit 异常: {e}")
self._log("debug", f"[ASR] 准备关闭旧连接 | conversation_id:{id(self.conversation)}")
self.running = False
old_conversation = self.conversation
self.conversation = None
self._log("debug", f"[ASR] conversation已设为None准备关闭旧连接")
try:
old_conversation.close()
except Exception:
pass
time.sleep(0.1)
if not self.start():
self._log("error", "ASR重启失败")
return False
self._log("debug", f"[ASR] 旧连接已关闭")
except Exception as e:
self._log("warning", f"[ASR] 关闭连接异常: {e}")
self._log("debug", f"[ASR] 连接已关闭,等待下次语音活动时重连")
return True
finally:
self._pending_commit = False
self._stop_lock.release()
self._log("debug", f"[ASR] stop_current_recognition 完成,锁已释放")
def stop(self):
with self._stop_lock:
@@ -224,6 +329,7 @@ class DashScopeASR:
except Exception:
pass
self.conversation = None
self._log("info", "[ASR] 已完全停止")
class _ASRCallback(OmniRealtimeCallback):
@@ -285,6 +391,11 @@ class ASRAudioNode(Node):
self.audio_buffer = collections.deque(maxlen=240000)
self.audio_recording = False
self.audio_lock = threading.Lock()
# ========== 异常识别检测 ==========
self._abnormal_results = ["嗯。", "", "啊。", "哦。"] # 异常识别结果列表
self._consecutive_abnormal_count = 0 # 连续异常识别次数
self.MAX_CONSECUTIVE_ABNORMAL = 2 # 最大连续异常次数
self.recording_thread = threading.Thread(
target=self.audio_recorder.record, name="RecordingThread", daemon=True
@@ -349,6 +460,25 @@ class ASRAudioNode(Node):
self._last_result = text.strip()
self._last_result_time = time.time()
self._result_event.set()
is_abnormal = self._last_result in self._abnormal_results and len(self._last_result) <= 2
if is_abnormal:
self._consecutive_abnormal_count += 1
self.get_logger().warn(f"[ASR] 检测到异常识别结果: '{self._last_result}' | 连续异常:{self._consecutive_abnormal_count}")
# 如果连续多次异常,强制重置 ASR 连接
if self._consecutive_abnormal_count >= self.MAX_CONSECUTIVE_ABNORMAL:
self.get_logger().error(f"[ASR] 连续{self._consecutive_abnormal_count}次异常识别,强制重置连接")
try:
self.asr_client.stop()
time.sleep(0.5)
self.asr_client.start()
self._consecutive_abnormal_count = 0
self.get_logger().info("[ASR] 连接已重置")
except Exception as e:
self.get_logger().error(f"[ASR] 重置连接失败: {e}")
else:
# 正常识别,重置异常计数
self._consecutive_abnormal_count = 0
try:
self.get_logger().info(f"[ASR] 识别结果: {self._last_result}")
except Exception:
@@ -364,22 +494,28 @@ class ASRAudioNode(Node):
pass
def _audio_data_callback(self, request, response):
import threading
self.get_logger().debug(f"[ASR-AudioData] 回调触发 | command:{request.command} | 线程:{threading.current_thread().name}")
response.sample_rate = self.sample_rate
response.channels = self.channels
if request.command == "start":
with self.audio_lock:
self.get_logger().debug(f"[ASR-AudioData] start命令 | 旧buffer大小:{len(self.audio_buffer)} | recording:{self.audio_recording}")
self.audio_buffer.clear()
self.audio_recording = True
self.get_logger().debug(f"[ASR-AudioData] buffer已清空recording=True")
response.success = True
response.message = "开始录音"
response.samples = 0
return response
if request.command == "stop":
self.get_logger().debug(f"[ASR-AudioData] stop命令 | recording:{self.audio_recording}")
with self.audio_lock:
self.audio_recording = False
audio_list = list(self.audio_buffer)
self.get_logger().debug(f"[ASR-AudioData] 读取buffer | 大小:{len(audio_list)}")
self.audio_buffer.clear()
if len(audio_list) > 0:
audio_array = np.array(audio_list, dtype=np.int16)
@@ -387,10 +523,12 @@ class ASRAudioNode(Node):
response.audio_data = audio_array.tobytes()
response.samples = len(audio_list)
response.message = f"录音完成{len(audio_list)}样本"
self.get_logger().debug(f"[ASR-AudioData] 返回音频 | samples:{len(audio_list)}")
else:
response.success = False
response.message = "缓冲区为空"
response.samples = 0
self.get_logger().debug(f"[ASR-AudioData] buffer为空")
return response
if request.command == "get":
@@ -421,7 +559,7 @@ class ASRAudioNode(Node):
response.message = "等待超时"
except KeyboardInterrupt:
try:
self.get_logger().info("[VAD] 收到中断信号,正在关闭")
self.get_logger().info("[ASR-VAD] 收到中断信号,正在关闭")
except Exception:
pass
response.success = False
@@ -496,21 +634,26 @@ class ASRAudioNode(Node):
continue
except KeyboardInterrupt:
try:
self.get_logger().info("[ASR Worker] 收到中断信号")
self.get_logger().info("[ASR-Worker] 收到中断信号")
except Exception:
pass
break
if self.audio_recording:
self.get_logger().debug(f"[ASR-Worker] 收到音频chunk | recording:{self.audio_recording} | buffer_size:{len(self.audio_buffer)}")
try:
audio_array = np.frombuffer(audio_chunk, dtype=np.int16)
with self.audio_lock:
self.audio_buffer.extend(audio_array)
except Exception:
except Exception as e:
self.get_logger().error(f"[ASR-Worker] buffer写入异常 | 错误:{e}")
pass
if self.asr_client.running:
self.asr_client.send_audio(audio_chunk)
else:
if not self.asr_client.start():
time.sleep(1.0)
def destroy_node(self):
if self._shutdown_in_progress:

View File

@@ -89,7 +89,7 @@ class DashScopeTTSClient:
if not voice_to_use or not voice_to_use.strip():
if self.logger:
self.logger.error(f"Voice参数无效: '{voice_to_use}'")
self.logger.error(f"[TTS] Voice参数无效: '{voice_to_use}'")
self._current_callback = None
return False
synthesizer = SpeechSynthesizer(
@@ -212,7 +212,7 @@ class TTSAudioNode(Node):
self.playing_lock = threading.Lock()
self.is_playing = False
self.get_logger().info("TTS Audio节点已启动")
self.get_logger().info("[TTS] TTS Audio节点已启动")
def _load_config(self):
config_file = os.path.join(

View File

@@ -8,11 +8,3 @@ string message # 状态消息

View File

@@ -18,3 +18,10 @@ string message

View File

@@ -9,9 +9,3 @@ string message