Skip to content

Commit d26c779

Browse files
authored
Feat(sqlmesh_dbt): Implement --full-refresh (#5370)
1 parent dbc7de6 commit d26c779

File tree

13 files changed

+333
-68
lines changed

13 files changed

+333
-68
lines changed

sqlmesh/core/context.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,7 @@ def plan_builder(
14291429
explain: t.Optional[bool] = None,
14301430
ignore_cron: t.Optional[bool] = None,
14311431
min_intervals: t.Optional[int] = None,
1432+
always_include_local_changes: t.Optional[bool] = None,
14321433
) -> PlanBuilder:
14331434
"""Creates a plan builder.
14341435
@@ -1467,6 +1468,8 @@ def plan_builder(
14671468
diff_rendered: Whether the diff should compare raw vs rendered models
14681469
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
14691470
on every model when checking for missing intervals
1471+
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
1472+
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
14701473
14711474
Returns:
14721475
The plan builder.
@@ -1583,13 +1586,20 @@ def plan_builder(
15831586
"Selector did not return any models. Please check your model selection and try again."
15841587
)
15851588

1589+
if always_include_local_changes is None:
1590+
# default behaviour - if restatements are detected, we operate entirely out of state and ignore local changes
1591+
force_no_diff = restate_models is not None or (
1592+
backfill_models is not None and not backfill_models
1593+
)
1594+
else:
1595+
force_no_diff = not always_include_local_changes
1596+
15861597
snapshots = self._snapshots(models_override)
15871598
context_diff = self._context_diff(
15881599
environment or c.PROD,
15891600
snapshots=snapshots,
15901601
create_from=create_from,
1591-
force_no_diff=restate_models is not None
1592-
or (backfill_models is not None and not backfill_models),
1602+
force_no_diff=force_no_diff,
15931603
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
15941604
diff_rendered=diff_rendered,
15951605
always_recreate_environment=self.config.plan.always_recreate_environment,
@@ -1644,13 +1654,22 @@ def plan_builder(
16441654
elif forward_only is None:
16451655
forward_only = self.config.plan.forward_only
16461656

1657+
# When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments
1658+
# If we are not, then there is no point, because none of the data in dev environments can be promoted by definition
1659+
restate_all_snapshots = (
1660+
expanded_restate_models is not None
1661+
and not is_dev
1662+
and self.config.virtual_environment_mode.is_full
1663+
)
1664+
16471665
return self.PLAN_BUILDER_TYPE(
16481666
context_diff=context_diff,
16491667
start=start,
16501668
end=end,
16511669
execution_time=execution_time,
16521670
apply=self.apply,
16531671
restate_models=expanded_restate_models,
1672+
restate_all_snapshots=restate_all_snapshots,
16541673
backfill_models=backfill_models,
16551674
no_gaps=no_gaps,
16561675
skip_backfill=skip_backfill,

sqlmesh/core/plan/builder.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ class PlanBuilder:
6565
restate_models: A list of models for which the data should be restated for the time range
6666
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
6767
of the restatement.
68+
restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals
69+
being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
70+
Defaults to False, meaning intervals for other versions of the model are not cleared.
6871
backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
6972
no_gaps: Whether to ensure that new snapshots for nodes that are already a
7073
part of the target environment have no data gaps when compared against previous
@@ -103,6 +106,7 @@ def __init__(
103106
execution_time: t.Optional[TimeLike] = None,
104107
apply: t.Optional[t.Callable[[Plan], None]] = None,
105108
restate_models: t.Optional[t.Iterable[str]] = None,
109+
restate_all_snapshots: bool = False,
106110
backfill_models: t.Optional[t.Iterable[str]] = None,
107111
no_gaps: bool = False,
108112
skip_backfill: bool = False,
@@ -154,6 +158,7 @@ def __init__(
154158
self._auto_categorization_enabled = auto_categorization_enabled
155159
self._include_unmodified = include_unmodified
156160
self._restate_models = set(restate_models) if restate_models is not None else None
161+
self._restate_all_snapshots = restate_all_snapshots
157162
self._effective_from = effective_from
158163

159164
# note: this deliberately doesn't default to now() here.
@@ -277,7 +282,6 @@ def build(self) -> Plan:
277282
if self._latest_plan:
278283
return self._latest_plan
279284

280-
self._ensure_no_new_snapshots_with_restatements()
281285
self._ensure_new_env_with_changes()
282286
self._ensure_valid_date_range()
283287
self._ensure_no_broken_references()
@@ -340,6 +344,7 @@ def build(self) -> Plan:
340344
deployability_index=deployability_index,
341345
selected_models_to_restate=self._restate_models,
342346
restatements=restatements,
347+
restate_all_snapshots=self._restate_all_snapshots,
343348
start_override_per_model=self._start_override_per_model,
344349
end_override_per_model=end_override_per_model,
345350
selected_models_to_backfill=self._backfill_models,
@@ -859,15 +864,6 @@ def _ensure_no_broken_references(self) -> None:
859864
f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding."""
860865
)
861866

