From 20728cfb2b4a4b229f8d7e07510e59c1ddba0e15 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 21:16:23 +0400 Subject: [PATCH 01/10] fix: close AsyncStream client on session end StreamEdge.close() only set self._call = None, leaving the AsyncStream httpx client and its connection pool open. Each session leaked ~1-2 TCP connections and associated asyncio tasks (heartbeat, reader), accumulating ~1.7 MiB/session. Call self.client.aclose() to release HTTP connection pools, WebSocket connections, and background tasks. --- .../vision_agents/plugins/getstream/stream_edge_transport.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index e6eb8b79c..7d8cf3fff 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -496,6 +496,8 @@ def _get_subscription_config(self): ) async def close(self): + if self.client: + await self.client.aclose() self._call = None async def send_custom_event(self, data: dict) -> None: From a15edb7da6713fa7cb97b22287220989a653a805 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 21:28:25 +0400 Subject: [PATCH 02/10] fix: close HTTP clients in ElevenLabs TTS and Deepgram STT ElevenLabs TTS had no close() method at all - the AsyncElevenLabs httpx client was never closed, leaking TCP connections per session. Deepgram STT close() closed the WebSocket but not the underlying AsyncDeepgramClient httpx client, also leaking connections. Together with the StreamEdge fix, this addresses all three sources of TCP connection leaks found during memory investigation. --- .../deepgram/vision_agents/plugins/deepgram/deepgram_stt.py | 4 ++++ plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index ec7a68b21..ca0c65869 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -312,3 +312,7 @@ async def close(self): self.connection = None self._connection_context = None self._connection_ready.clear() + + # Close the underlying HTTP client + if hasattr(self.client, "_client"): + await self.client._client.aclose() diff --git a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py index 95a8abd69..aea886cc1 100644 --- a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py +++ b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py @@ -63,6 +63,10 @@ async def stream_audio( audio_stream, sample_rate=16000, channels=1, format=AudioFormat.S16 ) + async def close(self) -> None: + if hasattr(self.client, "_client"): + await self.client._client.aclose() + async def stop_audio(self) -> None: """ Clears the queue and stops playing audio. From 9a7ba0b5204095fdb1d322da201e74ea09b541c4 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 22:35:58 +0400 Subject: [PATCH 03/10] fix: store ConnectionManager early and call leave() in StreamEdge.close() rtc.join() creates a ConnectionManager with WebSocket tasks (heartbeat, reader) immediately. If the session is closed before join() completes, _real_connection was not yet set, so close() couldn't clean up these tasks. Store the connection reference right after rtc.join() returns, and call leave() explicitly in StreamEdge.close() to ensure WebSocket tasks are cancelled even if the Agent's join flow was interrupted. --- .../plugins/getstream/stream_edge_transport.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index 7d8cf3fff..da803d07c 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -418,6 +418,8 @@ async def join( connection = await rtc.join( call, agent.agent_user.id, subscription_config=subscription_config ) + # Store immediately so close() can clean up if join is interrupted + self._real_connection = connection @connection.on("track_added") async def on_track(track_id, track_type, user): @@ -446,7 +448,6 @@ async def on_audio_received(pcm: PcmData): # Start the connection await connection.__aenter__() - self._real_connection = connection self._call = call # Re-publish already published tracks in case somebody is already on the call when we joined. # Otherwise, we won't get the video track from participants joined before us. @@ -496,6 +497,12 @@ def _get_subscription_config(self): ) async def close(self): + if self._real_connection: + try: + await self._real_connection.leave() + except Exception: + pass + self._real_connection = None if self.client: await self.client.aclose() self._call = None From 0ab5a5982fd2e1f9ffaa64d928cafaa6147734ab Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 23:24:30 +0400 Subject: [PATCH 04/10] fix: log exceptions in StreamEdge.close() instead of silencing --- .../vision_agents/plugins/getstream/stream_edge_transport.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index da803d07c..96c8aaecf 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -501,7 +501,7 @@ async def close(self): try: await self._real_connection.leave() except Exception: - pass + logger.exception("Error during connection leave in StreamEdge.close()") self._real_connection = None if self.client: await self.client.aclose() From ed836de1c385f5d8d916ea8c7473882fa1f4a465 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 23:26:16 +0400 Subject: [PATCH 05/10] fix: add exception handling to ElevenLabs TTS and Deepgram STT close() --- .../vision_agents/plugins/deepgram/deepgram_stt.py | 7 +++++-- plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index ca0c65869..3836d43a2 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -314,5 +314,8 @@ async def close(self): self._connection_ready.clear() # Close the underlying HTTP client - if hasattr(self.client, "_client"): - await self.client._client.aclose() + try: + if hasattr(self.client, "_client"): + await self.client._client.aclose() + except Exception: + logger.exception("Error closing Deepgram HTTP client") diff --git a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py index aea886cc1..e1986a313 100644 --- a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py +++ b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py @@ -64,8 +64,11 @@ async def stream_audio( ) async def close(self) -> None: - if hasattr(self.client, "_client"): - await self.client._client.aclose() + try: + if hasattr(self.client, "_client"): + await self.client._client.aclose() + except Exception: + logger.exception("Error closing ElevenLabs HTTP client") async def stop_audio(self) -> None: """ From 69930c51f5aa60dd86e0eb660b97ea8ef8b04580 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 23:27:19 +0400 Subject: [PATCH 06/10] fix: add exception handling to AsyncStream client close --- .../plugins/getstream/stream_edge_transport.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index 96c8aaecf..ca027052f 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -503,8 +503,11 @@ async def close(self): except Exception: logger.exception("Error during connection leave in StreamEdge.close()") self._real_connection = None - if self.client: - await self.client.aclose() + try: + if self.client: + await self.client.aclose() + except Exception: + logger.exception("Error closing AsyncStream client") self._call = None async def send_custom_event(self, data: dict) -> None: From 7a8c189526a0b38b350539352cdd5ab02b481691 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 23:52:15 +0400 Subject: [PATCH 07/10] refactor: remove redundant try/except where _apply() already handles it --- .../vision_agents/plugins/deepgram/deepgram_stt.py | 7 ++----- .../elevenlabs/vision_agents/plugins/elevenlabs/tts.py | 7 ++----- .../plugins/getstream/stream_edge_transport.py | 9 +++------ 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index 3836d43a2..ca0c65869 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -314,8 +314,5 @@ async def close(self): self._connection_ready.clear() # Close the underlying HTTP client - try: - if hasattr(self.client, "_client"): - await self.client._client.aclose() - except Exception: - logger.exception("Error closing Deepgram HTTP client") + if hasattr(self.client, "_client"): + await self.client._client.aclose() diff --git a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py index e1986a313..aea886cc1 100644 --- a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py +++ b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py @@ -64,11 +64,8 @@ async def stream_audio( ) async def close(self) -> None: - try: - if hasattr(self.client, "_client"): - await self.client._client.aclose() - except Exception: - logger.exception("Error closing ElevenLabs HTTP client") + if hasattr(self.client, "_client"): + await self.client._client.aclose() async def stop_audio(self) -> None: """ diff --git a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py index ca027052f..bdaf31356 100644 --- a/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py +++ b/plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py @@ -501,13 +501,10 @@ async def close(self): try: await self._real_connection.leave() except Exception: - logger.exception("Error during connection leave in StreamEdge.close()") + logger.exception("Error during connection leave") self._real_connection = None - try: - if self.client: - await self.client.aclose() - except Exception: - logger.exception("Error closing AsyncStream client") + if self.client: + await self.client.aclose() self._call = None async def send_custom_event(self, data: dict) -> None: From b27a10c23c5eb818ded6eddf076c75ec334999ea Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 15:54:16 +0400 Subject: [PATCH 08/10] test: verify StreamEdge.close() releases client and connection Checks that after close(): - httpx client is closed (is_closed == True) - _real_connection is set to None - leave() is called on the ConnectionManager --- .../getstream/tests/test_stream_edge_transport.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/plugins/getstream/tests/test_stream_edge_transport.py b/plugins/getstream/tests/test_stream_edge_transport.py index 62cf397e6..ae7a01c56 100644 --- a/plugins/getstream/tests/test_stream_edge_transport.py +++ b/plugins/getstream/tests/test_stream_edge_transport.py @@ -1,6 +1,6 @@ import asyncio import time -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock from uuid import uuid4 import pytest @@ -106,3 +106,16 @@ async def test_create_call_raises_before_authenticate( ): with pytest.raises(RuntimeError, match="not authenticated"): await stream_edge.create_call(call_id="call-1") + + async def test_close_releases_client_resources(self, stream_edge: StreamEdge): + stream_edge._real_connection = AsyncMock() + real_connection = stream_edge._real_connection + + assert stream_edge.client.client.is_closed is False + assert stream_edge._real_connection is not None + + await stream_edge.close() + + assert stream_edge.client.client.is_closed is True + assert stream_edge._real_connection is None + real_connection.leave.assert_called_once() From 228f65b48c4a1372dd7f494872277ddf5224a969 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 16:08:06 +0400 Subject: [PATCH 09/10] fix: use correct httpx client path in ElevenLabs TTS and Deepgram STT close() Both plugins referenced self.client._client which doesn't exist in current SDK versions. The actual path is client._client_wrapper.httpx_client.httpx_client. The hasattr guard silently skipped the close, leaving HTTP connections open. Found by writing tests that check httpx_client.is_closed after close(). --- plugins/deepgram/tests/test_deepgram_stt_close.py | 11 +++++++++++ .../vision_agents/plugins/deepgram/deepgram_stt.py | 11 +++++++++-- plugins/elevenlabs/tests/test_tts_close.py | 11 +++++++++++ .../vision_agents/plugins/elevenlabs/tts.py | 11 +++++++++-- 4 files changed, 40 insertions(+), 4 deletions(-) create mode 100644 plugins/deepgram/tests/test_deepgram_stt_close.py create mode 100644 plugins/elevenlabs/tests/test_tts_close.py diff --git a/plugins/deepgram/tests/test_deepgram_stt_close.py b/plugins/deepgram/tests/test_deepgram_stt_close.py new file mode 100644 index 000000000..1269f4abe --- /dev/null +++ b/plugins/deepgram/tests/test_deepgram_stt_close.py @@ -0,0 +1,11 @@ +from vision_agents.plugins import deepgram + + +class TestDeepgramSTTClose: + async def test_close_closes_http_client(self): + stt = deepgram.STT(api_key="fake") + httpx_client = stt.client._client_wrapper.httpx_client.httpx_client + + assert httpx_client.is_closed is False + await stt.close() + assert httpx_client.is_closed is True diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index ca0c65869..5b1586359 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -314,5 +314,12 @@ async def close(self): self._connection_ready.clear() # Close the underlying HTTP client - if hasattr(self.client, "_client"): - await self.client._client.aclose() + httpx_client = getattr( + getattr( + getattr(self.client, "_client_wrapper", None), "httpx_client", None + ), + "httpx_client", + None, + ) + if httpx_client is not None: + await httpx_client.aclose() diff --git a/plugins/elevenlabs/tests/test_tts_close.py b/plugins/elevenlabs/tests/test_tts_close.py new file mode 100644 index 000000000..a88ad87ec --- /dev/null +++ b/plugins/elevenlabs/tests/test_tts_close.py @@ -0,0 +1,11 @@ +from vision_agents.plugins import elevenlabs + + +class TestElevenLabsTTSClose: + async def test_close_closes_http_client(self): + tts = elevenlabs.TTS(api_key="fake") + httpx_client = tts.client._client_wrapper.httpx_client.httpx_client + + assert httpx_client.is_closed is False + await tts.close() + assert httpx_client.is_closed is True diff --git a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py index aea886cc1..12c7e335f 100644 --- a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py +++ b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py @@ -64,8 +64,15 @@ async def stream_audio( ) async def close(self) -> None: - if hasattr(self.client, "_client"): - await self.client._client.aclose() + httpx_client = getattr( + getattr( + getattr(self.client, "_client_wrapper", None), "httpx_client", None + ), + "httpx_client", + None, + ) + if httpx_client is not None: + await httpx_client.aclose() async def stop_audio(self) -> None: """ From 1bfa9e2f2a4f910febffae31e97780813c79d628 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 17:30:38 +0400 Subject: [PATCH 10/10] fix: call super().close() in ElevenLabs TTS, clean up getattr chains - Add super().close() call in finally block - Break nested getattr into readable steps in both ElevenLabs and Deepgram - Add SDK workaround comments --- .../plugins/deepgram/deepgram_stt.py | 12 ++++-------- .../vision_agents/plugins/elevenlabs/tts.py | 18 +++++++++--------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index 5b1586359..c31284439 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -313,13 +313,9 @@ async def close(self): self._connection_context = None self._connection_ready.clear() - # Close the underlying HTTP client - httpx_client = getattr( - getattr( - getattr(self.client, "_client_wrapper", None), "httpx_client", None - ), - "httpx_client", - None, - ) + # SDK doesn't expose a public aclose() - workaround using internals + wrapper = getattr(self.client, "_client_wrapper", None) + http_client = getattr(wrapper, "httpx_client", None) + httpx_client = getattr(http_client, "httpx_client", None) if httpx_client is not None: await httpx_client.aclose() diff --git a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py index 12c7e335f..230329c2a 100644 --- a/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py +++ b/plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py @@ -64,15 +64,15 @@ async def stream_audio( ) async def close(self) -> None: - httpx_client = getattr( - getattr( - getattr(self.client, "_client_wrapper", None), "httpx_client", None - ), - "httpx_client", - None, - ) - if httpx_client is not None: - await httpx_client.aclose() + # SDK doesn't expose a public aclose() - workaround using internals + try: + wrapper = getattr(self.client, "_client_wrapper", None) + http_client = getattr(wrapper, "httpx_client", None) + httpx_client = getattr(http_client, "httpx_client", None) + if httpx_client is not None: + await httpx_client.aclose() + finally: + await super().close() async def stop_audio(self) -> None: """