Merged

28 commits
3b52e69
add timed string to tts
longcw Jun 11, 2025
92eb005
fix transcription sync when starts
longcw Jun 11, 2025
7678064
clean up
longcw Jun 11, 2025
9905410
fix type
longcw Jun 11, 2025
5b3abd8
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 12, 2025
01d8d9b
update tts node
longcw Jun 12, 2025
93d6434
update tts task
longcw Jun 12, 2025
267848a
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 16, 2025
94d648f
use AudioFrame.user_data
longcw Jun 16, 2025
03512f2
add timed agent transcript example
longcw Jun 16, 2025
bc717e1
rename to userdata
longcw Jun 16, 2025
aba239e
support elevenlabs
longcw Jun 16, 2025
10b6d6a
update 11labs
longcw Jun 16, 2025
7711201
enable TTS timed transcript by default
longcw Jun 17, 2025
e57bb4b
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 17, 2025
535d17c
add use_tts_aligned_transcript
longcw Jun 18, 2025
885127e
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 18, 2025
5f1577b
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 20, 2025
3e297d5
use type and update to aligned_transcript
longcw Jun 20, 2025
a4fc26e
use livekit 1.0.10
longcw Jun 20, 2025
b4104c3
cleanup tee peer
longcw Jun 20, 2025
184d05d
Revert "cleanup tee peer"
longcw Jun 20, 2025
d2751a9
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 20, 2025
76e7e87
Create changeset-06b469b0.md
longcw Jun 23, 2025
04f3534
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 23, 2025
3d9fc4a
Merge remote-tracking branch 'origin/main' into longc/timed-transcrip…
longcw Jun 25, 2025
9165efd
Merge remote-tracking branch 'origin/theo/agents1.2' into longc/timed…
longcw Jul 1, 2025
48f09b3
Merge remote-tracking branch 'origin/theo/agents1.2' into longc/timed…
longcw Jul 1, 2025
7 changes: 7 additions & 0 deletions .github/next-release/changeset-06b469b0.md
@@ -0,0 +1,7 @@
---
"livekit-agents": patch
"livekit-plugins-cartesia": patch
"livekit-plugins-elevenlabs": patch
---

support aligned transcripts with timestamps from tts (#2580)
54 changes: 54 additions & 0 deletions examples/voice_agents/timed_agent_transcript.py
@@ -0,0 +1,54 @@
import asyncio
import logging
from collections.abc import AsyncGenerator, AsyncIterable

from dotenv import load_dotenv

from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
from livekit.agents.voice.agent import ModelSettings
from livekit.agents.voice.io import TimedString
from livekit.plugins import cartesia, deepgram, openai, silero

logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)

load_dotenv()


# This example shows how to obtain the timed transcript from the TTS.
# Right now this is supported for Cartesia and ElevenLabs TTS (word-level timestamps)
# and for non-streaming TTS wrapped in a StreamAdapter (sentence-level timestamps).


class MyAgent(Agent):
    def __init__(self):
        super().__init__(instructions="You are a helpful assistant.")

        self._closing_task: asyncio.Task[None] | None = None

Member suggested change (remove this line; it is unused in this example):
-        self._closing_task: asyncio.Task[None] | None = None

    async def transcription_node(
        self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
    ) -> AsyncGenerator[str | TimedString, None]:
        async for chunk in text:
            if isinstance(chunk, TimedString):
                logger.info(f"TimedString: '{chunk}' ({chunk.start_time} - {chunk.end_time})")
            yield chunk


async def entrypoint(ctx: JobContext):
    session = AgentSession(
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=cartesia.TTS(),
        vad=silero.VAD.load(),
        # enable the TTS-aligned transcript; it can also be configured at the Agent level
        use_tts_aligned_transcript=True,
    )

    await session.start(agent=MyAgent(), room=ctx.room)

    session.generate_reply(instructions="say hello to the user")


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
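The comment on use_tts_aligned_transcript above notes that the option can also be set per Agent. A hedged sketch of that variant, assuming Agent.__init__ accepts the same keyword as AgentSession (the Agent-level kwarg is not shown in this diff):

class MyTimedAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful assistant.",
            # assumption: mirrors the AgentSession option above
            use_tts_aligned_transcript=True,
        )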
54 changes: 33 additions & 21 deletions livekit-agents/livekit/agents/tokenize/blingfire.py
@@ -12,31 +12,32 @@
]


-def _split_sentences(text: str, min_sentence_len: int) -> list[tuple[str, int, int]]:
-    bf_sentences, offsets = blingfire.text_to_sentences_with_offsets(text)
-    raw_sentences = bf_sentences.split("\n")
+def _split_sentences(
+    text: str, min_sentence_len: int, *, retain_format: bool = False
+) -> list[tuple[str, int, int]]:
+    _, offsets = blingfire.text_to_sentences_with_offsets(text)
 
     merged_sentences = []
-    buffer = ""
-    buffer_start = None
+    start = 0
 
-    for i, (sentence, (start, end)) in enumerate(zip(raw_sentences, offsets)):
-        sentence = sentence.strip()
-        if not sentence:
+    for _, end in offsets:
+        raw_sentence = text[start:end]
+        sentence = raw_sentence.strip()
+        if not sentence or len(sentence) < min_sentence_len:
             continue
 
-        if buffer:
-            buffer += " " + sentence
-            buffer_end = end
+        if retain_format:
+            merged_sentences.append((raw_sentence, start, end))
         else:
-            buffer = sentence
-            buffer_start = start
-            buffer_end = end
+            merged_sentences.append((sentence, start, end))
+        start = end
 
-        if len(buffer) >= min_sentence_len or i == len(offsets) - 1:
-            merged_sentences.append((buffer, buffer_start, buffer_end))
-            buffer = ""
-            buffer_start = None
+    if start < len(text):
+        raw_sentence = text[start:]
+        if retain_format:
+            merged_sentences.append((raw_sentence, start, len(text)))
+        elif sentence := raw_sentence.strip():
+            merged_sentences.append((sentence, start, len(text)))
 
     return merged_sentences
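A quick sketch of what the new retain_format flag changes in practice, assuming livekit-agents and its blingfire dependency are installed (the sample text is illustrative):

from livekit.agents import tokenize

text = "Short one. And here is a considerably longer second sentence."

plain = tokenize.blingfire.SentenceTokenizer(min_sentence_len=10)
formatted = tokenize.blingfire.SentenceTokenizer(min_sentence_len=10, retain_format=True)

# Default behavior strips each sentence; retain_format=True keeps the raw
# text[start:end] slices, so inter-sentence whitespace survives and the
# returned offsets map directly back onto the source string.
print(plain.tokenize(text))
print(formatted.tokenize(text))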

@@ -45,6 +46,7 @@ def _split_sentences(text: str, min_sentence_len: int) -> list[tuple[str, int, int]]:
class _TokenizerOptions:
min_sentence_len: int
stream_context_len: int
retain_format: bool


class SentenceTokenizer(tokenizer.SentenceTokenizer):
@@ -53,20 +55,30 @@ def __init__(
         *,
         min_sentence_len: int = 20,
         stream_context_len: int = 10,
+        retain_format: bool = False,
     ) -> None:
         self._config = _TokenizerOptions(
-            min_sentence_len=min_sentence_len, stream_context_len=stream_context_len
+            min_sentence_len=min_sentence_len,
+            stream_context_len=stream_context_len,
+            retain_format=retain_format,
         )

     def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
         return [
-            tok[0] for tok in _split_sentences(text, min_sentence_len=self._config.min_sentence_len)
+            tok[0]
+            for tok in _split_sentences(
+                text,
+                min_sentence_len=self._config.min_sentence_len,
+                retain_format=self._config.retain_format,
+            )
         ]

     def stream(self, *, language: str | None = None) -> tokenizer.SentenceStream:
         return token_stream.BufferedSentenceStream(
             tokenizer=functools.partial(
-                _split_sentences, min_sentence_len=self._config.min_sentence_len
+                _split_sentences,
+                min_sentence_len=self._config.min_sentence_len,
+                retain_format=self._config.retain_format,
             ),
             min_token_len=self._config.min_sentence_len,
             min_ctx_len=self._config.stream_context_len,
11 changes: 10 additions & 1 deletion livekit-agents/livekit/agents/tts/fallback_adapter.py
@@ -12,7 +12,7 @@
from .. import utils
from .._exceptions import APIConnectionError
from ..log import logger
-from ..types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions
+from ..types import DEFAULT_API_CONNECT_OPTIONS, USERDATA_TIMED_TRANSCRIPT, APIConnectOptions
from ..utils import aio
from .tts import (
TTS,
@@ -83,6 +83,7 @@ def __init__(
super().__init__(
capabilities=TTSCapabilities(
streaming=all(t.capabilities.streaming for t in tts),
aligned_transcript=all(t.capabilities.aligned_transcript for t in tts),
),
sample_rate=sample_rate,
num_channels=num_channels,
@@ -202,6 +203,9 @@ async def _run(self, output_emitter: AudioEmitter) -> None:
try:
resampler = tts_status.resampler
async for synthesized_audio in self._try_synthesize(tts=tts, recovering=False):
if texts := synthesized_audio.frame.userdata.get(USERDATA_TIMED_TRANSCRIPT):
output_emitter.push_timed_transcript(texts)

if resampler is not None:
for rf in resampler.push(synthesized_audio.frame):
output_emitter.push(rf.data.tobytes())
@@ -341,6 +345,11 @@ async def _forward_input_task() -> None:
),
recovering=False,
):
if texts := synthesized_audio.frame.userdata.get(
USERDATA_TIMED_TRANSCRIPT
):
output_emitter.push_timed_transcript(texts)

if resampler is not None:
for resampled_frame in resampler.push(synthesized_audio.frame):
output_emitter.push(resampled_frame.data.tobytes())
17 changes: 12 additions & 5 deletions livekit-agents/livekit/agents/tts/stream_adapter.py
@@ -29,14 +29,14 @@ def __init__(
sentence_tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
) -> None:
         super().__init__(
-            capabilities=TTSCapabilities(
-                streaming=True,
-            ),
+            capabilities=TTSCapabilities(streaming=True, aligned_transcript=True),
             sample_rate=tts.sample_rate,
             num_channels=tts.num_channels,
         )
         self._wrapped_tts = tts
-        self._sentence_tokenizer = sentence_tokenizer or tokenize.blingfire.SentenceTokenizer()
+        self._sentence_tokenizer = sentence_tokenizer or tokenize.blingfire.SentenceTokenizer(
+            retain_format=True
theomonnom (Member): Oh, actually we were not using retain_format for the StreamAdapter before, since it is only used to generate a sentence. In the PR I did, I was keeping the basic.SentenceTokenizer inside the transcription synchronization code.

longcw (Author): It was used in the agent's tts_node.

longcw (Author): Or maybe I added it in this PR. We need to retain the formatting if we use the timed transcript from the StreamAdapter.

theomonnom (Member, Jul 1, 2025): Ah ok, so the synchronizer also needs the exact same formatting?

longcw (Author): No, it can be different. They process the sentences separately.

theomonnom (Member, Jul 1, 2025): I see, but I thought the aligned transcripts returned by the TTSs did not include newlines/special characters, so I assumed retain_format was not needed.

theomonnom (Member): When using the StreamAdapter with OpenAI, the transcription_node input comes from the llm_node, right? In that case I really don't think we should wait for the TTS, since we have the opt-in flag use_tts_aligned_transcript.

longcw (Author): If use_tts_aligned_transcript is enabled, the input of the transcription_node comes from the TTS.

longcw (Author): What do you mean by "we shouldn't wait for the TTS when using the StreamAdapter"?

theomonnom (Member): Ok, that makes sense. So by default, even if we use the StreamAdapter, it'll use the LLM output for the transcription_node.
+        )

@self._wrapped_tts.on("metrics_collected")
def _forward_metrics(*args: Any, **kwargs: Any) -> None:
@@ -91,12 +91,19 @@ async def _forward_input() -> None:
self._sent_stream.end_input()

         async def _synthesize() -> None:
+            from ..voice.io import TimedString
+
+            duration = 0.0
             async for ev in self._sent_stream:
+                output_emitter.push_timed_transcript(
+                    TimedString(text=ev.token, start_time=duration)
+                )
                 async with self._tts._wrapped_tts.synthesize(
-                    ev.token, conn_options=self._wrapped_tts_conn_options
+                    ev.token.strip(), conn_options=self._wrapped_tts_conn_options
                 ) as tts_stream:
                     async for audio in tts_stream:
                         output_emitter.push(audio.frame.data.tobytes())
+                        duration += audio.frame.duration
             output_emitter.flush()

tasks = [
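Two takeaways from the thread above: by default the transcription_node keeps consuming the LLM output, and only use_tts_aligned_transcript=True switches its input to the TTS. The sentence-level timing added in _synthesize is simple bookkeeping; here is a self-contained sketch of the same scheme (the Frame type and synthesize callback are stand-ins, not livekit API):

from collections.abc import Callable, Iterable
from dataclasses import dataclass


@dataclass
class Frame:
    duration: float  # seconds of audio in this frame


def stamp_sentences(
    sentences: Iterable[str], synthesize: Callable[[str], Iterable[Frame]]
) -> list[tuple[str, float]]:
    # Each sentence's start_time is the total duration of audio synthesized
    # before it, exactly like `duration` in the _synthesize task above.
    duration = 0.0
    stamped: list[tuple[str, float]] = []
    for sentence in sentences:
        stamped.append((sentence, duration))
        for frame in synthesize(sentence):
            duration += frame.duration
    return stamped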
42 changes: 38 additions & 4 deletions livekit-agents/livekit/agents/tts/tts.py
@@ -8,7 +8,7 @@
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import dataclass
from types import TracebackType
-from typing import Generic, Literal, TypeVar, Union
+from typing import TYPE_CHECKING, Generic, Literal, TypeVar, Union

from pydantic import BaseModel, ConfigDict, Field

@@ -17,9 +17,12 @@
from .._exceptions import APIError
from ..log import logger
from ..metrics import TTSMetrics
-from ..types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions
+from ..types import DEFAULT_API_CONNECT_OPTIONS, USERDATA_TIMED_TRANSCRIPT, APIConnectOptions
 from ..utils import aio, audio, codecs, log_exceptions
 
+if TYPE_CHECKING:
+    from ..voice.io import TimedString
+

lk_dump_tts = int(os.getenv("LK_DUMP_TTS", 0))


@@ -41,6 +44,8 @@ class SynthesizedAudio:
class TTSCapabilities:
streaming: bool
"""Whether this TTS supports streaming (generally using websockets)"""
aligned_transcript: bool = False
"""Whether this TTS supports aligned transcripts with word timestamps"""


class TTSError(BaseModel):
@@ -563,12 +568,15 @@ def initialize(
self._num_channels = num_channels
self._streaming = stream

from ..voice.io import TimedString

self._write_ch = aio.Chan[
Union[
bytes,
AudioEmitter._FlushSegment,
AudioEmitter._StartSegment,
AudioEmitter._EndSegment,
TimedString,
]
]()
self._main_atask = asyncio.create_task(self._main_task(), name="AudioEmitter._main_task")
@@ -622,6 +630,19 @@ def push(self, data: bytes) -> None:

self._write_ch.send_nowait(data)

def push_timed_transcript(self, delta_text: TimedString | list[TimedString]) -> None:
if not self._started:
raise RuntimeError("AudioEmitter isn't started")

if self._write_ch.closed:
return

if isinstance(delta_text, list):
for text in delta_text:
self._write_ch.send_nowait(text)
else:
self._write_ch.send_nowait(delta_text)

def flush(self) -> None:
if not self._started:
raise RuntimeError("AudioEmitter isn't started")
@@ -655,14 +676,17 @@ async def aclose(self) -> None:

@log_exceptions(logger=logger)
async def _main_task(self) -> None:
from ..voice.io import TimedString

audio_decoder: codecs.AudioStreamDecoder | None = None
decode_atask: asyncio.Task | None = None
segment_ctx: AudioEmitter._SegmentContext | None = None
last_frame: rtc.AudioFrame | None = None
debug_frames: list[rtc.AudioFrame] = []
timed_transcripts: list[TimedString] = []

def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False) -> None:
-            nonlocal last_frame, segment_ctx
+            nonlocal last_frame, segment_ctx, timed_transcripts
assert segment_ctx is not None

if last_frame is None:
@@ -686,6 +710,7 @@ def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False)
if lk_dump_tts:
debug_frames.append(frame)

frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
self._dst_ch.send_nowait(
SynthesizedAudio(
frame=frame,
@@ -694,9 +719,11 @@ def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False)
is_final=True,
)
)
timed_transcripts = []
return

if last_frame is not None:
last_frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
self._dst_ch.send_nowait(
SynthesizedAudio(
frame=last_frame,
@@ -705,6 +732,7 @@ def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False)
is_final=is_final,
)
)
timed_transcripts = []
segment_ctx.audio_duration += last_frame.duration
self._audio_durations[-1] += last_frame.duration

Expand All @@ -714,12 +742,13 @@ def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False)
last_frame = frame

def _flush_frame() -> None:
-            nonlocal last_frame, segment_ctx
+            nonlocal last_frame, segment_ctx, timed_transcripts
assert segment_ctx is not None

if last_frame is None:
return

last_frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
self._dst_ch.send_nowait(
SynthesizedAudio(
frame=last_frame,
@@ -728,6 +757,7 @@ def _flush_frame() -> None:
is_final=False, # flush isn't final
)
)
timed_transcripts = []
segment_ctx.audio_duration += last_frame.duration
self._audio_durations[-1] += last_frame.duration

Expand Down Expand Up @@ -780,6 +810,10 @@ async def _decode_task() -> None:
audio_byte_stream: audio.AudioByteStream | None = None
try:
async for data in self._write_ch:
if isinstance(data, TimedString):
timed_transcripts.append(data)
continue

if isinstance(data, AudioEmitter._StartSegment):
if segment_ctx:
raise RuntimeError(
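From a plugin author's point of view, the new AudioEmitter hooks are used roughly as follows. This is a hedged sketch: the provider event shape (type, data, word, start, end) is hypothetical, while push, push_timed_transcript, flush, and the TimedString keywords come from the diff above.

from livekit.agents.voice.io import TimedString


async def forward_provider_events(events, output_emitter) -> None:
    # events: async iterable of hypothetical provider messages;
    # output_emitter: the AudioEmitter handed to the TTS stream by the framework.
    async for ev in events:
        if ev.type == "audio":
            output_emitter.push(ev.data)  # raw audio bytes
        elif ev.type == "word":
            # Timestamps are buffered and attached to the next emitted
            # frame's userdata under USERDATA_TIMED_TRANSCRIPT.
            output_emitter.push_timed_transcript(
                TimedString(text=ev.word, start_time=ev.start, end_time=ev.end)
            )
    output_emitter.flush()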
5 changes: 5 additions & 0 deletions livekit-agents/livekit/agents/types.py
@@ -25,6 +25,11 @@
TOPIC_CHAT = "lk.chat"
TOPIC_TRANSCRIPTION = "lk.transcription"

USERDATA_TIMED_TRANSCRIPT = "lk.timed_transcripts"
"""
The key for the timed transcripts in the audio frame userdata.
"""


_T = TypeVar("_T")

Expand Down