From 78bde667fe496eaaa538019d7c8b26fc41ef685f Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sat, 22 Feb 2025 18:52:12 -0800 Subject: [PATCH 1/5] Attach availability to materialization --- ...f60_add_materialization_to_availability.py | 43 +++++++++++++++++++ .../datajunction_server/api/data.py | 3 +- .../database/availabilitystate.py | 19 +++++++- .../database/materialization.py | 8 ++++ .../datajunction_server/internal/nodes.py | 1 + .../datajunction_server/models/node.py | 2 +- 6 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py new file mode 100644 index 000000000..5f347c402 --- /dev/null +++ b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py @@ -0,0 +1,43 @@ +"""Add materialization to availability + +Revision ID: d99d274bff60 +Revises: c3d5f327296c +Create Date: 2025-02-23 02:17:27.717193+00:00 + +""" +# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d99d274bff60" +down_revision = "c3d5f327296c" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "materialization_id", + sa.BigInteger().with_variant(sa.Integer(), "sqlite"), + nullable=True, + ), + ) + batch_op.create_foreign_key( + "fk_availability_materialization_id_materialization", + "materialization", + ["materialization_id"], + ["id"], + ) + + +def downgrade(): + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.drop_constraint( + "fk_availability_materialization_id_materialization", + type_="foreignkey", + ) + batch_op.drop_column("materialization_id") diff --git a/datajunction-server/datajunction_server/api/data.py b/datajunction-server/datajunction_server/api/data.py index a769d9c46..0e883521f 100644 --- a/datajunction-server/datajunction_server/api/data.py +++ b/datajunction-server/datajunction_server/api/data.py @@ -49,7 +49,7 @@ router = SecureAPIRouter(tags=["data"]) -@router.post("/data/{node_name}/availability/", name="Add Availability State to Node") +@router.post("/data/{node_name}/availability", name="Add Availability State to Node") async def add_availability_state( node_name: str, data: AvailabilityStateBase, @@ -139,6 +139,7 @@ async def add_availability_state( categorical_partitions=data.categorical_partitions, temporal_partitions=data.temporal_partitions, links=data.links, + materialization_id=data.materialization_id, ) if node_revision.availability and not node_revision.availability.partitions: node_revision.availability.partitions = [] diff --git a/datajunction-server/datajunction_server/database/availabilitystate.py b/datajunction-server/datajunction_server/database/availabilitystate.py index 5da82659c..b4667ee01 100644 --- a/datajunction-server/datajunction_server/database/availabilitystate.py +++ b/datajunction-server/datajunction_server/database/availabilitystate.py @@ -2,16 +2,19 @@ from datetime import datetime, 
timezone from functools import partial -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional import sqlalchemy as sa from sqlalchemy import JSON, DateTime, ForeignKey -from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.orm import Mapped, mapped_column, relationship from datajunction_server.database.base import Base from datajunction_server.models.node import BuildCriteria, PartitionAvailability from datajunction_server.typing import UTCDatetime +if TYPE_CHECKING: + from datajunction_server.database.materialization import Materialization + class AvailabilityState(Base): """ @@ -32,6 +35,18 @@ class AvailabilityState(Base): url: Mapped[Optional[str]] links: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, default=dict) + # The materialization that this availability is associated with, if any + materialization_id: Mapped[Optional[int]] = mapped_column( + ForeignKey( + "materialization.id", + name="fk_availability_materialization_id_materialization", + ), + ) + materialization: Mapped[Optional["Materialization"]] = relationship( + back_populates="availability", + primaryjoin="Materialization.id==AvailabilityState.materialization_id", + ) + # An ordered list of categorical partitions like ["country", "group_id"] # or ["region_id", "age_group"] categorical_partitions: Mapped[Optional[List[str]]] = mapped_column( diff --git a/datajunction-server/datajunction_server/database/materialization.py b/datajunction-server/datajunction_server/database/materialization.py index 930792843..87268200c 100644 --- a/datajunction-server/datajunction_server/database/materialization.py +++ b/datajunction-server/datajunction_server/database/materialization.py @@ -19,6 +19,7 @@ from datajunction_server.database.backfill import Backfill from datajunction_server.database.base import Base from datajunction_server.database.column import Column +from datajunction_server.database.availabilitystate import AvailabilityState from datajunction_server.models.materialization import ( DruidMeasuresCubeConfig, GenericMaterializationConfig, @@ -97,6 +98,13 @@ class Materialization(Base): lazy="selectin", ) + availability: Mapped[List[AvailabilityState]] = relationship( + back_populates="materialization", + primaryjoin="Materialization.id==AvailabilityState.materialization_id", + cascade="all, delete", + lazy="selectin", + ) + @classmethod async def get_by_names( cls, diff --git a/datajunction-server/datajunction_server/internal/nodes.py b/datajunction-server/datajunction_server/internal/nodes.py index 9cf252834..ccd43c87f 100644 --- a/datajunction-server/datajunction_server/internal/nodes.py +++ b/datajunction-server/datajunction_server/internal/nodes.py @@ -1570,6 +1570,7 @@ async def upsert_complex_dimension_link( node = await Node.get_by_name( session, node_name, + raise_if_not_exists=True, ) if node.type not in (NodeType.SOURCE, NodeType.DIMENSION, NodeType.TRANSFORM): # type: ignore raise DJInvalidInputException( diff --git a/datajunction-server/datajunction_server/models/node.py b/datajunction-server/datajunction_server/models/node.py index f03bc779b..547641976 100644 --- a/datajunction-server/datajunction_server/models/node.py +++ b/datajunction-server/datajunction_server/models/node.py @@ -265,7 +265,7 @@ class AvailabilityStateBase(TemporalPartitionRange): schema_: Optional[str] = Field(default=None) table: str valid_through_ts: int - url: Optional[str] + materialization_id: Optional[int] = Field(default=None) links: Optional[Dict[str, Any]] = 
Field(default={}) # An ordered list of categorical partitions like ["country", "group_id"] From b6207d0f84c523372ed68fda1b2e6b061aab7f04 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 23 Feb 2025 00:00:55 -0800 Subject: [PATCH 2/5] Expose ids when retrieving materializations or calling the query service. Add custom metadata to availability --- ...f60_add_materialization_to_availability.py | 43 ------------- ...cd664b4_associate_materialization_with_.py | 36 +++++++++++ .../datajunction_server/api/data.py | 3 +- .../database/availabilitystate.py | 13 +++- .../jobs/cube_materialization.py | 1 + .../models/cube_materialization.py | 1 + .../models/materialization.py | 1 + .../datajunction_server/models/node.py | 2 +- datajunction-server/tests/api/data_test.py | 64 ++++++++++--------- .../tests/api/materializations_test.py | 28 +++++--- datajunction-server/tests/models/node_test.py | 12 ++-- .../tests/service_clients_test.py | 1 + 12 files changed, 112 insertions(+), 93 deletions(-) delete mode 100644 datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py create mode 100644 datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py deleted file mode 100644 index 5f347c402..000000000 --- a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0217-d99d274bff60_add_materialization_to_availability.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Add materialization to availability - -Revision ID: d99d274bff60 -Revises: c3d5f327296c -Create Date: 2025-02-23 02:17:27.717193+00:00 - -""" -# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module - -import sqlalchemy as sa -from alembic import op - -# revision identifiers, used by Alembic. 
-revision = "d99d274bff60" -down_revision = "c3d5f327296c" -branch_labels = None -depends_on = None - - -def upgrade(): - with op.batch_alter_table("availabilitystate", schema=None) as batch_op: - batch_op.add_column( - sa.Column( - "materialization_id", - sa.BigInteger().with_variant(sa.Integer(), "sqlite"), - nullable=True, - ), - ) - batch_op.create_foreign_key( - "fk_availability_materialization_id_materialization", - "materialization", - ["materialization_id"], - ["id"], - ) - - -def downgrade(): - with op.batch_alter_table("availabilitystate", schema=None) as batch_op: - batch_op.drop_constraint( - "fk_availability_materialization_id_materialization", - type_="foreignkey", - ) - batch_op.drop_column("materialization_id") diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py new file mode 100644 index 000000000..a4aaedee1 --- /dev/null +++ b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py @@ -0,0 +1,36 @@ +"""Associate materialization with availability and change metadata + +Revision ID: 0c3d8cd664b4 +Revises: c3d5f327296c +Create Date: 2025-02-23 07:58:08.850294+00:00 + +""" +# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '0c3d8cd664b4' +down_revision = 'c3d5f327296c' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('availabilitystate', schema=None) as batch_op: + batch_op.add_column(sa.Column('custom_metadata', sa.JSON(), nullable=True)) + batch_op.add_column(sa.Column('materialization_id', sa.BigInteger(), nullable=True)) + batch_op.create_foreign_key('fk_availability_materialization_id_materialization', 'materialization', ['materialization_id'], ['id']) + batch_op.drop_column('url') + batch_op.drop_column('links') + + +def downgrade(): + with op.batch_alter_table('availabilitystate', schema=None) as batch_op: + batch_op.add_column(sa.Column('links', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True)) + batch_op.add_column(sa.Column('url', sa.VARCHAR(), autoincrement=False, nullable=True)) + batch_op.drop_constraint('fk_availability_materialization_id_materialization', type_='foreignkey') + batch_op.drop_column('materialization_id') + batch_op.drop_column('custom_metadata') diff --git a/datajunction-server/datajunction_server/api/data.py b/datajunction-server/datajunction_server/api/data.py index 0e883521f..39ebe4d9e 100644 --- a/datajunction-server/datajunction_server/api/data.py +++ b/datajunction-server/datajunction_server/api/data.py @@ -129,7 +129,7 @@ async def add_availability_state( schema_=data.schema_, table=data.table, valid_through_ts=data.valid_through_ts, - url=data.url, + custom_metadata=data.custom_metadata, min_temporal_partition=data.min_temporal_partition, max_temporal_partition=data.max_temporal_partition, partitions=[ @@ -138,7 +138,6 @@ async def add_availability_state( ], categorical_partitions=data.categorical_partitions, temporal_partitions=data.temporal_partitions, - links=data.links, materialization_id=data.materialization_id, ) if node_revision.availability and not node_revision.availability.partitions: diff --git 
a/datajunction-server/datajunction_server/database/availabilitystate.py b/datajunction-server/datajunction_server/database/availabilitystate.py index b4667ee01..d3ece3972 100644 --- a/datajunction-server/datajunction_server/database/availabilitystate.py +++ b/datajunction-server/datajunction_server/database/availabilitystate.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone from functools import partial -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional import sqlalchemy as sa from sqlalchemy import JSON, DateTime, ForeignKey @@ -28,12 +28,19 @@ class AvailabilityState(Base): primary_key=True, ) + # Identifying where the dataset lives catalog: Mapped[str] schema_: Mapped[Optional[str]] = mapped_column(nullable=True) table: Mapped[str] + + # Indicates data freshness valid_through_ts: Mapped[int] = mapped_column(sa.BigInteger()) - url: Mapped[Optional[str]] - links: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, default=dict) + + # Arbitrary JSON metadata. This can encompass any URLs associated with the materialized dataset + custom_metadata: Mapped[Optional[Dict]] = mapped_column( + JSON, + default={}, + ) # The materialization that this availability is associated with, if any materialization_id: Mapped[Optional[int]] = mapped_column( diff --git a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py index 58affe56a..a2fcac6f7 100644 --- a/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py +++ b/datajunction-server/datajunction_server/materialization/jobs/cube_materialization.py @@ -149,6 +149,7 @@ def schedule( ) return query_service_client.materialize_cube( materialization_input=DruidCubeMaterializationInput( + id=materialization.id, name=materialization.name, cube=cube_config.cube, dimensions=cube_config.dimensions, diff --git a/datajunction-server/datajunction_server/models/cube_materialization.py b/datajunction-server/datajunction_server/models/cube_materialization.py index d945b03a7..db34ca546 100644 --- a/datajunction-server/datajunction_server/models/cube_materialization.py +++ b/datajunction-server/datajunction_server/models/cube_materialization.py @@ -398,6 +398,7 @@ class DruidCubeMaterializationInput(BaseModel): Materialization info as passed to the query service. """ + id: int name: str # Frozen cube info at the time of materialization diff --git a/datajunction-server/datajunction_server/models/materialization.py b/datajunction-server/datajunction_server/models/materialization.py index e50ba07e9..c7debbe0f 100644 --- a/datajunction-server/datajunction_server/models/materialization.py +++ b/datajunction-server/datajunction_server/models/materialization.py @@ -119,6 +119,7 @@ class MaterializationConfigOutput(BaseModel): Output for materialization config. 
""" + id: int name: Optional[str] config: Dict schedule: str diff --git a/datajunction-server/datajunction_server/models/node.py b/datajunction-server/datajunction_server/models/node.py index 547641976..90ac9db3a 100644 --- a/datajunction-server/datajunction_server/models/node.py +++ b/datajunction-server/datajunction_server/models/node.py @@ -266,7 +266,7 @@ class AvailabilityStateBase(TemporalPartitionRange): table: str valid_through_ts: int materialization_id: Optional[int] = Field(default=None) - links: Optional[Dict[str, Any]] = Field(default={}) + custom_metadata: Optional[Dict[str, Any]] = Field(default={}) # An ordered list of categorical partitions like ["country", "group_id"] # or ["region_id", "age_group"] diff --git a/datajunction-server/tests/api/data_test.py b/datajunction-server/tests/api/data_test.py index f83f80ef9..74bd6b0ae 100644 --- a/datajunction-server/tests/api/data_test.py +++ b/datajunction-server/tests/api/data_test.py @@ -665,7 +665,9 @@ async def test_setting_availability_state( "valid_through_ts": 20230125, "max_temporal_partition": ["2023", "01", "25"], "min_temporal_partition": ["2022", "01", "01"], - "url": "http://some.catalog.com/default.accounting.pmts", + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, }, ) data = response.json() @@ -700,8 +702,10 @@ async def test_setting_availability_state( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": "http://some.catalog.com/default.accounting.pmts", - "links": {}, + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": None, }, "pre": {}, "user": "dj", @@ -725,8 +729,10 @@ async def test_setting_availability_state( "schema_": "accounting", "categorical_partitions": [], "temporal_partitions": [], - "url": "http://some.catalog.com/default.accounting.pmts", - "links": {}, + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": None, } @pytest.mark.asyncio @@ -839,8 +845,8 @@ async def test_setting_availability_state_multiple_times( "table": "new_payments_table", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": { "catalog": "default", @@ -852,8 +858,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "user": "dj", }, @@ -875,8 +881,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": { "catalog": "default", @@ -888,8 +894,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "user": "dj", }, @@ -911,8 +917,8 @@ async def test_setting_availability_state_multiple_times( "table": "pmts", "temporal_partitions": [], "valid_through_ts": 20230125, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, }, "pre": {}, "user": "dj", @@ -936,8 +942,8 @@ async def test_setting_availability_state_multiple_times( "schema_": "new_accounting", "categorical_partitions": [], "temporal_partitions": [], - "url": 
None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1076,8 +1082,8 @@ async def test_merging_in_a_higher_max_partition( "partitions": [], "categorical_partitions": [], "temporal_partitions": ["payment_id"], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.fixture @@ -1148,8 +1154,8 @@ async def test_set_temporal_only_availability( "schema_": "dimensions", "table": "local_hard_hats", "valid_through_ts": 20230101, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1195,8 +1201,8 @@ async def test_set_node_level_availability_wider_time_range( "schema_": "dimensions", "table": "local_hard_hats", "valid_through_ts": 20230101, - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1549,8 +1555,8 @@ async def test_merging_in_a_lower_min_partition( "max_temporal_partition": ["2023", "01", "01"], "schema_": "accounting", "partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1622,8 +1628,8 @@ async def test_moving_back_valid_through_ts( "partitions": [], "categorical_partitions": [], "temporal_partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio @@ -1668,8 +1674,8 @@ async def test_setting_availablity_state_on_a_source_node( "partitions": [], "categorical_partitions": [], "temporal_partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @pytest.mark.asyncio diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index 466991e6c..eb8226b4f 100644 --- a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -1071,7 +1071,9 @@ async def test_spark_sql_full( parse(load_expected_file("spark_sql.full.query.sql")), ) del data["materializations"][0]["config"]["query"] - assert data["materializations"] == load_expected_file("spark_sql.full.config.json") + expected_config = load_expected_file("spark_sql.full.config.json") + expected_config[0]["id"] = mock.ANY + assert data["materializations"] == expected_config # Set both temporal and categorical partitions on node response = await module__client_with_roads.post( @@ -1125,6 +1127,7 @@ async def test_spark_sql_full( materialization_with_partitions = data["materializations"][1] del materialization_with_partitions["config"]["query"] expected_config = load_expected_file("spark_sql.full.partition.config.json") + expected_config["id"] = mock.ANY assert materialization_with_partitions == expected_config # Check listing materializations of the node @@ -1133,14 +1136,17 @@ async def test_spark_sql_full( ) materializations = response.json() materializations[0]["config"]["query"] = mock.ANY - assert materializations[0] == load_expected_file( - "spark_sql.full.materializations.json", - ) + expected_config = load_expected_file("spark_sql.full.materializations.json") + expected_config["id"] = mock.ANY + assert materializations[0] == expected_config + materializations = response.json() materializations[1]["config"]["query"] = mock.ANY - assert materializations[1] == load_expected_file( + expected_config = load_expected_file( "spark_sql.full.partition.materializations.json", ) + expected_config["id"] = mock.ANY + assert materializations[1] 
== expected_config # Kick off backfill for this materialization response = await module__client_with_roads.post( @@ -1249,9 +1255,9 @@ async def test_spark_sql_incremental( data = response.json() assert data["version"] == "v1.0" del data["materializations"][0]["config"]["query"] - assert data["materializations"] == load_expected_file( - "spark_sql.incremental.config.json", - ) + expected_config = load_expected_file("spark_sql.incremental.config.json") + expected_config[0]["id"] = mock.ANY + assert data["materializations"] == expected_config # Kick off backfill for this materialization response = await module__client_with_roads.post( @@ -1465,6 +1471,7 @@ async def test_spark_with_availablity( ) assert response.status_code in (200, 201) assert str(parse(response.json()[0]["config"]["query"])) == str(parse(query_one)) + materialization_id = response.json()[0]["id"] # create a materialization on the 2nd node (w/o availability) response = await module__client_with_roads.post( @@ -1511,7 +1518,10 @@ async def test_spark_with_availablity( "valid_through_ts": 20230125, "max_temporal_partition": ["2023", "01", "25"], "min_temporal_partition": ["2022", "01", "01"], - "url": "http://some.catalog.com/default.accounting.pmts", + "custom_metadata": { + "url": "http://some.catalog.com/default.accounting.pmts", + }, + "materialization_id": materialization_id, }, ) assert response.status_code in (200, 201) diff --git a/datajunction-server/tests/models/node_test.py b/datajunction-server/tests/models/node_test.py index 1ab36079d..f7537c281 100644 --- a/datajunction-server/tests/models/node_test.py +++ b/datajunction-server/tests/models/node_test.py @@ -167,8 +167,8 @@ def test_merging_availability_simple_no_partitions() -> None: "categorical_partitions": [], "temporal_partitions": [], "partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @@ -208,8 +208,8 @@ def test_merging_availability_complex_no_partitions() -> None: "categorical_partitions": [], "temporal_partitions": [], "partitions": [], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } @@ -289,8 +289,8 @@ def test_merging_availability_complex_with_partitions() -> None: "max_temporal_partition": ["20230926"], }, ], - "url": None, - "links": {}, + "custom_metadata": {}, + "materialization_id": None, } diff --git a/datajunction-server/tests/service_clients_test.py b/datajunction-server/tests/service_clients_test.py index 5fb79b6f6..cc89204b1 100644 --- a/datajunction-server/tests/service_clients_test.py +++ b/datajunction-server/tests/service_clients_test.py @@ -779,6 +779,7 @@ def test_materialize_cube(self, mocker: MockerFixture) -> None: query_service_client = QueryServiceClient(uri=self.endpoint) materialization_input = DruidCubeMaterializationInput( + id=1, name="default", strategy=MaterializationStrategy.INCREMENTAL_TIME, schedule="@daily", From 3fd677e080b605c0c17a6a051ec2e89213c9e24c Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 23 Feb 2025 00:09:08 -0800 Subject: [PATCH 3/5] Add migration for existing links and url --- ...cd664b4_associate_materialization_with_.py | 113 +++++++++++++++--- 1 file changed, 96 insertions(+), 17 deletions(-) diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py index a4aaedee1..85d133dd7 100644 --- 
a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py +++ b/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py @@ -1,36 +1,115 @@ -"""Associate materialization with availability and change metadata +""" +Associate materialization with availability and change metadata Revision ID: 0c3d8cd664b4 Revises: c3d5f327296c Create Date: 2025-02-23 07:58:08.850294+00:00 - """ -# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module +import json import sqlalchemy as sa +from sqlalchemy.sql import table, column from alembic import op from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = '0c3d8cd664b4' -down_revision = 'c3d5f327296c' +revision = "0c3d8cd664b4" +down_revision = "c3d5f327296c" branch_labels = None depends_on = None def upgrade(): - with op.batch_alter_table('availabilitystate', schema=None) as batch_op: - batch_op.add_column(sa.Column('custom_metadata', sa.JSON(), nullable=True)) - batch_op.add_column(sa.Column('materialization_id', sa.BigInteger(), nullable=True)) - batch_op.create_foreign_key('fk_availability_materialization_id_materialization', 'materialization', ['materialization_id'], ['id']) - batch_op.drop_column('url') - batch_op.drop_column('links') + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.add_column(sa.Column("custom_metadata", sa.JSON(), nullable=True)) + batch_op.add_column( + sa.Column("materialization_id", sa.BigInteger(), nullable=True), + ) + batch_op.create_foreign_key( + "fk_availability_materialization_id_materialization", + "materialization", + ["materialization_id"], + ["id"], + ) + + availabilitystate = table( + "availabilitystate", + column("id", sa.BigInteger()), + column("url", sa.String()), + column("links", postgresql.JSON), + column("custom_metadata", sa.JSON), + ) + + # Move data from url and links to custom_metadata + connection = op.get_bind() + results = connection.execute( + sa.select( + availabilitystate.c.id, + availabilitystate.c.url, + availabilitystate.c.links, + ), + ).fetchall() + + for row in results: + metadata = {} + if row.url: + metadata["url"] = row.url + if row.links: + metadata["links"] = row.links + + if metadata: + connection.execute( + sa.update(availabilitystate) + .where(availabilitystate.c.id == row.id) + .values(custom_metadata=json.dumps(metadata)), + ) + + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.drop_column("url") + batch_op.drop_column("links") def downgrade(): - with op.batch_alter_table('availabilitystate', schema=None) as batch_op: - batch_op.add_column(sa.Column('links', postgresql.JSON(astext_type=sa.Text()), autoincrement=False, nullable=True)) - batch_op.add_column(sa.Column('url', sa.VARCHAR(), autoincrement=False, nullable=True)) - batch_op.drop_constraint('fk_availability_materialization_id_materialization', type_='foreignkey') - batch_op.drop_column('materialization_id') - batch_op.drop_column('custom_metadata') + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "links", + postgresql.JSON(astext_type=sa.Text()), + autoincrement=False, + nullable=True, + ), + ) + batch_op.add_column( + sa.Column("url", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + batch_op.drop_constraint( + "fk_availability_materialization_id_materialization", + 
type_="foreignkey", + ) + batch_op.drop_column("materialization_id") + batch_op.drop_column("custom_metadata") + + # Restore `url` and `links` from `custom_metadata` + availabilitystate = table( + "availabilitystate", + column("id", sa.BigInteger()), + column("custom_metadata", sa.JSON), + column("url", sa.String()), + column("links", postgresql.JSON), + ) + + conn = op.get_bind() + results = conn.execute( + sa.select(availabilitystate.c.id, availabilitystate.c.custom_metadata), + ).fetchall() + + for row in results: + metadata = row.custom_metadata or {} + conn.execute( + sa.update(availabilitystate) + .where(availabilitystate.c.id == row.id) + .values( + url=metadata.get("url"), + links=metadata.get("links"), + ), + ) From 5647d9a1a52ea6c73e39b6813e0a09d91ff35143 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 23 Feb 2025 09:54:12 -0800 Subject: [PATCH 4/5] Fix --- .../datajunction_server/models/cube_materialization.py | 2 +- datajunction-server/tests/api/nodes_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datajunction-server/datajunction_server/models/cube_materialization.py b/datajunction-server/datajunction_server/models/cube_materialization.py index db34ca546..3d6d892cf 100644 --- a/datajunction-server/datajunction_server/models/cube_materialization.py +++ b/datajunction-server/datajunction_server/models/cube_materialization.py @@ -398,7 +398,7 @@ class DruidCubeMaterializationInput(BaseModel): Materialization info as passed to the query service. """ - id: int + id: int | None = None name: str # Frozen cube info at the time of materialization diff --git a/datajunction-server/tests/api/nodes_test.py b/datajunction-server/tests/api/nodes_test.py index db5619a7e..14c3ab7ab 100644 --- a/datajunction-server/tests/api/nodes_test.py +++ b/datajunction-server/tests/api/nodes_test.py @@ -3412,6 +3412,7 @@ async def test_update_node_query_with_materializations( "job": "SparkSqlMaterializationJob", "name": "spark_sql__full", "schedule": "0 * * * *", + "id": mock.ANY, }, ] From bf683fa88aab45f568aa6bc134c440d7326c82fa Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sun, 23 Feb 2025 22:07:59 -0800 Subject: [PATCH 5/5] Use jsonb instead of json and fix migration script --- ...ef80efd70c_associate_availability_and_.py} | 33 ++++++++++++------- .../database/availabilitystate.py | 5 +-- 2 files changed, 24 insertions(+), 14 deletions(-) rename datajunction-server/datajunction_server/alembic/versions/{2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py => 2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py} (79%) diff --git a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py b/datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py similarity index 79% rename from datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py rename to datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py index 85d133dd7..484f02a4c 100644 --- a/datajunction-server/datajunction_server/alembic/versions/2025_02_23_0758-0c3d8cd664b4_associate_materialization_with_.py +++ b/datajunction-server/datajunction_server/alembic/versions/2025_02_24_0549-b8ef80efd70c_associate_availability_and_.py @@ -1,11 +1,12 @@ -""" -Associate materialization with availability and change metadata +"""Associate availability and materialization 
-Revision ID: 0c3d8cd664b4 +Revision ID: b8ef80efd70c Revises: c3d5f327296c -Create Date: 2025-02-23 07:58:08.850294+00:00 +Create Date: 2025-02-24 05:49:06.588675+00:00 + """ +# pylint: disable=no-member, invalid-name, missing-function-docstring, unused-import, no-name-in-module import json import sqlalchemy as sa from sqlalchemy.sql import table, column @@ -13,7 +14,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = "0c3d8cd664b4" +revision = "b8ef80efd70c" down_revision = "c3d5f327296c" branch_labels = None depends_on = None @@ -21,7 +22,13 @@ def upgrade(): with op.batch_alter_table("availabilitystate", schema=None) as batch_op: - batch_op.add_column(sa.Column("custom_metadata", sa.JSON(), nullable=True)) + batch_op.add_column( + sa.Column( + "custom_metadata", + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + ), + ) batch_op.add_column( sa.Column("materialization_id", sa.BigInteger(), nullable=True), ) @@ -37,7 +44,7 @@ def upgrade(): column("id", sa.BigInteger()), column("url", sa.String()), column("links", postgresql.JSON), - column("custom_metadata", sa.JSON), + column("custom_metadata", postgresql.JSONB), ) # Move data from url and links to custom_metadata @@ -61,7 +68,7 @@ def upgrade(): connection.execute( sa.update(availabilitystate) .where(availabilitystate.c.id == row.id) - .values(custom_metadata=json.dumps(metadata)), + .values(custom_metadata=metadata), ) with op.batch_alter_table("availabilitystate", schema=None) as batch_op: @@ -86,14 +93,12 @@ def downgrade(): "fk_availability_materialization_id_materialization", type_="foreignkey", ) - batch_op.drop_column("materialization_id") - batch_op.drop_column("custom_metadata") # Restore `url` and `links` from `custom_metadata` availabilitystate = table( "availabilitystate", column("id", sa.BigInteger()), - column("custom_metadata", sa.JSON), + column("custom_metadata", postgresql.JSONB), column("url", sa.String()), column("links", postgresql.JSON), ) @@ -104,7 +109,7 @@ def downgrade(): ).fetchall() for row in results: - metadata = row.custom_metadata or {} + metadata = json.loads(row.custom_metadata) if row.custom_metadata else {} conn.execute( sa.update(availabilitystate) .where(availabilitystate.c.id == row.id) @@ -113,3 +118,7 @@ def downgrade(): links=metadata.get("links"), ), ) + + with op.batch_alter_table("availabilitystate", schema=None) as batch_op: + batch_op.drop_column("materialization_id") + batch_op.drop_column("custom_metadata") diff --git a/datajunction-server/datajunction_server/database/availabilitystate.py b/datajunction-server/datajunction_server/database/availabilitystate.py index d3ece3972..c38e5a044 100644 --- a/datajunction-server/datajunction_server/database/availabilitystate.py +++ b/datajunction-server/datajunction_server/database/availabilitystate.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy import JSON, DateTime, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -38,8 +39,8 @@ class AvailabilityState(Base): # Arbitrary JSON metadata. This can encompass any URLs associated with the materialized dataset custom_metadata: Mapped[Optional[Dict]] = mapped_column( - JSON, - default={}, + JSONB, + default=dict, ) # The materialization that this availability is associated with, if any
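
Usage sketch (illustrative only, not part of the patch series above): after these changes, a client posting an availability state can attach the id of the materialization that produced the dataset and put any URLs or links into the free-form `custom_metadata` JSON blob, replacing the removed `url` and `links` fields. The snippet below assumes an httpx client against a hypothetical local DataJunction server; the node name and materialization id are placeholder values, while the endpoint path and payload fields follow the route and tests in this series.

# Hedged example: post an availability state using the new fields from this series.
# Base URL, node name, and materialization id are hypothetical placeholders.
import httpx

client = httpx.Client(base_url="http://localhost:8000")  # assumed local DJ server

response = client.post(
    "/data/default.large_revenue_payments_only/availability",
    json={
        "catalog": "default",
        "schema_": "accounting",
        "table": "pmts",
        "valid_through_ts": 20230125,
        "min_temporal_partition": ["2022", "01", "01"],
        "max_temporal_partition": ["2023", "01", "25"],
        # URLs and other links now travel in the arbitrary JSON metadata blob...
        "custom_metadata": {
            "url": "http://some.catalog.com/default.accounting.pmts",
        },
        # ...and the availability can reference the materialization that produced it
        "materialization_id": 1,
    },
)
response.raise_for_status()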