From aaed9259e0ac3332c7a43490b94e53c2a58cdf6c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Fri, 27 Mar 2026 22:13:10 +0400 Subject: [PATCH 1/4] fix: close child httpx clients in AsyncStream.aclose() AsyncStream.aclose() only closed the main httpx client but not the cached video/chat/moderation sub-clients, each of which has its own httpx connection pool. After profiling with 50 session create+close cycles, 87 orphaned TCP connections remained. Override aclose() to close all cached child clients before closing the main client. --- getstream/stream.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/getstream/stream.py b/getstream/stream.py index 0d080c6f..08cc3f3b 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -207,6 +207,14 @@ def moderation(self) -> AsyncModerationClient: user_agent=self.user_agent, ) + async def aclose(self): + """Close all child clients and the main HTTPX client.""" + for attr in ("video", "chat", "moderation"): + child = self.__dict__.get(attr) + if child is not None and hasattr(child, "client"): + await child.client.aclose() + await super().aclose() + @cached_property def feeds(self): raise NotImplementedError("Feeds not supported for async client") From 74fde390d2eae36428ef11bb1a15872bf09b41e5 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 16:48:52 +0400 Subject: [PATCH 2/4] test: verify AsyncStream.aclose() closes all child httpx clients Tests that main, video, and chat httpx clients are closed after aclose(), and that aclose() works when child clients were never accessed (cached_property not triggered). --- tests/test_async_stream_close.py | 35 ++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 tests/test_async_stream_close.py diff --git a/tests/test_async_stream_close.py b/tests/test_async_stream_close.py new file mode 100644 index 00000000..5fc33ea6 --- /dev/null +++ b/tests/test_async_stream_close.py @@ -0,0 +1,35 @@ +import pytest + +from getstream import AsyncStream + + +@pytest.mark.asyncio +class TestAsyncStreamClose: + async def test_aclose_closes_main_client(self): + client = AsyncStream(api_key="fake", api_secret="fake") + + assert client.client.is_closed is False + await client.aclose() + assert client.client.is_closed is True + + async def test_aclose_closes_video_client(self): + client = AsyncStream(api_key="fake", api_secret="fake") + _ = client.video # trigger cached_property + + assert client.video.client.is_closed is False + await client.aclose() + assert client.video.client.is_closed is True + + async def test_aclose_closes_chat_client(self): + client = AsyncStream(api_key="fake", api_secret="fake") + _ = client.chat + + assert client.chat.client.is_closed is False + await client.aclose() + assert client.chat.client.is_closed is True + + async def test_aclose_without_child_clients(self): + """aclose() should work even if video/chat were never accessed.""" + client = AsyncStream(api_key="fake", api_secret="fake") + await client.aclose() + assert client.client.is_closed is True From d06376304740f428f02cc98a5c1fe5197679121c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 17:04:58 +0400 Subject: [PATCH 3/4] fix: ensure main client closes even if child close fails, add moderation test Wrap child client closing in try/finally so super().aclose() always runs. Use child.aclose() instead of child.client.aclose() to keep close semantics encapsulated. Add test for moderation client closure. --- getstream/stream.py | 12 +++++++----- tests/test_async_stream_close.py | 8 ++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 08cc3f3b..3ea55782 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -209,11 +209,13 @@ def moderation(self) -> AsyncModerationClient: async def aclose(self): """Close all child clients and the main HTTPX client.""" - for attr in ("video", "chat", "moderation"): - child = self.__dict__.get(attr) - if child is not None and hasattr(child, "client"): - await child.client.aclose() - await super().aclose() + try: + for attr in ("video", "chat", "moderation"): + child = self.__dict__.get(attr) + if child is not None and hasattr(child, "aclose"): + await child.aclose() + finally: + await super().aclose() @cached_property def feeds(self): diff --git a/tests/test_async_stream_close.py b/tests/test_async_stream_close.py index 5fc33ea6..c9445185 100644 --- a/tests/test_async_stream_close.py +++ b/tests/test_async_stream_close.py @@ -28,6 +28,14 @@ async def test_aclose_closes_chat_client(self): await client.aclose() assert client.chat.client.is_closed is True + async def test_aclose_closes_moderation_client(self): + client = AsyncStream(api_key="fake", api_secret="fake") + _ = client.moderation + + assert client.moderation.client.is_closed is False + await client.aclose() + assert client.moderation.client.is_closed is True + async def test_aclose_without_child_clients(self): """aclose() should work even if video/chat were never accessed.""" client = AsyncStream(api_key="fake", api_secret="fake") From 999715e70ee25b5e999214501bf0c29aa08c5d55 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Sat, 28 Mar 2026 17:18:29 +0400 Subject: [PATCH 4/4] refactor: use AsyncExitStack for robust cleanup in aclose() Replaces try/finally with AsyncExitStack to ensure all child clients are closed even if one fails. Also checks __dict__ instead of hasattr to only close cached_property clients that were actually accessed. --- getstream/stream.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/getstream/stream.py b/getstream/stream.py index 3ea55782..ec946b01 100644 --- a/getstream/stream.py +++ b/getstream/stream.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import AsyncExitStack from functools import cached_property import time from typing import List, Optional @@ -209,13 +210,17 @@ def moderation(self) -> AsyncModerationClient: async def aclose(self): """Close all child clients and the main HTTPX client.""" - try: - for attr in ("video", "chat", "moderation"): - child = self.__dict__.get(attr) - if child is not None and hasattr(child, "aclose"): - await child.aclose() - finally: - await super().aclose() + # AsyncExitStack ensures all clients are closed even if one fails. + # video/chat/moderation are @cached_property - only close if accessed. + async with AsyncExitStack() as stack: + cached = self.__dict__ + if "video" in cached: + stack.push_async_callback(self.video.aclose) + if "chat" in cached: + stack.push_async_callback(self.chat.aclose) + if "moderation" in cached: + stack.push_async_callback(self.moderation.aclose) + stack.push_async_callback(super().aclose) @cached_property def feeds(self):