test: add E2E tests for global extension worker cleanup (#906)#913
test: add E2E tests for global extension worker cleanup (#906)#913
Conversation
Adds two E2E tests covering the acceptance criteria of #906: 1. test_graceful_disconnect_removes_global_extension — verifies that after jobs.disconnect() the extension disappears from GET /v1/joblib/rooms/@global/jobs within 2s (happy path). 2. test_sigkill_worker_cleared_by_sweeper — spawns a worker in a subprocess, SIGKILLs it, and asserts the background sweeper removes the extension within worker_timeout_seconds + one sweeper interval. Uses server_factory overrides (ZNDRAW_JOBLIB_WORKER_TIMEOUT_SECONDS=3, ZNDRAW_JOBLIB_SWEEPER_INTERVAL_SECONDS=1) so the test runs in ~5s. No production code changes — both scenarios already behave correctly. The tests lock in this behavior to guard against future regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughAdds end-to-end tests and a subprocess helper that validate removal of globally-registered ( Changes
Sequence Diagram(s)Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/zndraw/worker/test_global_cleanup_e2e.py`:
- Around line 167-168: The test currently uses a fixed max_seconds=15.0 when
calling _poll_until_gone("SigkillCleanup"), which ignores the configured
worker_timeout and sweeper_interval and can mask slow regressions; change the
test to compute the allowed window from the configured values (worker_timeout +
sweeper_interval + small slack), record the start time before calling
_poll_until_gone(observer, "SigkillCleanup", ...), pass that computed
max_seconds (or assert that elapsed_time <= computed_window after the call) so
the assertion enforces the actual configured sweeper window rather than a
hardcoded 15s; reference the existing worker_timeout and sweeper_interval
variables and the _poll_until_gone / "SigkillCleanup" call to locate where to
update the assertion.
- Around line 143-157: The loop waiting for the child's "READY " line currently
calls proc.stdout.readline() which can block past the deadline; change the wait
to perform a timed wait on proc.stdout before reading by computing remaining =
deadline - time.time() and using a timed wait (e.g.,
select.select([proc.stdout], [], [], remaining) on proc.stdout.fileno()) so you
only call proc.stdout.readline() when data is ready and you re-check the
deadline each iteration; keep the existing proc.poll() and proc.stderr.read()
failure path and set ready_line when a line startswith "READY ".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 18d4b0b4-e396-4116-8c7e-2a1c874da497
📒 Files selected for processing (1)
tests/zndraw/worker/test_global_cleanup_e2e.py
| # Wait for child to signal READY (with a hard timeout) | ||
| assert proc.stdout is not None | ||
| deadline = time.time() + 20.0 | ||
| ready_line: str | None = None | ||
| while time.time() < deadline: | ||
| line = proc.stdout.readline() | ||
| if not line: | ||
| if proc.poll() is not None: | ||
| err = proc.stderr.read() if proc.stderr else "" | ||
| pytest.fail(f"child exited before READY:\n{err}") | ||
| continue | ||
| if line.startswith("READY "): | ||
| ready_line = line.strip() | ||
| break | ||
| assert ready_line is not None, "child did not print READY within 20s" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
python - <<'PY'
import subprocess
import sys
import threading
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(5)"],
stdout=subprocess.PIPE,
text=True,
)
state = {"returned": False}
def _read():
assert proc.stdout is not None
proc.stdout.readline()
state["returned"] = True
thread = threading.Thread(target=_read, daemon=True)
thread.start()
thread.join(timeout=1.0)
print(state) # expected: {'returned': False}
proc.kill()
proc.wait()
PYRepository: zincware/ZnDraw
Length of output: 78
🏁 Script executed:
cat -n tests/zndraw/worker/test_global_cleanup_e2e.py | sed -n '130,170p'Repository: zincware/ZnDraw
Length of output: 1847
🏁 Script executed:
rg '_CHILD_SCRIPT' tests/zndraw/worker/test_global_cleanup_e2e.py -B 5 -A 40Repository: zincware/ZnDraw
Length of output: 2767
The READY wait can hang indefinitely.
Line 148 uses blocking readline() without timeout, and if the child process stalls before printing READY\n (e.g., during ZnDraw initialization or registration at lines in _CHILD_SCRIPT), the loop never re-checks the deadline. The time.time() < deadline check at line 147 only executes at loop start, not during the blocking I/O, so the stated "hard timeout" is ineffective and the test can hang instead of failing after 20 seconds.
🛠️ One portable way to make the timeout real
+import queue
+import threading
...
- # Wait for child to signal READY (with a hard timeout)
- assert proc.stdout is not None
- deadline = time.time() + 20.0
- ready_line: str | None = None
- while time.time() < deadline:
- line = proc.stdout.readline()
+ assert proc.stdout is not None
+ lines: queue.Queue[str] = queue.Queue()
+
+ def _pump_stdout() -> None:
+ for line in proc.stdout:
+ lines.put(line)
+ lines.put("")
+
+ threading.Thread(target=_pump_stdout, daemon=True).start()
+ deadline = time.monotonic() + 20.0
+ ready_line: str | None = None
+ while time.monotonic() < deadline:
+ try:
+ line = lines.get(timeout=max(deadline - time.monotonic(), 0))
+ except queue.Empty:
+ break
if not line:
if proc.poll() is not None:
err = proc.stderr.read() if proc.stderr else ""
pytest.fail(f"child exited before READY:\n{err}")
- continue
+ break
if line.startswith("READY "):
ready_line = line.strip()
break📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Wait for child to signal READY (with a hard timeout) | |
| assert proc.stdout is not None | |
| deadline = time.time() + 20.0 | |
| ready_line: str | None = None | |
| while time.time() < deadline: | |
| line = proc.stdout.readline() | |
| if not line: | |
| if proc.poll() is not None: | |
| err = proc.stderr.read() if proc.stderr else "" | |
| pytest.fail(f"child exited before READY:\n{err}") | |
| continue | |
| if line.startswith("READY "): | |
| ready_line = line.strip() | |
| break | |
| assert ready_line is not None, "child did not print READY within 20s" | |
| assert proc.stdout is not None | |
| lines: queue.Queue[str] = queue.Queue() | |
| def _pump_stdout() -> None: | |
| for line in proc.stdout: | |
| lines.put(line) | |
| lines.put("") | |
| threading.Thread(target=_pump_stdout, daemon=True).start() | |
| deadline = time.monotonic() + 20.0 | |
| ready_line: str | None = None | |
| while time.monotonic() < deadline: | |
| try: | |
| line = lines.get(timeout=max(deadline - time.monotonic(), 0)) | |
| except queue.Empty: | |
| break | |
| if not line: | |
| if proc.poll() is not None: | |
| err = proc.stderr.read() if proc.stderr else "" | |
| pytest.fail(f"child exited before READY:\n{err}") | |
| break | |
| if line.startswith("READY "): | |
| ready_line = line.strip() | |
| break | |
| assert ready_line is not None, "child did not print READY within 20s" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/zndraw/worker/test_global_cleanup_e2e.py` around lines 143 - 157, The
loop waiting for the child's "READY " line currently calls
proc.stdout.readline() which can block past the deadline; change the wait to
perform a timed wait on proc.stdout before reading by computing remaining =
deadline - time.time() and using a timed wait (e.g.,
select.select([proc.stdout], [], [], remaining) on proc.stdout.fileno()) so you
only call proc.stdout.readline() when data is ready and you re-check the
deadline each iteration; keep the existing proc.poll() and proc.stderr.read()
failure path and set ready_line when a line startswith "READY ".
| # Budget: worker_timeout (3s) + sweeper_interval (1s) + generous slack | ||
| _poll_until_gone(observer, "SigkillCleanup", max_seconds=15.0) |
There was a problem hiding this comment.
This no longer verifies the configured sweeper window.
Lines 128-129 set a 3s worker timeout and 1s sweeper interval, but Lines 167-168 only require disappearance within 15s and ignore the elapsed value. A much slower cleanup path would still pass, so this does not actually lock the acceptance criterion.
⏱️ Tighten the assertion to the configured window
- # Budget: worker_timeout (3s) + sweeper_interval (1s) + generous slack
- _poll_until_gone(observer, "SigkillCleanup", max_seconds=15.0)
+ # worker_timeout (3s) + sweeper_interval (1s) + small CI slack
+ elapsed = _poll_until_gone(observer, "SigkillCleanup", max_seconds=8.0)
+ assert elapsed <= 6.0, f"sweeper cleanup took {elapsed:.2f}s"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Budget: worker_timeout (3s) + sweeper_interval (1s) + generous slack | |
| _poll_until_gone(observer, "SigkillCleanup", max_seconds=15.0) | |
| # worker_timeout (3s) + sweeper_interval (1s) + small CI slack | |
| elapsed = _poll_until_gone(observer, "SigkillCleanup", max_seconds=8.0) | |
| assert elapsed <= 6.0, f"sweeper cleanup took {elapsed:.2f}s" |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/zndraw/worker/test_global_cleanup_e2e.py` around lines 167 - 168, The
test currently uses a fixed max_seconds=15.0 when calling
_poll_until_gone("SigkillCleanup"), which ignores the configured worker_timeout
and sweeper_interval and can mask slow regressions; change the test to compute
the allowed window from the configured values (worker_timeout + sweeper_interval
+ small slack), record the start time before calling _poll_until_gone(observer,
"SigkillCleanup", ...), pass that computed max_seconds (or assert that
elapsed_time <= computed_window after the call) so the assertion enforces the
actual configured sweeper window rather than a hardcoded 15s; reference the
existing worker_timeout and sweeper_interval variables and the _poll_until_gone
/ "SigkillCleanup" call to locate where to update the assertion.
- Move the inline subprocess script out of a heredoc string into a proper _sigkill_worker_child.py helper file. The leading underscore keeps pytest from collecting it as a test. - Introduce a spawn_sigkill_worker fixture that owns the subprocess lifecycle (spawn, wait-for-READY, teardown kill) so the test body only contains the actual assertion logic. - Rename _Noop -> NoopCleanup and drop the _X.__name__ = "X" rename hack. The pattern is only justified in conftest.py where a fixture collides with the class name; in a plain test module the class can just be named directly. Note: other cargo-culted instances of the __name__ rename hack remain in test_global.py:527, test_run_kwargs.py:35, and test_resilience.py:36. They are unrelated to #906 and should be cleaned up in a follow-up. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
♻️ Duplicate comments (1)
tests/zndraw/worker/test_global_cleanup_e2e.py (1)
100-110:⚠️ Potential issue | 🟡 MinorThe
readline()call can block past the deadline.The deadline check at line 101 only runs at loop iteration start, but
readline()at line 102 is blocking I/O. If the child stalls duringZnDrawinitialization or registration before printing anything, the call blocks indefinitely and the 20s timeout becomes ineffective.While the child subprocess is simple and unlikely to hang in practice, this could cause CI flakiness if the server is slow to respond.
🛠️ One portable fix using a reader thread
+import queue +import threading ... def _spawn(server_url: str) -> tuple[subprocess.Popen[str], str]: ... assert proc.stdout is not None - deadline = time.time() + 20.0 - while time.time() < deadline: - line = proc.stdout.readline() + lines: queue.Queue[str] = queue.Queue() + + def _pump() -> None: + assert proc.stdout is not None + for ln in proc.stdout: + lines.put(ln) + lines.put("") # sentinel on EOF + + threading.Thread(target=_pump, daemon=True).start() + deadline = time.monotonic() + 20.0 + while time.monotonic() < deadline: + try: + line = lines.get(timeout=max(0.1, deadline - time.monotonic())) + except queue.Empty: + continue if not line: if proc.poll() is not None: err = proc.stderr.read() if proc.stderr else "" pytest.fail(f"sigkill worker child exited before READY:\n{err}") - continue + break if line.startswith("READY "): return proc, line.strip().split(" ", 1)[1] pytest.fail("sigkill worker child did not print READY within 20s")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/zndraw/worker/test_global_cleanup_e2e.py` around lines 100 - 110, The loop that waits for READY uses blocking proc.stdout.readline(), so the deadline check can be bypassed; change the logic in the test (the block using proc, deadline, while, and proc.stdout.readline()) to perform a non-blocking/timeout-aware read: spawn a small reader thread or use select/poll with a timeout to read lines from proc.stdout and push them into a queue, then in the main loop poll that queue until deadline to check for a line starting with "READY " (and fall back to proc.poll()/proc.stderr reads on child exit); ensure the reader thread is joined/cleaned up on test exit so the test never blocks past 20s.
🧹 Nitpick comments (1)
tests/zndraw/worker/_sigkill_worker_child.py (1)
32-39: Missing argument validation could obscure test failures.If
_sigkill_worker_child.pyis accidentally invoked without a server URL argument, theIndexErrorfromsys.argv[1]will be less informative than an explicit error message. This is a minor robustness improvement for debugging.💡 Optional: Add minimal argument validation
if __name__ == "__main__": + if len(sys.argv) < 2: + sys.exit("Usage: _sigkill_worker_child.py <server_url>") server_url = sys.argv[1]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/zndraw/worker/_sigkill_worker_child.py` around lines 32 - 39, Add minimal argument validation before using sys.argv[1] in the __main__ block: check that len(sys.argv) > 1 and if not, log or print a clear error and exit (so the test failure is explicit). Update the startup sequence around ZnDraw(...) and worker.jobs.register(SigkillCleanup) to only run after the server_url is validated; include the problematic symbol names (sys.argv, ZnDraw, SigkillCleanup, worker.jobs.worker_id) in the message to make debugging easier.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@tests/zndraw/worker/test_global_cleanup_e2e.py`:
- Around line 100-110: The loop that waits for READY uses blocking
proc.stdout.readline(), so the deadline check can be bypassed; change the logic
in the test (the block using proc, deadline, while, and proc.stdout.readline())
to perform a non-blocking/timeout-aware read: spawn a small reader thread or use
select/poll with a timeout to read lines from proc.stdout and push them into a
queue, then in the main loop poll that queue until deadline to check for a line
starting with "READY " (and fall back to proc.poll()/proc.stderr reads on child
exit); ensure the reader thread is joined/cleaned up on test exit so the test
never blocks past 20s.
---
Nitpick comments:
In `@tests/zndraw/worker/_sigkill_worker_child.py`:
- Around line 32-39: Add minimal argument validation before using sys.argv[1] in
the __main__ block: check that len(sys.argv) > 1 and if not, log or print a
clear error and exit (so the test failure is explicit). Update the startup
sequence around ZnDraw(...) and worker.jobs.register(SigkillCleanup) to only run
after the server_url is validated; include the problematic symbol names
(sys.argv, ZnDraw, SigkillCleanup, worker.jobs.worker_id) in the message to make
debugging easier.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 329da8a3-290f-49fc-9b6c-9708e2115e4b
📒 Files selected for processing (2)
tests/zndraw/worker/_sigkill_worker_child.pytests/zndraw/worker/test_global_cleanup_e2e.py
… handlers vis.wait() documented behavior was "blocks until Ctrl+C" but the implementation delegated to socketio.Client.wait() which blocks until the transport disconnects. The docs were wrong — the implementation was correct: delegating to the library's own primitive is the right approach. JobManager.wait() had a separate ~25 LoC implementation that installed SIGINT/SIGTERM handlers and blocked on a threading.Event. This was necessary for the Event-based unblock (HTTP-only mode), but the signal handler reinvention was not — KeyboardInterrupt propagates naturally from the underlying select/poll call, and users can catch it themselves with try/except, matching the standard Python pattern. Changes: - ZnDraw.wait(): docstring now accurately describes socketio semantics with the canonical try/except KeyboardInterrupt pattern - JobManager.wait(): stripped signal-handler installation (~25 LoC removed). Now a one-liner: self._stop.wait(). Docstring documents the Pythonic try/except pattern and explains when this method (vs vis.wait) is appropriate. - SocketManager.wait(): docstring updated to point users toward vis.wait() as the documented public API - python-api.rst: all vis.wait() examples now show the consistent try/except KeyboardInterrupt + finally: vis.disconnect() pattern - README.md: same canonical pattern - _sigkill_worker_child.py: uses vis.wait() (socketio-based) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/zndraw_joblib/client.py (1)
277-295: Document recommended signal handler pattern for production workers.
JobManager.wait()has no built-in SIGTERM handling, onlyKeyboardInterruptsupport. Thedisconnect()method is thread-safe (guarded by_disconnect_lock) and can be safely called from a signal handler, but this pattern is not documented. Production workers running under Docker, systemd, or Kubernetes should wrap the wait loop with explicit signal handling:import signal manager = JobManager(api) manager.register(MyExtension) def handle_sigterm(signum, frame): manager.disconnect() signal.signal(signal.SIGTERM, handle_sigterm) try: manager.wait() except KeyboardInterrupt: pass finally: manager.disconnect()Consider adding this example to the docstring or creating a guide for production deployments.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/zndraw_joblib/client.py` around lines 277 - 295, Update the JobManager.wait() docstring to document the recommended SIGTERM handling pattern for production workers: state that JobManager.disconnect() is thread-safe (guarded by _disconnect_lock) and may be safely invoked from a signal handler, and add a concise example showing registering a SIGTERM handler that calls manager.disconnect() (using signal.signal(signal.SIGTERM, handler)) around the existing try/except KeyboardInterrupt wrapper so operators running under Docker/systemd/Kubernetes know to install the handler and still call manager.disconnect() in finally.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/zndraw_joblib/client.py`:
- Around line 277-295: Update the JobManager.wait() docstring to document the
recommended SIGTERM handling pattern for production workers: state that
JobManager.disconnect() is thread-safe (guarded by _disconnect_lock) and may be
safely invoked from a signal handler, and add a concise example showing
registering a SIGTERM handler that calls manager.disconnect() (using
signal.signal(signal.SIGTERM, handler)) around the existing try/except
KeyboardInterrupt wrapper so operators running under Docker/systemd/Kubernetes
know to install the handler and still call manager.disconnect() in finally.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 79507458-4bce-4ae3-98f4-3521ccd43aad
📒 Files selected for processing (6)
README.mddocs/source/python-api.rstsrc/zndraw/client/core.pysrc/zndraw/client/socket.pysrc/zndraw_joblib/client.pytests/zndraw/worker/_sigkill_worker_child.py
✅ Files skipped from review due to trivial changes (2)
- src/zndraw/client/socket.py
- src/zndraw/client/core.py
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/zndraw/worker/_sigkill_worker_child.py
Summary
Adds two E2E tests covering the acceptance criteria of #906. No production code changes — investigation against the live server at
https://zndraw.icp.uni-stuttgart.deconfirmed both scenarios already behave correctly. These tests lock in that behavior against future regressions.Investigation recap
Reproduced all three scenarios against production with default sweeper settings (
worker_timeout_seconds=60,sweeper_interval_seconds=30):cleanup_worker()→JobsInvalidateworker_timeout + sweeper_intervalwindow, as designedThe ~85s latency on the unhappy paths is the deliberate cost of blip tolerance from #887 — lowering it would reintroduce the problem that commit fixed. #911 already prevents accidental submission to 0-worker jobs, so the user-visible impact is cosmetic: a grayed-out "0 workers" chip for up to ~90s during edge cases, then self-heals.
The "Consider: emit
JobsInvalidatefrom the SIOLeaveJobRoomhandler" bullet was evaluated but skipped — on the happy path it's redundant (HTTP DELETE already emits), and on the unhappy path it doesn't change observable state (the refetch returns the same data).What this PR adds
test_graceful_disconnect_removes_global_extension— verifies the extension disappears fromGET /v1/joblib/rooms/@global/jobswithin 2 seconds ofjobs.disconnect(). Matches the first acceptance bullet of Global extension stays listed after worker graceful shutdown #906.test_sigkill_worker_cleared_by_sweeper— spawns a worker in a subprocess, SIGKILLs it, and asserts the sweeper removes the extension withinworker_timeout_seconds + sweeper_interval. Usesserver_factoryoverrides (ZNDRAW_JOBLIB_WORKER_TIMEOUT_SECONDS=3,ZNDRAW_JOBLIB_SWEEPER_INTERVAL_SECONDS=1) so the test runs in ~5s instead of ~90s. Matches the second acceptance bullet of Global extension stays listed after worker graceful shutdown #906.Closes #906.
Test plan
uv run pytest tests/zndraw/worker/test_global_cleanup_e2e.py— 2 passed in 6.46suv run pytest tests/zndraw/worker/ tests/zndraw_joblib/— 385 passed, 1 skipped, 0 failures in 142s (no regressions)uvx prek run --files tests/zndraw/worker/test_global_cleanup_e2e.py— all hooks pass🤖 Generated with Claude Code
Summary by CodeRabbit