862-
def _ensure_no_new_snapshots_with_restatements(self) -> None:
863-
if self._restate_models is not None and (
864-
self._context_diff.new_snapshots or self._context_diff.modified_snapshots
865-
):
866-
raise PlanError(
867-
"Model changes and restatements can't be a part of the same plan. "
868-
"Revert or apply changes before proceeding with restatements."
869-
)
870-
871867
def _ensure_new_env_with_changes(self) -> None:
872868
if (
873869
self._is_dev

sqlmesh/core/plan/definition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class Plan(PydanticModel, frozen=True):
6767
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
6868
while :restatements is still populated with dev previews
6969
"""
70+
restate_all_snapshots: bool
71+
"""Whether or not to clear intervals from state for other versions of the models listed in :restatements"""
7072

7173
start_override_per_model: t.Optional[t.Dict[str, datetime]]
7274
end_override_per_model: t.Optional[t.Dict[str, datetime]]
@@ -268,6 +270,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
268270
skip_backfill=self.skip_backfill,
269271
empty_backfill=self.empty_backfill,
270272
restatements={s.name: i for s, i in self.restatements.items()},
273+
restate_all_snapshots=self.restate_all_snapshots,
271274
is_dev=self.is_dev,
272275
allow_destructive_models=self.allow_destructive_models,
273276
allow_additive_models=self.allow_additive_models,
@@ -312,6 +315,7 @@ class EvaluatablePlan(PydanticModel):
312315
skip_backfill: bool
313316
empty_backfill: bool
314317
restatements: t.Dict[str, Interval]
318+
restate_all_snapshots: bool
315319
is_dev: bool
316320
allow_destructive_models: t.Set[str]
317321
allow_additive_models: t.Set[str]

sqlmesh/core/plan/stages.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
SnapshotId,
1515
snapshots_to_dag,
1616
)
17+
from sqlmesh.utils.errors import PlanError
1718

1819

1920
@dataclass
@@ -461,13 +462,18 @@ def _get_after_all_stage(
461462
def _get_restatement_stage(
462463
self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]
463464
) -> t.Optional[RestatementStage]:
464-
if not plan.restatements or plan.is_dev:
465-
# The RestatementStage to clear intervals from state across all environments is not needed for plans against dev, only prod
466-
return None
465+
if plan.restate_all_snapshots:
466+
if plan.is_dev:
467+
raise PlanError(
468+
"Clearing intervals from state across dev model versions is only valid for prod plans"
469+
)
467470

468-
return RestatementStage(
469-
all_snapshots=snapshots_by_name,
470-
)
471+
if plan.restatements:
472+
return RestatementStage(
473+
all_snapshots=snapshots_by_name,
474+
)
475+
476+
return None
471477

472478
def _get_physical_layer_update_stage(
473479
self,

sqlmesh_dbt/cli.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ def dbt(
9090
@click.option(
9191
"-f",
9292
"--full-refresh",
93-
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
93+
is_flag=True,
94+
default=False,
95+
help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.",
9496
)
9597
@click.option(
9698
"--env",

sqlmesh_dbt/operations.py

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from sqlmesh.dbt.project import Project
1212
from sqlmesh_dbt.console import DbtCliConsole
1313
from sqlmesh.core.model import Model
14-
from sqlmesh.core.plan import Plan
14+
from sqlmesh.core.plan import Plan, PlanBuilder
1515

1616
logger = logging.getLogger(__name__)
1717

@@ -42,8 +42,39 @@ def run(
4242
full_refresh: bool = False,
4343
empty: bool = False,
4444
) -> Plan:
45-
return self.context.plan(
46-
**self._plan_options(
45+
plan_builder = self._plan_builder(
46+
environment=environment,
47+
select=select,
48+
exclude=exclude,
49+
full_refresh=full_refresh,
50+
empty=empty,
51+
)
52+
53+
plan = plan_builder.build()
54+
55+
self.console.plan(
56+
plan_builder,
57+
default_catalog=self.context.default_catalog,
58+
# start doing work immediately (since no_diff is set, there isn't really anything for the user to say yes/no to)
59+
auto_apply=True,
60+
# don't output a diff of model changes
61+
no_diff=True,
62+
# don't throw up any prompts like "set the effective date" - use defaults
63+
no_prompts=True,
64+
)
65+
66+
return plan
67+
68+
def _plan_builder(
69+
self,
70+
environment: t.Optional[str] = None,
71+
select: t.Optional[t.List[str]] = None,
72+
exclude: t.Optional[t.List[str]] = None,
73+
full_refresh: bool = False,
74+
empty: bool = False,
75+
) -> PlanBuilder:
76+
return self.context.plan_builder(
77+
**self._plan_builder_options(
4778
environment=environment,
4879
select=select,
4980
exclude=exclude,
@@ -71,13 +102,15 @@ def _selected_models(
71102

72103
return selected_models
73104

74-
def _plan_options(
105+
def _plan_builder_options(
75106
self,
76-
environment: t.Optional[str] = None,
107+
# upstream dbt options
77108
select: t.Optional[t.List[str]] = None,
78109
exclude: t.Optional[t.List[str]] = None,
79110
empty: bool = False,
80111
full_refresh: bool = False,
112+
# sqlmesh extra options
113+
environment: t.Optional[str] = None,
81114
) -> t.Dict[str, t.Any]:
82115
import sqlmesh.core.constants as c
83116

@@ -130,24 +163,38 @@ def _plan_options(
130163
# `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers existing tables regardless of whether they are populated.
131164
# This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they happened to be created/updated as part of this plan.
132165
# However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the table and thus it will not be cleared.
133-
# So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace existing tables
166+
# Currently, SQLMesh has no way to say "restate with empty data", because --restate-model coupled with --skip-backfill ends up being a no-op
134167
options["skip_backfill"] = True
135-
full_refresh = True
168+
169+
self.console.log_warning(
170+
"dbt's `--empty` drops the tables for all selected models and replaces them with empty ones.\n"
171+
"This can easily result in accidental data loss, so SQLMesh limits this to only new or modified models and leaves the tables for existing unmodified models alone.\n\n"
172+
"If you were creating empty tables to preview model changes, please consider using `--environment` to preview these changes in an isolated Virtual Data Environment instead.\n\n"
173+
"Otherwise, if you really do want dbt's `--empty` behaviour of clearing every selected table, please file an issue on GitHub so we can better understand the use-case.\n"
174+
)
175+
176+
if full_refresh:
177+
# --full-refresh is implemented in terms of "add every model as a restatement"
178+
# however, `--empty` sets skip_backfill=True, which causes the BackfillStage of the plan to be skipped.
179+
# the re-processing of data intervals happens in the BackfillStage, so if it gets skipped, restatements become a no-op
180+
raise ValueError("`--full-refresh` alongside `--empty` is not currently supported.")
136181

137182
if full_refresh:
138-
# TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan without affecting prod
139-
# if the plan targets dev
140-
pass
183+
options.update(
184+
dict(
185+
# Add every selected model as a restatement to force them to get repopulated from scratch
186+
restate_models=list(self.context.models)
187+
if not select_models
188+
else select_models,
189+
# by default in SQLMesh, restatements only operate on what has been committed to state.
190+
# in order to emulate dbt, we need to use the local filesystem instead, so we override this default
191+
always_include_local_changes=True,
192+
)
193+
)
141194

142195
return dict(
143196
environment=environment,
144197
select_models=select_models,
145-
# dont output a diff of model changes
146-
no_diff=True,
147-
# don't throw up any prompts like "set the effective date" - use defaults
148-
no_prompts=True,
149-
# start doing work immediately (since no_diff is set, there isnt really anything for the user to say yes/no to)
150-
auto_apply=True,
151198
**options,
152199
)
153200

tests/core/test_plan.py

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture):
826826
indirectly_modified={},
827827
deployability_index=DeployabilityIndex.all_deployable(),
828828
restatements={},
829+
restate_all_snapshots=False,
829830
end_bounded=False,
830831
ensure_finalized_snapshots=False,
831832
start_override_per_model=None,
@@ -1074,36 +1075,6 @@ def test_restate_missing_model(make_snapshot, mocker: MockerFixture):
10741075
PlanBuilder(context_diff, restate_models=["missing"]).build()
10751076

10761077

1077-
def test_new_snapshots_with_restatements(make_snapshot, mocker: MockerFixture):
1078-
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds")))
1079-
1080-
context_diff = ContextDiff(
1081-
environment="test_environment",
1082-
is_new_environment=True,
1083-
is_unfinalized_environment=False,
1084-
normalize_environment_name=True,
1085-
create_from="prod",
1086-
create_from_env_exists=True,
1087-
added=set(),
1088-
removed_snapshots={},
1089-
modified_snapshots={},
1090-
snapshots={snapshot_a.snapshot_id: snapshot_a},
1091-
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
1092-
previous_plan_id=None,
1093-
previously_promoted_snapshot_ids=set(),
1094-
previous_finalized_snapshots=None,
1095-
previous_gateway_managed_virtual_layer=False,
1096-
gateway_managed_virtual_layer=False,
1097-
environment_statements=[],
1098-
)
1099-
1100-
with pytest.raises(
1101-
PlanError,
1102-
match=r"Model changes and restatements can't be a part of the same plan.*",
1103-
):
1104-
PlanBuilder(context_diff, restate_models=["a"]).build()
1105-
1106-
11071078
def test_end_validation(make_snapshot, mocker: MockerFixture):
11081079
snapshot_a = make_snapshot(
11091080
SqlModel(

0 commit comments

Comments
 (0)