[2.7] Fix external kill before server download#4275
[2.7] Fix external kill before server download#4275chesterxgchen merged 29 commits intoNVIDIA:2.7from
Conversation
Greptile SummaryThis PR fixes a reliable deadlock in the The fix introduces a deferred Key changes and observations:
Confidence Score: 4/5
Sequence DiagramsequenceDiagram
participant CR as ClientRunner
participant LE as LauncherExecutor
participant SL as SubprocessLauncher
participant SP as Subprocess (FlareAgent)
participant SV as Server
Note over CR,SV: Round N — launch_once=False (deferred-stop path)
CR->>LE: execute(task_name)
LE->>LE: _initialize_external_execution()<br/>waits _deferred_stop_event (set → OK)
LE->>SL: launch_task()
SL->>SP: spawn subprocess
SP-->>LE: flare.init() / heartbeat (pipe up)
LE->>LE: _wait_external_setup() → True
LE->>SP: send task shareable via pipe
SP->>SP: train / compute result
SP->>LE: send result via CellPipe (pass_through_on_send)
Note over SP: registers DOWNLOAD_COMPLETE_CB<br/>blocks on download_done.wait()
LE->>LE: check_output_shareable() → sets _received_result
LE->>LE: _finalize_external_execution():<br/>deferred path → clear _deferred_stop_event,<br/>start deferred-stop thread, return immediately
LE-->>CR: return result (execute() returns)
CR->>SV: SubmitUpdate (subprocess cell still alive!)
SV->>SP: download tensors from DownloadService
SP->>SP: download_done fires → DOWNLOAD_COMPLETE_CB
SP->>SP: os._exit(0) — subprocess exits
Note over LE: deferred-stop thread polls check_run_status
LE->>SL: stop_task() [deferred thread]
SL->>SL: _stop_external_process() → _process = None
LE->>LE: _deferred_stop_event.set()
Note over CR,SV: Round N+1
CR->>LE: execute(task_name)
LE->>LE: _initialize_external_execution()<br/>waits _deferred_stop_event (now set → OK)
LE->>LE: reset_peer_is_up_or_dead()
LE->>SL: launch_task() → new subprocess
|
There was a problem hiding this comment.
Pull request overview
Fixes a deadlock/failure mode in the subprocess Client API path where the client subprocess was being torn down (via stop_task()) before the server could connect back to the subprocess DownloadService to pull large tensors.
Changes:
- Add an opt-in deferred
stop_task()mechanism inLauncherExecutorthat waits for the external process to exit naturally (or times out) before stopping/cleaning up. - Add round-boundary synchronization (
_deferred_stop_event) so a deferred cleanup from round N can’t interfere with launching round N+1. - Enable the deferred-stop behavior for
ClientAPILauncherExecutorby wiring_stop_task_wait_timeouttodownload_complete_timeout.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
nvflare/app_common/executors/launcher_executor.py |
Adds deferred stop thread + event coordination to avoid killing the subprocess before server-side downloads complete. |
nvflare/app_common/executors/client_api_launcher_executor.py |
Opts the Client API executor into deferred-stop behavior using download_complete_timeout. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@greptile review |
|
@greptile review |
|
@greptile review |
|
@greptile review |
|
For Issue 2 — handler started before launch_task() "This one is not a real problem in practice. The concern is that the fresh PipeHandler starts its heartbeat clock before the subprocess connects. heartbeat_timeout=60s by default, and _ensure_pipe_handler_alive() is called only nanoseconds before launch_task(). The subprocess only needs to connect within 60s of launch_task() — which is already enforced by _external_pre_init_timeout. There's no new window being opened. Even if PEER_GONE fired early (which would require the subprocess to take >60s just to call flare.init()), _wait_external_setup would detect it: peer_is_up_or_dead() returns True (set by PEER_GONE), _wait_external_setup returns, then execute() immediately sees pipe_handler.asked_to_stop=True and returns TASK_ABORTED. It's a detectable failure, not a silent hang." not going to change code |
|
/build |
|
Codex review found two issues in this PR.
I added a minimal local reproducer test for this against the PR branch and it fails in both modes.
These findings are from Codex review and local validation on PR #4275. |
|
/build |
## Fix: subprocess cell torn down before server tensor download (large-model deadlock) ### Problem `big_model_4g` CI test reliably failed with: ``` RuntimeError: failed to download from site-X_active ``` after 1–3 hours of retry loops (300 s SubmitUpdate timeout × 3 download retries). **Root cause 1 — ordering bug in `_finalize_external_execution()`:** The subprocess client API works by keeping the subprocess's CellNet connection alive after it sends its result, so the server can pull large tensors directly from the subprocess's `DownloadService` (the "reverse PASS_THROUGH" path). To prevent the subprocess from exiting before the download completes, `_do_submit_result()` blocks on a `download_done.wait(1800 s)` event that fires only after the server has finished downloading all tensors. However, `_finalize_external_execution()` called `stop_task()` **synchronously**, which sends `SIGTERM` to the subprocess immediately after `execute()` receives the result — before `ClientRunner` had even sent `SubmitUpdate` to the server, let alone before the server had started downloading. This tore down the subprocess cell, causing every server download attempt to hit `"no connection to site-X_active"` / `"cannot forward req: no path"`. The subprocess-side wait was therefore unreachable: the process was killed externally before it could block. **Root cause 2 — `launch_once=True` subprocess exits after round 1:** `_do_submit_result()` called `os._exit(0)` unconditionally at the end of the `CellPipe + pass_through_on_send` path. For `launch_once=False` (one subprocess per round) this is correct — the process should exit immediately after the download so the deferred-stop poller unblocks. But for `launch_once=True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`, `pt_client_api_launch_once`), the subprocess was killed after round 1, leaving rounds 2–N unhandled and the job hanging. --- ### Fix **1. Deferred `stop_task()` (`launcher_executor.py`)** Instead of calling `stop_task()` synchronously, `_finalize_external_execution()` now starts a background thread (when `_stop_task_wait_timeout > 0`) that polls `check_run_status()` until the subprocess exits naturally (i.e. after `download_done` fires and the subprocess unblocks), then calls `stop_task()`. `execute()` returns immediately, so `ClientRunner` can send `SubmitUpdate` and the server can connect to the still-alive subprocess cell to download tensors. **2. Round-boundary coordination (`_deferred_stop_event`)** Without additional synchronization, the deferred thread from round N could fire *after* round N+1's `launch_task()` call. Since `SubprocessLauncher` guards `_start_external_process()` with `if self._process is None`, seeing a not-yet-cleared reference to the exited round-N process causes it to skip starting a new subprocess. Round N+1 then sees `COMPLETE_SUCCESS` immediately and fails with `"External process has not called flare.init and run status becomes success"`. A `threading.Event` (`_deferred_stop_event`, initially set) coordinates the two: - **Cleared** in `_finalize_external_execution()` just before the deferred thread starts. - **Set** unconditionally in a `finally` block when the deferred thread completes. - `_initialize_external_execution()` **waits** on this event before calling `launch_task()`, with a forced `stop_task()` fallback if it times out. In practice the wait is 0–1 s (subprocess exits naturally after download, well before the server completes global aggregation and dispatches the next round's task), so there is no meaningful latency impact. **3. `ClientAPILauncherExecutor` opt-in (`client_api_launcher_executor.py`)** `_stop_task_wait_timeout` is set to `download_complete_timeout` (default 1800 s) in `ClientAPILauncherExecutor.__init__()`, enabling the deferred path only for the subprocess client API where large-tensor downloads are expected. The base `LauncherExecutor` defaults to `0.0` (original synchronous behaviour, no change). **4. `launch_once`-aware subprocess exit (`flare_agent.py`, `config.py`, `ex_process/api.py`)** `prepare_config_for_launch()` now writes `launch_once` (derived from `launcher.needs_deferred_stop()`) into the subprocess config file. The subprocess reads it via `ClientConfig.get_launch_once()` and passes it to `FlareAgent`. `_do_submit_result()` branches on `_launch_once`: | `launch_once` | Behaviour after download gate | |---|---| | `False` (one subprocess per round, e.g. `pt-client-api`) | `os._exit(0)` called directly — deferred-stop poller unblocks immediately (original behaviour preserved) | | `True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`) | `atexit.register(os._exit, 0)` registered once — subprocess continues to next round; `os._exit` fires only when `main()` finally returns, bypassing non-daemon CoreCell thread cleanup | `_resolve_launch_once()` safely fetches the launcher even when `self.launcher` is still `None` at `initialize()` time (resolved directly from the engine component registry). **5. Pipe handler identity guard and safe close (`task_exchanger.py`)** Pipe handler creation is refactored into `_create_pipe_handler()`, which binds a per-handler status callback that checks `self.pipe_handler is _h` before acting. This prevents a late `PEER_GONE` from round N's (now-stale) handler from stopping round N+1's handler. `stop(close_pipe=False)` is used because `CellPipe.close()` is irreversible — closing it in the status callback would prevent the next round from communicating. An explicit `self.pipe.close()` is added in `END_RUN` instead. **6. Defensive logging and heartbeat error handling (`pipe_handler.py`)** `_send_to_pipe` now logs `asked_to_stop` and `abort_triggered` status when a send is suppressed. Peer-gone detection logs the elapsed time since the last heartbeat. Heartbeat sends are wrapped in `try/except` so a broken pipe sets `asked_to_stop` and breaks the heartbeat loop cleanly instead of propagating an unhandled exception. **7. New unit tests (`test_download_complete_gating.py`, `test_download_initiated_gating.py`)** Two new test files covering the subprocess download-gating behaviour (the `download_done` wait, the validate-path fast return, and `launch_once`). Both use `FlareAgent.__new__()` to construct minimal agent stubs. To prevent `os._exit(0)` from killing the pytest-xdist worker: - An `autouse` `_no_os_exit` fixture patches `nvflare.client.flare_agent.os._exit` to a no-op for every test in the file. - `_make_agent()` sets `agent._launch_once = False` (the per-round path that calls `os._exit` directly, making the fixture's patch the active guard). --- ### Files changed | File | Change | |------|--------| | `nvflare/app_common/executors/launcher_executor.py` | Deferred `stop_task()` background thread + `_deferred_stop_event` round-boundary coordination | | `nvflare/app_common/executors/client_api_launcher_executor.py` | Set `_stop_task_wait_timeout = download_complete_timeout`; add `_resolve_launch_once()`; write `LAUNCH_ONCE` to subprocess config | | `nvflare/app_common/abstract/launcher.py` | `needs_deferred_stop()` abstract method + idempotency/thread-safety note on `stop_task()` | | `nvflare/app_common/launchers/subprocess_launcher.py` | Implement `needs_deferred_stop()`; add info logging for process start/stop | | `nvflare/app_common/executors/task_exchanger.py` | Refactor pipe handler creation into `_create_pipe_handler()` with identity-checking callback; use `close_pipe=False` to prevent irreversible `CellPipe.close()` | | `nvflare/app_common/widgets/metric_relay.py` | Include `msg.data` in pipe status log message | | `nvflare/fuel/utils/pipe/pipe_handler.py` | Enhanced logging (send failures, peer-gone elapsed time); heartbeat send error handling | | `nvflare/client/config.py` | Add `LAUNCH_ONCE` config key + `get_launch_once()` | | `nvflare/client/flare_agent.py` | `launch_once`-aware `_do_submit_result()`: direct `os._exit` vs `atexit` | | `nvflare/client/ex_process/api.py` | Pass `launch_once` to `FlareAgentWithFLModel` | | `tests/unit_test/client/test_download_complete_gating.py` | **New** — tests for DOWNLOAD_COMPLETE_CB registration, download-done wait, timeout, status logging, and cleanup | | `tests/unit_test/client/test_download_initiated_gating.py` | **New** — tests for thread-local download-initiation detection (validate-path fast return, no spurious 1800 s wait) | | `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | New tests for deferred stop, `_deferred_stop_event`, `_stop_task_wait_timeout` | ### Types of changes <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated.
### Description Cherry-pick #4225 #4231 #4247 #4250 #4260 #4259 #4272 #4270 #4249 #4275 #4296 #4297 #4309 #4307 #4312 #4314 ### Types of changes <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated. --------- Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
(large-model deadlock) `big_model_4g` CI test reliably failed with: ``` RuntimeError: failed to download from site-X_active ``` after 1–3 hours of retry loops (300 s SubmitUpdate timeout × 3 download retries). **Root cause 1 — ordering bug in `_finalize_external_execution()`:** The subprocess client API works by keeping the subprocess's CellNet connection alive after it sends its result, so the server can pull large tensors directly from the subprocess's `DownloadService` (the "reverse PASS_THROUGH" path). To prevent the subprocess from exiting before the download completes, `_do_submit_result()` blocks on a `download_done.wait(1800 s)` event that fires only after the server has finished downloading all tensors. However, `_finalize_external_execution()` called `stop_task()` **synchronously**, which sends `SIGTERM` to the subprocess immediately after `execute()` receives the result — before `ClientRunner` had even sent `SubmitUpdate` to the server, let alone before the server had started downloading. This tore down the subprocess cell, causing every server download attempt to hit `"no connection to site-X_active"` / `"cannot forward req: no path"`. The subprocess-side wait was therefore unreachable: the process was killed externally before it could block. **Root cause 2 — `launch_once=True` subprocess exits after round 1:** `_do_submit_result()` called `os._exit(0)` unconditionally at the end of the `CellPipe + pass_through_on_send` path. For `launch_once=False` (one subprocess per round) this is correct — the process should exit immediately after the download so the deferred-stop poller unblocks. But for `launch_once=True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`, `pt_client_api_launch_once`), the subprocess was killed after round 1, leaving rounds 2–N unhandled and the job hanging. --- **1. Deferred `stop_task()` (`launcher_executor.py`)** Instead of calling `stop_task()` synchronously, `_finalize_external_execution()` now starts a background thread (when `_stop_task_wait_timeout > 0`) that polls `check_run_status()` until the subprocess exits naturally (i.e. after `download_done` fires and the subprocess unblocks), then calls `stop_task()`. `execute()` returns immediately, so `ClientRunner` can send `SubmitUpdate` and the server can connect to the still-alive subprocess cell to download tensors. **2. Round-boundary coordination (`_deferred_stop_event`)** Without additional synchronization, the deferred thread from round N could fire *after* round N+1's `launch_task()` call. Since `SubprocessLauncher` guards `_start_external_process()` with `if self._process is None`, seeing a not-yet-cleared reference to the exited round-N process causes it to skip starting a new subprocess. Round N+1 then sees `COMPLETE_SUCCESS` immediately and fails with `"External process has not called flare.init and run status becomes success"`. A `threading.Event` (`_deferred_stop_event`, initially set) coordinates the two: - **Cleared** in `_finalize_external_execution()` just before the deferred thread starts. - **Set** unconditionally in a `finally` block when the deferred thread completes. - `_initialize_external_execution()` **waits** on this event before calling `launch_task()`, with a forced `stop_task()` fallback if it times out. In practice the wait is 0–1 s (subprocess exits naturally after download, well before the server completes global aggregation and dispatches the next round's task), so there is no meaningful latency impact. **3. `ClientAPILauncherExecutor` opt-in (`client_api_launcher_executor.py`)** `_stop_task_wait_timeout` is set to `download_complete_timeout` (default 1800 s) in `ClientAPILauncherExecutor.__init__()`, enabling the deferred path only for the subprocess client API where large-tensor downloads are expected. The base `LauncherExecutor` defaults to `0.0` (original synchronous behaviour, no change). **4. `launch_once`-aware subprocess exit (`flare_agent.py`, `config.py`, `ex_process/api.py`)** `prepare_config_for_launch()` now writes `launch_once` (derived from `launcher.needs_deferred_stop()`) into the subprocess config file. The subprocess reads it via `ClientConfig.get_launch_once()` and passes it to `FlareAgent`. `_do_submit_result()` branches on `_launch_once`: | `launch_once` | Behaviour after download gate | |---|---| | `False` (one subprocess per round, e.g. `pt-client-api`) | `os._exit(0)` called directly — deferred-stop poller unblocks immediately (original behaviour preserved) | | `True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`) | `atexit.register(os._exit, 0)` registered once — subprocess continues to next round; `os._exit` fires only when `main()` finally returns, bypassing non-daemon CoreCell thread cleanup | `_resolve_launch_once()` safely fetches the launcher even when `self.launcher` is still `None` at `initialize()` time (resolved directly from the engine component registry). **5. Pipe handler identity guard and safe close (`task_exchanger.py`)** Pipe handler creation is refactored into `_create_pipe_handler()`, which binds a per-handler status callback that checks `self.pipe_handler is _h` before acting. This prevents a late `PEER_GONE` from round N's (now-stale) handler from stopping round N+1's handler. `stop(close_pipe=False)` is used because `CellPipe.close()` is irreversible — closing it in the status callback would prevent the next round from communicating. An explicit `self.pipe.close()` is added in `END_RUN` instead. **6. Defensive logging and heartbeat error handling (`pipe_handler.py`)** `_send_to_pipe` now logs `asked_to_stop` and `abort_triggered` status when a send is suppressed. Peer-gone detection logs the elapsed time since the last heartbeat. Heartbeat sends are wrapped in `try/except` so a broken pipe sets `asked_to_stop` and breaks the heartbeat loop cleanly instead of propagating an unhandled exception. **7. New unit tests (`test_download_complete_gating.py`, `test_download_initiated_gating.py`)** Two new test files covering the subprocess download-gating behaviour (the `download_done` wait, the validate-path fast return, and `launch_once`). Both use `FlareAgent.__new__()` to construct minimal agent stubs. To prevent `os._exit(0)` from killing the pytest-xdist worker: - An `autouse` `_no_os_exit` fixture patches `nvflare.client.flare_agent.os._exit` to a no-op for every test in the file. - `_make_agent()` sets `agent._launch_once = False` (the per-round path that calls `os._exit` directly, making the fixture's patch the active guard). --- | File | Change | |------|--------| | `nvflare/app_common/executors/launcher_executor.py` | Deferred `stop_task()` background thread + `_deferred_stop_event` round-boundary coordination | | `nvflare/app_common/executors/client_api_launcher_executor.py` | Set `_stop_task_wait_timeout = download_complete_timeout`; add `_resolve_launch_once()`; write `LAUNCH_ONCE` to subprocess config | | `nvflare/app_common/abstract/launcher.py` | `needs_deferred_stop()` abstract method + idempotency/thread-safety note on `stop_task()` | | `nvflare/app_common/launchers/subprocess_launcher.py` | Implement `needs_deferred_stop()`; add info logging for process start/stop | | `nvflare/app_common/executors/task_exchanger.py` | Refactor pipe handler creation into `_create_pipe_handler()` with identity-checking callback; use `close_pipe=False` to prevent irreversible `CellPipe.close()` | | `nvflare/app_common/widgets/metric_relay.py` | Include `msg.data` in pipe status log message | | `nvflare/fuel/utils/pipe/pipe_handler.py` | Enhanced logging (send failures, peer-gone elapsed time); heartbeat send error handling | | `nvflare/client/config.py` | Add `LAUNCH_ONCE` config key + `get_launch_once()` | | `nvflare/client/flare_agent.py` | `launch_once`-aware `_do_submit_result()`: direct `os._exit` vs `atexit` | | `nvflare/client/ex_process/api.py` | Pass `launch_once` to `FlareAgentWithFLModel` | | `tests/unit_test/client/test_download_complete_gating.py` | **New** — tests for DOWNLOAD_COMPLETE_CB registration, download-done wait, timeout, status logging, and cleanup | | `tests/unit_test/client/test_download_initiated_gating.py` | **New** — tests for thread-local download-initiation detection (validate-path fast return, no spurious 1800 s wait) | | `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | New tests for deferred stop, `_deferred_stop_event`, `_stop_task_wait_timeout` | <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated.
(large-model deadlock) `big_model_4g` CI test reliably failed with: ``` RuntimeError: failed to download from site-X_active ``` after 1–3 hours of retry loops (300 s SubmitUpdate timeout × 3 download retries). **Root cause 1 — ordering bug in `_finalize_external_execution()`:** The subprocess client API works by keeping the subprocess's CellNet connection alive after it sends its result, so the server can pull large tensors directly from the subprocess's `DownloadService` (the "reverse PASS_THROUGH" path). To prevent the subprocess from exiting before the download completes, `_do_submit_result()` blocks on a `download_done.wait(1800 s)` event that fires only after the server has finished downloading all tensors. However, `_finalize_external_execution()` called `stop_task()` **synchronously**, which sends `SIGTERM` to the subprocess immediately after `execute()` receives the result — before `ClientRunner` had even sent `SubmitUpdate` to the server, let alone before the server had started downloading. This tore down the subprocess cell, causing every server download attempt to hit `"no connection to site-X_active"` / `"cannot forward req: no path"`. The subprocess-side wait was therefore unreachable: the process was killed externally before it could block. **Root cause 2 — `launch_once=True` subprocess exits after round 1:** `_do_submit_result()` called `os._exit(0)` unconditionally at the end of the `CellPipe + pass_through_on_send` path. For `launch_once=False` (one subprocess per round) this is correct — the process should exit immediately after the download so the deferred-stop poller unblocks. But for `launch_once=True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`, `pt_client_api_launch_once`), the subprocess was killed after round 1, leaving rounds 2–N unhandled and the job hanging. --- **1. Deferred `stop_task()` (`launcher_executor.py`)** Instead of calling `stop_task()` synchronously, `_finalize_external_execution()` now starts a background thread (when `_stop_task_wait_timeout > 0`) that polls `check_run_status()` until the subprocess exits naturally (i.e. after `download_done` fires and the subprocess unblocks), then calls `stop_task()`. `execute()` returns immediately, so `ClientRunner` can send `SubmitUpdate` and the server can connect to the still-alive subprocess cell to download tensors. **2. Round-boundary coordination (`_deferred_stop_event`)** Without additional synchronization, the deferred thread from round N could fire *after* round N+1's `launch_task()` call. Since `SubprocessLauncher` guards `_start_external_process()` with `if self._process is None`, seeing a not-yet-cleared reference to the exited round-N process causes it to skip starting a new subprocess. Round N+1 then sees `COMPLETE_SUCCESS` immediately and fails with `"External process has not called flare.init and run status becomes success"`. A `threading.Event` (`_deferred_stop_event`, initially set) coordinates the two: - **Cleared** in `_finalize_external_execution()` just before the deferred thread starts. - **Set** unconditionally in a `finally` block when the deferred thread completes. - `_initialize_external_execution()` **waits** on this event before calling `launch_task()`, with a forced `stop_task()` fallback if it times out. In practice the wait is 0–1 s (subprocess exits naturally after download, well before the server completes global aggregation and dispatches the next round's task), so there is no meaningful latency impact. **3. `ClientAPILauncherExecutor` opt-in (`client_api_launcher_executor.py`)** `_stop_task_wait_timeout` is set to `download_complete_timeout` (default 1800 s) in `ClientAPILauncherExecutor.__init__()`, enabling the deferred path only for the subprocess client API where large-tensor downloads are expected. The base `LauncherExecutor` defaults to `0.0` (original synchronous behaviour, no change). **4. `launch_once`-aware subprocess exit (`flare_agent.py`, `config.py`, `ex_process/api.py`)** `prepare_config_for_launch()` now writes `launch_once` (derived from `launcher.needs_deferred_stop()`) into the subprocess config file. The subprocess reads it via `ClientConfig.get_launch_once()` and passes it to `FlareAgent`. `_do_submit_result()` branches on `_launch_once`: | `launch_once` | Behaviour after download gate | |---|---| | `False` (one subprocess per round, e.g. `pt-client-api`) | `os._exit(0)` called directly — deferred-stop poller unblocks immediately (original behaviour preserved) | | `True` (one subprocess for all rounds, e.g. `np-loop-cell-pipe`) | `atexit.register(os._exit, 0)` registered once — subprocess continues to next round; `os._exit` fires only when `main()` finally returns, bypassing non-daemon CoreCell thread cleanup | `_resolve_launch_once()` safely fetches the launcher even when `self.launcher` is still `None` at `initialize()` time (resolved directly from the engine component registry). **5. Pipe handler identity guard and safe close (`task_exchanger.py`)** Pipe handler creation is refactored into `_create_pipe_handler()`, which binds a per-handler status callback that checks `self.pipe_handler is _h` before acting. This prevents a late `PEER_GONE` from round N's (now-stale) handler from stopping round N+1's handler. `stop(close_pipe=False)` is used because `CellPipe.close()` is irreversible — closing it in the status callback would prevent the next round from communicating. An explicit `self.pipe.close()` is added in `END_RUN` instead. **6. Defensive logging and heartbeat error handling (`pipe_handler.py`)** `_send_to_pipe` now logs `asked_to_stop` and `abort_triggered` status when a send is suppressed. Peer-gone detection logs the elapsed time since the last heartbeat. Heartbeat sends are wrapped in `try/except` so a broken pipe sets `asked_to_stop` and breaks the heartbeat loop cleanly instead of propagating an unhandled exception. **7. New unit tests (`test_download_complete_gating.py`, `test_download_initiated_gating.py`)** Two new test files covering the subprocess download-gating behaviour (the `download_done` wait, the validate-path fast return, and `launch_once`). Both use `FlareAgent.__new__()` to construct minimal agent stubs. To prevent `os._exit(0)` from killing the pytest-xdist worker: - An `autouse` `_no_os_exit` fixture patches `nvflare.client.flare_agent.os._exit` to a no-op for every test in the file. - `_make_agent()` sets `agent._launch_once = False` (the per-round path that calls `os._exit` directly, making the fixture's patch the active guard). --- | File | Change | |------|--------| | `nvflare/app_common/executors/launcher_executor.py` | Deferred `stop_task()` background thread + `_deferred_stop_event` round-boundary coordination | | `nvflare/app_common/executors/client_api_launcher_executor.py` | Set `_stop_task_wait_timeout = download_complete_timeout`; add `_resolve_launch_once()`; write `LAUNCH_ONCE` to subprocess config | | `nvflare/app_common/abstract/launcher.py` | `needs_deferred_stop()` abstract method + idempotency/thread-safety note on `stop_task()` | | `nvflare/app_common/launchers/subprocess_launcher.py` | Implement `needs_deferred_stop()`; add info logging for process start/stop | | `nvflare/app_common/executors/task_exchanger.py` | Refactor pipe handler creation into `_create_pipe_handler()` with identity-checking callback; use `close_pipe=False` to prevent irreversible `CellPipe.close()` | | `nvflare/app_common/widgets/metric_relay.py` | Include `msg.data` in pipe status log message | | `nvflare/fuel/utils/pipe/pipe_handler.py` | Enhanced logging (send failures, peer-gone elapsed time); heartbeat send error handling | | `nvflare/client/config.py` | Add `LAUNCH_ONCE` config key + `get_launch_once()` | | `nvflare/client/flare_agent.py` | `launch_once`-aware `_do_submit_result()`: direct `os._exit` vs `atexit` | | `nvflare/client/ex_process/api.py` | Pass `launch_once` to `FlareAgentWithFLModel` | | `tests/unit_test/client/test_download_complete_gating.py` | **New** — tests for DOWNLOAD_COMPLETE_CB registration, download-done wait, timeout, status logging, and cleanup | | `tests/unit_test/client/test_download_initiated_gating.py` | **New** — tests for thread-local download-initiation detection (validate-path fast return, no spurious 1800 s wait) | | `tests/unit_test/app_common/executors/client_api_launcher_executor_test.py` | New tests for deferred stop, `_deferred_stop_event`, `_stop_task_wait_timeout` | <!--- Put an `x` in all the boxes that apply, and remove the not applicable items --> - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated.
Fix: subprocess cell torn down before server tensor download (large-model deadlock)
Problem
big_model_4gCI test reliably failed with:after 1–3 hours of retry loops (300 s SubmitUpdate timeout × 3 download retries).
Root cause 1 — ordering bug in
_finalize_external_execution():The subprocess client API works by keeping the subprocess's CellNet connection alive
after it sends its result, so the server can pull large tensors directly from the
subprocess's
DownloadService(the "reverse PASS_THROUGH" path). To prevent thesubprocess from exiting before the download completes,
_do_submit_result()blockson a
download_done.wait(1800 s)event that fires only after the server hasfinished downloading all tensors.
However,
_finalize_external_execution()calledstop_task()synchronously,which sends
SIGTERMto the subprocess immediately afterexecute()receives theresult — before
ClientRunnerhad even sentSubmitUpdateto the server, let alonebefore the server had started downloading. This tore down the subprocess cell,
causing every server download attempt to hit
"no connection to site-X_active"/"cannot forward req: no path".The subprocess-side wait was therefore unreachable: the process was killed
externally before it could block.
Root cause 2 —
launch_once=Truesubprocess exits after round 1:_do_submit_result()calledos._exit(0)unconditionally at the end of theCellPipe + pass_through_on_sendpath. Forlaunch_once=False(one subprocess perround) this is correct — the process should exit immediately after the download so
the deferred-stop poller unblocks. But for
launch_once=True(one subprocess forall rounds, e.g.
np-loop-cell-pipe,pt_client_api_launch_once), the subprocesswas killed after round 1, leaving rounds 2–N unhandled and the job hanging.
Fix
1. Deferred
stop_task()(launcher_executor.py)Instead of calling
stop_task()synchronously,_finalize_external_execution()now starts a background thread (when
_stop_task_wait_timeout > 0) that pollscheck_run_status()until the subprocess exits naturally (i.e. afterdownload_donefires and the subprocess unblocks), then callsstop_task().execute()returns immediately, soClientRunnercan sendSubmitUpdateand theserver can connect to the still-alive subprocess cell to download tensors.
2. Round-boundary coordination (
_deferred_stop_event)Without additional synchronization, the deferred thread from round N could fire
after round N+1's
launch_task()call. SinceSubprocessLauncherguards_start_external_process()withif self._process is None, seeing a not-yet-clearedreference to the exited round-N process causes it to skip starting a new subprocess.
Round N+1 then sees
COMPLETE_SUCCESSimmediately and fails with"External process has not called flare.init and run status becomes success".A
threading.Event(_deferred_stop_event, initially set) coordinates the two:_finalize_external_execution()just before the deferred thread starts.finallyblock when the deferred thread completes._initialize_external_execution()waits on this event before callinglaunch_task(), with a forcedstop_task()fallback if it times out.In practice the wait is 0–1 s (subprocess exits naturally after download, well
before the server completes global aggregation and dispatches the next round's task),
so there is no meaningful latency impact.
3.
ClientAPILauncherExecutoropt-in (client_api_launcher_executor.py)_stop_task_wait_timeoutis set todownload_complete_timeout(default 1800 s) inClientAPILauncherExecutor.__init__(), enabling the deferred path only for thesubprocess client API where large-tensor downloads are expected. The base
LauncherExecutordefaults to0.0(original synchronous behaviour, no change).4.
launch_once-aware subprocess exit (flare_agent.py,config.py,ex_process/api.py)prepare_config_for_launch()now writeslaunch_once(derived fromlauncher.needs_deferred_stop()) into the subprocess config file. The subprocessreads it via
ClientConfig.get_launch_once()and passes it toFlareAgent._do_submit_result()branches on_launch_once:launch_onceFalse(one subprocess per round, e.g.pt-client-api)os._exit(0)called directly — deferred-stop poller unblocks immediately (original behaviour preserved)True(one subprocess for all rounds, e.g.np-loop-cell-pipe)atexit.register(os._exit, 0)registered once — subprocess continues to next round;os._exitfires only whenmain()finally returns, bypassing non-daemon CoreCell thread cleanup_resolve_launch_once()safely fetches the launcher even whenself.launcherisstill
Noneatinitialize()time (resolved directly from the engine componentregistry).
5. Pipe handler identity guard and safe close (
task_exchanger.py)Pipe handler creation is refactored into
_create_pipe_handler(), which binds aper-handler status callback that checks
self.pipe_handler is _hbefore acting.This prevents a late
PEER_GONEfrom round N's (now-stale) handler from stoppinground N+1's handler.
stop(close_pipe=False)is used becauseCellPipe.close()is irreversible — closing it in the status callback would prevent the next round
from communicating. An explicit
self.pipe.close()is added inEND_RUNinstead.6. Defensive logging and heartbeat error handling (
pipe_handler.py)_send_to_pipenow logsasked_to_stopandabort_triggeredstatus when a sendis suppressed. Peer-gone detection logs the elapsed time since the last heartbeat.
Heartbeat sends are wrapped in
try/exceptso a broken pipe setsasked_to_stopand breaks the heartbeat loop cleanly instead of propagating an unhandled exception.
7. New unit tests (
test_download_complete_gating.py,test_download_initiated_gating.py)Two new test files covering the subprocess download-gating behaviour (the
download_donewait, the validate-path fast return, andlaunch_once). Both useFlareAgent.__new__()to construct minimal agentstubs. To prevent
os._exit(0)from killing the pytest-xdist worker:autouse_no_os_exitfixture patchesnvflare.client.flare_agent.os._exitto a no-op for every test in the file.
_make_agent()setsagent._launch_once = False(the per-round path that callsos._exitdirectly, making the fixture's patch the active guard).Files changed
nvflare/app_common/executors/launcher_executor.pystop_task()background thread +_deferred_stop_eventround-boundary coordinationnvflare/app_common/executors/client_api_launcher_executor.py_stop_task_wait_timeout = download_complete_timeout; add_resolve_launch_once(); writeLAUNCH_ONCEto subprocess confignvflare/app_common/abstract/launcher.pyneeds_deferred_stop()abstract method + idempotency/thread-safety note onstop_task()nvflare/app_common/launchers/subprocess_launcher.pyneeds_deferred_stop(); add info logging for process start/stopnvflare/app_common/executors/task_exchanger.py_create_pipe_handler()with identity-checking callback; useclose_pipe=Falseto prevent irreversibleCellPipe.close()nvflare/app_common/widgets/metric_relay.pymsg.datain pipe status log messagenvflare/fuel/utils/pipe/pipe_handler.pynvflare/client/config.pyLAUNCH_ONCEconfig key +get_launch_once()nvflare/client/flare_agent.pylaunch_once-aware_do_submit_result(): directos._exitvsatexitnvflare/client/ex_process/api.pylaunch_oncetoFlareAgentWithFLModeltests/unit_test/client/test_download_complete_gating.pytests/unit_test/client/test_download_initiated_gating.pytests/unit_test/app_common/executors/client_api_launcher_executor_test.py_deferred_stop_event,_stop_task_wait_timeoutTypes of changes
./runtest.sh.