Skip to content

Commit 837d730

Browse files
committed
Feat(sqlmesh_dbt): Implement --full-refresh
1 parent 8dd5e38 commit 837d730

File tree

13 files changed

+372
-29
lines changed

13 files changed

+372
-29
lines changed

sqlmesh/core/context.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,7 @@ def plan_builder(
14291429
explain: t.Optional[bool] = None,
14301430
ignore_cron: t.Optional[bool] = None,
14311431
min_intervals: t.Optional[int] = None,
1432+
always_include_local_changes: t.Optional[bool] = None,
14321433
) -> PlanBuilder:
14331434
"""Creates a plan builder.
14341435
@@ -1467,6 +1468,8 @@ def plan_builder(
14671468
diff_rendered: Whether the diff should compare raw vs rendered models
14681469
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
14691470
on every model when checking for missing intervals
1471+
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
1472+
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
14701473
14711474
Returns:
14721475
The plan builder.
@@ -1583,13 +1586,20 @@ def plan_builder(
15831586
"Selector did not return any models. Please check your model selection and try again."
15841587
)
15851588

1589+
if always_include_local_changes is None:
1590+
# default behaviour - if restatements are detected, we operate entirely out of state and ignore local changes
1591+
force_no_diff = restate_models is not None or (
1592+
backfill_models is not None and not backfill_models
1593+
)
1594+
else:
1595+
force_no_diff = not always_include_local_changes
1596+
15861597
snapshots = self._snapshots(models_override)
15871598
context_diff = self._context_diff(
15881599
environment or c.PROD,
15891600
snapshots=snapshots,
15901601
create_from=create_from,
1591-
force_no_diff=restate_models is not None
1592-
or (backfill_models is not None and not backfill_models),
1602+
force_no_diff=force_no_diff,
15931603
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
15941604
diff_rendered=diff_rendered,
15951605
always_recreate_environment=self.config.plan.always_recreate_environment,
@@ -1644,13 +1654,22 @@ def plan_builder(
16441654
elif forward_only is None:
16451655
forward_only = self.config.plan.forward_only
16461656

1657+
# When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments
1658+
# If we are not, then there is no point, because none of the data in dev environments can be promoted by definition
1659+
clear_restated_intervals_across_model_versions = (
1660+
expanded_restate_models is not None
1661+
and not is_dev
1662+
and self.config.virtual_environment_mode.is_full
1663+
)
1664+
16471665
return self.PLAN_BUILDER_TYPE(
16481666
context_diff=context_diff,
16491667
start=start,
16501668
end=end,
16511669
execution_time=execution_time,
16521670
apply=self.apply,
16531671
restate_models=expanded_restate_models,
1672+
clear_restated_intervals_across_model_versions=clear_restated_intervals_across_model_versions,
16541673
backfill_models=backfill_models,
16551674
no_gaps=no_gaps,
16561675
skip_backfill=skip_backfill,
@@ -1684,6 +1703,7 @@ def plan_builder(
16841703
},
16851704
explain=explain or False,
16861705
ignore_cron=ignore_cron or False,
1706+
always_include_local_changes=always_include_local_changes,
16871707
)
16881708

16891709
def apply(

sqlmesh/core/plan/builder.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ class PlanBuilder:
6565
restate_models: A list of models for which the data should be restated for the time range
6666
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
6767
of the restatement.
68+
clear_restated_intervals_across_model_versions: If restatements are present, this flag indicates whether or not the intervals
69+
being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
70+
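Defaults to False, in which case nothing is cleared. The caller decides when to enable it (Context.plan_builder only sets it for restatements in non-dev plans when full virtual environments are in use).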
71+
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
72+
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
6873
backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
6974
no_gaps: Whether to ensure that new snapshots for nodes that are already a
7075
part of the target environment have no data gaps when compared against previous
@@ -103,6 +108,8 @@ def __init__(
103108
execution_time: t.Optional[TimeLike] = None,
104109
apply: t.Optional[t.Callable[[Plan], None]] = None,
105110
restate_models: t.Optional[t.Iterable[str]] = None,
111+
clear_restated_intervals_across_model_versions: bool = False,
112+
always_include_local_changes: t.Optional[bool] = None,
106113
backfill_models: t.Optional[t.Iterable[str]] = None,
107114
no_gaps: bool = False,
108115
skip_backfill: bool = False,
@@ -154,6 +161,9 @@ def __init__(
154161
self._auto_categorization_enabled = auto_categorization_enabled
155162
self._include_unmodified = include_unmodified
156163
self._restate_models = set(restate_models) if restate_models is not None else None
164+
self._clear_restated_intervals_across_model_versions = (
165+
clear_restated_intervals_across_model_versions
166+
)
157167
self._effective_from = effective_from
158168

159169
# note: this deliberately doesn't default to now() here.
@@ -172,6 +182,7 @@ def __init__(
172182
self._user_provided_flags = user_provided_flags
173183
self._selected_models = selected_models
174184
self._explain = explain
185+
self._always_include_local_changes = always_include_local_changes
175186

176187
self._start = start
177188
if not self._start and (
@@ -340,6 +351,7 @@ def build(self) -> Plan:
340351
deployability_index=deployability_index,
341352
selected_models_to_restate=self._restate_models,
342353
restatements=restatements,
354+
clear_restated_intervals_across_model_versions=self._clear_restated_intervals_across_model_versions,
343355
start_override_per_model=self._start_override_per_model,
344356
end_override_per_model=end_override_per_model,
345357
selected_models_to_backfill=self._backfill_models,
@@ -860,6 +872,12 @@ def _ensure_no_broken_references(self) -> None:
860872
)
861873

862874
def _ensure_no_new_snapshots_with_restatements(self) -> None:
875+
if self._always_include_local_changes:
876+
# the sqlmesh_dbt cli sets "always include local changes" to deliberately allow changes and restatements
877+
# to be deployed in the same plan. If this is set, "force_no_diff" is also turned off on the ContextDiff
878+
# so that the user is shown the local changes that will be applied and must accept them in order to run the plan
879+
return
880+
863881
if self._restate_models is not None and (
864882
self._context_diff.new_snapshots or self._context_diff.modified_snapshots
865883
):

sqlmesh/core/plan/definition.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ class Plan(PydanticModel, frozen=True):
6666
6767
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
6868
while :restatements is still populated with dev previews
69-
"""
69+
"""
70+
clear_restated_intervals_across_model_versions: bool
71+
"""Whether or not to clear intervals from state for other versions of the models listed in :restatements"""
7072

7173
start_override_per_model: t.Optional[t.Dict[str, datetime]]
7274
end_override_per_model: t.Optional[t.Dict[str, datetime]]
@@ -268,6 +270,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
268270
skip_backfill=self.skip_backfill,
269271
empty_backfill=self.empty_backfill,
270272
restatements={s.name: i for s, i in self.restatements.items()},
273+
clear_restated_intervals_across_model_versions=self.clear_restated_intervals_across_model_versions,
271274
is_dev=self.is_dev,
272275
allow_destructive_models=self.allow_destructive_models,
273276
allow_additive_models=self.allow_additive_models,
@@ -312,6 +315,7 @@ class EvaluatablePlan(PydanticModel):
312315
skip_backfill: bool
313316
empty_backfill: bool
314317
restatements: t.Dict[str, Interval]
318+
clear_restated_intervals_across_model_versions: bool
315319
is_dev: bool
316320
allow_destructive_models: t.Set[str]
317321
allow_additive_models: t.Set[str]

sqlmesh/core/plan/stages.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -452,13 +452,12 @@ def _get_after_all_stage(
452452
def _get_restatement_stage(
453453
self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]
454454
) -> t.Optional[RestatementStage]:
455-
if not plan.restatements or plan.is_dev:
456-
# The RestatementStage to clear intervals from state across all environments is not needed for plans against dev, only prod
457-
return None
455+
if plan.restatements and plan.clear_restated_intervals_across_model_versions:
456+
return RestatementStage(
457+
all_snapshots=snapshots_by_name,
458+
)
458459

459-
return RestatementStage(
460-
all_snapshots=snapshots_by_name,
461-
)
460+
return None
462461

463462
def _get_physical_layer_update_stage(
464463
self,

sqlmesh_dbt/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ def dbt(
9090
@click.option(
9191
"-f",
9292
"--full-refresh",
93-
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
93+
is_flag=True,
94+
default=False,
95+
help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.",
9496
)
9597
@click.option(
9698
"--env",

sqlmesh_dbt/operations.py

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from sqlmesh.dbt.project import Project
1212
from sqlmesh_dbt.console import DbtCliConsole
1313
from sqlmesh.core.model import Model
14-
from sqlmesh.core.plan import Plan
14+
from sqlmesh.core.plan import Plan, PlanBuilder
1515

1616
logger = logging.getLogger(__name__)
1717

@@ -42,8 +42,39 @@ def run(
4242
full_refresh: bool = False,
4343
empty: bool = False,
4444
) -> Plan:
45-
return self.context.plan(
46-
**self._plan_options(
45+
plan_builder = self._plan_builder(
46+
environment=environment,
47+
select=select,
48+
exclude=exclude,
49+
full_refresh=full_refresh,
50+
empty=empty,
51+
)
52+
53+
plan = plan_builder.build()
54+
55+
self.console.plan(
56+
plan_builder,
57+
default_catalog=self.context.default_catalog,
58+
# start doing work immediately (since no_diff is set, there isn't really anything for the user to say yes/no to)
59+
auto_apply=True,
60+
# don't output a diff of model changes
61+
no_diff=True,
62+
# don't throw up any prompts like "set the effective date" - use defaults
63+
no_prompts=True,
64+
)
65+
66+
return plan
67+
68+
def _plan_builder(
69+
self,
70+
environment: t.Optional[str] = None,
71+
select: t.Optional[t.List[str]] = None,
72+
exclude: t.Optional[t.List[str]] = None,
73+
full_refresh: bool = False,
74+
empty: bool = False,
75+
) -> PlanBuilder:
76+
return self.context.plan_builder(
77+
**self._plan_builder_options(
4778
environment=environment,
4879
select=select,
4980
exclude=exclude,
@@ -71,13 +102,15 @@ def _selected_models(
71102

72103
return selected_models
73104

74-
def _plan_options(
105+
def _plan_builder_options(
75106
self,
76-
environment: t.Optional[str] = None,
107+
# upstream dbt options
77108
select: t.Optional[t.List[str]] = None,
78109
exclude: t.Optional[t.List[str]] = None,
79110
empty: bool = False,
80111
full_refresh: bool = False,
112+
# sqlmesh extra options
113+
environment: t.Optional[str] = None,
81114
) -> t.Dict[str, t.Any]:
82115
import sqlmesh.core.constants as c
83116

@@ -130,24 +163,38 @@ def _plan_options(
130163
# `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers existing tables regardless of if they are populated.
131164
# This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they happened to be created/updated as part of this plan.
132165
# However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the table and thus it will not be cleared.
133-
# So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace existing tables
166+
# Currently, SQLMesh has no way to say "restate with empty data", because --restate-model coupled with --skip-backfill ends up being a no-op
134167
options["skip_backfill"] = True
135-
full_refresh = True
168+
169+
self.console.log_warning(
170+
"dbt's `--empty` drops the tables for all selected models and replaces them with empty ones.\n"
171+
"This can easily result in accidental data loss, so SQLMesh limits this to only new or modified models and leaves the tables for existing unmodified models alone.\n\n"
172+
"If you were creating empty tables to preview model changes, please consider using `--environment` to preview these changes in an isolated Virtual Data Environment instead.\n\n"
173+
"Otherwise, if you really do want dbt's `--empty` behaviour of clearing every selected table, please file an issue on GitHub so we can better understand the use-case.\n"
174+
)
175+
176+
if full_refresh:
177+
# --full-refresh is implemented in terms of "add every model as a restatement"
178+
# however, `--empty` sets skip_backfill=True, which causes the BackfillStage of the plan to be skipped.
179+
# the re-processing of data intervals happens in the BackfillStage, so if it gets skipped, restatements become a no-op
180+
raise ValueError("`--full-refresh` alongside `--empty` is not currently supported.")
136181

137182
if full_refresh:
138-
# TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan without affecting prod
139-
# if the plan targets dev
140-
pass
183+
options.update(
184+
dict(
185+
# Add every selected model as a restatement to force them to get repopulated from scratch
186+
restate_models=list(self.context.models)
187+
if not select_models
188+
else select_models,
189+
# by default in SQLMesh, restatements only operate on what has been committed to state.
190+
# in order to emulate dbt, we need to use the local filesystem instead, so we override this default
191+
always_include_local_changes=True,
192+
)
193+
)
141194

142195
return dict(
143196
environment=environment,
144197
select_models=select_models,
145-
# dont output a diff of model changes
146-
no_diff=True,
147-
# don't throw up any prompts like "set the effective date" - use defaults
148-
no_prompts=True,
149-
# start doing work immediately (since no_diff is set, there isnt really anything for the user to say yes/no to)
150-
auto_apply=True,
151198
**options,
152199
)
153200

tests/core/test_plan.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture):
826826
indirectly_modified={},
827827
deployability_index=DeployabilityIndex.all_deployable(),
828828
restatements={},
829+
clear_restated_intervals_across_model_versions=False,
829830
end_bounded=False,
830831
ensure_finalized_snapshots=False,
831832
start_override_per_model=None,
@@ -1104,6 +1105,38 @@ def test_new_snapshots_with_restatements(make_snapshot, mocker: MockerFixture):
11041105
PlanBuilder(context_diff, restate_models=["a"]).build()
11051106

11061107

1108+
def test_new_snapshots_with_restatements_allowed_if_flag_set(
1109+
make_snapshot: t.Callable[..., Snapshot],
1110+
):
1111+
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds")))
1112+
1113+
context_diff = ContextDiff(
1114+
environment="test_environment",
1115+
is_new_environment=True,
1116+
is_unfinalized_environment=False,
1117+
normalize_environment_name=True,
1118+
create_from="prod",
1119+
create_from_env_exists=True,
1120+
added=set(),
1121+
removed_snapshots={},
1122+
modified_snapshots={},
1123+
snapshots={snapshot_a.snapshot_id: snapshot_a},
1124+
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
1125+
previous_plan_id=None,
1126+
previously_promoted_snapshot_ids=set(),
1127+
previous_finalized_snapshots=None,
1128+
previous_gateway_managed_virtual_layer=False,
1129+
gateway_managed_virtual_layer=False,
1130+
environment_statements=[],
1131+
)
1132+
1133+
plan = PlanBuilder(
1134+
context_diff, restate_models=[snapshot_a.name], always_include_local_changes=True
1135+
).build()
1136+
assert plan.restatements
1137+
assert plan.new_snapshots
1138+
1139+
11071140
def test_end_validation(make_snapshot, mocker: MockerFixture):
11081141
snapshot_a = make_snapshot(
11091142
SqlModel(

0 commit comments

Comments
 (0)