Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,7 @@ def plan_builder(
explain: t.Optional[bool] = None,
ignore_cron: t.Optional[bool] = None,
min_intervals: t.Optional[int] = None,
always_include_local_changes: t.Optional[bool] = None,
) -> PlanBuilder:
"""Creates a plan builder.

Expand Down Expand Up @@ -1467,6 +1468,8 @@ def plan_builder(
diff_rendered: Whether the diff should compare raw vs rendered models
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
on every model when checking for missing intervals
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.

Returns:
The plan builder.
Expand Down Expand Up @@ -1583,13 +1586,20 @@ def plan_builder(
"Selector did not return any models. Please check your model selection and try again."
)

if always_include_local_changes is None:
# default behaviour - if restatements are detected, we operate entirely out of state and ignore local changes
force_no_diff = restate_models is not None or (
backfill_models is not None and not backfill_models
)
else:
force_no_diff = not always_include_local_changes

snapshots = self._snapshots(models_override)
context_diff = self._context_diff(
environment or c.PROD,
snapshots=snapshots,
create_from=create_from,
force_no_diff=restate_models is not None
or (backfill_models is not None and not backfill_models),
force_no_diff=force_no_diff,
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
diff_rendered=diff_rendered,
always_recreate_environment=self.config.plan.always_recreate_environment,
Expand Down Expand Up @@ -1644,13 +1654,22 @@ def plan_builder(
elif forward_only is None:
forward_only = self.config.plan.forward_only

# When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments
# If we are not, then there is no point, because none of the data in dev environments can be promoted by definition
restate_all_snapshots = (
expanded_restate_models is not None
and not is_dev
and self.config.virtual_environment_mode.is_full
)

return self.PLAN_BUILDER_TYPE(
context_diff=context_diff,
start=start,
end=end,
execution_time=execution_time,
apply=self.apply,
restate_models=expanded_restate_models,
restate_all_snapshots=restate_all_snapshots,
backfill_models=backfill_models,
no_gaps=no_gaps,
skip_backfill=skip_backfill,
Expand Down
16 changes: 6 additions & 10 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class PlanBuilder:
restate_models: A list of models for which the data should be restated for the time range
specified in this plan. Note: models defined outside SQLMesh (external) won't be a part
of the restatement.
restate_all_snapshots: If restatements are present, this flag indicates whether or not the intervals
being restated should be cleared from state for other versions of this model (typically, versions that are present in other environments).
Defaults to False, in which case nothing is cleared. Clearing intervals across other versions is only valid for plans that target prod.
backfill_models: A list of fully qualified model names for which the data should be backfilled as part of this plan.
no_gaps: Whether to ensure that new snapshots for nodes that are already a
part of the target environment have no data gaps when compared against previous
Expand Down Expand Up @@ -103,6 +106,7 @@ def __init__(
execution_time: t.Optional[TimeLike] = None,
apply: t.Optional[t.Callable[[Plan], None]] = None,
restate_models: t.Optional[t.Iterable[str]] = None,
restate_all_snapshots: bool = False,
backfill_models: t.Optional[t.Iterable[str]] = None,
no_gaps: bool = False,
skip_backfill: bool = False,
Expand Down Expand Up @@ -154,6 +158,7 @@ def __init__(
self._auto_categorization_enabled = auto_categorization_enabled
self._include_unmodified = include_unmodified
self._restate_models = set(restate_models) if restate_models is not None else None
self._restate_all_snapshots = restate_all_snapshots
self._effective_from = effective_from

# note: this deliberately doesn't default to now() here.
Expand Down Expand Up @@ -277,7 +282,6 @@ def build(self) -> Plan:
if self._latest_plan:
return self._latest_plan

self._ensure_no_new_snapshots_with_restatements()
self._ensure_new_env_with_changes()
self._ensure_valid_date_range()
self._ensure_no_broken_references()
Expand Down Expand Up @@ -340,6 +344,7 @@ def build(self) -> Plan:
deployability_index=deployability_index,
selected_models_to_restate=self._restate_models,
restatements=restatements,
restate_all_snapshots=self._restate_all_snapshots,
start_override_per_model=self._start_override_per_model,
end_override_per_model=end_override_per_model,
selected_models_to_backfill=self._backfill_models,
Expand Down Expand Up @@ -859,15 +864,6 @@ def _ensure_no_broken_references(self) -> None:
f"""Removed {broken_references_msg} are referenced in '{snapshot.name}'. Please remove broken references before proceeding."""
)

def _ensure_no_new_snapshots_with_restatements(self) -> None:
# Validation guard: rejects a plan that requests restatements while the context diff
# also carries model changes.
#
# Raises:
#     PlanError: if self._restate_models is not None (an empty set still counts) and the
#         context diff reports any new or modified snapshots.
if self._restate_models is not None and (
self._context_diff.new_snapshots or self._context_diff.modified_snapshots
):
raise PlanError(
"Model changes and restatements can't be a part of the same plan. "
"Revert or apply changes before proceeding with restatements."
)

def _ensure_new_env_with_changes(self) -> None:
if (
self._is_dev
Expand Down
4 changes: 4 additions & 0 deletions sqlmesh/core/plan/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class Plan(PydanticModel, frozen=True):
Note that dev previews are also considered restatements, so :selected_models_to_restate can be empty
while :restatements is still populated with dev previews
"""
restate_all_snapshots: bool
"""Whether or not to clear intervals from state for other versions of the models listed in :restatements"""

