Batch simulation job execution to prevent OOM kills and enable 10K+ scale #25

@daniel-klein

Problem

`run_simulation_job()` in `job_runner.py` submits all parameter sets at once and holds references to every simulation future until the entire job completes. This causes two critical problems:

1. Worker OOM kills with cascading data loss

The runner stores all sim futures in `sim_futures_by_param` (line 218) to later gather raw model outputs for Parquet views (lines 266-272). This prevents Dask from freeing any `SimReturn` from distributed memory until the job finishes. When a worker pod gets OOM-killed:

  • All Dask results stored on that worker are lost
  • Dask re-schedules those tasks, but progress goes backwards
  • This creates a cycle: accumulate → OOM → lose work → re-accumulate → OOM

2. Task graph pressure at scale

At 1,000 params × 1 replicate, the runner submits 1,000 simulation tasks, plus 1,000 params × 6 targets = 6,000 aggregation tasks, for 7,000 tasks submitted simultaneously. At 10,000 params this becomes 70,000 tasks, which overwhelms the Dask scheduler.
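The task counts above can be double-checked with a quick calculation (the helper name is illustrative; the parameter counts are the ones from this issue):

```python
def total_tasks(n_params: int, n_targets: int, n_replicates: int) -> int:
    """One simulation task per (param, replicate), plus one
    aggregation task per (param, target)."""
    sim_tasks = n_params * n_replicates
    agg_tasks = n_params * n_targets
    return sim_tasks + agg_tasks

print(total_tasks(1_000, 6, 1))   # 7000
print(total_tasks(10_000, 6, 1))  # 70000
```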

Observed behavior

  • job-bf1a45a7: 1,000 params, running 8+ hours (should take ~2h)
  • Workers OOM-killed repeatedly (exit code 137)
  • Dask dashboard shows completed count going backwards (e.g., 280 → 277)
  • 4 of 6 worker pods restarted during the job

Root cause

```python
# job_runner.py lines 217-271
sim_futures_by_param = {}  # holds ALL refs for the entire job duration

for param_id, replicate_tasks in task_groups.items():
    sim_futures = sim_service.submit_replicates(replicate_set)
    sim_futures_by_param[param_id] = sim_futures  # ← never released

# ... hours later, after all 7,000 tasks complete ...
for param_id, sim_futures in sim_futures_by_param.items():
    sim_returns = sim_service.gather(sim_futures)  # ← second gather of ALL data
```

The second gather exists solely to write `model_outputs/*.parquet` via `_write_model_outputs()`. This holds every `SimReturn` in distributed memory for the entire job duration.

Proposed solution

Batch-and-gather: Submit params in configurable batches, gather results per batch, write incrementally, release futures between batches.

```python
batch_size = int(os.environ.get("MODELOPS_JOB_BATCH_SIZE", "200"))

for batch in chunks(task_groups, batch_size):
    # Submit one batch: e.g., 200 sims + 1,200 aggs = 1,400 tasks
    sim_futures_batch = {}
    agg_futures_batch = []
    for param_id, tasks in batch:
        sim_futures = submit_replicates(...)
        sim_futures_batch[param_id] = sim_futures
        for target in targets:
            agg_future = submit_aggregation(sim_futures, target, ...)
            agg_futures_batch.append(agg_future)

    # Gather this batch's aggregation results
    agg_results = gather(agg_futures_batch)

    # Gather sim returns for model outputs (cheap: already computed)
    for param_id, futs in sim_futures_batch.items():
        sim_returns = gather(futs)
        append_model_output_parquet(param_id, sim_returns)

    # Release all futures so Dask can free the distributed data
    sim_futures_batch.clear()
```
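The sketch relies on a `chunks()` helper that is not shown. A minimal version, assuming `task_groups` is a dict keyed by param id, could look like:

```python
from itertools import islice

def chunks(task_groups: dict, batch_size: int):
    """Yield lists of (param_id, tasks) pairs, at most batch_size at a time."""
    it = iter(task_groups.items())
    while batch := list(islice(it, batch_size)):
        yield batch
```

With `batch_size=200`, a 10,000-param job yields 50 batches, each fully gathered and released before the next is submitted.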

Properties at 10K scale (batch_size=200):

  • Peak task graph: 1,400 tasks (not 70,000)
  • Peak distributed memory: 200 SimReturns (not 10,000)
  • Worker death blast radius: 1 batch (not entire job)
  • Natural progress checkpoints every batch

Additional: OOMKill detection

When gathering results, detect OOM errors (exit code 137) from workers and log guidance to reduce `MODELOPS_JOB_BATCH_SIZE`.
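One possible shape for the detection (exit code 137 = 128 + SIGKILL, the signature of a container OOM kill on Kubernetes; the helper and its message are illustrative, not existing code):

```python
import logging
import signal

logger = logging.getLogger(__name__)

# 137 = 128 + SIGKILL(9): the container runtime killed the process,
# which on a memory-limited pod almost always means OOMKilled.
OOM_EXIT_CODE = 128 + int(signal.SIGKILL)

def check_worker_exit(exit_code: int) -> bool:
    """Return True and log actionable guidance if a worker was likely OOM-killed."""
    if exit_code == OOM_EXIT_CODE:
        logger.warning(
            "Worker exited with code 137 (likely OOMKilled). "
            "Consider reducing MODELOPS_JOB_BATCH_SIZE or raising worker memory limits."
        )
        return True
    return False
```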

Files to change

  • `src/modelops/runners/job_runner.py` - main batching logic
  • `src/modelops/services/job_views.py` - incremental Parquet writing

Configuration

  • `MODELOPS_JOB_BATCH_SIZE` environment variable (default: 200)
  • Tunable based on model output size and worker memory
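Since the batch size is tuned per deployment, it may be worth validating the value at job start rather than failing mid-job on a typo. A sketch (the fallback behavior and lower bound are illustrative choices, not decided in this issue):

```python
import os

def get_batch_size(default: int = 200, minimum: int = 1) -> int:
    """Read MODELOPS_JOB_BATCH_SIZE, falling back to the default on bad input."""
    raw = os.environ.get("MODELOPS_JOB_BATCH_SIZE", str(default))
    try:
        value = int(raw)
    except ValueError:
        return default
    return max(value, minimum)
```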
