Skip to content

Commit 88da4d9

Browse files
authored
Refactor!: move project from snapshot to model/node (#1345)
1 parent a866dff commit 88da4d9

File tree

10 files changed

+69
-14
lines changed

10 files changed

+69
-14
lines changed

sqlmesh/core/context.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ def _snapshots(
11591159
projects = {config.project for config in self.configs.values()}
11601160

11611161
for name, snapshot in remote_snapshots.items():
1162-
if name not in models and snapshot.project not in projects:
1162+
if name not in models and snapshot.model.project not in projects:
11631163
models[name] = snapshot.model
11641164

11651165
for audit in snapshot.audits:
@@ -1172,19 +1172,16 @@ def _snapshots(
11721172
if model.name not in self._models and model.name in remote_snapshots:
11731173
snapshot = remote_snapshots[model.name]
11741174
ttl = snapshot.ttl
1175-
project = snapshot.project
11761175
else:
11771176
config = self.config_for_model(model)
11781177
ttl = config.snapshot_ttl
1179-
project = config.project
11801178

11811179
snapshot = Snapshot.from_model(
11821180
model,
11831181
nodes=models,
11841182
audits=audits,
11851183
cache=fingerprint_cache,
11861184
ttl=ttl,
1187-
project=project,
11881185
)
11891186
snapshots[model.name] = snapshot
11901187

sqlmesh/core/loader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def _load_external_models(self) -> UniqueKeyDict[str, Model]:
174174
**row,
175175
dialect=config.model_defaults.dialect,
176176
path=path,
177+
project=config.project,
177178
)
178179
models[model.name] = model
179180
return models
@@ -275,6 +276,7 @@ def _load() -> Model:
275276
dialect=config.model_defaults.dialect,
276277
time_column_format=config.time_column_format,
277278
physical_schema_override=config.physical_schema_override,
279+
project=config.project,
278280
)
279281

280282
model = cache.get_or_load_model(path, _load)
@@ -310,6 +312,7 @@ def _load_python_models(self) -> UniqueKeyDict[str, Model]:
310312
dialect=config.model_defaults.dialect,
311313
time_column_format=config.time_column_format,
312314
physical_schema_override=config.physical_schema_override,
315+
project=config.project,
313316
)
314317
models[model.name] = model
315318

sqlmesh/core/model/decorator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def model(
5858
dialect: t.Optional[str] = None,
5959
time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
6060
physical_schema_override: t.Optional[t.Dict[str, str]] = None,
61+
project: str = "",
6162
) -> Model:
6263
"""Get the model registered by this function."""
6364
env: t.Dict[str, t.Any] = {}
@@ -71,6 +72,7 @@ def model(
7172
time_column_format=time_column_format,
7273
python_env=serialize_env(env, path=module_path),
7374
physical_schema_override=physical_schema_override,
75+
project=project,
7476
**self.kwargs,
7577
)
7678

sqlmesh/core/model/definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,7 @@ def metadata_hash(self, audits: t.Dict[str, Audit]) -> str:
722722
str(self.forward_only),
723723
str(self.disable_restatement),
724724
str(self.interval_unit_) if self.interval_unit_ is not None else None,
725+
self.project,
725726
]
726727

727728
for audit_name, audit_args in sorted(self.audits, key=lambda a: a[0]):

