增加相机调用,修复对话历史管理,修复asr停止识别逻辑

This commit is contained in:
lxy
2026-01-08 20:59:58 +08:00
parent 8fffd4ab42
commit 63a21999bb
13 changed files with 984 additions and 169 deletions

23
config/camera.yaml Normal file
View File

@@ -0,0 +1,23 @@
# 相机配置文件
# 相机默认一直运行,用户说拍照时自动捕获图像
camera:
serial_number: null # 相机序列号null表示使用第一个可用设备
# RGB流配置
rgb:
width: 640 # 图像宽度
height: 480 # 图像高度
fps: 30 # 帧率支持6, 10, 15, 30, 60
format: "RGB8" # 图像格式RGB8, BGR8
# 图像处理配置
image:
jpeg_quality: 85 # JPEG压缩质量0-10085是质量和大小的平衡点
max_size: null # 最大尺寸null表示不缩放格式"1280x720"
# 相机指令关键词(拼音)
commands:
capture_keywords: ["pai zhao", "pai ge zhao", "pai zhang", "da kai xiang ji", "kan zhe li", "zhao xiang"] # 拍照相关指令
# capture_keywords: ["拍照", "拍张", "打开相机", "看这里", "照相"] # 中文指令如果ASR直接输出中文

View File

@@ -1,44 +1,40 @@
# DashScope (阿里云百炼) 配置
# ROS 语音包配置文件
dashscope:
api_key: "sk-7215a5ab7a00469db4072e1672a0661e"
asr:
model: "qwen3-asr-flash-realtime"
url: "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
llm:
model: "qwen-turbo"
model: "qwen3-vl-flash"
base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
temperature: 0.7
max_tokens: 4096
max_history: 3
tts:
model: "cosyvoice-v3-flash"
voice: "longanyang"
# 音频配置
audio:
microphone:
device_index: 0 # PyAudio设备索引0 = XFM-DP-V0.0.18 (录音设备1输入通道16000Hz)
sample_rate: 16000
channels: 1
device_index: -1 # -1 表示使用默认设备
sample_rate: 16000 # 输入采样率16kHz语音识别常用采样率
channels: 1 # 输入声道数单声道MONO适合语音采集
chunk: 1024
soundcard:
card_index: 1 # ALSA card索引1 = USB Audio Device (播放设备2输出通道44100Hz)
device_index: 0 # ALSA device索引0
card_index: 1 # USB Audio Device (card 1)
device_index: 0 # USB Audio [USB Audio] (device 0)
sample_rate: 48000 # 输出采样率48kHz支持48000或44100
channels: 2 # 输出声道数立体声2声道FL+FR
volume: 0.2 # 音量比例0.0-1.00.2表示20%音量)
# VAD配置
vad:
vad_mode: 3 # 0-3
silence_duration_ms: 1000
min_energy_threshold: 300
vad_mode: 3 # VAD模式0-33最严格
silence_duration_ms: 1000 # 静音持续时长(毫秒)
min_energy_threshold: 300 # 最小能量阈值
# 系统配置
system:
use_llm: true
use_wake_word: true
wake_word: "ni hao"
session_timeout: 30.0 # 会话窗口超时时间(秒),唤醒成功后在此时间内无需重复唤醒词
use_llm: true # 是否使用LLM
use_wake_word: true # 是否启用唤醒词检测
wake_word: "er gou" # 唤醒词(拼音)
session_timeout: 30.0 # 会话超时时间(秒)

View File

@@ -9,20 +9,36 @@ import yaml
def generate_launch_description():
# 加载配置文件
config_file = os.path.join(
voice_config_file = os.path.join(
get_package_share_directory('robot_speaker'),
'config',
'voice.yaml'
)
with open(config_file, 'r') as f:
camera_config_file = os.path.join(
get_package_share_directory('robot_speaker'),
'config',
'camera.yaml'
)
with open(voice_config_file, 'r') as f:
config = yaml.safe_load(f)
# 加载相机配置(如果文件存在)
camera_config = None
if os.path.exists(camera_config_file):
try:
with open(camera_config_file, 'r') as f:
camera_config = yaml.safe_load(f)
except Exception as e:
print(f"警告: 无法加载相机配置文件: {e}")
# 从配置文件提取参数
dashscope_config = config['dashscope']
audio_config = config['audio']
vad_config = config['vad']
system_config = config['system']
camera_config_data = camera_config.get('camera', {}) if camera_config else {}
return LaunchDescription([
# 音频输入参数
@@ -43,6 +59,21 @@ def generate_launch_description():
default_value=str(audio_config['soundcard']['device_index']),
description='声卡 device index'
),
DeclareLaunchArgument(
'output_sample_rate',
default_value=str(audio_config['soundcard'].get('sample_rate', 48000)),
description='输出采样率'
),
DeclareLaunchArgument(
'output_channels',
default_value=str(audio_config['soundcard'].get('channels', 2)),
description='输出声道数'
),
DeclareLaunchArgument(
'output_volume',
default_value=str(audio_config['soundcard'].get('volume', 0.2)),
description='输出音量比例0.0-1.0'
),
# 音频参数
DeclareLaunchArgument(
@@ -152,6 +183,43 @@ def generate_launch_description():
description='会话超时时间(秒)'
),
# 相机参数
DeclareLaunchArgument(
'camera_serial_number',
default_value=str(camera_config_data.get('serial_number', '')) if camera_config_data.get('serial_number') else '',
description='相机序列号'
),
DeclareLaunchArgument(
'camera_width',
default_value=str(camera_config_data.get('rgb', {}).get('width', 640)),
description='相机图像宽度'
),
DeclareLaunchArgument(
'camera_height',
default_value=str(camera_config_data.get('rgb', {}).get('height', 480)),
description='相机图像高度'
),
DeclareLaunchArgument(
'camera_fps',
default_value=str(camera_config_data.get('rgb', {}).get('fps', 30)),
description='相机帧率'
),
DeclareLaunchArgument(
'camera_format',
default_value=camera_config_data.get('rgb', {}).get('format', 'RGB8'),
description='相机图像格式'
),
DeclareLaunchArgument(
'camera_jpeg_quality',
default_value=str(camera_config_data.get('image', {}).get('jpeg_quality', 85)),
description='JPEG压缩质量'
),
DeclareLaunchArgument(
'camera_capture_keywords',
default_value=','.join(camera_config_data.get('commands', {}).get('capture_keywords', ['pai zhao'])),
description='相机拍照指令关键词(逗号分隔)'
),
# 语音节点
Node(
package='robot_speaker',
@@ -165,6 +233,9 @@ def generate_launch_description():
'sample_rate': LaunchConfiguration('sample_rate'),
'channels': LaunchConfiguration('channels'),
'chunk': LaunchConfiguration('chunk'),
'output_sample_rate': LaunchConfiguration('output_sample_rate'),
'output_channels': LaunchConfiguration('output_channels'),
'output_volume': LaunchConfiguration('output_volume'),
# VAD参数
'vad_mode': LaunchConfiguration('vad_mode'),
@@ -188,8 +259,19 @@ def generate_launch_description():
'use_wake_word': LaunchConfiguration('use_wake_word'),
'wake_word': LaunchConfiguration('wake_word'),
'session_timeout': LaunchConfiguration('session_timeout'),
# 相机参数
'camera_serial_number': LaunchConfiguration('camera_serial_number'),
'camera_width': LaunchConfiguration('camera_width'),
'camera_height': LaunchConfiguration('camera_height'),
'camera_fps': LaunchConfiguration('camera_fps'),
'camera_format': LaunchConfiguration('camera_format'),
'camera_jpeg_quality': LaunchConfiguration('camera_jpeg_quality'),
'camera_capture_keywords': LaunchConfiguration('camera_capture_keywords'),
}],
output='screen'
),
])

