From 81ce13773d514842634d1844b95e396172c3475f Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:39:19 +0530 Subject: [PATCH 01/13] feat(kg): add no-op curation seam in pipeline (flagged off) --- .../src/repowise/core/analysis/kg_curation.py | 71 +++++++ .../repowise/core/pipeline/orchestrator.py | 23 +++ tests/unit/analysis/test_kg_curation.py | 191 ++++++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 packages/core/src/repowise/core/analysis/kg_curation.py create mode 100644 tests/unit/analysis/test_kg_curation.py diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py new file mode 100644 index 00000000..0a35d99c --- /dev/null +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -0,0 +1,71 @@ +"""Curation/presentation pass over the deterministic KG skeleton. + +The exported knowledge graph is a *presentation* artifact, distinct from the +AST/dependency graph that powers queries. This module is the single seam where +the skeleton produced by :func:`build_knowledge_graph_skeleton` is reshaped into +something a human (or an AI reading the graph cold) can navigate: bounded, +dependency-ordered layers; a capped, ranked set of real entry points; one +canonical layer-aware tour; typed infra/CI/data nodes; and never-empty +summaries. + +**Hard invariant.** Curation reads the NetworkX graph, communities, and +centrality, but it *only ever writes the returned* :class:`KnowledgeGraphResult`. +It never mutates ``graph_builder``'s graph, ``graph_edges``, centrality caches, +community detection, or any DB table. There is a regression test that asserts the +graph's node/edge counts are identical before and after this pass. + +Curation is feature-flagged (``REPOWISE_KG_CURATION``) and defaults **off** so +the exported KG is byte-identical to today's until the multi-repo validation +gate passes. With the flag off, :func:`curate_knowledge_graph` is a no-op that +returns its input unchanged. +""" + +from __future__ import annotations + +import os +from typing import Any + +from repowise.core.analysis.knowledge_graph import KnowledgeGraphResult + +__all__ = ["curate_knowledge_graph", "curation_enabled"] + + +_FLAG_ENV = "REPOWISE_KG_CURATION" + + +def curation_enabled() -> bool: + """Whether KG curation is enabled via the ``REPOWISE_KG_CURATION`` env flag. + + Defaults to **off**. Any of ``1``/``true``/``yes``/``on`` (case-insensitive) + turns it on. Resolved at the call site so :func:`curate_knowledge_graph` + itself stays pure and trivially testable with an explicit ``enabled=``. + """ + return os.environ.get(_FLAG_ENV, "").strip().lower() in {"1", "true", "yes", "on"} + + +def curate_knowledge_graph( + kg: KnowledgeGraphResult, + *, + parsed_files: list[Any], + graph_builder: Any, + repo_structure: Any, + community_info: Any, + enabled: bool = False, +) -> KnowledgeGraphResult: + """Reshape the KG skeleton into an intuitive presentation artifact. + + Pure with respect to the AST graph: reads ``graph_builder`` / + ``community_info`` but writes only the returned result. When ``enabled`` is + ``False`` this is a strict no-op returning ``kg`` unchanged (the default, so + the exported KG is unaffected until the flag flips). + + Each curation step is added in a later phase and guarded so that a failure + degrades to the prior (uncurated) field rather than aborting the export. + """ + if not enabled: + return kg + + # Curation steps are layered in by subsequent phases: + # _curate_layers -> _curate_entry_points -> _curate_tour + # -> _curate_node_types -> _curate_summaries + return kg diff --git a/packages/core/src/repowise/core/pipeline/orchestrator.py b/packages/core/src/repowise/core/pipeline/orchestrator.py index 4639cbaa..948a8658 100644 --- a/packages/core/src/repowise/core/pipeline/orchestrator.py +++ b/packages/core/src/repowise/core/pipeline/orchestrator.py @@ -525,6 +525,29 @@ async def _ingestion_stage() -> tuple: f"{len(knowledge_graph_result.layers)} layers", ) _phase_done(progress, "knowledge_graph.skeleton") + + # ---- KG curation/presentation pass (flagged, default off) -------- + # Reshapes only the exported KG (layers/tour/entry-points/summaries); + # never touches the AST graph, communities, or centrality. No-op when + # the REPOWISE_KG_CURATION flag is off, so default output is unchanged. + # Runs in BOTH FAST and STANDARD (it sits before the generate branch). + if knowledge_graph_result is not None: + from repowise.core.analysis.kg_curation import ( + curate_knowledge_graph, + curation_enabled, + ) + + try: + knowledge_graph_result = curate_knowledge_graph( + knowledge_graph_result, + parsed_files=parsed_files, + graph_builder=graph_builder, + repo_structure=repo_structure, + community_info=graph_builder.community_info(), + enabled=curation_enabled(), + ) + except (ValueError, KeyError, RuntimeError) as cur_err: + logger.error("kg_curation_failed", error=str(cur_err), exc_info=True) except (ValueError, KeyError, OSError, RuntimeError) as kg_err: logger.error("kg_skeleton_building_failed", error=str(kg_err), exc_info=True) diff --git a/tests/unit/analysis/test_kg_curation.py b/tests/unit/analysis/test_kg_curation.py new file mode 100644 index 00000000..e9f40ed1 --- /dev/null +++ b/tests/unit/analysis/test_kg_curation.py @@ -0,0 +1,191 @@ +"""Tests for the KG curation/presentation pass (``kg_curation``). + +Grows phase-by-phase. Phase 0 locks the seam: a no-op when the flag is off, a +flag reader, and the AST-graph-untouched guard. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from types import SimpleNamespace +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from repowise.core.analysis.kg_curation import curate_knowledge_graph, curation_enabled +from repowise.core.analysis.knowledge_graph import ( + KnowledgeGraphResult, + build_knowledge_graph_skeleton, +) + +# --------------------------------------------------------------------------- +# Fixtures / fakes +# --------------------------------------------------------------------------- + + +@dataclass +class FakeFileInfo: + path: str + language: str = "python" + size_bytes: int = 1000 + is_test: bool = False + is_config: bool = False + is_api_contract: bool = False + is_entry_point: bool = False + line_count: int = 100 + + +@dataclass +class FakeSymbol: + name: str = "my_func" + kind: str = "function" + start_line: int = 1 + end_line: int = 10 + is_reexport: bool = False + + +@dataclass +class FakeParsedFile: + file_info: FakeFileInfo + symbols: list = field(default_factory=list) + imports: list = field(default_factory=list) + exports: list = field(default_factory=list) + + +def _make_graph_builder( + nodes: dict[str, dict], + edges: list[tuple[str, str, dict]], + communities: dict[str, int], + community_infos: dict[int, Any], + pagerank: dict[str, float], + betweenness: dict[str, float] | None = None, +): + import networkx as nx + + g = nx.DiGraph() + for nid, data in nodes.items(): + g.add_node(nid, **data) + for u, v, data in edges: + g.add_edge(u, v, **data) + + builder = MagicMock() + builder.graph.return_value = g + builder.pagerank.return_value = pagerank + builder.betweenness_centrality.return_value = betweenness or {} + builder.community_detection.return_value = communities + builder.community_info.return_value = community_infos + return builder + + +def _community_info(cid: int, label: str, members: list[str]): + return SimpleNamespace( + community_id=cid, + label=label, + members=members, + size=len(members), + cohesion=0.8, + dominant_language="python", + ) + + +@pytest.fixture +def simple_repo(): + """A tiny three-file repo: entry, core, test.""" + parsed = [ + FakeParsedFile( + FakeFileInfo("src/main.py", is_entry_point=True), symbols=[FakeSymbol("main")] + ), + FakeParsedFile(FakeFileInfo("src/core.py"), symbols=[FakeSymbol("Core", "class")]), + FakeParsedFile( + FakeFileInfo("tests/test_main.py", is_test=True), symbols=[FakeSymbol("test_main")] + ), + ] + nodes = { + "src/main.py": {"node_type": "file", "language": "python", "is_entry_point": True}, + "src/core.py": {"node_type": "file", "language": "python"}, + "tests/test_main.py": {"node_type": "file", "language": "python", "is_test": True}, + } + edges = [ + ("src/main.py", "src/core.py", {"edge_type": "imports", "confidence": 1.0}), + ("tests/test_main.py", "src/main.py", {"edge_type": "imports", "confidence": 1.0}), + ] + communities = {"src/main.py": 0, "src/core.py": 0, "tests/test_main.py": 1} + infos = { + 0: _community_info(0, "src/core", ["src/main.py", "src/core.py"]), + 1: _community_info(1, "tests", ["tests/test_main.py"]), + } + pagerank = {"src/main.py": 0.5, "src/core.py": 0.3, "tests/test_main.py": 0.2} + builder = _make_graph_builder(nodes, edges, communities, infos, pagerank) + repo_structure = SimpleNamespace( + is_monorepo=False, + total_files=3, + entry_points=["src/main.py"], + ) + return SimpleNamespace(parsed=parsed, builder=builder, repo_structure=repo_structure) + + +def _build_skeleton(repo) -> KnowledgeGraphResult: + return build_knowledge_graph_skeleton( + parsed_files=repo.parsed, + graph_builder=repo.builder, + repo_structure=repo.repo_structure, + tech_stack=[], + external_systems=[], + ) + + +def _curate(repo, **kw) -> KnowledgeGraphResult: + return curate_knowledge_graph( + _build_skeleton(repo), + parsed_files=repo.parsed, + graph_builder=repo.builder, + repo_structure=repo.repo_structure, + community_info=repo.builder.community_info(), + **kw, + ) + + +# --------------------------------------------------------------------------- +# Phase 0 — the seam +# --------------------------------------------------------------------------- + + +class TestCurationFlag: + def test_default_off(self, monkeypatch): + monkeypatch.delenv("REPOWISE_KG_CURATION", raising=False) + assert curation_enabled() is False + + @pytest.mark.parametrize("val", ["1", "true", "TRUE", "yes", "on"]) + def test_truthy_values_enable(self, monkeypatch, val): + monkeypatch.setenv("REPOWISE_KG_CURATION", val) + assert curation_enabled() is True + + @pytest.mark.parametrize("val", ["0", "false", "no", "off", "", "garbage"]) + def test_falsy_values_disable(self, monkeypatch, val): + monkeypatch.setenv("REPOWISE_KG_CURATION", val) + assert curation_enabled() is False + + +class TestIdentityPass: + def test_noop_returns_input_unchanged(self, simple_repo): + kg = _build_skeleton(simple_repo) + before = kg.to_dict() + out = curate_knowledge_graph( + kg, + parsed_files=simple_repo.parsed, + graph_builder=simple_repo.builder, + repo_structure=simple_repo.repo_structure, + community_info=simple_repo.builder.community_info(), + enabled=False, + ) + assert out is kg + assert out.to_dict() == before + + def test_ast_graph_untouched(self, simple_repo): + """The §4D guard: graph node/edge counts identical pre/post curation.""" + g = simple_repo.builder.graph() + before = (g.number_of_nodes(), g.number_of_edges()) + _curate(simple_repo, enabled=True) + g = simple_repo.builder.graph() + assert (g.number_of_nodes(), g.number_of_edges()) == before From e04eb0b67f9c66cff778e25c0c5159b994316b6e Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:42:08 +0530 Subject: [PATCH 02/13] feat(kg): add CLI hint to layer spine (edge case A) --- .../src/repowise/core/generation/layers.py | 20 ++++++++++--------- tests/unit/generation/test_layers.py | 8 +++++++- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/packages/core/src/repowise/core/generation/layers.py b/packages/core/src/repowise/core/generation/layers.py index 3bc0f8c7..15eac903 100644 --- a/packages/core/src/repowise/core/generation/layers.py +++ b/packages/core/src/repowise/core/generation/layers.py @@ -32,6 +32,7 @@ # --------------------------------------------------------------------------- _LAYER_HINTS: tuple[tuple[str, frozenset[str]], ...] = ( + ("CLI", frozenset({"cli", "commands", "cmd", "cli_commands"})), ("API", frozenset({"routes", "api", "controllers", "endpoints", "handlers", "routers"})), ("Service", frozenset({"services", "core", "lib", "domain", "logic", "usecases"})), ("Data", frozenset({"models", "db", "data", "persistence", "repository", "repositories", "store", "stores", "entities"})), @@ -52,15 +53,16 @@ # (foundational): top imports middle imports bottom. _CANONICAL_RANK: dict[str, int] = { "UI": 0, - "API": 1, - "Middleware": 2, - "Service": 3, - DEFAULT_LAYER: 4, - "Data": 5, - "Types": 6, - "Config": 7, - "Utility": 8, - "Test": 9, + "CLI": 1, + "API": 2, + "Middleware": 3, + "Service": 4, + DEFAULT_LAYER: 5, + "Data": 6, + "Types": 7, + "Config": 8, + "Utility": 9, + "Test": 10, } diff --git a/tests/unit/generation/test_layers.py b/tests/unit/generation/test_layers.py index 669b238d..a9084df1 100644 --- a/tests/unit/generation/test_layers.py +++ b/tests/unit/generation/test_layers.py @@ -8,7 +8,6 @@ infer_layer, ) - # --------------------------------------------------------------------------- # infer_layer — every file maps to exactly one layer # --------------------------------------------------------------------------- @@ -26,6 +25,13 @@ def test_infer_layer_matches_directory_hints(): assert infer_layer("src/types/dtos.ts") == "Types" +def test_infer_layer_recognizes_cli_command_surface(): + # Edge case A: a CLI command surface must not fall through to Application. + assert infer_layer("packages/cli/src/repowise/cli/commands/init_cmd.py") == "CLI" + assert infer_layer("src/cli/main.py") == "CLI" + assert infer_layer("app/cmd/serve.py") == "CLI" + + def test_infer_layer_uses_deepest_matching_directory(): # The closest directory wins over a shallower one. assert infer_layer("services/api/handler.py") == "API" From 63f60c9619b25e9252620269111458b646173bd9 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:45:46 +0530 Subject: [PATCH 03/13] feat(kg): curate layers from infer_layer spine with mega-layer sub-split --- .../src/repowise/core/analysis/kg_curation.py | 161 +++++++++++++++++- 1 file changed, 159 insertions(+), 2 deletions(-) diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py index 0a35d99c..93d8cba0 100644 --- a/packages/core/src/repowise/core/analysis/kg_curation.py +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -22,16 +22,34 @@ from __future__ import annotations +import logging import os +from collections import defaultdict +from pathlib import PurePosixPath from typing import Any -from repowise.core.analysis.knowledge_graph import KnowledgeGraphResult +from repowise.core.analysis.knowledge_graph import KnowledgeGraphResult, _slugify +from repowise.core.generation.layers import compute_layer_order, infer_layer __all__ = ["curate_knowledge_graph", "curation_enabled"] +logger = logging.getLogger(__name__) + _FLAG_ENV = "REPOWISE_KG_CURATION" +# A primary layer larger than this many files, or spanning more than this many +# distinct sub-directories, is given a two-level structure (primary → named +# sub-groups) so a mega-layer like core/* or ui/* stays drill-down legible +# instead of becoming one opaque bucket (plan §Phase 1, edge case B). +_SUBSPLIT_FILE_THRESHOLD = 60 +_SUBSPLIT_DIR_THRESHOLD = 8 + +# Hard bound on the curated primary-layer count. The spine is bounded ≤~11 by +# construction; if a future change ever blows past this we degrade to the +# uncurated layers rather than ship an unreadable list. +_MAX_LAYERS = 15 + def curation_enabled() -> bool: """Whether KG curation is enabled via the ``REPOWISE_KG_CURATION`` env flag. @@ -65,7 +83,146 @@ def curate_knowledge_graph( if not enabled: return kg - # Curation steps are layered in by subsequent phases: + # Each step mutates only ``kg`` (the presentation result) and is guarded so + # a failure degrades to the prior, uncurated field rather than aborting the + # export. Steps are layered in by subsequent phases: # _curate_layers -> _curate_entry_points -> _curate_tour # -> _curate_node_types -> _curate_summaries + try: + curated = _curate_layers(kg, graph_builder) + if curated is not None: + kg.layers = curated + except Exception: # pragma: no cover - defensive; keep uncurated layers + logger.exception("kg_curation._curate_layers failed; keeping community layers") + return kg + + +# --------------------------------------------------------------------------- +# Phase 1 — curated layers (replace raw-community layers with the spine) +# --------------------------------------------------------------------------- + + +def _file_nodes(kg: KnowledgeGraphResult) -> list[dict]: + """Return the file-typed nodes of *kg* (ids prefixed ``file:``).""" + return [ + n + for n in kg.nodes + if isinstance(n.get("id"), str) + and n["id"].startswith("file:") + and isinstance(n.get("filePath"), str) + ] + + +def _file_import_edges(graph_builder: Any) -> list[tuple[str, str]]: + """``(src, dst)`` string edges from the AST graph (src imports dst). + + Mirrors the wiki spine's edge extraction. Symbol-node ids and externals are + naturally ignored downstream by :func:`compute_layer_order`, which only + counts edges whose endpoints are both in ``file_layers``. + """ + edges: list[tuple[str, str]] = [] + try: + for src, dst in graph_builder.graph().edges(): + if isinstance(src, str) and isinstance(dst, str): + edges.append((src, dst)) + except Exception: # pragma: no cover - defensive + pass + return edges + + +def _common_dir_prefix(seg_lists: list[tuple[str, ...]]) -> tuple[str, ...]: + """Longest common leading directory-segment prefix across *seg_lists*.""" + if not seg_lists: + return () + common = list(seg_lists[0]) + for segs in seg_lists[1:]: + i = 0 + while i < len(common) and i < len(segs) and common[i] == segs[i]: + i += 1 + del common[i:] + if not common: + break + return tuple(common) + + +def _sub_split(layer_id: str, node_ids: list[str], id_to_path: dict[str, str]) -> list[dict] | None: + """Two-level sub-groups for an oversized/wide primary layer, else ``None``. + + Groups files by the first path segment that distinguishes them (the segment + after the layer's common directory prefix), so e.g. ``core/ingestion`` / + ``core/analysis`` / ``core/generation`` become named sub-groups. Only kicks + in past the size/width thresholds and only when it yields ≥2 groups. + """ + if len(node_ids) < 2: + return None + + dir_segs = {nid: PurePosixPath(id_to_path[nid]).parts[:-1] for nid in node_ids} + common = _common_dir_prefix(list(dir_segs.values())) + + groups: dict[str, list[str]] = defaultdict(list) + for nid in node_ids: + segs = dir_segs[nid] + key = segs[len(common)] if len(segs) > len(common) else "(root)" + groups[key].append(nid) + + oversized = len(node_ids) > _SUBSPLIT_FILE_THRESHOLD + wide = len(groups) > _SUBSPLIT_DIR_THRESHOLD + if not (oversized or wide) or len(groups) < 2: + return None + + return [ + {"id": f"{layer_id}:{_slugify(name)}", "name": name, "nodeIds": groups[name]} + for name in sorted(groups) + ] + + +def _curate_layers(kg: KnowledgeGraphResult, graph_builder: Any) -> list[dict] | None: + """Build bounded, dependency-ordered layers from the ``infer_layer`` spine. + + Returns the curated layer list, or ``None`` to keep the existing + (community) layers when the result would be degenerate or violate the + partition / bound invariants. Every file lands in exactly one layer, so the + partition (Σ nodeIds == file-node count) and singleton-elimination hold by + construction. + """ + file_nodes = _file_nodes(kg) + if not file_nodes: + return None + + id_to_path = {n["id"]: n["filePath"] for n in file_nodes} + file_layers = {n["filePath"]: infer_layer(n["filePath"]) for n in file_nodes} + order = compute_layer_order(file_layers, _file_import_edges(graph_builder)) + + by_layer: dict[str, list[str]] = defaultdict(list) + for n in file_nodes: + by_layer[file_layers[n["filePath"]]].append(n["id"]) + + layers: list[dict] = [] + for display_order, layer_name in enumerate(order): + node_ids = by_layer[layer_name] + layer_id = f"layer:{_slugify(layer_name)}" + layer: dict[str, Any] = { + "id": layer_id, + "name": layer_name, + "description": "", + "nodeIds": node_ids, + "display_order": display_order, + } + sub_groups = _sub_split(layer_id, node_ids, id_to_path) + if sub_groups: + layer["subGroups"] = sub_groups + layers.append(layer) + + # Degrade rather than ship a broken artifact: enforce bound + partition. + total = sum(len(layer["nodeIds"]) for layer in layers) + if not layers or len(layers) > _MAX_LAYERS or total != len(file_nodes): + logger.warning( + "kg_curation: curated layers failed invariant " + "(count=%d, partition=%d/%d); keeping community layers", + len(layers), + total, + len(file_nodes), + ) + return None + return layers From b640d7f23a6dfa3ddb13f1ae86fbdae8559912e7 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:45:46 +0530 Subject: [PATCH 04/13] test(kg): layer count + partition + sub-split invariants --- tests/unit/analysis/test_kg_curation.py | 133 ++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/tests/unit/analysis/test_kg_curation.py b/tests/unit/analysis/test_kg_curation.py index e9f40ed1..d9b36835 100644 --- a/tests/unit/analysis/test_kg_curation.py +++ b/tests/unit/analysis/test_kg_curation.py @@ -146,6 +146,70 @@ def _curate(repo, **kw) -> KnowledgeGraphResult: ) +def build_repo( + paths: list[str], + *, + tests: set[str] | None = None, + entries: set[str] | None = None, + edges: list[tuple[str, str]] | None = None, + reexport_only: set[str] | None = None, +): + """Build a synthetic repo (parsed files + mock graph builder) from paths.""" + tests = tests or set() + entries = entries or set() + reexport_only = reexport_only or set() + + parsed = [] + nodes: dict[str, dict] = {} + for p in paths: + is_test = p in tests + is_entry = p in entries + if p in reexport_only: + syms = [FakeSymbol(name="reexport", kind="variable", is_reexport=True)] + else: + syms = [FakeSymbol(name="thing", kind="function")] + parsed.append( + FakeParsedFile(FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), symbols=syms) + ) + nodes[p] = {"node_type": "file", "language": "python"} + if is_test: + nodes[p]["is_test"] = True + if is_entry: + nodes[p]["is_entry_point"] = True + + graph_edges = [(u, v, {"edge_type": "imports", "confidence": 1.0}) for u, v in (edges or [])] + communities = {p: 0 for p in paths} + infos = {0: _community_info(0, "all", list(paths))} + pagerank = {p: 1.0 / max(len(paths), 1) for p in paths} + builder = _make_graph_builder(nodes, graph_edges, communities, infos, pagerank) + repo_structure = SimpleNamespace( + is_monorepo=True, total_files=len(paths), entry_points=sorted(entries) + ) + return SimpleNamespace(parsed=parsed, builder=builder, repo_structure=repo_structure) + + +@pytest.fixture +def large_repo(): + """A realistically-shaped monorepo: several layers, two mega-layers.""" + paths: list[str] = [] + # Service mega-layer (core/*) spanning sub-dirs → should sub-split. + for sub in ("ingestion", "analysis", "generation"): + paths += [f"packages/core/src/repowise/core/{sub}/mod{i}.py" for i in range(24)] + # UI mega-layer, spanning sub-dirs → should also sub-split. + for sub in ("buttons", "forms", "layout"): + paths += [f"packages/ui/src/components/{sub}/C{i}.tsx" for i in range(24)] + # CLI (edge case A — must not be Application). + paths += [f"packages/cli/src/repowise/cli/commands/cmd{i}.py" for i in range(20)] + # API, Data, Config, Test, Utility — smaller named layers. + paths += [f"src/api/route{i}.py" for i in range(12)] + paths += [f"src/models/model{i}.py" for i in range(10)] + paths += [f"src/utils/util{i}.py" for i in range(8)] + paths += [f"config/conf{i}.yaml" for i in range(6)] + tests = {f"tests/unit/test_{i}.py" for i in range(30)} + paths += sorted(tests) + return build_repo(paths, tests=tests) + + # --------------------------------------------------------------------------- # Phase 0 — the seam # --------------------------------------------------------------------------- @@ -189,3 +253,72 @@ def test_ast_graph_untouched(self, simple_repo): _curate(simple_repo, enabled=True) g = simple_repo.builder.graph() assert (g.number_of_nodes(), g.number_of_edges()) == before + + +# --------------------------------------------------------------------------- +# Phase 1 — curated layers +# --------------------------------------------------------------------------- + + +def _layer_names(kg) -> set[str]: + return {layer["name"] for layer in kg.layers} + + +def _file_node_count(kg) -> int: + return sum(1 for n in kg.nodes if n["id"].startswith("file:")) + + +class TestCuratedLayers: + def test_flag_off_keeps_community_layers(self, large_repo): + kg = _curate(large_repo, enabled=False) + # The skeleton's community layers: one community "all" → one layer. + assert _layer_names(kg) == {"all"} + + def test_layer_count_bounded(self, large_repo): + kg = _curate(large_repo, enabled=True) + assert 6 <= len(kg.layers) <= 15 + + def test_partition_invariant(self, large_repo): + kg = _curate(large_repo, enabled=True) + seen: set[str] = set() + for layer in kg.layers: + for nid in layer["nodeIds"]: + assert nid not in seen, "a file appears in two layers" + seen.add(nid) + assert len(seen) == _file_node_count(kg), "every file in exactly one layer" + + def test_no_singleton_spam(self, large_repo): + kg = _curate(large_repo, enabled=True) + singletons = sum(1 for layer in kg.layers if len(layer["nodeIds"]) == 1) + assert singletons / len(kg.layers) < 0.10 + + def test_cli_is_its_own_layer(self, large_repo): + kg = _curate(large_repo, enabled=True) + assert "CLI" in _layer_names(kg) + assert "Application" not in _layer_names(kg) # nothing falls through here + + def test_mega_layers_sub_split(self, large_repo): + kg = _curate(large_repo, enabled=True) + by_name = {layer["name"]: layer for layer in kg.layers} + for mega in ("Service", "UI"): + sub = by_name[mega].get("subGroups") + assert sub and len(sub) >= 2, f"{mega} should sub-split" + # Sub-groups partition their parent layer. + sub_ids = [nid for grp in sub for nid in grp["nodeIds"]] + assert sorted(sub_ids) == sorted(by_name[mega]["nodeIds"]) + + def test_largest_primary_layer_within_bound(self, large_repo): + kg = _curate(large_repo, enabled=True) + total = _file_node_count(kg) + largest = max(len(layer["nodeIds"]) for layer in kg.layers) + assert largest / total <= 0.35 + + def test_layers_are_dependency_ordered(self, large_repo): + kg = _curate(large_repo, enabled=True) + orders = [layer["display_order"] for layer in kg.layers] + assert orders == list(range(len(kg.layers))) + + def test_deterministic(self, large_repo): + a = _curate(large_repo, enabled=True) + b = _curate(large_repo, enabled=True) + assert a.layers == b.layers From 2b72486bf60b10cb3fa7c64c17e85af609f476a8 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:49:19 +0530 Subject: [PATCH 05/13] feat(kg): demote barrels, rank + cap entry points --- .../src/repowise/core/analysis/kg_curation.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py index 93d8cba0..8399d854 100644 --- a/packages/core/src/repowise/core/analysis/kg_curation.py +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -50,6 +50,17 @@ # uncurated layers rather than ship an unreadable list. _MAX_LAYERS = 15 +# Entry-point precision (plan §Phase 2). A re-export *barrel* (typically an +# ``index.ts``) carries the ``index`` stem heuristic's ``entry_point`` flag but +# teaches a reader nothing, so it is demoted in the presentation view. Runtime +# entries that survive are ranked by ``pagerank + betweenness`` and the surfaced +# set is capped — the full ranked list is kept as ``entry_candidates``. +_BARREL_STEMS = frozenset({"index"}) +_SUBSTANTIVE_KINDS = frozenset( + {"function", "method", "class", "struct", "interface", "enum", "trait", "impl", "macro"} +) +_MAX_ENTRY_POINTS = 8 + def curation_enabled() -> bool: """Whether KG curation is enabled via the ``REPOWISE_KG_CURATION`` env flag. @@ -95,6 +106,11 @@ def curate_knowledge_graph( except Exception: # pragma: no cover - defensive; keep uncurated layers logger.exception("kg_curation._curate_layers failed; keeping community layers") + try: + _curate_entry_points(kg, parsed_files, graph_builder) + except Exception: # pragma: no cover - defensive; keep skeleton entry points + logger.exception("kg_curation._curate_entry_points failed; keeping raw entry points") + return kg @@ -226,3 +242,72 @@ def _curate_layers(kg: KnowledgeGraphResult, graph_builder: Any) -> list[dict] | ) return None return layers + + +# --------------------------------------------------------------------------- +# Phase 2 — entry-point precision (demote barrels, rank + cap survivors) +# --------------------------------------------------------------------------- + + +def _is_barrel(parsed_file: Any) -> bool: + """True if *parsed_file* is a re-export barrel (``index`` shell, no runtime). + + Conservative by design: a file is a barrel only when its stem is ``index`` + and it defines no runtime-bearing symbol (function/class/method/…) — purely + re-exporting or empty. Anything that defines executable behaviour, even if + named ``index``, is kept as a genuine entry candidate. + """ + fi = getattr(parsed_file, "file_info", None) + path = getattr(fi, "path", "") + if PurePosixPath(path).stem.lower() not in _BARREL_STEMS: + return False + + symbols = getattr(parsed_file, "symbols", []) or [] + if any(getattr(s, "kind", "") in _SUBSTANTIVE_KINDS for s in symbols): + return False + + has_reexports = any(getattr(imp, "is_reexport", False) for imp in getattr(parsed_file, "imports", []) or []) + exports_only = bool(getattr(parsed_file, "exports", [])) + return has_reexports or exports_only or not symbols + + +def _curate_entry_points(kg: KnowledgeGraphResult, parsed_files: list[Any], graph_builder: Any) -> None: + """Demote re-export barrels and surface a capped, ranked entry-point set. + + Mutates only the presentation view: drops the ``entry_point`` *tag* from + barrel nodes (and adds a ``barrel`` tag) without touching the AST graph's + ``is_entry_point`` flag (the dead-code pass relies on it). Survivors are + ranked by ``pagerank + betweenness``; ``project.entry_points`` holds the top + few, ``project.entry_candidates`` the full ranked list. + """ + pf_by_path = {pf.file_info.path: pf for pf in parsed_files if getattr(pf, "file_info", None)} + pagerank = graph_builder.pagerank() or {} + try: + betweenness = graph_builder.betweenness_centrality() or {} + except Exception: # pragma: no cover - defensive + betweenness = {} + + survivors: list[tuple[float, str]] = [] + for node in kg.nodes: + nid = node.get("id", "") + if not (isinstance(nid, str) and nid.startswith("file:")): + continue + tags = node.get("tags") or [] + if "entry_point" not in tags: + continue + path = node.get("filePath", "") + pf = pf_by_path.get(path) + if pf is not None and _is_barrel(pf): + new_tags = [t for t in tags if t != "entry_point"] + if "barrel" not in new_tags: + new_tags.append("barrel") + node["tags"] = new_tags + continue + score = pagerank.get(path, 0.0) + betweenness.get(path, 0.0) + survivors.append((score, path)) + + # Highest score first; path as a stable, deterministic tie-break. + survivors.sort(key=lambda sp: (-sp[0], sp[1])) + ranked = [path for _, path in survivors] + kg.project["entry_points"] = ranked[:_MAX_ENTRY_POINTS] + kg.project["entry_candidates"] = ranked From e7dd8efc2d38e8b4dbedd38a62734333e166b429 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:49:19 +0530 Subject: [PATCH 06/13] test(kg): entry-point precision invariants --- tests/unit/analysis/test_kg_curation.py | 98 ++++++++++++++++++++++--- 1 file changed, 88 insertions(+), 10 deletions(-) diff --git a/tests/unit/analysis/test_kg_curation.py b/tests/unit/analysis/test_kg_curation.py index d9b36835..250dd1dc 100644 --- a/tests/unit/analysis/test_kg_curation.py +++ b/tests/unit/analysis/test_kg_curation.py @@ -152,25 +152,34 @@ def build_repo( tests: set[str] | None = None, entries: set[str] | None = None, edges: list[tuple[str, str]] | None = None, - reexport_only: set[str] | None = None, + barrels: set[str] | None = None, + pagerank: dict[str, float] | None = None, + betweenness: dict[str, float] | None = None, ): """Build a synthetic repo (parsed files + mock graph builder) from paths.""" tests = tests or set() entries = entries or set() - reexport_only = reexport_only or set() + barrels = barrels or set() parsed = [] nodes: dict[str, dict] = {} for p in paths: is_test = p in tests is_entry = p in entries - if p in reexport_only: - syms = [FakeSymbol(name="reexport", kind="variable", is_reexport=True)] + if p in barrels: + # A re-export shell: no runtime symbols, exports names only. + pf = FakeParsedFile( + FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), + symbols=[], + imports=[SimpleNamespace(is_reexport=True)], + exports=["A", "B"], + ) else: - syms = [FakeSymbol(name="thing", kind="function")] - parsed.append( - FakeParsedFile(FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), symbols=syms) - ) + pf = FakeParsedFile( + FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), + symbols=[FakeSymbol(name="thing", kind="function")], + ) + parsed.append(pf) nodes[p] = {"node_type": "file", "language": "python"} if is_test: nodes[p]["is_test"] = True @@ -180,8 +189,8 @@ def build_repo( graph_edges = [(u, v, {"edge_type": "imports", "confidence": 1.0}) for u, v in (edges or [])] communities = {p: 0 for p in paths} infos = {0: _community_info(0, "all", list(paths))} - pagerank = {p: 1.0 / max(len(paths), 1) for p in paths} - builder = _make_graph_builder(nodes, graph_edges, communities, infos, pagerank) + pr = pagerank or {p: 1.0 / max(len(paths), 1) for p in paths} + builder = _make_graph_builder(nodes, graph_edges, communities, infos, pr, betweenness) repo_structure = SimpleNamespace( is_monorepo=True, total_files=len(paths), entry_points=sorted(entries) ) @@ -322,3 +331,72 @@ def test_deterministic(self, large_repo): a = _curate(large_repo, enabled=True) b = _curate(large_repo, enabled=True) assert a.layers == b.layers + + +# --------------------------------------------------------------------------- +# Phase 2 — entry-point precision +# --------------------------------------------------------------------------- + + +@pytest.fixture +def entry_repo(): + """Real runtime entries plus re-export barrels, all flagged entry_point.""" + reals = [f"src/app{i}/main.py" for i in range(12)] + barrels = {f"packages/p{i}/index.ts" for i in range(5)} + paths = reals + sorted(barrels) + entries = set(reals) | barrels + # Give barrels deliberately high PageRank — they must still be demoted. + pagerank = {p: (12 - i) / 100.0 for i, p in enumerate(reals)} + for b in barrels: + pagerank[b] = 0.9 + return build_repo(paths, entries=entries, barrels=barrels, pagerank=pagerank) + + +def _project(kg) -> dict: + return kg.project + + +class TestEntryPointPrecision: + def test_barrels_demoted_in_presentation(self, entry_repo): + kg = _curate(entry_repo, enabled=True) + for node in kg.nodes: + if node.get("filePath", "").endswith("index.ts"): + assert "entry_point" not in node["tags"] + assert "barrel" in node["tags"] + + def test_no_barrel_in_surfaced_set(self, entry_repo): + kg = _curate(entry_repo, enabled=True) + assert all(not p.endswith("index.ts") for p in _project(kg)["entry_points"]) + assert all(not p.endswith("index.ts") for p in _project(kg)["entry_candidates"]) + + def test_surfaced_set_capped(self, entry_repo): + kg = _curate(entry_repo, enabled=True) + assert len(_project(kg)["entry_points"]) <= 8 + + def test_ranked_by_centrality(self, entry_repo): + kg = _curate(entry_repo, enabled=True) + # app0 has the highest PageRank among reals → ranks first. + assert _project(kg)["entry_points"][0] == "src/app0/main.py" + + def test_full_candidate_list_kept(self, entry_repo): + kg = _curate(entry_repo, enabled=True) + # All 12 real entries survive as candidates; 5 barrels excluded. + assert len(_project(kg)["entry_candidates"]) == 12 + + def test_ast_is_entry_point_flag_untouched(self, entry_repo): + """Demotion is presentation-only — the graph flag stays for dead-code.""" + _curate(entry_repo, enabled=True) + g = entry_repo.builder.graph() + for path, data in g.nodes(data=True): + if path.endswith("index.ts"): + assert data.get("is_entry_point") is True + + def test_deterministic(self, entry_repo): + a = _curate(entry_repo, enabled=True) + b = _curate(entry_repo, enabled=True) + assert a.project["entry_points"] == b.project["entry_points"] + assert a.project["entry_candidates"] == b.project["entry_candidates"] + + def test_flag_off_leaves_entry_points_untouched(self, entry_repo): + kg = _curate(entry_repo, enabled=False) + assert "entry_candidates" not in kg.project From c7ca129f5f579bdffa042d7a10c54b7f2c18dfbd Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:55:15 +0530 Subject: [PATCH 07/13] feat(kg): export canonical layer-aware tour; keep it over LLM tour when curated --- .../src/repowise/core/analysis/kg_curation.py | 171 +++++++++++++++++- .../core/generation/knowledge_graph.py | 17 +- 2 files changed, 182 insertions(+), 6 deletions(-) diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py index 8399d854..fc5afdda 100644 --- a/packages/core/src/repowise/core/analysis/kg_curation.py +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -30,6 +30,11 @@ from repowise.core.analysis.knowledge_graph import KnowledgeGraphResult, _slugify from repowise.core.generation.layers import compute_layer_order, infer_layer +from repowise.core.generation.tour import ( + DEFAULT_MAX_STOPS, + build_tour, + score_entry_points, +) __all__ = ["curate_knowledge_graph", "curation_enabled"] @@ -111,6 +116,13 @@ def curate_knowledge_graph( except Exception: # pragma: no cover - defensive; keep skeleton entry points logger.exception("kg_curation._curate_entry_points failed; keeping raw entry points") + try: + tour = _curate_tour(kg, parsed_files, graph_builder) + if tour is not None: + kg.tour = tour + except Exception: # pragma: no cover - defensive; keep skeleton/LLM tour + logger.exception("kg_curation._curate_tour failed; keeping existing tour") + return kg @@ -266,12 +278,16 @@ def _is_barrel(parsed_file: Any) -> bool: if any(getattr(s, "kind", "") in _SUBSTANTIVE_KINDS for s in symbols): return False - has_reexports = any(getattr(imp, "is_reexport", False) for imp in getattr(parsed_file, "imports", []) or []) + has_reexports = any( + getattr(imp, "is_reexport", False) for imp in getattr(parsed_file, "imports", []) or [] + ) exports_only = bool(getattr(parsed_file, "exports", [])) return has_reexports or exports_only or not symbols -def _curate_entry_points(kg: KnowledgeGraphResult, parsed_files: list[Any], graph_builder: Any) -> None: +def _curate_entry_points( + kg: KnowledgeGraphResult, parsed_files: list[Any], graph_builder: Any +) -> None: """Demote re-export barrels and surface a capped, ranked entry-point set. Mutates only the presentation view: drops the ``entry_point`` *tag* from @@ -311,3 +327,154 @@ def _curate_entry_points(kg: KnowledgeGraphResult, parsed_files: list[Any], grap ranked = [path for _, path in survivors] kg.project["entry_points"] = ranked[:_MAX_ENTRY_POINTS] kg.project["entry_candidates"] = ranked + + +# --------------------------------------------------------------------------- +# Phase 3 — canonical, layer-aware tour +# --------------------------------------------------------------------------- + + +def _readme_overview_node(kg: KnowledgeGraphResult) -> dict | None: + """The best root-level README/overview file node, if one exists.""" + best: dict | None = None + for n in _file_nodes(kg): + path = n["filePath"] + name = PurePosixPath(path).name.lower() + depth = len(PurePosixPath(path).parts) - 1 + if not (name.startswith("readme") and depth <= 1): + continue + # Prefer the shallowest README (the repo-root one). + if best is None or depth < (len(PurePosixPath(best["filePath"]).parts) - 1): + best = n + return best + + +def _best_in_layer(paths: list[str], rank: dict[str, float], pagerank: dict[str, float]) -> str: + """Highest-ranked path in a layer (entry score, then PageRank, then name).""" + return sorted(paths, key=lambda p: (-rank.get(p, 0.0), -pagerank.get(p, 0.0), p))[0] + + +def _curate_tour( + kg: KnowledgeGraphResult, parsed_files: list[Any], graph_builder: Any +) -> list[dict] | None: + """Build one canonical, layer-aware tour over the curated layers. + + Uses the deterministic :func:`build_tour` (BFS-from-entry + PageRank) as the + base ordering, opens with the repo README/overview, then diversifies so the + walk covers as many curated layers as the step budget allows (swapping + redundant same-layer stops for representatives of uncovered layers). Every + step carries a ``layer_id`` mapping it to a curated layer, so the tour reads + the architecture top→bottom. The LLM may later rewrite step *prose* only. + """ + file_nodes = _file_nodes(kg) + if not file_nodes: + return None + + paths = [n["filePath"] for n in file_nodes] + type_by_path = {n["filePath"]: n.get("type", "file") for n in file_nodes} + file_layers = {p: infer_layer(p) for p in paths} + order = compute_layer_order(file_layers, _file_import_edges(graph_builder)) + layer_index = {name: i for i, name in enumerate(order)} + + pagerank = graph_builder.pagerank() or {} + rank = {path: s for s, path in score_entry_points(parsed_files, pagerank)} + + # Infra files (Docker/CI/etc.) close the tour; everything else is code. + infra_paths = [p for p in paths if type_by_path.get(p) in {"service", "pipeline"}] + + project_name = kg.project.get("name") or "repository" + base = build_tour( + parsed_files, + pagerank, + _file_import_edges(graph_builder), + file_page_paths=paths, + infra_paths=infra_paths, + repo_name=project_name, + max_stops=DEFAULT_MAX_STOPS, + ) + + overview = [s for s in base if s.kind == "overview"] + code = [s for s in base if s.kind == "code"] + infra = [s for s in base if s.kind == "infra"] + + # --- Diversify code stops for layer coverage ------------------------- + by_layer: dict[str, list[str]] = defaultdict(list) + for p in paths: + by_layer[file_layers[p]].append(p) + + code_paths = [s.target_path for s in code] + seen_layers: set[str] = set() + redundant_positions: list[int] = [] + for i, p in enumerate(code_paths): + layer = file_layers.get(p) + if layer in seen_layers: + redundant_positions.append(i) + else: + seen_layers.add(layer) + + uncovered = [name for name in order if name not in seen_layers] + for layer in uncovered: + if not redundant_positions: + break + candidates = [p for p in by_layer.get(layer, []) if p not in code_paths] + if not candidates: + continue + rep = _best_in_layer(candidates, rank, pagerank) + pos = redundant_positions.pop() + code_paths[pos] = rep + seen_layers.add(layer) + + # Order the walk top→bottom: by layer dependency rank, then path. + code_paths = sorted( + dict.fromkeys(code_paths), + key=lambda p: (layer_index.get(file_layers.get(p, ""), len(order)), p), + ) + + # --- Assemble the exported tour -------------------------------------- + tour: list[dict] = [] + order_n = 0 + + readme = _readme_overview_node(kg) + if overview: + order_n += 1 + ov = overview[0].as_dict() + ov["order"] = order_n + if readme is not None: + ov["target_path"] = readme["filePath"] + ov["title"] = PurePosixPath(readme["filePath"]).name + ov["layer_id"] = f"layer:{_slugify(file_layers[readme['filePath']])}" + else: + ov["layer_id"] = None + tour.append(ov) + + for p in code_paths: + order_n += 1 + layer = file_layers.get(p, "") + idx = layer_index.get(layer, len(order)) + if idx == 0: + reason = f"Top of the stack ({layer}) — start of the request/control flow." + elif idx >= len(order) - 1: + reason = f"Foundational layer ({layer}) — the others build on this." + else: + reason = f"The {layer} layer — sits mid-stack between consumers and foundations." + tour.append( + { + "order": order_n, + "target_path": p, + "page_type": "file_page", + "title": PurePosixPath(p).name, + "depth": idx, + "kind": "code", + "reason": reason, + "layer_id": f"layer:{_slugify(layer)}", + } + ) + + for s in infra: + order_n += 1 + step = s.as_dict() + step["order"] = order_n + step["layer_id"] = f"layer:{_slugify(file_layers.get(s.target_path, 'Config'))}" + tour.append(step) + + return tour diff --git a/packages/core/src/repowise/core/generation/knowledge_graph.py b/packages/core/src/repowise/core/generation/knowledge_graph.py index 2075b920..b84b4929 100644 --- a/packages/core/src/repowise/core/generation/knowledge_graph.py +++ b/packages/core/src/repowise/core/generation/knowledge_graph.py @@ -58,10 +58,19 @@ async def enrich_knowledge_graph( reasoning=reasoning, ) - tour = await _generate_tour( - enriched_layers, llm_client, graph_builder, repo_structure, kg_skeleton, - reasoning=reasoning, - ) + # When curation is enabled it has already written the canonical, + # layer-aware tour (deterministic, one per layer top→bottom). The LLM must + # not reselect or reorder it — keep the curated tour as-is (prose narration + # can be layered on separately). Otherwise fall back to LLM tour generation. + from repowise.core.analysis.kg_curation import curation_enabled + + if curation_enabled() and kg_skeleton.tour: + tour = kg_skeleton.tour + else: + tour = await _generate_tour( + enriched_layers, llm_client, graph_builder, repo_structure, kg_skeleton, + reasoning=reasoning, + ) if generated_pages: _backfill_summaries(kg_skeleton, generated_pages) From 411ae25d907191b21f7b68ae05d40e811a9bfd89 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 17:55:15 +0530 Subject: [PATCH 08/13] test(kg): layer-aware tour invariants --- tests/unit/analysis/test_kg_curation.py | 61 +++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/unit/analysis/test_kg_curation.py b/tests/unit/analysis/test_kg_curation.py index 250dd1dc..8f6f48e4 100644 --- a/tests/unit/analysis/test_kg_curation.py +++ b/tests/unit/analysis/test_kg_curation.py @@ -400,3 +400,64 @@ def test_deterministic(self, entry_repo): def test_flag_off_leaves_entry_points_untouched(self, entry_repo): kg = _curate(entry_repo, enabled=False) assert "entry_candidates" not in kg.project + + +# --------------------------------------------------------------------------- +# Phase 3 — canonical, layer-aware tour +# --------------------------------------------------------------------------- + + +@pytest.fixture +def readme_repo(): + """large_repo shape plus a real root README to anchor the tour.""" + paths = ["README.md", "src/api/route0.py", "src/api/route1.py"] + paths += [f"src/models/model{i}.py" for i in range(4)] + paths += [f"src/utils/util{i}.py" for i in range(3)] + paths += [f"packages/cli/src/cli/commands/cmd{i}.py" for i in range(3)] + return build_repo(paths) + + +def _layer_ids(kg) -> set[str]: + return {layer["id"] for layer in kg.layers} + + +class TestCuratedTour: + def test_within_step_budget(self, large_repo): + kg = _curate(large_repo, enabled=True) + assert 0 < len(kg.tour) <= 12 + + def test_opens_with_overview(self, large_repo): + kg = _curate(large_repo, enabled=True) + assert kg.tour[0]["kind"] == "overview" + assert kg.tour[0]["order"] == 1 + + def test_every_step_maps_to_a_curated_layer(self, large_repo): + kg = _curate(large_repo, enabled=True) + ids = _layer_ids(kg) + for step in kg.tour: + if step["kind"] == "overview": + continue # overview maps to a layer only when a README exists + assert step["layer_id"] in ids + + def test_covers_most_layers(self, large_repo): + kg = _curate(large_repo, enabled=True) + covered = {s["layer_id"] for s in kg.tour if s["kind"] != "overview"} + assert len(covered) / len(_layer_ids(kg)) >= 0.90 + + def test_orders_are_contiguous(self, large_repo): + kg = _curate(large_repo, enabled=True) + assert [s["order"] for s in kg.tour] == list(range(1, len(kg.tour) + 1)) + + def test_readme_is_first_stop(self, readme_repo): + kg = _curate(readme_repo, enabled=True) + assert kg.tour[0]["kind"] == "overview" + assert kg.tour[0]["target_path"] == "README.md" + + def test_deterministic(self, large_repo): + a = _curate(large_repo, enabled=True) + b = _curate(large_repo, enabled=True) + assert a.tour == b.tour + + def test_flag_off_leaves_tour_empty(self, large_repo): + kg = _curate(large_repo, enabled=False) + assert kg.tour == [] From da00d42cb489fef56368eb9f2b584383e7f7b9b7 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 18:00:38 +0530 Subject: [PATCH 09/13] feat(kg): type infra/CI/data nodes and add never-empty summary floor --- .../src/repowise/core/analysis/kg_curation.py | 148 +++++++++++++++++- .../core/generation/knowledge_graph.py | 25 ++- .../repowise/core/pipeline/orchestrator.py | 5 + 3 files changed, 171 insertions(+), 7 deletions(-) diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py index fc5afdda..4cb3bd6d 100644 --- a/packages/core/src/repowise/core/analysis/kg_curation.py +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -36,7 +36,7 @@ score_entry_points, ) -__all__ = ["curate_knowledge_graph", "curation_enabled"] +__all__ = ["apply_summary_floor", "curate_knowledge_graph", "curation_enabled"] logger = logging.getLogger(__name__) @@ -85,6 +85,7 @@ def curate_knowledge_graph( repo_structure: Any, community_info: Any, enabled: bool = False, + defer_summary_floor: bool = False, ) -> KnowledgeGraphResult: """Reshape the KG skeleton into an intuitive presentation artifact. @@ -93,8 +94,12 @@ def curate_knowledge_graph( ``False`` this is a strict no-op returning ``kg`` unchanged (the default, so the exported KG is unaffected until the flag flips). - Each curation step is added in a later phase and guarded so that a failure - degrades to the prior (uncurated) field rather than aborting the export. + ``defer_summary_floor`` skips the never-empty summary floor here so it can + run *after* the wiki-page backfill in generate mode (where richer summaries + exist); FAST mode leaves it ``False`` so the floor still lands at this seam. + + Each curation step is guarded so that a failure degrades to the prior + (uncurated) field rather than aborting the export. """ if not enabled: return kg @@ -123,6 +128,17 @@ def curate_knowledge_graph( except Exception: # pragma: no cover - defensive; keep skeleton/LLM tour logger.exception("kg_curation._curate_tour failed; keeping existing tour") + try: + _curate_node_types(kg) + except Exception: # pragma: no cover - defensive; keep skeleton types + logger.exception("kg_curation._curate_node_types failed; keeping coarse types") + + if not defer_summary_floor: + try: + apply_summary_floor(kg, parsed_files) + except Exception: # pragma: no cover - defensive; leave summaries as-is + logger.exception("kg_curation summary floor failed; leaving summaries empty") + return kg @@ -478,3 +494,129 @@ def _curate_tour( tour.append(step) return tour + + +# --------------------------------------------------------------------------- +# Phase 4 — node typing & never-empty summaries +# --------------------------------------------------------------------------- + +# Path signals for richer node typing than the skeleton's coarse +# file/config/service/document. These run only in the presentation view; the +# AST graph node_type used elsewhere is untouched. +_CI_PATH_MARKERS = ( + ".github/workflows/", + ".gitlab-ci", + ".circleci/", + "azure-pipelines", + "jenkinsfile", + "bitbucket-pipelines", +) +_INFRA_NAME_MARKERS = ("dockerfile", "docker-compose", "compose.yaml", "compose.yml") +_INFRA_PATH_MARKERS = ("/k8s/", "/kubernetes/", "/helm/", "/terraform/") +_INFRA_SUFFIXES = (".tf", ".hcl") +_DATA_PATH_MARKERS = ("/migrations/", "/migration/") +_DATA_SUFFIXES = (".sql", ".prisma") + + +def _enrich_type(path: str, current_type: str) -> tuple[str, str | None]: + """Return a richer ``(type, extra_tag)`` for a file node, or keep current. + + The tag (``ci``/``infra``/``data``) is additive; ``None`` means no new tag. + """ + p = path.lower() + name = PurePosixPath(p).name + suffix = PurePosixPath(p).suffix + + if any(m in p for m in _CI_PATH_MARKERS) or name == "jenkinsfile": + return "pipeline", "ci" + if ( + name.startswith("dockerfile") + or any(m in name for m in _INFRA_NAME_MARKERS) + or any(m in p for m in _INFRA_PATH_MARKERS) + or suffix in _INFRA_SUFFIXES + ): + return "service", "infra" + if any(m in p for m in _DATA_PATH_MARKERS) or suffix in _DATA_SUFFIXES: + return "schema", "data" + return current_type, None + + +def _curate_node_types(kg: KnowledgeGraphResult) -> None: + """Promote infra/CI/data file nodes to first-class presentation types.""" + for node in _file_nodes(kg): + new_type, tag = _enrich_type(node["filePath"], node.get("type", "file")) + if new_type != node.get("type"): + node["type"] = new_type + if tag: + tags = node.setdefault("tags", []) + if tag not in tags: + tags.append(tag) + + +def _infer_test_target(path: str) -> str: + """Best-effort name of what a test file covers (strip test markers).""" + stem = PurePosixPath(path).stem + for marker in (".test", ".spec", "_test", "test_", "_spec", "spec_"): + if marker in stem.lower(): + cleaned = stem.lower().replace(marker, "") + return cleaned.strip("_.- ") or stem + return stem + + +def _cheap_summary(node: dict, parsed_file: Any | None) -> str: + """A deterministic, honest fallback summary (zero LLM cost).""" + path = node["filePath"] + stem = PurePosixPath(path).stem + parent = PurePosixPath(path).parent.name or "root" + node_type = node.get("type", "file") + tags = node.get("tags") or [] + layer = infer_layer(path) + + if "barrel" in tags: + return f"Re-export barrel for {parent}/." + if node_type == "pipeline" or "ci" in tags: + return f"CI / pipeline definition: {PurePosixPath(path).name}." + if node_type == "service" or "infra" in tags: + return f"Infrastructure definition: {PurePosixPath(path).name}." + if node_type == "schema" or "data" in tags: + return f"Data / schema definition: {PurePosixPath(path).name}." + if node_type == "config" or "config" in tags: + return f"Configuration file: {PurePosixPath(path).name}." + if node_type == "document": + return f"Documentation: {PurePosixPath(path).name}." + if "test" in tags: + return f"Tests for {_infer_test_target(path)}." + + # Code file: name the layer and its most prominent symbols. + symbol_names: list[str] = [] + if parsed_file is not None: + symbol_names = [ + getattr(s, "name", "") + for s in (getattr(parsed_file, "symbols", []) or []) + if getattr(s, "kind", "") in _SUBSTANTIVE_KINDS and getattr(s, "name", "") + ][:3] + if symbol_names: + return f"{layer} module {stem} defining {', '.join(symbol_names)}." + count = node.get("symbolCount", 0) + if count: + return f"{layer} module {stem} ({count} symbols)." + return f"{layer} module {stem}." + + +def apply_summary_floor(kg: KnowledgeGraphResult, parsed_files: list[Any] | None = None) -> None: + """Ensure every file node carries a summary (cheap deterministic floor). + + Idempotent and never clobbering: only fills nodes whose summary is still + empty, so a richer wiki-page summary (backfilled before this runs in + generate mode) always wins. ``parsed_files`` is optional — when absent the + fallback uses the node's symbol count instead of naming top symbols. + """ + pf_by_path = { + pf.file_info.path: pf + for pf in (parsed_files or []) + if getattr(pf, "file_info", None) + } + for node in _file_nodes(kg): + if node.get("summary"): + continue + node["summary"] = _cheap_summary(node, pf_by_path.get(node["filePath"])) diff --git a/packages/core/src/repowise/core/generation/knowledge_graph.py b/packages/core/src/repowise/core/generation/knowledge_graph.py index b84b4929..435f7db8 100644 --- a/packages/core/src/repowise/core/generation/knowledge_graph.py +++ b/packages/core/src/repowise/core/generation/knowledge_graph.py @@ -75,6 +75,18 @@ async def enrich_knowledge_graph( if generated_pages: _backfill_summaries(kg_skeleton, generated_pages) + # Deterministic summary floor, applied *after* the page backfill so rich + # page summaries always win and only never-paged files fall back. Gated by + # the curation flag (the seam already floored FAST-mode output; this covers + # the generate-mode path where the seam deferred to let backfill run first). + if curation_enabled(): + from repowise.core.analysis.kg_curation import apply_summary_floor + + try: + apply_summary_floor(kg_skeleton) + except Exception as exc: # pragma: no cover - defensive + logger.warning("kg_summary_floor_failed", error=str(exc)) + kg_skeleton.layers = enriched_layers kg_skeleton.tour = tour return kg_skeleton @@ -337,10 +349,15 @@ def _backfill_summaries(kg_result: Any, generated_pages: list[Any]) -> None: page_summaries[target] = summary for node in kg_result.nodes: - if node["type"] in ("file", "config", "service", "document"): - path = node.get("filePath", node["id"].removeprefix("file:")) - if path in page_summaries and not node.get("summary"): - node["summary"] = page_summaries[path] + # Any file-level node (any presentation type — file/config/service/ + # pipeline/schema/document). Only fill empties: a page summary is the + # richest source, and the deterministic curation floor is applied + # *after* this backfill so it never blocks a real page summary. + if not str(node.get("id", "")).startswith("file:"): + continue + path = node.get("filePath", node["id"].removeprefix("file:")) + if path in page_summaries and not node.get("summary"): + node["summary"] = page_summaries[path] # --------------------------------------------------------------------------- diff --git a/packages/core/src/repowise/core/pipeline/orchestrator.py b/packages/core/src/repowise/core/pipeline/orchestrator.py index 948a8658..841ef01d 100644 --- a/packages/core/src/repowise/core/pipeline/orchestrator.py +++ b/packages/core/src/repowise/core/pipeline/orchestrator.py @@ -538,6 +538,10 @@ async def _ingestion_stage() -> tuple: ) try: + # In generate mode the summary floor is deferred to run after + # the wiki-page backfill (in ``enrich_knowledge_graph``), so + # rich page summaries win; FAST mode floors here. + will_generate = generate_docs and llm_client is not None knowledge_graph_result = curate_knowledge_graph( knowledge_graph_result, parsed_files=parsed_files, @@ -545,6 +549,7 @@ async def _ingestion_stage() -> tuple: repo_structure=repo_structure, community_info=graph_builder.community_info(), enabled=curation_enabled(), + defer_summary_floor=will_generate, ) except (ValueError, KeyError, RuntimeError) as cur_err: logger.error("kg_curation_failed", error=str(cur_err), exc_info=True) From a2f1ad43bc0421b067b2954482e9ad67ef0330c3 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 18:00:38 +0530 Subject: [PATCH 10/13] test(kg): node typing + summary floor invariants --- tests/unit/analysis/test_kg_curation.py | 106 ++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/tests/unit/analysis/test_kg_curation.py b/tests/unit/analysis/test_kg_curation.py index 8f6f48e4..3eb53e06 100644 --- a/tests/unit/analysis/test_kg_curation.py +++ b/tests/unit/analysis/test_kg_curation.py @@ -461,3 +461,109 @@ def test_deterministic(self, large_repo): def test_flag_off_leaves_tour_empty(self, large_repo): kg = _curate(large_repo, enabled=False) assert kg.tour == [] + + +# --------------------------------------------------------------------------- +# Phase 4 — node typing & never-empty summaries +# --------------------------------------------------------------------------- + + +@pytest.fixture +def typed_repo(): + """A repo exercising infra/CI/data typing plus a barrel and a test.""" + barrel = "packages/p/index.ts" + paths = [ + ".github/workflows/ci.yml", + "Dockerfile", + "infra/main.tf", + "db/migrations/001_init.sql", + "config/app.yaml", + "README.md", + "src/api/route.py", + "tests/unit/test_route.py", + barrel, + ] + return build_repo( + paths, + tests={"tests/unit/test_route.py"}, + entries={barrel}, + barrels={barrel}, + ) + + +def _node_by_path(kg, path): + return next(n for n in kg.nodes if n.get("filePath") == path) + + +class TestNodeTyping: + def test_ci_workflow_is_pipeline(self, typed_repo): + kg = _curate(typed_repo, enabled=True) + n = _node_by_path(kg, ".github/workflows/ci.yml") + assert n["type"] == "pipeline" + assert "ci" in n["tags"] + + def test_dockerfile_and_terraform_are_infra(self, typed_repo): + kg = _curate(typed_repo, enabled=True) + for p in ("Dockerfile", "infra/main.tf"): + n = _node_by_path(kg, p) + assert n["type"] == "service" + assert "infra" in n["tags"] + + def test_migration_sql_is_schema(self, typed_repo): + kg = _curate(typed_repo, enabled=True) + n = _node_by_path(kg, "db/migrations/001_init.sql") + assert n["type"] == "schema" + assert "data" in n["tags"] + + +class TestSummaryFloor: + def test_no_empty_file_summary(self, typed_repo, large_repo): + for repo in (typed_repo, large_repo): + kg = _curate(repo, enabled=True) + for n in kg.nodes: + if n["id"].startswith("file:"): + assert n["summary"], f"empty summary for {n['filePath']}" + + def test_barrel_summary_is_honest(self, typed_repo): + kg = _curate(typed_repo, enabled=True) + n = _node_by_path(kg, "packages/p/index.ts") + assert "barrel" in n["summary"].lower() + + def test_test_summary_names_target(self, typed_repo): + kg = _curate(typed_repo, enabled=True) + n = _node_by_path(kg, "tests/unit/test_route.py") + assert n["summary"].lower().startswith("tests for") + + def test_flag_off_leaves_summaries_empty(self, typed_repo): + kg = _curate(typed_repo, enabled=False) + assert all(n["summary"] == "" for n in kg.nodes if n["id"].startswith("file:")) + + def test_deterministic(self, typed_repo): + a = _curate(typed_repo, enabled=True) + b = _curate(typed_repo, enabled=True) + assert [n.get("summary") for n in a.nodes] == [n.get("summary") for n in b.nodes] + + +class TestSummaryFloorDeferral: + def test_defer_leaves_summaries_for_later(self, typed_repo): + # Generate mode defers the floor so page backfill can win first. + kg = curate_knowledge_graph( + _build_skeleton(typed_repo), + parsed_files=typed_repo.parsed, + graph_builder=typed_repo.builder, + repo_structure=typed_repo.repo_structure, + community_info=typed_repo.builder.community_info(), + enabled=True, + defer_summary_floor=True, + ) + assert any(n["summary"] == "" for n in kg.nodes if n["id"].startswith("file:")) + + def test_apply_floor_fills_only_empties(self, typed_repo): + from repowise.core.analysis.kg_curation import apply_summary_floor + + kg = _build_skeleton(typed_repo) + # Simulate a rich page summary already backfilled onto one node. + _node_by_path(kg, "src/api/route.py")["summary"] = "Rich page summary." + apply_summary_floor(kg, typed_repo.parsed) + assert _node_by_path(kg, "src/api/route.py")["summary"] == "Rich page summary." + assert all(n["summary"] for n in kg.nodes if n["id"].startswith("file:")) From ae12c41004b058d479e1669e5951d7db019b0d42 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 18:04:10 +0530 Subject: [PATCH 11/13] feat(kg): group C4 externals by category; lock curated-layer inheritance --- .../server/services/c4_builder/mermaid.py | 52 ++++++++-- .../unit/server/services/test_c4_curation.py | 99 +++++++++++++++++++ 2 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 tests/unit/server/services/test_c4_curation.py diff --git a/packages/server/src/repowise/server/services/c4_builder/mermaid.py b/packages/server/src/repowise/server/services/c4_builder/mermaid.py index 4d4ca4b8..49b34dad 100644 --- a/packages/server/src/repowise/server/services/c4_builder/mermaid.py +++ b/packages/server/src/repowise/server/services/c4_builder/mermaid.py @@ -9,12 +9,24 @@ from __future__ import annotations import re +from collections import defaultdict from .models import C4L1, C4L2, C4L3, Container, ExternalSystemView - _SAFE = re.compile(r"[^a-zA-Z0-9_]") +# Beyond this many external systems the L1/L2 diagram groups them into labelled +# category boundaries ("Frameworks", "Services & Infrastructure", …) instead of +# rendering N loose boxes, so the context view stays legible (plan §Phase 5). +_EXTERNAL_GROUP_THRESHOLD = 8 +_CATEGORY_TITLES: dict[str, str] = { + "framework": "Frameworks", + "service": "Services & Infrastructure", + "tool": "Tools", + "library": "Libraries", +} +_CATEGORY_ORDER = ("framework", "service", "tool", "library") + def _sid(node_id: str) -> str: """Mermaid identifiers must be alnum/underscore.""" @@ -46,13 +58,7 @@ def to_mermaid_l1(view: C4L1) -> str: f'"{_q(view.system.description or "System under analysis")}")' ) - for ext in view.external_systems: - kind = _ext_kind(ext.category) - version = f" {ext.version}" if ext.version else "" - lines.append( - f' {kind}({_sid(ext.id)}, "{_q(ext.display_name)}", ' - f'"{_q(ext.ecosystem + version)}")' - ) + lines.extend(_emit_externals(view.external_systems)) if view.relations: lines.append("") @@ -69,8 +75,7 @@ def to_mermaid_l2(view: C4L2, system_name: str) -> str: lines.append(_container_line(c, indent=" ")) lines.append(" }") - for ext in view.external_systems: - lines.append(_external_line(ext)) + lines.extend(_emit_externals(view.external_systems)) if view.relations: lines.append("") @@ -113,6 +118,33 @@ def _container_line(c: Container, indent: str = " ") -> str: ) +def _emit_externals(externals: list[ExternalSystemView]) -> list[str]: + """Render external systems, grouping by category once there are many. + + Below the threshold they stay as flat boxes (today's behaviour). Above it, + each non-empty category is wrapped in a labelled ``Boundary`` so the diagram + reads as a handful of buckets rather than a wall of dependency boxes. + """ + if len(externals) <= _EXTERNAL_GROUP_THRESHOLD: + return [_external_line(ext) for ext in externals] + + by_cat: dict[str, list[ExternalSystemView]] = defaultdict(list) + for ext in externals: + by_cat[ext.category].append(ext) + + ordered = [c for c in _CATEGORY_ORDER if c in by_cat] + ordered += sorted(c for c in by_cat if c not in _CATEGORY_ORDER) + + lines: list[str] = [] + for cat in ordered: + title = _CATEGORY_TITLES.get(cat, f"{cat.title()}s") + lines.append(f' Boundary(extgrp_{_sid(cat)}, "{_q(title)}") {{') + for ext in sorted(by_cat[cat], key=lambda e: e.name): + lines.append(" " + _external_line(ext)) + lines.append(" }") + return lines + + def _external_line(ext: ExternalSystemView) -> str: kind = _ext_kind(ext.category) version = f" {ext.version}" if ext.version else "" diff --git a/tests/unit/server/services/test_c4_curation.py b/tests/unit/server/services/test_c4_curation.py new file mode 100644 index 00000000..843280b6 --- /dev/null +++ b/tests/unit/server/services/test_c4_curation.py @@ -0,0 +1,99 @@ +"""C4 legibility: curated KG layers feed the architecture view, and the L1/L2 +Mermaid groups externals by category once there are many (plan §Phase 5).""" + +from __future__ import annotations + +from repowise.server.services.c4_builder.architecture import ( + _layers_from_knowledge_graph, +) +from repowise.server.services.c4_builder.mermaid import to_mermaid_l1 +from repowise.server.services.c4_builder.models import ( + C4L1, + ExternalSystemView, + Person, + Relation, + System, +) + +# --------------------------------------------------------------------------- +# Curated layers flow through the architecture cascade (tier 2: KG file) +# --------------------------------------------------------------------------- + + +def test_architecture_view_consumes_curated_layers(): + kg = { + "layers": [ + { + "id": "layer:ui", + "name": "UI", + "description": "front end", + "nodeIds": ["file:src/ui/a.tsx", "file:src/ui/b.tsx"], + }, + { + "id": "layer:service", + "name": "Service", + "description": "core", + "nodeIds": ["file:src/core/x.py"], + }, + ] + } + node_ids = {"src/ui/a.tsx", "src/ui/b.tsx", "src/core/x.py"} + layers = _layers_from_knowledge_graph(kg, node_ids) + + # Curated names/ids/order preserved — not community-N / cluster-N. + assert [layer["name"] for layer in layers] == ["UI", "Service"] + assert [layer["id"] for layer in layers] == ["layer:ui", "layer:service"] + assert layers[0]["node_ids"] == ["src/ui/a.tsx", "src/ui/b.tsx"] + + +def _ext(name: str, category: str) -> ExternalSystemView: + return ExternalSystemView( + id=f"ext:{name}", + name=name, + display_name=name, + category=category, + ecosystem="pypi", + version="", + ) + + +def _l1(externals: list[ExternalSystemView]) -> C4L1: + system = System(id="sys:r", name="r") + return C4L1( + system=system, + people=[Person(id="person:user", name="User", description="")], + external_systems=externals, + relations=[ + Relation(source_id=system.id, target_id=e.id, label=e.category) for e in externals + ], + ) + + +# --------------------------------------------------------------------------- +# Mermaid external grouping +# --------------------------------------------------------------------------- + + +def test_few_externals_stay_flat(): + externals = [_ext(f"lib{i}", "library") for i in range(4)] + out = to_mermaid_l1(_l1(externals)) + assert "Boundary(extgrp_" not in out + + +def test_many_externals_group_by_category(): + externals = ( + [_ext(f"fw{i}", "framework") for i in range(4)] + + [_ext(f"svc{i}", "service") for i in range(3)] + + [_ext(f"lib{i}", "library") for i in range(5)] + ) + out = to_mermaid_l1(_l1(externals)) + assert "Boundary(extgrp_framework" in out + assert "Boundary(extgrp_service" in out + assert "Boundary(extgrp_library" in out + assert '"Frameworks"' in out + assert '"Services & Infrastructure"' in out + # Frameworks group is rendered before Libraries (category priority order). + assert out.index("extgrp_framework") < out.index("extgrp_library") + # Every external still appears as a box. + for i in range(5): + assert f"ext_lib{i}" in out From d6f10697f1e5e5a6f3d5fc61614a09993fe275d9 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 18:11:14 +0530 Subject: [PATCH 12/13] feat(kg): add validate_kg invariant checker + portable self-validated export --- .../cli/src/repowise/cli/state_persistence.py | 22 ++- .../src/repowise/core/analysis/kg_curation.py | 176 +++++++++++++++++- 2 files changed, 192 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/repowise/cli/state_persistence.py b/packages/cli/src/repowise/cli/state_persistence.py index 0c8d6fe6..3b487311 100644 --- a/packages/cli/src/repowise/cli/state_persistence.py +++ b/packages/cli/src/repowise/cli/state_persistence.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging from pathlib import Path from typing import Any @@ -22,16 +23,33 @@ def build_kg_state(kg: Any) -> dict[str, Any]: } -def save_knowledge_graph_json(repo_path: Path, kg: Any) -> None: +def save_knowledge_graph_json(repo_path: Path, kg: Any, *, portable: bool = False) -> None: """Write ``.repowise/knowledge-graph.json`` for a KG result. No-op when the result can't serialize itself (``to_dict`` missing), so callers only need to guard against a ``None`` knowledge graph. + + When ``portable`` is set, write the self-contained, self-validated artifact + (curated layers + tour + entry points + summaries + a ``meta``/``validation`` + block) instead of the bare ``to_dict`` output. Hard invariant violations are + logged but the artifact is still emitted, with the failures recorded under + ``meta.validation`` so a consumer can see them ("repaired, not rejected"). """ if not hasattr(kg, "to_dict"): return import json + if portable: + from repowise.core.analysis.kg_curation import build_portable_kg + + data, validation = build_portable_kg(kg) + if not validation.ok: + logging.getLogger(__name__).warning( + "portable KG failed invariants: %s", "; ".join(validation.errors) + ) + else: + data = kg.to_dict() + kg_json_path = repo_path / ".repowise" / "knowledge-graph.json" kg_json_path.parent.mkdir(parents=True, exist_ok=True) - kg_json_path.write_text(json.dumps(kg.to_dict(), indent=2), encoding="utf-8") + kg_json_path.write_text(json.dumps(data, indent=2), encoding="utf-8") diff --git a/packages/core/src/repowise/core/analysis/kg_curation.py b/packages/core/src/repowise/core/analysis/kg_curation.py index 4cb3bd6d..a0ea56d6 100644 --- a/packages/core/src/repowise/core/analysis/kg_curation.py +++ b/packages/core/src/repowise/core/analysis/kg_curation.py @@ -25,6 +25,7 @@ import logging import os from collections import defaultdict +from dataclasses import dataclass, field from pathlib import PurePosixPath from typing import Any @@ -36,7 +37,14 @@ score_entry_points, ) -__all__ = ["apply_summary_floor", "curate_knowledge_graph", "curation_enabled"] +__all__ = [ + "KGValidation", + "apply_summary_floor", + "build_portable_kg", + "curate_knowledge_graph", + "curation_enabled", + "validate_kg", +] logger = logging.getLogger(__name__) @@ -612,11 +620,171 @@ def apply_summary_floor(kg: KnowledgeGraphResult, parsed_files: list[Any] | None fallback uses the node's symbol count instead of naming top symbols. """ pf_by_path = { - pf.file_info.path: pf - for pf in (parsed_files or []) - if getattr(pf, "file_info", None) + pf.file_info.path: pf for pf in (parsed_files or []) if getattr(pf, "file_info", None) } for node in _file_nodes(kg): if node.get("summary"): continue node["summary"] = _cheap_summary(node, pf_by_path.get(node["filePath"])) + + +# --------------------------------------------------------------------------- +# Phase 7 — invariant validation (shared by tests and the portable writer) +# --------------------------------------------------------------------------- + +# Quality thresholds. The lower layer bound and coverage targets are *soft* +# (warnings) because they depend on repo size/shape; the partition, hard count +# bound, capped entry set, never-empty summaries, and tour budget are *hard*. +_MIN_LAYERS = 6 +_MAX_LAYER_FRACTION = 0.35 +_MAX_CATCHALL_FRACTION = 0.20 +_MAX_SINGLETON_FRACTION = 0.10 +_MIN_TOUR_COVERAGE = 0.90 + + +@dataclass +class KGValidation: + """Outcome of :func:`validate_kg` — hard errors, soft warnings, metrics.""" + + ok: bool + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + metrics: dict[str, Any] = field(default_factory=dict) + + def as_dict(self) -> dict[str, Any]: + return { + "ok": self.ok, + "errors": self.errors, + "warnings": self.warnings, + "metrics": self.metrics, + } + + +def validate_kg(kg: KnowledgeGraphResult) -> KGValidation: + """Validate a curated KG against the intuitiveness invariants (plan §5/§7). + + Pure and side-effect free. Hard violations set ``ok=False`` and populate + ``errors``; size/shape-dependent shortfalls go to ``warnings``. The + ``metrics`` block is the per-repo intuitiveness scorecard. + """ + errors: list[str] = [] + warnings: list[str] = [] + + file_nodes = _file_nodes(kg) + file_count = len(file_nodes) + file_ids = {n["id"] for n in file_nodes} + tags_by_path = {n["filePath"]: (n.get("tags") or []) for n in file_nodes} + summary_by_id = {n["id"]: n.get("summary") for n in file_nodes} + + layers = kg.layers or [] + n_layers = len(layers) + + # -- Layer count ------------------------------------------------------- + if n_layers == 0: + errors.append("no layers") + elif n_layers > _MAX_LAYERS: + errors.append(f"too many layers: {n_layers} > {_MAX_LAYERS}") + elif n_layers < _MIN_LAYERS: + warnings.append(f"few layers: {n_layers} < {_MIN_LAYERS} (small/flat repo?)") + + # -- Partition --------------------------------------------------------- + layered: list[str] = [nid for layer in layers for nid in layer.get("nodeIds", [])] + layered_set = set(layered) + if len(layered) != len(layered_set): + errors.append("partition: a file appears in more than one layer") + if file_count and layered_set != file_ids: + missing = len(file_ids - layered_set) + extra = len(layered_set - file_ids) + errors.append(f"partition: {missing} unlayered, {extra} unknown ids") + + # -- Singleton spam & mega-layer balance ------------------------------- + sizes = [len(layer.get("nodeIds", [])) for layer in layers] + singleton_frac = (sum(1 for s in sizes if s == 1) / n_layers) if n_layers else 0.0 + if singleton_frac >= _MAX_SINGLETON_FRACTION: + warnings.append(f"singleton layers {singleton_frac:.0%} ≥ {_MAX_SINGLETON_FRACTION:.0%}") + + largest_frac = (max(sizes) / file_count) if (sizes and file_count) else 0.0 + if largest_frac > _MAX_LAYER_FRACTION: + warnings.append(f"largest layer {largest_frac:.0%} > {_MAX_LAYER_FRACTION:.0%}") + + catchall = next((layer for layer in layers if layer.get("name") == "Application"), None) + catchall_frac = ( + (len(catchall.get("nodeIds", [])) / file_count) if (catchall and file_count) else 0.0 + ) + if catchall_frac > _MAX_CATCHALL_FRACTION: + warnings.append(f"Application catch-all {catchall_frac:.0%} > {_MAX_CATCHALL_FRACTION:.0%}") + + # -- Entry points ------------------------------------------------------ + entry_points = kg.project.get("entry_points", []) if isinstance(kg.project, dict) else [] + if len(entry_points) > _MAX_ENTRY_POINTS: + errors.append(f"too many entry points: {len(entry_points)} > {_MAX_ENTRY_POINTS}") + barrels_surfaced = [p for p in entry_points if "barrel" in tags_by_path.get(p, [])] + if barrels_surfaced: + errors.append(f"barrels surfaced as entry points: {barrels_surfaced}") + + # -- Tour -------------------------------------------------------------- + tour = kg.tour or [] + tour_coverage = 0.0 + if tour: + if len(tour) > DEFAULT_MAX_STOPS: + errors.append(f"tour too long: {len(tour)} > {DEFAULT_MAX_STOPS}") + if tour[0].get("kind") != "overview": + errors.append("tour does not open with an overview/README step") + layer_ids = {layer.get("id") for layer in layers} + covered = { + s.get("layer_id") + for s in tour + if s.get("kind") != "overview" and s.get("layer_id") in layer_ids + } + tour_coverage = (len(covered) / len(layer_ids)) if layer_ids else 0.0 + if tour_coverage < _MIN_TOUR_COVERAGE: + warnings.append(f"tour covers {tour_coverage:.0%} of layers < {_MIN_TOUR_COVERAGE:.0%}") + + # -- Summaries --------------------------------------------------------- + empty_summaries = [nid for nid, s in summary_by_id.items() if not s] + if empty_summaries: + errors.append(f"{len(empty_summaries)} file nodes have an empty summary") + summary_completeness = 1.0 - len(empty_summaries) / file_count if file_count else 1.0 + + metrics = { + "file_count": file_count, + "layer_count": n_layers, + "singleton_layer_pct": round(singleton_frac * 100, 1), + "largest_layer_pct": round(largest_frac * 100, 1), + "application_pct": round(catchall_frac * 100, 1), + "entry_point_count": len(entry_points), + "tour_steps": len(tour), + "tour_coverage_pct": round(tour_coverage * 100, 1), + "summary_completeness_pct": round(summary_completeness * 100, 1), + } + + return KGValidation(ok=not errors, errors=errors, warnings=warnings, metrics=metrics) + + +# --------------------------------------------------------------------------- +# Phase 6 — portable, self-validated export artifact +# --------------------------------------------------------------------------- + + +def build_portable_kg(kg: KnowledgeGraphResult) -> tuple[dict, KGValidation]: + """Assemble a self-contained, self-validated ``knowledge-graph.json`` dict. + + Kept separate from :meth:`KnowledgeGraphResult.to_dict` so the *default* + export stays byte-identical (curation flag-off contract); the portable + artifact adds a ``meta`` block (counts, fingerprint) and an embedded + ``validation`` report so an external consumer can trust it without a server. + Returns ``(data, validation)`` so the writer can decide on hard violations. + """ + data = kg.to_dict() + validation = validate_kg(kg) + data["meta"] = { + "schema_version": data.get("version", "1.0.0"), + "generator": "repowise-kg-curation", + "fingerprint": getattr(kg, "fingerprint", ""), + "file_count": validation.metrics.get("file_count", 0), + "layer_count": validation.metrics.get("layer_count", 0), + "entry_point_count": validation.metrics.get("entry_point_count", 0), + "tour_steps": validation.metrics.get("tour_steps", 0), + "validation": validation.as_dict(), + } + return data, validation From 8d5acf3ef6fe192abc1d6961404bedc4a6ba65b0 Mon Sep 17 00:00:00 2001 From: Swati Ahuja Date: Wed, 3 Jun 2026 18:11:14 +0530 Subject: [PATCH 13/13] test(kg): cross-repo invariants, portable artifact, many-isolates regression --- tests/unit/analysis/kg_fixtures.py | 137 ++++++++++++++++++++ tests/unit/analysis/test_kg_invariants.py | 147 ++++++++++++++++++++++ 2 files changed, 284 insertions(+) create mode 100644 tests/unit/analysis/kg_fixtures.py create mode 100644 tests/unit/analysis/test_kg_invariants.py diff --git a/tests/unit/analysis/kg_fixtures.py b/tests/unit/analysis/kg_fixtures.py new file mode 100644 index 00000000..499930ef --- /dev/null +++ b/tests/unit/analysis/kg_fixtures.py @@ -0,0 +1,137 @@ +"""Shared synthetic-repo builders for KG curation/invariant tests. + +Not a test module (no ``test_`` prefix → not collected). Builds parsed files + +a mock ``GraphBuilder`` and runs the real skeleton + curation, so invariant +tests exercise the production code paths. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from types import SimpleNamespace +from unittest.mock import MagicMock + +from repowise.core.analysis.kg_curation import curate_knowledge_graph +from repowise.core.analysis.knowledge_graph import ( + KnowledgeGraphResult, + build_knowledge_graph_skeleton, +) + + +@dataclass +class FakeFileInfo: + path: str + language: str = "python" + size_bytes: int = 1000 + is_test: bool = False + is_config: bool = False + is_api_contract: bool = False + is_entry_point: bool = False + line_count: int = 100 + + +@dataclass +class FakeSymbol: + name: str = "thing" + kind: str = "function" + start_line: int = 1 + end_line: int = 10 + + +@dataclass +class FakeParsedFile: + file_info: FakeFileInfo + symbols: list = field(default_factory=list) + imports: list = field(default_factory=list) + exports: list = field(default_factory=list) + + +def _community_info(cid: int, label: str, members: list[str]): + return SimpleNamespace( + community_id=cid, + label=label, + members=members, + size=len(members), + cohesion=0.8, + dominant_language="python", + ) + + +def build_repo( + paths: list[str], + *, + tests: set[str] | None = None, + entries: set[str] | None = None, + edges: list[tuple[str, str]] | None = None, + barrels: set[str] | None = None, +): + """Build a synthetic repo: parsed files + a mock GraphBuilder.""" + import networkx as nx + + tests = tests or set() + entries = entries or set() + barrels = barrels or set() + + parsed = [] + g = nx.DiGraph() + for p in paths: + is_test, is_entry = p in tests, p in entries + if p in barrels: + pf = FakeParsedFile( + FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), + symbols=[], + imports=[SimpleNamespace(is_reexport=True)], + exports=["A"], + ) + else: + pf = FakeParsedFile( + FakeFileInfo(p, is_test=is_test, is_entry_point=is_entry), + symbols=[FakeSymbol()], + ) + parsed.append(pf) + attrs = {"node_type": "file", "language": "python"} + if is_test: + attrs["is_test"] = True + if is_entry: + attrs["is_entry_point"] = True + g.add_node(p, **attrs) + for u, v in edges or []: + g.add_edge(u, v, edge_type="imports", confidence=1.0) + + # One community per file → the "103 layers" pathology curation must absorb. + communities = {p: i for i, p in enumerate(paths)} + infos = {i: _community_info(i, f"cluster_{i}", [p]) for i, p in enumerate(paths)} + pagerank = {p: 1.0 / max(len(paths), 1) for p in paths} + + builder = MagicMock() + builder.graph.return_value = g + builder.pagerank.return_value = pagerank + builder.betweenness_centrality.return_value = {} + builder.community_detection.return_value = communities + builder.community_info.return_value = infos + repo_structure = SimpleNamespace( + is_monorepo=True, total_files=len(paths), entry_points=sorted(entries) + ) + return SimpleNamespace(parsed=parsed, builder=builder, repo_structure=repo_structure) + + +def build_skeleton(repo) -> KnowledgeGraphResult: + return build_knowledge_graph_skeleton( + parsed_files=repo.parsed, + graph_builder=repo.builder, + repo_structure=repo.repo_structure, + tech_stack=[], + external_systems=[], + ) + + +def curate(repo, **kw) -> KnowledgeGraphResult: + return curate_knowledge_graph( + build_skeleton(repo), + parsed_files=repo.parsed, + graph_builder=repo.builder, + repo_structure=repo.repo_structure, + community_info=repo.builder.community_info(), + enabled=kw.pop("enabled", True), + **kw, + ) diff --git a/tests/unit/analysis/test_kg_invariants.py b/tests/unit/analysis/test_kg_invariants.py new file mode 100644 index 00000000..3f33d541 --- /dev/null +++ b/tests/unit/analysis/test_kg_invariants.py @@ -0,0 +1,147 @@ +"""Phase 7 — KG intuitiveness invariants locked across structurally different +repos (many-isolates regression, flat single-package, deep monorepo) plus the +portable artifact and the AST-untouched guard.""" + +from __future__ import annotations + +import pytest + +from repowise.core.analysis.kg_curation import build_portable_kg, validate_kg + +from .kg_fixtures import build_repo, build_skeleton, curate + +# --------------------------------------------------------------------------- +# Structurally different repos +# --------------------------------------------------------------------------- + + +@pytest.fixture +def many_isolates_repo(): + """Many weakly-connected files — the historical 103-layers / 73-singletons + pathology. Curated layers must collapse to a bounded named set.""" + paths: list[str] = [] + for layer_dir in ("api", "services", "models", "ui", "utils", "config"): + paths += [f"pkg{layer_dir}/{layer_dir}/f{i}.py" for i in range(14)] + tests = {f"tests/test_{i}.py" for i in range(14)} + paths += sorted(tests) + return build_repo(paths, tests=tests) # no edges → every file an isolate + + +@pytest.fixture +def flat_repo(): + """A single flat package — few layers, but must stay partitioned/valid.""" + return build_repo([f"src/mod{i}.py" for i in range(40)]) + + +@pytest.fixture +def deep_monorepo(): + """A realistically layered monorepo with two mega-layers.""" + paths: list[str] = [] + for sub in ("ingestion", "analysis", "generation"): + paths += [f"packages/core/src/repowise/core/{sub}/m{i}.py" for i in range(24)] + for sub in ("buttons", "forms", "layout"): + paths += [f"packages/ui/src/components/{sub}/c{i}.tsx" for i in range(24)] + paths += [f"packages/cli/src/cli/commands/cmd{i}.py" for i in range(20)] + paths += [f"src/api/r{i}.py" for i in range(12)] + paths += [f"src/models/m{i}.py" for i in range(10)] + paths += [f"src/utils/u{i}.py" for i in range(8)] + paths += [f"config/c{i}.yaml" for i in range(6)] + tests = {f"tests/unit/test_{i}.py" for i in range(30)} + paths += sorted(tests) + return build_repo(paths, tests=tests) + + +ALL_REPOS = ["many_isolates_repo", "flat_repo", "deep_monorepo"] + + +@pytest.mark.parametrize("repo_fixture", ALL_REPOS) +class TestInvariantsAcrossRepos: + def test_layer_count_never_explodes(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + assert len(kg.layers) <= 15 # the 103→bounded guarantee + + def test_partition_holds(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + v = validate_kg(kg) + assert "partition" not in " ".join(v.errors) + seen: set[str] = set() + for layer in kg.layers: + for nid in layer["nodeIds"]: + assert nid not in seen + seen.add(nid) + file_count = sum(1 for n in kg.nodes if n["id"].startswith("file:")) + assert len(seen) == file_count + + def test_no_empty_summaries(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + assert all(n["summary"] for n in kg.nodes if n["id"].startswith("file:")) + + def test_entry_points_capped(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + assert len(kg.project.get("entry_points", [])) <= 8 + + def test_tour_within_budget_and_opens_overview(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + assert len(kg.tour) <= 12 + if kg.tour: + assert kg.tour[0]["kind"] == "overview" + + def test_no_hard_validation_errors(self, repo_fixture, request): + kg = curate(request.getfixturevalue(repo_fixture)) + v = validate_kg(kg) + assert v.ok, v.errors + + def test_deterministic(self, repo_fixture, request): + a = curate(request.getfixturevalue(repo_fixture)) + b = curate(request.getfixturevalue(repo_fixture)) + assert a.layers == b.layers + assert a.tour == b.tour + assert a.project.get("entry_points") == b.project.get("entry_points") + + def test_ast_graph_untouched(self, repo_fixture, request): + repo = request.getfixturevalue(repo_fixture) + g = repo.builder.graph() + before = (g.number_of_nodes(), g.number_of_edges()) + curate(repo) + g = repo.builder.graph() + assert (g.number_of_nodes(), g.number_of_edges()) == before + + +class TestManyIsolatesRegression: + def test_does_not_produce_one_layer_per_file(self, many_isolates_repo): + # Skeleton (community) layers = one per file (the pathology). + skel = build_skeleton(many_isolates_repo) + file_count = sum(1 for n in skel.nodes if n["id"].startswith("file:")) + assert len(skel.layers) == file_count + # Curated layers collapse to a bounded named set. + kg = curate(many_isolates_repo) + assert len(kg.layers) <= 15 + assert len(kg.layers) < file_count + + +# --------------------------------------------------------------------------- +# Portable artifact (Phase 6) +# --------------------------------------------------------------------------- + + +class TestPortableArtifact: + def test_self_contained_and_validated(self, deep_monorepo): + kg = curate(deep_monorepo) + data, validation = build_portable_kg(kg) + for key in ("version", "project", "nodes", "edges", "layers", "tour", "meta"): + assert key in data + assert data["meta"]["validation"]["ok"] is validation.ok + assert data["meta"]["layer_count"] == len(kg.layers) + assert validation.ok, validation.errors + + def test_default_to_dict_has_no_meta(self, deep_monorepo): + # The bare export stays byte-identical-shaped (no meta leakage). + kg = curate(deep_monorepo) + assert "meta" not in kg.to_dict() + + def test_metrics_block_populated(self, deep_monorepo): + kg = curate(deep_monorepo) + m = validate_kg(kg).metrics + assert m["layer_count"] >= 6 + assert m["summary_completeness_pct"] == 100.0 + assert 0 <= m["largest_layer_pct"] <= 35