Skip to content

Commit 1eaae1a

Browse files
committed
add preconnect audio buffer API to LocalAudioTrack
Adds start_preconnect_buffer/stop_preconnect_buffer/send_preconnect_buffer to LocalAudioTrack, backed by a zero-allocation circular bytearray ring buffer. Sends buffered audio via byte stream data channels using the existing lk.agent.pre-connect-audio-buffer protocol.
1 parent 9e0436e commit 1eaae1a

6 files changed

Lines changed: 212 additions & 8 deletions

File tree

livekit-rtc/livekit/rtc/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
)
4040
from ._proto.video_frame_pb2 import VideoBufferType, VideoCodec, VideoRotation
4141
from .audio_frame import AudioFrame
42+
from .audio_ring_buffer import AudioRingBuffer
4243
from .audio_source import AudioSource
4344
from .audio_stream import AudioFrameEvent, AudioStream, NoiseCancellationOptions
4445
from .audio_filter import AudioFilter
@@ -137,6 +138,7 @@
137138
"VideoRotation",
138139
"stats",
139140
"AudioFrame",
141+
"AudioRingBuffer",
140142
"AudioSource",
141143
"AudioStream",
142144
"NoiseCancellationOptions",
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
from __future__ import annotations
2+
3+
import threading
4+
5+
from .audio_frame import AudioFrame
6+
7+
8+
class AudioRingBuffer:
    """Fixed-capacity circular buffer holding the most recent raw PCM audio.

    Samples are stored as interleaved int16 PCM in one ``bytearray`` allocated in
    the constructor. ``push`` copies into that storage in place and, once the
    buffer is full, overwrites the oldest audio. All state is guarded by a
    ``threading.Lock``.

    The capacity is rounded down to a whole number of sample frames
    (``num_channels * 2`` bytes). Without that, a wrap-around would make the
    oldest retained byte fall in the middle of a sample and ``capture`` would
    return misaligned PCM.
    """

    _BYTES_PER_SAMPLE = 2  # int16

    def __init__(self, max_duration: float, sample_rate: int, num_channels: int) -> None:
        """Allocate storage for up to ``max_duration`` seconds of audio.

        Args:
            max_duration: Maximum amount of audio to retain, in seconds.
            sample_rate: Samples per second per channel.
            num_channels: Number of interleaved channels.

        Raises:
            ValueError: If ``sample_rate`` or ``num_channels`` is not positive, or
                if ``max_duration`` is too small to hold one sample frame.
        """
        if sample_rate <= 0 or num_channels <= 0:
            raise ValueError("sample_rate and num_channels must be positive")

        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._frame_bytes = num_channels * self._BYTES_PER_SAMPLE
        self._bytes_per_second = sample_rate * self._frame_bytes

        # Round down to a frame boundary so wrapped data stays sample-aligned.
        requested = int(max_duration * self._bytes_per_second)
        self._max_bytes = requested - requested % self._frame_bytes
        if self._max_bytes <= 0:
            raise ValueError("max_duration must be positive")

        self._buf = bytearray(self._max_bytes)
        self._write_pos = 0  # index where the next pushed byte will be stored
        self._size = 0  # number of valid bytes currently held
        self._lock = threading.Lock()

    @property
    def duration(self) -> float:
        """Seconds of audio currently held in the buffer."""
        with self._lock:
            return self._size / self._bytes_per_second

    @property
    def max_duration(self) -> float:
        """Capacity in seconds, after rounding down to whole sample frames."""
        return self._max_bytes / self._bytes_per_second

    def push(self, frame: AudioFrame) -> None:
        """Append ``frame``'s samples, overwriting the oldest audio when full.

        The frame's sample rate and channel count are not checked against the
        buffer's configuration; ``frame.data`` is copied as raw bytes.
        """
        data = frame.data.cast("b")
        n = len(data)
        if n == 0:
            return

        with self._lock:
            if n >= self._max_bytes:
                # Frame is at least as large as the buffer: keep only its tail.
                self._buf[:] = data[n - self._max_bytes :]
                self._write_pos = 0
                self._size = self._max_bytes
                return

            end = self._write_pos + n
            if end <= self._max_bytes:
                self._buf[self._write_pos : end] = data
            else:
                # Split the write across the end of the storage.
                first = self._max_bytes - self._write_pos
                self._buf[self._write_pos : self._max_bytes] = data[:first]
                self._buf[: n - first] = data[first:]

            self._write_pos = end % self._max_bytes
            self._size = min(self._size + n, self._max_bytes)

    def capture(self) -> bytes:
        """Return the buffered PCM bytes in chronological order and reset the buffer."""
        with self._lock:
            if self._size == 0:
                return b""

            # The oldest valid byte sits ``_size`` bytes behind the write cursor.
            read_pos = (self._write_pos - self._size) % self._max_bytes
            with memoryview(self._buf) as view:
                if read_pos + self._size <= self._max_bytes:
                    data = bytes(view[read_pos : read_pos + self._size])
                else:
                    first = self._max_bytes - read_pos
                    data = b"".join((view[read_pos:], view[: self._size - first]))

            self._write_pos = 0
            self._size = 0
            return data

    def clear(self) -> None:
        """Discard all buffered audio; the underlying storage is kept."""
        with self._lock:
            self._write_pos = 0
            self._size = 0

