diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py
new file mode 100644
index 000000000..484f02a4c
--- /dev/null
+++ b/datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py
@@ -0,0 +1,144 @@
+"""Associate availability and materialization
+
+Revision ID: b8ef80efd70c
+Revises: c3d5f327296c
+Create Date: 2025-02-24 05:49:06.588675+00:00
+
+"""
+
+# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module
+import json
+import sqlalchemy as sa
+from sqlalchemy.sql import table, column
+from alembic import op
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "b8ef80efd70c"
+down_revision = "c3d5f327296c"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """
+    Add `custom_metadata` and `materialization_id` to `availabilitystate`, copy any
+    existing `url` and `links` values into `custom_metadata`, then drop both columns.
+    """
+    with op.batch_alter_table("availabilitystate", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "custom_metadata",
+                postgresql.JSONB(astext_type=sa.Text()),
+                nullable=True,
+            ),
+        )
+        batch_op.add_column(
+            sa.Column("materialization_id", sa.BigInteger(), nullable=True),
+        )
+        batch_op.create_foreign_key(
+            "fk_availability_materialization_id_materialization",
+            "materialization",
+            ["materialization_id"],
+            ["id"],
+        )
+
+    availabilitystate = table(
+        "availabilitystate",
+        column("id", sa.BigInteger()),
+        column("url", sa.String()),
+        column("links", postgresql.JSON),
+        column("custom_metadata", postgresql.JSONB),
+    )
+
+    # Move data from url and links to custom_metadata
+    connection = op.get_bind()
+    results = connection.execute(
+        sa.select(
+            availabilitystate.c.id,
+            availabilitystate.c.url,
+            availabilitystate.c.links,
+        ),
+    ).fetchall()
+
+    for row in results:
+        metadata = {}
+        if row.url:
+            metadata["url"] = row.url
+        if row.links:
+            metadata["links"] = row.links
+
+        if metadata:
+            connection.execute(
+                sa.update(availabilitystate)
+                .where(availabilitystate.c.id == row.id)
+                .values(custom_metadata=metadata),
+            )
+
+    with op.batch_alter_table("availabilitystate", schema=None) as batch_op:
+        batch_op.drop_column("url")
+        batch_op.drop_column("links")
+
+
+def downgrade():
+    """
+    Re-create `url` and `links`, restore them from `custom_metadata`, then drop
+    `materialization_id` and `custom_metadata`. Keys in `custom_metadata` other
+    than `url` and `links` are not carried over.
+    """
+    with op.batch_alter_table("availabilitystate", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "links",
+                postgresql.JSON(astext_type=sa.Text()),
+                autoincrement=False,
+                nullable=True,
+            ),
+        )
+        batch_op.add_column(
+            sa.Column("url", sa.VARCHAR(), autoincrement=False, nullable=True),
+        )
+        batch_op.drop_constraint(
+            "fk_availability_materialization_id_materialization",
+            type_="foreignkey",
+        )
+
+    # Restore `url` and `links` from `custom_metadata`
+    availabilitystate = table(
+        "availabilitystate",
+        column("id", sa.BigInteger()),
+        column("custom_metadata", postgresql.JSONB),
+        column("url", sa.String()),
+        column("links", postgresql.JSON),
+    )
+
+    conn = op.get_bind()
+    results = conn.execute(
+        sa.select(availabilitystate.c.id, availabilitystate.c.custom_metadata),
+    ).fetchall()
+
+    for row in results:
+        # The JSONB column type already yields deserialized Python objects, so only
+        # decode when the driver hands back raw JSON text.
+        metadata = row.custom_metadata
+        if isinstance(metadata, (str, bytes, bytearray)):
+            metadata = json.loads(metadata)
+        if not isinstance(metadata, dict):
+            continue
+
+        # Mirror upgrade(): only write the keys that are actually present
+        restored = {
+            key: metadata[key]
+            for key in ("url", "links")
+            if metadata.get(key) is not None
+        }
+        if restored:
+            conn.execute(
+                sa.update(availabilitystate)
+                .where(availabilitystate.c.id == row.id)
+                .values(**restored),
+            )
+
+    with op.batch_alter_table("availabilitystate", schema=None) as batch_op:
+        batch_op.drop_column("materialization_id")
+        batch_op.drop_column("custom_metadata")
diff --git a/datajunction-server/datajunction_server/api/data.py b/datajunction-server/datajunction_server/api/data.py index a769d9c46..39ebe4d9e 100644 --- a/datajunction-server/datajunction_server/api/data.py +++ b/datajunction-server/datajunction_server/api/data.py @@ -49,7 +49,7 @@ router = SecureAPIRouter(tags=["data"]) -@router.post("/data/{node_name}/availability/", name="Add Availability 
State to Node") +@router.post("/data/{node_name}/availability", name="Add Availability State to Node") async def add_availability_state( node_name: str, data: AvailabilityStateBase, @@ -129,7 +129,7 @@ async def add_availability_state( schema_=data.schema_, table=data.table, valid_through_ts=data.valid_through_ts, - url=data.url, + custom_metadata=data.custom_metadata, min_temporal_partition=data.min_temporal_partition, max_temporal_partition=data.max_temporal_partition, partitions=[ @@ -138,7 +138,7 @@ async def add_availability_state( ], categorical_partitions=data.categorical_partitions, temporal_partitions=data.temporal_partitions, - links=data.links, + materialization_id=data.materialization_id, ) if node_revision.availability and not node_revision.availability.partitions: node_revision.availability.partitions = [] diff --git a/datajunction-server/datajunction_server/database/availabilitystate.py b/datajunction-server/datajunction_server/database/availabilitystate.py index 5da82659c..c38e5a044 100644 --- a/datajunction-server/datajunction_server/database/availabilitystate.py +++ b/datajunction-server/datajunction_server/database/availabilitystate.py @@ -2,16 +2,20 @@ from datetime import datetime, timezone from functools import partial -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy import JSON, DateTime, ForeignKey -from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.orm import Mapped, mapped_column, relationship from datajunction_server.database.base import Base from datajunction_server.models.node import BuildCriteria, PartitionAvailability from datajunction_server.typing import UTCDatetime +if TYPE_CHECKING: + from datajunction_server.database.materialization import Materialization + class AvailabilityState(Base): """ @@ -25,12 +29,31 @@ class AvailabilityState(Base): primary_key=True, ) + # 
Identifying where the dataset lives catalog: Mapped[str] schema_: Mapped[Optional[str]] = mapped_column(nullable=True) table: Mapped[str] + + # Indicates data freshness valid_through_ts: Mapped[int] = mapped_column(sa.BigInteger()) - url: Mapped[Optional[str]] - links: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, default=dict) + + # Arbitrary JSON metadata. This can encompass any URLs associated with the materialized dataset + custom_metadata: Mapped[Optional[Dict]] = mapped_column( + JSONB, + default=dict, + ) + + # The materialization that this availability is associated with, if any + materialization_id: Mapped[Optional[int]] = mapped_column( + ForeignKey( + "materialization.id", + name="fk_availability_materialization_id_materialization", + ), + ) + materialization: Mapped[Optional["Materialization"]] = relationship( + back_populates="availability", + primaryjoin="Materialization.id==AvailabilityState.materialization_id", + ) # An ordered list of categorical partitions like ["country", "group_id"] # or ["region_id", "age_group"] diff --git a/datajunction-server/datajunction_server/database/materialization.py b/datajunction-server/datajunction_server/database/materialization.py index 930792843..87268200c 100644 --- a/datajunction-server/datajunction_server/database/materialization.py +++ b/datajunction-server/datajunction_server/database/materialization.py @@ -19,6 +19,7 @@ from datajunction_server.database.backfill import Backfill from datajunction_server.database.base import Base from datajunction_server.database.column import Column +from datajunction_server.database.availabilitystate import AvailabilityState from datajunction_server.models.materialization import ( DruidMeasuresCubeConfig, GenericMaterializationConfig, @@ -97,6 +98,13 @@ class Materialization(Base): lazy="selectin", ) + availability: Mapped[List[AvailabilityState]] = relationship( + back_populates="materialization", + 
primaryjoin="Materialization.id==AvailabilityState.materialization_id", + cascade="all, delete", + lazy="selectin", + ) + @classmethod async def get_by_names( cls, diff --git a/datajunction-server/datajunction_server/internal/nodes.py b/datajunction-server/datajunction_server/internal/nodes.py index 9cf252834..ccd43c87f 100644 --- a/datajunction-server/datajunction_server/internal/nodes.py +++ b/datajunction-server/datajunction_server/internal/nodes.py @@ -1570,6 +1570,7 @@ async def upsert_complex_dimension_link( node = await Node.get_by_name( session, node_name, + raise_if_not_exists=True, ) if node.type not in (NodeType.SOURCE, NodeType.DIMENSION, NodeType.TRANSFORM): # type: ignore raise DJInvalidInputException( diff --git a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py index 58affe56a..a2fcac6f7 100644 --- a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py +++ b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py @@ -149,6 +149,7 @@ def schedule( ) return query_service_client.materialize_cube( materialization_input=DruidCubeMaterializationInput( + id=materialization.id, name=materialization.name, cube=cube_config.cube, dimensions=cube_config.dimensions, diff --git a/datajunction-server/datajunction_server/models/cube_materialization.py b/datajunction-server/datajunction_server/models/cube_materialization.py index d945b03a7..3d6d892cf 100644 --- a/datajunction-server/datajunction_server/models/cube_materialization.py +++ b/datajunction-server/datajunction_server/models/cube_materialization.py @@ -398,6 +398,7 @@ class DruidCubeMaterializationInput(BaseModel): Materialization info as passed to the query service. 
""" + id: int | None = None name: str # Frozen cube info at the time of materialization diff --git a/datajunction-server/datajunction_server/models/materialization.py b/datajunction-server/datajunction_server/models/materialization.py index e50ba07e9..c7debbe0f 100644 --- a/datajunction-server/datajunction_server/models/materialization.py +++ b/datajunction-server/datajunction_server/models/materialization.py @@ -119,6 +119,7 @@ class MaterializationConfigOutput(BaseModel): Output for materialization config. """ + id: int name: Optional[str] config: Dict schedule: str diff --git a/datajunction-server/datajunction_server/models/node.py b/datajunction-server/datajunction_server/models/node.py index f03bc779b..90ac9db3a 100644 --- a/datajunction-server/datajunction_server/models/node.py +++ b/datajunction-server/datajunction_server/models/node.py @@ -265,8 +265,8 @@ class AvailabilityStateBase(TemporalPartitionRange): schema_: Optional[str] = Field(default=None) table: str valid_through_ts: int - url: Optional[str] - links: Optional[Dict[str, Any]] = Field(default={}) + materialization_id: Optional[int] = Field(default=None) + custom_metadata: Optional[Dict[str, Any]] = Field(default={}) # An ordered list of categorical partitions like ["country", "group_id"] # or ["region_id", "age_group"] diff --git a/datajunction-server/tests/api/data_test.py b/datajunction-server/tests/api/data_test.py index f83f80ef9..74bd6b0ae 100644 --- a/datajunction-server/tests/api/data_test.py +++ b/datajunction-server/tests/api/data_test.py @@ -665,7 +665,9 @@ async def test_setting_availability_state( "valid_through_ts": 20230125, "max_temporal_partition": ["2023", "01", "25"], "min_temporal_partition": ["2022", "01", "01"], - "url": "http://some.catalog.com/default.accounting.pmts", + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, }, ) data = response.json() @@ -700,8 +702,10 @@ async def test_setting_availability_state( "table": "pmts", 
"temporal_partitions": [], "valid_through_ts": 20230125, - "url": "http://some.catalog.com/default.accounting.pmts", - "links": {}, + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": None, }, "pre": {}, "user": "dj", @@ -725,8 +729,10 @@ async def test_setting_availability_state( "schema_": "accounting", "categorical_partitions": [], "temporal_partitions": [], - "url": "http://some.catalog.com/default.accounting.pmts", - "links": {}, + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": None, } @pytest.mark.asyncio @@ -839,8 +845,8 @@ async def test_setting_availability_state_multiple_times( "table": "new_payments_table", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": { "catalog": "default", @@ -852,8 +858,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "user": "dj", }, @@ -875,8 +881,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": { "catalog": "default", @@ -888,8 +894,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "user": "dj", }, @@ -911,8 +917,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": {}, "user": "dj", @@ -936,8 +942,8 @@ 
async def test_setting_availability_state_multiple_times( "schema_": "new_accounting", "categorical_partitions": [], "temporal_partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1076,8 +1082,8 @@ async def test_merging_in_a_higher_max_partition( "partitions": [], "categorical_partitions": [], "temporal_partitions": ["payment_id"], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.fixture @@ -1148,8 +1154,8 @@ async def test_set_temporal_only_availability( "schema_": "dimensions", "table": "local_hard_hats", "valid_through_ts": 20230101, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1195,8 +1201,8 @@ async def test_set_node_level_availability_wider_time_range( "schema_": "dimensions", "table": "local_hard_hats", "valid_through_ts": 20230101, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1549,8 +1555,8 @@ async def test_merging_in_a_lower_min_partition( "max_temporal_partition": ["2023", "01", "01"], "schema_": "accounting", "partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1622,8 +1628,8 @@ async def test_moving_back_valid_through_ts( "partitions": [], "categorical_partitions": [], "temporal_partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1668,8 +1674,8 @@ async def test_setting_availablity_state_on_a_source_node( "partitions": [], "categorical_partitions": [], "temporal_partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index 466991e6c..eb8226b4f 100644 --- 
a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -1071,7 +1071,9 @@ async def test_spark_sql_full( parse(load_expected_file("spark_sql.full.query.sql")), ) del data["materializations"][0]["config"]["query"] - assert data["materializations"] == load_expected_file("spark_sql.full.config.json") + expected_config = load_expected_file("spark_sql.full.config.json") + expected_config[0]["id"] = mock.ANY + assert data["materializations"] == expected_config # Set both temporal and categorical partitions on node response = await module__client_with_roads.post( @@ -1125,6 +1127,7 @@ async def test_spark_sql_full( materialization_with_partitions = data["materializations"][1] del materialization_with_partitions["config"]["query"] expected_config = load_expected_file("spark_sql.full.partition.config.json") + expected_config["id"] = mock.ANY assert materialization_with_partitions == expected_config # Check listing materializations of the node @@ -1133,14 +1136,17 @@ async def test_spark_sql_full( ) materializations = response.json() materializations[0]["config"]["query"] = mock.ANY - assert materializations[0] == load_expected_file( - "spark_sql.full.materializations.json", - ) + expected_config = load_expected_file("spark_sql.full.materializations.json") + expected_config["id"] = mock.ANY + assert materializations[0] == expected_config + materializations = response.json() materializations[1]["config"]["query"] = mock.ANY - assert materializations[1] == load_expected_file( + expected_config = load_expected_file( "spark_sql.full.partition.materializations.json", ) + expected_config["id"] = mock.ANY + assert materializations[1] == expected_config # Kick off backfill for this materialization response = await module__client_with_roads.post( @@ -1249,9 +1255,9 @@ async def test_spark_sql_incremental( data = response.json() assert data["version"] == "v1.0" del data["materializations"][0]["config"]["query"] - assert 
data["materializations"] == load_expected_file( - "spark_sql.incremental.config.json", - ) + expected_config = load_expected_file("spark_sql.incremental.config.json") + expected_config[0]["id"] = mock.ANY + assert data["materializations"] == expected_config # Kick off backfill for this materialization response = await module__client_with_roads.post( @@ -1465,6 +1471,7 @@ async def test_spark_with_availablity( ) assert response.status_code in (200, 201) assert str(parse(response.json()[0]["config"]["query"])) == str(parse(query_one)) + materialization_id = response.json()[0]["id"] # create a materialization on the 2nd node (w/o availability) response = await module__client_with_roads.post( @@ -1511,7 +1518,10 @@ async def test_spark_with_availablity( "valid_through_ts": 20230125, "max_temporal_partition": ["2023", "01", "25"], "min_temporal_partition": ["2022", "01", "01"], - "url": "http://some.catalog.com/default.accounting.pmts", + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": materialization_id, }, ) assert response.status_code in (200, 201) diff --git a/datajunction-server/tests/api/nodes_test.py b/datajunction-server/tests/api/nodes_test.py index db5619a7e..14c3ab7ab 100644 --- a/datajunction-server/tests/api/nodes_test.py +++ b/datajunction-server/tests/api/nodes_test.py @@ -3412,6 +3412,7 @@ async def test_update_node_query_with_materializations( "job": "SparkSqlMaterializationJob", "name": "spark_sql__full", "schedule": "0 * * * *", + "id": mock.ANY, }, ] diff --git a/datajunction-server/tests/models/node_test.py b/datajunction-server/tests/models/node_test.py index 1ab36079d..f7537c281 100644 --- a/datajunction-server/tests/models/node_test.py +++ b/datajunction-server/tests/models/node_test.py @@ -167,8 +167,8 @@ def test_merging_availability_simple_no_partitions() -> None: "categorical_partitions": [], "temporal_partitions": [], "partitions": [], - "url": None, - "links": {}, + 
"custom_metadata": {}, + "materialization_id": None, } @@ -208,8 +208,8 @@ def test_merging_availability_complex_no_partitions() -> None: "categorical_partitions": [], "temporal_partitions": [], "partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @@ -289,8 +289,8 @@ def test_merging_availability_complex_with_partitions() -> None: "max_temporal_partition": ["20230926"], }, ], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } diff --git a/datajunction-server/tests/service_clients_test.py b/datajunction-server/tests/service_clients_test.py index 5fb79b6f6..cc89204b1 100644 --- a/datajunction-server/tests/service_clients_test.py +++ b/datajunction-server/tests/service_clients_test.py @@ -779,6 +779,7 @@ def test_materialize_cube(self, mocker: MockerFixture) -> None: query_service_client = QueryServiceClient(uri=self.endpoint) materialization_input = DruidCubeMaterializationInput( + id=1, name="default", strategy=MaterializationStrategy.INCREMENTAL_TIME, schedule="@daily",