diff --git a/mkdocs.yaml b/mkdocs.yaml
index 1109b4a2..7e432b0d 100644
--- a/mkdocs.yaml
+++ b/mkdocs.yaml
@@ -24,6 +24,7 @@ nav:
     - Storage:
       - Type System: explanation/type-system.md
       - Custom Codecs: explanation/custom-codecs.md
+      - Spark Adapters: explanation/spark-adapters.md
     - Operations:
       - PostgreSQL CDC and Replica Identity: explanation/postgresql-cdc-replication.md
   - Tutorials:
@@ -115,6 +116,7 @@ nav:
         - Types: reference/specs/type-system.md
         - Codec API: reference/specs/codec-api.md
         - NPY Codec: reference/specs/npy-codec.md
+        - SparkAdapter Protocol: reference/specs/spark-adapter.md
         - Storage Adapter API: reference/specs/storage-adapter-api.md
       - Data Operations:
         - Data Manipulation: reference/specs/data-manipulation.md
diff --git a/src/explanation/spark-adapters.md b/src/explanation/spark-adapters.md
new file mode 100644
index 00000000..2bc76c1c
--- /dev/null
+++ b/src/explanation/spark-adapters.md
@@ -0,0 +1,89 @@
+# Spark Adapters and Silver-Layer Publishing
+
+This page explains how DataJoint codecs interact with downstream typed-query systems — specifically, how ``-style opaque columns differ from codecs that adapt to Spark, and why pipeline authors who want their data accessible to Spark SQL, BI tools, or Delta Sharing should choose typed codecs over generic blobs.
+
+For the normative protocol contract, see [SparkAdapter Protocol Specification](../reference/specs/spark-adapter.md). For the base `Codec` interface, see [Codec API](../reference/specs/codec-api.md).
+
+!!! version-added "New in DataJoint 2.3"
+    The `SparkAdapter` Protocol ships in 2.3. SparkAdapter codec implementations are out-of-tree plugins that opt in by implementing the `to_spark()` method.
+
+## The Bronze/Silver layer model
+
+DataJoint pipelines are operational stores. When the same data is published to a downstream analytic system, two layers commonly appear:
+
+| Layer | What it is | How blobs surface |
+|---|---|---|
+| **Bronze** | A mirror of the operational tables, captured via change-data-capture (CDC). Schema follows the source; rows match the source 1:1. | Blob columns land as opaque `BINARY` — the bytes are preserved but the analytic system can't see inside them. |
+| **Silver** | Curated, consumer-facing tables. Schemas optimized for analytic queries. Used by Spark SQL, BI dashboards, Delta Sharing recipients, ML feature stores. | Blob columns must be **rendered** to typed shapes — `ARRAY`, `STRUCT<...>`, `MAP` — so consumers can query them natively. No opaque `BINARY`. |
+
+The bronze layer needs nothing from DataJoint beyond what CDC already provides — Lakehouse Sync (or Debezium, or any other CDC tool) mirrors the operational tables, blobs included. The silver layer needs more: the framework has to expose **structure** for the columns whose values aren't already primitives.
+
+## Why `` is bronze-only
+
+`` and `` are **general-purpose**: they accept any picklable / mYm-serializable Python value. A numpy array, a list of dicts, a custom object — all become an opaque byte sequence. That generality is the right call for most pipeline work; it lets `make()` write whatever the computation produces without each codec having to anticipate the shape.
+
+But generality is the problem at the silver-layer boundary:
+
+- A query engine can't know whether a 4 KB blob is a 1D float array, a 2D image, a serialized dict, or a Pickle stream.
+- A schema inferencer can't generate a Spark `StructType` for "could be anything."
+- A SQL filter (`WHERE blob_col[0] > 0.5`) can't be evaluated.
+
+So `` rows make it to bronze (the bytes are there) but not to silver (the analytic system can't unpack them). This is the right tradeoff: `` keeps pipelines flexible; the silver-layer cost is paid by users who want analytic access, not by every user.
+
+## Typed codecs that adapt to Spark
+
+The path to silver eligibility is to **choose a typed codec for the column** in the schema definition. A typed codec knows its value shape and can map decoded values to a Spark-native return — primitives, lists, dicts of the same.
+
+```python
+@schema
+class Recording(dj.Imported):
+    definition = """
+    -> Session
+    ---
+    signal : # 1D float array — adapts to Spark ARRAY
+    metadata : # structured record — adapts to Spark STRUCT<...>
+    raw_dump : # arbitrary Python value — bronze-only
+    """
+```
+
+The codec implementations (``, ``, etc.) live in plugin packages, not in `datajoint-python` itself. Each plugin is independently versioned and can target a specific domain — neuroscience array shapes, imaging types, time-series layouts, etc.
+
+## Why a Protocol, not a Codec method
+
+The natural-seeming design — "add a `to_spark()` method to the `Codec` base class" — was the first proposal ([#1457](https://github.com/datajoint/datajoint-python/issues/1457)). It was rejected for three reasons.
+
+**Generic codecs can't render meaningfully.** Forcing `` to implement `to_spark` would mean enumerating the type taxonomy of mYm-tagged binary content inside the framework — exactly the structural knowledge that `` exists to avoid. The clean answer is: `` does not implement the protocol. Pipeline authors who want silver migrate to a typed codec.
+
+**Codec authors shouldn't have to acknowledge a feature they don't support.** An abstract method requires `NotImplementedError` stubs everywhere; that's noise. The Protocol pattern lets typed codecs add the method only when they're ready, leaving non-typed codecs untouched.
+
+**Plugin codecs already in the wild stay valid.** Existing third-party codecs (`dj-zarr-codecs`, `dj-photon-codecs`, lab-specific packages) work unchanged through 2.3 and beyond. They opt into silver eligibility by adding a `to_spark()` method in a future release of their choosing.
+
+The decisive factor is the **opt-in shape** of the contract. Protocol checking via `isinstance(codec, SparkAdapter)` is structural — codec authors implement when they're ready, consumers detect support per-column at runtime. No coordination between codec releases and framework releases.
+
+## What the protocol does not specify
+
+This is a small contract, deliberately. It says **how** a codec exposes Spark-native rendering; it does not say:
+
+- **What Spark-adapter codecs exist.** Specific codec types ship downstream. The framework only provides the protocol.
+- **How the consumer publishes to silver.** Delta table writes, branch-namespace management, Lakehouse Sync hooks — all consumer concerns. The framework provides the eligibility check; the pipeline is built on top.
+- **How to round-trip back.** Spark consumers query rendered columns directly; reverse-direction decode is not a framework concern.
+- **What to do with columns that don't adapt.** Consumers decide: skip, raise, fall back to bronze. The framework doesn't impose a policy.
+
+This is the typical small-Protocol pattern in Python. `__iter__`, `__len__`, `__hash__` — each is one method that opts a type into a category of consumers. `SparkAdapter` follows the same shape.
+
+## Choosing codecs for a new pipeline
+
+Two questions, in order:
+
+1. **Do you need analytic access?** Will downstream consumers (Spark SQL, BI tools, Delta Sharing) query this column? If yes, choose a typed codec that implements `SparkAdapter`. If no, `` is fine and simpler.
+2. **Is there a published codec for your value shape?** Check the [available codec plugins list](../how-to/use-plugin-codecs.md). If yes, use it. If no, implement one — the [Create Custom Codecs](../how-to/create-custom-codec.md) how-to covers the framework side; add a `to_spark()` method to make it silver-eligible.
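
Question 1 can also be checked mechanically once codecs are chosen. The following is a minimal sketch, not part of the patch: it redeclares `SparkAdapter` locally so it runs without DataJoint installed, and `FloatArrayCodec`, `GenericBlobCodec`, and `classify_columns` are hypothetical names used only for illustration. The three outcomes follow the branches of the consumer pattern in the specification (no codec, adapting codec, non-adapting codec).

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class SparkAdapter(Protocol):
    # Local stand-in mirroring the protocol in the spec (assumption: a real
    # pipeline would import it from DataJoint instead of redeclaring it).
    def to_spark(self, decoded: Any, *, key: Optional[dict] = None) -> Any: ...


class FloatArrayCodec:
    """Hypothetical typed codec: opts in by defining to_spark()."""

    def to_spark(self, decoded, *, key=None):
        return [float(x) for x in decoded]


class GenericBlobCodec:
    """Hypothetical generic codec: defines no to_spark(), so it is bronze-only."""


def classify_columns(codecs: dict) -> dict:
    """Label each column as 'primitive', 'silver-eligible', or 'bronze-only'."""
    labels = {}
    for name, codec in codecs.items():
        if codec is None:
            labels[name] = "primitive"        # no codec: value is already a primitive
        elif isinstance(codec, SparkAdapter):
            labels[name] = "silver-eligible"  # codec exposes to_spark()
        else:
            labels[name] = "bronze-only"      # codec present but does not adapt
    return labels


report = classify_columns({
    "session_id": None,
    "signal": FloatArrayCodec(),
    "raw_dump": GenericBlobCodec(),
})
```

Running a report like this over a schema before the first publish shows which columns would need a typed codec, which is cheaper than discovering it after data has accumulated.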
+
+For pipelines that are predominantly internal (no analytic publish), `` everywhere is the right default. For pipelines that publish to a shared lake, choose typed codecs from the start — retroactive migration is painful.
+
+## Related
+
+- Spec: [SparkAdapter Protocol](../reference/specs/spark-adapter.md) — the normative protocol.
+- Spec: [Codec API](../reference/specs/codec-api.md) — the base `Codec` interface.
+- How-to: [Use Plugin Codecs](../how-to/use-plugin-codecs.md) — installing and registering codec plugins.
+- How-to: [Create Custom Codecs](../how-to/create-custom-codec.md) — writing your own codec.
+- Strategic background: Databricks integration's Linked Delta Tables — covered in `datajoint-databricks/DECISIONS.md` (internal repo; summary on request).
diff --git a/src/reference/specs/spark-adapter.md b/src/reference/specs/spark-adapter.md
new file mode 100644
index 00000000..ef9f65fe
--- /dev/null
+++ b/src/reference/specs/spark-adapter.md
@@ -0,0 +1,192 @@
+# SparkAdapter Protocol Specification
+
+This document specifies the `SparkAdapter` Protocol — an opt-in contract that codecs can implement to declare that their decoded values can be expressed as Spark-native types (primitives, lists, dicts, and nested combinations).
+
+The Protocol is intentionally small: a single method, no inheritance, no abstract-method surface area on the existing `Codec` base class. Consumers detect support via `isinstance(codec, SparkAdapter)`.
+
+!!! version-added "New in DataJoint 2.3"
+    Introduced for the Databricks Linked Delta Tables ("silver layer") integration. Independent of any consumer: any codec author can implement `to_spark()` and any downstream tool can check for it.
+
+For the `Codec` base class itself, see [Codec API Specification](codec-api.md). For background on why the protocol is needed, see [Spark Adapters and Silver-Layer Publishing](../../explanation/spark-adapters.md).
+
+## Why this exists
+
+DataJoint's `` codec stores arbitrary Python values — NumPy arrays, lists, dicts, custom objects — as serialized binary. That generality is the right call for most use cases, but it makes the column **opaque** to consumers that need typed access at query time: SQL engines, dataframe libraries, BI tools, federated query systems. Such consumers can only treat blob columns as `BINARY` and rely on the application to decode.
+
+The downstream concrete case is **Databricks Linked Delta Tables** (the "silver layer"): every column must render to Spark-native types so the data is queryable with Spark SQL, exposed through Delta Sharing, and usable by Genie / BI without round-tripping through DataJoint. A `BINARY` blob doesn't qualify.
+
+`SparkAdapter` is the minimum framework-side contract to make typed rendering possible **without** forcing every codec to support it. Generic codecs (``, ``) remain unsupported by design. Typed codecs (``, ``, future shapes) opt in by implementing the method.
+
+## The protocol
+
+```python
+from typing import Any, Protocol, runtime_checkable
+
+@runtime_checkable
+class SparkAdapter(Protocol):
+    """
+    A codec that adapts its decoded values to Spark-native types.
+
+    Opt-in. Codecs implementing this method declare that their decoded
+    values can be expressed as primitives, lists, or dicts of the same —
+    i.e., shapes that map cleanly to Spark's StructType / ArrayType / MapType.
+    """
+
+    def to_spark(self, decoded: Any, *, key: dict | None = None) -> Any: ...
+```
+
+Defined in `datajoint.spark` and re-exported at the top level as `dj.SparkAdapter`.
+
+### Signature
+
+| Parameter | Type | Description |
+|---|---|---|
+| `decoded` | `Any` | The Python value produced by the codec's `decode()`. The protocol does not constrain this beyond requiring that the codec can map it to a Spark-native shape. |
+| `key` | `dict \| None` | Optional context dict — same shape as `Codec.encode`'s `key` parameter. Codecs may use it to resolve per-row context (rare; most codecs ignore it). |
+
+### Return value
+
+A value composed entirely of:
+
+- **Primitives**: `bool`, `int`, `float`, `str`, `bytes`, `None`, `datetime.date`, `datetime.datetime`.
+- **Lists**: `list[T]` where `T` is any allowed shape. Maps to Spark `ArrayType`.
+- **Dicts with string keys**: `dict[str, T]` where `T` is any allowed shape. Maps to Spark `StructType` (every row has the same set of keys) or `MapType` (keys vary by row but values share one type); the consumer decides based on schema inference.
+
+NumPy scalars (`np.int32`, `np.float64`, etc.) are accepted but consumers may coerce to native Python types. NumPy arrays must be converted to lists before return — Spark has no representation for `np.ndarray`.
+
+No tuples, no sets, no custom objects, no callables. The output must be JSON-shaped (plus the binary/date/datetime primitives listed above).
+
+### Why a Protocol, not an abstract method on `Codec`
+
+The earlier framing of this work (#1457, superseded) proposed adding `to_spark()` as an abstract method on `dj.Codec`. The current factoring uses a separate Protocol for four reasons:
+
+1. **Smaller OSS surface.** An abstract method would require `NotImplementedError` stubs on every codec that can't render: the built-in `` and ``, and retroactively every plugin codec as well. The Protocol approach adds ~10 lines total.
+2. **Cleaner opt-in semantics.** Codec authors implement the method when they want silver-layer eligibility; they don't have to acknowledge it otherwise. Codecs that don't adapt to Spark are invisible to the contract.
+3. **No churn for existing plugins.** Third-party codecs (e.g. `dj-zarr-codecs`, `dj-photon-codecs`) work unchanged. They opt in by adding the method in a future release of their choosing.
+4. **Composable with structural typing.** Consumers use `isinstance(codec, SparkAdapter)` (enabled by `@runtime_checkable`) — no inheritance chain or registration step required.
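
Reasons 2 through 4 can be seen in a few lines of plain Python. This is a minimal sketch under stated assumptions: the protocol is redeclared locally (mirroring the declaration in this spec) so the snippet runs without DataJoint installed, and `ArrayCodec` and `BlobLikeCodec` are hypothetical stand-ins rather than real plugins.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class SparkAdapter(Protocol):
    # Local copy of the protocol for illustration only.
    def to_spark(self, decoded: Any, *, key: Optional[dict] = None) -> Any: ...


class ArrayCodec:
    """Hypothetical typed codec. Note: it does not inherit from SparkAdapter."""

    def to_spark(self, decoded, *, key=None):
        return list(decoded)


class BlobLikeCodec:
    """Hypothetical generic codec. It needs no stub to stay out of the contract."""

    def decode(self, stored, *, key=None):
        return stored


# The check is purely structural: presence of to_spark() is what counts.
typed_ok = isinstance(ArrayCodec(), SparkAdapter)
opaque_ok = isinstance(BlobLikeCodec(), SparkAdapter)
rendered = ArrayCodec().to_spark((1.0, 2.0))
```

Here `typed_ok` is `True` and `opaque_ok` is `False`, even though neither class references the protocol. An existing plugin therefore becomes eligible simply by gaining the method, with no change to its base classes or registration.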
+
+The Protocol pattern matches the structural checks behind Python's `Iterable` and `Sized`, and the `typing.Protocol` design of PEP 544. DataJoint follows the language convention rather than inventing a new mechanism.
+
+## Eligibility detection
+
+Consumers determine whether a column adapts to Spark by checking the codec:
+
+```python
+import datajoint as dj
+from datajoint.spark import SparkAdapter
+
+attr = table.heading["column_name"]
+if attr.codec is not None and isinstance(attr.codec, SparkAdapter):
+    # The codec opts in to Spark rendering
+    rendered = attr.codec.to_spark(decoded_value)
+else:
+    # No Spark adapter — consumer falls back to bronze-only handling
+    ...
+```
+
+Because `@runtime_checkable` only verifies that the method **exists** (not its signature), the check is a structural test, not a behavioral guarantee. Codec authors must produce a Spark-native return value as defined above — the framework cannot enforce this statically.
+
+## What's not in this specification
+
+- **Specific Spark-adapter codecs.** Codecs like ``, ``, ``, ``, ``, ``, `` are intentionally **out of scope** of `datajoint-python`. They ship as plugins (registered via the existing codec auto-registration mechanism) so each can evolve independently of the framework. The Protocol is what they implement against.
+- **`` and `` rendering.** These codecs hold arbitrary Python values; their content can't be assumed to have a Spark-native shape. They do **not** implement `SparkAdapter`. Pipeline authors who want silver eligibility migrate columns to a typed codec.
+- **Reverse direction (Spark → DataJoint).** Delta consumers query rendered columns via Spark SQL; round-tripping back through DataJoint's codec is not a target of this work. There is no `from_spark` method.
+- **Best-effort `BINARY` fallback.** Codecs either implement `SparkAdapter` (and produce a Spark-native value) or they don't (and remain bronze-only / non-eligible). No automatic blob → bytes-passthrough rendering.
+- **Schema inference.** Consumers infer Spark schemas from sample rendered values; the protocol does not transmit type metadata. (A `Codec.spark_schema()` companion method may be added in a future release; not in scope here.)
+- **Streaming / chunked rendering.** `to_spark()` is invoked per decoded value (per row, per column). Chunked / vectorized rendering is a downstream concern.
+
+## Example: implementing a SparkAdapter codec
+
+A plugin codec for 1D float arrays. Shipped in a separate package (e.g. `dj-array-codecs`), registered via the codec entry-point mechanism.
+
+```python
+import datajoint as dj
+import numpy as np
+
+class FloatArrayCodec(dj.Codec):
+    """1D array of float64. Adapts to Spark ARRAY."""
+
+    name = "float_array"
+
+    def get_dtype(self, is_store: bool) -> str:
+        return "longblob" if not is_store else ""
+
+    def encode(self, value, *, key=None, store_name=None) -> bytes:
+        return np.asarray(value, dtype=np.float64).tobytes()
+
+    def decode(self, stored: bytes, *, key=None) -> np.ndarray:
+        return np.frombuffer(stored, dtype=np.float64)
+
+    def to_spark(self, decoded: np.ndarray, *, key=None) -> list[float]:
+        return decoded.tolist()
+```
+
+`isinstance(FloatArrayCodec(), dj.SparkAdapter)` returns `True` because the method is present. No subclassing required.
+
+A 2D image codec returning a nested list (Spark `ARRAY>`):
+
+```python
+class Image2DCodec(dj.Codec):
+    name = "image_2d"
+
+    def encode(self, value, *, key=None, store_name=None) -> bytes: ...
+    def decode(self, stored, *, key=None) -> np.ndarray: ...
+
+    def to_spark(self, decoded: np.ndarray, *, key=None) -> list[list[float]]:
+        return decoded.tolist()  # 2D ndarray → nested list
+```
+
+A structured codec rendering to Spark `STRUCT`:
+
+```python
+class PointWithLabelCodec(dj.Codec):
+    name = "labeled_point"
+
+    def encode(self, value, *, key=None, store_name=None) -> bytes: ...
+    def decode(self, stored, *, key=None) -> dict: ...
+
+    def to_spark(self, decoded: dict, *, key=None) -> dict[str, Any]:
+        return {
+            "x": float(decoded["x"]),
+            "y": float(decoded["y"]),
+            "label": str(decoded["label"]),
+        }
+```
+
+## Consumer pattern
+
+A simplified silver-layer publish loop (the actual `datajoint-databricks` consumer is more elaborate):
+
+```python
+def publish_row_to_silver(table, key, target_table):
+    """Publish one row of `table` (restricted by `key`) to a Spark-renderable target."""
+    from datajoint.spark import SparkAdapter
+
+    row = (table & key).fetch1()
+    rendered = {}
+    for attr_name, value in row.items():
+        attr = table.heading[attr_name]
+        if attr.codec is not None and isinstance(attr.codec, SparkAdapter):
+            rendered[attr_name] = attr.codec.to_spark(value, key=key)
+        elif attr.codec is None:
+            # Primitive column (no codec) — pass through
+            rendered[attr_name] = value
+        else:
+            # Codec doesn't adapt to Spark; this sketch raises, a consumer could skip the column instead
+            raise ValueError(
+                f"Column {attr_name!r} uses codec {attr.codec.name!r} which "
+                f"does not implement SparkAdapter; this row is not eligible "
+                f"for silver-layer publish."
+            )
+    target_table.write(rendered)
+```
+
+The framework provides the protocol and the eligibility check. The publish pipeline lives downstream.
+
+## References
+
+- Source: `src/datajoint/spark.py` (new file in 2.3) — Protocol declaration; re-exported as `dj.SparkAdapter`.
+- Issue: [datajoint-python #1458](https://github.com/datajoint/datajoint-python/issues/1458).
+- Superseded: [datajoint-python #1457](https://github.com/datajoint/datajoint-python/issues/1457) — earlier framing (abstract method on `Codec`).
+- [Codec API Specification](codec-api.md) — the base `Codec` interface that SparkAdapter codecs subclass; the protocol itself is satisfied structurally, not by inheritance.
+- [Spark Adapters and Silver-Layer Publishing](../../explanation/spark-adapters.md) — explainer page covering the Bronze/Silver layer model and the rationale for typed codecs.