View File

@@ -4,4 +4,9 @@ pyaudio>=0.2.11
webrtcvad>=2.0.10
pypinyin>=0.49.0
rclpy>=3.0.0
pyrealsense2>=2.54.0
Pillow>=10.0.0
numpy>=1.24.0

View File

@@ -1,2 +1,4 @@
# robot_speaker package

View File

@@ -3,6 +3,7 @@ ASR语音识别模块
"""
import base64
import time
import threading
import dashscope
from dashscope.audio.qwen_omni import OmniRealtimeConversation, OmniRealtimeCallback
from dashscope.audio.qwen_omni.omni_realtime import TranscriptionParams, MultiModality
@@ -25,10 +26,27 @@ class DashScopeASR:
self.conversation = None
self.running = False
self.on_sentence_end = None
self.on_text_update = None # 实时文本更新回调
# 线程同步机制
self._stop_lock = threading.Lock() # 防止并发调用 stop_current_recognition
self._final_result_event = threading.Event() # 等待 final 回调完成
self._pending_commit = False # 标记是否有待处理的 commit
def _log(self, level: str, msg: str):
"""记录日志根据级别调用对应的ROS2日志方法"""
if self.logger:
getattr(self.logger, level)(msg)
# ROS2 logger不能动态改变severity级别需要显式调用对应方法
if level == "debug":
self.logger.debug(msg)
elif level == "info":
self.logger.info(msg)
elif level == "warning":
self.logger.warn(msg)
elif level == "error":
self.logger.error(msg)
else:
self.logger.info(msg) # 默认使用info级别
else:
print(f"[ASR] {msg}")
@@ -37,58 +55,137 @@ class DashScopeASR:
if self.running:
return False
callback = _ASRCallback(self)
self.conversation = OmniRealtimeConversation(
model=self.model,
url=self.url,
callback=callback
)
callback.conversation = self.conversation
self.conversation.connect()
transcription_params = TranscriptionParams(
language='zh',
sample_rate=self.sample_rate,
input_audio_format="pcm"
)
self.conversation.update_session(
output_modalities=[MultiModality.TEXT],
enable_input_audio_transcription=True,
transcription_params=transcription_params
)
self.running = True
self._log("info", "ASR已启动")
return True
try:
callback = _ASRCallback(self)
self.conversation = OmniRealtimeConversation(
model=self.model,
url=self.url,
callback=callback
)
callback.conversation = self.conversation
self.conversation.connect()
# 自定义文本语料增强识别
custom_text = "蜂核科技, 杭州蜂核科技有限公司, 西林瓶,瓶子"
transcription_params = TranscriptionParams(
language='zh',
sample_rate=self.sample_rate,
input_audio_format="pcm",
corpus_text=custom_text,
)
# 本地 VAD → 只控制 TTS 打断
# 服务端 turn detection → 只控制 ASR 输出、LLM 生成轮次
self.conversation.update_session(
output_modalities=[MultiModality.TEXT],
enable_input_audio_transcription=True,
transcription_params=transcription_params,
enable_turn_detection=True,
# 保留服务端 turn detection
turn_detection_type='server_vad', # 服务端VAD
turn_detection_threshold=0.2, # 可调
turn_detection_silence_duration_ms=800
)
self.running = True
self._log("info", "ASR已启动")
return True
except Exception as e:
self.running = False
self._log("error", f"ASR启动失败: {e}")
if self.conversation:
try:
self.conversation.close()
except:
pass
self.conversation = None
return False
def send_audio(self, audio_chunk: bytes):
"""发送音频chunk到ASR"""
if not self.running or not self.conversation:
return False
audio_b64 = base64.b64encode(audio_chunk).decode('ascii')
self.conversation.append_audio(audio_b64)
return True
try:
audio_b64 = base64.b64encode(audio_chunk).decode('ascii')
self.conversation.append_audio(audio_b64)
return True
except Exception as e:
# 连接已关闭或其他错误,静默处理(避免日志过多)
# running状态会在stop_current_recognition中正确设置
return False
def stop_current_recognition(self):
"""停止当前识别触发final结果然后重新启动"""
if not self.running or not self.conversation:
"""
停止当前识别触发final结果然后重新启动
优化:
1. 使用事件代替 sleep等待 final 回调完成
2. 使用锁防止并发调用
3. 处理 start() 失败的情况,确保 running 状态正确
4. 添加超时机制,避免无限等待
"""
# 使用锁防止并发调用
if not self._stop_lock.acquire(blocking=False):
self._log("warning", "stop_current_recognition 正在执行,跳过本次调用")
return False
self.conversation.commit()
time.sleep(0.5) # 等待结果返回
self.conversation.close()
time.sleep(0.2)
self.start()
return True
try:
if not self.running or not self.conversation:
return False
# 重置事件,准备等待 final 回调
self._final_result_event.clear()
self._pending_commit = True
# 触发 commit等待 final 结果
self.conversation.commit()
# 等待 final 回调完成最多等待1秒
if self._final_result_event.wait(timeout=1.0):
self._log("debug", "已收到 final 回调,准备关闭连接")
else:
self._log("warning", "等待 final 回调超时,继续执行")
# 先设置running=False防止ASR线程继续发送音频
self.running = False
# 关闭当前连接
old_conversation = self.conversation
self.conversation = None # 立即清空防止send_audio继续使用
try:
old_conversation.close()
except Exception as e:
self._log("warning", f"关闭连接时出错: {e}")
# 短暂等待,确保连接完全关闭
time.sleep(0.1)
# 重新启动,如果失败则保持 running=False
if not self.start():
self._log("error", "ASR重启失败running状态已重置")
return False
# 启动成功running已在start()中设置为True
return True
finally:
self._pending_commit = False
self._stop_lock.release()
def stop(self):
"""停止ASR识别器"""
self.running = False
if self.conversation:
self.conversation.close()
self._log("info", "ASR已停止")
# 等待正在执行的 stop_current_recognition 完成
with self._stop_lock:
self.running = False
self._final_result_event.set() # 唤醒可能正在等待的线程
if self.conversation:
try:
self.conversation.close()
except Exception as e:
self._log("warning", f"停止时关闭连接出错: {e}")
self.conversation = None
self._log("info", "ASR已停止")
class _ASRCallback(OmniRealtimeCallback):
@@ -116,10 +213,16 @@ class _ASRCallback(OmniRealtimeCallback):
transcript = response.get('transcript', '')
if transcript and transcript.strip() and self.asr_client.on_sentence_end:
self.asr_client.on_sentence_end(transcript.strip())
# 如果有待处理的 commit通知等待的线程
if self.asr_client._pending_commit:
self.asr_client._final_result_event.set()
elif event_type == 'conversation.item.input_audio_transcription.text':
# 中间结果stash忽略
pass
# 实时识别文本更新(多轮提示)
transcript = response.get('transcript', '') or response.get('text', '')
if transcript and transcript.strip() and self.asr_client.on_text_update:
self.asr_client.on_text_update(transcript.strip())
elif event_type == 'input_audio_buffer.speech_started':
self.asr_client._log("info", "ASR检测到说话开始")

View File

@@ -29,7 +29,7 @@ class AudioRecorder:
chunk: int, vad_detector: VADDetector,
audio_queue: queue.Queue, # 音频队列:录音线程 → ASR线程
silence_duration_ms: int = 1000,
min_energy_threshold: int = 300,
min_energy_threshold: int = 300, # 音频能量 > 300有语音
heartbeat_interval: float = 2.0,
on_heartbeat=None,
is_playing=None,
@@ -55,12 +55,7 @@ class AudioRecorder:
self.on_speech_end = on_speech_end
self.stop_flag = stop_flag or (lambda: False)
self.logger = logger
try:
self.audio = pyaudio.PyAudio()
except Exception as e:
raise RuntimeError(f"无法初始化PyAudio: {e}")
self.audio = pyaudio.PyAudio()
self.format = pyaudio.paInt16
self._debug_counter = 0
@@ -85,25 +80,29 @@ class AudioRecorder:
except Exception as e:
raise RuntimeError(f"无法打开音频输入设备: {e}")
window_sec = 0.5 # VAD检测窗口
no_speech_threshold = max(self.silence_duration_ms / 1000.0, 0.1)
# VAD检测窗口, 最快 0.5s 内发现说话
window_sec = 0.5
# 连续 1s 没有检测到语音,就判定为静音状态
no_speech_threshold = max(self.silence_duration_ms / 1000.0, 0.1)
audio_buffer = []
last_active_time = time.time()
last_heartbeat_time = time.time()
is_speaking = False
was_speaking = False
audio_buffer = [] # VAD 滑动窗口
last_active_time = time.time() # 静音计时基准
was_speaking = False # 上一窗口状态
is_speaking = False # 当前窗口状态
try:
while not self.stop_flag():
# exception_on_overflow=False, 宁可丢帧,也不阻塞
data = stream.read(self.chunk, exception_on_overflow=False)
# 队列满时丢弃最旧的数据,保证实时性
# 队列满时丢弃最旧的数据,ASR 跟不上时系统仍然听得见
if self.audio_queue.full():
self.audio_queue.get_nowait()
self.audio_queue.put_nowait(data)
audio_buffer.append(data)
audio_buffer.append(data) # 只用于 VAD不用于 ASR
# VAD检测窗口
now = time.time()
@@ -112,7 +111,6 @@ class AudioRecorder:
energy = self._calculate_energy(raw_audio)
vad_result = self._check_activity(raw_audio)
# 每10次检测输出一次调试信息
self._debug_counter += 1
if self._debug_counter >= 10:
if self.logger:
@@ -125,18 +123,19 @@ class AudioRecorder:
if is_speaking:
last_active_time = now
if not was_speaking:
if not was_speaking: # 上一轮没说话,本轮开始说话
if self.on_speech_start:
self.on_speech_start()
# 检测当前 TTS 是否在播放
if self.is_playing() and self.on_new_segment:
self.on_new_segment()
self.on_new_segment() # 打断 TTS的回调
else:
if was_speaking:
silence_duration = now - last_active_time
if silence_duration >= no_speech_threshold:
if self.on_speech_end:
self.on_speech_end()
self.on_speech_end() # 通知系统用户停止说话
if self.on_heartbeat and now - last_heartbeat_time >= self.heartbeat_interval:
self.on_heartbeat()
@@ -153,9 +152,11 @@ class AudioRecorder:
"""计算音频能量RMS"""
if not audio_chunk:
return 0.0
# 计算样本数:音频字节数 // 2因为是16位PCM1个样本=2字节
n = len(audio_chunk) // 2
if n <= 0:
return 0.0
# 把字节数据解包为16位有符号整数小端序
samples = struct.unpack(f'<{n}h', audio_chunk[: n * 2])
if not samples:
return 0.0
@@ -165,23 +166,30 @@ class AudioRecorder:
"""VAD + 能量检测先VAD检测能量作为辅助判断"""
energy = self._calculate_energy(audio_data)
# VAD统计rate=0.4step=20ms
rate = 0.4
rate = 0.4 # 连续人声经验值
num = 0
step = int(self.sample_rate * 0.02)
if step <= 0:
return False
flag_rate = round(rate * len(audio_data) // step)
for i in range(0, len(audio_data), step):
chunk = audio_data[i:i + step]
if len(chunk) == step:
# 采样率:16000 Hz, 帧时长:20ms=0.02s, 每帧采样点数=16000×0.02=320samples
# 每帧字节数=320×2=640bytes
bytes_per_sample = 2 # paInt16
frame_samples = int(self.sample_rate * 0.02)
frame_bytes = frame_samples * bytes_per_sample
if frame_bytes <= 0 or len(audio_data) < frame_bytes:
return False
total_frames = len(audio_data) // frame_bytes
required = max(1, int(total_frames * rate))
for i in range(0, len(audio_data), frame_bytes):
chunk = audio_data[i:i + frame_bytes]
if len(chunk) == frame_bytes:
if self.vad_detector.vad.is_speech(chunk, sample_rate=self.sample_rate):
num += 1
# VAD检测到语音且能量不是太低能量阈值作为辅助判断不直接过滤
vad_result = num > flag_rate
if vad_result and energy < 10: # 能量太低可能是噪声
# 语音开头能量高, 中后段(拖音、尾音)能量下降
vad_result = num >= required
if vad_result and energy < self.min_energy_threshold * 0.5:
return False
return vad_result

147
robot_speaker/camera.py Normal file
View File

@@ -0,0 +1,147 @@
"""
相机模块 - RealSense相机封装
相机默认一直运行,只在用户说拍照时捕获图像
"""
import numpy as np
import contextlib
class CameraClient:
"""
相机客户端 - 封装RealSense相机操作
相机初始化后一直运行capture_rgb() 只负责从运行中的管道捕获一帧
"""
def __init__(self,
serial_number: str | None = None,
width: int = 640,
height: int = 480,
fps: int = 30,
format: str = 'RGB8',
logger=None):
self.serial_number = serial_number
self.width = width
self.height = height
self.fps = fps
self.format = format
self.logger = logger
self.pipeline = None
self.config = None
self._is_initialized = False
self._rs = None
def _log(self, level: str, msg: str):
if self.logger:
getattr(self.logger, level, self.logger.info)(msg)
else:
print(f"[Camera] {msg}")
def initialize(self) -> bool:
"""
初始化并启动相机管道
相机启动后会一直运行,直到调用 cleanup()
"""
if self._is_initialized:
return True
try:
import pyrealsense2 as rs
self._rs = rs
self.pipeline = rs.pipeline()
self.config = rs.config()
if self.serial_number:
self.config.enable_device(self.serial_number)
self.config.enable_stream(
rs.stream.color,
self.width,
self.height,
rs.format.rgb8 if self.format == 'RGB8' else rs.format.bgr8,
self.fps
)
# 启动管道,相机开始运行
self.pipeline.start(self.config)
self._is_initialized = True
self._log("info", f"相机已启动并保持运行: {self.width}x{self.height}@{self.fps}fps")
return True
except ImportError:
self._log("error", "pyrealsense2库未安装请运行: pip install pyrealsense2")
return False
except Exception as e:
self._log("error", f"相机初始化失败: {e}")
self.cleanup()
return False
def cleanup(self):
"""停止相机管道,释放资源"""
if self.pipeline:
self.pipeline.stop()
self._log("info", "相机已停止")
self.pipeline = None
self.config = None
self._is_initialized = False
def capture_rgb(self) -> np.ndarray | None:
"""
从运行中的相机管道捕获一帧RGB图像
相机管道必须已经通过 initialize() 启动
"""
if not self._is_initialized:
self._log("error", "相机未初始化,无法捕获图像")
return None
try:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
self._log("warning", "未获取到颜色帧")
return None
return np.asanyarray(color_frame.get_data())
except Exception as e:
self._log("error", f"捕获图像失败: {e}")
return None
@contextlib.contextmanager
def capture_context(self):
"""
上下文管理器:拍照并自动清理资源
"""
image_data = self.capture_rgb()
try:
yield image_data
finally:
if image_data is not None:
del image_data
def capture_multiple(self, count: int = 1) -> list[np.ndarray]:
"""
捕获多张图像(为未来扩展准备)
"""
images = []
for i in range(count):
img = self.capture_rgb()
if img is not None:
images.append(img)
else:
self._log("warning", f"{i+1}张图像捕获失败")
return images
@contextlib.contextmanager
def capture_multiple_context(self, count: int = 1):
"""
上下文管理器:捕获多张图像并自动清理资源
"""
images = self.capture_multiple(count)
try:
yield images
finally:
for img in images:
del img
images.clear()

View File

@@ -2,33 +2,109 @@
对话历史管理模块
"""
from .types import LLMMessage
import threading
class ConversationHistory:
"""对话历史管理器"""
"""
对话历史管理器 - 实时语音友好版本
使用待确认机制确保历史完整性:
1. start_turn() - 开始新轮次,暂存用户消息
2. get_messages() - 获取历史包含待确认的用户消息用于LLM上下文
3. commit_turn() - 确认轮次完成,写入历史
4. cancel_turn() - 取消当前轮次,丢弃待确认消息
"""
def __init__(self, max_history: int = 3, summary_trigger: int = 3):
self.max_history = max_history
self.summary_trigger = summary_trigger
self.conversation_history: list[LLMMessage] = []
self.summary: str | None = None
# 待确认机制
self._pending_user_message: LLMMessage | None = None # 待确认的用户消息
self._lock = threading.Lock() # 线程安全锁
def start_turn(self, user_content: str):
    """Begin a new dialogue turn.

    The user message is staged as pending and written to history only
    when commit_turn() confirms the turn. A second call before commit
    simply replaces the staged message (normal for rapid consecutive
    utterances), so a half-finished turn never pollutes the history.
    """
    staged = LLMMessage(role="user", content=user_content)
    with self._lock:
        self._pending_user_message = staged