sqlmesh/core/node.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ class Node(PydanticModel):
141141
142142
Args:
143143
name: The name of the node.
144+
project: The name of the project this node belongs to, used in multi-repo deployments.
144145
description: The optional node description.
145146
owner: The owner of the node.
146147
start: The earliest date that the node will be executed for. If this is None,
@@ -154,6 +155,7 @@ class Node(PydanticModel):
154155
"""
155156

156157
name: str
158+
project: str = ""
157159
description: t.Optional[str] = None
158160
owner: t.Optional[str] = None
159161
start: t.Optional[TimeLike] = None

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
361361
audits: The list of audits used by the model.
362362
intervals: List of [start, end) intervals showing which time ranges a snapshot has data for.
363363
dev_intervals: List of [start, end) intervals showing development intervals (forward-only).
364-
project: The name of the project this snapshot is associated with.
365364
created_ts: Epoch millis timestamp when a snapshot was first created.
366365
updated_ts: Epoch millis timestamp when a snapshot was last updated.
367366
ttl: The time-to-live of a snapshot determines when it should be deleted after it's no longer referenced
@@ -385,7 +384,6 @@ class Snapshot(PydanticModel, SnapshotInfoMixin):
385384
audits: t.Tuple[Audit, ...]
386385
intervals: Intervals = []
387386
dev_intervals: Intervals = []
388-
project: str = ""
389387
created_ts: int
390388
updated_ts: int
391389
ttl: str
@@ -473,7 +471,6 @@ def from_model(
473471
*,
474472
nodes: t.Dict[str, SnapshotNode],
475473
ttl: str = c.DEFAULT_SNAPSHOT_TTL,
476-
project: str = "",
477474
version: t.Optional[str] = None,
478475
audits: t.Optional[t.Dict[str, Audit]] = None,
479476
cache: t.Optional[t.Dict[str, SnapshotFingerprint]] = None,
@@ -519,7 +516,6 @@ def from_model(
519516
audits=tuple(model.referenced_audits(audits)),
520517
intervals=[],
521518
dev_intervals=[],
522-
project=project,
523519
created_ts=created_ts,
524520
updated_ts=created_ts,
525521
ttl=ttl,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Move project attr from snapshot to model."""
2+
import json
3+
4+
import pandas as pd
5+
from sqlglot import exp
6+
7+
from sqlmesh.utils.migration import index_text_type
8+
9+
10+
def migrate(state_sync):  # type: ignore
    """Relocate the ``project`` attribute of every stored snapshot onto its node.

    Each row of the ``_snapshots`` state table carries a JSON-serialized
    snapshot. The top-level ``"project"`` key is removed from that payload
    (defaulting to an empty string when it is absent) and stored under
    ``payload["node"]["project"]`` instead. All rows are read first, the
    table is then emptied, and the rewritten rows are appended back.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``; the
            adapter is used to read, clear and repopulate the snapshots table.
    """
    adapter = state_sync.engine_adapter
    schema_name = state_sync.schema
    table = f"{schema_name}._snapshots" if schema_name else "_snapshots"

    columns = ("name", "identifier", "version", "snapshot", "kind_name")
    rows = []

    fetched = adapter.fetchall(
        exp.select(*columns).from_(table),
        quote_identifiers=True,
    )
    for name, identifier, version, raw_snapshot, kind_name in fetched:
        payload = json.loads(raw_snapshot)

        # Pull the attribute off the snapshot itself and attach it to the node.
        project = payload.pop("project", "")
        payload["node"]["project"] = project

        values = (name, identifier, version, json.dumps(payload), kind_name)
        rows.append(dict(zip(columns, values)))

    # The table is wiped unconditionally; rewritten rows are re-inserted below.
    adapter.delete_from(table, "TRUE")

    text_type = index_text_type(adapter.dialect)

    if not rows:
        return

    # Key columns use the dialect-specific indexable text type.
    column_types = {
        key_column: exp.DataType.build(text_type)
        for key_column in ("name", "identifier", "version")
    }
    column_types["snapshot"] = exp.DataType.build("text")
    column_types["kind_name"] = exp.DataType.build("text")

    adapter.insert_append(
        table,
        pd.DataFrame(rows),
        columns_to_types=column_types,
        contains_json=True,
    )

tests/core/test_integration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ def test_multi(mocker):
645645
paths=["examples/multi/repo_1"], engine_adapter=context.engine_adapter, gateway="memory"
646646
)
647647
model = context.models["bronze.a"]
648+
assert model.project == "repo_1"
648649
context.upsert_model(model.copy(update={"query": model.query.select("'c' AS c")}))
649650
plan = context.plan()
650651
assert set(snapshot.name for snapshot in plan.directly_modified) == {"bronze.a", "bronze.b"}

tests/core/test_snapshot.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def test_json(snapshot: Snapshot):
100100
"dialect": "spark",
101101
"name": "name",
102102
"partitioned_by": [],
103+
"project": "",
103104
"owner": "owner",
104105
"query": "SELECT @EACH([1, 2], x -> x), ds FROM parent.tbl",
105106
"jinja_macros": {
@@ -118,7 +119,6 @@ def test_json(snapshot: Snapshot):
118119
"name": "name",
119120
"parents": [{"name": "parent.tbl", "identifier": snapshot.parents[0].identifier}],
120121
"previous_versions": [],
121-
"project": "",
122122
"indirect_versions": {},
123123
"updated_ts": 1663891973000,
124124
"version": snapshot.fingerprint.to_version(),
@@ -393,7 +393,7 @@ def test_fingerprint(model: Model, parent_model: Model):
393393

394394
original_fingerprint = SnapshotFingerprint(
395395
data_hash="1116890341",
396-
metadata_hash="1237394431",
396+
metadata_hash="1312958471",
397397
)
398398

399399
assert fingerprint == original_fingerprint
@@ -440,7 +440,7 @@ def test_fingerprint_seed_model():
440440

441441
expected_fingerprint = SnapshotFingerprint(
442442
data_hash="1421766360",
443-
metadata_hash="3585221762",
443+
metadata_hash="1617581697",
444444
)
445445

446446
model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
@@ -480,7 +480,7 @@ def test_fingerprint_jinja_macros(model: Model):
480480
)
481481
original_fingerprint = SnapshotFingerprint(
482482
data_hash="4053778362",
483-
metadata_hash="1237394431",
483+
metadata_hash="1312958471",
484484
)
485485

486486
fingerprint = fingerprint_from_node(model, nodes={})

tests/schedulers/airflow/test_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
8888
"partitioned_by": ["a"],
8989
"query": "SELECT a, ds FROM tbl",
9090
"references": [],
91+
"project": "",
9192
"storage_format": "parquet",
9293
"jinja_macros": {
9394
"global_objs": {},
@@ -104,7 +105,6 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
104105
"name": "test_model",
105106
"parents": [],
106107
"previous_versions": [],
107-
"project": "",
108108
"updated_ts": 1665014400000,
109109
"version": snapshot.version,
110110
"change_category": snapshot.change_category,
@@ -131,7 +131,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
131131
"promoted_snapshot_ids": [
132132
{
133133
"name": "test_model",
134-
"identifier": "3192766394",
134+
"identifier": "3474928511",
135135
}
136136
],
137137
"suffix_target": "schema",

0 commit comments

Comments
 (0)