start_override_per_model: t.Optional[t.Dict[str, datetime]]
end_override_per_model: t.Optional[t.Dict[str, datetime]]
Expand Down Expand Up @@ -268,6 +270,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
skip_backfill=self.skip_backfill,
empty_backfill=self.empty_backfill,
restatements={s.name: i for s, i in self.restatements.items()},
restate_all_snapshots=self.restate_all_snapshots,
is_dev=self.is_dev,
allow_destructive_models=self.allow_destructive_models,
allow_additive_models=self.allow_additive_models,
Expand Down Expand Up @@ -312,6 +315,7 @@ class EvaluatablePlan(PydanticModel):
skip_backfill: bool
empty_backfill: bool
restatements: t.Dict[str, Interval]
restate_all_snapshots: bool
is_dev: bool
allow_destructive_models: t.Set[str]
allow_additive_models: t.Set[str]
Expand Down
18 changes: 12 additions & 6 deletions sqlmesh/core/plan/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
SnapshotTableInfo,
SnapshotId,
)
from sqlmesh.utils.errors import PlanError


@dataclass
Expand Down Expand Up @@ -452,13 +453,18 @@ def _get_after_all_stage(
def _get_restatement_stage(
self, plan: EvaluatablePlan, snapshots_by_name: t.Dict[str, Snapshot]
) -> t.Optional[RestatementStage]:
if not plan.restatements or plan.is_dev:
# The RestatementStage to clear intervals from state across all environments is not needed for plans against dev, only prod
return None
if plan.restate_all_snapshots:
if plan.is_dev:
raise PlanError(
"Clearing intervals from state across dev model versions is only valid for prod plans"
)

return RestatementStage(
all_snapshots=snapshots_by_name,
)
if plan.restatements:
return RestatementStage(
all_snapshots=snapshots_by_name,
)

return None

def _get_physical_layer_update_stage(
self,
Expand Down
4 changes: 3 additions & 1 deletion sqlmesh_dbt/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def dbt(
@click.option(
"-f",
"--full-refresh",
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
is_flag=True,
default=False,
help="If specified, sqlmesh will drop incremental models and fully-recalculate the incremental table from the model definition.",
)
@click.option(
"--env",
Expand Down
79 changes: 63 additions & 16 deletions sqlmesh_dbt/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sqlmesh.dbt.project import Project
from sqlmesh_dbt.console import DbtCliConsole
from sqlmesh.core.model import Model
from sqlmesh.core.plan import Plan
from sqlmesh.core.plan import Plan, PlanBuilder

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,8 +42,39 @@ def run(
full_refresh: bool = False,
empty: bool = False,
) -> Plan:
return self.context.plan(
**self._plan_options(
plan_builder = self._plan_builder(
environment=environment,
select=select,
exclude=exclude,
full_refresh=full_refresh,
empty=empty,
)

plan = plan_builder.build()

self.console.plan(
plan_builder,
default_catalog=self.context.default_catalog,
# start doing work immediately (since no_diff is set, there isn't really anything for the user to say yes/no to)
auto_apply=True,
# don't output a diff of model changes
no_diff=True,
# don't throw up any prompts like "set the effective date" - use defaults
no_prompts=True,
)

return plan

def _plan_builder(
self,
environment: t.Optional[str] = None,
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
full_refresh: bool = False,
empty: bool = False,
) -> PlanBuilder:
return self.context.plan_builder(
**self._plan_builder_options(
environment=environment,
select=select,
exclude=exclude,
Expand Down Expand Up @@ -71,13 +102,15 @@ def _selected_models(

return selected_models

def _plan_options(
def _plan_builder_options(
self,
environment: t.Optional[str] = None,
# upstream dbt options
select: t.Optional[t.List[str]] = None,
exclude: t.Optional[t.List[str]] = None,
empty: bool = False,
full_refresh: bool = False,
# sqlmesh extra options
environment: t.Optional[str] = None,
) -> t.Dict[str, t.Any]:
import sqlmesh.core.constants as c

Expand Down Expand Up @@ -130,24 +163,38 @@ def _plan_options(
# `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers existing tables regardless of whether they are populated.
# This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they happened to be created/updated as part of this plan.
# However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the table and thus it will not be cleared.
# So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace existing tables
# Currently, SQLMesh has no way to say "restate with empty data", because --restate-model coupled with --skip-backfill ends up being a no-op
options["skip_backfill"] = True
full_refresh = True

self.console.log_warning(
"dbt's `--empty` drops the tables for all selected models and replaces them with empty ones.\n"
"This can easily result in accidental data loss, so SQLMesh limits this to only new or modified models and leaves the tables for existing unmodified models alone.\n\n"
"If you were creating empty tables to preview model changes, please consider using `--environment` to preview these changes in an isolated Virtual Data Environment instead.\n\n"
"Otherwise, if you really do want dbt's `--empty` behaviour of clearing every selected table, please file an issue on GitHub so we can better understand the use-case.\n"
)

if full_refresh:
# --full-refresh is implemented in terms of "add every model as a restatement"
# however, `--empty` sets skip_backfill=True, which causes the BackfillStage of the plan to be skipped.
# the re-processing of data intervals happens in the BackfillStage, so if it gets skipped, restatements become a no-op
raise ValueError("`--full-refresh` alongside `--empty` is not currently supported.")

if full_refresh:
# TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan without affecting prod
# if the plan targets dev
pass
options.update(
dict(
# Add every selected model as a restatement to force them to get repopulated from scratch
restate_models=list(self.context.models)
if not select_models
else select_models,
# by default in SQLMesh, restatements only operate on what has been committed to state.
# in order to emulate dbt, we need to use the local filesystem instead, so we override this default
always_include_local_changes=True,
)
)

return dict(
environment=environment,
select_models=select_models,
# don't output a diff of model changes
no_diff=True,
# don't throw up any prompts like "set the effective date" - use defaults
no_prompts=True,
# start doing work immediately (since no_diff is set, there isn't really anything for the user to say yes/no to)
auto_apply=True,
**options,
)

Expand Down
31 changes: 1 addition & 30 deletions tests/core/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ def test_missing_intervals_lookback(make_snapshot, mocker: MockerFixture):
indirectly_modified={},
deployability_index=DeployabilityIndex.all_deployable(),
restatements={},
restate_all_snapshots=False,
end_bounded=False,
ensure_finalized_snapshots=False,
start_override_per_model=None,
Expand Down Expand Up @@ -1074,36 +1075,6 @@ def test_restate_missing_model(make_snapshot, mocker: MockerFixture):
PlanBuilder(context_diff, restate_models=["missing"]).build()


def test_new_snapshots_with_restatements(make_snapshot, mocker: MockerFixture):
# Checks that PlanBuilder.build() raises PlanError when restate_models is supplied
# while the context diff still contains a new snapshot.
snapshot_a = make_snapshot(SqlModel(name="a", query=parse_one("select 1, ds")))

# snapshot_a is listed under new_snapshots below, so the diff carries a model change
# even though added/removed/modified are all empty.
context_diff = ContextDiff(
environment="test_environment",
is_new_environment=True,
is_unfinalized_environment=False,
normalize_environment_name=True,
create_from="prod",
create_from_env_exists=True,
added=set(),
removed_snapshots={},
modified_snapshots={},
snapshots={snapshot_a.snapshot_id: snapshot_a},
new_snapshots={snapshot_a.snapshot_id: snapshot_a},
previous_plan_id=None,
previously_promoted_snapshot_ids=set(),
previous_finalized_snapshots=None,
previous_gateway_managed_virtual_layer=False,
gateway_managed_virtual_layer=False,
environment_statements=[],
)

# Restating model "a" together with the pending change is expected to be rejected at build time.
with pytest.raises(
PlanError,
match=r"Model changes and restatements can't be a part of the same plan.*",
):
PlanBuilder(context_diff, restate_models=["a"]).build()


def test_end_validation(make_snapshot, mocker: MockerFixture):
snapshot_a = make_snapshot(
SqlModel(
Expand Down
Loading