def commit_turn(self, assistant_content: str) -> bool:
    """Finalize the current turn, writing user + assistant to history.

    Returns True when the message pair was recorded. Returns False when
    no user message is staged, or when the assistant reply is blank —
    in which case the staged user message is discarded.
    """
    reply = (assistant_content or "").strip()
    with self._lock:
        staged = self._pending_user_message
        if staged is None:
            return False
        # The turn is consumed either way; clear the staging slot now.
        self._pending_user_message = None
        if not reply:
            # Blank reply: drop the turn instead of recording it.
            return False
        self.conversation_history.append(staged)
        self.conversation_history.append(
            LLMMessage(role="assistant", content=reply)
        )
        # Compress history if it has grown past the threshold.
        self._maybe_compress()
        return True
def cancel_turn(self):
    """Discard the staged user message, if any.

    Called on interruption so an incomplete turn never reaches history.
    Unconditional assignment is equivalent to the guarded clear.
    """
    with self._lock:
        self._pending_user_message = None
def add_message(self, role: str, content: str):
    """Append a message directly to history (legacy path).

    Prefer start_turn()/commit_turn(); this writes immediately, bypassing
    the pending-turn confirmation mechanism. Any staged user message is
    discarded first.
    """
    with self._lock:
        # BUGFIX: calling self.cancel_turn() here deadlocks — it
        # re-acquires the same non-reentrant threading.Lock we already
        # hold. Clear the pending message inline instead.
        self._pending_user_message = None
        self.conversation_history.append(LLMMessage(role=role, content=content))
        self._maybe_compress()
