diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba277fa..dbfb01a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,8 @@ # InQL CI — Incan library package # -# Uses the reusable Incan composite action from the incan repository. -# This eliminates copy-paste drift and provides a supported integration path. -# The composite action caches built binaries for faster subsequent runs. +# Builds the Incan compiler from source in CI, then runs the InQL package +# checks against that local binary. Keeping this workflow self-contained avoids +# a hard dependency on a remote composite action path staying in sync. name: CI @@ -35,12 +35,27 @@ jobs: - name: Check out InQL uses: actions/checkout@v4 - - name: Install Incan (cached) - uses: dannys-code-corner/incan/.github/actions/install-incan@main + - name: Check out Incan + uses: actions/checkout@v4 + with: + repository: dannys-code-corner/incan + ref: main + path: incan + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache Incan build artifacts + uses: Swatinem/rust-cache@v2 with: - incan-ref: main - incan-repo: dannys-code-corner/incan - runner-os: ${{ matrix.os }} + workspaces: incan -> target + + - name: Build Incan compiler + working-directory: incan + run: cargo build --locked --bin incan + + - name: Expose local Incan binary on PATH + run: echo "$GITHUB_WORKSPACE/incan/target/debug" >> "$GITHUB_PATH" - name: Show toolchain run: | diff --git a/.github/workflows/issue_auto_label.yml b/.github/workflows/issue_auto_label.yml index ef19084..a93845b 100644 --- a/.github/workflows/issue_auto_label.yml +++ b/.github/workflows/issue_auto_label.yml @@ -28,9 +28,9 @@ jobs: - name: Mint GitHub App installation token id: app_token env: - APP_ID: ${{ secrets.INQL_TRIAGE_APP_ID }} - INSTALLATION_ID: ${{ secrets.INQL_TRIAGE_APP_INSTALLATION_ID }} - PRIVATE_KEY: ${{ secrets.INQL_TRIAGE_APP_PRIVATE_KEY }} + APP_ID: ${{ secrets.TRIAGE_APP_ID }} + 
INSTALLATION_ID: ${{ secrets.TRIAGE_APP_INSTALLATION_ID }} + PRIVATE_KEY: ${{ secrets.TRIAGE_APP_PRIVATE_KEY }} run: | set -euo pipefail diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e291ebe..fbe0f9f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -85,15 +85,15 @@ Templates apply **type** labels (`bug`, `feature`, `chore`, `documentation`, `RF ### Triage GitHub App (CI) -The auto-label workflow uses a **GitHub App** (same pattern as the Incan repository). Add these **repository secrets** on `dannys-code-corner/InQL`: +The auto-label workflow uses a shared **organization-level GitHub App** installation for `dannys-code-corner`. Configure these **organization Actions secrets** and grant access to this repository: | Secret | Purpose | | ------ | ------- | -| `INQL_TRIAGE_APP_ID` | App ID | -| `INQL_TRIAGE_APP_INSTALLATION_ID` | Installation ID for **this** repo (differs per installation) | -| `INQL_TRIAGE_APP_PRIVATE_KEY` | App private key (PEM) | +| `TRIAGE_APP_ID` | App ID | +| `TRIAGE_APP_INSTALLATION_ID` | Installation ID for the **organization-level** app installation | +| `TRIAGE_APP_PRIVATE_KEY` | App private key (PEM) | -Install the app on the InQL repository (you can reuse the same app as Incan with a second installation). Without these secrets the workflow fails at the token step. +Install the app on the `dannys-code-corner` organization and grant it access to `InQL` (and any future repositories that should share triage automation). Without these secrets the workflow fails at the token step. 
## Pull request guidelines diff --git a/docs/architecture.md b/docs/architecture.md index a78c7a1..32ea881 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -15,7 +15,7 @@ The Incan compiler remains responsible for parsing, typechecking, lowering, and InQL is organized around three layers: -- **Prism internally** — the immutable planning and optimization engine +- **Prism internally** — the immutable planning and optimization engine over persistent authored plan state and derived optimized views - **Substrait at the boundary** — the normative emitted logical interchange contract - **Session for execution** — the execution/binding layer that consumes plans but does not define them @@ -46,7 +46,7 @@ Incan models / model-derived schema │ ├──► authored plan state ├──► lineage-preserving optimization - └──► optimized logical view + └──► derived optimized views │ ▼ Substrait Plan / Rel emission @@ -66,7 +66,7 @@ The core rule is: ### Carriers -The author-facing carrier family is rooted in `DataSet[T]` and currently includes `LazyFrame[T]`, `DataFrame[T]`, and `DataStream[T]`. +The author-facing carrier family is rooted in `DataSet[T]` and includes `LazyFrame[T]`, `DataFrame[T]`, and `DataStream[T]`. Carriers are expected to be: @@ -83,10 +83,10 @@ Per [RFC 007][rfc-007], Prism is InQL’s internal logical planning and optimiza Prism is responsible for: -- persistent logical plan storage +- persistent authored logical plan storage - cheap branching through structural sharing - lineage preservation -- logical rewrites and optimization before boundary emission or execution +- logical rewrites and derived optimized views before boundary emission or execution Prism is **not** the normative interchange format and **not** the execution engine. @@ -122,15 +122,19 @@ Session is intentionally outside RFC 002’s normative emitted contract. 
It cons ## Current implementation -The repository currently includes: +The repository includes: - author-facing carrier types exist in [mod.incn](../src/dataset/mod.incn) - canonical relational operator helpers exist in [ops.incn](../src/dataset/ops.incn) - RFC 002 emits **real proto-backed Substrait plans** - conformance scenarios are represented as typed package code in [conformance.incn](../src/substrait/conformance.incn) -- Prism is specified as the internal planning substrate, while parts of its full implementation remain ahead of the current package code +- `LazyFrame[T]` is a thin adapter over a backend-native `PrismCursor[T]` in [mod.incn](../src/dataset/mod.incn) +- the current internal Prism implementation lives in [prism/mod.incn](../src/prism/mod.incn) with an authored graph, `PrismCursor[T]`, default-on canonical rewrites before RFC 002 lowering, explain/debug artifacts (applied-rule list + rewritten-to-authored origins), and an Incan-native typed store-id allocator (`static` + `newtype`) +- cross-store join adoption deduplicates equivalent reachable RHS nodes within one adoption pass before appending the join node +- dataset methods route through Prism internal seam helpers so future authoring surfaces can reuse one planning entry path +- `DataFrame[T]` and `DataStream[T]` remain direct `Rel` wrappers for now -This means the package has a concrete Substrait boundary and conformance layer, while some internal planning mechanics remain transitional. +This means the package has a concrete Substrait boundary, a conformance layer, and a real Prism-backed path for the current `LazyFrame[T]` operator surface with safe canonical rewrites. The remaining gap is breadth, not existence: Prism backs `LazyFrame[T]`, while other carriers, richer semantic spec types, and advanced optimizer phases remain follow-on work. 
## Repository layout @@ -140,6 +144,7 @@ This means the package has a concrete Substrait boundary and conformance layer, | `src/lib.incn` | Public package exports | | `src/dataset/mod.incn` | Carrier types and trait surface | | `src/dataset/ops.incn` | Canonical relational operator helpers | +| `src/prism/mod.incn` | Internal Prism graph, cursor, and lowering logic | | `src/substrait/plan.incn` | RFC 002 proto-backed Substrait emission helpers | | `src/substrait/conformance.incn` | Typed conformance corpus and validation helpers | | `src/substrait/schema.incn` | Model/schema to Substrait type bridging | diff --git a/docs/release_notes/v0_1.md b/docs/release_notes/v0_1.md index 397d966..308015a 100644 --- a/docs/release_notes/v0_1.md +++ b/docs/release_notes/v0_1.md @@ -10,6 +10,7 @@ Entries will be filled in as work lands (link RFCs and PRs when applicable). - **Carriers:** `DataSet[T]` hierarchy including bounded vs unbounded traits and concrete frame/stream types. - **Plans:** Apache Substrait as the logical interchange contract. - **Authoring:** method-chain lowering into a real Substrait boundary today, with `query {}` work still ahead. +- **Prism:** `LazyFrame` lowering applies safe canonical rewrites (`Filter(true)` elimination and adjacent `Limit`/`Project`/`OrderBy` collapse) before RFC 002 plan emission. - **Execution:** Session-oriented read, execute, and write (reference backend per RFC 004). Pipe-forward (`|>`) is specified in RFC 005 but **out of scope** for v0.1. diff --git a/docs/rfcs/004_inql_execution_context.md b/docs/rfcs/004_inql_execution_context.md index 5170cfc..23ec597 100644 --- a/docs/rfcs/004_inql_execution_context.md +++ b/docs/rfcs/004_inql_execution_context.md @@ -17,6 +17,8 @@ This RFC specifies the **execution context**: the session object that bridges InQL's **typed logical plans** and **real execution**. 
It defines how authors **read data** into `DataSet[T]` values, **execute plans** (lowered to Substrait per InQL RFC 002), and **write results** back to storage. **Apache DataFusion** is the **reference (and default) execution backend** for plan optimization and execution: it consumes Substrait plans, applies query optimizations (predicate pushdown, projection pruning, join reordering, constant folding), and executes against registered data sources, returning **Apache Arrow** record batches that InQL wraps in typed `DataFrame[T]` carriers. This RFC standardizes the explicit core `Session` contract; higher operational layers may compose, scope, or inject sessions and adapter conveniences on top, but they do not redefine InQL execution semantics. With RFCs 000–004, InQL is usable for read → transform → write workflows. +> Editorial note (2026-04-07): RFC 004 remains authoritative for the `Session` execution boundary and backend abstraction. The optimizer boundary between Prism and `Session` is clarified by [InQL RFC 008](008_optimizer_boundary_stats_cbo_aqe.md), which governs ownership of statistics, cost-based optimization inputs, physical planning, and adaptive re-planning. + ## Core model 1. A **`Session`** (or **execution context**) is the entry point for InQL programs that interact with data. It holds **table registrations**, **configuration**, and a **reference to the execution backend**. 
diff --git a/docs/rfcs/007_prism_planning_engine.md b/docs/rfcs/007_prism_planning_engine.md index 3e5973b..14e1cce 100644 --- a/docs/rfcs/007_prism_planning_engine.md +++ b/docs/rfcs/007_prism_planning_engine.md @@ -1,6 +1,6 @@ # InQL RFC 007: Prism logical planning and optimization engine -- **Status:** Draft +- **Status:** In Progress - **Created:** 2026-04-02 - **Author(s):** Danny Meijer - **Related:** @@ -9,7 +9,7 @@ - InQL RFC 003 (`query {}` — lowers through Prism-managed logical work before Substrait emission) - InQL RFC 004 (execution context — session executes Prism-backed plans but does not define Prism) - InQL RFC 005 (optional pipe-forward — must stay Prism-consistent with equivalent surfaces) -- **Issue:** — +- **Issue:** [InQL #16](https://github.com/dannys-code-corner/InQL/issues/16) - **RFC PR:** — - **Written against:** Incan v0.2 - **Shipped in:** — @@ -18,6 +18,8 @@ This RFC defines **Prism** as InQL's immutable internal logical planning and optimization engine. Prism owns persistent plan storage, cheap branching through structural sharing, lineage-preserving rewrites, and logical optimization prior to Substrait emission or session execution. Prism is an **internal planning substrate**, not the normative interchange contract: **Apache Substrait** remains the boundary format per InQL RFC 002. `LazyFrame`, `DataFrame`, and `DataStream` are carrier experiences over Prism-managed plan state; `Session` and `SessionContext` bind and execute those plans per InQL RFC 004. +RFC 007 is the design and implementation record for the first Prism adoption slice. Optimizer-boundary ownership is further clarified by [InQL RFC 008](008_optimizer_boundary_stats_cbo_aqe.md): Prism owns immutable authored state, lineage-preserving logical work, and internal optimized views, while RFC 008 governs ownership of statistics, backend pushdown policy, physical planning, cost-based optimization inputs, and adaptive re-planning at the `Session` boundary. 
Follow-on RFC 007 hardening includes an Incan-native typed store-id allocator (`static` + `newtype`) and cross-store adoption dedup for equivalent reachable RHS nodes; this remains internal Prism substrate work and does not expand RFC 008 scope. + ## Motivation InQL already has a strong external story around typed carriers, Substrait emission, and the execution boundary, but it lacks a dedicated specification for the internal planning layer that sits between authored logic and emitted plans. Without that layer being named and scoped, plan construction, optimization, lineage, interactive behavior, and future explain/debug tooling risk becoming an accidental mix of implementation details spread across InQL RFC 001, InQL RFC 002, and InQL RFC 004. @@ -29,7 +31,7 @@ Prism gives that layer a home. It lets InQL say clearly that: - optimization is a first-class responsibility, not an incidental backend side effect - lineage must survive rewrites so optimized plans remain explainable -This matters for more than simple query lowering. Complex multi-hop pipelines, future interactive environments, and prospective reuse of the planning substrate beyond InQL all benefit from a stable definition of what the internal plan engine is allowed and required to do. +This matters for more than simple query lowering. Complex multi-hop pipelines, future interactive environments, and future explain/debug tooling all benefit from a stable definition of what the internal plan engine is allowed and required to do. ## Goals @@ -46,7 +48,7 @@ This matters for more than simple query lowering. Complex multi-hop pipelines, f - Defining physical execution behavior, backend binding, or secret management — that remains outside Prism and is scoped by InQL RFC 004 and surrounding operational layers. - Defining new author-facing query syntax — Prism is an internal planning engine, not a new language surface. 
- Forcing one exact in-memory data structure implementation for authored and optimized plan state. -- Promising Prism as a general-purpose platform beyond InQL today. This RFC scopes Prism normatively to InQL, while requiring a clean enough module boundary that future extraction remains possible. +- Promising Prism as a general-purpose platform beyond InQL today. This RFC scopes Prism normatively to InQL; future extraction remains a possible consequence of a clean boundary, not a current requirement. ## Guide-level explanation @@ -69,7 +71,7 @@ The important user-visible behavior is: - branching from a shared base plan is cheap - execution still belongs to the session boundary -Prism is the reason this can work efficiently. It stores the shared logical planning state, allows both `high_value` and `recent` to branch from the same base plan, and may optimize the resulting logical graph before the plan is emitted to Substrait or executed by a session. +Prism is the reason this can work efficiently. It stores the shared authored planning state, allows both `high_value` and `recent` to branch from the same base plan, and may derive optimized views of that state before the plan is emitted to Substrait or executed by a session. Prism should be thought of as the internal engine that **thinks** about the plan. Substrait is how the plan is **communicated** at the boundary. Session is how the plan is **executed**. @@ -107,7 +109,7 @@ The relationship is: - Prism = internal logical planning, lineage, and optimization - Substrait = emitted logical interchange contract -An implementation **may** use Prism-native node kinds or overlays internally, but emitted plans that claim conformance **must** still follow InQL RFC 002. +An implementation **may** use Prism-native node kinds or derived optimized views internally, but emitted plans that claim conformance **must** still follow InQL RFC 002. 
### Relationship to session execution @@ -123,7 +125,7 @@ Prism **should** conceptually distinguish between: - **optimized plan state**: semantically equivalent rewritten state used for lowering or execution - **lineage metadata**: mappings from optimized state back to authored history -This distinction is normative at the conceptual level, but implementations retain freedom in how they realize it. A single persistent graph with overlays, separate graphs with references, or another equivalent structure are all acceptable if the invariants below hold. +This distinction is normative at the conceptual level, but implementations retain freedom in how they realize it. The intended first implementation centers on a persistent authored graph with derived optimized views and explicit origin mappings. Equivalent implementations, including separate optimized graphs, remain acceptable if the invariants below hold. ### Required invariants @@ -139,16 +141,16 @@ The following invariants **must** hold: Optimization is a core Prism responsibility, not merely a downstream backend concern. -Prism **may** perform: +For the first Prism slice, Prism **may** perform: - projection pruning - predicate pushdown - redundant-node elimination - normalization of equivalent logical shapes -- shared subplan detection and sharing +- simple shared subplan detection and sharing - other semantically valid rewrites consistent with schema and lineage invariants -More advanced rewrites such as join reordering or sink-aware splitting **may** be added later. +More advanced rewrites such as join reordering, cost-based optimization, or sink-aware splitting **may** be added later. Implementations **may** apply some rewrites incrementally during plan construction and defer others until lowering or explicit analysis, provided authored history remains intact. 
@@ -204,11 +206,103 @@ It may, however, motivate refactoring of implementation architecture so that pla - **Execution / interchange** — Session-backed lowering and execution flows **must** treat Prism as internal preparation and Substrait as the boundary contract. - **Documentation** — RFC indexes, architecture notes, and implementation planning notes **should** distinguish Prism from Substrait and from session execution. -## Unresolved questions +## Implementation Plan + +### Phase 1: Internal Prism carrier slice + +- Rework `LazyFrame[T]` to become the first real Prism-backed carrier while keeping the public dataset API stable. +- Replace direct `Rel` storage in `LazyFrame[T]` with Prism-managed authored state plus a current logical tip. +- Keep Prism internal-only; do not expose public `Prism*` package APIs in this phase. + +### Phase 2: Authored graph + optimized view contract + +- Implement the minimum Prism authored node set needed for the first slice: read roots, filters, and joins. +- Represent optimized state as a derived view over authored state with explicit optimized-to-authored origin mappings. +- Keep the first rewrite surface limited to safe canonicalization (`Filter(true)` elimination and adjacent `Limit`/`Project`/`OrderBy` collapse) with explicit lineage bookkeeping; defer heavier rewrite families. + +### Phase 3: Boundary lowering and source construction + +- Keep RFC 002 as the only emitted boundary by lowering Prism-backed `LazyFrame` state into Substrait at `to_substrait_plan()`. +- Add the internal source-construction seam needed to create Prism-backed lazy carriers from named tables or equivalent read roots. +- Support joins between independently constructed lazy carriers by unifying roots into one Prism-authored graph when needed; do not keep the research-only same-graph join restriction. 
+ +### Phase 4: Tests, docs, and current-slice hardening + +- Add package tests that prove immutable branching, lineage preservation, and stable lowering back to real proto-backed Substrait plans. +- Update architecture and RFC docs so the implementation status matches the intended internal design rather than the earlier research-only framing. + +### Phase 5: Broader carrier and authoring-surface adoption + +- Extend Prism backing beyond `LazyFrame[T]` once the remaining foundational RFCs are landed and the surrounding carrier/session story is stable enough to avoid churn. +- Evaluate where `DataFrame[T]`, `DataStream[T]`, and `query {}` should converge on shared Prism planning entry paths without forcing premature execution-boundary coupling. +- Keep advanced optimization families (for example join reordering, cost-based exploration, and AQE-adjacent behavior) out of this phase; those remain optimizer-boundary follow-on work under RFC 008. + +## Progress Checklist + +### Spec / design + +- [x] Lock Prism as an internal-only planning substrate rather than a public package API. +- [x] Lock the intended first implementation to authored graph + derived optimized view + origin mappings. +- [x] Reject the prototype's same-graph-only join constraint as the production design. +- [x] Lock `LazyFrame[T]` as the first real Prism-backed carrier. + +### Prism core + +- [x] Define the current authored node set for the first implementation slice (`Read`, `Filter`, `Join`, `Project`, `GroupBy`, `Aggregate`, `OrderBy`, `Limit`, `Explode`) so the existing `LazyFrame[T]` method surface is Prism-native. +- [x] Add persistent Prism-managed plan state plus logical tip tracking for `LazyFrame[T]`. +- [x] Add derived optimized views with stable optimized-to-authored origin mappings. +- [x] Introduce a backend-native `PrismCursor[T]` handle as the internal target for `LazyFrame[T]` method delegation. 
+- [x] Replace temporary Rust-backed Prism store-id allocation with Incan-native typed module static allocation. +- [x] Dedup equivalent reachable RHS nodes during cross-store join adoption while keeping authored-store append-only semantics. +- [x] Retire prototype naming in package internals by moving Prism implementation to `src/prism/mod.incn` and stable `Prism*` internal type names. +- [x] Add default canonical rewrite passes for safe local simplifications (`Filter(true)` elimination, adjacent `Limit`/`Project`/`OrderBy` collapse) before RFC 002 lowering. +- [x] Keep authored graph immutable while deriving rewritten views with rewritten-to-authored origin mappings. +- [x] Add internal rewrite explain artifacts (applied-rule names and rewritten/origin cardinality facts) for test diagnostics. + +### Carrier integration + +- [x] Replace `LazyFrame[T]` direct `Rel` storage with Prism-backed state. +- [x] Keep the public `DataSet[T]` / `LazyFrame[T]` method surface unchanged. +- [x] Support joins across independently constructed lazy carriers by graph unification rather than prototype-only shared-graph assumptions. +- [x] Route `LazyFrame[T]` method semantics through a backend-native cursor layer instead of per-method carrier-owned graph manipulation. +- [x] Route `LazyFrame[T]` methods through Prism internal seam helpers so future authoring surfaces can reuse one planning entry path. + +### Substrait boundary + +- [x] Lower Prism-backed `LazyFrame[T]` into real RFC 002 Substrait only at the boundary. +- [x] Preserve current conformance behavior for `Read`, `Filter`, and `Join`. +- [x] Replace identity-only lowering with safe canonical rewritten lowering while preserving semantic equivalence. + +### Tests + +- [x] Add package tests for immutable branching over shared authored state. +- [x] Add package tests for optimized-view origin mapping. +- [x] Add package tests for join lowering across branches and independently constructed lazy carriers. 
+- [x] Add regression coverage proving Prism-backed `LazyFrame[T]` still emits real proto-backed Substrait plans. +- [x] Add regression coverage proving the current `LazyFrame[T]` method surface now maps to native Prism node kinds rather than opaque compatibility nodes. +- [x] Add rewrite regressions for canonicalization and explain artifact coherence. + +### Docs + +- [x] Update architecture docs to reflect Prism as active implementation work rather than purely ahead-of-code design. +- [x] Keep RFC 007, RFC index, and related architecture notes aligned as implementation lands. + +### Phase 5 follow-on adoption + +- [ ] Extend Prism backing to `DataFrame[T]` once the remaining foundational RFCs are complete. +- [ ] Extend Prism backing to `DataStream[T]` once the remaining foundational RFCs are complete. +- [ ] Route `query {}` authoring through shared Prism planning entry paths once the remaining foundational RFCs are complete. + +## Design Decisions -- Should Prism maintain one persistent graph with optimized overlays, or separate authored and optimized graphs with explicit references? -- Which optimization passes are part of the Prism north star immediately, and which should be deferred until after the first implementation? -- What is the most useful lineage metadata shape for explain/debug tooling without making normal plan construction expensive? -- Are there Incan language or tooling limitations around model-derived schema facts that Prism depends on and that may require an upstream Incan RFC? +### Resolved - +- Prism conceptually distinguishes authored and optimized state, but the intended first implementation centers on a persistent authored graph with derived optimized views and explicit origin mappings. Equivalent implementations, including separate optimized graphs, remain acceptable if they preserve the same invariants. 
+- The first Prism slice commits only to safe logical rewrites: projection pruning, predicate pushdown, redundant-node elimination, normalization of equivalent logical shapes, and optional simple shared-subplan detection. Heavier work such as join reordering, cost-based optimization, and sink-aware splitting is explicitly deferred. +- The minimum lineage contract is stable authored node IDs plus optimized-to-authored origin mappings. Richer explain/debug structures may be added later, but they are not required for the RFC to be complete. +- RFC 007 does not require a new upstream Incan RFC before moving to `Planned`. Implementation may expose compiler or tooling gaps later, but those are implementation dependencies rather than specification blockers. +- Prism remains an internal InQL planning substrate for now; the first implementation does not expose public `Prism*` package APIs. +- `LazyFrame[T]` is the first real Prism-backed carrier. `DataFrame[T]`, `DataStream[T]`, and `query {}` integration remain follow-on work unless the `LazyFrame[T]` slice proves a hard dependency. +- `PrismCursor[T]` is the current backend-native handle beneath `LazyFrame[T]`. It is an internal convergence target for future `query {}` and pipe-forward lowering, not a public package API. +- The research prototype demonstrated the seam, but its clone-heavy storage and same-graph-only join restriction are not the intended production design. +- Real joins between lazy carriers must work even when the two sides were constructed independently; implementations may unify roots into one authored graph internally, but they must not require pre-shared lineage as a public contract. 
diff --git a/docs/rfcs/008_optimizer_boundary_stats_cbo_aqe.md b/docs/rfcs/008_optimizer_boundary_stats_cbo_aqe.md new file mode 100644 index 0000000..c113b87 --- /dev/null +++ b/docs/rfcs/008_optimizer_boundary_stats_cbo_aqe.md @@ -0,0 +1,271 @@ +# InQL RFC 008: Optimizer boundary, statistics, cost-based optimization, and adaptive execution + +- **Status:** Draft +- **Created:** 2026-04-07 +- **Author(s):** Danny Meijer +- **Related:** + - InQL RFC 004 (execution context — `Session` remains the execution and backend boundary) + - InQL RFC 007 (Prism planning engine — this RFC narrows optimizer-boundary ownership without replacing Prism adoption) +- **Issue:** [InQL #18](https://github.com/dannys-code-corner/InQL/issues/18) +- **RFC PR:** — +- **Written against:** Incan v0.2 +- **Shipped in:** — + +## Summary + +This RFC defines the optimizer boundary between **Prism** and **`Session`** as InQL grows beyond the first Prism adoption slice. Prism remains the internal engine for analyzed logical planning, semantic rewrites, property inference, logical alternative exploration, and cost-model hooks. `Session` remains the owner of backend capabilities, catalog statistics, physical planning, pushdown policy, runtime statistics, and adaptive re-planning during execution. This RFC does not replace RFC 007's role in establishing Prism as the internal planning substrate; it supersedes only the optimizer-boundary guidance where RFC 007 used broader wording or early-slice examples that would otherwise blur semantic optimization and backend execution policy. + +## Motivation + +RFC 007 was intentionally written to get Prism named, scoped, and implemented as a real planning substrate. That was the right move for the first Prism adoption slice. However, once InQL aims for stronger optimization, the remaining ambiguity becomes a liability: + +- If Prism owns all optimization in the abstract, it will tend to absorb backend policy and runtime behavior. 
+- If `Session` owns all optimization in practice, Prism becomes a passive container rather than a serious optimizer substrate. +- If statistics, cost-based optimization, and adaptive re-planning are not assigned cleanly, explain output, reproducibility, and backend substitution all become muddled. + +InQL needs the same kind of separation that high-performance query engines converge on in practice: + +- a semantic logical optimizer that can reason about equivalence and properties +- a backend boundary that can exploit concrete storage and runtime facts +- a runtime layer that can react when real cardinalities, partition sizes, or skew differ from pre-execution estimates + +The goal is not to copy Spark literally. The goal is to adopt the useful split in spirit: logical optimization is not the same thing as physical optimization, and neither is the same thing as adaptive re-optimization during execution. + +## Goals + +- Define which optimizer responsibilities belong to Prism versus `Session`. +- Establish **statistics ownership** clearly enough to support cost-based optimization without collapsing the Prism / `Session` boundary. +- Define the minimum optimizer artifacts Prism should expose as InQL moves past the initial identity-shaped optimized view. +- Reserve adaptive query re-planning as a `Session` concern rather than a Prism concern. +- Make precedence against RFC 007 explicit so future work does not rely on ambiguous wording. + +## Non-Goals + +- Replacing RFC 007 as the historical record of the first Prism adoption slice. +- Standardizing one exact memo structure, cost formula, or join enumeration algorithm. +- Requiring adaptive execution in v0.1. +- Defining backend-specific tuning knobs for DataFusion or any other engine. +- Defining new author-facing query syntax. 
+ +## Guide-level explanation + +Authors still think in the same broad pipeline: + +```text +author intent + -> Prism raw plan + -> Prism analyzed / optimized logical plan + -> Session physical planning and execution + -> optional Session adaptive re-planning while executing +``` + +The important mental model is: + +- Prism decides what logical transformations are valid and what alternatives are semantically equivalent. +- Prism can prefer one logical alternative over another using inferred properties and cost-model hooks. +- `Session` decides what the selected backend can actually do well. +- `Session` can change physical strategy when runtime facts prove the original assumptions wrong. + +Conceptual example; exact explain API names may differ: + +```incan +from pub::inql import Session, LazyFrame, DataFrame +from pub::inql.functions import count +from models import Customer, Order, RegionalSummary + +session = Session.default() + +orders: LazyFrame[Order] = session.table("orders") +customers: LazyFrame[Customer] = session.table("customers") + +summary: LazyFrame[RegionalSummary] = query { + FROM orders + JOIN customers ON .customer_id == .id + WHERE .status == "completed" + GROUP BY .region + SELECT + region, + count() as order_count, +} + +# Prism-owned logical surfaces +summary.raw_plan() +summary.analyzed_plan() +summary.plan_after_inql_rules() + +# Session-owned execution and runtime behavior +result: DataFrame[RegionalSummary] = session.collect(summary) +session.session_plan(summary) +session.executed_plan(summary) +``` + +For explain and tooling, InQL should make the stages explicit instead of collapsing them into one vague “optimized plan” label. 
A future author or tool should be able to ask for at least: + +- `raw_plan()` — authored Prism DAG +- `analyzed_plan()` — resolved names, schemas, and derived logical properties +- `plan_after_inql_rules()` — Prism-owned logical rewrites +- `session_plan()` — backend-bound physical or session-owned optimized plan +- `executed_plan()` — what actually ran, including adaptive changes when available + +## Reference-level explanation (precise rules) + +### 1. Boundary ownership + +Prism **must** own: + +- logical plan analysis and semantic validation beyond raw authored structure +- schema and expression-driven rewrites that are deterministic given rules version, plan, and available inputs +- derivation of logical properties and constraints from author intent and schema facts +- logical alternative exploration for equivalent plans +- cost-model interfaces used to compare logical alternatives +- provenance and explain mappings from rewritten logical artifacts back to authored intent + +`Session` **must** own: + +- backend capability discovery and backend-specific planning policy +- catalog or source statistics acquisition +- pushdown policy into concrete scans or remote engines +- physical operator selection +- runtime statistics gathered during execution +- adaptive re-planning during execution + +Prism **must not** own: + +- backend-specific pushdown outcomes +- physical operator choice +- runtime adaptive plan changes +- direct catalog I/O for statistics discovery as a normative responsibility + +### 2. Statistics model + +This RFC distinguishes three statistics families: + +1. **Logical inferred facts** — bounds or properties Prism derives from schema, constraints, and expressions. +2. **Pre-execution source statistics** — row counts, NDV estimates, histograms, file sizes, partition metadata, or equivalent facts supplied through `Session` and its backend or catalog integrations. +3. 
**Runtime statistics** — observed row counts, partition sizes, skew signals, spill facts, and other execution-time measurements. + +Ownership rules: + +- Prism **may** consume all three families when available. +- Prism **must** be able to produce a valid logical result even when only logical inferred facts are available. +- `Session` **must** supply source and runtime statistics to the extent the selected backend can provide them. +- Runtime statistics **must not** be treated as Prism-authored facts; they belong to execution-time artifacts. + +### 3. Cost-based optimization + +Cost-based optimization in InQL is split across Prism and `Session`: + +- Prism **should** compare logical alternatives using a stable cost interface over logical properties, available statistics, and backend capability hints. +- `Session` **may** provide the statistics and capability inputs Prism needs to make better logical choices. +- The exact cost model is not standardized by this RFC, but the ownership boundary is. + +Inference from this boundary: if InQL later adds join reordering, memo-based exploration, or reuse/materialization decisions, those features belong primarily in Prism, but they rely on inputs that `Session` can provide. + +### 4. Adaptive execution + +Adaptive re-planning during execution is a `Session` concern. + +That means: + +- Prism may produce a preferred logical plan or ranked alternatives before execution. +- `Session` may revise physical strategy during execution based on runtime statistics. +- `Session` may record those adaptive decisions in `executed_plan()` or equivalent explain surfaces. +- Adaptive behavior **must not** mutate Prism-authored history. + +### 5. Named optimizer artifacts + +Implementations **should** expose distinct names for plan stages rather than one ambiguous “optimized plan” API. 
+ +Minimum intended names: + +- `raw_plan()` +- `analyzed_plan()` +- `plan_after_inql_rules()` +- `session_plan()` +- `executed_plan()` + +Equivalent names are acceptable if they preserve the same separation. + +### 6. Precedence against older RFCs + +RFC 007 remains authoritative for: + +- Prism as InQL's internal planning substrate +- immutable authored state +- structural sharing +- lineage and optimized-to-authored provenance expectations + +This RFC supersedes RFC 007 only where RFC 007's optimizer examples or broad wording would otherwise imply that Prism owns backend pushdown policy, physical planning, statistics ownership, or adaptive re-planning. + +RFC 004 remains authoritative for: + +- the existence and core shape of `Session` +- execution and collection entry points +- backend abstraction and DataFusion as the default reference backend + +This RFC narrows RFC 004 by making the optimizer boundary more explicit; where optimizer ownership is discussed, this RFC governs. + +## Design details + +### Syntax + +This RFC introduces no new author-facing syntax. + +### Semantics + +As Prism evolves beyond the initial implementation slice, the intended logical stack is: + +- raw authored DAG +- analyzed logical plan with resolved references and derived properties +- Prism-owned logical rewrite stages +- optional logical alternative exploration and cost comparison +- `Session` handoff for backend planning and execution + +Prism's optimizer legality **must** continue to derive from InQL semantics, schema facts, and expression rules rather than backend quirks. + +`Session` **may** pass backend capability hints and statistics into Prism before execution, but those inputs **must not** collapse the ownership boundary defined above. + +### Interaction with other InQL surfaces + +- **`DataSet[T]` APIs:** carrier method chains continue to build Prism-managed authored state. Stronger optimization does not change carrier immutability rules. 
+- **`query {}`:** query-block lowering should target the same Prism analysis and rewrite pipeline as method chains. +- **Pipe-forward (`|>`):** if shipped, desugared forms must enter the same Prism optimizer boundary rather than defining a parallel optimizer path. +- **Substrait boundary:** Substrait remains the normative interchange contract. This RFC governs how Prism and `Session` arrive at plans around that boundary; it does not replace RFC 002. + +### Compatibility / migration + +This RFC is additive at the API and architecture level: + +- It does not require authors to change query syntax. +- It does not require RFC 007 to be rewritten. +- It may require documentation and future implementation work to stop referring to all optimization as one undifferentiated Prism responsibility. + +Existing prototype APIs that use vague names like `optimized_view` remain acceptable as transitional implementation details, but future public-facing documentation **should** migrate to more precise stage names. + +## Alternatives considered + +- **Keep RFC 007 as the only optimizer RFC** — rejected; RFC 007 already serves as the Prism adoption record for the first implementation slice, and retrofitting a more detailed optimizer boundary into it would mix historical adoption work with the follow-on architecture. +- **Move all optimization to `Session`** — rejected; that would reduce Prism to a plan container and make InQL-owned semantic optimization too backend-dependent. +- **Move AQE into Prism** — rejected; adaptive re-planning depends on runtime execution facts and should remain session-owned. +- **Treat statistics as purely backend-private** — rejected; Prism needs a clean way to consume statistics if it is going to perform serious cost-based logical optimization. + +## Drawbacks + +- Adds another foundational RFC and another precedence edge contributors must understand. +- Commits InQL to a sharper optimizer vocabulary earlier than a minimal prototype would require. 
+- Memo-based exploration, property inference, and stats plumbing will increase implementation complexity once work begins. + +## Layers affected + +- **InQL specification** — RFC 004 and RFC 007 references to optimization ownership **should** stay consistent with this boundary. +- **InQL library package** — future Prism internals **should** separate authored, analyzed, and rewritten artifacts more explicitly than the current prototype does. +- **Execution / interchange** — `Session` and backend integration layers **must** own physical planning, runtime stats, and adaptive re-planning policy. +- **Documentation** — explain surfaces and architecture notes **should** stop using “optimized plan” as an undifferentiated term. + +## Unresolved questions + +- Should the Prism/`Session` handoff carry source statistics as a concrete snapshot artifact, a callable capability surface, or both? +- Should `session_plan()` and `executed_plan()` be exposed on `Session`, on carriers, or through a dedicated explain object? +- When runtime statistics materially improve later planning, what information may be cached across executions without blurring authored facts, session facts, and execution facts? 
+ + diff --git a/docs/rfcs/README.md b/docs/rfcs/README.md index b968f0a..d813cac 100644 --- a/docs/rfcs/README.md +++ b/docs/rfcs/README.md @@ -11,15 +11,16 @@ InQL uses its **own** RFC series (starting at 000), independent of the [Incan la | [002][rfc-002] | In Progress | Apache Substrait — `Rel`-level contract, mapping catalog, binding boundaries | | | [003][rfc-003] | Planned | `query {}` blocks — grammar, typing, Substrait lowering | | | [004][rfc-004] | Planned | Execution context — session, DataFusion, read/transform/write | | -| [005][rfc-005] | Blocked | Pipe-forward relational syntax (`|>`) — optional surface | | +| [005][rfc-005] | Blocked | Pipe-forward relational syntax (`\|>`) — optional surface | | | [006][rfc-006] | Blocked | Promote unnest/explode to core Substrait lowering — blocked on upstream Substrait standardization | | -| [007][rfc-007] | Draft | Prism logical planning and optimization engine | | +| [007][rfc-007] | In Progress | Prism logical planning and optimization engine | | +| [008][rfc-008] | Draft | Optimizer boundary, statistics, cost-based optimization, and adaptive execution | | -**Order:** [RFC 000][rfc-000] is the foundational language specification. [RFC 001][rfc-001] defines the dataset type hierarchy. [RFC 002][rfc-002] defines the Substrait interchange contract. [RFC 003][rfc-003] defines the `query {}` surface that lowers to Substrait per RFC 002 over carriers from RFC 001. [RFC 004][rfc-004] completes the end-to-end story: session, read, execute, write. [RFC 005][rfc-005] specifies optional pipe-forward syntax outside the RFC 000–004 milestone and currently blocked on Incan RFC 040. [RFC 006][rfc-006] tracks promotion of unnest/explode from gap to core Substrait lowering, blocked on upstream Substrait standardization. [RFC 007][rfc-007] defines Prism as InQL's internal logical planning and optimization engine, sitting beneath carriers and upstream of Substrait emission and session execution. 
+**Order:** [RFC 000][rfc-000] is the foundational language specification. [RFC 001][rfc-001] defines the dataset type hierarchy. [RFC 002][rfc-002] defines the Substrait interchange contract. [RFC 003][rfc-003] defines the `query {}` surface that lowers to Substrait per RFC 002 over carriers from RFC 001. [RFC 004][rfc-004] completes the end-to-end story: session, read, execute, write. [RFC 005][rfc-005] specifies optional pipe-forward syntax outside the RFC 000–004 milestone and currently blocked on Incan RFC 040. [RFC 006][rfc-006] tracks promotion of unnest/explode from gap to core Substrait lowering, blocked on upstream Substrait standardization. [RFC 007][rfc-007] defines Prism as InQL's internal logical planning and optimization engine, sitting beneath carriers and upstream of Substrait emission and session execution. [RFC 008][rfc-008] refines the optimizer boundary: Prism owns semantic logical optimization, while `Session` owns backend-facing statistics, physical planning, and adaptive execution behavior. -**v0.1 scope:** RFCs 000–004. When all five are resolved (Draft → Planned → Implemented), InQL v0.1 is complete: authors can read data, write typed queries, lower to Substrait, execute through DataFusion, and write results. +**v0.1 scope:** RFCs 000–004 plus RFC 007. When those foundational RFCs are resolved (Draft → Planned → Implemented), InQL v0.1 is complete: authors can read data, write typed queries, lower through Prism to Substrait, execute through DataFusion, and write results. New RFCs should follow [TEMPLATE.md] (aligned with Incan’s RFC structure, adapted for InQL). 
@@ -35,4 +36,5 @@ New RFCs should follow [TEMPLATE.md] (aligned with Incan’s RFC structure, adap [rfc-005]: 005_inql_pipe_forward.md [rfc-006]: 006_unnest_core_substrait.md [rfc-007]: 007_prism_planning_engine.md +[rfc-008]: 008_optimizer_boundary_stats_cbo_aqe.md [incan-rfcs]: https://github.com/dannys-code-corner/incan/tree/main/workspaces/docs-site/docs/RFCs diff --git a/examples/hello.incn b/examples/hello.incn index d3ca911..fc0ad18 100644 --- a/examples/hello.incn +++ b/examples/hello.incn @@ -1,4 +1,28 @@ -@main -fn main() { - print("Hello, World!") -} +"""Example: minimal real InQL flow (author logical plan, lower to Substrait).""" + +from dataset import LazyFrame +from models import Order +from substrait.plan import plan_contains_relation_kind, plan_encoded_len, relation_kind_name, root_rel + +def _bool_text(value: bool) -> str: + """Render boolean facts as stable lowercase text for CLI output.""" + if value: + return str("true") + return str("false") + + +def main() -> None: + """Build one lazy relational pipeline and print concrete Substrait boundary facts.""" + # `named_table` creates a deferred read root; no execution happens here. + orders: LazyFrame[Order] = LazyFrame.named_table(Order(id=1, customer_id=101, amount=1_200.0, status=str("open")), str("orders")) + # Compose logical operators on the deferred carrier (Prism-backed plan authoring). + high_value = orders.filter(false).order_by().limit(10) + # Boundary lowering: turn the lazy plan into a real Substrait `Plan`. + plan = high_value.to_substrait_plan() + print(str("InQL hello: LazyFrame -> Substrait")) + # Root relation kind reflects the final logical operator in the chain. + print(str("root relation: ") + relation_kind_name(root_rel(plan))) + # Nested relation checks prove lower stages are preserved in the emitted tree. + print(str("contains ReadRel: ") + _bool_text(plan_contains_relation_kind(plan, str("ReadRel")))) + # Encoded length > 0 confirms a proto-backed payload was emitted. 
+ print(str("encoded bytes > 0: ") + _bool_text(plan_encoded_len(plan) > 0)) diff --git a/incan.lock b/incan.lock index 4c3bfdd..2b0f45d 100644 --- a/incan.lock +++ b/incan.lock @@ -3,8 +3,8 @@ [incan] format = 1 -incan-version = "0.2.0-dev.5" -generated = "2026-03-30T11:39:12.858306Z" +incan-version = "0.2.0-dev.7" +generated = "2026-04-08T17:55:01.875266Z" deps-fingerprint = "sha256:87bf2b26e77e2fb75979ce6cbe1128c88b63c617f3b1653eef19a2a3bbdfb4c6" cargo-features = [] cargo-no-default-features = false @@ -51,9 +51,9 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" -version = "1.2.58" +version = "1.2.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e928d4b69e3077709075a938a05ffbedfa53a84c8f766efbf8220bb1ff60e1" +checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" dependencies = [ "find-msvc-tools", "shlex", @@ -104,9 +104,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "find-msvc-tools" @@ -179,11 +179,11 @@ checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" [[package]] name = "incan_core" -version = "0.2.0-dev.5" +version = "0.2.0-dev.7" [[package]] name = "incan_derive" -version = "0.2.0-dev.5" +version = "0.2.0-dev.7" dependencies = [ "proc-macro2", "quote", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "incan_stdlib" -version = "0.2.0-dev.5" +version = "0.2.0-dev.7" dependencies = [ "incan_core", "incan_derive", @@ -200,9 +200,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -212,7 +212,7 @@ dependencies = [ [[package]] name = "inql" -version = "0.2.0-dev.5" +version = "0.2.0-dev.7" dependencies = [ "incan_derive", "incan_stdlib", @@ -244,9 +244,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "linux-raw-sys" @@ -474,9 +474,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" dependencies = [ "serde", "serde_core", diff --git a/src/dataset/mod.incn b/src/dataset/mod.incn index 8cf84c8..cb56400 100644 --- a/src/dataset/mod.incn +++ b/src/dataset/mod.incn @@ -65,9 +65,24 @@ def generic_any(data: DataSet[T]) -> None: ... ``` """ +# TODO: +# 1. add more detailed docstrings (including examples) for each method in the trait impls. +# 2. 
see about moving the LazyFrame methods to the higher level carriers (DataSet or BoundedDataSet) from rust::substrait::proto import Plan, Rel from substrait.plan import plan_from_root_relation from dataset.ops import filter_ds, join_ds, select_ds, group_by_ds, agg_ds, order_by_ds, limit_ds, explode_ds +from prism import ( + PrismCursor, + prism_cursor_apply_agg, + prism_cursor_apply_explode, + prism_cursor_apply_filter, + prism_cursor_apply_group_by, + prism_cursor_apply_join, + prism_cursor_apply_limit, + prism_cursor_apply_order_by, + prism_cursor_apply_select, + prism_cursor_named_table, +) # ---- DataSet trait ---- pub trait DataSet[T with Clone]: @@ -129,36 +144,49 @@ pub class DataFrame[T with Clone] with BoundedDataSet: pub class LazyFrame[T with Clone] with BoundedDataSet: - pub _row_schema_marker: T - pub _substrait_rel: Rel + # TODO: needs docstring and examples for the public API and overall purpose of this type. + pub _cursor: PrismCursor[T] + + @staticmethod + def named_table(row_schema_marker: T, table_name: str) -> LazyFrame[T]: + """Create a deferred lazy carrier rooted at one logical named-table read.""" + return LazyFrame(_cursor=prism_cursor_named_table(row_schema_marker, table_name)) def to_substrait_plan(self) -> Plan: - # TODO(#229): switch back to an empty root-name list once Incan preserves element type context for `[]`. 
- return plan_from_root_relation(self._substrait_rel, [str("id")]) + """Lower this lazy pipeline to a Substrait `Plan` at the boundary.""" + return self._cursor.to_substrait_plan() def filter(self, predicate: bool) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=filter_ds(self._substrait_rel, predicate)) + """Return one new lazy carrier with an appended filter stage.""" + return LazyFrame(_cursor=prism_cursor_apply_filter(self._cursor, predicate)) def join(self, other: Self, on: bool) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=join_ds(self._substrait_rel, other._substrait_rel, on)) + """Return one new lazy carrier with an appended join stage against another lazy carrier.""" + return LazyFrame(_cursor=prism_cursor_apply_join(self._cursor, other._cursor.clone(), on)) def select(self) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=select_ds(self._substrait_rel)) + """Return one new lazy carrier with an appended projection stage.""" + return LazyFrame(_cursor=prism_cursor_apply_select(self._cursor)) def group_by(self) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=group_by_ds(self._substrait_rel)) + """Return one new lazy carrier with an appended grouping stage.""" + return LazyFrame(_cursor=prism_cursor_apply_group_by(self._cursor)) def agg(self) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=agg_ds(self._substrait_rel)) + """Return one new lazy carrier with an appended aggregation stage.""" + return LazyFrame(_cursor=prism_cursor_apply_agg(self._cursor)) def order_by(self) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=order_by_ds(self._substrait_rel)) + """Return one new lazy carrier with an appended ordering stage.""" + return 
LazyFrame(_cursor=prism_cursor_apply_order_by(self._cursor)) def limit(self, n: int) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=limit_ds(self._substrait_rel, n)) + """Return one new lazy carrier with an appended row-limit stage.""" + return LazyFrame(_cursor=prism_cursor_apply_limit(self._cursor, n)) def explode(self) -> Self: - return LazyFrame(_row_schema_marker=self._row_schema_marker.clone(), _substrait_rel=explode_ds(self._substrait_rel)) + """Return one new lazy carrier with an appended explode/unnest stage.""" + return LazyFrame(_cursor=prism_cursor_apply_explode(self._cursor)) # ---- UnboundedDataSet trait and concrete types ---- diff --git a/src/dataset/ops.incn b/src/dataset/ops.incn index 0f5f700..3ca2ec1 100644 --- a/src/dataset/ops.incn +++ b/src/dataset/ops.incn @@ -1,35 +1,127 @@ -"""Canonical DataSet relation operation functions for RFC 002.""" +""" +Canonical dataset-operation seam over concrete relation builders. + +These helpers keep carrier-level operation names (`filter`, `join`, `group_by`, ...) distinct from the exact +Substrait builder functions used underneath. The current implementation is intentionally thin, but this module is the +place where dataset-layer operation intent can diverge from raw relation-construction details without pushing that +coupling into carrier types directly. +""" from rust::substrait::proto import Rel -from substrait.plan import aggregate_rel, explode_extension_uri, extension_single_rel, fetch_rel, filter_rel, join_rel, project_rel, sort_rel +from substrait.plan import ( + aggregate_rel, + explode_extension_uri, + extension_single_rel, + fetch_rel, + filter_rel, + join_rel, + project_rel, + sort_rel, +) pub def filter_ds(rel: Rel, predicate: bool) -> Rel: + """ + Apply dataset-level filter intent to one relation. + + Args: + rel: Input relation to filter. + predicate: Placeholder filter condition for the current package slice. 
+ + Returns: + A relation shaped as a filter over the input relation. + """ return filter_rel(rel, predicate) pub def join_ds(left_rel: Rel, right_rel: Rel, on: bool) -> Rel: + """ + Apply dataset-level join intent to two relations. + + Args: + left_rel: Left input relation. + right_rel: Right input relation. + on: Placeholder join condition for the current package slice. + + Returns: + A relation shaped as a join over the two input relations. + """ return join_rel(left_rel, right_rel, on) pub def select_ds(rel: Rel) -> Rel: + """ + Apply dataset-level projection intent to one relation. + + Args: + rel: Input relation to project. + + Returns: + A relation shaped as a projection over the input relation. + """ return project_rel(rel) pub def group_by_ds(rel: Rel) -> Rel: + """ + Apply dataset-level grouping intent to one relation. + + Args: + rel: Input relation to group. + + Returns: + A relation shaped as the grouping form of aggregate over the input relation. + """ return aggregate_rel(rel, false) pub def agg_ds(rel: Rel) -> Rel: + """ + Apply dataset-level aggregation intent to one relation. + + Args: + rel: Input relation to aggregate. + + Returns: + A relation shaped as the aggregating form of aggregate over the input relation. + """ return aggregate_rel(rel, true) pub def order_by_ds(rel: Rel) -> Rel: + """ + Apply dataset-level ordering intent to one relation. + + Args: + rel: Input relation to sort. + + Returns: + A relation shaped as a sort over the input relation. + """ return sort_rel(rel) pub def limit_ds(rel: Rel, n: int) -> Rel: + """ + Apply dataset-level row-limit intent to one relation. + + Args: + rel: Input relation to limit. + n: Maximum row count to request. + + Returns: + A relation shaped as a fetch with zero offset over the input relation. + """ return fetch_rel(rel, 0, n) pub def explode_ds(rel: Rel) -> Rel: + """ + Apply dataset-level explode intent to one relation via the registered extension path. 
+ + Args: + rel: Input relation to explode. + + Returns: + A relation shaped as the registered explode extension over the input relation. + """ return extension_single_rel(rel, explode_extension_uri()) diff --git a/src/prism/lower.incn b/src/prism/lower.incn new file mode 100644 index 0000000..715bbde --- /dev/null +++ b/src/prism/lower.incn @@ -0,0 +1,63 @@ +"""Lowering from Prism optimized views into Substrait relations and plans.""" + +from rust::substrait::proto import Plan, Rel +from substrait.plan import ( + aggregate_rel, + explode_extension_uri, + extension_single_rel, + fetch_rel, + filter_rel, + join_rel, + plan_from_root_relation, + project_rel, + read_named_table_rel, + sort_rel, +) +from prism.rewrite import derive_rewritten_view, rewritten_node_at +from prism.types import PrismNodeKind, PrismOptimizedView, PrismStoreId + +pub def lower_prism_tip(store_id: PrismStoreId, tip_id: int) -> Rel: + """ + Lower one authored Prism tip through the canonical rewrite view. + + This is the stable boundary from Prism's internal graph representation into concrete Substrait relations. + """ + return lower_optimized_view(derive_rewritten_view(store_id, tip_id)) + + +pub def lower_optimized_view(view: PrismOptimizedView) -> Rel: + """Lower one rewritten Prism view to its root Substrait relation.""" + return _lower_node(view, view.root_tip_id) + + +pub def prism_rel_to_plan(rel: Rel) -> Plan: + """ + Wrap one root relation in a Substrait plan. + + The placeholder root-name list remains here so cursor-level plan construction stays centralized. 
+ """ + return plan_from_root_relation(rel, [str("id")]) + + +def _lower_node(view: PrismOptimizedView, node_id: int) -> Rel: + node = rewritten_node_at(view, node_id) + match node.kind: + PrismNodeKind.ReadNamedTable => + return read_named_table_rel(node.named_table) + PrismNodeKind.Filter => + return filter_rel(_lower_node(view, node.input_ids[0]), node.predicate) + PrismNodeKind.Join => + return join_rel(_lower_node(view, node.input_ids[0]), _lower_node(view, node.input_ids[1]), node.predicate) + PrismNodeKind.Project => + return project_rel(_lower_node(view, node.input_ids[0])) + PrismNodeKind.GroupBy => + return aggregate_rel(_lower_node(view, node.input_ids[0]), false) + PrismNodeKind.Aggregate => + return aggregate_rel(_lower_node(view, node.input_ids[0]), true) + PrismNodeKind.OrderBy => + return sort_rel(_lower_node(view, node.input_ids[0])) + PrismNodeKind.Limit => + return fetch_rel(_lower_node(view, node.input_ids[0]), 0, node.limit_count) + PrismNodeKind.Explode => + return extension_single_rel(_lower_node(view, node.input_ids[0]), explode_extension_uri()) + diff --git a/src/prism/mod.incn b/src/prism/mod.incn new file mode 100644 index 0000000..b52bcb2 --- /dev/null +++ b/src/prism/mod.incn @@ -0,0 +1,245 @@ +""" +Internal Prism planning façade. + +Prism is intentionally not re-exported from `lib.incn`. 
This façade keeps one stable internal import surface while the +implementation is split into focused submodules: + +- `prism.types`: shared node/view/explain types +- `prism.store`: append-only authored storage and cross-store adoption +- `prism.rewrite`: canonical rewrite derivation and explain artifacts +- `prism.lower`: lowering from rewritten views into Substrait +""" + +from rust::substrait::proto import Plan, Rel +from prism.lower import lower_prism_tip as lower_prism_tip_impl, prism_rel_to_plan +from prism.rewrite import ( + derive_optimized_view as derive_optimized_view_impl, + derive_rewritten_view as derive_rewritten_view_impl, + optimized_origin_for as optimized_origin_for_impl, + rewrite_explain, + rewritten_node_at, +) +from prism.store import ( + adopt_cursor_subgraph, + allocate_prism_store_id, + append_node, + node_at, + reachable_node_ids, + store_nodes, +) +from prism.types import ( + authored_node_kind_name, + PrismNode, + PrismNodeKind, + PrismOptimizedView, + PrismRewriteExplain, + PrismStoreAdoption, + PrismStoreId, +) + +pub class PrismCursor[T with Clone]: + """ + Immutable handle over one Prism-authored query lineage. + + A cursor carries the schema marker, owning store id, and current authored tip. Deriving a new cursor never copies + prior authored state; Prism appends one node to the shared store and returns a new handle pointing at the new tip. 
+    """
+
+    pub row_schema_marker: T
+    pub store_id: PrismStoreId
+    pub tip_id: int
+
+    def clone(self) -> Self:
+        """Return a cursor with the same store id and tip id and a cloned row-schema marker."""
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=self.tip_id)
+
+    def filter(self, predicate: bool) -> Self:
+        """Append one filter node in the shared authored store and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Filter, [self.tip_id], str(""), predicate, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def join(self, other: Self, predicate: bool) -> Self:
+        """Append one join against another cursor, adopting foreign authored state only when stores differ."""
+        # Same store: the other tip id is already a valid node id here, so it is used as the join input directly.
+        if self.store_id.0 == other.store_id.0:
+            next_tip_id = append_node(self.store_id, PrismNodeKind.Join, [self.tip_id, other.tip_id], str(""), predicate, 0)
+            return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+        # Different stores: adopt the other cursor's subgraph into this store, then join against the adopted tip id.
+        adoption = adopt_cursor_subgraph(self.store_id, other.store_id, other.tip_id)
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Join, [self.tip_id, adoption.adopted_tip_id], str(""), predicate, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def select(self) -> Self:
+        """Append one projection node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Project, [self.tip_id], str(""), false, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def group_by(self) -> Self:
+        """Append one grouping node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.GroupBy, [self.tip_id], str(""), false, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def agg(self) -> Self:
+        """Append one aggregate node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Aggregate, [self.tip_id], str(""), false, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def order_by(self) -> Self:
+        """Append one ordering node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.OrderBy, [self.tip_id], str(""), false, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def limit(self, n: int) -> Self:
+        """Append one row-limit node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Limit, [self.tip_id], str(""), false, n)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def explode(self) -> Self:
+        """Append one explode node and return the derived tip."""
+        next_tip_id = append_node(self.store_id, PrismNodeKind.Explode, [self.tip_id], str(""), false, 0)
+        return PrismCursor(row_schema_marker=self.row_schema_marker.clone(), store_id=self.store_id, tip_id=next_tip_id)
+
+    def derive_optimized_view(self) -> PrismOptimizedView:
+        """Expose the identity-shaped authored view for internal inspection and tests."""
+        return derive_optimized_view(self.store_id, self.tip_id)
+
+    def lower_to_rel(self) -> Rel:
+        """Lower the current tip through canonical rewrites to one Substrait root relation."""
+        return lower_prism_tip(self.store_id, self.tip_id)
+
+    def to_substrait_plan(self) -> Plan:
+        """Lower the current tip all the way to one Substrait plan."""
+        return prism_rel_to_plan(self.lower_to_rel())
+
+
+pub def prism_cursor_named_table[T with Clone](row_schema_marker: T, table_name: str) -> PrismCursor[T]:
+    """Create the minimal shared-store cursor rooted at one named-table read."""
+    store_id = allocate_prism_store_id()
+    tip_id = append_node(store_id, PrismNodeKind.ReadNamedTable, [], table_name, false, 0)
+    return PrismCursor(row_schema_marker=row_schema_marker, store_id=store_id, tip_id=tip_id)
+
+
+pub def prism_cursor_apply_filter[T with Clone](cursor: PrismCursor[T], predicate: bool) -> PrismCursor[T]:
+    """Apply dataset-level filter intent through Prism's shared authored store."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.Filter, [cursor.tip_id], str(""), predicate, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_join[T with Clone](left: PrismCursor[T], right: PrismCursor[T], predicate: bool) -> PrismCursor[T]:
+    """Apply dataset-level join intent while preserving same-store cheapness and cross-store correctness."""
+    # Same store: the right tip id is already a valid node id in the left store, so no copying is needed.
+    if left.store_id.0 == right.store_id.0:
+        next_tip_id = append_node(left.store_id, PrismNodeKind.Join, [left.tip_id, right.tip_id], str(""), predicate, 0)
+        return PrismCursor(row_schema_marker=left.row_schema_marker.clone(), store_id=left.store_id, tip_id=next_tip_id)
+    # Different stores: adopt the right subgraph into the left store, then join against the adopted tip id.
+    adoption = adopt_cursor_subgraph(left.store_id, right.store_id, right.tip_id)
+    next_tip_id = append_node(left.store_id, PrismNodeKind.Join, [left.tip_id, adoption.adopted_tip_id], str(""), predicate, 0)
+    return PrismCursor(row_schema_marker=left.row_schema_marker.clone(), store_id=left.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_select[T with Clone](cursor: PrismCursor[T]) -> PrismCursor[T]:
+    """Apply dataset-level projection intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.Project, [cursor.tip_id], str(""), false, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_group_by[T with Clone](cursor: PrismCursor[T]) -> PrismCursor[T]:
+    """Apply dataset-level grouping intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.GroupBy, [cursor.tip_id], str(""), false, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_agg[T with Clone](cursor: PrismCursor[T]) -> PrismCursor[T]:
+    """Apply dataset-level aggregation intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.Aggregate, [cursor.tip_id], str(""), false, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_order_by[T with Clone](cursor: PrismCursor[T]) -> PrismCursor[T]:
+    """Apply dataset-level ordering intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.OrderBy, [cursor.tip_id], str(""), false, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_limit[T with Clone](cursor: PrismCursor[T], n: int) -> PrismCursor[T]:
+    """Apply dataset-level row-limit intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.Limit, [cursor.tip_id], str(""), false, n)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def prism_cursor_apply_explode[T with Clone](cursor: PrismCursor[T]) -> PrismCursor[T]:
+    """Apply dataset-level explode intent through Prism."""
+    next_tip_id = append_node(cursor.store_id, PrismNodeKind.Explode, [cursor.tip_id], str(""), false, 0)
+    return PrismCursor(row_schema_marker=cursor.row_schema_marker.clone(), store_id=cursor.store_id, tip_id=next_tip_id)
+
+
+pub def lower_prism_tip(store_id: PrismStoreId, tip_id: int) -> Rel:
+    """Lower one authored Prism tip through canonical rewrite view to the `Rel` boundary."""
+    return lower_prism_tip_impl(store_id, tip_id)
+
+
+pub def derive_optimized_view(store_id: PrismStoreId, tip_id: int) -> PrismOptimizedView:
+    """Derive the current identity-shaped view for one authored tip."""
+    return derive_optimized_view_impl(store_id, tip_id)
+
+
+pub def optimized_origin_for(view: PrismOptimizedView, optimized_node_id: int) -> int:
+    """Look up the authored origin id for one optimized-view node by delegating to `optimized_origin_for_impl`."""
+    return optimized_origin_for_impl(view, optimized_node_id)
+
+
+pub def derive_rewritten_view(store_id: PrismStoreId, tip_id: int) -> PrismOptimizedView:
+    """Derive the canonical rewritten lowering view for one authored tip."""
+    return derive_rewritten_view_impl(store_id, tip_id)
+
+
+pub def prism_cursor_rewrite_explain[T with Clone](cursor: PrismCursor[T]) -> PrismRewriteExplain:
+    """Return narrow explain metadata for the cursor's current canonical rewrite pass."""
+    return rewrite_explain(cursor.store_id, cursor.tip_id)
+
+
+pub def prism_cursor_authored_node_count[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Count authored nodes reachable from the cursor tip."""
+    return len(reachable_node_ids(cursor.store_id, cursor.tip_id))
+
+
+pub def prism_cursor_rewritten_node_count[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Return the `rewritten_node_count` field of the cursor's rewrite explain."""
+    return prism_cursor_rewrite_explain(cursor).rewritten_node_count
+
+
+pub def prism_cursor_store_node_count[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Count all authored nodes currently stored in the cursor's store."""
+    return len(store_nodes(cursor.store_id))
+
+
+pub def prism_cursor_store_id_value[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Expose the raw store-id value for narrow invariant tests."""
+    return cursor.store_id.0
+
+
+pub def prism_cursor_tip_kind_name[T with Clone](cursor: PrismCursor[T]) -> str:
+    """Return the authored node-kind name at the current cursor tip."""
+    return authored_node_kind_name(node_at(cursor.store_id, cursor.tip_id))
+
+
+pub def prism_cursor_tip_origin_id[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Return the authored origin id of the tip in the identity-shaped optimized view."""
+    view = derive_optimized_view(cursor.store_id, cursor.tip_id)
+    return optimized_origin_for(view, view.root_tip_id)
+
+
+pub def prism_cursor_rewritten_tip_kind_name[T with Clone](cursor: PrismCursor[T]) -> str:
+    """Return the rewritten node-kind name at the lowered tip after canonical rewrites."""
+    view = derive_rewritten_view(cursor.store_id, cursor.tip_id)
+    return authored_node_kind_name(rewritten_node_at(view, view.root_tip_id))
+
+
+pub def prism_cursor_rewrite_applied_rule_count[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Count the entries in `applied_rule_names` of the cursor's rewrite explain."""
+    return len(prism_cursor_rewrite_explain(cursor).applied_rule_names)
+
+
+pub def prism_cursor_rewrite_applied_rule_name_at[T with Clone](cursor: PrismCursor[T], index: int) -> str:
+    """Return the entry at `index` in `applied_rule_names` of the cursor's rewrite explain."""
+    return prism_cursor_rewrite_explain(cursor).applied_rule_names[index]
+
+
+pub def prism_cursor_rewritten_origin_count[T with Clone](cursor: PrismCursor[T]) -> int:
+    """Count the entries in `rewritten_to_authored_origin_ids` of the cursor's rewrite explain."""
+    return len(prism_cursor_rewrite_explain(cursor).rewritten_to_authored_origin_ids)
+
+
+pub def prism_cursors_share_store[T with Clone](left: PrismCursor[T], right: PrismCursor[T]) -> bool:
+    """Return whether two cursors share the same authored store backing."""
+    return left.store_id.0 == right.store_id.0
diff --git a/src/prism/rewrite.incn b/src/prism/rewrite.incn
new file mode 100644
index 0000000..c6e9849
--- /dev/null
+++ b/src/prism/rewrite.incn
@@ -0,0 +1,198 @@
+"""Prism optimized-view derivation, canonical rewrite rules, and explain artifacts."""
+
+from prism.types import PrismNode, PrismNodeKind, PrismOptimizedView, PrismRewriteExplain, PrismStoreId
+from prism.store import node_at, reachable_node_ids, remap_input_ids, store_nodes
+
+model PrismRewriteResult:
+    view: PrismOptimizedView
+    applied_rule_names: list[str]
+
+
+pub def derive_optimized_view(store_id: PrismStoreId, tip_id: int) -> PrismOptimizedView:
+    """
+    Derive the identity-shaped lowering view for one authored tip.
+
+    This reflects authored structure exactly and serves as the input to canonical rewrites.
+    """
+    authored_store_nodes = store_nodes(store_id)
+    mut origin_node_ids: list[int] = []
+    for node in authored_store_nodes:
+        origin_node_ids.append(node.node_id)
+    return PrismOptimizedView(root_tip_id=tip_id, nodes=_clone_nodes(authored_store_nodes), origin_node_ids=origin_node_ids)
+
+
+pub def optimized_origin_for(view: PrismOptimizedView, optimized_node_id: int) -> int:
+    """Return the authored origin id for one node in a rewritten view."""
+    return view.origin_node_ids[optimized_node_id]
+
+
+pub def derive_rewritten_view(store_id: PrismStoreId, tip_id: int) -> PrismOptimizedView:
+    """
+    Derive the canonical lowering view for one authored tip.
+
+    Rewrites are deliberately narrow and semantics-preserving. They simplify obvious redundant unary structure without
+    introducing cost-based behavior or store mutation.
+    """
+    return _derive_rewrite_result(store_id, tip_id).view
+
+
+pub def rewrite_explain(store_id: PrismStoreId, tip_id: int) -> PrismRewriteExplain:
+    """Return narrow explain metadata for the current canonical rewrite pass."""
+    result = _derive_rewrite_result(store_id, tip_id)
+    return PrismRewriteExplain(authored_node_count=len(reachable_node_ids(store_id, tip_id)), rewritten_node_count=len(result.view.nodes), applied_rule_names=result.applied_rule_names, rewritten_to_authored_origin_ids=result.view.origin_node_ids)
+
+
+pub def rewritten_node_at(view: PrismOptimizedView, node_id: int) -> PrismNode:
+    """Return one node from a rewritten view by node id."""
+    return view.nodes[node_id]
+
+
+def _derive_rewrite_result(store_id: PrismStoreId, tip_id: int) -> PrismRewriteResult:
+    """
+    Run the canonical rewrite pass for one authored tip.
+
+    Returns the compacted rewritten view together with the names of the rules that fired, in the order they fired.
+    """
+    authored_view = derive_optimized_view(store_id, tip_id)
+    authored_nodes = authored_view.nodes
+    authored_reachable_ids = reachable_node_ids(store_id, tip_id)
+    # One slot per authored node; -1 marks "not yet mapped into the rewritten view".
+    mut authored_to_rewritten_id: list[int] = []
+    for _node in authored_nodes:
+        authored_to_rewritten_id.append(-1)
+    mut rewritten_nodes = _empty_prism_nodes()
+    mut rewritten_origin_ids = _empty_ints()
+    mut applied_rule_names = _empty_strings()
+    # Walk authored nodes in dependency order, remapping each node into the rewritten view.
+    for authored_node_id in authored_reachable_ids:
+        authored_node = authored_nodes[authored_node_id]
+        remapped_inputs = remap_input_ids(authored_node.input_ids, authored_to_rewritten_id)
+        # Rules that erase a node map the authored node directly onto an existing rewritten input.
+        if _can_eliminate_filter_true(authored_node):
+            authored_to_rewritten_id[authored_node_id] = remapped_inputs[0]
+            applied_rule_names.append(str("eliminate_filter_true"))
+            continue
+        # Rules that replace a node append one rewritten node with a simplified shape.
+        if _can_collapse_adjacent_limit(authored_node, remapped_inputs, rewritten_nodes):
+            collapsed_id = len(rewritten_nodes)
+            rewritten_nodes.append(_build_collapsed_limit_node(authored_node, remapped_inputs, rewritten_nodes, collapsed_id))
+            rewritten_origin_ids.append(authored_node_id)
+            authored_to_rewritten_id[authored_node_id] = collapsed_id
+            applied_rule_names.append(str("collapse_adjacent_limit"))
+            continue
+        # Rules that collapse onto an equivalent rewritten parent reuse the parent node id.
+        if _can_collapse_adjacent_project(authored_node, remapped_inputs, rewritten_nodes):
+            authored_to_rewritten_id[authored_node_id] = remapped_inputs[0]
+            applied_rule_names.append(str("collapse_adjacent_project"))
+            continue
+        if _can_collapse_adjacent_order_by(authored_node, remapped_inputs, rewritten_nodes):
+            authored_to_rewritten_id[authored_node_id] = remapped_inputs[0]
+            applied_rule_names.append(str("collapse_adjacent_order_by"))
+            continue
+        # No rule matched: copy the node across with its inputs remapped.
+        rewritten_id = len(rewritten_nodes)
+        rewritten_nodes.append(_build_rewritten_node(authored_node, remapped_inputs, rewritten_id))
+        rewritten_origin_ids.append(authored_node_id)
+        authored_to_rewritten_id[authored_node_id] = rewritten_id
+    # Compaction drops rewritten nodes that are no longer reachable from the rewritten root.
+    compacted_view = _compact_optimized_view(PrismOptimizedView(root_tip_id=authored_to_rewritten_id[tip_id], nodes=rewritten_nodes, origin_node_ids=rewritten_origin_ids))
+    return PrismRewriteResult(view=compacted_view, applied_rule_names=applied_rule_names)
+
+
+def _can_eliminate_filter_true(node: PrismNode) -> bool:
+    """Return true when the node is a Filter whose `predicate` flag is true."""
+    return node.kind == PrismNodeKind.Filter and node.predicate
+
+
+def _can_collapse_adjacent_limit(node: PrismNode, remapped_inputs: list[int], rewritten_nodes: list[PrismNode]) -> bool:
+    """Return true when a Limit node's first remapped input is itself a Limit in the rewritten view."""
+    if node.kind != PrismNodeKind.Limit:
+        return false
+    return rewritten_nodes[remapped_inputs[0]].kind == PrismNodeKind.Limit
+
+
+def _build_collapsed_limit_node(node: PrismNode, remapped_inputs: list[int], rewritten_nodes: list[PrismNode], rewritten_id: int) -> PrismNode:
+    """Build one Limit that reads the inner Limit's first input and keeps the smaller of the two limit counts."""
+    limit_input = rewritten_nodes[remapped_inputs[0]]
+    return PrismNode(node_id=rewritten_id, kind=PrismNodeKind.Limit, input_ids=[limit_input.input_ids[0]], named_table=str(""), predicate=false, limit_count=_min_int(limit_input.limit_count, node.limit_count))
+
+
+def _can_collapse_adjacent_project(node: PrismNode, remapped_inputs: list[int], rewritten_nodes: list[PrismNode]) -> bool:
+    """Return true when a Project node's first remapped input is itself a Project in the rewritten view."""
+    if node.kind != PrismNodeKind.Project:
+        return false
+    return rewritten_nodes[remapped_inputs[0]].kind == PrismNodeKind.Project
+
+
+def _can_collapse_adjacent_order_by(node: PrismNode, remapped_inputs: list[int], rewritten_nodes: list[PrismNode]) -> bool:
+    """Return true when an OrderBy node's first remapped input is itself an OrderBy in the rewritten view."""
+    if node.kind != PrismNodeKind.OrderBy:
+        return false
+    return rewritten_nodes[remapped_inputs[0]].kind == PrismNodeKind.OrderBy
+
+
+def _build_rewritten_node(node: PrismNode, remapped_inputs: list[int], rewritten_id: int) -> PrismNode:
+    """Copy one authored node's payload under a new node id with its inputs remapped."""
+    return PrismNode(node_id=rewritten_id, kind=node.kind, input_ids=remapped_inputs, named_table=node.named_table, predicate=node.predicate, limit_count=node.limit_count)
+
+
+def _min_int(left: int, right: int) -> int:
+    """Return the smaller of two ints (returns `right` when they are equal)."""
+    if left < right:
+        return left
+    return right
+
+
+def _empty_prism_nodes() -> list[PrismNode]:
+    """Return a new empty, explicitly typed node list."""
+    mut nodes: list[PrismNode] = []
+    return nodes
+
+
+def _empty_ints() -> list[int]:
+    """Return a new empty, explicitly typed int list."""
+    mut values: list[int] = []
+    return values
+
+
+def _empty_strings() -> list[str]:
+    """Return a new empty, explicitly typed string list."""
+    mut values: list[str] = []
+    return values
+
+
+def _compact_optimized_view(view: PrismOptimizedView) -> PrismOptimizedView:
+    """
+    Re-pack a rewritten view into a dense node-id space.
+
+    Rewrites can leave gaps when authored nodes collapse onto earlier rewritten nodes. Compaction preserves semantics
+    while producing a clean lowering view and one-to-one origin list.
+    """
+    reachable_ids = _reachable_view_node_ids(view)
+    # One slot per old node; -1 marks nodes that were not carried into the compacted view.
+    mut old_to_new_ids: list[int] = []
+    for _node in view.nodes:
+        old_to_new_ids.append(-1)
+    mut compacted_nodes = _empty_prism_nodes()
+    mut compacted_origins = _empty_ints()
+    for old_node_id in reachable_ids:
+        old_node = view.nodes[old_node_id]
+        # NOTE(review): assumes every input id precedes its consumer in view order, so it is already remapped here.
+        mut remapped_inputs = _empty_ints()
+        for input_id in old_node.input_ids:
+            remapped_inputs.append(old_to_new_ids[input_id])
+        new_node_id = len(compacted_nodes)
+        compacted_nodes.append(PrismNode(node_id=new_node_id, kind=old_node.kind, input_ids=remapped_inputs, named_table=old_node.named_table, predicate=old_node.predicate, limit_count=old_node.limit_count))
+        compacted_origins.append(view.origin_node_ids[old_node_id])
+        old_to_new_ids[old_node_id] = new_node_id
+    return PrismOptimizedView(root_tip_id=old_to_new_ids[view.root_tip_id], nodes=compacted_nodes, origin_node_ids=compacted_origins)
+
+
+def _reachable_view_node_ids(view: PrismOptimizedView) -> list[int]:
+    """Return rewritten-view node ids reachable from the root in view order."""
+    mut visited: list[bool] = []
+    for _node in view.nodes:
+        visited.append(false)
+    # `pending` is a worklist seeded with the root; `cursor` is the index of the next entry to process.
+    mut pending: list[int] = [view.root_tip_id]
+    mut cursor = 0
+    while cursor < len(pending):
+        node_id = pending[cursor]
+        cursor = cursor + 1
+        if visited[node_id]:
+            continue
+        visited[node_id] = true
+        node = view.nodes[node_id]
+        for input_id in node.input_ids:
+            pending.append(input_id)
+    # Emit visited ids by scanning the view's node list front to back, not in discovery order.
+    mut ordered = _empty_ints()
+    for node in view.nodes:
+        if visited[node.node_id]:
+            ordered.append(node.node_id)
+    return ordered
+
+
+def _clone_nodes(nodes: list[PrismNode]) -> list[PrismNode]:
+    """Return a new list holding the given nodes in the same order."""
+    # NOTE(review): each node is appended as-is; whether that copies or shares the element is not visible here.
+    mut cloned: list[PrismNode] = []
+    for node in nodes:
+        cloned.append(node)
+    return cloned
diff --git a/src/prism/store.incn b/src/prism/store.incn
new file mode 100644
index 0000000..641f7d9
--- /dev/null
+++ b/src/prism/store.incn
@@ -0,0 +1,132 @@
+"""Append-only Prism store allocation, storage, reachability, and cross-store adoption."""
+
+from
prism.types import PrismNode, PrismNodeKind, PrismStoreAdoption, PrismStoreId
+
+model PrismStoredNode:
+    store_id_raw: int
+    node: PrismNode
+
+
+static allocated_prism_stores: list[bool] = []
+static prism_stored_nodes: list[PrismStoredNode] = []
+
+
+pub def allocate_prism_store_id() -> PrismStoreId:
+    """Allocate one monotonic module-owned Prism store id."""
+    next_id = len(allocated_prism_stores)
+    allocated_prism_stores.append(true)
+    return PrismStoreId(next_id)
+
+
+pub def append_node(store_id: PrismStoreId, kind: PrismNodeKind, input_ids: list[int], named_table: str, predicate: bool, limit_count: int) -> int:
+    """
+    Append one immutable node to the target store.
+
+    Node ids are store-local and monotonic. Appending never mutates earlier nodes.
+    """
+    next_id = len(store_nodes(store_id))
+    prism_stored_nodes.append(PrismStoredNode(store_id_raw=store_id.0, node=PrismNode(node_id=next_id, kind=kind, input_ids=input_ids, named_table=named_table, predicate=predicate, limit_count=limit_count)))
+    return next_id
+
+
+pub def adopt_cursor_subgraph(target_store_id: PrismStoreId, source_store_id: PrismStoreId, tip_id: int) -> PrismStoreAdoption:
+    """
+    Copy only reachable source nodes into the target store.
+
+    Adoption dedups equivalent nodes within the current adoption pass so independently repeated RHS branches do not
+    duplicate authored state in the target store.
+    """
+    mut id_map: list[int] = []
+    mut adopted_node_ids: list[int] = []
+
+    # Materialize the source store once. Existing node ids never change (the store is append-only), so this
+    # snapshot stays valid for every reachable id even while nodes are appended below.
+    source_nodes = store_nodes(source_store_id)
+    # Seed the source->target id map with "unmapped" sentinels for every source-store node id.
+    for _node in source_nodes:
+        id_map.append(-1)
+    # Named so the local does not shadow the `reachable_node_ids` function.
+    source_reachable_ids = reachable_node_ids(source_store_id, tip_id)
+    mut adopted_node_count = 0
+    for source_node_id in source_reachable_ids:
+        source_node = source_nodes[source_node_id]
+
+        # Inputs are remapped first so structural equality is evaluated in the target store's id space.
+        remapped_input_ids = remap_input_ids(source_node.input_ids, id_map)
+
+        # Dedup is intentionally local to this adoption pass. We do not attempt global memoization across stores yet.
+        reused_adopted_id = _find_existing_adopted_node_id(target_store_id, source_node, remapped_input_ids, adopted_node_ids)
+        if reused_adopted_id >= 0:
+            id_map[source_node_id] = reused_adopted_id
+        else:
+            adopted_id = append_node(target_store_id, source_node.kind, remapped_input_ids, source_node.named_table, source_node.predicate, source_node.limit_count)
+            id_map[source_node_id] = adopted_id
+            adopted_node_ids.append(adopted_id)
+            adopted_node_count = adopted_node_count + 1
+    # The source tip is guaranteed reachable, so its remapped target-store id is the adopted tip for the caller.
+    return PrismStoreAdoption(adopted_tip_id=id_map[tip_id], adopted_node_count=adopted_node_count)
+
+
+pub def store_nodes(store_id: PrismStoreId) -> list[PrismNode]:
+    """Materialize the current authored nodes for one store in node-id order."""
+    mut nodes: list[PrismNode] = []
+    for stored in prism_stored_nodes:
+        if stored.store_id_raw == store_id.0:
+            nodes.append(stored.node)
+    return nodes
+
+
+pub def reachable_node_ids(store_id: PrismStoreId, tip_id: int) -> list[int]:
+    """Return reachable authored node ids in store order for one tip."""
+    store_nodes_for_id = store_nodes(store_id)
+    mut visited: list[bool] = []
+    for _node in store_nodes_for_id:
+        visited.append(false)
+    mut pending: list[int] = [tip_id]
+    mut cursor = 0
+    # Walk backward through inputs from the tip so later lowering/rewrite steps only see the live authored slice.
+    while cursor < len(pending):
+        node_id = pending[cursor]
+        cursor = cursor + 1
+        if visited[node_id]:
+            continue
+        visited[node_id] = true
+        # Index the list materialized above; `node_at` would rebuild the store's node list on every visit.
+        node = store_nodes_for_id[node_id]
+        for input_id in node.input_ids:
+            pending.append(input_id)
+    mut ordered: list[int] = []
+    for node in store_nodes_for_id:
+        if visited[node.node_id]:
+            ordered.append(node.node_id)
+    return ordered
+
+
+pub def node_at(store_id: PrismStoreId, node_id: int) -> PrismNode:
+    """Return one authored node by store-local id."""
+    store_nodes_for_id = store_nodes(store_id)
+    return store_nodes_for_id[node_id]
+
+
+pub def remap_input_ids(input_ids: list[int], id_map: list[int]) -> list[int]:
+    """Remap one input-id list through a node-id translation table."""
+    mut remapped: list[int] = []
+    for input_id in input_ids:
+        remapped.append(id_map[input_id])
+    return remapped
+
+
+def _find_existing_adopted_node_id(target_store_id: PrismStoreId, source_node: PrismNode, remapped_input_ids: list[int], adopted_node_ids: list[int]) -> int:
+    """Return the id of an already-adopted node structurally equal to `source_node`, or -1 when there is none."""
+    # Only scan nodes appended during this adoption pass; older target-store nodes are out of scope for this slice.
+    # Materialize the target store once per call instead of once per candidate.
+    target_nodes = store_nodes(target_store_id)
+    for adopted_node_id in adopted_node_ids:
+        candidate = target_nodes[adopted_node_id]
+        if _nodes_structurally_equal(candidate, source_node, remapped_input_ids):
+            return adopted_node_id
+    return -1
+
+
+def _nodes_structurally_equal(candidate: PrismNode, source_node: PrismNode, remapped_input_ids: list[int]) -> bool:
+    """Compare kind, named table, predicate and limit count, then the candidate's inputs against the remapped ids."""
+    if candidate.kind != source_node.kind:
+        return false
+    if candidate.named_table != source_node.named_table:
+        return false
+    if candidate.predicate != source_node.predicate:
+        return false
+    if candidate.limit_count != source_node.limit_count:
+        return false
+    return candidate.input_ids == remapped_input_ids
diff --git a/src/prism/types.incn b/src/prism/types.incn
new file mode 100644
index 0000000..cbfe684
--- /dev/null
+++ b/src/prism/types.incn
@@ -0,0 +1,89 @@
+"""Shared Prism types that define the internal planning substrate contract."""
+
+pub type PrismStoreId = newtype int
+
+
+@derive(Clone)
+pub enum PrismNodeKind:
+    """Logical node kinds Prism currently supports in authored and rewritten views."""
+
+    ReadNamedTable
+    Filter
+    Join
+    Project
+    GroupBy
+    Aggregate
+    OrderBy
+    Limit
+    Explode
+
+
+@derive(Clone)
+pub model PrismNode:
+    """
+    One immutable Prism logical node.
+
+    The same node shape is reused for append-only authored storage and rewritten lowering views so lineage can stay
+    simple in this slice.
+    """
+
+    pub node_id: int
+    pub kind: PrismNodeKind
+    pub input_ids: list[int]
+    pub named_table: str
+    pub predicate: bool
+    pub limit_count: int
+
+
+@derive(Clone)
+pub model PrismOptimizedView:
+    """
+    Lowering-ready Prism view rooted at one tip.
+
+    `origin_node_ids` preserves the mapping back to authored node ids so rewrites stay explainable in tests and
+    diagnostics.
+ """ + + pub root_tip_id: int + pub nodes: list[PrismNode] + pub origin_node_ids: list[int] + + +pub model PrismStoreAdoption: + """Summary of one cross-store adoption pass into a target Prism store.""" + + pub adopted_tip_id: int + pub adopted_node_count: int + + +pub model PrismRewriteExplain: + """Compact explain artifact for canonical rewrite results.""" + + pub authored_node_count: int + pub rewritten_node_count: int + pub applied_rule_names: list[str] + pub rewritten_to_authored_origin_ids: list[int] + + +pub def authored_node_kind_name(node: PrismNode) -> str: + """Render one stable human-readable node-kind name for tests and diagnostics.""" + match node.kind: + PrismNodeKind.ReadNamedTable => + return str("ReadNamedTable") + PrismNodeKind.Filter => + return str("Filter") + PrismNodeKind.Join => + return str("Join") + PrismNodeKind.Project => + return str("Project") + PrismNodeKind.GroupBy => + return str("GroupBy") + PrismNodeKind.Aggregate => + return str("Aggregate") + PrismNodeKind.OrderBy => + return str("OrderBy") + PrismNodeKind.Limit => + return str("Limit") + PrismNodeKind.Explode => + return str("Explode") + diff --git a/src/substrait/conformance.incn b/src/substrait/conformance.incn index d06503f..144a695 100644 --- a/src/substrait/conformance.incn +++ b/src/substrait/conformance.incn @@ -102,6 +102,14 @@ def _references(paths: list[str]) -> ConformanceReferences: return ConformanceReferences(paths) +def _capability_tags_contain(tags: ConformanceCapabilityTags, expected: str) -> bool: + """Check tag membership without relying on container lowering details.""" + for tag in tags.0: + if tag == expected: + return true + return false + + def _refs_read_root_binding() -> ConformanceReferences: return _references([str("docs/rfcs/002_apache_substrait_integration.md"), str("docs/language/reference/substrait/operator_catalog.md"), str("docs/language/reference/substrait/read_root_binding_contract.md")]) @@ -135,32 +143,33 @@ def _core_scenario(scenario_id: 
str, title: str, capability_tags: ConformanceCap pub def core_scenario(key: CoreScenarioKey) -> SubstraitConformanceScenario: """Lookup one core conformance scenario by enum key.""" match key: - CoreScenarioKey.ReadNamedTable => + CoreScenarioKey.ReadNamedTable => return _core_scenario(scenario_id=str("inql.substrait.core.read_named_table.001"), title=str("ReadRel named table without secret material"), capability_tags=_capability_tags([str("read"), str("named-table")]), required_rels=[ConformanceRel.Read], intent=str("Logical read from a registered table name without embedding execution secrets."), required_rel_shape=str("ReadRel(NamedTable) with model-compatible schema."), expected_constraints=str("No secrets or resolved execution-bound endpoint material in normative plan payload."), references=_refs_read_root_binding()) - CoreScenarioKey.ReadLocalFiles => + CoreScenarioKey.ReadLocalFiles => return _core_scenario(scenario_id=str("inql.substrait.core.read_local_files.001"), title=str("ReadRel local files with portable format fields"), capability_tags=_capability_tags([str("read"), str("local-files")]), required_rels=[ConformanceRel.Read], intent=str("Logical scan of file-backed sources with pinned-revision format descriptors."), required_rel_shape=str("ReadRel(LocalFiles) with supported file format fields."), expected_constraints=str("No credential or session-state material in normative plan payload."), references=_refs_read_root_binding()) - CoreScenarioKey.ReadVirtualTable => + CoreScenarioKey.ReadVirtualTable => return _core_scenario(scenario_id=str("inql.substrait.core.read_virtual_table.001"), title=str("ReadRel virtual table with schema-aligned rows"), capability_tags=_capability_tags([str("read"), str("virtual-table"), str("literal-rows")]), required_rels=[ConformanceRel.Read], intent=str("Inline row materialization through VirtualTable payload."), required_rel_shape=str("ReadRel(VirtualTable) with schema-consistent embedded rows."), 
expected_constraints=str("No external binding required to interpret inline rows."), references=_refs_read_root_binding()) - CoreScenarioKey.FilterRows => + CoreScenarioKey.FilterRows => return _core_scenario(scenario_id=str("inql.substrait.core.filter_rows.001"), title=str("FilterRel boolean row predicate"), capability_tags=_capability_tags([str("filter"), str("predicate")]), required_rels=[ConformanceRel.Filter], intent=str("Boolean predicate filtering over relation input."), required_rel_shape=str("FilterRel(predicate) over one child relation."), expected_constraints=str("Boundary coverage is relation-shape only; richer query-surface parity is deferred."), references=_refs_core_operator()) - CoreScenarioKey.ProjectComputedColumns => + CoreScenarioKey.ProjectComputedColumns => return _core_scenario(scenario_id=str("inql.substrait.core.project_computed_columns.001"), title=str("ProjectRel boundary scaffold"), capability_tags=_capability_tags([str("project"), str("shape-scaffold")]), required_rels=[ConformanceRel.Project], intent=str("Boundary-level ProjectRel envelope for package-authored plans."), required_rel_shape=str("ProjectRel(expressions) over one child relation."), expected_constraints=str("Current package code verifies shape only; computed-column and window semantics are deferred."), references=_refs_core_operator()) - CoreScenarioKey.JoinRelVariants => + CoreScenarioKey.JoinRelVariants => return _core_scenario(scenario_id=str("inql.substrait.core.join_rel_variants.001"), title=str("JoinRel variant emission boundary"), capability_tags=_capability_tags([str("join"), str("inner"), str("left"), str("semi"), str("anti"), str("single"), str("mark")]), required_rels=[ConformanceRel.Join], intent=str("Explicit JoinRel variant emission over two child inputs."), required_rel_shape=str("JoinRel(join_type, expression) with optional post_join_filter when needed."), expected_constraints=str("Variant selection is owned by the boundary helper; broader planning semantics 
are deferred."), references=_refs_core_operator()) - CoreScenarioKey.CrossRelCartesian => + CoreScenarioKey.CrossRelCartesian => return _core_scenario(scenario_id=str("inql.substrait.core.cross_rel_cartesian.001"), title=str("CrossRel cartesian product semantics"), capability_tags=_capability_tags([str("cross"), str("cartesian-product")]), required_rels=[ConformanceRel.Cross], intent=str("Cartesian multiplication of rows across two inputs."), required_rel_shape=str("CrossRel(left, right) without predicate fields."), expected_constraints=str("Row cardinality follows cartesian product semantics."), references=_refs_core_operator()) - CoreScenarioKey.AggregateGroupingSets => + CoreScenarioKey.AggregateGroupingSets => return _core_scenario(scenario_id=str("inql.substrait.core.aggregate_grouping_sets.001"), title=str("AggregateRel boundary scaffold"), capability_tags=_capability_tags([str("aggregate"), str("shape-scaffold")]), required_rels=[ConformanceRel.Aggregate], intent=str("Boundary-level AggregateRel envelope for package-authored plans."), required_rel_shape=str("AggregateRel(groupings, measures) over child relation."), expected_constraints=str("Current package code verifies scaffold shape only; grouping-set and distinct semantics are deferred."), references=_refs_core_operator()) - CoreScenarioKey.SortRelOrdering => + CoreScenarioKey.SortRelOrdering => return _core_scenario(scenario_id=str("inql.substrait.core.sort_rel_ordering.001"), title=str("SortRel deterministic ordering contract"), capability_tags=_capability_tags([str("sort"), str("order-by"), str("collation")]), required_rels=[ConformanceRel.Sort], intent=str("Deterministic collation semantics for ordered result sets."), required_rel_shape=str("SortRel(collation_fields) over child relation."), expected_constraints=str("Ordering behavior follows declared collation and null-order settings."), references=_refs_core_operator()) - CoreScenarioKey.FetchRelLimitOffset => + CoreScenarioKey.FetchRelLimitOffset => 
return _core_scenario(scenario_id=str("inql.substrait.core.fetch_rel_limit_offset.001"), title=str("FetchRel offset and count windowing"), capability_tags=_capability_tags([str("fetch"), str("limit"), str("offset")]), required_rels=[ConformanceRel.Fetch], intent=str("Top-N and offset-based windowing over ordered or unordered relation output."), required_rel_shape=str("FetchRel(offset,count) over child relation."), expected_constraints=str("Fetch changes row window only; it does not alter row values or schema."), references=_refs_core_operator()) - CoreScenarioKey.SetRelOperations => + CoreScenarioKey.SetRelOperations => return _core_scenario(scenario_id=str("inql.substrait.core.set_rel_operations.001"), title=str("SetRel operation emission boundary"), capability_tags=_capability_tags([str("set"), str("union"), str("intersect"), str("except")]), required_rels=[ConformanceRel.Set], intent=str("Explicit SetRel operation emission for schema-compatible inputs."), required_rel_shape=str("SetRel(operation, inputs) with compatible schemas."), expected_constraints=str("Operation selection is owned by the boundary helper; richer planning semantics are deferred."), references=_refs_core_operator()) - CoreScenarioKey.ReferenceRelSharedSubplan => + CoreScenarioKey.ReferenceRelSharedSubplan => return _core_scenario(scenario_id=str("inql.substrait.core.reference_rel_shared_subplan.001"), title=str("ReferenceRel ordinal emission boundary"), capability_tags=_capability_tags([str("reference"), str("shared-subplan"), str("plan-dag")]), required_rels=[ConformanceRel.Reference], intent=str("Boundary-level ReferenceRel ordinal emission."), required_rel_shape=str("ReferenceRel(subtree_ordinal) inside one plan envelope."), expected_constraints=str("Current package code verifies ordinal preservation only; shared-subplan planning is deferred."), references=_refs_core_operator()) + pub def core_scenarios() -> list[SubstraitConformanceScenario]: """Return all currently defined read/query core 
scenarios.""" return [core_scenario(CoreScenarioKey.ReadNamedTable), core_scenario(CoreScenarioKey.ReadLocalFiles), core_scenario(CoreScenarioKey.ReadVirtualTable), core_scenario(CoreScenarioKey.FilterRows), core_scenario(CoreScenarioKey.ProjectComputedColumns), core_scenario(CoreScenarioKey.JoinRelVariants), core_scenario(CoreScenarioKey.CrossRelCartesian), core_scenario(CoreScenarioKey.AggregateGroupingSets), core_scenario(CoreScenarioKey.SortRelOrdering), core_scenario(CoreScenarioKey.FetchRelLimitOffset), core_scenario(CoreScenarioKey.SetRelOperations), core_scenario(CoreScenarioKey.ReferenceRelSharedSubplan)] @@ -168,20 +177,32 @@ pub def core_scenarios() -> list[SubstraitConformanceScenario]: pub def core_scenario_plan(key: CoreScenarioKey) -> Plan: """Build a deterministic proto-backed plan for a core scenario key.""" - # TODO(#229): most of these root-name lists can become `[]` again once empty list literals lower with type context. match key: - CoreScenarioKey.ReadNamedTable => return plan_from_named_table(str("orders")) - CoreScenarioKey.ReadLocalFiles => return plan_from_local_files(str("file:///tmp/orders.parquet")) - CoreScenarioKey.ReadVirtualTable => return plan_from_virtual_table(str("inline_orders")) - CoreScenarioKey.FilterRows => return plan_from_root_relation(filter_rel(read_named_table_rel(str("orders")), true), [str("id")]) - CoreScenarioKey.ProjectComputedColumns => return plan_from_root_relation(project_rel(read_named_table_rel(str("orders"))), [str("id")]) - CoreScenarioKey.JoinRelVariants => return plan_from_root_relation(join_rel_of_kind(read_named_table_rel(str("orders")), read_named_table_rel(str("customers")), true, SubstraitJoinKind.Left), [str("id")]) - CoreScenarioKey.CrossRelCartesian => return plan_from_root_relation(cross_rel(read_named_table_rel(str("left_source")), read_named_table_rel(str("right_source"))), [str("id")]) - CoreScenarioKey.AggregateGroupingSets => return 
plan_from_root_relation(aggregate_rel(read_named_table_rel(str("orders")), true), [str("id")]) - CoreScenarioKey.SortRelOrdering => return plan_from_root_relation(sort_rel(read_named_table_rel(str("orders"))), [str("id")]) - CoreScenarioKey.FetchRelLimitOffset => return plan_from_root_relation(fetch_rel(read_named_table_rel(str("orders")), 10, 25), [str("id")]) - CoreScenarioKey.SetRelOperations => return plan_from_root_relation(set_rel_of_kind(read_named_table_rel(str("orders_current")), read_named_table_rel(str("orders_archive")), SubstraitSetOperation.UnionDistinct), [str("id")]) - CoreScenarioKey.ReferenceRelSharedSubplan => return plan_from_root_relation(reference_rel(7), [str("id")]) + CoreScenarioKey.ReadNamedTable => + return plan_from_named_table(str("orders")) + CoreScenarioKey.ReadLocalFiles => + return plan_from_local_files(str("file:///tmp/orders.parquet")) + CoreScenarioKey.ReadVirtualTable => + return plan_from_virtual_table(str("inline_orders")) + CoreScenarioKey.FilterRows => + return plan_from_root_relation(filter_rel(read_named_table_rel(str("orders")), true), [str("id")]) + CoreScenarioKey.ProjectComputedColumns => + return plan_from_root_relation(project_rel(read_named_table_rel(str("orders"))), [str("id")]) + CoreScenarioKey.JoinRelVariants => + return plan_from_root_relation(join_rel_of_kind(read_named_table_rel(str("orders")), read_named_table_rel(str("customers")), true, SubstraitJoinKind.Left), [str("id")]) + CoreScenarioKey.CrossRelCartesian => + return plan_from_root_relation(cross_rel(read_named_table_rel(str("left_source")), read_named_table_rel(str("right_source"))), [str("id")]) + CoreScenarioKey.AggregateGroupingSets => + return plan_from_root_relation(aggregate_rel(read_named_table_rel(str("orders")), true), [str("id")]) + CoreScenarioKey.SortRelOrdering => + return plan_from_root_relation(sort_rel(read_named_table_rel(str("orders"))), [str("id")]) + CoreScenarioKey.FetchRelLimitOffset => + return 
plan_from_root_relation(fetch_rel(read_named_table_rel(str("orders")), 10, 25), [str("id")]) + CoreScenarioKey.SetRelOperations => + return plan_from_root_relation(set_rel_of_kind(read_named_table_rel(str("orders_current")), read_named_table_rel(str("orders_archive")), SubstraitSetOperation.UnionDistinct), [str("id")]) + CoreScenarioKey.ReferenceRelSharedSubplan => + return plan_from_root_relation(reference_rel(7), [str("id")]) + def _kind_to_conformance_rel(kind: str) -> ConformanceRel: @@ -225,11 +246,11 @@ pub def scenario_matches_root_shape(scenario: SubstraitConformanceScenario, plan root = root_rel(plan) for expected in scenario.required_rels: if expected == ConformanceRel.Read: - if str("named-table") in scenario.capability_tags.0: + if _capability_tags_contain(scenario.capability_tags, str("named-table")): return relation_kind_name(root) == str("ReadRel") and read_kind_name(root) == str("NamedTable") - elif str("local-files") in scenario.capability_tags.0: + elif _capability_tags_contain(scenario.capability_tags, str("local-files")): return relation_kind_name(root) == str("ReadRel") and read_kind_name(root) == str("LocalFiles") - elif str("virtual-table") in scenario.capability_tags.0: + elif _capability_tags_contain(scenario.capability_tags, str("virtual-table")): return relation_kind_name(root) == str("ReadRel") and read_kind_name(root) == str("VirtualTable") return relation_kind_name(root) == str("ReadRel") elif relation_kind_name(root) != relation_kind_name_from_conformance(expected): @@ -249,28 +270,44 @@ pub def core_scenario_emission_matches(key: CoreScenarioKey) -> bool: return false root = root_rel(plan) match key: - CoreScenarioKey.SetRelOperations => + CoreScenarioKey.SetRelOperations => if set_operation_name(root) != str("UnionDistinct"): return false - CoreScenarioKey.ReferenceRelSharedSubplan => + CoreScenarioKey.ReferenceRelSharedSubplan => if reference_subtree_ordinal(root) != 7: return false - _ => pass + _ => + pass + return 
plan_encoded_len(plan) > 0 def relation_kind_name_from_conformance(rel: ConformanceRel) -> str: match rel: - ConformanceRel.Read => return str("ReadRel") - ConformanceRel.Filter => return str("FilterRel") - ConformanceRel.Project => return str("ProjectRel") - ConformanceRel.Join => return str("JoinRel") - ConformanceRel.Cross => return str("CrossRel") - ConformanceRel.Aggregate => return str("AggregateRel") - ConformanceRel.Sort => return str("SortRel") - ConformanceRel.Fetch => return str("FetchRel") - ConformanceRel.Set => return str("SetRel") - ConformanceRel.Reference => return str("ReferenceRel") - ConformanceRel.ExtensionSingle => return str("ExtensionSingleRel") - ConformanceRel.ExtensionLeaf => return str("ExtensionLeafRel") - _ => return str("UnknownRel") + ConformanceRel.Read => + return str("ReadRel") + ConformanceRel.Filter => + return str("FilterRel") + ConformanceRel.Project => + return str("ProjectRel") + ConformanceRel.Join => + return str("JoinRel") + ConformanceRel.Cross => + return str("CrossRel") + ConformanceRel.Aggregate => + return str("AggregateRel") + ConformanceRel.Sort => + return str("SortRel") + ConformanceRel.Fetch => + return str("FetchRel") + ConformanceRel.Set => + return str("SetRel") + ConformanceRel.Reference => + return str("ReferenceRel") + ConformanceRel.ExtensionSingle => + return str("ExtensionSingleRel") + ConformanceRel.ExtensionLeaf => + return str("ExtensionLeafRel") + _ => + return str("UnknownRel") + diff --git a/src/substrait/mod.incn b/src/substrait/mod.incn index 2fe8ea0..46de7b0 100644 --- a/src/substrait/mod.incn +++ b/src/substrait/mod.incn @@ -6,5 +6,6 @@ It uses the Substrait crate to be able to parse and serialize Substrait plans. Protobuf serialization and deserialization are provided via prost in the proto module. 
""" + from rust::prost import Message from rust::substrait::proto import Plan diff --git a/src/substrait/plan.incn b/src/substrait/plan.incn index bc2b7c6..bed8895 100644 --- a/src/substrait/plan.incn +++ b/src/substrait/plan.incn @@ -4,7 +4,7 @@ Proto-backed Substrait plan construction for InQL RFC 002. This module is the canonical emission boundary for Substrait plans. It uses the real `substrait::proto` types rather than a hand-rolled parallel plan model. -TODO: (rfc-002) revisit this module's public surface once the proto-backed API stabilizes. +TODO: (rfc-002) revisit this module's public surface once the proto-backed API stabilizes. A thin wrapper class or classmethod-based entrypoint may read more cleanly than the current free-function-heavy shape, as long as it remains a wrapper over real proto plans rather than a parallel semantic model. """ @@ -13,7 +13,28 @@ from rust::prost import Message from rust::prost_types import Any from rust::std::boxed import Box from rust::std::primitive import i32 as RustI32 -from rust::substrait::proto import AggregateRel, CrossRel, Expression, ExtensionSingleRel, FetchRel, FilterRel, JoinRel, NamedStruct, Plan, PlanRel, ProjectRel, ReadRel, ReferenceRel, Rel, RelCommon, RelRoot, SetRel, SortField, SortRel, Version +from rust::substrait::proto import ( + AggregateRel, + CrossRel, + Expression, + ExtensionSingleRel, + FetchRel, + FilterRel, + JoinRel, + NamedStruct, + Plan, + PlanRel, + ProjectRel, + ReadRel, + ReferenceRel, + Rel, + RelCommon, + RelRoot, + SetRel, + SortField, + SortRel, + Version, +) from rust::substrait::proto::aggregate_rel import Grouping, Measure from rust::substrait::proto::expression import Literal, RexType from rust::substrait::proto::expression::literal import LiteralType @@ -22,24 +43,14 @@ from rust::substrait::proto::extensions import SimpleExtensionUrn from rust::substrait::proto::fetch_rel import CountMode, OffsetMode from rust::substrait::proto::join_rel import JoinType from 
rust::substrait::proto::plan_rel import RelType as PlanRelType -from rust::substrait::proto::read_rel import LocalFiles -from rust::substrait::proto::read_rel import NamedTable as ReadNamedTable -from rust::substrait::proto::read_rel import ReadType, VirtualTable +from rust::substrait::proto::read_rel import LocalFiles, NamedTable as ReadNamedTable, ReadType, VirtualTable from rust::substrait::proto::read_rel::local_files import FileOrFiles -from rust::substrait::proto::read_rel::local_files::file_or_files import FileFormat -from rust::substrait::proto::read_rel::local_files::file_or_files import ParquetReadOptions -from rust::substrait::proto::read_rel::local_files::file_or_files import PathType +from rust::substrait::proto::read_rel::local_files::file_or_files import FileFormat, ParquetReadOptions, PathType from rust::substrait::proto::rel import RelType from rust::substrait::proto::rel_common import Direct, EmitKind from rust::substrait::proto::set_rel import SetOp from rust::substrait::proto::sort_field import SortDirection, SortKind - -from substrait.schema import ( - RowColumnSpec, - RowShapeSpec, - SubstraitPrimitiveKind, - row_shape_to_named_struct, -) +from substrait.schema import RowColumnSpec, RowShapeSpec, SubstraitPrimitiveKind, row_shape_to_named_struct @derive(Clone) pub type RelationKindName = newtype str @@ -66,10 +77,10 @@ pub enum SubstraitSetOperation: UnionDistinct UnionAll + # --- Internal helpers ------------------------------------------------------------------------------------------------ # These keep the public RFC 002 surface compact by centralizing common proto defaults, relation wrapping, and temporary # compatibility shims. 
- def _logical_shape(name: str) -> RowShapeSpec: return RowShapeSpec(model_name=name, columns=[RowColumnSpec(name=str("id"), kind=SubstraitPrimitiveKind.I64, nullable=false)]) @@ -172,24 +183,40 @@ def _set_operation_from_legacy_name(operation: str) -> SubstraitSetOperation: def _join_type_from_kind(kind: SubstraitJoinKind) -> RustI32: match kind: - SubstraitJoinKind.Inner => return JoinType.Inner.into() - SubstraitJoinKind.Left => return JoinType.Left.into() - SubstraitJoinKind.Right => return JoinType.Right.into() - SubstraitJoinKind.Outer => return JoinType.Outer.into() - SubstraitJoinKind.Semi => return JoinType.LeftSemi.into() - SubstraitJoinKind.Anti => return JoinType.LeftAnti.into() - SubstraitJoinKind.Single => return JoinType.LeftSingle.into() - SubstraitJoinKind.Mark => return JoinType.LeftMark.into() + SubstraitJoinKind.Inner => + return JoinType.Inner.into() + SubstraitJoinKind.Left => + return JoinType.Left.into() + SubstraitJoinKind.Right => + return JoinType.Right.into() + SubstraitJoinKind.Outer => + return JoinType.Outer.into() + SubstraitJoinKind.Semi => + return JoinType.LeftSemi.into() + SubstraitJoinKind.Anti => + return JoinType.LeftAnti.into() + SubstraitJoinKind.Single => + return JoinType.LeftSingle.into() + SubstraitJoinKind.Mark => + return JoinType.LeftMark.into() + def _set_op_from_kind(operation: SubstraitSetOperation) -> RustI32: match operation: - SubstraitSetOperation.MinusPrimary => return SetOp.MinusPrimary.into() - SubstraitSetOperation.IntersectPrimary => return SetOp.IntersectionPrimary.into() - SubstraitSetOperation.IntersectMultiset => return SetOp.IntersectionMultiset.into() - SubstraitSetOperation.MinusMultiset => return SetOp.MinusMultiset.into() - SubstraitSetOperation.UnionDistinct => return SetOp.UnionDistinct.into() - SubstraitSetOperation.UnionAll => return SetOp.UnionAll.into() + SubstraitSetOperation.MinusPrimary => + return SetOp.MinusPrimary.into() + SubstraitSetOperation.IntersectPrimary => + return 
SetOp.IntersectionPrimary.into() + SubstraitSetOperation.IntersectMultiset => + return SetOp.IntersectionMultiset.into() + SubstraitSetOperation.MinusMultiset => + return SetOp.MinusMultiset.into() + SubstraitSetOperation.UnionDistinct => + return SetOp.UnionDistinct.into() + SubstraitSetOperation.UnionAll => + return SetOp.UnionAll.into() + def _plan_root(input: Rel, names: list[str]) -> PlanRel: @@ -337,49 +364,77 @@ pub def root_rel(plan: Plan) -> Rel: return read_named_table_rel(str("__malformed_plan__")) relation = plan.relations[0] match relation.rel_type: - Some(PlanRelType.Root(root)) => + Some(PlanRelType.Root(root)) => match root.input: - Some(rel) => return rel - None => return read_named_table_rel(str("__malformed_plan__")) - Some(PlanRelType.Rel(rel)) => return rel - _ => return read_named_table_rel(str("__malformed_plan__")) + Some(rel) => + return rel + None => + return read_named_table_rel(str("__malformed_plan__")) + + Some(PlanRelType.Rel(rel)) => + return rel + _ => + return read_named_table_rel(str("__malformed_plan__")) + pub def relation_kind_name(rel: Rel) -> str: """Return the public Substrait relation-kind label for one `Rel` node.""" match rel.rel_type: - Some(RelType.Read(_)) => return str("ReadRel") - Some(RelType.Filter(_)) => return str("FilterRel") - Some(RelType.Project(_)) => return str("ProjectRel") - Some(RelType.Join(_)) => return str("JoinRel") - Some(RelType.Cross(_)) => return str("CrossRel") - Some(RelType.Aggregate(_)) => return str("AggregateRel") - Some(RelType.Sort(_)) => return str("SortRel") - Some(RelType.Fetch(_)) => return str("FetchRel") - Some(RelType.Set(_)) => return str("SetRel") - Some(RelType.Reference(_)) => return str("ReferenceRel") - Some(RelType.ExtensionSingle(_)) => return str("ExtensionSingleRel") - Some(RelType.ExtensionLeaf(_)) => return str("ExtensionLeafRel") - Some(RelType.ExtensionMulti(_)) => return str("ExtensionMultiRel") - _ => return str("UnknownRel") + Some(RelType.Read(_)) => + return 
str("ReadRel") + Some(RelType.Filter(_)) => + return str("FilterRel") + Some(RelType.Project(_)) => + return str("ProjectRel") + Some(RelType.Join(_)) => + return str("JoinRel") + Some(RelType.Cross(_)) => + return str("CrossRel") + Some(RelType.Aggregate(_)) => + return str("AggregateRel") + Some(RelType.Sort(_)) => + return str("SortRel") + Some(RelType.Fetch(_)) => + return str("FetchRel") + Some(RelType.Set(_)) => + return str("SetRel") + Some(RelType.Reference(_)) => + return str("ReferenceRel") + Some(RelType.ExtensionSingle(_)) => + return str("ExtensionSingleRel") + Some(RelType.ExtensionLeaf(_)) => + return str("ExtensionLeafRel") + Some(RelType.ExtensionMulti(_)) => + return str("ExtensionMultiRel") + _ => + return str("UnknownRel") + pub def read_kind_name(rel: Rel) -> str: """Return the read-root kind label for one relation.""" match rel.rel_type: - Some(RelType.Read(read)) => + Some(RelType.Read(read)) => match read.read_type: - Some(ReadType.NamedTable(_)) => return str("NamedTable") - Some(ReadType.LocalFiles(_)) => return str("LocalFiles") - Some(ReadType.VirtualTable(_)) => return str("VirtualTable") - _ => return str("ReadRel") - _ => return str("Unknown") + Some(ReadType.NamedTable(_)) => + return str("NamedTable") + Some(ReadType.LocalFiles(_)) => + return str("LocalFiles") + Some(ReadType.VirtualTable(_)) => + return str("VirtualTable") + _ => + return str("ReadRel") + + _ => + return str("Unknown") + pub def set_operation_name(rel: Rel) -> str: """Return the public Substrait set-operation label for one `SetRel` node.""" match rel.rel_type: - Some(RelType.Set(set_rel)) => + Some(RelType.Set(set_rel)) => if set_rel.op == _set_op_from_kind(SubstraitSetOperation.MinusPrimary): return str("MinusPrimary") elif set_rel.op == _set_op_from_kind(SubstraitSetOperation.IntersectPrimary): @@ -393,14 +448,19 @@ pub def set_operation_name(rel: Rel) -> str: elif set_rel.op == _set_op_from_kind(SubstraitSetOperation.UnionAll): return str("UnionAll") return 
str("UnknownSetOp") - _ => return str("NotSet") + _ => + return str("NotSet") + pub def reference_subtree_ordinal(rel: Rel) -> RustI32: """Return the subtree ordinal for one `ReferenceRel`, or `-1` when the input is not a reference.""" match rel.rel_type: - Some(RelType.Reference(reference)) => return reference.subtree_ordinal - _ => return -1 + Some(RelType.Reference(reference)) => + return reference.subtree_ordinal + _ => + return -1 + pub def plan_encoded_len(plan: Plan) -> int: @@ -410,50 +470,79 @@ pub def plan_encoded_len(plan: Plan) -> int: def _relation_children(rel: Rel) -> list[Rel]: match rel.rel_type: - Some(RelType.Filter(filter)) => + Some(RelType.Filter(filter)) => match filter.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - Some(RelType.Project(project)) => + Some(child) => + return [child.as_ref().clone()] + None => + return [] + + Some(RelType.Project(project)) => match project.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - Some(RelType.Join(join)) => + Some(child) => + return [child.as_ref().clone()] + None => + return [] + + Some(RelType.Join(join)) => mut children: list[Rel] = [] match join.left: Some(left) => children.append(left.as_ref().clone()) - None => pass + None => + pass + match join.right: Some(right) => children.append(right.as_ref().clone()) - None => pass + None => + pass + return children - Some(RelType.Cross(cross)) => + Some(RelType.Cross(cross)) => mut children: list[Rel] = [] match cross.left: Some(left) => children.append(left.as_ref().clone()) - None => pass + None => + pass + match cross.right: Some(right) => children.append(right.as_ref().clone()) - None => pass + None => + pass + return children - Some(RelType.Aggregate(aggregate)) => + Some(RelType.Aggregate(aggregate)) => match aggregate.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - Some(RelType.Sort(sort)) => + Some(child) => + return [child.as_ref().clone()] + None => + return 
[] + + Some(RelType.Sort(sort)) => match sort.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - Some(RelType.Fetch(fetch)) => + Some(child) => + return [child.as_ref().clone()] + None => + return [] + + Some(RelType.Fetch(fetch)) => match fetch.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - Some(RelType.Set(set_rel)) => return set_rel.inputs - Some(RelType.ExtensionSingle(extension)) => + Some(child) => + return [child.as_ref().clone()] + None => + return [] + + Some(RelType.Set(set_rel)) => + return set_rel.inputs + Some(RelType.ExtensionSingle(extension)) => match extension.input: - Some(child) => return [child.as_ref().clone()] - None => return [] - _ => return [] + Some(child) => + return [child.as_ref().clone()] + None => + return [] + + _ => + return [] + pub def plan_contains_relation_kind(plan: Plan, expected_kind: str) -> bool: @@ -478,13 +567,17 @@ def _extension_urns_for_rel(rel: Rel) -> list[SimpleExtensionUrn]: def _collect_extension_urns(rel: Rel) -> list[SimpleExtensionUrn]: mut urns: list[SimpleExtensionUrn] = [] match rel.rel_type.clone(): - Some(RelType.ExtensionSingle(extension)) => + Some(RelType.ExtensionSingle(extension)) => match extension.detail: - Some(detail) => + Some(detail) => if detail.type_url != str(""): urns.append(SimpleExtensionUrn(extension_urn_anchor=1, urn=detail.type_url)) - None => pass - _ => pass + None => + pass + + _ => + pass + for child in _relation_children(rel): for urn in _collect_extension_urns(child): urns.append(SimpleExtensionUrn(extension_urn_anchor=1, urn=urn.urn)) diff --git a/tests/test_dataset.incn b/tests/test_dataset.incn index 06f71fe..adef041 100644 --- a/tests/test_dataset.incn +++ b/tests/test_dataset.incn @@ -5,9 +5,18 @@ from metadata import inql_version from dataset import DataSet, BoundedDataSet, UnboundedDataSet, DataFrame, LazyFrame, DataStream from dataset.ops import filter_ds, join_ds from functions import count_rows, total -from 
substrait.plan import explode_extension_uri, plan_encoded_len, plan_from_named_table, plan_from_root_relation, plan_has_extension_urn, read_named_table_rel, relation_kind_name, root_rel +from substrait.plan import ( + explode_extension_uri, + plan_contains_relation_kind, + plan_encoded_len, + plan_from_named_table, + plan_from_root_relation, + plan_has_extension_urn, + read_named_table_rel, + relation_kind_name, + root_rel, +) -# ---- Helper functions and tooling ---- @derive(Clone) model Order: id: int @@ -66,6 +75,7 @@ def _touch[T](x: T) -> None: """Consume a value so assignment chains are not unused-variable warnings.""" pass + def _compile_hierarchy_assignability(order_frame: DataFrame[Order], order_lazy: LazyFrame[Order], event_stream: DataStream[Event]) -> None: """Compile-time shape checks for concrete -> trait -> supertrait assignments.""" sink0 = _upcast_bounded_to_data_set(_upcast_data_frame_to_bounded(order_frame)) @@ -76,7 +86,6 @@ def _compile_hierarchy_assignability(order_frame: DataFrame[Order], order_lazy: _touch(sink2) -# ---- Test cases ---- def test_smoke__dataset_types_are_published() -> None: """RFC 001: carrier types and aggregate helpers are importable.""" assert_eq(total(42), 0, "total is a stub returning 0 until RFC 004") @@ -98,7 +107,7 @@ def test_type_contracts__signature_tiers_compile() -> None: def test_type_contracts__concrete_carriers_compile() -> None: """RFC 001: concrete carriers flow through generic helper signatures.""" df: DataFrame[Order] = DataFrame(_row_schema_marker=Order(id=3), _substrait_rel=read_named_table_rel(str("orders"))) - lf: LazyFrame[Order] = LazyFrame(_row_schema_marker=Order(id=3003), _substrait_rel=read_named_table_rel(str("orders"))) + lf: LazyFrame[Order] = LazyFrame.named_table(Order(id=3003), str("orders")) ev = Event(id=4) st: DataStream[Event] = DataStream(_row_schema_marker=ev, _substrait_rel=read_named_table_rel(str("events"))) _touch(_accept_data_frame_concrete(df)) @@ -109,7 +118,7 @@ def 
test_type_contracts__concrete_carriers_compile() -> None: def test_hierarchy__concrete_and_supertrait_assignability() -> None: """RFC 001: concrete -> bounded/unbounded trait -> DataSet chains compile.""" df: DataFrame[Order] = DataFrame(_row_schema_marker=Order(id=5), _substrait_rel=read_named_table_rel(str("orders"))) - lf: LazyFrame[Order] = LazyFrame(_row_schema_marker=Order(id=5005), _substrait_rel=read_named_table_rel(str("orders"))) + lf: LazyFrame[Order] = LazyFrame.named_table(Order(id=5005), str("orders")) ev = Event(id=6) st: DataStream[Event] = DataStream(_row_schema_marker=ev, _substrait_rel=read_named_table_rel(str("events"))) _compile_hierarchy_assignability(df, lf, st) @@ -118,7 +127,7 @@ def test_hierarchy__concrete_and_supertrait_assignability() -> None: def test_type_contracts__concrete_and_trait_types_match_generic_arguments() -> None: """Generic parameter T matches for DataFrame/LazyFrame/DataStream carriers.""" df: DataFrame[Order] = DataFrame(_row_schema_marker=Order(id=7), _substrait_rel=read_named_table_rel(str("orders"))) - lf: LazyFrame[Order] = LazyFrame(_row_schema_marker=Order(id=7007), _substrait_rel=read_named_table_rel(str("orders"))) + lf: LazyFrame[Order] = LazyFrame.named_table(Order(id=7007), str("orders")) bounded_df: BoundedDataSet[Order] = df bounded_lf: BoundedDataSet[Order] = lf _touch(bounded_df) @@ -147,7 +156,7 @@ def test_dataset_ops__method_wrapper_matches_canonical_function() -> None: def test_dataset_ops__all_carriers_emit_real_plans() -> None: df = DataFrame(_row_schema_marker=Order(id=10), _substrait_rel=read_named_table_rel(str("orders"))) - lf = LazyFrame(_row_schema_marker=Order(id=11), _substrait_rel=read_named_table_rel(str("orders"))) + lf = LazyFrame.named_table(Order(id=11), str("orders")) st = DataStream(_row_schema_marker=Event(id=12), _substrait_rel=read_named_table_rel(str("events"))) assert_eq(plan_encoded_len(df.to_substrait_plan()) > 0, true, "DataFrame should emit a real plan") 
assert_eq(plan_encoded_len(lf.to_substrait_plan()) > 0, true, "LazyFrame should emit a real plan") @@ -160,3 +169,62 @@ def test_dataset_ops__api_lowered_boundary_facts_stay_stable() -> None: exploded_plan = left.explode().to_substrait_plan() assert_eq(relation_kind_name(root_rel(joined_plan)), str("JoinRel"), "canonical join function should still lower to a JoinRel root") assert_eq(plan_has_extension_urn(exploded_plan, explode_extension_uri()), true, "explode method should emit the registered extension URI") + + +def test_lazy_frame__immutable_branching_and_origin_mapping_hold() -> None: + base = LazyFrame.named_table(Order(id=14), str("orders")) + filtered: LazyFrame[Order] = base.filter(false) + assert_eq(plan_encoded_len(base.to_substrait_plan()) > 0, true, "base lazy carrier should still emit a real proto-backed plan") + assert_eq(relation_kind_name(root_rel(filtered.to_substrait_plan())), str("FilterRel"), "Prism-backed LazyFrame should still lower to FilterRel") + assert_eq(plan_contains_relation_kind(filtered.to_substrait_plan(), str("ReadRel")), true, "derived filter plans should still preserve the read root") + + +def test_lazy_frame__independent_roots_can_join_and_lower() -> None: + left: LazyFrame[Order] = LazyFrame.named_table(Order(id=15), str("orders")).filter(false) + right: LazyFrame[Order] = LazyFrame.named_table(Order(id=16), str("orders_archive")).filter(false) + joined: LazyFrame[Order] = left.join(right, true) + plan = joined.to_substrait_plan() + assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "joined lazy carriers should lower to a JoinRel root") + assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "joined lazy carriers should preserve nested filters") + assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "joined lazy carriers should still reach the original read roots") + + +def test_lazy_frame__native_prism_ops_preserve_current_boundary_shapes() -> None: + projected = 
LazyFrame.named_table(Order(id=17), str("orders")).select() + grouped = LazyFrame.named_table(Order(id=18), str("orders")).group_by() + aggregated = grouped.agg() + ordered = LazyFrame.named_table(Order(id=19), str("orders")).order_by() + limited = LazyFrame.named_table(Order(id=20), str("orders")).limit(10) + exploded = LazyFrame.named_table(Order(id=21), str("orders")).explode() + assert_eq(relation_kind_name(root_rel(projected.to_substrait_plan())), str("ProjectRel"), "select should lower through the project boundary shape") + assert_eq(relation_kind_name(root_rel(grouped.to_substrait_plan())), str("AggregateRel"), "group_by should lower to the aggregate scaffold boundary shape") + assert_eq(relation_kind_name(root_rel(aggregated.to_substrait_plan())), str("AggregateRel"), "agg should lower to AggregateRel") + assert_eq(relation_kind_name(root_rel(ordered.to_substrait_plan())), str("SortRel"), "order_by should lower to SortRel") + assert_eq(relation_kind_name(root_rel(limited.to_substrait_plan())), str("FetchRel"), "limit should lower to FetchRel") + assert_eq(plan_has_extension_urn(exploded.to_substrait_plan(), explode_extension_uri()), true, "explode should keep emitting the registered extension boundary") + + +def test_lazy_frame__deeper_independent_roots_still_lower_with_stable_shapes() -> None: + left: LazyFrame[Order] = LazyFrame.named_table(Order(id=22), str("orders")).filter(false) + right_base: LazyFrame[Order] = LazyFrame.named_table(Order(id=23), str("orders_archive")) + right_joined: LazyFrame[Order] = right_base.filter(false).order_by().join(right_base.filter(false).order_by(), true) + joined: LazyFrame[Order] = left.join(right_joined, true) + plan = joined.to_substrait_plan() + assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "deeper independent-root join compositions should still lower to JoinRel") + assert_eq(plan_contains_relation_kind(plan, str("SortRel")), true, "deeper branch compositions should preserve sort nodes") + 
assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "deeper branch compositions should preserve filter nodes") + assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "deeper branch compositions should still preserve read roots") + + +def test_lazy_frame__canonical_rewrite_removes_filter_true_and_preserves_boundary_validity() -> None: + simplified = LazyFrame.named_table(Order(id=24), str("orders")).filter(true) + plan = simplified.to_substrait_plan() + assert_eq(plan_encoded_len(plan) > 0, true, "canonicalized plans should still encode to a non-empty Substrait payload") + assert_eq(relation_kind_name(root_rel(plan)), str("ReadRel"), "filter(true) should canonicalize away before RFC 002 lowering") + assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "canonicalized plans should still preserve the read root") + + +def test_lazy_frame__canonical_rewrite_keeps_extension_boundary_for_explode() -> None: + exploded = LazyFrame.named_table(Order(id=25), str("orders")).filter(true).explode() + plan = exploded.to_substrait_plan() + assert_eq(plan_has_extension_urn(plan, explode_extension_uri()), true, "canonical rewrites must keep explode extension URI emission intact") diff --git a/tests/test_prism.incn b/tests/test_prism.incn new file mode 100644 index 0000000..1dcbb08 --- /dev/null +++ b/tests/test_prism.incn @@ -0,0 +1,178 @@ +"""Internal Prism engine tests against the shared-store cursor substrate.""" + +from std.testing import assert_eq +from prism import ( + PrismCursor, + prism_cursor_authored_node_count, + prism_cursor_named_table, + prism_cursor_rewrite_applied_rule_count, + prism_cursor_rewrite_applied_rule_name_at, + prism_cursor_rewritten_node_count, + prism_cursor_rewritten_origin_count, + prism_cursor_rewritten_tip_kind_name, + prism_cursor_store_id_value, + prism_cursor_store_node_count, + prism_cursor_tip_kind_name, + prism_cursor_tip_origin_id, + prism_cursors_share_store, +) +from substrait.plan import 
plan_contains_relation_kind, plan_encoded_len, relation_kind_name, root_rel
+
+@derive(Clone)
+model Order:  # Minimal single-field row model; used as the PrismCursor type parameter by the tests in view
+    id: int
+
+
+def test_prism__branching_keeps_base_reachable_history_small() -> None:  # Filtering a base cursor: the store is shared, yet the base still reports one authored node
+    base: PrismCursor[Order] = prism_cursor_named_table(Order(id=1), str("orders"))
+    filtered: PrismCursor[Order] = base.filter(false)  # derived cursor; base is asserted unchanged below
+    plan = filtered.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(base, filtered), true, "unary branching should share one append-only Prism store")
+    assert_eq(prism_cursor_authored_node_count(base), 1, "base cursor should still only reach the original read node")
+    assert_eq(prism_cursor_store_node_count(base), 2, "shared store should contain the appended filter node exactly once")
+    assert_eq(prism_cursor_tip_kind_name(base), str("ReadNamedTable"), "base cursor tip should remain the read root")
+    assert_eq(prism_cursor_authored_node_count(filtered), 2, "derived cursor should reach the read node plus the new filter")
+    assert_eq(prism_cursor_tip_kind_name(filtered), str("Filter"), "filtered cursor tip should become the filter node")
+    assert_eq(prism_cursor_tip_origin_id(filtered), 1, "identity optimized view should preserve the appended filter as the tip origin")
+    assert_eq(plan_encoded_len(plan) > 0, true, "lazy carrier should still emit a real proto-backed plan")
+    assert_eq(relation_kind_name(root_rel(plan)), str("FilterRel"), "Prism lowering should preserve filter root shape")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "Prism lowering should still reach the read root")
+
+
+def test_prism__independent_roots_get_distinct_store_ids() -> None:  # Two separately created named-table cursors: no shared store, and distinct non-negative store ids
+    left: PrismCursor[Order] = prism_cursor_named_table(Order(id=100), str("orders"))
+    right: PrismCursor[Order] = prism_cursor_named_table(Order(id=101), str("orders_archive"))
+    left_store_id = prism_cursor_store_id_value(left)
+    right_store_id = prism_cursor_store_id_value(right)
+    assert_eq(prism_cursors_share_store(left, right), false, "independent read roots should not share one store id")
+    assert_eq(left_store_id >= 0, true, "store ids should be non-negative")
+    assert_eq(right_store_id >= 0, true, "store ids should be non-negative")
+    assert_eq(left_store_id == right_store_id, false, "allocator should assign distinct ids to distinct stores")
+
+
+def test_prism__same_store_join_reuses_shared_history() -> None:  # Joins a filter branch and a select branch of one base; expects 4 store nodes and a Join tip
+    base: PrismCursor[Order] = prism_cursor_named_table(Order(id=2), str("orders"))
+    left: PrismCursor[Order] = base.filter(false)
+    right: PrismCursor[Order] = base.select()
+    joined: PrismCursor[Order] = left.join(right.clone(), true)  # NOTE(review): clone presumably keeps right usable for the share-store check below; confirm
+    plan = joined.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(left, right), true, "branches from the same base should still share one store before the join")
+    assert_eq(prism_cursor_authored_node_count(base), 1, "joining later should not mutate the base cursor's reachable history")
+    assert_eq(prism_cursor_store_node_count(joined), 4, "same-store join should append one join node without duplicating the shared read root")
+    assert_eq(prism_cursor_authored_node_count(joined), 4, "joined cursor should reach the shared read plus both unary nodes and the join")
+    assert_eq(prism_cursor_tip_kind_name(joined), str("Join"), "same-store join tip should become a join node")
+    assert_eq(prism_cursor_tip_origin_id(joined), 3, "identity optimized view should preserve the appended join node as the tip origin")
+    assert_eq(plan_encoded_len(plan) > 0, true, "branch-and-join carrier should still emit a real proto-backed plan")
+    assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "binary authored nodes should lower back to JoinRel")
+    assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "join lowering should preserve the left filter input")
+    assert_eq(plan_contains_relation_kind(plan, str("ProjectRel")), true, "join lowering should preserve the right project input")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "join lowering should still reach the original read root")
+
+
+def test_prism__same_store_join_with_longer_branches_is_still_one_append() -> None:  # Same-store join where each branch has two unary steps; expects 6 store nodes
+    base: PrismCursor[Order] = prism_cursor_named_table(Order(id=22), str("orders"))
+    left: PrismCursor[Order] = base.filter(false).order_by()
+    right: PrismCursor[Order] = base.select().limit(5)
+    joined: PrismCursor[Order] = left.join(right.clone(), true)
+    plan = joined.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(left, right), true, "branches from one base should continue sharing a store as branch depth grows")
+    assert_eq(prism_cursor_authored_node_count(base), 1, "deeper branch joins should still keep the base handle immutable")
+    assert_eq(prism_cursor_store_node_count(joined), 6, "same-store join should still append one join node after deeper branch history")
+    assert_eq(prism_cursor_authored_node_count(joined), 6, "joined cursor should still reach exactly read + branch nodes + join")
+    assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "deeper same-store branches should still lower through JoinRel")
+    assert_eq(plan_contains_relation_kind(plan, str("SortRel")), true, "left branch sort should be preserved")
+    assert_eq(plan_contains_relation_kind(plan, str("FetchRel")), true, "right branch limit should be preserved")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "joined deeper branches should still reach the original read root")
+
+
+def test_prism__cross_store_join_adopts_reachable_subgraph() -> None:  # Joins cursors built from two separate named-table roots; expects 5 nodes in the joined store
+    left: PrismCursor[Order] = prism_cursor_named_table(Order(id=3), str("orders")).filter(false)
+    right: PrismCursor[Order] = prism_cursor_named_table(Order(id=4), str("orders_archive")).filter(false)
+    joined: PrismCursor[Order] = left.join(right.clone(), true)
+    plan = joined.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(left, right), false, "independent roots should begin on distinct stores")
+    assert_eq(prism_cursor_store_node_count(joined), 5, "cross-store join should adopt two reachable rhs nodes then append one join node")
+    assert_eq(prism_cursor_authored_node_count(joined), 5, "joined cursor should reach both adopted branches plus the join")
+    assert_eq(prism_cursor_tip_kind_name(joined), str("Join"), "cross-store join tip should become a join node")
+    assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "cross-store joins should still lower to JoinRel")
+    assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "cross-store join lowering should preserve nested filters")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "cross-store join lowering should still reach both read roots")
+
+
+def test_prism__cross_store_join_dedups_equivalent_reachable_rhs_nodes() -> None:  # rhs is a join of two identical filter(false) branches of one base; expects 6 nodes after the cross-store join
+    left: PrismCursor[Order] = prism_cursor_named_table(Order(id=30), str("orders")).filter(false)
+    right_base: PrismCursor[Order] = prism_cursor_named_table(Order(id=31), str("orders_archive"))
+    right_left: PrismCursor[Order] = right_base.filter(false)
+    right_right: PrismCursor[Order] = right_base.filter(false)
+    right_joined: PrismCursor[Order] = right_left.join(right_right, true)  # NOTE(review): no .clone() on the argument here, unlike the other join calls in view; confirm intentional
+    joined: PrismCursor[Order] = left.join(right_joined.clone(), true)
+    plan = joined.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(left, right_joined), false, "left and rhs join trees should still begin on distinct stores")
+    assert_eq(prism_cursor_store_node_count(joined), 6, "cross-store adoption should dedup equivalent rhs filter branches instead of cloning both copies")
+    assert_eq(prism_cursor_authored_node_count(joined), 6, "joined cursor should reach the left branch plus one deduped rhs branch-join subgraph")
+    assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "cross-store join with rhs dedup should still lower to JoinRel")
+    assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "deduped cross-store join lowering should still preserve nested filters")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "deduped cross-store join lowering should still preserve read roots")
+
+
+def test_prism__cross_store_join_dedups_equivalent_rhs_multistep_branches() -> None:  # Like the single-step dedup case, but each rhs branch is filter(false).order_by(); expects 7 nodes
+    left: PrismCursor[Order] = prism_cursor_named_table(Order(id=40), str("orders")).filter(false)
+    right_base: PrismCursor[Order] = prism_cursor_named_table(Order(id=41), str("orders_archive"))
+    right_left: PrismCursor[Order] = right_base.filter(false).order_by()
+    right_right: PrismCursor[Order] = right_base.filter(false).order_by()
+    right_joined: PrismCursor[Order] = right_left.join(right_right.clone(), true)
+    joined: PrismCursor[Order] = left.join(right_joined.clone(), true)
+    plan = joined.to_substrait_plan()
+    assert_eq(prism_cursors_share_store(left, right_joined), false, "final join should still cross stores for independently rooted branches")
+    assert_eq(prism_cursor_store_node_count(joined), 7, "cross-store adoption should dedup structurally equivalent multi-step rhs branches")
+    assert_eq(prism_cursor_authored_node_count(joined), 7, "joined cursor should still reach exactly one adopted copy of each rhs branch shape")
+    assert_eq(relation_kind_name(root_rel(plan)), str("JoinRel"), "deduped cross-store multistep branches should still lower to JoinRel")
+    assert_eq(plan_contains_relation_kind(plan, str("SortRel")), true, "rhs multistep branch sort should remain present after adoption")
+    assert_eq(plan_contains_relation_kind(plan, str("FilterRel")), true, "rhs multistep branch filter should remain present after adoption")
+    assert_eq(plan_contains_relation_kind(plan, str("ReadRel")), true, "deduped multistep adoption should still preserve read roots")
+
+
+def test_prism__cursor_native_nodes_cover_current_method_surface() -> None:  # Calls select, group_by, agg, order_by, limit and explode, then checks the tip kind name each one leaves
+    projected: PrismCursor[Order] = prism_cursor_named_table(Order(id=5), str("orders")).select()
+    grouped: PrismCursor[Order] = prism_cursor_named_table(Order(id=6), str("orders")).group_by()
+    aggregated: PrismCursor[Order] = grouped.agg()  # the only cursor here built from another cursor rather than a fresh table
+    ordered: PrismCursor[Order] = prism_cursor_named_table(Order(id=7), str("orders")).order_by()
+    limited: PrismCursor[Order] = prism_cursor_named_table(Order(id=8), str("orders")).limit(10)
+    exploded: PrismCursor[Order] = prism_cursor_named_table(Order(id=9), str("orders")).explode()
+    assert_eq(prism_cursor_tip_kind_name(projected), str("Project"), "select should append a native project node")
+    assert_eq(prism_cursor_tip_kind_name(grouped), str("GroupBy"), "group_by should append a native group node")
+    assert_eq(prism_cursor_tip_kind_name(aggregated), str("Aggregate"), "agg should append a native aggregate node")
+    assert_eq(prism_cursor_tip_kind_name(ordered), str("OrderBy"), "order_by should append a native sort node")
+    assert_eq(prism_cursor_tip_kind_name(limited), str("Limit"), "limit should append a native limit node")
+    assert_eq(prism_cursor_tip_kind_name(exploded), str("Explode"), "explode should append a native explode node")
+
+
+def test_prism__rewrite_eliminates_filter_true_by_default() -> None:  # filter(true): expects 2 authored nodes, 1 rewritten node, and a ReadRel plan root
+    cursor: PrismCursor[Order] = prism_cursor_named_table(Order(id=50), str("orders")).filter(true)
+    plan = cursor.to_substrait_plan()
+    assert_eq(prism_cursor_authored_node_count(cursor), 2, "authored history should still keep the explicit filter node")
+    assert_eq(prism_cursor_rewritten_node_count(cursor), 1, "rewrite should eliminate filter(true) from lowered shape")
+    assert_eq(prism_cursor_rewritten_tip_kind_name(cursor), str("ReadNamedTable"), "rewritten tip should become the read root after eliminating filter(true)")
+    assert_eq(prism_cursor_rewrite_applied_rule_count(cursor), 1, "one rewrite rule should be recorded for filter(true)")
+    assert_eq(prism_cursor_rewrite_applied_rule_name_at(cursor, 0), str("eliminate_filter_true"), "rewrite trace should include the filter elimination rule")
+    assert_eq(relation_kind_name(root_rel(plan)), str("ReadRel"), "filter(true) should lower to ReadRel root after canonical rewrite")
+
+
+def test_prism__rewrite_collapses_adjacent_limits_projects_and_order_by() -> None:  # Doubled limit/select/order_by: expects 3 authored nodes, 2 rewritten nodes, and the matching rule at trace index 0
+    limited: PrismCursor[Order] = prism_cursor_named_table(Order(id=51), str("orders")).limit(10).limit(3)
+    projected: PrismCursor[Order] = prism_cursor_named_table(Order(id=52), str("orders")).select().select()
+    ordered: PrismCursor[Order] = prism_cursor_named_table(Order(id=53), str("orders")).order_by().order_by()
+    assert_eq(prism_cursor_authored_node_count(limited), 3, "authored history should keep both limit nodes")
+    assert_eq(prism_cursor_rewritten_node_count(limited), 2, "rewritten view should collapse adjacent limits to one node")
+    assert_eq(prism_cursor_rewrite_applied_rule_name_at(limited, 0), str("collapse_adjacent_limit"), "limit rewrite trace should record adjacent limit collapse")
+    assert_eq(prism_cursor_authored_node_count(projected), 3, "authored history should keep both project nodes")
+    assert_eq(prism_cursor_rewritten_node_count(projected), 2, "rewritten view should collapse adjacent projects")
+    assert_eq(prism_cursor_rewrite_applied_rule_name_at(projected, 0), str("collapse_adjacent_project"), "project rewrite trace should record adjacent project collapse")
+    assert_eq(prism_cursor_authored_node_count(ordered), 3, "authored history should keep both order_by nodes")
+    assert_eq(prism_cursor_rewritten_node_count(ordered), 2, "rewritten view should collapse adjacent order_by nodes")
+    assert_eq(prism_cursor_rewrite_applied_rule_name_at(ordered, 0), str("collapse_adjacent_order_by"), "order_by rewrite trace should record adjacent order_by collapse")
+
+
+def test_prism__rewrite_explain_origin_mapping_matches_rewritten_nodes() -> None:
+    cursor: PrismCursor[Order] = prism_cursor_named_table(Order(id=54), str("orders")).filter(true).limit(5).limit(4)
+    assert_eq(prism_cursor_rewritten_node_count(cursor), prism_cursor_rewritten_origin_count(cursor), "rewritten origin mapping cardinality should match rewritten node cardinality")