|
31 | 31 | OnAdditiveChange, |
32 | 32 | on_destructive_change_validator, |
33 | 33 | on_additive_change_validator, |
| 34 | + TimeColumn, |
34 | 35 | ) |
35 | 36 | from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy |
36 | 37 | from sqlmesh.dbt.common import SqlStr, sql_str_validator |
@@ -85,7 +86,7 @@ class ModelConfig(BaseModelConfig): |
85 | 86 |
|
86 | 87 | # sqlmesh fields |
87 | 88 | sql: SqlStr = SqlStr("") |
88 | | - time_column: t.Optional[str] = None |
| 89 | + time_column: t.Optional[TimeColumn] = None |
89 | 90 | cron: t.Optional[str] = None |
90 | 91 | interval_unit: t.Optional[str] = None |
91 | 92 | batch_concurrency: t.Optional[int] = None |
@@ -152,6 +153,7 @@ class ModelConfig(BaseModelConfig): |
152 | 153 | _sql_validator = sql_str_validator |
153 | 154 | _on_destructive_change_validator = on_destructive_change_validator |
154 | 155 | _on_additive_change_validator = on_additive_change_validator |
| 156 | + _time_column_validator = TimeColumn.validator() |
155 | 157 |
|
156 | 158 | @field_validator( |
157 | 159 | "unique_key", |
@@ -243,17 +245,6 @@ def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]: |
243 | 245 | def table_schema(self) -> str: |
244 | 246 | return self.target_schema or super().table_schema |
245 | 247 |
|
246 | | - def _get_overlapping_field_value( |
247 | | - self, context: DbtContext, dbt_field_name: str, sqlmesh_field_name: str |
248 | | - ) -> t.Optional[t.Any]: |
249 | | - dbt_field = self._get_field_value(dbt_field_name) |
250 | | - sqlmesh_field = getattr(self, sqlmesh_field_name, None) |
251 | | - if dbt_field is not None and sqlmesh_field is not None: |
252 | | - get_console().log_warning( |
253 | | - f"Both '{dbt_field_name}' and '{sqlmesh_field_name}' are set for model '{self.canonical_name(context)}'. '{sqlmesh_field_name}' will be used." |
254 | | - ) |
255 | | - return sqlmesh_field if sqlmesh_field is not None else dbt_field |
256 | | - |
257 | 248 | def model_kind(self, context: DbtContext) -> ModelKind: |
258 | 249 | """ |
259 | 250 | Get the sqlmesh ModelKind |
@@ -342,16 +333,18 @@ def model_kind(self, context: DbtContext) -> ModelKind: |
342 | 333 | f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}." |
343 | 334 | ) |
344 | 335 |
|
345 | | - if self.time_column and strategy not in {"incremental_by_time_range", "microbatch"}: |
| 336 | + if self.time_column and strategy != "incremental_by_time_range": |
346 | 337 | get_console().log_warning( |
347 | 338 | f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. " |
348 | 339 | f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'." |
349 | 340 | ) |
350 | 341 |
|
351 | 342 | if strategy == "microbatch": |
352 | | - time_column = self._get_overlapping_field_value( |
353 | | - context, "event_time", "time_column" |
354 | | - ) |
| 343 | + if self.time_column: |
| 344 | + raise ConfigError( |
| 345 | + f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead." |
| 346 | + ) |
| 347 | + time_column = self._get_field_value("event_time") |
355 | 348 | if not time_column: |
356 | 349 | raise ConfigError( |
357 | 350 | f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy." |
|
0 commit comments