Commit b477bcd

feat: add dedicated incremental_by_time_range strategy
1 parent cef3859 commit b477bcd

7 files changed: +200 -31 lines changed

docs/integrations/dbt.md

Lines changed: 8 additions & 16 deletions
@@ -219,7 +219,7 @@ This section describes how to adapt dbt's incremental models to run on sqlmesh a
 SQLMesh supports two approaches to implement [idempotent](../concepts/glossary.md#idempotency) incremental loads:

 * Using merge (with the sqlmesh [`INCREMENTAL_BY_UNIQUE_KEY` model kind](../concepts/models/model_kinds.md#incremental_by_unique_key))
-* Using insert-overwrite/delete+insert (with the sqlmesh [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range))
+* Using [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range)

 #### Incremental by unique key

@@ -233,28 +233,22 @@ To enable incremental_by_unique_key incrementality, the model configuration shou

 #### Incremental by time range

-To enable incremental_by_time_range incrementality, the model configuration should contain:
+To enable incremental_by_time_range incrementality, the model configuration must contain:

-* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)
 * The `materialized` key with value `'incremental'`
-* Either:
-    * The `incremental_strategy` key with value `'insert_overwrite'` or
-    * The `incremental_strategy` key with value `'delete+insert'`
-    * Note: in this context, these two strategies are synonyms. Regardless of which one is specified SQLMesh will use the [`best incremental strategy`](../concepts/models/model_kinds.md#materialization-strategy) for the target engine.
+* The `incremental_strategy` key with the value `incremental_by_time_range`
+* The `time_column` key with the model's time column field name as the value (see [`time column`](../concepts/models/model_kinds.md#time-column) for details)

 ### Incremental logic

-SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should supersede the existing `{% if is_incremental() %}` block and contain the `WHERE` clause selecting the time interval.
+Unlike dbt incremental strategies, SQLMesh does not require the use of `is_incremental` jinja blocks to implement incremental logic.
+Instead, SQLMesh provides predefined time macro variables that can be used in the model's SQL to filter data based on the time column.

 For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows:

 ```bash
-> {% if sqlmesh_incremental is defined %}
 > WHERE
 > ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
-> {% elif is_incremental() %}
-> ; < your existing is_incremental block >
-> {% endif %}
 ```

 `{{ start_ds }}` and `{{ end_ds }}` are the jinja equivalents of SQLMesh's `@start_ds` and `@end_ds` predefined time macro variables. See all [predefined time variables](../concepts/macros/macro_variables.md) available in jinja.
@@ -263,13 +257,11 @@ For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja blo

 SQLMesh provides configuration parameters that enable control over how incremental computations occur. These parameters are set in the model's `config` block.

-The [`batch_size` parameter](../concepts/models/overview.md#batch_size) determines the maximum number of time intervals to run in a single job.
-
-The [`lookback` parameter](../concepts/models/overview.md#lookback) is used to capture late arriving data. It sets the number of units of late arriving data the model should expect and must be a positive integer.
+See [Incremental Model Properties](../concepts/models/overview.md#incremental-model-properties) for the full list of incremental model configuration parameters.

 **Note:** By default, all incremental dbt models are configured to be [forward-only](../concepts/plans.md#forward-only-plans). However, you can change this behavior by setting the `forward_only: false` setting either in the configuration of an individual model or globally for all models in the `dbt_project.yaml` file. The [forward-only](../concepts/plans.md#forward-only-plans) mode aligns more closely with the typical operation of dbt and therefore better meets user's expectations.

-Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default for incremental dbt models unless the time column is specified, or the `allow_partials` parameter is explicitly set to `false` in the model configuration.
+Similarly, the [allow_partials](../concepts/models/overview.md#allow_partials) parameter is set to `true` by default unless the `allow_partials` parameter is explicitly set to `false` in the model configuration.

 #### on_schema_change

examples/sushi_dbt/models/customer_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_as_customer_by_day.sql

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_revenue_by_day.sql

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

examples/sushi_dbt/models/waiter_revenue_by_day_v1.sql

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 {{
   config(
     materialized='incremental',
-    incremental_strategy='delete+insert',
+    incremental_strategy='incremental_by_time_range',
     cluster_by=['ds'],
     time_column='ds',
   )

sqlmesh/dbt/model.py

Lines changed: 45 additions & 11 deletions
@@ -25,7 +25,13 @@
     ManagedKind,
     create_sql_model,
 )
-from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange, OnAdditiveChange
+from sqlmesh.core.model.kind import (
+    SCDType2ByTimeKind,
+    OnDestructiveChange,
+    OnAdditiveChange,
+    on_destructive_change_validator,
+    on_additive_change_validator,
+)
 from sqlmesh.dbt.basemodel import BaseModelConfig, Materialization, SnapshotStrategy
 from sqlmesh.dbt.common import SqlStr, sql_str_validator
 from sqlmesh.utils.errors import ConfigError
@@ -41,7 +47,9 @@
 logger = logging.getLogger(__name__)


-INCREMENTAL_BY_TIME_STRATEGIES = set(["delete+insert", "insert_overwrite", "microbatch"])
+INCREMENTAL_BY_TIME_RANGE_STRATEGIES = set(
+    ["delete+insert", "insert_overwrite", "microbatch", "incremental_by_time_range"]
+)
 INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES = set(["merge"])


@@ -87,6 +95,9 @@ class ModelConfig(BaseModelConfig):
     physical_version: t.Optional[str] = None
     auto_restatement_cron: t.Optional[str] = None
     auto_restatement_intervals: t.Optional[int] = None
+    partition_by_time_column: t.Optional[bool] = None
+    on_destructive_change: t.Optional[OnDestructiveChange] = None
+    on_additive_change: t.Optional[OnAdditiveChange] = None

     # DBT configuration fields
     cluster_by: t.Optional[t.List[str]] = None
@@ -139,6 +150,8 @@ class ModelConfig(BaseModelConfig):
     incremental_predicates: t.Optional[t.List[str]] = None

     _sql_validator = sql_str_validator
+    _on_destructive_change_validator = on_destructive_change_validator
+    _on_additive_change_validator = on_additive_change_validator

     @field_validator(
         "unique_key",
@@ -275,8 +288,12 @@ def model_kind(self, context: DbtContext) -> ModelKind:
                     "Valid values are 'ignore', 'fail', 'append_new_columns', 'sync_all_columns'."
                 )

-            incremental_kind_kwargs["on_destructive_change"] = on_destructive_change
-            incremental_kind_kwargs["on_additive_change"] = on_additive_change
+            incremental_kind_kwargs["on_destructive_change"] = (
+                self._get_field_value("on_destructive_change") or on_destructive_change
+            )
+            incremental_kind_kwargs["on_additive_change"] = (
+                self._get_field_value("on_additive_change") or on_additive_change
+            )
             auto_restatement_cron_value = self._get_field_value("auto_restatement_cron")
             if auto_restatement_cron_value is not None:
                 incremental_kind_kwargs["auto_restatement_cron"] = auto_restatement_cron_value
@@ -292,7 +309,8 @@ def model_kind(self, context: DbtContext) -> ModelKind:
                 incremental_kind_kwargs["forward_only"] = forward_only_value

             is_incremental_by_time_range = self.time_column or (
-                self.incremental_strategy and self.incremental_strategy == "microbatch"
+                self.incremental_strategy
+                and self.incremental_strategy in {"microbatch", "incremental_by_time_range"}
             )
             # Get shared incremental by kwargs
             for field in ("batch_size", "batch_concurrency", "lookback"):
@@ -313,16 +331,21 @@ def model_kind(self, context: DbtContext) -> ModelKind:
                 )
             incremental_by_kind_kwargs["disable_restatement"] = disable_restatement

-            # Incremental by time range which includes microbatch
             if is_incremental_by_time_range:
                 strategy = self.incremental_strategy or target.default_incremental_strategy(
                     IncrementalByTimeRangeKind
                 )

-                if strategy not in INCREMENTAL_BY_TIME_STRATEGIES:
+                if strategy not in INCREMENTAL_BY_TIME_RANGE_STRATEGIES:
                     get_console().log_warning(
                         f"SQLMesh incremental by time strategy is not compatible with '{strategy}' incremental strategy in model '{self.canonical_name(context)}'. "
-                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES)}."
+                        f"Supported strategies include {collection_to_str(INCREMENTAL_BY_TIME_RANGE_STRATEGIES)}."
+                    )
+
+                if self.time_column and strategy not in {"incremental_by_time_range", "microbatch"}:
+                    get_console().log_warning(
+                        f"Using `time_column` on a model with incremental_strategy '{strategy}' has been deprecated. "
+                        f"Please use `incremental_by_time_range` instead in model '{self.canonical_name(context)}'."
                     )

                 if strategy == "microbatch":
@@ -342,11 +365,22 @@ def model_kind(self, context: DbtContext) -> ModelKind:
                         )
                     time_column = self.time_column

+                incremental_by_time_range_kwargs = {
+                    "time_column": time_column,
+                }
+                if self.auto_restatement_intervals:
+                    incremental_by_time_range_kwargs["auto_restatement_intervals"] = (
+                        self.auto_restatement_intervals
+                    )
+                if self.partition_by_time_column is not None:
+                    incremental_by_time_range_kwargs["partition_by_time_column"] = (
+                        self.partition_by_time_column
+                    )
+
                 return IncrementalByTimeRangeKind(
-                    time_column=time_column,
-                    auto_restatement_intervals=self.auto_restatement_intervals,
                     **incremental_kind_kwargs,
                     **incremental_by_kind_kwargs,
+                    **incremental_by_time_range_kwargs,
                 )

             if self.unique_key:
@@ -384,7 +418,7 @@ def model_kind(self, context: DbtContext) -> ModelKind:
                     IncrementalUnmanagedKind
                 )
             return IncrementalUnmanagedKind(
-                insert_overwrite=strategy in INCREMENTAL_BY_TIME_STRATEGIES,
+                insert_overwrite=strategy in INCREMENTAL_BY_TIME_RANGE_STRATEGIES,
                 disable_restatement=incremental_by_kind_kwargs["disable_restatement"],
                 **incremental_kind_kwargs,
             )
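
Reviewer note: the change above only passes `auto_restatement_intervals` and `partition_by_time_column` to the kind when they are set, so that the kind's own defaults apply otherwise. The following is a minimal sketch of that pattern, not the SQLMesh implementation. `TimeRangeKind` and `build_time_range_kwargs` are hypothetical stand-ins, and the `True` default for `partition_by_time_column` is an assumption taken from the assertions in the tests added by this commit.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class TimeRangeKind:
    """Hypothetical stand-in for IncrementalByTimeRangeKind (not the real class)."""

    time_column: str
    auto_restatement_intervals: Optional[int] = None
    # Assumed default, inferred from the tests in this commit.
    partition_by_time_column: bool = True


def build_time_range_kwargs(
    time_column: str,
    auto_restatement_intervals: Optional[int] = None,
    partition_by_time_column: Optional[bool] = None,
) -> Dict[str, Any]:
    """Collect only the options the user actually set."""
    kwargs: Dict[str, Any] = {"time_column": time_column}
    # Truthiness check, as in the diff: None (and 0) are not forwarded.
    if auto_restatement_intervals:
        kwargs["auto_restatement_intervals"] = auto_restatement_intervals
    # Explicit None check, because False is a meaningful user choice here.
    if partition_by_time_column is not None:
        kwargs["partition_by_time_column"] = partition_by_time_column
    return kwargs


default_kind = TimeRangeKind(**build_time_range_kwargs("ds"))
explicit_kind = TimeRangeKind(**build_time_range_kwargs("ds", 3, False))
```

Forwarding `None` directly would override the kind's default, which is why the optional keys are added conditionally rather than passed as keyword arguments unconditionally.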

tests/dbt/test_model.py

Lines changed: 143 additions & 0 deletions
@@ -7,6 +7,7 @@
 from sqlglot import exp
 from sqlmesh import Context
 from sqlmesh.core.model import TimeColumn, IncrementalByTimeRangeKind
+from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange
 from sqlmesh.dbt.common import Dependencies
 from sqlmesh.dbt.context import DbtContext
 from sqlmesh.dbt.model import ModelConfig
@@ -301,3 +302,145 @@ def test_load_microbatch_required_only(
     )
     assert model.kind.batch_size == 1
     assert model.depends_on_self is False
+
+
+@pytest.mark.slow
+def test_load_incremental_time_range_strategy_required_only(
+    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
+) -> None:
+    project_dir, model_dir = create_empty_project()
+    # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it
+    incremental_time_range_contents = """
+    {{
+        config(
+            materialized='incremental',
+            incremental_strategy='incremental_by_time_range',
+            time_column='ds',
+        )
+    }}
+
+    SELECT 1 as cola, '2021-01-01' as ds
+    """
+    incremental_time_range_model_file = model_dir / "incremental_time_range.sql"
+    with open(incremental_time_range_model_file, "w", encoding="utf-8") as f:
+        f.write(incremental_time_range_contents)
+
+    snapshot_fqn = '"local"."main"."incremental_time_range"'
+    context = Context(paths=project_dir)
+    model = context.snapshots[snapshot_fqn].model
+    # Validate model-level attributes
+    assert model.start == "2025-01-01"
+    assert model.interval_unit.is_day
+    # Validate model kind attributes
+    assert isinstance(model.kind, IncrementalByTimeRangeKind)
+    assert model.kind.lookback == 1
+    assert model.kind.time_column == TimeColumn(
+        column=exp.to_column("ds", quoted=True), format="%Y-%m-%d"
+    )
+    assert model.kind.batch_size is None
+    assert model.depends_on_self is False
+    assert model.kind.auto_restatement_intervals is None
+    assert model.kind.partition_by_time_column is True
+
+
+@pytest.mark.slow
+def test_load_incremental_time_range_strategy_all_defined(
+    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
+) -> None:
+    project_dir, model_dir = create_empty_project()
+    # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it
+    incremental_time_range_contents = """
+    {{
+        config(
+            materialized='incremental',
+            incremental_strategy='incremental_by_time_range',
+            time_column='ds',
+            auto_restatement_intervals=3,
+            partition_by_time_column=false,
+            lookback=5,
+            batch_size=3,
+            batch_concurrency=2,
+            forward_only=true,
+            disable_restatement=true,
+            on_destructive_change='allow',
+            on_additive_change='error',
+            auto_restatement_cron='@hourly',
+            on_schema_change='ignore'
+        )
+    }}
+
+    SELECT 1 as cola, '2021-01-01' as ds
+    """
+    incremental_time_range_model_file = model_dir / "incremental_time_range.sql"
+    with open(incremental_time_range_model_file, "w", encoding="utf-8") as f:
+        f.write(incremental_time_range_contents)
+
+    snapshot_fqn = '"local"."main"."incremental_time_range"'
+    context = Context(paths=project_dir)
+    model = context.snapshots[snapshot_fqn].model
+    # Validate model-level attributes
+    assert model.start == "2025-01-01"
+    assert model.interval_unit.is_day
+    # Validate model kind attributes
+    assert isinstance(model.kind, IncrementalByTimeRangeKind)
+    # `on_schema_change` is ignored since the user explicitly overrode the values
+    assert model.kind.on_destructive_change == OnDestructiveChange.ALLOW
+    assert model.kind.on_additive_change == OnAdditiveChange.ERROR
+    assert model.kind.forward_only is True
+    assert model.kind.disable_restatement is True
+    assert model.kind.auto_restatement_cron == "@hourly"
+    assert model.kind.auto_restatement_intervals == 3
+    assert model.kind.partition_by_time_column is False
+    assert model.kind.lookback == 5
+    assert model.kind.time_column == TimeColumn(
+        column=exp.to_column("ds", quoted=True), format="%Y-%m-%d"
+    )
+    assert model.kind.batch_size == 3
+    assert model.kind.batch_concurrency == 2
+    assert model.depends_on_self is False
+
+
+@pytest.mark.slow
+def test_load_deprecated_incremental_time_column(
+    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
+) -> None:
+    project_dir, model_dir = create_empty_project()
+    # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it
+    incremental_time_range_contents = """
+    {{
+        config(
+            materialized='incremental',
+            incremental_strategy='delete+insert',
+            time_column='ds'
+        )
+    }}
+
+    SELECT 1 as cola, '2021-01-01' as ds
+    """
+    incremental_time_range_model_file = model_dir / "incremental_time_range.sql"
+    with open(incremental_time_range_model_file, "w", encoding="utf-8") as f:
+        f.write(incremental_time_range_contents)
+
+    snapshot_fqn = '"local"."main"."incremental_time_range"'
+    context = Context(paths=project_dir)
+    model = context.snapshots[snapshot_fqn].model
+    # Validate model-level attributes
+    assert model.start == "2025-01-01"
+    assert model.interval_unit.is_day
+    # Validate model-level attributes
+    assert model.start == "2025-01-01"
+    assert model.interval_unit.is_day
+    # Validate model kind attributes
+    assert isinstance(model.kind, IncrementalByTimeRangeKind)
+    assert model.kind.lookback == 1
+    assert model.kind.time_column == TimeColumn(
+        column=exp.to_column("ds", quoted=True), format="%Y-%m-%d"
+    )
+    assert model.kind.batch_size is None
+    assert model.depends_on_self is False
+    assert model.kind.auto_restatement_intervals is None
+    assert model.kind.partition_by_time_column is True
+    assert (
+        "Using `time_column` on a model with incremental_strategy 'delete+insert' has been deprecated. Please use `incremental_by_time_range` instead in model 'main.incremental_time_range'."
+        in caplog.text
+    )
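
Reviewer note: the three tests above exercise the strategy resolution added in `sqlmesh/dbt/model.py`. As a rough, standalone illustration of that decision logic (a sketch under stated assumptions, not the SQLMesh code), the function below returns whether a model would be treated as incremental by time range and which warnings would be logged. The name `classify` and the shortened warning texts are hypothetical. The `default_strategy` argument is a placeholder for the target-specific default that the real code looks up via `target.default_incremental_strategy(...)`.

```python
from typing import List, Optional, Tuple

# Strategy names copied from the diff of sqlmesh/dbt/model.py.
TIME_RANGE_STRATEGIES = {
    "delete+insert",
    "insert_overwrite",
    "microbatch",
    "incremental_by_time_range",
}
# Strategies that select the time-range kind on their own.
DEDICATED_STRATEGIES = {"microbatch", "incremental_by_time_range"}


def classify(
    time_column: Optional[str],
    incremental_strategy: Optional[str],
    default_strategy: str = "delete+insert",  # hypothetical placeholder default
) -> Tuple[bool, List[str]]:
    """Return (is_time_range, warnings) for a dbt incremental model config."""
    warnings: List[str] = []
    is_time_range = bool(time_column) or incremental_strategy in DEDICATED_STRATEGIES
    if not is_time_range:
        return False, warnings

    strategy = incremental_strategy or default_strategy
    if strategy not in TIME_RANGE_STRATEGIES:
        warnings.append(f"incompatible strategy '{strategy}'")
    # Legacy path: time_column combined with a non-dedicated strategy.
    if time_column and strategy not in DEDICATED_STRATEGIES:
        warnings.append(f"deprecated: time_column with strategy '{strategy}'")
    return True, warnings


new_style = classify("ds", "incremental_by_time_range")
legacy_style = classify("ds", "delete+insert")
```

In this sketch, `new_style` carries no warnings, while `legacy_style` is still resolved as a time-range model but carries the deprecation warning, mirroring `test_load_deprecated_incremental_time_column`.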
