Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def make_session_report(self, session: AgentSession | None = None) -> SessionRep

sr = SessionReport(
enable_recording=session._enable_recording,
include_internal_events=session._include_internal_events,
job_id=self.job.id,
room_id=self.job.room.sid,
room=self.job.room.name,
Expand All @@ -273,6 +274,7 @@ def make_session_report(self, session: AgentSession | None = None) -> SessionRep
audio_recording_started_at=recorder_io.recording_started_at if recorder_io else None,
started_at=session._started_at,
events=session._recorded_events,
internal_events=session._recorded_internal_events,
chat_history=session.history.copy(),
)

Expand Down
5 changes: 4 additions & 1 deletion livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

@dataclass
class InputSpeechStartedEvent:
pass
type: Literal["input_speech_started"] = "input_speech_started"


@dataclass
class InputSpeechStoppedEvent:
user_transcription_enabled: bool
type: Literal["input_speech_stopped"] = "input_speech_stopped"


@dataclass
Expand All @@ -32,6 +33,7 @@ class MessageGeneration:
text_stream: AsyncIterable[str] # could be io.TimedString
audio_stream: AsyncIterable[rtc.AudioFrame]
modalities: Awaitable[list[Literal["text", "audio"]]]
type: Literal["message_generation"] = "message_generation"


@dataclass
Expand All @@ -42,6 +44,7 @@ class GenerationCreatedEvent:
"""True if the message was generated by the user using generate_reply()"""
response_id: str | None = None
"""The response ID associated with this generation, used for metrics attribution"""
type: Literal["generation_created"] = "generation_created"


class RealtimeModelError(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions livekit-agents/livekit/agents/tts/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class SynthesizedAudio:
"""Segment ID, each segment is separated by a flush (streaming only)"""
delta_text: str = ""
"""Current segment of the synthesized audio (streaming only)"""
type: Literal["synthesized_audio"] = "synthesized_audio"


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@


class FlushSentinel:
pass
type: Literal["flush_sentinel"] = "flush_sentinel"


class NotGiven:
Expand Down
11 changes: 11 additions & 0 deletions livekit-agents/livekit/agents/voice/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ async def _forward_input() -> None:
forward_task = asyncio.create_task(_forward_input())
try:
async for event in stream:
if activity.session._include_internal_events:
activity.session.collect(event)
yield event
finally:
await utils.aio.cancel_and_wait(forward_task)
Expand All @@ -416,6 +418,8 @@ async def llm_node(
chat_ctx=chat_ctx, tools=tools, tool_choice=tool_choice, conn_options=conn_options
) as stream:
async for chunk in stream:
if activity.session._include_internal_events:
activity.session.collect(chunk)
yield chunk

@staticmethod
Expand Down Expand Up @@ -446,6 +450,8 @@ async def _forward_input() -> None:
forward_task = asyncio.create_task(_forward_input())
try:
async for ev in stream:
if activity.session._include_internal_events:
activity.session.collect(ev)
yield ev.frame
finally:
await utils.aio.cancel_and_wait(forward_task)
Expand All @@ -455,7 +461,10 @@ async def transcription_node(
agent: Agent, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
) -> AsyncGenerator[str | TimedString, None]:
"""Default implementation for `Agent.transcription_node`"""
activity = agent._get_activity_or_raise()
async for delta in text:
if activity.session._include_internal_events:
activity.session.collect(delta)
yield delta

@staticmethod
Expand All @@ -469,6 +478,8 @@ async def realtime_audio_output_node(
)

async for frame in audio:
if activity.session._include_internal_events:
activity.session.collect(frame)
yield frame

@property
Expand Down
17 changes: 16 additions & 1 deletion livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,8 @@ def _on_error(

self._session._on_error(error)

def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
def _on_input_speech_started(self, ev: llm.InputSpeechStartedEvent) -> None:
self._session.collect(ev)
if self.vad is None:
self._session._update_user_state("speaking")

Expand All @@ -1114,6 +1115,7 @@ def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
)

def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
self._session.collect(ev)
if self.vad is None:
self._session._update_user_state("listening")

Expand All @@ -1123,6 +1125,7 @@ def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
)

def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionCompleted) -> None:
self._session.collect(ev)
self._session._user_input_transcribed(
UserInputTranscribedEvent(transcript=ev.transcript, is_final=ev.is_final)
)
Expand All @@ -1135,6 +1138,8 @@ def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionComp
self._session._conversation_item_added(msg)

def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None:
self._session.collect(ev)

if ev.user_initiated:
# user_initiated generations are directly handled inside _realtime_reply_task
return
Expand Down Expand Up @@ -1207,6 +1212,9 @@ def _interrupt_by_audio_activity(self) -> None:
# region recognition hooks

def on_start_of_speech(self, ev: vad.VADEvent | None) -> None:
if ev:
self._session.collect(ev)

self._session._update_user_state("speaking")
self._user_silence_event.clear()

