Skip to content

Commit d123712

Browse files
committed
Feat: Add a janitor configuration that allows to warn instead of failing if it fails to delete an expired environment schema / view (#4150)
1 parent b3cb99d commit d123712

File tree

6 files changed

+83
-8
lines changed

6 files changed

+83
-8
lines changed

docs/reference/configuration.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,16 @@ Formatting settings for the `sqlmesh format` command and UI.
103103
| `append_newline` | Whether to append a newline to the end of the file (Default: False) | boolean | N |
104104
| `no_rewrite_casts` | Preserve the existing casts, without rewriting them to use the :: syntax. (Default: False) | boolean | N |
105105

106+
107+
## Janitor
108+
109+
Configuration for the `sqlmesh janitor` command.
110+
111+
| Option | Description | Type | Required |
112+
|--------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
113+
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
114+
115+
106116
## UI
107117

108118
SQLMesh UI settings.

sqlmesh/core/config/janitor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from __future__ import annotations
2+
3+
4+
from sqlmesh.core.config.base import BaseConfig
5+
6+
7+
class JanitorConfig(BaseConfig):
8+
"""The configuration for the janitor.
9+
10+
Args:
11+
warn_on_delete_failure: Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views.
12+
"""
13+
14+
warn_on_delete_failure: bool = False

sqlmesh/core/config/root.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from sqlmesh.core.config.feature_flag import FeatureFlag
2626
from sqlmesh.core.config.format import FormatConfig
2727
from sqlmesh.core.config.gateway import GatewayConfig
28+
from sqlmesh.core.config.janitor import JanitorConfig
2829
from sqlmesh.core.config.migration import MigrationConfig
2930
from sqlmesh.core.config.model import ModelDefaultsConfig
3031
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
@@ -126,6 +127,7 @@ class Config(BaseConfig):
126127
before_all: t.Optional[t.List[str]] = None
127128
after_all: t.Optional[t.List[str]] = None
128129
linter: LinterConfig = LinterConfig()
130+
janitor: JanitorConfig = JanitorConfig()
129131

130132
_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
131133
"gateways": UpdateStrategy.NESTED_UPDATE,

sqlmesh/core/context.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2307,7 +2307,12 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
23072307

23082308
def _cleanup_environments(self) -> None:
23092309
expired_environments = self.state_sync.delete_expired_environments()
2310-
cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
2310+
cleanup_expired_views(
2311+
self.engine_adapter,
2312+
expired_environments,
2313+
warn_on_delete_failure=self.config.janitor.warn_on_delete_failure,
2314+
console=self.console,
2315+
)
23112316

23122317
def _try_connection(self, connection_name: str, validator: t.Callable[[], None]) -> None:
23132318
connection_name = connection_name.capitalize()

sqlmesh/core/state_sync/common.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222

2323
def cleanup_expired_views(
24-
adapter: EngineAdapter, environments: t.List[Environment], console: t.Optional[Console] = None
24+
adapter: EngineAdapter,
25+
environments: t.List[Environment],
26+
warn_on_delete_failure: bool = False,
27+
console: t.Optional[Console] = None,
2528
) -> None:
2629
expired_schema_environments = [
2730
environment for environment in environments if environment.suffix_target.is_schema
@@ -52,9 +55,11 @@ def cleanup_expired_views(
5255
if console:
5356
console.update_cleanup_progress(schema.sql(dialect=adapter.dialect))
5457
except Exception as e:
55-
raise SQLMeshError(
56-
f"Failed to drop the expired environment schema '{schema}': {e}"
57-
) from e
58+
message = f"Failed to drop the expired environment schema '{schema}': {e}"
59+
if warn_on_delete_failure:
60+
logger.warning(message)
61+
else:
62+
raise SQLMeshError(message) from e
5863
for expired_view in {
5964
snapshot.qualified_view_name.for_environment(
6065
environment.naming_info, dialect=adapter.dialect
@@ -68,9 +73,11 @@ def cleanup_expired_views(
6873
if console:
6974
console.update_cleanup_progress(expired_view)
7075
except Exception as e:
71-
raise SQLMeshError(
72-
f"Failed to drop the expired environment view '{expired_view}': {e}"
73-
) from e
76+
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
77+
if warn_on_delete_failure:
78+
logger.warning(message)
79+
else:
80+
raise SQLMeshError(message) from e
7481

7582

7683
def transactional() -> t.Callable[[t.Callable], t.Callable]:

tests/core/state_sync/test_state_sync.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2651,6 +2651,43 @@ def test_cleanup_expired_views(
26512651
]
26522652

26532653

2654+
@pytest.mark.parametrize(
2655+
"suffix_target", [EnvironmentSuffixTarget.SCHEMA, EnvironmentSuffixTarget.TABLE]
2656+
)
2657+
def test_cleanup_expired_environment_schema_warn_on_delete_failure(
2658+
mocker: MockerFixture, make_snapshot: t.Callable, suffix_target: EnvironmentSuffixTarget
2659+
):
2660+
adapter = mocker.MagicMock()
2661+
adapter.dialect = None
2662+
adapter.drop_schema.side_effect = Exception("Failed to drop the schema")
2663+
adapter.drop_view.side_effect = Exception("Failed to drop the view")
2664+
2665+
snapshot = make_snapshot(
2666+
SqlModel(name="test_catalog.test_schema.test_model", query=parse_one("select 1, ds"))
2667+
)
2668+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
2669+
schema_environment = Environment(
2670+
name="test_environment",
2671+
suffix_target=suffix_target,
2672+
snapshots=[snapshot.table_info],
2673+
start_at="2022-01-01",
2674+
end_at="2022-01-01",
2675+
plan_id="test_plan_id",
2676+
previous_plan_id="test_plan_id",
2677+
catalog_name_override="catalog_override",
2678+
)
2679+
2680+
with pytest.raises(SQLMeshError, match="Failed to drop the expired environment .*"):
2681+
cleanup_expired_views(adapter, [schema_environment], warn_on_delete_failure=False)
2682+
2683+
cleanup_expired_views(adapter, [schema_environment], warn_on_delete_failure=True)
2684+
2685+
if suffix_target == EnvironmentSuffixTarget.SCHEMA:
2686+
assert adapter.drop_schema.called
2687+
else:
2688+
assert adapter.drop_view.called
2689+
2690+
26542691
def test_max_interval_end_per_model(
26552692
state_sync: EngineAdapterStateSync, make_snapshot: t.Callable
26562693
) -> None:

0 commit comments

Comments
 (0)