Implement Synix v0.1: build system for agent memory (#2)
Full implementation of the Synix pipeline engine with CLI, going from spec-only (CLAUDE.md + pipeline definitions) to a working system.

Core engine:
- Artifact store (filesystem + manifest.json) with content hashing
- Provenance tracker (provenance.json) with recursive chain walking
- DAG resolver (topological sort) with incremental rebuild detection
- Pipeline runner: walks DAG, runs transforms, caches via hash comparison, materializes intermediate projections between layers

Transforms:
- Source parsers for ChatGPT (tree-walking) and Claude (flat messages) exports
- Transform registry with 5 registered transforms: parse, episode_summary, monthly_rollup, topical_rollup, core_synthesis
- LLM transforms use Anthropic SDK with prompt templates

Projections:
- SearchIndexProjection: SQLite FTS5 full-text search across layers with provenance chain resolution
- FlatFileProjection: renders core memory as markdown context document

CLI (Click + Rich):
- `synix run <pipeline.py>` — build pipeline with progress bars
- `synix search <query>` — search with colored results and provenance
- `synix lineage <artifact-id>` — provenance tree view
- `synix status` — build summary table

93 tests (71 unit + 15 integration + 7 e2e), all passing.
```python
_topological_sort(pipeline)
```

```python
def _topological_sort(pipeline: Pipeline) -> list[str]:
```
Don't understand why we need to compute the topo sort twice. Should we store the parsed Pipeline as an intermediate artifact, e.g. a "BuildPlan", that captures this?
```python
# Build config for the transform
transform_config = dict(pipeline.llm_config) if pipeline.llm_config else {}
transform_config["llm_config"] = dict(pipeline.llm_config) if pipeline.llm_config else {}
```
This should be overridable at the transform level.
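One way to support transform-level overrides, assuming each layer may carry its own optional `llm_config` dict (the layer-level field and function name here are hypothetical, not code from this PR):

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Return base with override applied on top; nested dicts merge key-by-key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# Hypothetical call site in the runner:
# transform_config["llm_config"] = deep_merge(pipeline.llm_config or {},
#                                             layer.llm_config or {})
```

This keeps pipeline defaults (model, temperature) intact while letting a single layer bump, say, only `max_tokens`.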
```python
# Get the transform
transform = get_transform(layer.transform)
prompt_id = None
```
Even this is transform-specific. We should let each transform compute its own transform-specific hash key suffix.
```python
# Check rebuild for each artifact
for artifact in new_artifacts:
    if needs_rebuild(artifact.artifact_id, artifact.input_hashes, prompt_id, store):
```
As mentioned, make this generic and pluggable per transform: e.g. the transform interface defines `get_transform_hash_key`, which lets each transform append any transform-specific idempotency params.
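A sketch of that shape (class and method names are illustrative, not the code in this PR): the base interface contributes an empty suffix, and transforms whose output depends on prompt bytes or model params override it:

```python
import hashlib

class BaseTransform:
    def get_transform_hash_key(self, config: dict) -> str:
        """Extra idempotency material; plain transforms contribute nothing."""
        return ""

class EpisodeSummaryTransform(BaseTransform):
    prompt_template = "Summarize this conversation:\n{body}"  # illustrative prompt

    def get_transform_hash_key(self, config: dict) -> str:
        # Prompt bytes and model name both change the output, so both
        # participate in the rebuild decision.
        material = self.prompt_template + "|" + str(config.get("model", ""))
        return hashlib.sha256(material.encode()).hexdigest()[:12]

def rebuild_key(artifact_id: str, input_hashes: list[str],
                transform: BaseTransform, config: dict) -> str:
    """Combine artifact identity, input hashes, and the transform's suffix."""
    parts = [artifact_id, *sorted(input_hashes),
             transform.get_transform_hash_key(config)]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()
```

With this, `needs_rebuild` no longer needs to know about `prompt_id` at all; each transform owns its own invalidation inputs.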
```python
        stats.cached += 1
    else:
        # Execute transform to get candidate artifacts
        new_artifacts = transform.execute(inputs, transform_config)
```
I think this is going to be too slow unless we allow concurrency. Ideally we need a transform runner with a queueing mechanism. We'll add this in the next version, not in this PR, but make a note of it. It needs to be able to compute all unblocked transforms across layers, rather than blocking a whole layer on the completion of the previous one when there are no dependencies between them.
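For the note: one possible shape for that future runner is to submit every node whose inputs are ready, regardless of layer. A rough sketch under assumed names (`execute` is any callable that runs one node; assumes an acyclic graph):

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_dag(deps: dict[str, list[str]], execute, max_workers: int = 4) -> list[str]:
    """deps maps node id -> list of dependency ids. Runs every node whose
    dependencies are complete, across layers, instead of barriering on one
    layer at a time. Returns nodes in completion order."""
    done: set[str] = set()
    order: list[str] = []
    pending = dict(deps)
    futures: dict = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or futures:
            # Submit everything that just became unblocked.
            for node in [n for n, d in pending.items() if all(x in done for x in d)]:
                futures[pool.submit(execute, node)] = node
                del pending[node]
            finished, _ = wait(list(futures), return_when=FIRST_COMPLETED)
            for fut in finished:
                node = futures.pop(fut)
                fut.result()  # re-raise transform errors
                done.add(node)
                order.append(node)
    return order
```

The per-node caching check would sit inside `execute`, so cached nodes complete immediately and unblock their dependents without waiting on siblings.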
```python
class FlatFileProjection(BaseProjection):
    """Renders artifacts as a markdown context document."""
```
Nit: we don't care whether it's markdown or not.
marklubin left a comment
Critical Issues (fix before demo)

1. `_layer_fully_cached` has a logic bug for topical rollups

   In runner.py, `_layer_fully_cached` compares `current_input_hashes` (a set of all input content hashes) against `existing_input_hashes` (the union of all existing artifacts' `input_hashes`). For the topical rollup, each topic artifact only stores hashes of its relevant episodes, not all episodes, so the union of existing input hashes will likely never equal the full input set. This means topical rollups may always rebuild even when nothing changed.

   Fix: for topical layers, cache validation should be per-artifact, not per-layer.

2. TopicalRollupTransform opens/closes a new SearchIndex connection per topic

   ```python
   for topic in topics:
       if search_db_path:
           index = SearchIndex(Path(search_db_path))
           search_results = index.query(...)
           index.close()
   ```

   With 5 topics this is fine, but the index is created, queried, and closed in a loop. Move the open/close outside the loop.

3. The search_index projection `sources` list renders empty in both pipeline files

   Looking at the diff for pipeline.py:

   ```python
   sources=[
   ],
   ```

   The `sources` list that should contain the layer/search config dicts appears empty in the rendered diff. Double-check the actual file — if the sources are truly empty, the search index won't contain any artifacts.

4. pyproject.toml still has pyright config targeting "py312" but `requires-python = ">=3.11"`

   The `[tool.pyright]` section still says `pythonVersion = "3.12"`. Either remove it or set it to "3.11".
Medium Issues (fix if time allows)
5. ParseTransform only iterates top-level files, no subdirectory recursion
```python
for filepath in sorted(source_dir.iterdir()):
```

If your exports are in subdirectories (e.g., exports/chatgpt/conversations.json), this won't find them. Consider `source_dir.rglob("*.json")`.
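The recursive variant is a one-liner (function name hypothetical); `rglob` walks subdirectories and `sorted` keeps build order deterministic:

```python
from pathlib import Path

def find_export_files(source_dir: Path) -> list[Path]:
    """Collect JSON exports from source_dir and all subdirectories."""
    return sorted(source_dir.rglob("*.json"))
```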
6. Claude parser assumes chat_messages field structure
The Claude export format has changed over time. The parser assumes "text" field on messages, but Claude exports sometimes use "content" with nested content blocks (especially for longer conversations with attachments). For your own export this may work fine, but it's fragile.
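A defensive accessor could prefer the flat field and fall back to joining nested text blocks. The `{"type": "text", "text": ...}` block shape is an assumption about newer exports, so treat this as a sketch:

```python
def message_text(msg: dict) -> str:
    """Return message text from either the flat "text" field or a nested
    "content" list of blocks; unknown block types are skipped."""
    text = msg.get("text")
    if isinstance(text, str) and text:
        return text
    blocks = msg.get("content") or []
    return "\n".join(
        block.get("text", "")
        for block in blocks
        if isinstance(block, dict) and block.get("type") == "text"
    )
```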
7. No error handling in LLM transform calls
All four LLM transforms call client.messages.create() with no try/except. At 1000+ conversations, you will hit rate limits, network timeouts, or context length errors. At minimum wrap with retry logic or a clear error message.
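A minimal retry wrapper could look like this, with the retryable exception types left as a parameter; for the Anthropic SDK you would pass its rate-limit and timeout error classes (check the exact class names against the SDK version in use):

```python
import random
import time

def call_with_retries(call, *, max_attempts: int = 4, base_delay: float = 1.0,
                      retryable: tuple = (TimeoutError,)):
    """Retry a zero-argument callable on transient errors with exponential
    backoff plus jitter; re-raise once attempts are exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retryable:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.25))
```

Usage would be along the lines of `call_with_retries(lambda: client.messages.create(...), retryable=(...))`. Context-length errors are not transient and should stay outside the retryable tuple.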
8. monthly_rollup silently drops episodes with no date metadata
```python
month = ep.metadata.get("date", "")[:7]
if month:
    months[month].append(ep)
```
Episodes without a parseable date just vanish. Should at least log a warning or put them in an "undated" bucket.
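The "undated" bucket version, sketched with plain dicts rather than the PR's episode objects:

```python
from collections import defaultdict

def group_by_month(episodes: list[dict]) -> dict[str, list[dict]]:
    """Bucket episodes by YYYY-MM; episodes without a parseable date are
    kept under an explicit "undated" key instead of silently vanishing."""
    months: dict[str, list[dict]] = defaultdict(list)
    for ep in episodes:
        month = (ep.get("metadata", {}).get("date") or "")[:7]
        months[month or "undated"].append(ep)
    return dict(months)
```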
9. core_synthesis uses max_tokens=2048 hardcoded override
```python
max_tokens=model_config.get("max_tokens", 2048),
```
This overrides the pipeline's max_tokens: 1024. For core memory synthesis you probably want more tokens, but it should come from context_budget or the layer config, not a silent hardcode.
Minor / Style
10. Duplicate topological sort in both config.py and dag.py. The one in config.py is for validation, dag.py for runtime. Fine conceptually but they should share the implementation.
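A single Kahn's-algorithm implementation both call sites could share (dag.py owning it, config.py delegating); a sketch over a name-to-dependencies mapping:

```python
from collections import deque

def topological_sort(deps: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm over a layer-name -> dependencies mapping.
    Raises ValueError on cycles, which doubles as config validation."""
    indegree = {name: len(parents) for name, parents in deps.items()}
    dependents: dict[str, list[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in parents:
            dependents[parent].append(name)
    # Sort the initial frontier for deterministic ordering across runs.
    queue = deque(sorted(name for name, d in indegree.items() if d == 0))
    order: list[str] = []
    while queue:
        name = queue.popleft()
        order.append(name)
        for child in dependents[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    if len(order) != len(deps):
        raise ValueError("cycle detected between layers")
    return order
```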
11. ArtifactStore.list_artifacts loads every artifact from disk one at a time. At 1000+ transcripts this will be slow. Consider caching loaded artifacts or loading from manifest metadata only when you just need IDs/hashes.
12. SearchIndex.create() does DROP TABLE IF EXISTS every time materialize is called. This means intermediate materialization (after episodes) is wiped when the final materialization runs. This will break the topical rollup's ability to query the episode index if it was already rebuilt in the final pass. Check the ordering carefully — intermediate materialization happens per-layer, but _materialize_all_projections at the end drops and recreates.
Actually, #12 is potentially critical — if the final _materialize_all_projections call drops and recreates the search index, and the topical rollup already ran using an intermediate index, you're fine for the current run. But on a cached rerun where only the final projections rebuild, the intermediate search index from the first run gets nuked. Shouldn't affect the demo flow since the topical pipeline is a separate run, but worth understanding.
What's Good
- Transform registry pattern is clean and extensible
- Provenance chain walking (BFS) is correct
- CLI formatting with Rich is well done — color-coded layers, tree views, progress bars
- The _layer_fully_cached concept is right even if the implementation needs tuning
- Pipeline Python API is exactly what we designed — no YAML, clean dataclasses
- Test count (93) is strong for a sprint
Bottom line: Fix #1 (cache bug), #2 (index connection leak), and verify #3 (empty sources). Those are demo-breaking. The rest you can ship with. The architecture is sound and matches the spec.
marklubin left a comment
The PR reads like a legit “v0.1 build graph + artifact cache” cut, but the main risk is cache correctness (materialization keys) and fold/backfill semantics—get those wrong and everything else becomes untrustworthy.
- The repo shape is clean and opinionated in the right places: `artifacts/` (provenance + store), `pipeline/` (config + dag + runner), `sources/` (chatgpt/claude), `transforms/` (parse/summarize + prompt assets), `projections/` (flat_file + search_index), `search/` (index + results), plus CLI and an e2e demo test. That's the correct skeleton for "build system for memory."
- Biggest architecture tension: your Round-2 doc goes "Python-first prompts as functions," but the PR structure includes prompt `.txt` assets under `transforms/prompts/` (core_memory / episode_summary / monthly_rollup / topical_rollup). That's fine, but pick a lane.
- Cache/materialization correctness is the load-bearing part. Your tests hint you're already attacking it (`test_incremental_rebuild.py`, `test_config_change.py`). Good. Now make the materialization key definition painfully explicit and enforced in one place. Minimum inputs for the cache key: (branch, step name, step version hash, input fingerprints, group key). If you omit any of prompt/template bytes, model name, decoding params, transform code version, group-by logic, or ordering logic, you will get silent reuse of wrong artifacts.
- Fold is the second load-bearing risk. You need a declared policy for:
  - inserting data in the middle (rebuild from insertion point vs lazy invalidation),
  - bounding state growth (truncate/compress/checkpoint),
  - partial failure recovery (checkpoint/resume semantics).

  If this is "v0.1," the right move is: rebuild-from-insertion-point + deterministic checkpoint boundaries + a hard max-state token budget.
- Provenance: storing a lineage DAG is necessary but not sufficient; you also need "provenance stability." Decide whether derived records point to specific input record IDs (strong provenance) or to input fingerprints (weaker but more resilient to re-import). If you ever re-import a source and IDs change, ID-based provenance breaks unless you have stable identity at the source layer.
- Practical cleanup that pays off fast: `pipeline.py` and `pipeline_topical.py` at repo root smell like demo scripts; move them to `examples/` and keep the package API as the canonical interface. It reduces future API drift and keeps the "workbench" story clean.
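The minimum cache-key inputs listed above can be made explicit and enforced in one function; a sketch (field names illustrative, `step_version` assumed to hash transform code plus prompt/template bytes):

```python
import hashlib
import json

def materialization_key(branch: str, step: str, step_version: str,
                        input_fingerprints: list[str], group_key: str,
                        llm_params: dict) -> str:
    """Everything that can change the output participates in the key.
    llm_params carries model name and decoding params; omitting any field
    here risks silent reuse of wrong artifacts."""
    payload = {
        "branch": branch,
        "step": step,
        "step_version": step_version,
        "inputs": sorted(input_fingerprints),  # order-independent
        "group": group_key,
        "llm": llm_params,
    }
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
```

Canonical JSON with sorted keys makes the key stable across dict ordering, and sorting the input fingerprints makes it stable across input enumeration order.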
- Fix `_layer_fully_cached()` logic for topical rollups (per-input validation)
- Add model_config + transform cache key to `needs_rebuild()` check
- Add `BaseTransform.get_cache_key()` for transform-specific invalidation
- Deep-merge layer llm_config over pipeline defaults
- Wrap LLM calls with retry on transient errors (rate limit, timeout)
- Handle undated episodes in monthly rollup
- Derive core synthesis max_tokens from context_budget
- Hoist SearchIndex lifecycle outside topic loop
- Add subdirectory recursion in parse transform (rglob)
- Add defensive content field fallback in Claude parser
- Deduplicate topological sort (config.py delegates to dag.py)
- Fix flat_file.py docstring
Summary
CLI: `synix run`, `synix search`, `synix lineage`, `synix status`

Architecture
Modules (24 Python files + 4 prompt templates):
- `__init__.py`: Artifact, ProvenanceRecord, Layer, Projection, Pipeline
- `artifacts/`: ArtifactStore (filesystem-backed), ProvenanceTracker (BFS chain walking)
- `pipeline/`: Config loader (Python modules via importlib), DAG resolver (Kahn's topological sort), Runner (orchestrator with intermediate projection materialization)
- `transforms/`: Registry pattern with 5 transforms: parse, episode_summary, monthly_rollup, topical_rollup, core_synthesis
- `search/`: SQLite FTS5 index with layer filtering and provenance chain resolution
- `projections/`: SearchIndexProjection (FTS5), FlatFileProjection (markdown)
- `cli.py`: Click + Rich: progress bars, colored panels, provenance trees, status tables

Demo flow

Test plan
93 tests, all passing.