From ff2becde188d41ba4b4c891aa1b415ae3b65917b Mon Sep 17 00:00:00 2001 From: Summer Yang Date: Wed, 11 Mar 2026 17:30:00 -0700 Subject: [PATCH 1/8] fix(test): resolve thread leak failures in CI - Add mod.stop() to test_process_crash_triggers_stop so watchdog, LCM, and event-loop threads are properly joined from the test thread - Filter third-party daemon threads with generic names (Thread-\d+) in conftest monitor_threads to ignore torch/HF background threads that have no cleanup API --- dimos/conftest.py | 11 +++++++++++ dimos/core/test_native_module.py | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/dimos/conftest.py b/dimos/conftest.py index 4ab8a401f8..5f7f30e882 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -14,6 +14,7 @@ import asyncio import os +import re import threading from dotenv import load_dotenv @@ -160,6 +161,16 @@ def monitor_threads(request): if not any(t.name.startswith(prefix) for prefix in expected_persistent_thread_prefixes) ] + # Filter out third-party daemon threads with generic names (e.g. "Thread-109"). + # On Python 3.12+ our own threads include the target function name in parens + # (e.g. "Thread-166 (run_forever)"), so this only matches unnamed threads + # from libraries like torch/HuggingFace that have no cleanup API. + new_threads = [ + t + for t in new_threads + if not (t.daemon and re.fullmatch(r"Thread-\d+", t.name)) + ] + # Filter out threads we've already seen (from previous tests) truly_new = [t for t in new_threads if t.ident not in _seen_threads] diff --git a/dimos/core/test_native_module.py b/dimos/core/test_native_module.py index e77b8f9a53..c9556493f5 100644 --- a/dimos/core/test_native_module.py +++ b/dimos/core/test_native_module.py @@ -106,6 +106,10 @@ def test_process_crash_triggers_stop() -> None: break assert mod._process is None, f"Watchdog did not clean up after process {pid} died" + # Explicitly stop to join watchdog, LCM, and event-loop threads from the + # test thread. The watchdog's self.stop() can't join itself, so these + # threads would otherwise leak. stop() is idempotent. + mod.stop() @pytest.mark.slow From 6bd7ad60c3a1e73e96793f8e5355404045b34bd3 Mon Sep 17 00:00:00 2001 From: SUMMERxYANG <69720581+SUMMERxYANG@users.noreply.github.com> Date: Thu, 12 Mar 2026 00:35:29 +0000 Subject: [PATCH 2/8] CI code cleanup --- dimos/conftest.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dimos/conftest.py b/dimos/conftest.py index 5f7f30e882..1a7a4f943b 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -166,9 +166,7 @@ def monitor_threads(request): # (e.g. "Thread-166 (run_forever)"), so this only matches unnamed threads # from libraries like torch/HuggingFace that have no cleanup API. new_threads = [ - t - for t in new_threads - if not (t.daemon and re.fullmatch(r"Thread-\d+", t.name)) + t for t in new_threads if not (t.daemon and re.fullmatch(r"Thread-\d+", t.name)) ] # Filter out threads we've already seen (from previous tests) From ee752c023dd6f20f8afbc6969a26de298210d9b8 Mon Sep 17 00:00:00 2001 From: Summer Yang Date: Thu, 12 Mar 2026 14:08:39 -0700 Subject: [PATCH 3/8] fix(test): use fixture for native module crash test cleanup Convert test_process_crash_triggers_stop to use a fixture that calls mod.stop() in teardown. The watchdog thread calls self.stop() but can't join itself, so an explicit stop() from the test thread is needed to properly clean up all threads. Drop the broad conftest regex filter for generic daemon thread names per review feedback. --- dimos/conftest.py | 8 -------- dimos/core/test_native_module.py | 33 ++++++++++++++++++++------------ 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/dimos/conftest.py b/dimos/conftest.py index 1a7a4f943b..29eaf05567 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -14,7 +14,6 @@ import asyncio import os -import re import threading from dotenv import load_dotenv @@ -161,13 +160,6 @@ def monitor_threads(request): if not any(t.name.startswith(prefix) for prefix in expected_persistent_thread_prefixes) ] - # Filter out third-party daemon threads with generic names (e.g. "Thread-109"). - # On Python 3.12+ our own threads include the target function name in parens - # (e.g. "Thread-166 (run_forever)"), so this only matches unnamed threads - # from libraries like torch/HuggingFace that have no cleanup API. - new_threads = [ - t for t in new_threads if not (t.daemon and re.fullmatch(r"Thread-\d+", t.name)) - ] # Filter out threads we've already seen (from previous tests) truly_new = [t for t in new_threads if t.ident not in _seen_threads] diff --git a/dimos/core/test_native_module.py b/dimos/core/test_native_module.py index c9556493f5..600795b031 100644 --- a/dimos/core/test_native_module.py +++ b/dimos/core/test_native_module.py @@ -18,8 +18,11 @@ The echo script writes received CLI args to a temp file for assertions. """ +from collections.abc import Generator +from dataclasses import dataclass import json from pathlib import Path +import threading import time import pytest @@ -90,26 +93,32 @@ def start(self) -> None: pass -def test_process_crash_triggers_stop() -> None: - """When the native process dies unexpectedly, the watchdog calls stop().""" +@pytest.fixture +def crash_module() -> Generator[StubNativeModule, None, None]: + """Create a StubNativeModule that dies after 0.2s, ensuring cleanup.""" mod = StubNativeModule(die_after=0.2) - mod.pointcloud.transport = LCMTransport("/pc", PointCloud2) - mod.start() + yield mod + # Join watchdog, LCM, and event-loop threads from the test thread. + # The watchdog's self.stop() can't join itself, so without this the + # threads leak. stop() is idempotent. + mod.stop() - assert mod._process is not None - pid = mod._process.pid + +def test_process_crash_triggers_stop(crash_module: StubNativeModule) -> None: + """When the native process dies unexpectedly, the watchdog calls stop().""" + crash_module.pointcloud.transport = LCMTransport("/pc", PointCloud2) + crash_module.start() + + assert crash_module._process is not None + pid = crash_module._process.pid # Wait for the process to die and the watchdog to call stop() for _ in range(30): time.sleep(0.1) - if mod._process is None: + if crash_module._process is None: break - assert mod._process is None, f"Watchdog did not clean up after process {pid} died" - # Explicitly stop to join watchdog, LCM, and event-loop threads from the - # test thread. The watchdog's self.stop() can't join itself, so these - # threads would otherwise leak. stop() is idempotent. - mod.stop() + assert crash_module._process is None, f"Watchdog did not clean up after process {pid} died" @pytest.mark.slow From e316626b09f2ef5b08a80973b2df2d1251251dec Mon Sep 17 00:00:00 2001 From: SUMMERxYANG <69720581+SUMMERxYANG@users.noreply.github.com> Date: Thu, 12 Mar 2026 21:09:29 +0000 Subject: [PATCH 4/8] CI code cleanup --- dimos/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dimos/conftest.py b/dimos/conftest.py index 29eaf05567..4ab8a401f8 100644 --- a/dimos/conftest.py +++ b/dimos/conftest.py @@ -160,7 +160,6 @@ def monitor_threads(request): if not any(t.name.startswith(prefix) for prefix in expected_persistent_thread_prefixes) ] - # Filter out threads we've already seen (from previous tests) truly_new = [t for t in new_threads if t.ident not in _seen_threads] From f13b2b325244a86ccc2a9a9d6568162723dcb826 Mon Sep 17 00:00:00 2001 From: Summer Yang Date: Thu, 12 Mar 2026 14:13:08 -0700 Subject: [PATCH 5/8] chore: retrigger CI From 3197ad3307a1a9e2f6c9d57589b14ad90d5e99cb Mon Sep 17 00:00:00 2001 From: Summer Yang Date: Thu, 12 Mar 2026 15:58:14 -0700 Subject: [PATCH 6/8] fix(test): join threads directly in crash_module fixture mod.stop() is a no-op when the watchdog already called it, so capture thread IDs before the test and join new ones in teardown. --- dimos/core/test_native_module.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dimos/core/test_native_module.py b/dimos/core/test_native_module.py index 600795b031..d5609003ba 100644 --- a/dimos/core/test_native_module.py +++ b/dimos/core/test_native_module.py @@ -96,12 +96,15 @@ def start(self) -> None: @pytest.fixture def crash_module() -> Generator[StubNativeModule, None, None]: """Create a StubNativeModule that dies after 0.2s, ensuring cleanup.""" + before = {t.ident for t in threading.enumerate()} mod = StubNativeModule(die_after=0.2) yield mod - # Join watchdog, LCM, and event-loop threads from the test thread. - # The watchdog's self.stop() can't join itself, so without this the - # threads leak. stop() is idempotent. - mod.stop() + # The watchdog calls stop() from its own thread, which sets + # _module_closed=True. A second stop() from here is then a no-op, + # so we explicitly join any threads the test created. + for t in threading.enumerate(): + if t.ident not in before and t is not threading.current_thread(): + t.join(timeout=5) def test_process_crash_triggers_stop(crash_module: StubNativeModule) -> None: From 43d5434b0779e5e92e6a4fd16fe61a74e14f94d3 Mon Sep 17 00:00:00 2001 From: SUMMERxYANG <69720581+SUMMERxYANG@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:09:08 +0000 Subject: [PATCH 7/8] CI code cleanup --- dimos/core/test_native_module.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dimos/core/test_native_module.py b/dimos/core/test_native_module.py index d5609003ba..5811da4b08 100644 --- a/dimos/core/test_native_module.py +++ b/dimos/core/test_native_module.py @@ -19,7 +19,6 @@ """ from collections.abc import Generator -from dataclasses import dataclass import json from pathlib import Path import threading From 1ff8769015e7e9cae1d14fcddab506488e80b227 Mon Sep 17 00:00:00 2001 From: Summer Yang Date: Mon, 23 Mar 2026 16:58:44 -0700 Subject: [PATCH 8/8] fix(native_module): preserve watchdog reference so second stop() can join it --- dimos/core/native_module.py | 12 ++++++++--- dimos/core/test_native_module.py | 35 +++++++++++--------------------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/dimos/core/native_module.py b/dimos/core/native_module.py index f4a674cb5d..00cc2d77d0 100644 --- a/dimos/core/native_module.py +++ b/dimos/core/native_module.py @@ -183,11 +183,17 @@ def stop(self) -> None: ) self._process.kill() self._process.wait(timeout=5) - if self._watchdog is not None and self._watchdog is not threading.current_thread(): - self._watchdog.join(timeout=2) - self._watchdog = None self._process = None super().stop() + # Join the watchdog AFTER super().stop() so all module threads are + # cleaned up first. When the watchdog itself is the caller (crash + # path), it skips joining itself — but the thread exits naturally + # right after this returns. A second stop() from external code + # (e.g. test teardown) will reach here and join the now-finished + # watchdog thread, preventing monitor_threads from seeing a leak. + if self._watchdog is not None and self._watchdog is not threading.current_thread(): + self._watchdog.join(timeout=2) + self._watchdog = None def _watch_process(self) -> None: """Block until the native process exits; trigger stop() if it crashed.""" diff --git a/dimos/core/test_native_module.py b/dimos/core/test_native_module.py index 5811da4b08..a7e6bd2b9a 100644 --- a/dimos/core/test_native_module.py +++ b/dimos/core/test_native_module.py @@ -18,10 +18,8 @@ The echo script writes received CLI args to a temp file for assertions. """ -from collections.abc import Generator import json from pathlib import Path -import threading import time import pytest @@ -92,35 +90,26 @@ def start(self) -> None: pass -@pytest.fixture -def crash_module() -> Generator[StubNativeModule, None, None]: - """Create a StubNativeModule that dies after 0.2s, ensuring cleanup.""" - before = {t.ident for t in threading.enumerate()} - mod = StubNativeModule(die_after=0.2) - yield mod - # The watchdog calls stop() from its own thread, which sets - # _module_closed=True. A second stop() from here is then a no-op, - # so we explicitly join any threads the test created. - for t in threading.enumerate(): - if t.ident not in before and t is not threading.current_thread(): - t.join(timeout=5) - - -def test_process_crash_triggers_stop(crash_module: StubNativeModule) -> None: +def test_process_crash_triggers_stop() -> None: """When the native process dies unexpectedly, the watchdog calls stop().""" - crash_module.pointcloud.transport = LCMTransport("/pc", PointCloud2) - crash_module.start() + mod = StubNativeModule(die_after=0.2) + mod.pointcloud.transport = LCMTransport("/pc", PointCloud2) + mod.start() - assert crash_module._process is not None - pid = crash_module._process.pid + assert mod._process is not None + pid = mod._process.pid # Wait for the process to die and the watchdog to call stop() for _ in range(30): time.sleep(0.1) - if crash_module._process is None: + if mod._process is None: break - assert crash_module._process is None, f"Watchdog did not clean up after process {pid} died" + assert mod._process is None, f"Watchdog did not clean up after process {pid} died" + + # Join the watchdog thread. stop() is idempotent but will now join the + # watchdog on the second call since the reference is preserved. + mod.stop() @pytest.mark.slow