Closed
Changes from 8 commits
2 changes: 2 additions & 0 deletions docs/guides/linter.md
@@ -74,6 +74,8 @@ Here are all of SQLMesh's built-in linting rules:
| `invalidselectstarexpansion` | Correctness | The query's top-level selection may be `SELECT *`, but only if SQLMesh can expand the `SELECT *` into individual columns |
| `noselectstar` | Stylistic | The query's top-level selection may not be `SELECT *`, even if SQLMesh can expand the `SELECT *` into individual columns |
| `nomissingaudits` | Governance | SQLMesh did not find any `audits` in the model's configuration to test data quality. |
| `nomissingexternalmodels` | Governance | All external models must be registered in the external_models.yaml file |
| `cronvalidator` | Governance | An upstream model has a cron expression with a longer interval than a downstream model's. Example: step_1(`@weekly`) -> step_2(`@daily`) -> step_3(`*/5 * * * *`). step_2 and step_3 are anchored to step_1's cron and will run on the same schedule as step_1. The fix is to align the schedules so that each downstream model's cron interval is the same as or longer than its upstream model's. |
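
To make the `step_1` -> `step_2` -> `step_3` example concrete, here is a minimal sketch of the alignment idea. It is not SQLMesh's implementation: the `MODELS` dictionary, the `INTERVAL_SECONDS` lookup, and the `misaligned_edges` helper are hypothetical names introduced only for illustration, and the interval lengths are assumed values.

```python
from typing import Dict, List, Tuple

# Assumed interval lengths in seconds, for illustration only.
INTERVAL_SECONDS: Dict[str, int] = {
    "@weekly": 7 * 86400,
    "@daily": 86400,
    "*/5 * * * *": 300,
}

# Hypothetical stand-in for a project's model graph.
MODELS: Dict[str, dict] = {
    "step_1": {"cron": "@weekly", "depends_on": []},
    "step_2": {"cron": "@daily", "depends_on": ["step_1"]},
    "step_3": {"cron": "*/5 * * * *", "depends_on": ["step_2"]},
}


def misaligned_edges(models: Dict[str, dict]) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs where the upstream interval is longer."""
    edges = []
    for name, spec in models.items():
        for upstream in spec["depends_on"]:
            upstream_seconds = INTERVAL_SECONDS[models[upstream]["cron"]]
            if upstream_seconds > INTERVAL_SECONDS[spec["cron"]]:
                edges.append((upstream, name))
    return edges


flagged = misaligned_edges(MODELS)

# One possible fix: give every model the same cron as step_1.
aligned = {name: {**spec, "cron": "@weekly"} for name, spec in MODELS.items()}
flagged_after_fix = misaligned_edges(aligned)
```

In this sketch both edges of the chain are flagged before the fix and none afterwards.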

### User-defined rules

1 change: 1 addition & 0 deletions examples/sushi/config.py
@@ -49,6 +49,7 @@
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),
)
30 changes: 30 additions & 0 deletions sqlmesh/core/linter/rules/builtin.py
@@ -274,4 +274,34 @@ def create_fix(self, model_name: str) -> t.Optional[Fix]:
)


class CronValidator(Rule):
Collaborator

Should the name be more specific? I feel like there can be multiple approaches to validating cron

Contributor Author

What about this? CronAlignmentValidator

Contributor Author

Or CronIntervalAlignment

Collaborator

the last one is good, thanks

Contributor Author

updated

    """Upstream model has a cron expression with longer intervals than downstream model."""

    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
        placeholder_start_date = "2020-01-01 10:00:00"

        this_model_cron_next = model.cron_next(placeholder_start_date)

        for upstream_model_name in model.depends_on:
            upstream_model = self.context.get_model(upstream_model_name)

            # Skip if upstream model doesn't exist
            if upstream_model is None:
                continue

            # Skip model kinds since they don't run on cron schedules
            skip_kinds = ["EXTERNAL", "EMBEDDED", "SEED"]
Collaborator

@izeigerman Aug 4, 2025

I suggest using enum values instead of hardcoded strings (ModelKindName)

Contributor Author

updated

            if upstream_model.kind.name in skip_kinds:
                continue

            upstream_model_cron_next = upstream_model.cron_next(placeholder_start_date)

            if upstream_model_cron_next > this_model_cron_next:
Collaborator

@izeigerman Aug 6, 2025

I thought about this more and I don't think this is the right way to handle this. Imagine graph A (upstream) <- B (downstream) with crons

  • A: 5 * * * * (run at 5th minute of every hour)
  • B: 0 * * * * (run every hour)

Both models are hourly, so technically the upstream's cron interval is not "longer" as the violation suggests. However, B will run before A which might not be the desired behavior.

But if it was this instead:

  • A: 15 10 * * * (run at 10:15AM of every day)
  • B: 0 * * * * (run every hour)

Your check will return false given the current placeholder start date, because cron_next for A will be 10:15 but for B it will be 11:00. In this case upstream is daily, and its interval is indeed longer than B's (hourly). You probably want your check to fail but it won't.

So if the length of the interval is what you're trying to capture here then you want to do the following instead:

upstream_cron_interval_unit = IntervalUnit.from_cron(upstream_model.cron)
this_cron_interval_unit = IntervalUnit.from_cron(model.cron)

if upstream_cron_interval_unit.seconds > this_cron_interval_unit.seconds:
    # violation

and move away from the placeholder date.
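
The two scenarios above can be reproduced with a small sketch that uses only the standard library. The `next_run` helper is a hypothetical stand-in for `cron_next` that handles just the `minute hour * * *` shapes used in this comment; it is not SQLMesh's cron logic.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_run(start: datetime, minute: int, hour: Optional[int] = None) -> datetime:
    """Next fire time strictly after `start` for 'minute hour * * *'.

    hour=None stands for '*' (every hour). Hypothetical helper for illustration.
    """
    candidate = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute != minute or (hour is not None and candidate.hour != hour):
        candidate += timedelta(minutes=1)
    return candidate


placeholder = datetime(2020, 1, 1, 10, 0)

# Scenario 1: A = "5 * * * *", B = "0 * * * *"
a_hourly_next = next_run(placeholder, minute=5)
b_next = next_run(placeholder, minute=0)

# Scenario 2: A = "15 10 * * *", B = "0 * * * *"
a_daily_next = next_run(placeholder, minute=15, hour=10)

# The placeholder-date comparison used by the rule in this diff.
scenario_1_flagged = a_hourly_next > b_next
scenario_2_flagged = a_daily_next > b_next
```

With the 10:00 placeholder, neither scenario is flagged, including the second one where the upstream model is daily and the downstream model is hourly.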

Contributor Author

@sungchun12 Aug 6, 2025

This was actually my first approach, but it won't work in isolation because the IntervalUnit is limited: https://github.com/TobikoData/sqlmesh/blob/5861dbaac291113b504380456cc1b38a897611cc/sqlmesh/core/node.py#L39-L45

For example, @weekly and @daily will be interpreted as being equal in interval unit seconds because they are both categorized as DAY.

upstream_cron_interval_unit = IntervalUnit.from_cron(upstream_model.cron)
this_cron_interval_unit = IntervalUnit.from_cron(model.cron)

if upstream_cron_interval_unit.seconds > this_cron_interval_unit.seconds:
    # violation
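
A minimal sketch of why the unit-only comparison misses `@weekly` versus `@daily`. `CoarseUnit` and `ASSUMED_BUCKETS` are hypothetical stand-ins, not SQLMesh's `IntervalUnit`; the only behavior taken from this thread is that both crons fall into the same DAY bucket.

```python
from enum import Enum


class CoarseUnit(Enum):
    """Hypothetical coarse interval unit; the value is the unit length in seconds."""

    HOUR = 3600
    DAY = 86400


# Assumed bucketing for illustration: @weekly and @daily both land in DAY.
ASSUMED_BUCKETS = {
    "@hourly": CoarseUnit.HOUR,
    "@daily": CoarseUnit.DAY,
    "@weekly": CoarseUnit.DAY,
}


def unit_seconds(cron: str) -> int:
    """Return the length in seconds of the bucket a cron falls into."""
    return ASSUMED_BUCKETS[cron].value


# Upstream @weekly, downstream @daily: the unit-only check sees no difference.
weekly_vs_daily_flagged = unit_seconds("@weekly") > unit_seconds("@daily")
# Upstream @daily, downstream @hourly: the unit-only check does catch this one.
daily_vs_hourly_flagged = unit_seconds("@daily") > unit_seconds("@hourly")
```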

New approach: use an OR condition. If the upstream model's next cron run is later OR its interval unit in seconds is greater, return a violation.

Example in action for your first scenario: You'll notice the placeholder date implementation works as expected. This is NOT a rule violation because model B runs AFTER model A.

A: 5 * * * * (run at 5th minute of every hour)
B: 0 * * * * (run every hour)

(Pdb) upstream_cron_interval_unit.seconds
3600
(Pdb) this_cron_interval_unit.seconds
3600
(Pdb) upstream_model_cron_next
datetime.datetime(2020, 1, 1, 10, 5, tzinfo=datetime.timezone.utc)
(Pdb) this_model_cron_next
datetime.datetime(2020, 1, 1, 11, 0, tzinfo=datetime.timezone.utc)
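
The OR condition can be sketched as a small pure function. `is_violation` is a hypothetical name; the first case plugs in the values from the Pdb session above, while the other two use values worked out by hand from the 2020-01-01 10:00 placeholder and the assumed DAY bucketing, so treat them as illustrative.

```python
from datetime import datetime, timezone


def is_violation(
    upstream_next: datetime,
    this_next: datetime,
    upstream_unit_seconds: int,
    this_unit_seconds: int,
) -> bool:
    """Flag when the upstream fires later OR its interval unit is longer."""
    return upstream_next > this_next or upstream_unit_seconds > this_unit_seconds


def utc(*args: int) -> datetime:
    """Build a timezone-aware UTC datetime."""
    return datetime(*args, tzinfo=timezone.utc)


# A: 5 * * * *, B: 0 * * * * (values from the Pdb session)
hourly_pair = is_violation(utc(2020, 1, 1, 10, 5), utc(2020, 1, 1, 11, 0), 3600, 3600)

# A: 15 10 * * *, B: 0 * * * * (daily upstream, hourly downstream)
daily_upstream = is_violation(utc(2020, 1, 1, 10, 15), utc(2020, 1, 1, 11, 0), 86400, 3600)

# A: @weekly, B: @daily (same DAY unit, caught by the next-run comparison)
weekly_upstream = is_violation(utc(2020, 1, 5, 0, 0), utc(2020, 1, 2, 0, 0), 86400, 86400)
```

Under these assumptions the hourly pair passes, and the two cases that each single check missed on its own are both flagged.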

Contributor Author

Note: it matters WHEN models are linted, because the time a model is deployed to prod determines its next run. For nuanced cron schedules, the same model can emit a violation or not depending on when it is applied.
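
A short sketch of that time dependence, reusing the A and B crons from earlier in this thread. `next_hourly_run` and `upstream_fires_later` are hypothetical helpers that only handle `minute * * * *` crons; they are not SQLMesh functions.

```python
from datetime import datetime, timedelta


def next_hourly_run(start: datetime, minute: int) -> datetime:
    """Next fire time strictly after `start` for the cron 'minute * * * *'."""
    candidate = start.replace(minute=minute, second=0, microsecond=0)
    if candidate <= start:
        candidate += timedelta(hours=1)
    return candidate


def upstream_fires_later(reference: datetime) -> bool:
    """Compare next runs of A (5 * * * *) and B (0 * * * *) from a reference time."""
    upstream_next = next_hourly_run(reference, minute=5)
    downstream_next = next_hourly_run(reference, minute=0)
    return upstream_next > downstream_next


flagged_at_ten = upstream_fires_later(datetime(2020, 1, 1, 10, 0))
flagged_at_half_past = upstream_fires_later(datetime(2020, 1, 1, 10, 30))
```

The same pair of crons is not flagged from 10:00 but is flagged from 10:30, which is why a next-run comparison is sensitive to the reference time.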

                return self.violation(
                    f"Upstream model {upstream_model_name} has longer cron interval ({upstream_model.cron}) "
                    f"than this model ({model.cron})"
                )
        return None


BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, (Rule,)))
59 changes: 59 additions & 0 deletions tests/core/linter/test_builtin.py
@@ -2,6 +2,8 @@

from sqlmesh import Context
from sqlmesh.core.linter.rule import Position, Range
from sqlmesh.core.model import load_sql_based_model
from sqlmesh.core import dialect as d


def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None:
@@ -31,6 +33,7 @@ def test_no_missing_external_models(tmp_path, copy_to_temp_path) -> None:
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
@@ -84,6 +87,7 @@ def test_no_missing_external_models_with_existing_file_ending_in_newline(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
@@ -141,6 +145,7 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
@@ -172,3 +177,57 @@ def test_no_missing_external_models_with_existing_file_not_ending_in_newline(
)
fix_path = sushi_path / "external_models.yaml"
assert edit.path == fix_path


def test_cron_validator(tmp_path, copy_to_temp_path) -> None:
    sushi_paths = copy_to_temp_path("examples/sushi")
    sushi_path = sushi_paths[0]

    # Override the config.py to turn on lint
    with open(sushi_path / "config.py", "r") as f:
        read_file = f.read()

    before = """    linter=LinterConfig(
        enabled=False,
        rules=[
            "ambiguousorinvalidcolumn",
            "invalidselectstarexpansion",
            "noselectstar",
            "nomissingaudits",
            "nomissingowner",
            "nomissingexternalmodels",
            "cronvalidator",
        ],
    ),"""
    after = """linter=LinterConfig(enabled=True, rules=["cronvalidator"]),"""
    read_file = read_file.replace(before, after)
    assert after in read_file
    with open(sushi_path / "config.py", "w") as f:
        f.writelines(read_file)

    # Load the context with the temporary sushi path
    context = Context(paths=[sushi_path])

    context.load()

    # Create model with shorter cron interval that depends on model with longer interval
    upstream_model = load_sql_based_model(
        d.parse("MODEL (name memory.sushi.step_1, cron '@weekly'); SELECT * FROM (SELECT 1)")
    )

    downstream_model = load_sql_based_model(
        d.parse(
            "MODEL (name memory.sushi.step_2, cron '@daily', depends_on ['memory.sushi.step_1']); SELECT * FROM (SELECT 1)"
        )
    )

    context.upsert_model(upstream_model)
    context.upsert_model(downstream_model)

    lints = context.lint_models(raise_on_error=False)
    assert len(lints) == 1
    lint = lints[0]
    assert (
        lint.violation_msg
        == 'Upstream model "memory"."sushi"."step_1" has longer cron interval (@weekly) than this model (@daily)'
    )
1 change: 1 addition & 0 deletions tests/lsp/test_code_actions.py
@@ -131,6 +131,7 @@ def test_code_actions_create_file(copy_to_temp_path: t.Callable) -> None:
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
1 change: 1 addition & 0 deletions tests/lsp/test_reference_external_model.py
@@ -94,6 +94,7 @@ def test_unregistered_external_model_with_schema(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],
),"""
after = """linter=LinterConfig(enabled=True, rules=["nomissingexternalmodels"]),"""
1 change: 1 addition & 0 deletions vscode/extension/tests/quickfix.spec.ts
@@ -35,6 +35,7 @@ test.fixme(
"nomissingaudits",
"nomissingowner",
"nomissingexternalmodels",
"cronvalidator",
],`,
targetRules,
)