Merged
83 changes: 82 additions & 1 deletion README.md
@@ -148,7 +148,88 @@ Pre-built transforms for common agent memory patterns. Import from `synix.transf
| `uvx synix fix` | *(Experimental)* LLM-assisted repair of violations |
| `uvx synix lineage <id>` | Show the full provenance chain for an artifact |
| `uvx synix clean` | Delete the build directory |
| `uvx synix batch-build plan` | *(Experimental)* Dry-run showing which layers would batch vs sync |
| `uvx synix batch-build run` | *(Experimental)* Submit a batch build via the OpenAI Batch API; pass `--poll` to wait for completion |
| `uvx synix batch-build resume <id>` | *(Experimental)* Resume a previously submitted batch build |
| `uvx synix batch-build list` | *(Experimental)* Show all batch build instances and their status |
| `uvx synix batch-build status <id>` | *(Experimental)* Detailed status for a specific batch build. `--latest` for most recent |

## Batch Build (Experimental)

> **Warning:** Batch build is experimental. Commands, state formats, and behavior may change in future releases.

The OpenAI Batch API processes LLM requests asynchronously at **50% of the synchronous cost**, with a 24-hour completion SLA. Synix wraps this into `batch-build`: submit your pipeline, disconnect, and come back when it's done.

### Quick Example

```python
# pipeline.py — mixed-provider pipeline
pipeline.llm_config = {
"provider": "openai", # OpenAI layers → batch mode (automatic)
"model": "gpt-4o",
}

episodes = EpisodeSummary("episodes", depends_on=[transcripts])
monthly = MonthlyRollup("monthly", depends_on=[episodes])

# Force this layer to run synchronously via Anthropic
core = CoreSynthesis("core", depends_on=[monthly], batch=False)
core.config = {"llm_config": {"provider": "anthropic", "model": "claude-sonnet-4-20250514"}}
```

```bash
# Submit and wait for completion
uvx synix batch-build run pipeline.py --poll
```

### Poll vs Resume

**Poll workflow** — submit and wait in a single session:

```bash
uvx synix batch-build run pipeline.py --poll --poll-interval 120
```

**Resume workflow** — submit, disconnect, come back later:

```bash
# Submit (exits after first batch is submitted)
uvx synix batch-build run pipeline.py
# Build ID: batch-a1b2c3d4
# Resume with: synix batch-build resume batch-a1b2c3d4 pipeline.py --poll

# Check on it later
uvx synix batch-build status --latest

# Resume and poll to completion
uvx synix batch-build resume batch-a1b2c3d4 pipeline.py --poll
```

### The `batch` Parameter

Each transform accepts an optional `batch` parameter controlling whether it uses the Batch API:

| Value | Behavior |
|-------|----------|
| `None` (default) | Auto-detect: batch if the layer's provider is native OpenAI, sync otherwise. |
| `True` | Force batch mode. Raises an error if the provider is not native OpenAI. |
| `False` | Force synchronous execution, even if the provider supports batch. |

```python
episodes = EpisodeSummary("episodes", depends_on=[transcripts]) # auto
monthly = MonthlyRollup("monthly", depends_on=[episodes], batch=True) # force batch
core = CoreSynthesis("core", depends_on=[monthly], batch=False) # force sync
```

### Provider Restrictions

Batch mode **only works with native OpenAI** (`provider="openai"` with no custom `base_url`). Transforms using Anthropic, DeepSeek, or OpenAI-compatible endpoints via `base_url` always run synchronously. Setting `batch=True` on a non-OpenAI layer is a hard error.
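
The auto-detect rule above can be sketched in plain Python. This is an illustrative reimplementation of the documented behavior, not the library's actual code:

```python
def resolve_batch_mode(batch, provider, base_url=None):
    """Decide whether a layer runs via the Batch API or synchronously.

    Mirrors the documented rules: batch requires native OpenAI
    (provider="openai" with no custom base_url).
    """
    is_openai_native = provider == "openai" and base_url is None
    if batch is True:
        if not is_openai_native:
            raise ValueError("batch=True requires the native OpenAI provider")
        return "batch"
    if batch is False:
        return "sync"
    # batch=None -> auto-detect based on the provider
    return "batch" if is_openai_native else "sync"

print(resolve_batch_mode(None, "openai"))                            # batch
print(resolve_batch_mode(None, "openai", "https://proxy.example/v1"))  # sync
print(resolve_batch_mode(False, "openai"))                           # sync
```

Note that an OpenAI-compatible proxy set via `base_url` falls back to sync under auto-detect, but is a hard error under `batch=True`.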

### Transform Requirements

Transforms used in batch builds must be **stateless** — their `execute()` method must be idempotent and produce deterministic prompts from the same inputs. All built-in transforms (`EpisodeSummary`, `MonthlyRollup`, `TopicalRollup`, `CoreSynthesis`) meet this requirement.
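
A self-contained sketch of what statelessness means in practice. The class and method names here are hypothetical, not the synix API; the point is that the prompt is a pure function of the inputs:

```python
from dataclasses import dataclass

@dataclass
class Artifact:
    content: str

class WeeklyDigest:
    """Hypothetical stateless transform sketch."""

    def build_prompt(self, inputs):
        # Deterministic: inputs are sorted, and no timestamps, random IDs,
        # or mutable instance state leak in — so retrying a batch request
        # regenerates a byte-identical prompt.
        joined = "\n\n".join(sorted(a.content for a in inputs))
        return f"Summarize these notes into a weekly digest:\n\n{joined}"

arts = [Artifact("b note"), Artifact("a note")]
digest = WeeklyDigest()
# Same inputs in any order -> identical prompt, as batch builds require.
assert digest.build_prompt(arts) == digest.build_prompt(list(reversed(arts)))
```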

See [docs/batch-build.md](docs/batch-build.md) for the full specification including state management, error handling, and the request collection protocol.

## Key Capabilities

22 changes: 20 additions & 2 deletions src/synix/build/batch_runner.py
@@ -298,7 +298,11 @@ def _resolve_batch_mode(layer: Transform, pipeline: Pipeline) -> str:
return "batch"

# batch=None (auto) — batch only if native OpenAI provider (no base_url override)
# and the transform actually benefits from batching (produces multiple outputs)
if is_openai_native:
# N:1 transforms (estimate_output_count always 1) don't benefit from batching
if layer.estimate_output_count(3) <= 1:
return "sync"
if not _has_cassette_responses():
api_key = llm_config.resolve_api_key()
if not api_key:
Expand Down Expand Up @@ -456,6 +460,7 @@ def _run_batch_transform(
collecting = False
in_progress = False

failed_units: list[str] = []
for unit_inputs, config_extras in units:
merged_config = {**transform_config, **config_extras}
try:
@@ -469,7 +474,10 @@ def _run_batch_transform(
in_progress = True
break
except BatchRequestFailed as exc:
raise RuntimeError(f"Batch request failed for layer {layer.name!r}: {exc}") from exc
# Record per-unit failure and continue with remaining units
unit_desc = ", ".join(a.label for a in unit_inputs) if unit_inputs else "unknown"
logger.warning("Batch request failed for layer %r unit [%s]: %s", layer.name, unit_desc, exc)
failed_units.append(unit_desc)

if in_progress:
layer_artifacts[layer.name] = layer_built
@@ -484,7 +492,17 @@ def _run_batch_transform(
layer_artifacts[layer.name] = layer_built
return "submitted"

# All results were available
# Report per-unit failures (non-fatal — other units still processed)
if failed_units:
for unit_desc in failed_units:
batch_state.store_error(
f"{layer.name}:{unit_desc}",
"batch_request_failed",
f"Batch request failed for unit [{unit_desc}] in layer {layer.name!r}",
)
batch_state.save()

# All results were available (some may have failed)
layer_artifacts[layer.name] = layer_built
return "completed"

30 changes: 14 additions & 16 deletions src/synix/build/batch_state.py
@@ -76,7 +76,7 @@ class BatchState:
errors: {request_key: {code, message}}
"""

def __init__(self, build_dir: Path, build_id: str):
def __init__(self, build_dir: Path, build_id: str, *, _skip_load: bool = False):
self.build_dir = Path(build_dir)
self.build_id = build_id
self._instance_dir = self.build_dir / "builds" / build_id
@@ -90,7 +90,8 @@ def __init__(self, build_dir: Path, build_id: str):
self._results: dict[str, dict] = {}
self._errors: dict[str, dict] = {}

self._load_state()
if not _skip_load:
self._load_state()

@classmethod
def create_fresh(cls, build_dir: Path, build_id: str) -> BatchState:
@@ -99,18 +100,7 @@ def create_fresh(cls, build_dir: Path, build_id: str) -> BatchState:
Use this to recover from corrupted state files instead of
bypassing __init__ manually.
"""
instance = object.__new__(cls)
instance.build_dir = Path(build_dir)
instance.build_id = build_id
instance._instance_dir = instance.build_dir / "builds" / build_id
instance._manifest_path = instance._instance_dir / "manifest.json"
instance._state_path = instance._instance_dir / "batch_state.json"
instance._pending = {}
instance._batch_map = {}
instance._batches = {}
instance._results = {}
instance._errors = {}
return instance
return cls(build_dir, build_id, _skip_load=True)

def _load_state(self) -> None:
"""Load batch state from disk. Corrupted JSON -> quarantine + raise."""
@@ -271,9 +261,12 @@ def compute_pipeline_hash(pipeline) -> str:
"""Compute a hash of the pipeline definition for change detection.

Includes layer names, types, dependencies, llm_config, per-layer
config, and batch mode — so changing model, temperature, or
batch=True/False between submit and resume is detected.
config, batch mode, and transform fingerprints (source code + prompts)
— so changing model, temperature, batch=True/False, prompts, or
transform logic between submit and resume is detected.
"""
from synix.core.models import Transform

parts = [pipeline.name, pipeline.source_dir, pipeline.build_dir]
# Include pipeline-level llm_config
if pipeline.llm_config:
Expand All @@ -288,5 +281,10 @@ def compute_pipeline_hash(pipeline) -> str:
# Include batch mode for transforms
if hasattr(layer, "batch"):
parts.append(f" batch:{layer.batch}")
# Include transform fingerprint (source code + prompts + cache key)
if isinstance(layer, Transform):
config = dict(pipeline.llm_config) if pipeline.llm_config else {}
fp = layer.compute_fingerprint(config)
parts.append(f" fp:{fp.digest}")
raw = "|".join(parts)
return hashlib.sha256(raw.encode()).hexdigest()[:16]
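
The change-detection idea behind `compute_pipeline_hash` can be illustrated standalone. This is a simplified sketch of the hashing approach, not the actual function:

```python
import hashlib

def pipeline_hash(parts):
    """Join identity-relevant fields and take a short SHA-256 digest,
    as compute_pipeline_hash does with names, configs, batch modes,
    and transform fingerprints."""
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

h1 = pipeline_hash(["demo", "model:gpt-4o", "batch:True"])
h2 = pipeline_hash(["demo", "model:gpt-4o", "batch:False"])
# Any field change between submit and resume yields a different hash,
# which is how a stale resume is detected.
assert h1 != h2
assert len(h1) == 16
```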