def get_messages(self) -> list[LLMMessage]:
    """Return the LLM context: summary, recent history, pending message.

    The staged (unconfirmed) user message is appended last so the LLM
    sees it in context even though it is not yet committed to history.
    This span contained both pre- and post-diff bodies; this is the
    post-diff (lock-protected) version, reconstructed cleanly.
    """
    with self._lock:
        messages: list[LLMMessage] = []
        # Running summary acts as a system message when present.
        if self.summary:
            messages.append(LLMMessage(role="system", content=self.summary))
        # Each turn contributes two messages (user + assistant).
        if self.max_history > 0:
            messages.extend(self.conversation_history[-self.max_history * 2:])
        # Pending user message: in context, but not yet in history.
        if self._pending_user_message is not None:
            messages.append(self._pending_user_message)
        return messages
def has_pending_turn(self) -> bool:
    """Report whether a user message is staged awaiting commit_turn()."""
    with self._lock:
        staged = self._pending_user_message
    return staged is not None
def _maybe_compress(self):
"""压缩对话历史"""
@@ -55,7 +131,9 @@ class ConversationHistory:
self.summary = compressed
def clear(self):
    """Reset the manager: history, summary, and any staged message.

    This span contained both pre- and post-diff bodies; this is the
    post-diff (lock-protected) version, reconstructed cleanly.
    """
    with self._lock:
        self.conversation_history.clear()
        self.summary = None
        self._pending_user_message = None

View File

