Skip to content

Commit 9f00fcd

Browse files
chore(tests): stop leaking channels on cleanup failure
The CI test suite creates ~94 channels per run (~50 sync + ~44 async) across fixtures and explicit cases, but cleanup historically ran through three escape hatches that silently leaked channels into the shared test app: 1. channel fixture used client.delete_channels([cid], hard_delete=True), which returns a task_id immediately. When the task queue is backed up (the same backend slowness that breaks test_delete_channels) the fixture says 'done' before the delete actually fires. 2. Every cleanup path swallowed exceptions: except Exception: pass. Failures invisible. 3. test_query_drafts (sync + async) created a second channel, channel2, and did not delete it on the failure path. Stale channels drift across runs and eventually break unrelated tests that query the shared app expecting clean state. Three fixes: * Swap client.delete_channels for the synchronous channel.delete(hard=True) HTTP DELETE in every fixture. No task_id race; the channel is gone before the fixture returns. * Log cleanup exceptions to stderr ('[cleanup] channel <cid> failed: ...'). Visible in CI logs, doesn't fail the test. * Add a session-scoped, autouse=True sweep that queries channels tagged {'test': True, 'language': 'python'} at start and end of the run and hard-deletes whatever is still hanging around. Self-healing across runs; tag test_query_drafts's channel2 so the sweep catches it too.
1 parent cb36a31 commit 9f00fcd

4 files changed

Lines changed: 306 additions & 150 deletions

File tree

stream_chat/tests/async_chat/conftest.py

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import os
3+
import sys
34
import uuid
45
from typing import Dict, List
56

@@ -26,13 +27,66 @@ def pytest_configure(config):
2627
config.addinivalue_line("markers", "incremental: mark test incremental")
2728

2829

30+
def _warn_cleanup_failure(label: str, identifier: str, exc: BaseException) -> None:
31+
"""See sync conftest for rationale; mirrored to keep both suites consistent."""
32+
print(
33+
f"[cleanup] {label} {identifier} failed: {exc.__class__.__name__}: {exc}",
34+
file=sys.stderr,
35+
)
36+
37+
2938
@pytest.fixture(scope="module")
3039
def event_loop():
3140
loop = asyncio.get_event_loop_policy().new_event_loop()
3241
yield loop
3342
loop.close()
3443

3544

45+
@pytest.fixture(scope="session", autouse=True)
46+
def _purge_stale_test_channels_async(event_loop):
47+
"""Async mirror of the sync session sweep; queries channels tagged
48+
``{"test": True, "language": "python"}`` and hard-deletes them at
49+
session start and end so leaks don't compound across runs."""
50+
51+
async def sweep() -> None:
52+
base_url = os.environ.get("STREAM_HOST")
53+
options = {"base_url": base_url} if base_url else {}
54+
async with StreamChatAsync(
55+
api_key=os.environ["STREAM_KEY"],
56+
api_secret=os.environ["STREAM_SECRET"],
57+
timeout=10,
58+
**options,
59+
) as client:
60+
try:
61+
response = await client.query_channels(
62+
{"test": True, "language": "python"},
63+
sort=[{"field": "created_at", "direction": -1}],
64+
limit=30,
65+
)
66+
except Exception as exc:
67+
print(
68+
f"[cleanup] sweep query_channels failed: "
69+
f"{exc.__class__.__name__}: {exc}",
70+
file=sys.stderr,
71+
)
72+
return
73+
cids = [c["channel"]["cid"] for c in response.get("channels", [])]
74+
if not cids:
75+
return
76+
try:
77+
await client.delete_channels(cids, hard_delete=True)
78+
print(
79+
f"[cleanup] swept {len(cids)} leaked test channels",
80+
file=sys.stderr,
81+
)
82+
except Exception as exc:
83+
_warn_cleanup_failure("sweep delete_channels", str(len(cids)), exc)
84+
85+
event_loop.run_until_complete(sweep())
86+
yield
87+
event_loop.run_until_complete(sweep())
88+
89+
3690
@pytest.fixture(scope="function", autouse=True)
3791
@pytest.mark.asyncio
3892
async def client():
@@ -85,10 +139,12 @@ async def channel(client: StreamChatAsync, random_user: Dict):
85139
await channel.create(random_user["id"])
86140
yield channel
87141

