
feat(runner): batch simulation job execution to prevent OOM kills #26

Open
daniel-klein wants to merge 3 commits into main from batch-simulation-jobs

Conversation

@daniel-klein
Member

Summary

  • Replace all-at-once task submission with configurable batched execution in run_simulation_job()
  • Batch size configurable via MODELOPS_JOB_BATCH_SIZE env var (default: 200)
  • Detect OOM kill signals in task failures and log guidance to reduce batch size

Problem

The runner held references to ALL simulation futures for the entire job duration (to support a second gather() pass for model_outputs Parquet writing). This prevented Dask from freeing any SimReturn from distributed memory, causing:

  1. Worker OOM kills — distributed memory accumulates across all workers
  2. Progress going backwards — when a worker dies, all results stored on it are lost and must be recomputed
  3. Scheduler overload — a 1,000-param job with 6 targets puts roughly 7,000 tasks (sims plus per-target aggregations) in the graph simultaneously; at 10K params this becomes 70,000

Solution

Submit tasks in batches. Per batch:

  1. Submit sims + aggs (e.g., 200 params = 1,400 tasks)
  2. Gather all results for the batch
  3. Release batch references — Dask frees distributed memory
  4. Next batch starts with clean worker memory
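A minimal sketch of this loop, with `submit`, `gather`, and `persist` as injected stand-ins for `client.submit`, `client.gather`, and blob-storage persistence (the actual `run_simulation_job()` in this repo will differ):

```python
import os

# Default mirrors the PR: MODELOPS_JOB_BATCH_SIZE env var, falling back to 200.
DEFAULT_BATCH_SIZE = int(os.environ.get("MODELOPS_JOB_BATCH_SIZE", "200"))

def iter_batches(param_ids, batch_size=DEFAULT_BATCH_SIZE):
    """Yield param_ids in fixed-size batches."""
    for start in range(0, len(param_ids), batch_size):
        yield param_ids[start:start + batch_size]

def run_batched(param_ids, submit, gather, persist, batch_size=DEFAULT_BATCH_SIZE):
    """Per batch: submit, gather, persist, then drop references so the
    distributed scheduler can free the batch's memory."""
    for batch in iter_batches(param_ids, batch_size):
        futures = [submit(p) for p in batch]   # 1. submit this batch only
        results = gather(futures)              # 2. block until the batch completes
        persist(results)                       # 3. persist results off-cluster
        del futures, results                   # 4. release refs -> Dask frees memory
```

Only one batch's futures are ever live, which is what bounds peak distributed memory to `batch_size` results.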
|                           | Before              | After (batch=200)  |
|---------------------------|---------------------|--------------------|
| Peak task graph           | 7,000 (all at once) | 1,400              |
| Peak distributed memory   | 1,000 SimReturns    | 200 SimReturns     |
| Worker death blast radius | Entire job          | 1 batch            |
| Scales to 10K params      | No (70K tasks)      | Yes (1,400 tasks)  |

Test plan

  • Run make submit-tiny with small batch (MODELOPS_JOB_BATCH_SIZE=2) to verify batching works
  • Run make submit with 1,000 params and verify no OOM kills
  • Verify model_outputs Parquet files are written correctly
  • Verify target loss Parquet files match previous results

Closes #25

Replace all-at-once task submission with configurable batched execution.
Previously, all sim futures were held for the entire job duration to
support a second gather pass for model_outputs Parquet writing. This
prevented Dask from freeing any SimReturn from distributed memory,
causing workers to OOM-kill and lose completed results (progress going
backwards in the dashboard).

Now tasks are submitted in batches (default 200, configurable via
MODELOPS_JOB_BATCH_SIZE). Each batch's results are gathered before
the next batch starts, allowing Dask to free distributed memory
between batches. This bounds peak distributed memory to batch_size
SimReturns instead of all params, and limits worker death blast
radius to one batch instead of the entire job.

Also adds OOM kill detection: when task failures contain OOM signals,
a warning is logged suggesting a smaller batch size.

Closes #25
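The OOM detection described above might look like the following sketch; the actual signal strings the PR matches are not shown here, so this list is an illustrative assumption:

```python
# Assumed OOM indicators (illustrative, not the PR's actual list): exit
# code 137 and SIGKILL are what K8s OOM kills produce, and Dask surfaces
# lost tasks as KilledWorker errors.
OOM_SIGNALS = ("exit code 137", "sigkill", "out of memory", "killedworker")

def looks_like_oom(results) -> bool:
    """True if any failed task's message resembles an OOM kill, in which
    case the runner logs guidance to lower MODELOPS_JOB_BATCH_SIZE."""
    for r in results:
        if isinstance(r, Exception):
            msg = str(r).lower()
            if any(sig in msg for sig in OOM_SIGNALS):
                return True
    return False
```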

Allow submitters to configure runner behavior by setting environment
variables locally. Any env var matching MODELOPS_JOB_* is forwarded
to the job pod spec.

Usage:
  export MODELOPS_JOB_BATCH_SIZE=100
  mops jobs submit study.json
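The forwarding could be sketched as below; the dict shape follows the Kubernetes `EnvVar` fields (`name`/`value`), and the real pod-spec builder in this repo may differ:

```python
import os

def forwarded_job_env(environ=os.environ, prefix="MODELOPS_JOB_"):
    """Collect local MODELOPS_JOB_* variables as EnvVar-shaped dicts
    for the job pod spec (a sketch of the forwarding described above)."""
    return [
        {"name": k, "value": v}
        for k, v in sorted(environ.items())
        if k.startswith(prefix)
    ]
```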

gather() returns list[SimReturn | Exception]. When finding output names
from the first SimReturn, an Exception has no .outputs attribute,
crashing _write_model_outputs silently. This caused model_outputs to
be missing whenever any replicate failed.

Fix: scan for the first successful SimReturn, and skip Exception
entries during iteration.
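A sketch of that fix (plain values stand in for SimReturn objects):

```python
def first_successful(results):
    """First non-Exception entry from a gather() result list, or None when
    every replicate failed — instead of crashing on a missing .outputs."""
    return next((r for r in results if not isinstance(r, Exception)), None)

def successful(results):
    """Iterate results, skipping Exception entries."""
    return [r for r in results if not isinstance(r, Exception)]
```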
vsbuffalo added a commit that referenced this pull request Feb 17, 2026
…verload

Set worker-saturation=1.0 so the scheduler only assigns tasks equal to
each worker's thread count, instead of piling all tasks onto workers
immediately. This prevents distributed memory accumulation that causes
OOM kills on large jobs (e.g., 1000+ params).

Without this, submitting 7,000+ tasks causes the scheduler to push them
all onto workers, each holding data for hundreds of queued tasks. With
saturation=1.0, workers only hold data for tasks they're actively
running (~4 per worker with current config).

This is the root-cause fix for the OOM issue described in #25 and
the incident doc. PR #26 proposes batching as a workaround, but
scheduler-level backpressure is the proper Dask-native solution.

Refs #25, #26
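The setting described above corresponds to Dask's distributed-scheduler config key (how this repo actually applies it — config file, env var, or code — is not shown here):

```python
import dask

# Scheduler-level backpressure: with worker-saturation = 1.0, a worker is
# assigned only as many tasks as it has threads, instead of having the
# whole graph pushed onto workers up front. The same key can typically be
# set via dask's config-file or environment-variable mechanisms.
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})
```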
vsbuffalo added a commit that referenced this pull request Feb 17, 2026
…verload (#29)

@daniel-klein
Member Author

Why this PR is still needed (despite #29/#30 worker-saturation fix)

Root cause: two separate memory problems

Problem 1 — Task over-saturation (fixed by #29/#30): Without worker-saturation=1.0, the scheduler piles hundreds of queued tasks onto workers, each holding result data in memory. The saturation fix limits each worker to 1 active task per thread.

Problem 2 — Cumulative memory growth in long-lived worker processes (NOT fixed by #29/#30): Each simulation allocates millions of small Python objects (numpy arrays, starsim states, etc.). When a task completes, Python frees the objects but CPython's pymalloc allocator cannot return fragmented memory pages to the OS. Worker RSS grows monotonically across sequential tasks — even with only 1 task at a time.

Why worker-saturation alone doesn't prevent OOM

Current worker config: 6 workers per pod × --memory-limit 4.0GiB each = exactly the 24 GiB pod limit. Zero headroom. Even with saturation=1.0, workers running tasks sequentially accumulate fragmented memory until they exceed 4 GiB. Since all 6 workers grow together, the combined RSS hits the 24 GiB K8s pod limit before Dask's nanny (which monitors individual workers at the 95% threshold = 3.8 GiB) can intervene. Result: K8s OOMKills the entire pod, taking all 6 workers down at once.

Observed on job-e56c635a (2026-02-19): 9 out of 21 running worker pods OOMKilled within 30 minutes, despite running the latest image with worker-saturation fix.

Why the blast radius is so large

The current gather() pattern uses wait(all_futures) — it waits for ALL tasks to complete before collecting any results. This means every worker holds results for every task it has ever run, for the entire job duration. When a worker dies (OOMKill or nanny restart), ALL its previously completed results are lost — not just the 1 in-flight task, potentially hundreds. This is what causes the dramatic "progress going backwards" effect.

With a pod OOMKill: 6 workers × N completed results each = massive result loss and recomputation.

Why as_completed doesn't fit this architecture

An incremental as_completed() gather would reduce result retention on workers, but doesn't work well here because:

  • Aggregation tasks depend on grouped sim results: each agg task needs ALL replicates for a param_id as *args. Can't process individual sim results in isolation.
  • Would require architectural change: either move aggregation to the runner (losing worker-side parallelism) or dynamically track replicate group completion and submit agg tasks on the fly (complex, error-prone).

Why batching works

Batching naturally fits the grouped sim→agg architecture:

  1. Submit a batch of param_ids (e.g., 200) with their replicates + agg tasks
  2. Dask handles within-batch sim→agg dependencies normally
  3. Gather all batch results to the runner, persist to blob storage
  4. Release batch references → Dask frees distributed memory
  5. Next batch starts with bounded worker memory
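The grouped sim→agg shape can be sketched as follows; `client` stands in for a `dask.distributed` Client, and `run_sim` / `aggregate` are hypothetical task functions — only the dependency structure matters here:

```python
def submit_batch(client, run_sim, aggregate, param_ids, n_replicates, targets):
    """Submit one batch: all replicates per param_id, then per-target aggs."""
    agg_futures = []
    for param_id in param_ids:
        # All replicates for one param_id are submitted together...
        sims = [client.submit(run_sim, param_id, rep) for rep in range(n_replicates)]
        # ...and passed as *args to each per-target agg task, so Dask
        # resolves the sim -> agg dependencies within the batch.
        for target in targets:
            agg_futures.append(client.submit(aggregate, target, *sims))
    return agg_futures
```

The runner then gathers only `agg_futures` (plus whatever it needs for model_outputs), and releasing them after the batch frees both sim and agg results on the workers.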

This fixes all three aspects:

  • Bounded memory: workers hold at most batch_size × replicates results, not the entire job
  • Fault containment: a worker death loses at most 1 batch worth of results
  • Memory recycling: between batches, fragmented worker memory is bounded because old results are released (and optionally workers can be restarted)

Production validation

This PR has been tested at scale (2,500 params × 5 replicates × 6 targets, batch_size=200) with zero OOM kills on the same cluster configuration that consistently OOM-kills without batching.



Development

Successfully merging this pull request may close these issues.

Batch simulation job execution to prevent OOM kills and enable 10K+ scale
