
feat(dag): Skip dataset-triggered dags without SerializedDagModel #63546

Open

leossantos wants to merge 11 commits into apache:v2-11-stable from leossantos:leo-santos/addtional-scheduler-fix

Conversation

@leossantos

@leossantos leossantos commented Mar 13, 2026

Summary

DagModel.dags_needing_dagruns could treat dataset-scheduled DAGs as ready for a new run when they had DatasetDagRunQueue rows but no SerializedDagModel row in the same evaluation window. The dataset timetable condition was never evaluated for those DAGs, yet they could still flow into dataset_triggered_dag_info, allowing premature dataset-triggered DagRuns.

This PR removes such dag_ids from the in-memory by_dag / dag_statuses maps until serialization exists. DatasetDagRunQueue ORM rows are not deleted here, so the scheduler can re-evaluate on a later heartbeat. DEBUG logging records skipped DAGs (missing serialization) and, when applicable, satisfied dataset conditions. Docstring updated accordingly.

Tests: two TestDagModel cases cover the missing-SerializedDagModel scenario (single and multiple DAGs), check the expected DEBUG log messages, and assert that dataset_dag_run_queue row counts are unchanged after dags_needing_dagruns.
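The guard described above can be sketched as follows. This is an illustrative, self-contained sketch, not the actual Airflow code: the function name `filter_unserialized` and the plain-dict inputs are stand-ins for the real `by_dag` / `dag_statuses` maps and the query against `SerializedDagModel`.

```python
# Hypothetical sketch of the guard: dag_ids that have queued dataset
# events but no SerializedDagModel row are dropped from the in-memory
# maps for this scheduler loop. The DatasetDagRunQueue rows themselves
# are not touched, so a later heartbeat can re-evaluate the DAG once
# serialization exists.
import logging

log = logging.getLogger(__name__)


def filter_unserialized(by_dag, dag_statuses, serialized_dag_ids):
    """Remove dag_ids lacking a SerializedDagModel row from both maps."""
    missing = set(by_dag) - set(serialized_dag_ids)
    for dag_id in missing:
        log.debug(
            "Skipping dataset-triggered DAG %s: no SerializedDagModel row yet",
            dag_id,
        )
        by_dag.pop(dag_id, None)
        dag_statuses.pop(dag_id, None)
    return by_dag, dag_statuses


by_dag = {"dag_a": ["ds://x"], "dag_b": ["ds://y"]}
dag_statuses = {"dag_a": {"ds://x": True}, "dag_b": {"ds://y": True}}

# Only dag_a has been serialized; dag_b is dropped until it is.
by_dag, dag_statuses = filter_unserialized(by_dag, dag_statuses, {"dag_a"})
print(sorted(by_dag))  # ['dag_a']
```

In the real patch the set of serialized dag_ids comes from a database query in the same session, and the queue rows are left untouched by design.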

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    Cursor

@boring-cyborg

boring-cyborg bot commented Mar 13, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@leossantos leossantos changed the title feat(scheduler): add INFO logging for dataset-triggered DagRun creation feat(dag): Add control to remove non serialized dags from by_dag Mar 24, 2026
@kaxil kaxil added this to the Airflow 2.11.3 milestone Mar 24, 2026
@kaxil
Member

kaxil commented Mar 24, 2026

A couple of meta-questions:

  1. This PR targets v2-11-stable. The dag.py guard against missing SerializedDagModel is a real fix for a race condition. Does this same issue exist on main? If so, the fix should land there too (or first).

  2. The diagnostic logging ([DEBUG DATASETS] prefix, N+1 lazy loads, per-loop queries) reads like temporary instrumentation added while investigating a specific incident. Is the intent to merge this into the stable branch permanently, or was this meant to be a local debugging patch? If it is meant to be permanent, the N+1 issues and log levels need to be addressed first — this would regress scheduler performance under load.
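The N+1 pattern flagged here can be illustrated with a small, self-contained sketch. The in-memory sqlite table is a stand-in for the serialized_dag table; none of this is Airflow code, but it shows why one batched IN query per loop beats one query per candidate DAG.

```python
# Illustrative sketch of the N+1 lookup versus a single batched lookup.
# An in-memory sqlite table stands in for the serialized_dag table.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY)")
conn.executemany("INSERT INTO serialized_dag VALUES (?)", [("dag_a",), ("dag_c",)])

candidate_dag_ids = ["dag_a", "dag_b", "dag_c"]

# N+1: one query per candidate DAG inside the scheduler loop.
serialized_n_plus_1 = {
    dag_id
    for dag_id in candidate_dag_ids
    if conn.execute(
        "SELECT 1 FROM serialized_dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
}

# Batched: a single IN query covering the whole loop.
placeholders = ",".join("?" * len(candidate_dag_ids))
serialized_batched = {
    row[0]
    for row in conn.execute(
        f"SELECT dag_id FROM serialized_dag WHERE dag_id IN ({placeholders})",
        candidate_dag_ids,
    )
}

assert serialized_n_plus_1 == serialized_batched
print(sorted(serialized_batched))  # ['dag_a', 'dag_c']
```

Both forms return the same set; the batched form issues one round trip regardless of how many DAGs are in the loop.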

@leossantos
Author

@kaxil Thanks for the review.

Re: main — yes, the same structural issue exists on main (asset-based code path). We plan a separate PR against main with a manual port; this PR targets 2.11.x only. Happy to follow whichever merge order you prefer.

Re: logging — the [DEBUG DATASETS] / per-DAG re-query instrumentation was for our deployment debugging, not intended to merge as-is. The updated PR will keep only the SerializedDagModel guard + tests (+ newsfragment); any remaining log line in the fix will be DEBUG only. Inline comments (joinedload if needed, DDRQ preservation test, minor cleanups) will be addressed in the next push.

Log which DAGs are selected as dataset-triggered (with ADRQ timestamp
ranges) and log successful DagRun creation with dag_id, exec_date,
prev_exec, event count, and event URIs. This provides visibility into
the scheduler's dataset trigger decisions for debugging premature
trigger incidents.

Made-with: Cursor
…sions

Log the full context of dataset-triggered scheduling to debug premature
trigger incidents:

- P0: Log condition, DDRQ URIs, and count when dataset_condition is
  satisfied (INFO in dags_needing_dagruns)
- P1: Warn on DDRQ/event mismatch when queued URIs have no matching
  DatasetEvent in the timestamp range (WARNING in
  _create_dag_runs_dataset_triggered)
- P2: Include data_interval start/end in the DagRun creation log
- P3: Log consumed event timestamps and source DAG/run_id (DEBUG)

Made-with: Cursor
…l is missing

DAGs with DDRQ entries but no corresponding SerializedDagModel were
bypassing dataset condition evaluation in dags_needing_dagruns() and
entering dataset_triggered_dag_info unchecked. This caused premature
triggers with partial events when the DAG processor was mid-parse cycle.

Now explicitly detects the mismatch and excludes those DAGs from the
current scheduler loop. DDRQ entries are preserved so the DAG is
re-evaluated on the next heartbeat (~5s) once serialization completes.

Made-with: Cursor
…DAGs

This change ensures that the `missing_from_serialized` variable is deleted after its entries have been processed, preventing potential memory leaks and maintaining cleaner state management within the DAG model.

Drop [DEBUG DATASETS] instrumentation from SchedulerJobRunner and DagModel dataset-readiness loop; inline timetable dataset_condition where it is only used once.

Log the DDRQ-without-serialization case at debug and remove the
[DEBUG DATASETS] prefix; drop redundant del of missing_from_serialized.
Tests capture DEBUG, match the new message, and assert
dataset_dag_run_queue rows remain after dags_needing_dagruns.
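The queue-preservation property those tests assert can be sketched in a self-contained way. The names below (`skip_unserialized`, the `queue_rows` list) are illustrative, not the actual Airflow test code: the point is that skipping only mutates the in-memory map while the simulated DatasetDagRunQueue rows survive.

```python
# Hypothetical test sketch: skipping an unserialized DAG must leave the
# queue rows intact so the next heartbeat can re-evaluate it.
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("ddrq-test")


def skip_unserialized(by_dag, serialized_dag_ids):
    """Drop unserialized dag_ids from the in-memory map only."""
    for dag_id in set(by_dag) - set(serialized_dag_ids):
        log.debug(
            "DAG %s has queued dataset events but no SerializedDagModel; skipping",
            dag_id,
        )
        del by_dag[dag_id]
    return by_dag


# Simulated DatasetDagRunQueue rows: these are never deleted here.
queue_rows = [("dag_b", "ds://y")]
by_dag = {"dag_b": ["ds://y"]}

skip_unserialized(by_dag, serialized_dag_ids={"dag_a"})
assert by_dag == {}          # removed from the in-memory map
assert len(queue_rows) == 1  # queue rows untouched
print("queue preserved")
```

The real tests additionally capture the DEBUG log output and run against actual ORM rows; this sketch only demonstrates the invariant.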
@leossantos leossantos force-pushed the leo-santos/addtional-scheduler-fix branch from 5682993 to 872c56c on March 26, 2026 at 20:35
@leossantos leossantos marked this pull request as ready for review March 27, 2026 18:03
@leossantos leossantos requested review from XD-DENG and ashb as code owners March 27, 2026 18:03
@leossantos
Author

I am creating the forward-port PR for Airflow 3.

@leossantos
Author

The forward-port PR for main is #64322

@leossantos leossantos changed the title feat(dag): Add control to remove non serialized dags from by_dag feat(dag): Skip dataset-triggered dags without SerializedDagModel Mar 27, 2026
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Mar 27, 2026
Split DagModel and DatasetDagRunQueue inserts and flush after DagModel so foreign-key order matches production DB constraints in TestDagModel.