Add AssetAndTimeSchedule timetable
#58543
base: main
Changes from all commits
c080852
20f85d9
2c28da4
330380c
806c764
209d80e
281f5e5
1013164
3718a5c
71c1527
b53c549
ba13f02
392483e
26367b9
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -113,6 +113,7 @@ | |
| from airflow.serialization.definitions.assets import SerializedAssetUniqueKey | ||
| from airflow.serialization.definitions.notset import NOTSET | ||
| from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES | ||
| from airflow.timetables.assets import AssetAndTimeSchedule | ||
| from airflow.timetables.base import compute_rollup_fingerprint | ||
| from airflow.timetables.simple import AssetTriggeredTimetable, PartitionedAssetTimetable | ||
| from airflow.utils.event_scheduler import EventScheduler | ||
|
|
@@ -2542,6 +2543,7 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): | |
| cached_get_dag: Callable[[DagRun], SerializedDAG | None] = lru_cache()( | ||
| partial(self.scheduler_dag_bag.get_dag_for_run, session=session) | ||
| ) | ||
| asset_evaluator = AssetEvaluator(session) | ||
|
|
||
| for dag_run in dag_runs: | ||
| dag_id = dag_run.dag_id | ||
|
|
@@ -2582,6 +2584,55 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun): | |
| dag_run.run_id, | ||
| ) | ||
| continue | ||
| # For AssetAndTimeSchedule, defer starting until all required assets are queued. | ||
| # Only gate scheduled runs; manual and backfill runs should start immediately. | ||
| if isinstance(dag.timetable, AssetAndTimeSchedule) and dag_run.run_type == DagRunType.SCHEDULED: | ||
|
Comment on lines +2587 to +2589
Contributor
I think I understand the demand, but I am not an expert in timetables and the scheduler... Looking at the code, I am not fully convinced that the proposed implementation strategy is "good". It assumes the Dag is triggered into the queued state at the point of schedule and then sits "queued" waiting for events. I think this is the wrong state model. In my view, the trigger should only happen at the point of schedule if events are available. In all other cases it should not generate a Dag run, and it also should not mark the Dag as failed just because of missing events. Marking a queued Dag run as failed is the wrong signal; it feels like something is broken. Besides this, the code you are using does not update the tasks to skipped; they stay in the state they had before ("none"). Usually, if a Dag run is set to failed, its tasks are set to skipped.
Contributor
Author
Thanks Jens, this is a really, really helpful insight. I'm checking the fundamental mechanism of ADRQ, and I found there might be a clean way to design for this case. Please give me a couple of days to study and think, and I'll try to propose a better design later.
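For readers following the thread, the state model suggested in the review can be sketched roughly as below. This is only an illustration under simplifying assumptions, not Airflow code: `TickOutcome` and `evaluate_ticks` are hypothetical names, the asset condition is reduced to "all required assets have an event", and each schedule tick is represented by a plain string.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TickOutcome:
    """Hypothetical record of what happened at each schedule tick."""

    created: list[str] = field(default_factory=list)  # ticks that produced a run
    skipped: list[str] = field(default_factory=list)  # ticks with no run at all


def evaluate_ticks(
    required: frozenset[str],
    events_by_tick: dict[str, set[str]],
) -> TickOutcome:
    """Create a run at a tick only if every required asset already has an event.

    Assumption: the real asset condition may be an arbitrary boolean
    expression; a plain "all of" check stands in for it here.
    """
    outcome = TickOutcome()
    for tick, events in events_by_tick.items():
        if required <= events:
            # Events are available at the point of schedule: create the run.
            outcome.created.append(tick)
        else:
            # Nothing is created, so there is no queued run to fail later.
            outcome.skipped.append(tick)
    return outcome
```

Under this model a tick with missing events leaves no Dag run behind, so no run is ever marked failed for that reason and no task instances are left in the "none" state.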
||
| # Reuse dagrun_timeout to fail runs that wait in QUEUED for assets for too long. | ||
| if ( | ||
| dag.dagrun_timeout | ||
| and dag_run.queued_at | ||
| and dag_run.queued_at < timezone.utcnow() - dag.dagrun_timeout | ||
| ): | ||
|
nailo2c marked this conversation as resolved.
|
||
| dag_run.set_state(DagRunState.FAILED) | ||
| session.flush() | ||
| self.log.info( | ||
| "Run %s of %s has timed-out while waiting for assets", | ||
| dag_run.run_id, | ||
| dag_run.dag_id, | ||
| ) | ||
| if dag_run.dag_model is not None: | ||
| self._set_exceeds_max_active_runs(dag_model=dag_run.dag_model, session=session) | ||
|
nailo2c marked this conversation as resolved.
|
||
| dag_run.notify_dagrun_state_changed(msg="timed_out") | ||
| continue | ||
|
|
||
| queued_adrqs = session.scalars( | ||
| with_row_locks( | ||
| select(AssetDagRunQueue) | ||
| .where(AssetDagRunQueue.target_dag_id == dag_id) | ||
| .options(joinedload(AssetDagRunQueue.asset)), | ||
| of=AssetDagRunQueue, | ||
| session=session, | ||
| skip_locked=True, | ||
| ) | ||
| ).all() | ||
| statuses = { | ||
| SerializedAssetUniqueKey.from_asset(record.asset): True for record in queued_adrqs | ||
| } | ||
|
|
||
| if not asset_evaluator.run(dag.timetable.asset_condition, statuses=statuses): | ||
| self.log.debug("Deferring DagRun until assets ready; dag_id=%s run_id=%s", dag_id, run_id) | ||
| # Do not increment active run counts; we didn't start it. | ||
| continue | ||
|
|
||
| if queued_adrqs: | ||
| # Consume only the rows selected for this DagRun to avoid races with new asset events. | ||
| adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs] | ||
| session.execute( | ||
| delete(AssetDagRunQueue).where( | ||
| tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks) | ||
| ) | ||
| ) | ||
|
|
||
| active_runs_of_dags[(dag_run.dag_id, backfill_id)] += 1 | ||
| _update_state(dag, dag_run) | ||
| dag_run.notify_dagrun_state_changed(msg="started") | ||
|
|
||
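The control flow of the hunk above (timeout check, asset-condition check, then consuming only the selected queue rows) can be summarised in a small stand-alone sketch. It is not the scheduler code: `QueuedRun`, `Decision`, and `gate_queued_run` are hypothetical names, the `AssetDagRunQueue` table is modelled as a set of `(asset, target_dag_id)` pairs, and the asset condition is simplified to "all required assets are queued".

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum


class Decision(Enum):
    FAIL_TIMED_OUT = "fail_timed_out"
    DEFER = "defer"
    START = "start"


@dataclass
class QueuedRun:
    """Hypothetical stand-in for a queued, scheduled DagRun."""

    dag_id: str
    queued_at: datetime
    required_assets: frozenset[str]  # simplified "all of" asset condition
    dagrun_timeout: timedelta | None = None


def gate_queued_run(
    run: QueuedRun,
    asset_queue: set[tuple[str, str]],
    now: datetime,
) -> Decision:
    """Decide what to do with one queued run; mutates asset_queue on START."""
    # 1. A run that has waited longer than dagrun_timeout is failed.
    if run.dagrun_timeout and run.queued_at < now - run.dagrun_timeout:
        return Decision.FAIL_TIMED_OUT

    # 2. Snapshot the queue rows that target this Dag.
    selected = {row for row in asset_queue if row[1] == run.dag_id}
    ready_assets = {asset for asset, _ in selected}
    if not run.required_assets <= ready_assets:
        # Condition not met: leave the queue untouched and try again later.
        return Decision.DEFER

    # 3. Consume only the rows in the snapshot, so rows added afterwards
    #    by new asset events are kept for the next run.
    asset_queue.difference_update(selected)
    return Decision.START
```

A short usage example with made-up values:

```python
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
queue = {("a", "d1"), ("x", "other_dag")}
run = QueuedRun("d1", now - timedelta(minutes=5), frozenset({"a", "b"}), timedelta(hours=1))

first = gate_queued_run(run, queue, now)   # asset "b" is still missing
queue.add(("b", "d1"))
second = gate_queued_run(run, queue, now)  # both assets are now queued
```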