livekit-rtc/livekit/rtc/audio_source.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
import time
1818
import asyncio
19+
from typing import TYPE_CHECKING
1920

2021
from ._ffi_client import FfiHandle, FfiClient
2122
from ._proto import audio_frame_pb2 as proto_audio_frame
2223
from ._proto import ffi_pb2 as proto_ffi
2324
from .audio_frame import AudioFrame
2425

26+
if TYPE_CHECKING:
27+
from .audio_ring_buffer import AudioRingBuffer
28+
2529

2630
class AudioSource:
2731
"""
@@ -69,6 +73,7 @@ def __init__(
6973
self._q_size = 0.0
7074
self._join_handle: asyncio.TimerHandle | None = None
7175
self._join_fut: asyncio.Future[None] | None = None
76+
self._preconnect_buffer: AudioRingBuffer | None = None
7277

7378
@property
7479
def sample_rate(self) -> int:
@@ -119,6 +124,9 @@ async def capture_frame(self, frame: AudioFrame) -> None:
119124
if frame.samples_per_channel == 0 or self._ffi_handle.disposed:
120125
return
121126

127+
if self._preconnect_buffer is not None:
128+
self._preconnect_buffer.push(frame)
129+
122130
now = time.monotonic()
123131
elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
124132
self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
@@ -162,6 +170,9 @@ async def wait_for_playout(self) -> None:
162170

163171
await asyncio.shield(self._join_fut)
164172

173+
def _set_preconnect_buffer(self, buf: AudioRingBuffer | None) -> None:
    """Attach ``buf`` as this source's preconnect ring buffer, or detach with ``None``.

    While a buffer is attached, ``capture_frame`` pushes each non-empty captured
    frame into it.
    """
    self._preconnect_buffer = buf
175+
165176
def _release_waiter(self) -> None:
166177
if self._join_fut is None:
167178
return # could be None when clear_queue is called

livekit-rtc/livekit/rtc/participant.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import os
2121
import mimetypes
2222
import aiofiles
23-
from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast, TypeVar
23+
from typing import TYPE_CHECKING, List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast, TypeVar
2424
from abc import abstractmethod, ABC
2525

2626
from ._ffi_client import FfiClient, FfiHandle
@@ -36,7 +36,7 @@
3636
ParticipantTrackPermission,
3737
)
3838
from ._utils import BroadcastQueue
39-
from .track import LocalTrack
39+
from .track import LocalAudioTrack, LocalTrack
4040
from .track_publication import (
4141
LocalTrackPublication,
4242
RemoteTrackPublication,
@@ -57,6 +57,9 @@
5757
from .data_track import LocalDataTrack
5858
from ._proto import data_track_pb2 as proto_data_track
5959

60+
if TYPE_CHECKING:
61+
from .room import Room
62+
6063

6164
class PublishTrackError(Exception):
6265
def __init__(self, message: str) -> None:
@@ -189,6 +192,7 @@ def __init__(
189192
self._room_queue = room_queue
190193
self._track_publications: dict[str, LocalTrackPublication] = {} # type: ignore
191194
self._rpc_handlers: Dict[str, RpcHandler] = {}
195+
self._room: Room | None = None
192196

193197
@property
194198
def track_publications(self) -> Mapping[str, LocalTrackPublication]:
@@ -728,14 +732,20 @@ async def publish_data_track(
728732
return LocalDataTrack(cb.publish_data_track.track)
729733

730734
async def publish_track(
731-
self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
735+
self,
736+
track: LocalTrack,
737+
options: TrackPublishOptions = TrackPublishOptions(),
738+
*,
739+
preconnect_buffer_auto_send_to: str | None = None,
732740
) -> LocalTrackPublication:
733741
"""
734742
Publish a local track to the room.
735743
736744
Args:
737745
track (LocalTrack): The track to publish.
738746
options (TrackPublishOptions, optional): Options for publishing the track.
747+
preconnect_buffer_auto_send_to (str, optional): If set, automatically sends the
748+
preconnect buffer when a participant with this identity becomes active.
739749
740750
Returns:
741751
LocalTrackPublication: The publication of the published track.
@@ -763,11 +773,48 @@ async def publish_track(
763773
track._info.sid = track_publication.sid
764774
self._track_publications[track_publication.sid] = track_publication
765775

776+
if isinstance(track, LocalAudioTrack):
777+
track._participant = self
778+
track._publication_sid = track_publication.sid
779+
780+
if preconnect_buffer_auto_send_to:
781+
if track.has_preconnect_buffer:
782+
self._setup_preconnect_auto_send(
783+
track, preconnect_buffer_auto_send_to
784+
)
785+
else:
786+
logger.warning(
787+
"preconnect_buffer_auto_send_to set but no preconnect buffer "
788+
"is active — call track.start_preconnect_buffer() first"
789+
)
790+
766791
queue.task_done()
767792
return track_publication
768793
finally:
769794
self._room_queue.unsubscribe(queue)
770795

796+
def _setup_preconnect_auto_send(
    self, track: LocalAudioTrack, target_identity: str
) -> None:
    """Arrange a one-shot send of ``track``'s preconnect buffer to ``target_identity``.

    Registers a ``participant_active`` handler on the owning room. The first time
    the event fires for a participant whose identity equals ``target_identity``
    while the track still has a preconnect buffer, ``track.send_preconnect_buffer``
    is scheduled as an asyncio task. The handler is unregistered when that task
    finishes. A failed send is logged, not raised.

    Args:
        track: Published audio track whose preconnect buffer should be sent.
        target_identity: Identity of the participant that should receive it.
    """
    import asyncio  # local import keeps this block self-contained

    room = self._room
    if room is None:
        logger.warning(
            "cannot auto-send preconnect buffer: local participant is not attached to a room"
        )
        return

    # Doubles as the "already triggered" flag. While the handler stays
    # registered, the room references it and therefore the task too.
    send_task: asyncio.Task[None] | None = None

    async def _send() -> None:
        try:
            await track.send_preconnect_buffer(destination_identity=target_identity)
        except Exception:
            logger.exception("failed to auto-send preconnect buffer")

    def _unregister(_finished: object) -> None:
        room.off("participant_active", _on_participant_active)

    # Must be a plain function: event handlers are called synchronously, so a
    # coroutine function registered here would never be awaited.
    # NOTE(review): the SDK emitter is expected to reject coroutine functions
    # outright - confirm against EventEmitter.on.
    def _on_participant_active(participant: RemoteParticipant) -> None:
        nonlocal send_task
        if send_task is not None:
            return
        if participant.identity != target_identity:
            return
        if not track.has_preconnect_buffer:
            return
        send_task = asyncio.create_task(_send())
        send_task.add_done_callback(_unregister)

    room.on("participant_active", _on_participant_active)
817+
771818
async def unpublish_track(self, track_sid: str) -> None:
772819
"""
773820
Unpublish a track from the room.

livekit-rtc/livekit/rtc/room.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ def on_participant_connected(participant):
525525
self._local_participant = LocalParticipant(
526526
self._room_queue, cb.connect.result.local_participant
527527
)
528+
self._local_participant._room = self
528529

529530
for pt in cb.connect.result.participants:
530531
rp = self._create_remote_participant(pt.participant)

livekit-rtc/livekit/rtc/track.py

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,24 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from __future__ import annotations
16+
17+
import asyncio
1518
from typing import TYPE_CHECKING, List, Union
19+
1620
from ._ffi_client import FfiHandle, FfiClient
1721
from ._proto import ffi_pb2 as proto_ffi
1822
from ._proto import track_pb2 as proto_track
1923
from ._proto import stats_pb2 as proto_stats
2024

2125
if TYPE_CHECKING:
26+
from .audio_ring_buffer import AudioRingBuffer
2227
from .audio_source import AudioSource
28+
from .participant import LocalParticipant
2329
from .video_source import VideoSource
2430

31+
PRE_CONNECT_AUDIO_BUFFER_TOPIC = "lk.agent.pre-connect-audio-buffer"
32+
2533

2634
class Track:
2735
def __init__(self, owned_info: proto_track.OwnedTrack):
@@ -68,26 +76,80 @@ async def get_stats(self) -> List[proto_stats.RtcStats]:
6876

6977

7078
class LocalAudioTrack(Track):
71-
def __init__(self, info: proto_track.OwnedTrack):
79+
def __init__(self, info: proto_track.OwnedTrack, source: AudioSource | None = None):
    """Wrap an FFI-owned audio track.

    Args:
        info: Owned track description, forwarded to ``Track.__init__``.
        source: Audio source feeding this track, if known. The preconnect
            buffer API needs it.
    """
    super().__init__(info)
    # Serializes concurrent send_preconnect_buffer() calls.
    self._send_lock = asyncio.Lock()
    self._source = source
    # Ring buffer created by start_preconnect_buffer(); None while inactive.
    self._preconnect_buffer: AudioRingBuffer | None = None
    # Both are filled in by LocalParticipant.publish_track().
    self._participant: LocalParticipant | None = None
    self._publication_sid: str | None = None
7386

7487
@staticmethod
75-
def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack":
88+
def create_audio_track(name: str, source: AudioSource) -> LocalAudioTrack:
7689
req = proto_ffi.FfiRequest()
7790
req.create_audio_track.name = name
7891
req.create_audio_track.source_handle = source._ffi_handle.handle
7992

8093
resp = FfiClient.instance.request(req)
81-
return LocalAudioTrack(resp.create_audio_track.track)
94+
return LocalAudioTrack(resp.create_audio_track.track, source=source)
8295

83-
def mute(self):
96+
@property
def has_preconnect_buffer(self) -> bool:
    """Whether a preconnect ring buffer is currently attached to this track."""
    active_buffer = self._preconnect_buffer
    return active_buffer is not None
99+
100+
def start_preconnect_buffer(self, *, max_duration: float = 10.0) -> None:
    """Start buffering audio captured by this track's source.

    Creates a new ``AudioRingBuffer`` sized from the source's sample rate and
    channel count, then attaches it to both the track and the source. Any
    previously attached buffer is replaced.

    Args:
        max_duration: Maximum amount of audio to retain, in seconds.

    Raises:
        RuntimeError: If the track was created without an audio source.
    """
    source = self._source
    if source is None:
        raise RuntimeError("track has no audio source")

    # Imported lazily; at module level this name is only imported for type checking.
    from .audio_ring_buffer import AudioRingBuffer

    ring = AudioRingBuffer(
        max_duration=max_duration,
        sample_rate=source.sample_rate,
        num_channels=source.num_channels,
    )
    self._preconnect_buffer = ring
    source._set_preconnect_buffer(ring)
112+
113+
def stop_preconnect_buffer(self) -> None:
    """Stop buffering: detach the ring buffer from the source (if any) and drop it."""
    source = self._source
    if source is not None:
        source._set_preconnect_buffer(None)
    self._preconnect_buffer = None
117+
118+
async def send_preconnect_buffer(self, *, destination_identity: str) -> None:
    """Send the buffered pre-connect audio to one participant as a byte stream.

    Drains the preconnect ring buffer and, if it held any audio, writes the raw
    PCM bytes to a byte stream on the ``lk.agent.pre-connect-audio-buffer`` topic.
    The stream's attributes carry the track id, sample rate and channel count.
    Concurrent calls are serialized. The buffer stays attached afterwards and
    keeps collecting audio.

    Args:
        destination_identity: Identity of the participant that receives the audio.

    Raises:
        RuntimeError: If the track is not published, the preconnect buffer is not
            active, or the track has no audio source.
    """
    if self._participant is None:
        raise RuntimeError("track is not published")
    if self._preconnect_buffer is None:
        raise RuntimeError("preconnect buffer is not active")

    async with self._send_lock:
        # Re-read the state: stop_preconnect_buffer() may have run while this
        # call was waiting for the lock.
        participant = self._participant
        ring = self._preconnect_buffer
        source = self._source
        if participant is None:
            raise RuntimeError("track is not published")
        if ring is None:
            raise RuntimeError("preconnect buffer is not active")
        # Checked before draining so a failed precondition does not discard audio.
        if source is None:
            raise RuntimeError("track has no audio source")

        data = ring.capture()
        if not data:
            return

        writer = await participant.stream_bytes(
            "preconnect-buffer",
            topic=PRE_CONNECT_AUDIO_BUFFER_TOPIC,
            mime_type="application/octet-stream",
            destination_identities=[destination_identity],
            attributes={
                "trackId": self._publication_sid or self.sid,
                "sampleRate": str(source.sample_rate),
                "channels": str(source.num_channels),
            },
        )

        try:
            await writer.write(data)
        finally:
            # Always close the stream, even when the write fails.
            await writer.aclose()
144+
145+
def mute(self) -> None:
    """Mute this track via an FFI ``local_track_mute`` request and record it locally."""
    request = proto_ffi.FfiRequest()
    mute_request = request.local_track_mute
    mute_request.track_handle = self._ffi_handle.handle
    mute_request.mute = True
    FfiClient.instance.request(request)
    # Mirror the new state on the cached track info.
    self._info.muted = True
89151

90-
def unmute(self):
152+
def unmute(self) -> None:
91153
req = proto_ffi.FfiRequest()
92154
req.local_track_mute.track_handle = self._ffi_handle.handle
93155
req.local_track_mute.mute = False

0 commit comments

Comments
 (0)