Expand All @@ -1218,6 +1226,7 @@ def on_start_of_speech(self, ev: vad.VADEvent | None) -> None:
def on_end_of_speech(self, ev: vad.VADEvent | None) -> None:
speech_end_time = time.time()
if ev:
self._session.collect(ev)
speech_end_time = speech_end_time - ev.silence_duration
self._session._update_user_state(
"listening",
Expand All @@ -1233,6 +1242,9 @@ def on_end_of_speech(self, ev: vad.VADEvent | None) -> None:
self._start_false_interruption_timer(timeout)

def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
if ev:
self._session.collect(ev)

if self._turn_detection in ("manual", "realtime_llm"):
# ignore vad inference done event if turn_detection is manual or realtime_llm
return
Expand All @@ -1250,6 +1262,8 @@ def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
self._user_silence_event.set()

def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -> None:
self._session.collect(ev)

if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription:
# skip stt transcription if user_transcription is enabled on the realtime model
return
Expand Down Expand Up @@ -1278,6 +1292,7 @@ def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -
self._start_false_interruption_timer(timeout)

def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None:
self._session.collect(ev)
if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription:
# skip stt transcription if user_transcription is enabled on the realtime model
return
Expand Down
19 changes: 17 additions & 2 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
CloseReason,
ConversationItemAddedEvent,
EventTypes,
InternalEvent,
UserInputTranscribedEvent,
UserState,
UserStateChangedEvent,
Expand Down Expand Up @@ -357,16 +358,22 @@ def __init__(
self._session_ctx_token: Token[otel_context.Context] | None = None

self._recorded_events: list[AgentEvent] = []
self._recorded_internal_events: list[InternalEvent] = []
self._enable_recording: bool = False
self._include_internal_events: bool = False
self._started_at: float | None = None

# ivr activity
self._ivr_activity: IVRActivity | None = None

def emit(self, event: EventTypes, arg: AgentEvent) -> None: # type: ignore
self._recorded_events.append(arg)
self._recorded_internal_events.append(arg)
super().emit(event, arg)

def collect(self, event: InternalEvent) -> None:
self._recorded_internal_events.append(event)

@property
def userdata(self) -> Userdata_T:
if self._userdata is None:
Expand Down Expand Up @@ -433,7 +440,7 @@ def run(self, *, user_input: str, output_type: type[Run_T] | None = None) -> Run
if self._global_run_state is not None and not self._global_run_state.done():
raise RuntimeError("nested runs are not supported")

run_state = RunResult(user_input=user_input, output_type=output_type)
run_state = RunResult(agent_session=self, user_input=user_input, output_type=output_type)
self._global_run_state = run_state
self.generate_reply(user_input=user_input)
return run_state
Expand All @@ -450,6 +457,7 @@ async def start(
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
record: bool = True,
include_internal_events: bool = False,
) -> RunResult: ...

@overload
Expand All @@ -464,6 +472,7 @@ async def start(
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
record: bool = True,
include_internal_events: bool = False,
) -> None: ...

async def start(
Expand All @@ -477,6 +486,7 @@ async def start(
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
record: NotGivenOr[bool] = NOT_GIVEN,
include_internal_events: bool = False,
) -> RunResult | None:
"""Start the voice agent.

Expand All @@ -489,6 +499,7 @@ async def start(
room_input_options: Options for the room input
room_output_options: Options for the room output
record: Whether to record the audio
include_internal_events: Whether to include internal events in the session report
"""
async with self._lock:
if self._started:
Expand All @@ -504,6 +515,7 @@ async def start(
record = job_ctx.job.enable_recording

self._enable_recording = record
self._include_internal_events = include_internal_events

if self._enable_recording:
job_ctx.init_recording()
Expand Down Expand Up @@ -624,12 +636,15 @@ async def start(
)
self._job_context_cb_registered = True

if self._output.audio:
self._output.audio.on("playback_finished", self.collect)

run_state: RunResult | None = None
if capture_run:
if self._global_run_state is not None and not self._global_run_state.done():
raise RuntimeError("nested runs are not supported")

run_state = RunResult(output_type=None)
run_state = RunResult(agent_session=self, output_type=None)
self._global_run_state = run_state

# it is ok to await it directly, there is no previous task to drain
Expand Down
39 changes: 37 additions & 2 deletions livekit-agents/livekit/agents/voice/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,32 @@
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
from typing_extensions import Self

from livekit.rtc import AudioFrame

from ..llm import (
LLM,
ChatChunk,
ChatMessage,
FunctionCall,
FunctionCallOutput,
GenerationCreatedEvent,
InputSpeechStartedEvent,
InputSpeechStoppedEvent,
LLMError,
RealtimeModel,
RealtimeModelError,
)
from ..log import logger
from ..metrics import AgentMetrics
from ..stt import STT, STTError
from ..tts import TTS, TTSError
from ..stt import STT, SpeechEvent, STTError
from ..tts import TTS, SynthesizedAudio, TTSError
from ..types import FlushSentinel, TimedString
from ..vad import VADEvent
from .io import PlaybackFinishedEvent
from .room_io.types import TextInputEvent
from .run_result import (
RunEvent,
)
from .speech_handle import SpeechHandle

if TYPE_CHECKING:
Expand Down Expand Up @@ -240,3 +253,25 @@ class CloseEvent(BaseModel):
],
Field(discriminator="type"),
]


InternalEvent = Annotated[
Union[
AgentEvent,
VADEvent,
RunEvent,
SpeechEvent,
InputSpeechStartedEvent,
InputSpeechStoppedEvent,
GenerationCreatedEvent,
PlaybackFinishedEvent,
TextInputEvent,
SynthesizedAudio,
FlushSentinel,
ChatChunk,
str,
TimedString,
AudioFrame,
],
Field(discriminator="type"),
]
1 change: 1 addition & 0 deletions livekit-agents/livekit/agents/voice/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class PlaybackFinishedEvent:
synchronized_transcript: str | None = None
"""Transcript synced with playback; may be partial if the audio was interrupted
When None, the transcript is not synchronized with the playback"""
type: Literal["playback_finished"] = "playback_finished"


@dataclass
Expand Down
Loading
Loading