142+
# Synchronous channel.delete (HTTP DELETE), not the async-task
143+
# delete_channels — see sync conftest for the leak rationale.
88144
try:
89-
await client.delete_channels([channel.cid], hard_delete=True)
90-
except Exception:
91-
pass
145+
await channel.delete(hard=True)
146+
except Exception as exc:
147+
_warn_cleanup_failure("channel", channel.cid, exc)
92148

93149

94150
@pytest.fixture(scope="function")
@@ -107,10 +163,10 @@ async def command(client: StreamChatAsync):
107163
):
108164
try:
109165
await client.delete_command(cmd["name"])
110-
except Exception:
111-
pass
112-
except Exception:
113-
pass
166+
except Exception as exc:
167+
_warn_cleanup_failure("stale command", cmd["name"], exc)
168+
except Exception as exc:
169+
_warn_cleanup_failure("list_commands", "<sweep>", exc)
114170

115171
response = await client.create_command(
116172
dict(name=str(uuid.uuid4()), description="My command")
@@ -120,8 +176,8 @@ async def command(client: StreamChatAsync):
120176

121177
try:
122178
await client.delete_command(response["command"]["name"])
123-
except Exception:
124-
pass
179+
except Exception as exc:
180+
_warn_cleanup_failure("command", response["command"]["name"], exc)
125181

126182

127183
@pytest.fixture(scope="function")
@@ -145,8 +201,8 @@ async def fellowship_of_the_ring(client: StreamChatAsync):
145201
]
146202
try:
147203
await client.restore_users([m["id"] for m in members])
148-
except Exception:
149-
pass
204+
except Exception as exc:
205+
_warn_cleanup_failure("restore_users", "fellowship", exc)
150206
await client.upsert_users(members)
151207
channel = client.channel(
152208
"team", "fellowship-of-the-ring", {"members": [m["id"] for m in members]}
@@ -155,15 +211,15 @@ async def fellowship_of_the_ring(client: StreamChatAsync):
155211
yield
156212
try:
157213
await channel.delete(hard=True)
158-
await hard_delete_users(client, [m["id"] for m in members])
159-
except Exception:
160-
pass
214+
except Exception as exc:
215+
_warn_cleanup_failure("channel", channel.cid, exc)
216+
await hard_delete_users(client, [m["id"] for m in members])
161217

162218

163219
async def hard_delete_users(client: StreamChatAsync, user_ids: List[str]):
164220
try:
165221
await client.delete_users(
166222
user_ids, "hard", conversations="hard", messages="hard"
167223
)
168-
except Exception:
169-
pass
224+
except Exception as exc:
225+
_warn_cleanup_failure("delete_users", ",".join(user_ids), exc)

stream_chat/tests/async_chat/test_draft.py

Lines changed: 74 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -84,65 +84,80 @@ async def test_query_drafts(
8484
draft1 = {"text": "Draft in channel 1"}
8585
await channel.create_draft(draft1, random_user["id"])
8686

87-
# Create another channel with a draft
88-
channel2 = client.channel("messaging", str(uuid.uuid4()))
89-
await channel2.create(random_user["id"])
90-
91-
draft2 = {"text": "Draft in channel 2"}
92-
await channel2.create_draft(draft2, random_user["id"])
93-
94-
# Query all drafts for the user
95-
response = await client.query_drafts(random_user["id"])
96-
97-
assert "drafts" in response
98-
assert len(response["drafts"]) == 2
99-
100-
# Query drafts for a specific channel
101-
response = await client.query_drafts(
102-
random_user["id"], filter={"channel_cid": channel2.cid}
87+
# Create another channel with a draft. Tag with {"test": True,
88+
# "language": "python"} so the session-level GC sweep in conftest
89+
# catches it if cleanup at the end of this test doesn't run.
90+
channel2 = client.channel(
91+
"messaging",
92+
str(uuid.uuid4()),
93+
{"test": True, "language": "python"},
10394
)
95+
await channel2.create(random_user["id"])
10496

105-
assert "drafts" in response
106-
assert len(response["drafts"]) == 1
107-
draft = response["drafts"][0]
108-
assert draft["channel_cid"] == channel2.cid
109-
assert draft["message"]["text"] == "Draft in channel 2"
110-
111-
# Query drafts with sort
112-
response = await client.query_drafts(
113-
random_user["id"],
114-
sort=[{"field": "created_at", "direction": SortOrder.ASC}],
115-
)
116-
117-
assert "drafts" in response
118-
assert len(response["drafts"]) == 2
119-
assert response["drafts"][0]["channel_cid"] == channel.cid
120-
assert response["drafts"][1]["channel_cid"] == channel2.cid
121-
122-
# Query drafts with pagination
123-
response = await client.query_drafts(
124-
random_user["id"],
125-
options={"limit": 1},
126-
)
127-
128-
assert "drafts" in response
129-
assert len(response["drafts"]) == 1
130-
assert response["drafts"][0]["channel_cid"] == channel2.cid
131-
132-
assert response["next"] is not None
133-
134-
# Query drafts with pagination
135-
response = await client.query_drafts(
136-
random_user["id"],
137-
options={"limit": 1, "next": response["next"]},
138-
)
139-
140-
assert "drafts" in response
141-
assert len(response["drafts"]) == 1
142-
assert response["drafts"][0]["channel_cid"] == channel.cid
143-
144-
# Cleanup
14597
try:
146-
await channel2.delete()
147-
except Exception:
148-
pass
98+
draft2 = {"text": "Draft in channel 2"}
99+
await channel2.create_draft(draft2, random_user["id"])
100+
101+
# Query all drafts for the user
102+
response = await client.query_drafts(random_user["id"])
103+
104+
assert "drafts" in response
105+
assert len(response["drafts"]) == 2
106+
107+
# Query drafts for a specific channel
108+
response = await client.query_drafts(
109+
random_user["id"], filter={"channel_cid": channel2.cid}
110+
)
111+
112+
assert "drafts" in response
113+
assert len(response["drafts"]) == 1
114+
draft = response["drafts"][0]
115+
assert draft["channel_cid"] == channel2.cid
116+
assert draft["message"]["text"] == "Draft in channel 2"
117+
118+
# Query drafts with sort
119+
response = await client.query_drafts(
120+
random_user["id"],
121+
sort=[{"field": "created_at", "direction": SortOrder.ASC}],
122+
)
123+
124+
assert "drafts" in response
125+
assert len(response["drafts"]) == 2
126+
assert response["drafts"][0]["channel_cid"] == channel.cid
127+
assert response["drafts"][1]["channel_cid"] == channel2.cid
128+
129+
# Query drafts with pagination
130+
response = await client.query_drafts(
131+
random_user["id"],
132+
options={"limit": 1},
133+
)
134+
135+
assert "drafts" in response
136+
assert len(response["drafts"]) == 1
137+
assert response["drafts"][0]["channel_cid"] == channel2.cid
138+
139+
assert response["next"] is not None
140+
141+
# Query drafts with pagination
142+
response = await client.query_drafts(
143+
random_user["id"],
144+
options={"limit": 1, "next": response["next"]},
145+
)
146+
147+
assert "drafts" in response
148+
assert len(response["drafts"]) == 1
149+
assert response["drafts"][0]["channel_cid"] == channel.cid
150+
finally:
151+
# Hard-delete via the synchronous channel.delete (HTTP DELETE)
152+
# rather than the async-task delete_channels helper. Failures
153+
# surface in CI logs instead of being swallowed silently.
154+
try:
155+
await channel2.delete(hard=True)
156+
except Exception as exc:
157+
import sys
158+
159+
print(
160+
f"[cleanup] channel {channel2.cid} delete failed: "
161+
f"{exc.__class__.__name__}: {exc}",
162+
file=sys.stderr,
163+
)

0 commit comments

Comments
 (0)