Skip to content

Commit d28cbb2

Browse files
committed
Fix: Check if the target table exists when determining the value of the is_incremental flag
1 parent 0b819d2 commit d28cbb2

File tree

10 files changed

+89
-25
lines changed

10 files changed

+89
-25
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,8 @@ def _evaluate_snapshot(
773773
allow_destructive_snapshots=allow_destructive_snapshots,
774774
allow_additive_snapshots=allow_additive_snapshots,
775775
)
776-
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
776+
runtime_stage = RuntimeStage.EVALUATING
777+
target_table_exists = True
777778
elif model.annotated or model.is_seed or model.kind.is_scd_type_2:
778779
self._execute_create(
779780
snapshot=snapshot,
@@ -785,7 +786,14 @@ def _evaluate_snapshot(
785786
dry_run=False,
786787
run_pre_post_statements=False,
787788
)
788-
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
789+
runtime_stage = RuntimeStage.EVALUATING
790+
target_table_exists = True
791+
792+
evaluate_render_kwargs = {
793+
**common_render_kwargs,
794+
"runtime_stage": runtime_stage,
795+
"snapshot_table_exists": target_table_exists,
796+
}
789797

790798
wap_id: t.Optional[str] = None
791799
if snapshot.is_materialized and (
@@ -801,7 +809,7 @@ def _evaluate_snapshot(
801809
execution_time=execution_time,
802810
snapshot=snapshot,
803811
snapshots=snapshots,
804-
render_kwargs=common_render_kwargs,
812+
render_kwargs=evaluate_render_kwargs,
805813
create_render_kwargs=create_render_kwargs,
806814
rendered_physical_properties=rendered_physical_properties,
807815
deployability_index=deployability_index,

sqlmesh/dbt/builtin.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,12 @@ def create_builtin_globals(
407407
else snapshot.dev_intervals
408408
)
409409
is_incremental = bool(intervals)
410+
411+
snapshot_table_exists = jinja_globals.get("snapshot_table_exists")
412+
if is_incremental and snapshot_table_exists is not None:
413+
# If we know the information about table existence, we can use it to correctly
414+
# set the flag
415+
is_incremental &= snapshot_table_exists
410416
else:
411417
is_incremental = False
412418
builtin_globals["is_incremental"] = lambda: is_incremental

sqlmesh/dbt/seed.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def to_sqlmesh(
8686
dialect=self.dialect(context),
8787
audit_definitions=audit_definitions,
8888
virtual_environment_mode=virtual_environment_mode,
89+
start=self.start or context.sqlmesh_config.model_defaults.start,
8990
**kwargs,
9091
)
9192

tests/core/test_integration.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,6 +2037,35 @@ def test_dbt_select_star_is_directly_modified(sushi_test_dbt_context: Context):
20372037
assert plan.snapshots[snapshot_b_id].change_category == SnapshotChangeCategory.NON_BREAKING
20382038

20392039

2040+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2041+
def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context):
2042+
context = sushi_test_dbt_context
2043+
2044+
model = context.get_model("sushi.waiter_revenue_by_day_v2")
2045+
model = model.copy(update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"})
2046+
context.upsert_model(model)
2047+
2048+
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
2049+
2050+
snapshot = context.get_snapshot("sushi.waiter_revenue_by_day_v2")
2051+
assert snapshot
2052+
2053+
# Manually drop the table
2054+
context.engine_adapter.drop_table(snapshot.table_name())
2055+
2056+
context.snapshot_evaluator.evaluate(
2057+
snapshot,
2058+
start="2023-01-01",
2059+
end="2023-01-08",
2060+
execution_time="2023-01-08 15:00:00",
2061+
snapshots={s.name: s for s in context.snapshots.values()},
2062+
deployability_index=DeployabilityIndex.all_deployable(),
2063+
)
2064+
2065+
# Make sure the table was recreated
2066+
assert context.engine_adapter.table_exists(snapshot.table_name())
2067+
2068+
20402069
def test_model_attr(sushi_test_dbt_context: Context, assert_exp_eq):
20412070
context = sushi_test_dbt_context
20422071
model = context.get_model("sushi.top_waiters")

tests/dbt/test_config.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ def test_variables(assert_exp_eq, sushi_test_project):
354354

355355
# Finally, check that variable scoping & overwriting (some_var) works as expected
356356
expected_sushi_variables = {
357-
"start": "Jan 1 2022",
358357
"yet_another_var": 1,
359358
"top_waiters:limit": 10,
360359
"top_waiters:revenue": "revenue",
@@ -379,7 +378,6 @@ def test_variables(assert_exp_eq, sushi_test_project):
379378
"yet_another_var": 5,
380379
"customers:bla": False,
381380
"customers:customer_id": "customer_id",
382-
"start": "Jan 1 2022",
383381
}
384382

385383
assert sushi_test_project.packages["sushi"].variables == expected_sushi_variables
@@ -1006,8 +1004,11 @@ def test_db_type_to_quote_policy():
10061004
def test_variable_override():
10071005
project_root = "tests/fixtures/dbt/sushi_test"
10081006
project = Project.load(
1009-
DbtContext(project_root=Path(project_root)),
1010-
variables={"yet_another_var": 2, "start": "2021-01-01"},
1007+
DbtContext(
1008+
project_root=Path(project_root),
1009+
sqlmesh_config=Config(model_defaults=ModelDefaultsConfig(start="2021-01-01")),
1010+
),
1011+
variables={"yet_another_var": 2},
10111012
)
10121013
assert project.packages["sushi"].variables["yet_another_var"] == 2
10131014

tests/dbt/test_manifest.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66

7+
from sqlmesh.core.config import ModelDefaultsConfig
78
from sqlmesh.dbt.basemodel import Dependencies
89
from sqlmesh.dbt.context import DbtContext
910
from sqlmesh.dbt.manifest import ManifestHelper
@@ -24,7 +25,7 @@ def test_manifest_helper(caplog):
2425
project_path,
2526
"sushi",
2627
profile.target,
27-
variable_overrides={"start": "2020-01-01"},
28+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
2829
)
2930

3031
models = helper.models()
@@ -135,7 +136,7 @@ def test_tests_referencing_disabled_models():
135136
project_path,
136137
"sushi",
137138
profile.target,
138-
variable_overrides={"start": "2020-01-01"},
139+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
139140
)
140141

141142
assert "disabled_model" not in helper.models()
@@ -151,7 +152,7 @@ def test_call_cache():
151152
project_path,
152153
"sushi",
153154
profile.target,
154-
variable_overrides={"start": "2020-01-01"},
155+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
155156
)
156157

