Skip to content

Commit 80d786a

Browse files
authored
Fix: Set the plan DAG start date at request time (#1244)
1 parent 814a44b commit 80d786a

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

sqlmesh/schedulers/airflow/common.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ class PlanDagSpec(PydanticModel):
7878
is_dev: bool
7979
forward_only: t.Optional[bool]
8080
environment_expiration_ts: t.Optional[int]
81+
dag_start_ts: t.Optional[int]
8182

8283

8384
class EnvironmentsResponse(PydanticModel):

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
from sqlmesh.schedulers.airflow.operators.notification import (
2626
BaseNotificationOperatorProvider,
2727
)
28-
from sqlmesh.utils.date import TimeLike, to_datetime
28+
from sqlmesh.utils.date import TimeLike, to_datetime, yesterday_timestamp
2929
from sqlmesh.utils.errors import SQLMeshError
3030

3131
logger = logging.getLogger(__name__)
@@ -136,7 +136,9 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
136136
with DAG(
137137
dag_id=dag_id,
138138
schedule_interval="@once",
139-
start_date=pendulum.now(tz="UTC"),
139+
start_date=pendulum.instance(
140+
to_datetime(plan_dag_spec.dag_start_ts or yesterday_timestamp())
141+
),
140142
max_active_tasks=plan_dag_spec.backfill_concurrent_tasks,
141143
catchup=False,
142144
is_paused_upon_creation=False,

sqlmesh/schedulers/airflow/plan.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from sqlmesh.core.snapshot import SnapshotTableInfo
88
from sqlmesh.core.state_sync import StateSync
99
from sqlmesh.schedulers.airflow import common
10-
from sqlmesh.utils.date import now
10+
from sqlmesh.utils.date import now, now_timestamp
1111
from sqlmesh.utils.errors import SQLMeshError
1212

1313

@@ -97,6 +97,7 @@ def create_plan_dag_spec(
9797
is_dev=request.is_dev,
9898
forward_only=request.forward_only,
9999
environment_expiration_ts=request.environment.expiration_ts,
100+
dag_start_ts=now_timestamp(),
100101
)
101102

102103

tests/schedulers/airflow/test_plan.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -125,7 +125,11 @@ def test_create_plan_dag_spec(
125125
state_sync_mock.get_environment.return_value = old_environment
126126
state_sync_mock.get_snapshot_intervals.return_value = []
127127

128-
plan_spec = create_plan_dag_spec(plan_request, state_sync_mock)
128+
with mock.patch(
129+
"sqlmesh.schedulers.airflow.plan.now_timestamp",
130+
side_effect=lambda: to_timestamp("2023-01-01"),
131+
):
132+
plan_spec = create_plan_dag_spec(plan_request, state_sync_mock)
129133
assert plan_spec == common.PlanDagSpec(
130134
request_id="test_request_id",
131135
environment_naming_info=EnvironmentNamingInfo(
@@ -152,6 +156,7 @@ def test_create_plan_dag_spec(
152156
users=[],
153157
is_dev=False,
154158
forward_only=True,
159+
dag_start_ts=to_timestamp("2023-01-01"),
155160
)
156161

157162
state_sync_mock.get_snapshots.assert_called_once()
@@ -240,9 +245,11 @@ def test_restatement(
240245
dev_intervals=[],
241246
)
242247
]
248+
now_value = "2022-01-09T23:59:59+00:00"
243249
with mock.patch(
244-
"sqlmesh.schedulers.airflow.plan.now",
245-
side_effect=lambda: to_datetime("2022-01-09T23:59:59+00:00"),
250+
"sqlmesh.schedulers.airflow.plan.now", side_effect=lambda: to_datetime(now_value)
251+
), mock.patch(
252+
"sqlmesh.schedulers.airflow.plan.now_timestamp", side_effect=lambda: to_timestamp(now_value)
246253
):
247254
plan_spec = create_plan_dag_spec(plan_request, state_sync_mock)
248255
assert plan_spec == common.PlanDagSpec(
@@ -271,6 +278,7 @@ def test_restatement(
271278
users=[],
272279
is_dev=False,
273280
forward_only=True,
281+
dag_start_ts=to_timestamp(now_value),
274282
)
275283

276284
state_sync_mock.get_snapshots.assert_called_once()

0 commit comments

Comments (0)