
Commit dd65f83

Fix: Allow deserialization of models using custom materializations (#3886)
1 parent b20bd02 commit dd65f83

12 files changed

Lines changed: 332 additions & 172 deletions


docs/guides/custom_materializations.md

Lines changed: 24 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,6 @@ In many cases, the above usage of a custom materialization will suffice.
167167

168168
However, you may still want tighter integration with SQLMesh's internals:
169169

170-
- You may want more control over what is considered a metadata change vs a data change
171170
- You may want to validate custom properties are correct before any database connections are made
172171
- You may want to leverage existing functionality of SQLMesh that relies on specific properties being present
173172

@@ -176,43 +175,52 @@ During project load, SQLMesh will instantiate your *subclass* instead of `Custom
176175

177176
This allows you to run custom validators at load time rather than having to perform extra validation when `insert()` is invoked on your `CustomMaterialization`.
178177

179-
This approach also allows you set "top-level" properties directly in the `kind (...)` block rather than nesting them under `materialization_properties`.
178+
You can also define standard Python `@property` methods to "hoist" properties declared inside `materialization_properties` to the top level on your `Kind` object. This can make using them from within your custom materialization easier.
180179

181180
To extend `CustomKind`, first you define a subclass like so:
182181

183182
```python linenums="1" hl_lines="7"
184-
from sqlmesh import CustomKind
183+
from typing_extensions import Self
185184
from pydantic import field_validator, ValidationInfo
185+
from sqlmesh import CustomKind
186186
from sqlmesh.utils.pydantic import list_of_fields_validator
187+
from sqlmesh.utils.errors import ConfigError
187188

188189
class MyCustomKind(CustomKind):
189190

190-
primary_key: t.List[exp.Expression]
191+
_primary_key: t.List[exp.Expression]
191192

192-
@field_validator("primary_key", mode="before")
193-
@classmethod
194-
def _validate_primary_key(cls, value: t.Any, info: ValidationInfo) -> t.Any:
195-
return list_of_fields_validator(value, info.data)
193+
@model_validator(mode="after")
194+
def _validate_model(self) -> Self:
195+
self._primary_key = list_of_fields_validator(
196+
self.materialization_properties.get("primary_key"),
197+
{ "dialect": self.dialect }
198+
)
199+
if not self.primary_key:
200+
raise ConfigError("primary_key must be specified")
201+
return self
196202

197-
```
203+
@property
204+
def primary_key(self) -> t.List[exp.Expression]:
205+
return self._primary_key
198206

199-
In this example, we define a field called `primary_key` that takes a list of fields. Notice that the field validation is just a simple Pydantic `@field_validator` with the [exact same usage](https://github.com/TobikoData/sqlmesh/blob/ade5f7245950822f3cfe5a68a0c243f91ceca600/sqlmesh/core/model/kind.py#L470) as the standard SQLMesh model kinds.
207+
```
200208

201209
To use it within a model, we can do something like:
202210

203-
```sql linenums="1" hl_lines="5"
211+
```sql linenums="1" hl_lines="4"
204212
MODEL (
205213
name my_db.my_model,
206214
kind CUSTOM (
207215
materialization 'my_custom_full',
208-
primary_key (col1, col2)
216+
materialization_properties (
217+
primary_key = (col1, col2)
218+
)
209219
)
210220
);
211221
```
212222

213-
Notice that the `primary_key` field we declared is top-level within the `kind` block instead of being nested under `materialization_properties`.
214-
215-
To indicate to SQLMesh that it should use this subclass, specify it as a generic type parameter on your custom materialization class like so:
223+
To indicate to SQLMesh that it should use the `MyCustomKind` subclass instead of `CustomKind`, specify it as a generic type parameter on your custom materialization class like so:
216224

217225
```python linenums="1" hl_lines="1 16"
218226
class CustomFullMaterialization(CustomMaterialization[MyCustomKind]):
@@ -238,21 +246,9 @@ When SQLMesh loads your custom materialization, it will inspect the Python type
238246

239247
In this example, this means that:
240248

241-
- Validation for `primary_key` happens at load time instead of evaluation time.
249+
- Validation for `primary_key` happens at load time instead of evaluation time. So if there is an issue, you can abort early rather than halfway through applying a plan.
242250
- When your custom materialization is called to load data into tables, `model.kind` will resolve to your custom kind object so you can access the extra properties you defined without first needing to validate them / coerce them to a usable type.
243251

244-
### Data vs Metadata changes
245-
246-
Subclasses of `CustomKind` that add extra properties can also decide if they are data properties (changes may trigger the creation of new snapshots) or metadata properties (changes just update metadata about the model).
247-
248-
They can also decide if they are relevant for text diffing when SQLMesh detects changes to a model.
249-
250-
You can opt in to SQLMesh's change tracking by overriding the following methods:
251-
252-
- If changing the property should change the data fingerprint, add it to [data_hash_values()](https://github.com/TobikoData/sqlmesh/blob/ade5f7245950822f3cfe5a68a0c243f91ceca600/sqlmesh/core/model/kind.py#L858)
253-
- If changing the property should change the metadata fingerprint, add it to [metadata_hash_values()](https://github.com/TobikoData/sqlmesh/blob/ade5f7245950822f3cfe5a68a0c243f91ceca600/sqlmesh/core/model/kind.py#L867)
254-
- If the property should show up in context diffs, add it to [to_expression()](https://github.com/TobikoData/sqlmesh/blob/ade5f7245950822f3cfe5a68a0c243f91ceca600/sqlmesh/core/model/kind.py#L880)
255-
256252

257253
## Sharing custom materializations
258254

examples/custom_materializations/custom_materializations/custom_kind.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@
44

55
from sqlmesh import CustomMaterialization, CustomKind, Model
66
from sqlmesh.utils.pydantic import validate_string
7-
from pydantic import field_validator
87

98
if t.TYPE_CHECKING:
109
from sqlmesh import QueryOrDF
1110

1211

1312
class ExtendedCustomKind(CustomKind):
14-
custom_property: t.Optional[str] = None
15-
16-
@field_validator("custom_property", mode="before")
17-
@classmethod
18-
def _validate_custom_property(cls, v: t.Any) -> str:
19-
return validate_string(v)
13+
@property
14+
def custom_property(self) -> str:
15+
return validate_string(self.materialization_properties.get("custom_property"))
2016

2117

2218
class CustomFullWithCustomKindMaterialization(CustomMaterialization[ExtendedCustomKind]):
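
The change above replaces a declared Pydantic field with a plain `@property` that reads from `materialization_properties`. A minimal standalone sketch of that pattern is shown here, assuming only the standard library: `SketchKind` and the local `ConfigError` are hypothetical stand-ins, not SQLMesh classes, and the string-based normalization is a simplification of what `list_of_fields_validator` does with SQLGlot expressions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


class ConfigError(Exception):
    """Local stand-in for sqlmesh.utils.errors.ConfigError (assumption)."""


@dataclass
class SketchKind:
    """Hypothetical stand-in for a CustomKind subclass; not SQLMesh's class."""

    # Free-form properties, as they would arrive from the MODEL definition.
    materialization_properties: Dict[str, Any] = field(default_factory=dict)
    # Normalized value, filled in once when the object is constructed.
    _primary_key: List[str] = field(init=False, repr=False, default_factory=list)

    def __post_init__(self) -> None:
        # Validate at construction ("load") time so a bad value fails early.
        raw = self.materialization_properties.get("primary_key")
        if isinstance(raw, str):
            raw = [raw]
        self._primary_key = [str(column) for column in (raw or [])]
        if not self._primary_key:
            raise ConfigError("primary_key must be specified")

    @property
    def primary_key(self) -> List[str]:
        # "Hoisted" accessor: callers never touch the raw dictionary.
        return self._primary_key
```

With this shape, code that consumes the kind reads `kind.primary_key` directly, and an object without a `primary_key` entry cannot be constructed at all.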

examples/sushi/models/latest_order.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ MODEL (
22
name sushi.latest_order,
33
kind CUSTOM (
44
materialization 'custom_full_with_custom_kind',
5-
custom_property 'sushi!!!'
5+
materialization_properties (
6+
custom_property = 'sushi!!!'
7+
)
68
),
79
cron '@daily'
810
);

pytest.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ markers =
3939
trino_delta: test for Trino (Delta connector)
4040
addopts = -n 0 --dist=loadgroup
4141

42+
asyncio_default_fixture_loop_scope = session
43+
4244
# Set this to True to enable logging during tests
4345
log_cli = False
4446
log_cli_format = %(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s

sqlmesh/core/model/definition.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,14 @@
3232
single_value_or_tuple,
3333
)
3434
from sqlmesh.core.model.meta import ModelMeta, FunctionCall
35-
from sqlmesh.core.model.kind import ModelKindName, SeedKind, ModelKind, FullKind, create_model_kind
35+
from sqlmesh.core.model.kind import (
36+
ModelKindName,
37+
SeedKind,
38+
ModelKind,
39+
FullKind,
40+
create_model_kind,
41+
CustomKind,
42+
)
3643
from sqlmesh.core.model.seed import CsvSeedReader, Seed, create_seed
3744
from sqlmesh.core.renderer import ExpressionRenderer, QueryRenderer
3845
from sqlmesh.core.signal import SignalRegistry
@@ -979,6 +986,12 @@ def validate_definition(self) -> None:
979986
self._path,
980987
)
981988

989+
if isinstance(self.kind, CustomKind):
990+
from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type_or_raise
991+
992+
# Will raise if the custom materialization points to an invalid class
993+
get_custom_materialization_type_or_raise(self.kind.materialization)
994+
982995
def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
983996
"""Determines whether this model is a breaking change in relation to the `previous` model.
984997

sqlmesh/core/model/kind.py

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44
from enum import Enum
5+
from typing_extensions import Self
56

67
from pydantic import Field
78
from sqlglot import exp
@@ -240,43 +241,7 @@ class TimeColumn(PydanticModel):
240241
@classmethod
241242
def validator(cls) -> classmethod:
242243
def _time_column_validator(v: t.Any, info: ValidationInfo) -> TimeColumn:
243-
dialect = get_dialect(info.data)
244-
245-
if isinstance(v, exp.Tuple):
246-
column_expr = v.expressions[0]
247-
column = (
248-
exp.column(column_expr)
249-
if isinstance(column_expr, exp.Identifier)
250-
else column_expr
251-
)
252-
format = v.expressions[1].name if len(v.expressions) > 1 else None
253-
elif isinstance(v, exp.Expression):
254-
column = exp.column(v) if isinstance(v, exp.Identifier) else v
255-
format = None
256-
elif isinstance(v, str):
257-
column = d.parse_one(v, dialect=dialect)
258-
column.meta.pop("sql")
259-
format = None
260-
elif isinstance(v, dict):
261-
column_raw = v["column"]
262-
column = (
263-
d.parse_one(column_raw, dialect=dialect)
264-
if isinstance(column_raw, str)
265-
else column_raw
266-
)
267-
format = v.get("format")
268-
elif isinstance(v, TimeColumn):
269-
column = v.column
270-
format = v.format
271-
else:
272-
raise ConfigError(f"Invalid time_column: '{v}'.")
273-
274-
column = quote_identifiers(
275-
normalize_identifiers(column, dialect=dialect), dialect=dialect
276-
)
277-
column.meta["dialect"] = dialect
278-
279-
return TimeColumn(column=column, format=format)
244+
return TimeColumn.create(v, get_dialect(info.data))
280245

281246
return field_validator("time_column", mode="before")(_time_column_validator)
282247

@@ -314,6 +279,40 @@ def to_expression(self, dialect: str) -> exp.Expression:
314279
def to_property(self, dialect: str = "") -> exp.Property:
315280
return exp.Property(this="time_column", value=self.to_expression(dialect))
316281

282+
@classmethod
283+
def create(cls, v: t.Any, dialect: str) -> Self:
284+
if isinstance(v, exp.Tuple):
285+
column_expr = v.expressions[0]
286+
column = (
287+
exp.column(column_expr) if isinstance(column_expr, exp.Identifier) else column_expr
288+
)
289+
format = v.expressions[1].name if len(v.expressions) > 1 else None
290+
elif isinstance(v, exp.Expression):
291+
column = exp.column(v) if isinstance(v, exp.Identifier) else v
292+
format = None
293+
elif isinstance(v, str):
294+
column = d.parse_one(v, dialect=dialect)
295+
column.meta.pop("sql")
296+
format = None
297+
elif isinstance(v, dict):
298+
column_raw = v["column"]
299+
column = (
300+
d.parse_one(column_raw, dialect=dialect)
301+
if isinstance(column_raw, str)
302+
else column_raw
303+
)
304+
format = v.get("format")
305+
elif isinstance(v, TimeColumn):
306+
column = v.column
307+
format = v.format
308+
else:
309+
raise ConfigError(f"Invalid time_column: '{v}'.")
310+
311+
column = quote_identifiers(normalize_identifiers(column, dialect=dialect), dialect=dialect)
312+
column.meta["dialect"] = dialect
313+
314+
return cls(column=column, format=format)
315+
317316

318317
def _kind_dialect_validator(cls: t.Type, v: t.Optional[str]) -> str:
319318
if v is None:
@@ -836,17 +835,16 @@ class CustomKind(_ModelKind):
836835
auto_restatement_cron: t.Optional[SQLGlotCron] = None
837836
auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = None
838837

838+
# so that CustomKind subclasses know the dialect when validating / normalizing / interpreting values in `materialization_properties`
839+
dialect: str = Field(exclude=True)
840+
839841
_properties_validator = properties_validator
840842

841843
@field_validator("materialization", mode="before")
842844
@classmethod
843845
def _validate_materialization(cls, v: t.Any) -> str:
844-
from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type
845-
846-
materialization = validate_string(v)
847-
# The below call fails if a materialization with the given name doesn't exist.
848-
get_custom_materialization_type(materialization)
849-
return materialization
846+
# note: create_model_kind() validates the custom materialization class
847+
return validate_string(v)
850848

851849
@property
852850
def materialization_properties(self) -> CustomMaterializationProperties:
@@ -985,11 +983,15 @@ def create_model_kind(v: t.Any, dialect: str, defaults: t.Dict[str, t.Any]) -> M
985983
"The 'materialization' property is required for models of the CUSTOM kind"
986984
)
987985

988-
actual_kind_type, _ = get_custom_materialization_type(
989-
validate_string(props.get("materialization"))
990-
)
991-
992-
return actual_kind_type(**props)
986+
# The below call will print a warning if a materialization with the given name doesn't exist
987+
# we don't want to throw an error here because we still want Models with a CustomKind to be able
988+
# to be serialized / deserialized in contexts where the custom materialization class may not be available,
989+
# such as in HTTP request handlers
990+
if custom_materialization := get_custom_materialization_type(
991+
validate_string(props.get("materialization")), raise_errors=False
992+
):
993+
actual_kind_type, _ = custom_materialization
994+
return actual_kind_type(**props)
993995

994996
return kind_type(**props)
995997
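
The two lookups in this commit split responsibilities: kind creation tolerates a missing materialization so that models can still be deserialized, while `validate_definition()` performs the strict check. The sketch below illustrates that split under stated assumptions: the registry, `BaseKind`, `PrimaryKeyKind`, and the function names are hypothetical, and the lookup returns a single class rather than the tuple that the real `get_custom_materialization_type` returns.

```python
import logging
from typing import Any, Dict, Optional, Type

logger = logging.getLogger(__name__)


class ConfigError(Exception):
    """Local stand-in for sqlmesh.utils.errors.ConfigError (assumption)."""


class BaseKind:
    """Hypothetical generic kind used when no subclass can be resolved."""

    def __init__(self, **props: Any) -> None:
        self.props = props


class PrimaryKeyKind(BaseKind):
    """Hypothetical subclass registered by a custom materialization."""


# Hypothetical registry mapping materialization names to kind classes.
_REGISTRY: Dict[str, Type[BaseKind]] = {"my_custom_full": PrimaryKeyKind}


def lookup_kind_type(name: str, raise_errors: bool = True) -> Optional[Type[BaseKind]]:
    """Return the registered kind class, or warn and return None if lenient."""
    kind_type = _REGISTRY.get(name.lower())
    if kind_type is None:
        message = f"Materialization strategy with name '{name}' was not found."
        if raise_errors:
            raise ConfigError(message)
        logger.warning(message)
    return kind_type


def lookup_kind_type_or_raise(name: str) -> Type[BaseKind]:
    """Strict variant, intended for definition-time validation."""
    kind_type = lookup_kind_type(name, raise_errors=True)
    assert kind_type is not None  # guaranteed by raise_errors=True
    return kind_type


def create_kind(props: Dict[str, Any]) -> BaseKind:
    """Lenient construction: fall back to BaseKind if the class is unavailable."""
    kind_type = lookup_kind_type(props["materialization"], raise_errors=False)
    return (kind_type or BaseKind)(**props)
```

A process that has not imported the custom materialization, such as an HTTP request handler, can still call `create_kind()` and get a generic object back, while project loading can call `lookup_kind_type_or_raise()` to surface the configuration error.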
