fix: close AsyncStream client on session end #457
StreamEdge.close() only set self._call = None, leaving the AsyncStream httpx client and its connection pool open. Each session leaked ~1-2 TCP connections and associated asyncio tasks (heartbeat, reader), accumulating ~1.7 MiB/session. Call self.client.aclose() to release HTTP connection pools, WebSocket connections, and background tasks.
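A minimal sketch of the close pattern described above, using stand-in classes rather than the real StreamEdge or AsyncStream. `FakeAsyncClient` and `EdgeSketch` are hypothetical names introduced only for illustration; the point is that dropping the call reference does nothing for the client unless `aclose()` is awaited too.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an SDK client that owns a connection pool."""

    def __init__(self) -> None:
        self.is_closed = False

    async def aclose(self) -> None:
        # A real client would release its pool and background tasks here.
        self.is_closed = True


class EdgeSketch:
    """Illustrative only; not the real StreamEdge class."""

    def __init__(self, client: FakeAsyncClient) -> None:
        self.client = client
        self._call = object()

    async def close(self) -> None:
        # Dropping the call reference alone does not release the client.
        self._call = None
        # The fix described above: explicitly close the client as well.
        await self.client.aclose()


async def demo() -> bool:
    edge = EdgeSketch(FakeAsyncClient())
    await edge.close()
    return edge.client.is_closed


closed_after_close = asyncio.run(demo())
```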
Walkthrough
Adds explicit resource cleanup across Deepgram STT, ElevenLabs TTS, and GetStream StreamEdge by closing underlying httpx clients and ensuring real connection teardown; adds unit tests that assert client is_closed state changes and connection leave behavior.
ElevenLabs TTS had no close() method at all - the AsyncElevenLabs httpx client was never closed, leaking TCP connections per session. Deepgram STT close() closed the WebSocket but not the underlying AsyncDeepgramClient httpx client, also leaking connections. Together with the StreamEdge fix, this addresses all three sources of TCP connection leaks found during memory investigation.
…se()
rtc.join() creates a ConnectionManager with WebSocket tasks (heartbeat, reader) immediately. If the session is closed before join() completes, _real_connection was not yet set, so close() couldn't clean up these tasks. Store the connection reference right after rtc.join() returns, and call leave() explicitly in StreamEdge.close() to ensure WebSocket tasks are cancelled even if the Agent's join flow was interrupted.
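The ordering issue can be sketched with plain asyncio tasks. Everything here is a stand-in: `ConnectionSketch` and `EdgeSketch` are hypothetical classes, not the real ConnectionManager or StreamEdge. The sketch assumes that joining spawns a background task straight away, so the reference has to be stored as soon as `join()` returns for `close()` to reach it.

```python
import asyncio
from typing import Optional


class ConnectionSketch:
    """Hypothetical stand-in for a connection that spawns tasks on join."""

    def __init__(self) -> None:
        self.tasks: list[asyncio.Task] = []

    async def join(self) -> None:
        # Background work starts as soon as join() runs.
        self.tasks.append(asyncio.create_task(self._heartbeat()))

    async def _heartbeat(self) -> None:
        while True:
            await asyncio.sleep(3600)

    async def leave(self) -> None:
        for task in self.tasks:
            task.cancel()
        # Wait for the cancellations to be processed.
        await asyncio.gather(*self.tasks, return_exceptions=True)


class EdgeSketch:
    def __init__(self) -> None:
        self._real_connection: Optional[ConnectionSketch] = None

    async def join(self) -> ConnectionSketch:
        connection = ConnectionSketch()
        await connection.join()
        # Keep the reference right away so close() can always reach it.
        self._real_connection = connection
        return connection

    async def close(self) -> None:
        if self._real_connection is not None:
            await self._real_connection.leave()
            self._real_connection = None


async def demo() -> bool:
    edge = EdgeSketch()
    connection = await edge.join()
    # Simulate the session ending before the rest of the join flow runs.
    await edge.close()
    return all(task.cancelled() for task in connection.tasks)


tasks_cancelled = asyncio.run(demo())
```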
Checks that after close():
- httpx client is closed (is_closed == True)
- _real_connection is set to None
- leave() is called on the ConnectionManager
… close()
Both plugins referenced self.client._client which doesn't exist in current SDK versions. The actual path is client._client_wrapper.httpx_client.httpx_client. The hasattr guard silently skipped the close, leaving HTTP connections open. Found by writing tests that check httpx_client.is_closed after close().
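To illustrate why the stale guard went unnoticed, here is a small sketch built on `types.SimpleNamespace` objects that mimic the attribute chain named above. `InnerClient`, `make_sdk_client`, and both close helpers are hypothetical and do not use the real Deepgram or ElevenLabs SDKs.

```python
import asyncio
from types import SimpleNamespace


class InnerClient:
    """Hypothetical stand-in for the innermost HTTP client."""

    def __init__(self) -> None:
        self.is_closed = False

    async def aclose(self) -> None:
        self.is_closed = True


def make_sdk_client() -> SimpleNamespace:
    # Mirrors the chain client._client_wrapper.httpx_client.httpx_client.
    inner = InnerClient()
    outer = SimpleNamespace(httpx_client=inner)
    return SimpleNamespace(_client_wrapper=SimpleNamespace(httpx_client=outer))


async def close_with_stale_guard(client) -> None:
    # The attribute is missing, so the guard is False: no close, no error.
    if hasattr(client, "_client"):
        await client._client.aclose()


async def close_step_by_step(client) -> bool:
    """Return True if an inner client was found and closed."""
    wrapper = getattr(client, "_client_wrapper", None)
    outer = getattr(wrapper, "httpx_client", None)
    inner = getattr(outer, "httpx_client", None)
    if inner is None:
        return False
    await inner.aclose()
    return True


async def demo():
    client = make_sdk_client()
    inner = client._client_wrapper.httpx_client.httpx_client
    await close_with_stale_guard(client)
    after_stale_guard = inner.is_closed
    found = await close_step_by_step(client)
    return after_stale_guard, found, inner.is_closed


stale_result, found_inner, final_result = asyncio.run(demo())
```

Returning a boolean from the step-by-step helper is one way to make a skipped close visible to a test instead of passing silently.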
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py`:
- Around line 316-325: The nested reflective access to Deepgram SDK internals
(self.client -> _client_wrapper -> httpx_client -> httpx_client) in the cleanup
block should be removed or replaced: do not access private attributes like
"_client_wrapper" or "httpx_client"; instead either delete the manual httpx
client close and rely on the SDK/user lifecycle, or if you must attempt a
workaround, wrap any access in a clear SDK-internal comment and a defensive
try/except that skips cleanup if attributes are missing or change, referencing
the cleanup site where self.client is inspected and the nested
_client_wrapper/httpx_client chain is currently used; also add a short TODO
pointing to checking Deepgram SDK docs for an official shutdown API.
In `@plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py`:
- Around line 66-76: TTS.close currently bypasses the base class cleanup and
uses a nested getattr chain to reach an internal httpx client; replace this with
a call to await super().close() and remove the nested
getattr(getattr(getattr(...))) logic so the SDK/base class handles closing the
internal httpx client (update the TTS.close method to simply await
super().close() and remove manual httpx_client access to comply with repo
guidelines and ElevenLabs SDK recommendations).
In `@plugins/getstream/tests/test_stream_edge_transport.py`:
- Line 3: The test currently uses unittest.mock.AsyncMock to stub
_real_connection and assert calls; replace this with a real ConnectionManager
instance from the existing connection_manager fixture or a small hand-rolled
test double class that implements the same interface (e.g., open/close/send
methods) and records invocation counts; update the test to inject that real
ConnectionManager or test double into the StreamEdgeTransport (or the class
under test that sets _real_connection) and replace assert_called_once() with
assertions on the test double's recorded call counters or the real
ConnectionManager's observable state, removing all uses of AsyncMock and Mock.
In `@plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py`:
- Around line 500-508: The code currently uses a broad except Exception: when
awaiting self._real_connection.leave(); change this to catch the specific
exceptions handled elsewhere in this module (match the pattern in
StreamConnection.close()), e.g., use except (asyncio.TimeoutError,
RuntimeError): and log an appropriate message with logger.exception("Error
during connection leave") for those cases, and let any other exceptions
propagate (do not catch them broadly) so unexpected errors aren't swallowed;
update the block around await self._real_connection.leave() and keep resetting
self._real_connection = None afterward.
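The last review comment can be sketched as follows. This is an illustration under assumptions, not the module's actual code: `FailingConnection` and `EdgeSketch` are hypothetical, and the exception tuple is the one the comment offers as an example.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FailingConnection:
    """Hypothetical stand-in whose leave() fails with a chosen exception."""

    def __init__(self, error: Exception) -> None:
        self._error = error

    async def leave(self) -> None:
        raise self._error


class EdgeSketch:
    def __init__(self, connection) -> None:
        self._real_connection = connection

    async def close(self) -> None:
        if self._real_connection is None:
            return
        try:
            await self._real_connection.leave()
        except (asyncio.TimeoutError, RuntimeError):
            # Expected teardown failures are logged and swallowed.
            logger.exception("Error during connection leave")
        finally:
            # The reference is cleared whether or not leave() succeeded.
            self._real_connection = None


async def demo():
    expected = EdgeSketch(FailingConnection(RuntimeError("already closed")))
    await expected.close()  # logged, not raised

    unexpected = EdgeSketch(FailingConnection(ValueError("unexpected")))
    propagated = False
    try:
        await unexpected.close()
    except ValueError:
        propagated = True
    return expected._real_connection, propagated, unexpected._real_connection


handled_ref, unexpected_propagated, unexpected_ref = asyncio.run(demo())
```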
📒 Files selected for processing (6)
- plugins/deepgram/tests/test_deepgram_stt_close.py
- plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py
- plugins/elevenlabs/tests/test_tts_close.py
- plugins/elevenlabs/vision_agents/plugins/elevenlabs/tts.py
- plugins/getstream/tests/test_stream_edge_transport.py
- plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
- Add super().close() call in finally block
- Break nested getattr into readable steps in both ElevenLabs and Deepgram
- Add SDK workaround comments
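A rough sketch of the first item, assuming a base class with its own async `close()`. `BaseTTS`, `BrokenSdkClient`, and `TTSSketch` are hypothetical names; the sketch only shows that placing `super().close()` in a `finally` block lets base cleanup run even when closing the SDK client raises.

```python
import asyncio


class BaseTTS:
    """Hypothetical base class standing in for the shared plugin base."""

    def __init__(self) -> None:
        self.base_closed = False

    async def close(self) -> None:
        self.base_closed = True


class BrokenSdkClient:
    """Hypothetical client whose close always fails."""

    async def aclose(self) -> None:
        raise RuntimeError("SDK client failed to close")


class TTSSketch(BaseTTS):
    def __init__(self, client) -> None:
        super().__init__()
        self.client = client

    async def close(self) -> None:
        try:
            # SDK workaround: close the client the SDK does not close itself.
            await self.client.aclose()
        finally:
            # Base cleanup runs on both the success and the failure path.
            await super().close()


async def demo():
    tts = TTSSketch(BrokenSdkClient())
    raised = False
    try:
        await tts.close()
    except RuntimeError:
        raised = True
    return raised, tts.base_closed


client_error_raised, base_was_closed = asyncio.run(demo())
```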
Why
After each agent session, HTTP clients and WebSocket connections from STT, TTS, and Edge plugins were never closed. Each session left behind orphaned TCP connections and asyncio tasks. On production GKE, pods accumulated up to 3.3 GiB of non-evictable memory and were OOMKilled.
Problem
`Agent._close()` calls `close()` on each plugin via `_apply("close")`, but:
- StreamEdge.close() only set `self._call = None` without closing the `AsyncStream` httpx client or calling `leave()` on the ConnectionManager
- ElevenLabs TTS had no `close()` method at all
- Deepgram STT `close()` closed the WebSocket but not the underlying httpx client
- Both plugins referenced `self.client._client` which doesn't exist in current SDK versions. The `hasattr` guard silently skipped the close.
- `_real_connection` was stored only after `__aenter__()`, so `close()` during `join()` couldn't clean up WebSocket tasks
Changes
- StreamEdge: close the `AsyncStream` client, call `leave()` on ConnectionManager, store `_real_connection` immediately after `rtc.join()`
- ElevenLabs TTS and Deepgram STT: `close()` with correct httpx client path (`client._client_wrapper.httpx_client.httpx_client`)
Benchmark
Tested in a local k8s cluster (OrbStack) with 50 create+close sessions (no audio):
Memory now fully returns to baseline after sessions end. Previously it accumulated ~3 MiB/session permanently.
Remaining 2 TCP connections and 8 aioice tasks (TurnClientMixin.refresh, Connection.check_start) are from aiortc's ICE layer, outside our control.
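One possible way to see which tasks are still alive after a session is to group `asyncio.all_tasks()` by coroutine name. This is a generic sketch, not necessarily how the numbers above were gathered; `count_tasks_by_coroutine` and `lingering` are hypothetical helpers.

```python
import asyncio
from collections import Counter


def count_tasks_by_coroutine() -> Counter:
    """Group the running loop's live tasks by coroutine qualified name."""
    counts: Counter = Counter()
    for task in asyncio.all_tasks():
        coro = task.get_coro()
        name = getattr(coro, "__qualname__", type(coro).__name__)
        counts[name] += 1
    return counts


async def lingering() -> None:
    # Stands in for a background task that was never cancelled.
    await asyncio.sleep(3600)


async def demo() -> Counter:
    leaked = [asyncio.create_task(lingering()) for _ in range(3)]
    await asyncio.sleep(0)  # let the tasks start
    counts = count_tasks_by_coroutine()
    # Clean up so the demo itself leaves nothing behind.
    for task in leaked:
        task.cancel()
    await asyncio.gather(*leaked, return_exceptions=True)
    return counts


task_counts = asyncio.run(demo())
```

Comparing such a count before and after a batch of sessions would show task names whose totals grow with the session count.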
Companion PR
AsyncStream.aclose()