Skip to content

Commit 61a0b11

Browse files
authored
feat: add dedicated incremental_by_time_range strategy (#5306)
1 parent cef3859 commit 61a0b11

File tree

8 files changed

+214
-48
lines changed

8 files changed

+214
-48
lines changed

docs/integrations/dbt.md

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ This section describes how to adapt dbt's incremental models to run on sqlmesh a
219219
SQLMesh supports two approaches to implement [idempotent](../concepts/glossary.md#idempotency) incremental loads:
220220

221221
* Using merge (with the sqlmesh [`INCREMENTAL_BY_UNIQUE_KEY` model kind](../concepts/models/model_kinds.md#incremental_by_unique_key))
222-
* Using insert-overwrite/delete+insert (with the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range))
222+
* Using [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range)
223223

224224
#### Incremental by unique key
225225

@@ -233,28 +233,22 @@ To enable incremental_by_unique_key incrementality, the model configuration shou
233233

234234
#### Incremental by time range
235235

236-
To enable incremental_by_time_range incrementality, the model configuration should contain:
236+
To enable incremental_by_time_range incrementality, the model configuration must contain:
237237

238-
* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
239238
* The `materialized` key with value `'incremental'`
240-
* Either:
241-
* The `incremental_strategy` key with value `'insert_overwrite'` or
242-
* The `incremental_strategy` key with value `'delete+insert'`
243-
* Note: in this context, these two strategies are synonyms. Regardless of which one is specified SQLMesh will use the [`best incremental strategy`](../concepts/models/model_kinds.md#materialization-strategy) for the target engine.
239+
* The `incremental_strategy` key with the value `incremental_by_time_range`
240+
* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
244241

245242
### Incremental logic
246243

247-
SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should supersede the existing `{% if is_incremental() %}` block and contain the `WHERE` clause selecting the time interval.
244+
Unlike dbt incremental strategies, SQLMesh does not require the use of `is_incremental` jinja blocks to implement incremental logic.
245+
Instead, SQLMesh provides predefined time macro variables that can be used in the model's SQL to filter data based on the time column.
248246

249247
For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows:
250248

251249
```bash
252-
> {% if sqlmesh_incremental is defined %}
253250
> WHERE
254251
> ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
255-
> {% elif is_incremental() %}
256-
> ; < your existing is_incremental block >
257-
> {% endif %}
258252
```
259253

260254
`{{ start_ds }}` and `{{ end_ds }}` are the jinja equivalents of SQLMesh's `@start_ds` and `@end_ds` predefined time macro variables. See all [predefined time variables](../concepts/macros/macro_variables.md) available in jinja.
@@ -263,13 +257,11 @@ For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja blo
263257

264258
SQLMesh provides configuration parameters that enable control over how incremental computations occur. These parameters are set in the model's `config` block.
265259

266-
The [`batch_size` parameter](../concepts/models/overview.md#batch_size) determines the maximum number of time intervals to run in a single job.
267-
268-
The [`lookback` parameter](../concepts/models/overview.md#lookback) is used to capture late arriving data. It sets the number of units of late arriving data the model should expect and must be a positive integer.
260+
See [Incremental Model Properties](../concepts/models/overview.md#incremental-model-properties) for the full list of incremental model configuration parameters.
269261

270262
**Note:** By default, all incremental dbt models are configured to be [forward-only](../concepts/plans.md#forward-only-plans). However, you can change this behavior by setting the `forward_only: false` setting either in the configuration of an individual model or globally for all models in the `dbt_project.yaml` file. The [forward-only](../concepts/plans.md#forward-only-plans) mode aligns more closely with the typical operation of dbt and therefore better meets user's expectations.
271263

272-
Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default for incremental dbt models unless the time column is specified, or the `allow_partials` parameter is explicitly set to `false` in the model configuration.
264+
Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default unless the `allow_partials` parameter is explicitly set to `false` in the model configuration.
273265

274266
#### on_schema_change
275267

examples/sushi_dbt/models/customer_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{{
22
config(
33
materialized='incremental',
4-
incremental_strategy='delete+insert',
4+
incremental_strategy='incremental_by_time_range',
55
cluster_by=['ds'],
66
time_column='ds',
77
)

examples/sushi_dbt/models/waiter_as_customer_by_day.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{{
22
config(
33
materialized='incremental',
4-
incremental_strategy='delete+insert',
4+
incremental_strategy='incremental_by_time_range',
55
cluster_by=['ds'],
66
time_column='ds',
77
)

examples/sushi_dbt/models/waiter_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{{
22
config(
33
materialized='incremental',
4-
incremental_strategy='delete+insert',
4+
incremental_strategy='incremental_by_time_range',
55
cluster_by=['ds'],
66
time_column='ds',
77
)

examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{{
22
config(
33
materialized='incremental',
4-
incremental_strategy='delete+insert',
4+
incremental_strategy='incremental_by_time_range',
55
cluster_by=['ds'],
66
time_column='ds',
77
)

sqlmesh/dbt/model.py

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@
2525
ManagedKind,
2626
create_sql_model,
2727
)
28-
from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange, OnAdditiveChange
28+
from sqlmesh.core.model.kind import (
29+
SCDType2ByTimeKind,
30+
OnDestructiveChange,
31+
OnAdditiveChange,
32+
on_destructive_change_validator,
33+
on_additive_change_validator,
34+
TimeColumn,
35+
)
2936
from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy
3037
from sqlmesh.dbt.common import SqlStr, sql_str_validator
3138
from sqlmesh.utils.errors import ConfigError
@@ -41,7 +48,9 @@
4148
logger = logging.getLogger(__name__)
4249

4350

44-
INCREMENTAL_BY_TIME_STRATEGIES = set(["delete+insert", "insert_overwrite", "microbatch"])
51+
INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set(
52+
["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"]
53+
)
4554
INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"])
4655

4756

@@ -77,7 +86,7 @@ class ModelConfig(BaseModelConfig):
7786

7887
# sqlmesh fields
7988
sql: SqlStr = SqlStr("")
80-
time_column: t.Optional[str] = None
89+
time_column: t.Optional[TimeColumn] = None
8190
cron: t.Optional[str] = None
8291
interval_unit: t.Optional[str] = None
8392
batch_concurrency: t.Optional[int] = None
@@ -87,6 +96,9 @@ class ModelConfig(BaseModelConfig):
8796
physical_version: t.Optional[str] = None
8897
auto_restatement_cron: t.Optional[str] = None
8998
auto_restatement_intervals: t.Optional[int] = None
99+
partition_by_time_column: t.Optional[bool] = None
100+
on_destructive_change: t.Optional[OnDestructiveChange] = None
101+
on_additive_change: t.Optional[OnAdditiveChange] = None
90102

91103
# DBT configuration fields
92104
cluster_by: t.Optional[t.List[str]] = None
@@ -139,6 +151,9 @@ class ModelConfig(BaseModelConfig):
139151
incremental_predicates: t.Optional[t.List[str]] = None
140152

141153
_sql_validator = sql_str_validator
154+
_on_destructive_change_validator = on_destructive_change_validator
155+
_on_additive_change_validator = on_additive_change_validator
156+
_time_column_validator = TimeColumn.validator()
142157

143158
@field_validator(
144159
"unique_key",
@@ -230,17 +245,6 @@ def snapshot_strategy(self) -> t.Optional[SnapshotStrategy]:
230245
def table_schema(self) -> str:
231246
return self.target_schema or super().table_schema
232247

233-
def _get_overlapping_field_value(
234-
self, context: DbtContext, dbt_field_name: str, sqlmesh_field_name: str
235-
) -> t.Optional[t.Any]:
236-
dbt_field = self._get_field_value(dbt_field_name)
237-
sqlmesh_field = getattr(self, sqlmesh_field_name, None)
238-
if dbt_field is not None and sqlmesh_field is not None:
239-
get_console().log_warning(
240-
f"Both '{dbt_field_name}' and '{sqlmesh_field_name}' are set for model '{self.canonical_name(context)}'. '{sqlmesh_field_name}' will be used."
241-
)
242-
return sqlmesh_field if sqlmesh_field is not None else dbt_field
243-
244248
def model_kind(self, context: DbtContext) -> ModelKind:
245249
"""
246250
Get the sqlmesh ModelKind
@@ -275,8 +279,12 @@ def model_kind(self, context: DbtContext) -> ModelKind:
275279
"Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
276280
)
277281

278-
incremental_kind_kwargs["on_destructive_change"] = on_destructive_change
279-
incremental_kind_kwargs["on_additive_change"] = on_additive_change
282+
incremental_kind_kwargs["on_destructive_change"] = (
283+
self._get_field_value("on_destructive_change") or on_destructive_change
284+
)
285+
incremental_kind_kwargs["on_additive_change"] = (
286+
self._get_field_value("on_additive_change") or on_additive_change
287+
)
280288
auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
281289
if auto_restatement_cron_value is not None:
282290
incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
@@ -292,7 +300,8 @@ def model_kind(self, context: DbtContext) -> ModelKind:
292300
incremental_kind_kwargs["forward_only"] = forward_only_value
293301

294302
is_incremental_by_time_range = self.time_column or (
295-
self.incremental_strategy and self.incremental_strategy == "microbatch"
303+
self.incremental_strategy
304+
and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
296305
)
297306
# Get shared incremental by kwargs
298307
for field in ("batch_size", "batch_concurrency", "lookback"):
@@ -313,22 +322,29 @@ def model_kind(self, context: DbtContext) -> ModelKind:
313322
)
314323
incremental_by_kind_kwargs["disable_restatement"] = disable_restatement
315324

316-
# Incremental by time range which includes microbatch
317325
if is_incremental_by_time_range:
318326
strategy = self.incremental_strategy or target.default_incremental_strategy(
319327
IncrementalByTimeRangeKind
320328
)
321329

322-
if strategy not in INCREMENTAL_BY_TIME_STRATEGIES:
330+
if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
323331
get_console().log_warning(
324332
f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
325-
f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES)}."
333+
f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
326334
)
327335

328-
if strategy == "microbatch":
329-
time_column = self._get_overlapping_field_value(
330-
context, "event_time", "time_column"
336+
if self.time_column and strategy != "incremental_by_time_range":
337+
get_console().log_warning(
338+
f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
339+
f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
331340
)
341+
342+
if strategy == "microbatch":
343+
if self.time_column:
344+
raise ConfigError(
345+
f"{self.canonical_name(context)}: 'time_column' cannot be used with 'microbatch' incremental strategy. Use 'event_time' instead."
346+
)
347+
time_column = self._get_field_value("event_time")
332348
if not time_column:
333349
raise ConfigError(
334350
f"{self.canonical_name(context)}: 'event_time' is required for microbatch incremental strategy."
@@ -342,11 +358,22 @@ def model_kind(self, context: DbtContext) -> ModelKind:
342358
)
343359
time_column = self.time_column
344360

361+
incremental_by_time_range_kwargs = {
362+
"time_column": time_column,
363+
}
364+
if self.auto_restatement_intervals:
365+
incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
366+
self.auto_restatement_intervals
367+
)
368+
if self.partition_by_time_column is not None:
369+
incremental_by_time_range_kwargs["partition_by_time_column"] = (
370+
self.partition_by_time_column
371+
)
372+
345373
return IncrementalByTimeRangeKind(
346-
time_column=time_column,
347-
auto_restatement_intervals=self.auto_restatement_intervals,
348374
**incremental_kind_kwargs,
349375
**incremental_by_kind_kwargs,
376+
**incremental_by_time_range_kwargs,
350377
)
351378

352379
if self.unique_key:
@@ -384,7 +411,7 @@ def model_kind(self, context: DbtContext) -> ModelKind:
384411
IncrementalUnmanagedKind
385412
)
386413
return IncrementalUnmanagedKind(
387-
insert_overwrite=strategy in INCREMENTAL_BY_TIME_STRATEGIES,
414+
insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
388415
disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
389416
**incremental_kind_kwargs,
390417
)

tests/dbt/test_manifest.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pytest
66

77
from sqlmesh.core.config import ModelDefaultsConfig
8+
from sqlmesh.core.model import TimeColumn
89
from sqlmesh.dbt.basemodel import Dependencies
910
from sqlmesh.dbt.common import ModelAttrs
1011
from sqlmesh.dbt.context import DbtContext
@@ -83,7 +84,7 @@ def test_manifest_helper(caplog):
8384
assert waiter_as_customer_by_day_config.materialized == "incremental"
8485
assert waiter_as_customer_by_day_config.incremental_strategy == "delete+insert"
8586
assert waiter_as_customer_by_day_config.cluster_by == ["ds"]
86-
assert waiter_as_customer_by_day_config.time_column == "ds"
87+
assert waiter_as_customer_by_day_config.time_column == TimeColumn.create("ds", "duckdb")
8788

8889
if DBT_VERSION >= (1, 5, 0):
8990
waiter_revenue_by_day_config = models["waiter_revenue_by_day_v2"]
@@ -105,7 +106,7 @@ def test_manifest_helper(caplog):
105106
assert waiter_revenue_by_day_config.materialized == "incremental"
106107
assert waiter_revenue_by_day_config.incremental_strategy == "delete+insert"
107108
assert waiter_revenue_by_day_config.cluster_by == ["ds"]
108-
assert waiter_revenue_by_day_config.time_column == "ds"
109+
assert waiter_revenue_by_day_config.time_column == TimeColumn.create("ds", "duckdb")
109110
assert waiter_revenue_by_day_config.dialect_ == "bigquery"
110111

111112
assert helper.models("customers")["customers"].dependencies == Dependencies(

0 commit comments

Comments
 (0)