Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/pipeline-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ work_styles = MapSynthesis(
|-----------|------|---------|-------------|
| `prompt` | `str` | *required* | Template with `{artifact}`, `{label}`, `{artifact_type}` placeholders |
| `label_fn` | `Callable[[Artifact], str] \| None` | `None` | Custom label derivation. Default: `{name}-{input_label}` |
| `metadata_fn` | `Callable[[Artifact], dict] \| None` | `None` | Custom metadata derivation from the input artifact. Merged on top of auto-propagated input metadata |
| `artifact_type` | `str` | `"summary"` | Output artifact type |

**Metadata propagation:** MapSynthesis automatically copies input artifact metadata to the output (1:1 natural inheritance). The `metadata_fn` result and `source_label` are merged on top.

**Custom labels:**

```python
Expand Down Expand Up @@ -98,6 +101,7 @@ by_customer = GroupSynthesis(
| `group_by` | `str \| Callable[[Artifact], str]` | *required* | Metadata key or callable that returns the group key |
| `prompt` | `str` | *required* | Template with `{group_key}`, `{artifacts}`, `{count}`, `{artifact_type}` placeholders |
| `label_prefix` | `str \| None` | `None` | Prefix for output labels. Default: derived from `group_by` key |
| `metadata_fn` | `Callable[[str, list[Artifact]], dict] \| None` | `None` | Custom metadata from `(group_key, inputs)`. Merged on top of default `group_key` and `input_count` |
| `artifact_type` | `str` | `"summary"` | Output artifact type |
| `on_missing` | `str` | `"group"` | Behavior when metadata key is absent (see below) |
| `missing_key` | `str` | `"_ungrouped"` | Group name for missing-key artifacts when `on_missing="group"` |
Expand Down Expand Up @@ -139,6 +143,7 @@ team_dynamics = ReduceSynthesis(
|-----------|------|---------|-------------|
| `prompt` | `str` | *required* | Template with `{artifacts}`, `{count}` placeholders |
| `label` | `str` | *required* | Fixed output label |
| `metadata_fn` | `Callable[[list[Artifact]], dict] \| None` | `None` | Custom metadata from the input list. Merged on top of default `input_count` |
| `artifact_type` | `str` | `"summary"` | Output artifact type |

### FoldSynthesis (N:1 sequential)
Expand Down Expand Up @@ -167,6 +172,7 @@ progressive = FoldSynthesis(
| `initial` | `str` | `""` | Starting value for the accumulator |
| `sort_by` | `str \| Callable \| None` | `None` | Metadata key, callable, or `None` (sort by `artifact_id`) |
| `label` | `str` | *required* | Fixed output label |
| `metadata_fn` | `Callable[[list[Artifact]], dict] \| None` | `None` | Custom metadata from the input list. Merged on top of default `input_count` |
| `artifact_type` | `str` | `"summary"` | Output artifact type |

FoldSynthesis always runs synchronously (never batched) because each step depends on the previous.
Expand Down Expand Up @@ -200,6 +206,7 @@ Drop files into `source_dir` — the parser auto-detects format by file structur
|--------|-----------|-------|
| **ChatGPT** | `.json` | `conversations.json` exports. Handles regeneration branches via `current_node` |
| **Claude** | `.json` | Claude conversation exports with `chat_messages` arrays |
| **Claude Code** | `.jsonl` | Claude Code session transcripts. Extracts user/assistant turns, skips tool blocks |
| **Text / Markdown** | `.txt`, `.md` | YAML frontmatter support. Auto-detects conversation turns (`User:` / `Assistant:` prefixes) |

## Projections
Expand Down
2 changes: 1 addition & 1 deletion src/synix/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Synix adapters — source format parsers (ChatGPT, Claude, text/markdown)."""
"""Synix adapters — source format parsers (ChatGPT, Claude, Claude Code, text/markdown)."""
148 changes: 148 additions & 0 deletions src/synix/adapters/claude_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Claude Code JSONL session parser — .jsonl session files → transcript Artifacts."""

from __future__ import annotations

import json
import logging
from datetime import datetime
from pathlib import Path

from synix.core.models import Artifact

logger = logging.getLogger(__name__)

DEFAULT_MAX_CHARS = 80_000
MIN_MEANINGFUL_TURNS = 2


def _extract_text(content) -> str:
"""Extract human-readable text from message content.

Content can be a plain string or an array of content blocks.
Only text blocks are kept — tool_use, tool_result, thinking, etc. are skipped.
"""
if isinstance(content, str):
return content.strip()

if isinstance(content, list):
parts = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if text.strip():
parts.append(text.strip())
return "\n\n".join(parts)

return ""


def _middle_cut(text: str, max_chars: int) -> str:
"""Truncate text preserving start and end, cutting from the middle."""
if len(text) <= max_chars:
return text

keep = max_chars - 50 # room for marker
head = keep // 2
tail = keep - head
return text[:head] + "\n\n[... middle truncated ...]\n\n" + text[-tail:]


def parse_claude_code(filepath: str | Path, max_chars: int = DEFAULT_MAX_CHARS) -> list[Artifact]:
    """Parse a Claude Code .jsonl session file into a transcript Artifact.

    Claude Code sessions are line-delimited JSON with entries like::

        {"type": "user"|"assistant", "message": {"role": "...", "content": ...},
         "timestamp": "...", "sessionId": "...", "cwd": "...", ...}

    Args:
        filepath: Path to the ``.jsonl`` session file.
        max_chars: Transcript size budget; longer transcripts are
            middle-truncated (see :func:`_middle_cut`).

    Returns:
        A list with one Artifact for valid sessions, or an empty list for
        sessions with < 2 meaningful turns or non-Claude-Code JSONL files.
    """
    filepath = Path(filepath)
    turns: list[str] = []
    session_id = filepath.stem
    slug = ""
    date = ""
    git_branch = ""
    cwd = ""
    is_claude_code = False

    with open(filepath, encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                logger.warning("Malformed JSON at %s:%d, skipping line", filepath, line_num)
                continue

            if not isinstance(entry, dict):
                continue

            entry_type = entry.get("type")
            if entry_type not in ("user", "assistant"):
                continue

            # A user/assistant entry carrying a message dict is what marks
            # the file as Claude Code format (vs. some other JSONL data).
            message = entry.get("message")
            if not isinstance(message, dict):
                continue
            is_claude_code = True

            # Capture session metadata from the first entry that provides it.
            if not date:
                ts = entry.get("timestamp", "")
                # Guard: `timestamp` may be a non-string (e.g. numeric epoch).
                # Calling .replace() on a non-str raised an AttributeError
                # that the except clause below did not catch.
                if isinstance(ts, str) and ts:
                    try:
                        dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                        date = dt.strftime("%Y-%m-%d")
                    except (ValueError, TypeError):
                        pass

            if not slug:
                slug = entry.get("slug", "")
            if not git_branch:
                git_branch = entry.get("gitBranch", "")
            if not cwd:
                cwd = entry.get("cwd", "")

            text = _extract_text(message.get("content", ""))
            if not text:
                # Entry had only tool/thinking blocks — not a readable turn.
                continue

            role = message.get("role", entry_type)
            prefix = "User" if role == "user" else "Assistant"
            turns.append(f"{prefix}: {text}")

    if not is_claude_code:
        logger.debug("JSONL file %s does not contain Claude Code session data, skipping", filepath)
        return []

    if len(turns) < MIN_MEANINGFUL_TURNS:
        # A session needs at least one exchange to be worth keeping.
        return []

    transcript = _middle_cut("\n\n".join(turns), max_chars)

    metadata = {
        "source": "claude-code",
        "session_id": session_id,
        "title": slug if slug else session_id,
        "date": date,
        "cwd": cwd,
        "git_branch": git_branch,
        "message_count": len(turns),
    }

    return [
        Artifact(
            label=f"t-claude-code-{session_id}",
            artifact_type="transcript",
            content=transcript,
            metadata=metadata,
        )
    ]
5 changes: 5 additions & 0 deletions src/synix/adapters/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,8 @@ def _parse_json_autodetect(filepath: Path) -> list[Artifact]:
# Plain text / Markdown files (YAML frontmatter supported per docs)
from synix.adapters.text import parse_text  # noqa: E402

register_adapter([".txt", ".md"])(parse_text)

# Claude Code JSONL session files
from synix.adapters.claude_code import parse_claude_code  # noqa: E402

register_adapter([".jsonl"])(parse_claude_code)
90 changes: 64 additions & 26 deletions src/synix/build/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,14 @@ def _plan_layer(

# Estimate parallel unit count from split() if the layer will be built
if step.status != "cached" and inputs:
try:
units = layer.split(inputs, transform_config)
step.parallel_units = len(units)
except Exception:
step.parallel_units = 1
if upstream_dirty and hasattr(layer, "estimate_output_count"):
step.parallel_units = layer.estimate_output_count(len(inputs))
else:
try:
units = layer.split(inputs, transform_config)
step.parallel_units = len(units)
except Exception:
step.parallel_units = 1

return step

Expand Down Expand Up @@ -620,7 +623,13 @@ def _plan_projection(
store: ArtifactStore | None,
) -> ProjectionPlan:
"""Analyze a projection to determine its plan status."""
from synix.build.runner import PROJECTION_CACHE_FILE, _compute_projection_hash, _get_projection_config
from synix.build.runner import (
PROJECTION_CACHE_FILE,
_compute_content_only_hash,
_compute_embedding_config_hash,
_compute_projection_hash,
_get_projection_config,
)

source_layers = [s.name for s in proj.sources]
build_dir = Path(pipeline.build_dir)
Expand Down Expand Up @@ -668,28 +677,57 @@ def _plan_projection(
embedding_config=embedding_config,
)

# Compute current hash including projection config
proj_config = _get_projection_config(proj)
current_hash = _compute_projection_hash(all_artifacts, proj_config)
# Support both old (source_hash) and new (content_hash/embedding_hash) cache formats
if "content_hash" in cached_entry:
# New split-hash format
content_hash = _compute_content_only_hash(all_artifacts)
emb_hash = _compute_embedding_config_hash(proj) if isinstance(proj, SearchIndex) else None

if cached_entry.get("source_hash") == current_hash and cached_entry.get("artifact_count") == artifact_count:
return ProjectionPlan(
name=proj.name,
projection_type=proj_type,
source_layers=source_layers,
status="cached",
artifact_count=artifact_count,
reason="all cached",
embedding_config=embedding_config,
content_cached = (
cached_entry.get("content_hash") == content_hash and cached_entry.get("artifact_count") == artifact_count
)

# Determine reason: check if config changed vs artifacts changed
reason = "source artifacts changed"
if cached_entry.get("config_hash") is not None and proj_config:
old_config_hash = cached_entry["config_hash"]
new_config_hash = hashlib.sha256(json.dumps(proj_config, sort_keys=True, default=str).encode()).hexdigest()
if old_config_hash != new_config_hash:
reason = "projection config changed"
embedding_cached = cached_entry.get("embedding_hash") == emb_hash

if content_cached and embedding_cached:
return ProjectionPlan(
name=proj.name,
projection_type=proj_type,
source_layers=source_layers,
status="cached",
artifact_count=artifact_count,
reason="all cached",
embedding_config=embedding_config,
)

# Determine reason
reasons = []
if not content_cached:
reasons.append("source artifacts changed")
if not embedding_cached:
reasons.append("embedding config changed")
reason = ", ".join(reasons)
else:
# Legacy format — single source_hash
proj_config = _get_projection_config(proj)
current_hash = _compute_projection_hash(all_artifacts, proj_config)

if cached_entry.get("source_hash") == current_hash and cached_entry.get("artifact_count") == artifact_count:
return ProjectionPlan(
name=proj.name,
projection_type=proj_type,
source_layers=source_layers,
status="cached",
artifact_count=artifact_count,
reason="all cached",
embedding_config=embedding_config,
)

reason = "source artifacts changed"
if cached_entry.get("config_hash") is not None and proj_config:
old_config_hash = cached_entry["config_hash"]
new_config_hash = hashlib.sha256(json.dumps(proj_config, sort_keys=True, default=str).encode()).hexdigest()
if old_config_hash != new_config_hash:
reason = "projection config changed"

return ProjectionPlan(
name=proj.name,
Expand Down
Loading
Loading