Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/next-release/changeset-4f6d6470.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-agents": patch
---

fix aec (#1821)
85 changes: 45 additions & 40 deletions livekit-agents/livekit/agents/voice/chat_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ def __init__(
auto_gain_control=True,
)

self._render_ring_buffer = np.empty((0,), dtype=np.int16)
self._render_ring_lock = threading.Lock()
self._output_delay = 0.0
self._input_delay = 0.0

self._main_atask: asyncio.Task | None = None

Expand Down Expand Up @@ -247,7 +247,7 @@ def _update_microphone(self, *, enable: bool) -> None:
channels=1,
device=input_device,
samplerate=24000,
blocksize=240,
blocksize=2400,
)
self._input_stream.start()
self._agent.input.audio = _AudioInput(self)
Expand Down Expand Up @@ -284,7 +284,10 @@ def _update_text_output(self, *, enable: bool) -> None:
self._agent.output.transcription = None
self._text_input_buf = []

def _sd_output_callback(self, outdata: np.ndarray, frames: int, *_) -> None:
def _sd_output_callback(self, outdata: np.ndarray, frames: int, time, *_) -> None:
self._output_delay = time.outputBufferDacTime - time.currentTime

FRAME_SAMPLES = 240
with self._audio_sink.lock:
bytes_needed = frames * 2
if len(self._audio_sink.audio_buffer) < bytes_needed:
Expand All @@ -294,51 +297,53 @@ def _sd_output_callback(self, outdata: np.ndarray, frames: int, *_) -> None:
dtype=np.int16,
count=available_bytes // 2,
)

outdata[available_bytes // 2 :, 0] = 0
del self._audio_sink.audio_buffer[:available_bytes]
else:
chunk = self._audio_sink.audio_buffer[:bytes_needed]
outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frames)
del self._audio_sink.audio_buffer[:bytes_needed]

with self._render_ring_lock:
render_chunk = outdata[:, 0].copy()
self._render_ring_buffer = np.concatenate((self._render_ring_buffer, render_chunk))
if self._render_ring_buffer.size > AEC_RING_BUFFER_SIZE:
self._render_ring_buffer = self._render_ring_buffer[-AEC_RING_BUFFER_SIZE:]

def _sd_input_callback(self, indata: np.ndarray, frame_count: int, *_) -> None:
CHUNK_SAMPLES = 240
with self._render_ring_lock:
if self._render_ring_buffer.size >= CHUNK_SAMPLES:
render_chunk = self._render_ring_buffer[:CHUNK_SAMPLES].copy()
self._render_ring_buffer = self._render_ring_buffer[CHUNK_SAMPLES:]
else:
render_chunk = np.zeros((CHUNK_SAMPLES,), dtype=np.int16)

capture_frame_for_aec = rtc.AudioFrame(
data=indata.tobytes(),
samples_per_channel=frame_count,
sample_rate=24000,
num_channels=1,
)
render_frame_for_aec = rtc.AudioFrame(
data=render_chunk.tobytes(),
samples_per_channel=CHUNK_SAMPLES,
sample_rate=24000,
num_channels=1,
)

self._apm.process_reverse_stream(render_frame_for_aec)
self._apm.process_stream(capture_frame_for_aec)
num_chunks = frames // FRAME_SAMPLES
for i in range(num_chunks):
start = i * FRAME_SAMPLES
end = start + FRAME_SAMPLES
render_chunk = outdata[start:end, 0]
render_frame_for_aec = rtc.AudioFrame(
data=render_chunk.tobytes(),
samples_per_channel=FRAME_SAMPLES,
sample_rate=24000,
num_channels=1,
)
self._apm.process_reverse_stream(render_frame_for_aec)

def _sd_input_callback(self, indata: np.ndarray, frame_count: int, time, *_) -> None:
self._input_delay = time.currentTime - time.inputBufferAdcTime
total_delay = self._output_delay + self._input_delay
self._apm.set_stream_delay_ms(int(total_delay * 1000))

FRAME_SAMPLES = 240 # 10ms at 24000 Hz
num_frames = frame_count // FRAME_SAMPLES

for i in range(num_frames):
start = i * FRAME_SAMPLES
end = start + FRAME_SAMPLES
capture_chunk = indata[start:end]

capture_frame_for_aec = rtc.AudioFrame(
data=capture_chunk.tobytes(),
samples_per_channel=FRAME_SAMPLES,
sample_rate=24000,
num_channels=1,
)
self._apm.process_stream(capture_frame_for_aec)

in_data_aec = np.frombuffer(capture_frame_for_aec.data, dtype=np.int16)
rms = np.sqrt(np.mean(in_data_aec.astype(np.float32) ** 2))
max_int16 = np.iinfo(np.int16).max
self._micro_db = 20.0 * np.log10(rms / max_int16 + 1e-6)
in_data_aec = np.frombuffer(capture_frame_for_aec.data, dtype=np.int16)
rms = np.sqrt(np.mean(in_data_aec.astype(np.float32) ** 2))
max_int16 = np.iinfo(np.int16).max
self._micro_db = 20.0 * np.log10(rms / max_int16 + 1e-6)

self._loop.call_soon_threadsafe(self._audio_input_ch.send_nowait, capture_frame_for_aec)
self._loop.call_soon_threadsafe(self._audio_input_ch.send_nowait, capture_frame_for_aec)

@log_exceptions(logger=logger)
async def _input_cli_task(self, in_ch: aio.Chan[str]) -> None:
Expand Down
Loading