@@ -1,8 +1,10 @@
"""
LLM大语言模型模块
支持多模态(文本+图像)
"""
from openai import OpenAI
from .types import LLMMessage
from typing import Optional, List
class LLMClient:
@@ -32,12 +34,23 @@ class DashScopeLLM(LLMClient):
self.logger = logger
def _log(self, level: str, msg: str):
"""记录日志根据级别调用对应的ROS2日志方法"""
msg = f"[{self.name}] {msg}"
if self.logger:
getattr(self.logger, level)(msg)
# ROS2 logger不能动态改变severity级别需要显式调用对应方法
if level == "debug":
self.logger.debug(msg)
elif level == "info":
self.logger.info(msg)
elif level == "warning":
self.logger.warn(msg)
elif level == "error":
self.logger.error(msg)
else:
self.logger.info(msg) # 默认使用info级别
def chat(self, messages: list[LLMMessage]) -> str | None:
"""非流式聊天"""
"""非流式聊天:任务规划"""
payload_messages = [{"role": msg.role, "content": msg.content} for msg in messages]
response = self.client.chat.completions.create(
model=self.model,
@@ -50,9 +63,62 @@ class DashScopeLLM(LLMClient):
return reply if reply else None
def chat_stream(self, messages: list[LLMMessage],
on_token=None) -> str | None:
"""流式聊天"""
payload_messages = [{"role": msg.role, "content": msg.content} for msg in messages]
on_token=None,
images: Optional[List[str]] = None) -> str | None:
"""
流式聊天:语音系统
支持多模态(文本+图像)
"""
# 转换消息格式,支持多模态
# 图像只添加到最后一个user消息中
payload_messages = []
last_user_idx = -1
for i, msg in enumerate(messages):
if msg.role == "user":
last_user_idx = i
has_images_in_message = False
for i, msg in enumerate(messages):
msg_dict = {"role": msg.role}
# 如果当前消息是最后一个user消息且有图像构建多模态content
if i == last_user_idx and msg.role == "user" and images and len(images) > 0:
content_list = [{"type": "text", "text": msg.content}]
# 添加所有图像
for img_idx, img_base64 in enumerate(images):
image_url = f"data:image/jpeg;base64,{img_base64[:50]}..." if len(img_base64) > 50 else f"data:image/jpeg;base64,{img_base64}"
content_list.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
})
self._log("info", f"[多模态] 添加图像 #{img_idx+1} 到user消息base64长度: {len(img_base64)}")
msg_dict["content"] = content_list
has_images_in_message = True
else:
msg_dict["content"] = msg.content
payload_messages.append(msg_dict)
# 记录多模态信息
if images and len(images) > 0:
if has_images_in_message:
# 找到最后一个user消息记录其content结构
last_user_msg = payload_messages[last_user_idx] if last_user_idx >= 0 else None
if last_user_msg and isinstance(last_user_msg.get("content"), list):
content_items = last_user_msg["content"]
text_items = [item for item in content_items if item.get("type") == "text"]
image_items = [item for item in content_items if item.get("type") == "image_url"]
self._log("info", f"[多模态] 已发送多模态请求: {len(text_items)}个文本 + {len(image_items)}张图片")
self._log("debug", f"[多模态] 用户文本: {text_items[0].get('text', '')[:50] if text_items else 'N/A'}")
else:
self._log("warning", "[多模态] 消息格式异常,无法确认图片是否添加")
else:
self._log("warning", f"[多模态] 有{len(images)}张图片但未找到user消息图片未被添加")
else:
self._log("debug", "[多模态] 纯文本请求(无图片)")
full_reply = ""
stream = self.client.chat.completions.create(

View File

@@ -8,6 +8,10 @@ import threading
import queue
import time
import re
import base64
import io
import numpy as np
from PIL import Image
from pypinyin import pinyin, Style
from .audio import VADDetector, AudioRecorder
from .asr import DashScopeASR
@@ -15,6 +19,7 @@ from .tts import DashScopeTTSClient, TTSRequest
from .llm import DashScopeLLM
from .history import ConversationHistory
from .types import LLMMessage
from .camera import CameraClient
class RobotSpeakerNode(Node):
@@ -60,9 +65,12 @@ class RobotSpeakerNode(Node):
self.declare_parameter('input_device_index', -1)
self.declare_parameter('output_card_index', -1)
self.declare_parameter('output_device_index', -1)
self.declare_parameter('sample_rate', 16000)
self.declare_parameter('channels', 1)
self.declare_parameter('sample_rate', 16000) # 输入采样率
self.declare_parameter('channels', 1) # 输入声道数
self.declare_parameter('chunk', 1024)
self.declare_parameter('output_sample_rate', 48000) # 输出采样率
self.declare_parameter('output_channels', 2) # 输出声道数
self.declare_parameter('output_volume', 0.2) # 输出音量比例0.0-1.0
# VAD参数
self.declare_parameter('vad_mode', 3)
@@ -91,6 +99,15 @@ class RobotSpeakerNode(Node):
self.declare_parameter('wake_word', 'xiao qian')
self.declare_parameter('session_timeout', 30.0)
# 相机参数
self.declare_parameter('camera_serial_number', '')
self.declare_parameter('camera_width', 640)
self.declare_parameter('camera_height', 480)
self.declare_parameter('camera_fps', 30)
self.declare_parameter('camera_format', 'RGB8')
self.declare_parameter('camera_jpeg_quality', 85)
self.declare_parameter('camera_capture_keywords', 'pai zhao,pai zhang,da kai xiang ji,kan zhe li,zhao xiang')
def _load_parameters(self):
"""加载ROS参数"""
# 音频参数
@@ -100,6 +117,9 @@ class RobotSpeakerNode(Node):
self.sample_rate = self.get_parameter('sample_rate').get_parameter_value().integer_value
self.channels = self.get_parameter('channels').get_parameter_value().integer_value
self.chunk = self.get_parameter('chunk').get_parameter_value().integer_value
self.output_sample_rate = self.get_parameter('output_sample_rate').get_parameter_value().integer_value
self.output_channels = self.get_parameter('output_channels').get_parameter_value().integer_value
self.output_volume = self.get_parameter('output_volume').get_parameter_value().double_value
# VAD参数
self.vad_mode = self.get_parameter('vad_mode').get_parameter_value().integer_value
@@ -128,6 +148,17 @@ class RobotSpeakerNode(Node):
self.wake_word = self.get_parameter('wake_word').get_parameter_value().string_value.strip()
self.session_timeout = self.get_parameter('session_timeout').get_parameter_value().double_value
# 相机参数
camera_serial = self.get_parameter('camera_serial_number').get_parameter_value().string_value
self.camera_serial_number = camera_serial if camera_serial and camera_serial.strip() else None
self.camera_width = self.get_parameter('camera_width').get_parameter_value().integer_value
self.camera_height = self.get_parameter('camera_height').get_parameter_value().integer_value
self.camera_fps = self.get_parameter('camera_fps').get_parameter_value().integer_value
self.camera_format = self.get_parameter('camera_format').get_parameter_value().string_value
self.camera_jpeg_quality = self.get_parameter('camera_jpeg_quality').get_parameter_value().integer_value
keywords_str = self.get_parameter('camera_capture_keywords').get_parameter_value().string_value
self.camera_capture_keywords = [k.strip() for k in keywords_str.split(',') if k.strip()]
def _init_components(self):
"""初始化所有组件"""
# VAD检测器
@@ -165,6 +196,7 @@ class RobotSpeakerNode(Node):
logger=self.get_logger()
)
self.asr_client.on_sentence_end = self._on_asr_sentence_end
self.asr_client.on_text_update = self._on_asr_text_update
self.asr_client.start()
# LLM客户端
@@ -185,15 +217,33 @@ class RobotSpeakerNode(Node):
# TTS客户端
self.get_logger().info(f"TTS配置: model={self.tts_model}, voice={self.tts_voice}")
self.get_logger().info(f"音频输出配置: sample_rate={self.output_sample_rate}, channels={self.output_channels}")
self.tts_client = DashScopeTTSClient(
api_key=self.dashscope_api_key,
model=self.tts_model,
voice=self.tts_voice,
sample_rate=22050,
card_index=self.output_card_index,
device_index=self.output_device_index,
output_sample_rate=self.output_sample_rate,
output_channels=self.output_channels,
output_volume=self.output_volume,
logger=self.get_logger()
)
# 相机客户端(默认一直运行)
try:
self.camera_client = CameraClient(
serial_number=self.camera_serial_number,
width=self.camera_width,
height=self.camera_height,
fps=self.camera_fps,
format=self.camera_format,
logger=self.get_logger()
)
self.camera_client.initialize()
except Exception as e:
self.get_logger().warning(f"相机初始化失败: {e},相机功能将不可用")
self.camera_client = None
def _start_threads(self):
"""启动4个线程"""
@@ -234,6 +284,9 @@ class RobotSpeakerNode(Node):
if msg.data == "interrupt":
self.get_logger().info("收到中断命令")
self.interrupt_event.set()
# 取消待确认的轮次,防止不完整内容污染历史
if self.use_llm and self.history:
self.history.cancel_turn()
self._drain_queue(self.audio_queue)
self._drain_queue(self.text_queue)
self._drain_queue(self.tts_queue)
@@ -268,6 +321,13 @@ class RobotSpeakerNode(Node):
self.get_logger().info(f"[ASR] 识别完成: {text_clean}")
self.text_queue.put(text_clean, timeout=1.0)
def _on_asr_text_update(self, text: str):
"""ASR 实时文本更新回调 - 用于多轮提示"""
if not text or not text.strip():
return
# 使用debug级别避免日志过多但仍保留实时反馈功能
self.get_logger().debug(f"[ASR] 识别中: {text.strip()}")
# ==================== 线程工作函数 ====================
def _recording_worker(self):
@@ -298,8 +358,10 @@ class RobotSpeakerNode(Node):
步骤:
1. 从文本队列取文本
2. 唤醒词处理
3. LLM处理流式
4. TTS文本放入TTS队列
3. 开始对话轮次(使用待确认机制
4. LLM处理流式
5. 确认轮次完成并写入历史
6. TTS文本放入TTS队列
"""
self.get_logger().info("[处理线程] 启动")
while not self.stop_event.is_set():
@@ -310,6 +372,9 @@ class RobotSpeakerNode(Node):
self.get_logger().info(f"[处理线程] 收到识别文本: {text}")
if self._check_interrupt():
# 中断时取消待确认的轮次,防止污染历史
if self.use_llm and self.history:
self.history.cancel_turn()
continue
# 步骤2: 唤醒词处理
@@ -321,39 +386,237 @@ class RobotSpeakerNode(Node):
self.get_logger().info(f"[处理线程] 唤醒词处理后: {processed_text}")
if self._check_interrupt():
if self.use_llm and self.history:
self.history.cancel_turn()
continue
# 步骤3: LLM处理流式
# 步骤2.5: 检测相机指令
need_camera, user_text = self._check_camera_command(processed_text)
if need_camera:
self.get_logger().info(f"[相机指令] 检测到拍照指令,将进行多模态推理")
# 步骤3: 开始对话轮次使用待确认机制防止半句ASR污染历史
if self.use_llm and self.history:
self.history.start_turn(user_text if user_text else processed_text)
# 步骤4: LLM处理流式支持多模态
reply = ""
if self.use_llm and self.llm_client:
self.get_logger().info(f"[处理线程] 发送到LLM: {processed_text}")
reply = self._llm_process_stream(processed_text)
if not reply or not reply.strip():
self.get_logger().info(f"[处理线程] 发送到LLM: {user_text if user_text else processed_text}")
reply = self._llm_process_stream_with_camera(
user_text if user_text else processed_text,
need_camera
)
# 检查是否被中断
if self._check_interrupt():
# 中断时取消待确认的轮次防止LLM被打断写脏历史
if self.history:
self.history.cancel_turn()
continue
text_to_speak = reply
self.get_logger().info(f"[处理线程] LLM回复: {text_to_speak[:100]}...")
if not reply or not reply.strip():
# LLM返回空取消待确认的轮次
if self.history:
self.history.cancel_turn()
continue
# 步骤5: 确认轮次完成,写入历史(只有完整完成的轮次才会写入)
if self.history:
if self.history.commit_turn(reply):
self.get_logger().info(f"[处理线程] 轮次已确认并写入历史")
else:
self.get_logger().warning("[处理线程] 轮次确认失败,可能已被取消")
self.get_logger().info(f"[处理线程] LLM回复: {reply[:100]}...")
# 注意:流式播放已经在 _llm_process_stream_with_camera 内部完成了分段播放
# 不需要再将完整文本放入队列,避免重复播放
continue
else:
text_to_speak = processed_text
if self._check_interrupt():
if self.use_llm and self.history:
self.history.cancel_turn()
continue
# 步骤4: TTS文本放入队列
# 步骤6: TTS文本放入队列仅非LLM模式
self.tts_queue.put(text_to_speak, timeout=0.2)
self.get_logger().info("[处理线程] 已放入TTS队列准备播放")
def _llm_process_stream(self, user_text: str) -> str:
def _check_camera_command(self, text: str) -> tuple[bool, str]:
"""检测文本中是否包含相机指令"""
text_pinyin = self._extract_chinese_to_pinyin(text).lower().strip()
# 改进匹配逻辑:支持关键词的部分匹配
# 例如:"pai zhao" 可以匹配 "pai ge zhao""pai zhao" 等
for keyword_pinyin in self.camera_capture_keywords:
keyword_parts = keyword_pinyin.lower().strip().split()
text_parts = text_pinyin.split()
# 检查关键词的所有部分是否按顺序出现在文本中
if len(keyword_parts) == 0:
continue
# 简单的顺序匹配:关键词的所有部分都出现在文本中,且顺序正确
keyword_idx = 0
for text_part in text_parts:
if keyword_parts[keyword_idx] in text_part or text_part in keyword_parts[keyword_idx]:
keyword_idx += 1
if keyword_idx >= len(keyword_parts):
self.get_logger().debug(f"[相机指令] 匹配到关键词: {keyword_pinyin} (文本拼音: {text_pinyin})")
return True, text
# 也支持直接包含匹配(向后兼容)
if keyword_pinyin in text_pinyin:
self.get_logger().debug(f"[相机指令] 匹配到关键词: {keyword_pinyin} (文本拼音: {text_pinyin})")
return True, text
self.get_logger().debug(f"[相机指令] 未匹配到关键词 (文本拼音: {text_pinyin})")
return False, text
def _encode_image_to_base64(self, image_data: np.ndarray, quality: int = 85) -> str:
    """Encode a numpy image array as a base64 JPEG string.

    Args:
        image_data: image array; an (H, W, 3) array is treated as RGB,
            anything else (e.g. a 2-D grayscale array) is handed to PIL
            as-is for mode inference.
        quality: JPEG compression quality (0-100).

    Returns:
        The base64-encoded JPEG bytes as a str, or "" on any failure.
    """
    try:
        # Guard ndim before indexing shape[2]: a 2-D grayscale array would
        # otherwise raise IndexError, which the except below silently
        # converted into an empty result.
        if image_data.ndim == 3 and image_data.shape[2] == 3:
            pil_image = Image.fromarray(image_data, 'RGB')
        else:
            pil_image = Image.fromarray(image_data)
        # Compress to JPEG in memory, then base64-encode.
        buffer = io.BytesIO()
        pil_image.save(buffer, format='JPEG', quality=quality)
        image_bytes = buffer.getvalue()
        base64_str = base64.b64encode(image_bytes).decode('utf-8')
        return base64_str
    except Exception as e:
        self.get_logger().error(f"图像编码失败: {e}")
        return ""
def _llm_process_stream_with_camera(self, user_text: str, need_camera: bool) -> str:
    """
    LLM streaming inference with optional multimodal (text + image) input.

    When *need_camera* is true, one frame is captured through the camera
    client's context manager (so the raw frame is released automatically),
    base64/JPEG-encoded and attached to the request.

    Returns the complete reply text; the caller commits it to the
    conversation history via commit_turn().
    """
    if not self.llm_client or not self.history:
        return ""
    # Prior turns come from history; append the current user message.
    messages = self.history.get_messages()
    messages.append(LLMMessage(role="user", content=user_text))
    # Inject the system prompt once if the history does not already carry one.
    has_system_msg = any(msg.role == "system" for msg in messages)
    if not has_system_msg:
        system_prompt = (
            "你是一个智能语音助手。\n"
            "- 当用户发送图片时,这是【拍照闲聊】模式:请仔细观察图片内容,结合用户的问题或描述,提供详细、有用的回答。\n"
            "- 当用户没有发送图片时,这是【闲聊】模式:请自然、友好地与用户对话。\n"
            "请根据对话模式调整你的回答风格。"
        )
        messages.insert(0, LLMMessage(role="system", content=system_prompt))
    full_reply = ""
    tts_text_buffer = ""
    image_base64_list = []

    def on_token(token: str):
        # Only accumulate here; TTS segmentation is left to the server side
        # and the buffer is flushed once streaming has finished.
        nonlocal full_reply, tts_text_buffer
        full_reply += token
        tts_text_buffer += token

    # Capture a frame if requested; the context manager frees the raw image.
    if need_camera and self.camera_client:
        with self.camera_client.capture_context() as image_data:
            if image_data is not None:
                image_base64 = self._encode_image_to_base64(image_data, quality=self.camera_jpeg_quality)
                if image_base64:
                    image_base64_list.append(image_base64)
                    self.get_logger().info("[相机] 已拍照")
        # image_data is released automatically on context exit.
    if image_base64_list:
        self.get_logger().info(f"[多模态] 准备发送给LLM: {len(image_base64_list)}张图片,用户文本: {user_text[:50]}")
        for idx, img_b64 in enumerate(image_base64_list):
            self.get_logger().debug(f"[多模态] 图片#{idx+1} base64长度: {len(img_b64)}")
    # NOTE: the images list must stay alive until chat_stream has finished
    # consuming it; clean-up happens only after inference completes.
    reply = self.llm_client.chat_stream(
        messages,
        on_token=on_token,
        images=image_base64_list or None
    )
    if image_base64_list:
        # Diagnostics: log whether the reply shows signs of image analysis.
        self._log_multimodal_reply_check(reply)
        # Release the encoded images now that inference is done.
        # (The previous per-element `del img_b64` loop only unbound the loop
        # variable and freed nothing; clear() is sufficient.)
        image_base64_list.clear()
        self.get_logger().info("[相机] 已删除照片")
    # Flush whatever remains in the buffer after the stream ended.
    if tts_text_buffer.strip():
        self.get_logger().debug(f"[流式TTS] 发送最后片段: {tts_text_buffer.strip()[:50]}")
        self.tts_queue.put(tts_text_buffer.strip(), timeout=0.2)
    # Fall back to the token accumulator if the client returned a falsy
    # final value even though tokens were streamed, so a successful stream
    # is not mistaken for an empty reply (and cancelled by the caller).
    final_reply = reply if reply else full_reply
    return final_reply.strip() if final_reply else ""

def _log_multimodal_reply_check(self, reply: str):
    """Heuristically log whether the LLM reply suggests the attached image
    was actually analysed. Diagnostics only — no effect on control flow."""
    if not reply:
        return
    reply_lower = reply.lower()
    # lower() only affects the ASCII keywords; Chinese ones are unchanged.
    image_keywords = ['图片', '图像', '照片', '画面', '看到', '显示', '内容', '图中', 'image', 'photo', 'picture', '视觉', '识别', '分析']
    negative_keywords = ['无法', '不能', '没有', '看不到', '无法查看', '无法分析']
    has_image_mention = any(keyword in reply_lower for keyword in image_keywords)
    has_negative = any(keyword in reply_lower for keyword in negative_keywords)
    if has_image_mention and not has_negative:
        self.get_logger().info("[多模态] ✓ LLM回复中包含图片相关内容,可能已识别到图片")
        # Extract and log the first sentence fragment around a matched keyword.
        import re
        for keyword in image_keywords:
            if keyword in reply_lower:
                pattern = f'[^。!?]*{keyword}[^。!?]*[。!?]?'
                matches = re.findall(pattern, reply, re.IGNORECASE)
                if matches:
                    self.get_logger().info(f"[多模态] 相关回复片段: {matches[0][:100]}")
                break
    elif has_negative:
        self.get_logger().warning("[多模态] ✗ LLM回复表明无法查看图片,可能未识别到图片")
        self.get_logger().warning(f"[多模态] 完整回复: {reply[:200]}")
    else:
        self.get_logger().warning("[多模态] ? LLM回复中未发现明显的图片相关内容")
        self.get_logger().debug(f"[多模态] 完整回复: {reply[:200]}")
def _llm_process_stream(self, user_text: str) -> str:
"""
LLM流式处理 - token stream → TTS流式播放
"""
if not self.llm_client or not self.history:
return ""
# get_messages() 已经包含了待确认的用户消息,无需手动添加
messages = self.history.get_messages()
# 添加系统提示词(如果还没有)
has_system_msg = any(msg.role == "system" for msg in messages)
if not has_system_msg:
system_prompt = (
"你是一个智能语音助手。\n"
"- 当用户发送图片时,这是【拍照闲聊】模式:请仔细观察图片内容,结合用户的问题或描述,提供详细、有用的回答。\n"
"- 当用户没有发送图片时,这是【闲聊】模式:请自然、友好地与用户对话。\n"
"请根据对话模式调整你的回答风格。"
)
messages.insert(0, LLMMessage(role="system", content=system_prompt))
full_reply = ""
tts_text_buffer = ""
@@ -363,24 +626,20 @@ class RobotSpeakerNode(Node):
full_reply += token
tts_text_buffer += token
# 每20字符或遇到标点发送到TTS
if len(tts_text_buffer) >= 20 or token in ['', '', '', '\n', '.', '!', '?']:
if tts_text_buffer.strip():
self.tts_queue.put(tts_text_buffer.strip(), timeout=0.2)
tts_text_buffer = ""
# 完全依赖服务端分段,不进行客户端分段
# 阿里云百炼TTS服务在流式返回时已经做了合适的分段客户端不应该干扰
# 只在流式返回结束时一次性发送所有内容
reply = self.llm_client.chat_stream(messages, on_token=on_token)
# 发送剩余文本
# 发送剩余文本流式返回结束时buffer中可能还有未发送的内容
if tts_text_buffer.strip():
self.get_logger().debug(f"[流式TTS] 发送最后片段: {tts_text_buffer.strip()[:50]}")
self.tts_queue.put(tts_text_buffer.strip(), timeout=0.2)
if reply and reply.strip():
self.history.add_message("user", user_text)
self.history.add_message("assistant", reply)
return reply
return full_reply if full_reply else ""
# 返回完整回复,由调用者通过 commit_turn() 写入历史
# 这样可以确保只有完整完成的回复才会写入历史,防止被打断时写脏历史
return reply.strip() if reply else ""
def _tts_worker(self):
"""
@@ -415,7 +674,10 @@ class RobotSpeakerNode(Node):
# ==================== 工具函数 ====================
def _check_interrupt(self) -> bool:
"""检查中断标志"""
"""
检查中断标志
注意:中断时不会自动取消待确认的轮次,需要在调用处手动处理
"""
if self.interrupt_event.is_set():
self.interrupt_event.clear()
return True
@@ -531,6 +793,10 @@ class RobotSpeakerNode(Node):
if hasattr(self, 'audio_recorder') and self.audio_recorder:
self.audio_recorder.cleanup()
# 清理相机资源
if hasattr(self, 'camera_client') and self.camera_client:
self.camera_client.cleanup()
super().destroy_node()

View File

@@ -22,33 +22,48 @@ class DashScopeTTSClient(TTSClient):
def __init__(self, api_key: str,
             model: str,
             voice: str,
             sample_rate: int,
             card_index: int,
             device_index: int,
             output_sample_rate: int = 48000,
             output_channels: int = 2,
             output_volume: float = 1.0,
             logger=None):
    """Configure the DashScope TTS client and the ALSA playback target.

    Args:
        api_key: DashScope API key; NOTE this is set on the global
            ``dashscope.api_key``, affecting the whole process.
        model: TTS model name (e.g. from config ``dashscope.tts.model``).
        voice: default voice id used when a request carries none.
        sample_rate: requested synthesis sample rate.
        card_index: ALSA card index; negative means "use default device".
        device_index: ALSA device index; negative means "use default device".
        output_sample_rate: playback device sample rate (default 48000 Hz).
        output_channels: playback device channel count (default stereo).
        output_volume: playback volume scale in [0.0, 1.0].
        logger: optional ROS2-style logger; falls back to print when None.
    """
    dashscope.api_key = api_key
    self.model = model
    self.voice = voice
    self.sample_rate = sample_rate
    self.card_index = card_index
    self.device_index = device_index
    self.output_sample_rate = output_sample_rate
    self.output_channels = output_channels
    self.output_volume = output_volume
    self.logger = logger
    # Build the ALSA device string; "plughw" lets ffmpeg/ALSA resample and
    # remix channels automatically. Negative indices select "default".
    self.alsa_device = f"plughw:{card_index},{device_index}" if (
        card_index >= 0 and device_index >= 0
    ) else "default"
def _log(self, level: str, msg: str):
"""记录日志根据级别调用对应的ROS2日志方法"""
if self.logger:
getattr(self.logger, level)(msg)
# ROS2 logger不能动态改变severity级别需要显式调用对应方法
if level == "debug":
self.logger.debug(msg)
elif level == "info":
self.logger.info(msg)
elif level == "warning":
self.logger.warn(msg)
elif level == "error":
self.logger.error(msg)
else:
self.logger.info(msg) # 默认使用info级别
else:
print(f"[TTS] {msg}")
def synthesize(self, request: TTSRequest,
on_chunk=None,
interrupt_check=None) -> bool:
"""流式合成并播放"""
"""主流程:流式合成并播放"""
callback = _TTSCallback(self, interrupt_check, on_chunk)
# 使用配置的voicerequest.voice为None或空时使用self.voice
voice_to_use = request.voice if request.voice and request.voice.strip() else self.voice
@@ -89,22 +104,38 @@ class _TTSCallback(ResultCallback):
def on_open(self):
# 使用ffmpeg播放自动处理采样率转换22050 -> 设备采样率)
# TTS服务输出固定为22050Hz单声道ffmpeg会自动转换为设备采样率和声道数
tts_output_rate = 22050 # TTS服务固定输出采样率
tts_output_channels = 1 # TTS服务固定输出声道数单声道
ffmpeg_cmd = [
'ffmpeg',
'-f', 's16le',
'-ar', '22050', # TTS输出采样率
'-ac', '1',
'-i', 'pipe:0',
'-f', 'alsa',
'-ar', '48000', # 设备采样率(ffmpeg会自动重采样
'-ac', '1',
'-acodec', 'pcm_s16le',
'-fflags', '+nobuffer',
'-flags', 'low_delay',
'-strict', 'experimental',
'-f', 's16le', # 原始 PCM
'-ar', str(tts_output_rate), # TTS输出采样率固定22050
'-ac', str(tts_output_channels), # TTS输出声道数固定单声道
'-i', 'pipe:0', # stdin
'-f', 'alsa', # 输出到 ALSA
'-ar', str(self.tts_client.output_sample_rate), # 输出设备采样率(从配置文件读取
'-ac', str(self.tts_client.output_channels), # 输出设备声道数(从配置文件读取)
'-acodec', 'pcm_s16le', # 输出编码
'-fflags', 'nobuffer', # 减少缓冲
'-flags', 'low_delay', # 低延迟
'-avioflags', 'direct', # 尝试直通写入 ALSA减少延迟
'-thread_queue_size', '1024', # 输入线程队列大小,防止丢帧
self.tts_client.alsa_device
]
self.tts_client._log("info", f"启动ffmpeg播放: ALSA设备={self.tts_client.alsa_device}")
# 添加音量调节filter如果音量不是1.0
if self.tts_client.output_volume != 1.0:
# 在输出编码前插入音量filter
# volume filter放在输入之后、输出编码之前
acodec_idx = ffmpeg_cmd.index('-acodec')
ffmpeg_cmd.insert(acodec_idx, f'volume={self.tts_client.output_volume}')
ffmpeg_cmd.insert(acodec_idx, '-af')
self.tts_client._log("info", f"启动ffmpeg播放: ALSA设备={self.tts_client.alsa_device}, "
f"输出采样率={self.tts_client.output_sample_rate}Hz, "
f"输出声道数={self.tts_client.output_channels}, "
f"音量={self.tts_client.output_volume * 100:.0f}%")
self._proc = subprocess.Popen(
ffmpeg_cmd,
stdin=subprocess.PIPE,
@@ -130,6 +161,7 @@ class _TTSCallback(ResultCallback):
return
if self.interrupt_check and self.interrupt_check():
# 停止播放,不停止 TTS
self._interrupted = True
if self._proc:
self._proc.terminate()

View File

@@ -27,3 +27,10 @@ class TTSRequest:
speed: float | None = None
pitch: float | None = None
@dataclass
class ImageMessage:
    """Image message for multimodal LLM input.

    NOTE(review): ``image_data`` is annotated as ``bytes`` but the original
    comment describes it as base64-encoded data, while the node-side code
    passes images around as base64 *str* — confirm which representation
    consumers of this type actually expect.
    """
    image_data: bytes  # base64-encoded image payload (see class note)
    image_format: str = "jpeg"  # image format tag, defaults to JPEG