157158
unused = "0000"
@@ -172,7 +173,7 @@ def test_variable_override():
172173
project_path,
173174
"sushi",
174175
profile.target,
175-
variable_overrides={"start": "2020-01-01"},
176+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
176177
)
177178
assert helper.models()["top_waiters"].limit_value == 10
178179

@@ -181,7 +182,8 @@ def test_variable_override():
181182
project_path,
182183
"sushi",
183184
profile.target,
184-
variable_overrides={"top_waiters:limit": 1, "start": "2020-01-01"},
185+
variable_overrides={"top_waiters:limit": 1},
186+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
185187
)
186188
assert helper.models()["top_waiters"].limit_value == 1
187189

@@ -196,7 +198,7 @@ def test_source_meta_external_location():
196198
project_path,
197199
"sushi",
198200
profile.target,
199-
variable_overrides={"start": "2020-01-01"},
201+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
200202
)
201203

202204
sources = helper.sources()
@@ -229,7 +231,7 @@ def test_top_level_dbt_adapter_macros():
229231
project_path,
230232
"sushi",
231233
profile.target,
232-
variable_overrides={"start": "2020-01-01"},
234+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
233235
)
234236

235237
# Adapter macros must be marked as top-level

tests/dbt/test_transformation.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,29 @@ def test_is_incremental(sushi_test_project: Project, assert_exp_eq, mocker):
13461346
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
13471347
)
13481348

1349+
# If the snapshot_table_exists flag was set to False, intervals should be ignored
1350+
assert_exp_eq(
1351+
model_config.to_sqlmesh(context)
1352+
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=False)
1353+
.sql(),
1354+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
1355+
)
1356+
1357+
# If the snapshot_table_exists flag was set to True, intervals should be taken into account
1358+
assert_exp_eq(
1359+
model_config.to_sqlmesh(context)
1360+
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True)
1361+
.sql(),
1362+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
1363+
)
1364+
snapshot.intervals = []
1365+
assert_exp_eq(
1366+
model_config.to_sqlmesh(context)
1367+
.render_query_or_raise(snaspshot=snapshot, snapshot_table_exists=True)
1368+
.sql(),
1369+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
1370+
)
1371+
13491372

13501373
@pytest.mark.xdist_group("dbt_manifest")
13511374
def test_is_incremental_non_incremental_model(sushi_test_project: Project, assert_exp_eq, mocker):

tests/fixtures/dbt/sushi_test/config.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@
33
from sqlmesh.core.config import ModelDefaultsConfig
44
from sqlmesh.dbt.loader import sqlmesh_config
55

6-
variables = {"start": "Jan 1 2022"}
7-
86

97
config = sqlmesh_config(
10-
Path(__file__).parent, variables=variables, model_defaults=ModelDefaultsConfig(dialect="duckdb")
8+
Path(__file__).parent, model_defaults=ModelDefaultsConfig(dialect="duckdb", start="Jan 1 2022")
119
)
1210

1311

1412
test_config = config
1513

1614
test_config_with_normalization_strategy = sqlmesh_config(
1715
Path(__file__).parent,
18-
variables=variables,
19-
model_defaults=ModelDefaultsConfig(dialect="duckdb,normalization_strategy=LOWERCASE"),
16+
model_defaults=ModelDefaultsConfig(
17+
dialect="duckdb,normalization_strategy=LOWERCASE", start="Jan 1 2022"
18+
),
2019
)

tests/fixtures/dbt/sushi_test/dbt_project.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ clean-targets: # directories to be removed by `dbt clean`
2020
# Full documentation: https://docs.getdbt.com/docs/configuring-models
2121

2222
models:
23-
+start: "{{ var('start') }}"
2423
sushi:
2524
+materialized: table
2625
+pre-hook:

tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,7 @@ LEFT JOIN {{ source('streaming', 'items') }} AS i
3030
ON oi.item_id = i.id AND oi.ds = i.ds
3131
{% if is_incremental() %}
3232
WHERE
33-
o.ds > (select max(ds) from {{ this }})
34-
{% endif %}
35-
{% if sqlmesh_incremental is defined %}
36-
WHERE
37-
o.ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
33+
o.ds > (select CAST(max(ds) AS DATE) from {{ this }})
3834
{% endif %}
3935
GROUP BY
4036
o.waiter_id,

0 commit comments

Comments
 (0)