From f8d7eaed51c3cc0962088550c0cb72a65770fabb Mon Sep 17 00:00:00 2001 From: Yuseok Jo Date: Sun, 14 Jun 2026 23:12:22 +0900 Subject: [PATCH] [AIP-94] Route airflow backfill create through the API server --- .../airflow/cli/commands/backfill_command.py | 109 +++------ .../cli/commands/test_backfill_command.py | 231 ++++-------------- .../cli/commands/test_command_deprecations.py | 7 +- airflow-ctl/src/airflowctl/api/operations.py | 7 +- .../tests/airflow_ctl/api/test_operations.py | 10 +- 5 files changed, 108 insertions(+), 256 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py b/airflow-core/src/airflow/cli/commands/backfill_command.py index 5ce8cc9224291..0e341887fdeb9 100644 --- a/airflow-core/src/airflow/cli/commands/backfill_command.py +++ b/airflow-core/src/airflow/cli/commands/backfill_command.py @@ -18,86 +18,26 @@ from __future__ import annotations import json -import logging -import signal from tabulate import tabulate -from airflow import settings -from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version +from airflow.api_fastapi.core_api.datamodels.backfills import BackfillPostBody +from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client from airflow.cli.simple_table import AirflowConsole -from airflow.exceptions import AirflowConfigException -from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run +from airflow.cli.utils import deprecated_for_airflowctl +from airflow.models.backfill import ReprocessBehavior from airflow.utils import cli as cli_utils -from airflow.utils.cli import sigint_handler -from airflow.utils.platform import getuser from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.utils.session import create_session - -log = logging.getLogger(__name__) @cli_utils.action_cli +@deprecated_for_airflowctl("airflowctl backfill create") @providers_configuration_loaded -def create_backfill(args) -> None: - """Create backfill job or dry run for a Dag or list of Dags using regex.""" - logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT) - signal.signal(signal.SIGTERM, sigint_handler) +@provide_api_client +def create_backfill(args, api_client: Client = NEW_API_CLIENT) -> None: + """Create a backfill job, or dry run one, for a Dag through the API server.""" console = AirflowConsole() - if args.reprocess_behavior is not None: - reprocess_behavior = ReprocessBehavior(args.reprocess_behavior) - else: - reprocess_behavior = None - - with create_session() as session: - resolved_run_on_latest = resolve_run_on_latest_version( - args.run_on_latest_version, - args.dag_id, - session, - fallback=True, - ) - - if args.dry_run: - console.print("Performing dry run of backfill.") - console.print("Printing params:") - params = dict( - dag_id=args.dag_id, - from_date=args.from_date, - to_date=args.to_date, - max_active_runs=args.max_active_runs, - reverse=args.run_backwards, - dag_run_conf=args.dag_run_conf, - reprocess_behavior=reprocess_behavior, - run_on_latest_version=resolved_run_on_latest, - ) - for k, v in params.items(): - console.print(f" - {k} = {v}") - with create_session() as session: - infos = _do_dry_run( - dag_id=args.dag_id, - from_date=args.from_date, - to_date=args.to_date, - reverse=args.run_backwards, - reprocess_behavior=args.reprocess_behavior, - session=session, - ) - console.print("Runs to be attempted:") - rows = [ - dict(logical_date=d.logical_date, partition_key=d.partition_key, partition_date=d.partition_date) - for d in infos - ] - output = tabulate(rows, tablefmt="grid", headers="keys") - console.print(output) - return - - try: - user = getuser() - except AirflowConfigException as e: - log.warning("Failed to get user name from os: %s, not setting the triggering user", e) - user = None - - # Parse dag_run_conf if provided dag_run_conf = None if args.dag_run_conf: try: @@ -105,14 +45,35 @@ def create_backfill(args) -> None: except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in --dag-run-conf: {e}") - _create_backfill( + # ``run_on_latest_version`` and the triggering user are resolved server-side; the + # core_api request model is wire-compatible with the airflowctl client's generated model. + body_kwargs: dict = dict( dag_id=args.dag_id, from_date=args.from_date, to_date=args.to_date, - max_active_runs=args.max_active_runs, - reverse=args.run_backwards, + run_backwards=args.run_backwards, dag_run_conf=dag_run_conf, - triggering_user_name=user, - reprocess_behavior=reprocess_behavior, - run_on_latest_version=resolved_run_on_latest, + run_on_latest_version=args.run_on_latest_version, ) + if args.reprocess_behavior is not None: + body_kwargs["reprocess_behavior"] = ReprocessBehavior(args.reprocess_behavior) + if args.max_active_runs is not None: + body_kwargs["max_active_runs"] = args.max_active_runs + backfill_body = BackfillPostBody(**body_kwargs) + + if args.dry_run: + console.print("Performing dry run of backfill.") + dry_run = api_client.backfills.create_dry_run(backfill=backfill_body) # type: ignore[arg-type] + rows = [ + dict( + logical_date=run.logical_date, + partition_key=run.partition_key, + partition_date=run.partition_date, + ) + for run in dry_run.backfills + ] + console.print("Runs to be attempted:") + console.print(tabulate(rows, tablefmt="grid", headers="keys")) + return + + api_client.backfills.create(backfill=backfill_body) # type: ignore[arg-type] diff --git a/airflow-core/tests/unit/cli/commands/test_backfill_command.py b/airflow-core/tests/unit/cli/commands/test_backfill_command.py index ab6c937834609..106ba61d46422 100644 --- a/airflow-core/tests/unit/cli/commands/test_backfill_command.py +++ b/airflow-core/tests/unit/cli/commands/test_backfill_command.py @@ -18,11 +18,9 @@ from __future__ import annotations import argparse -import os from datetime import datetime -from unittest import mock +from types import SimpleNamespace -import pendulum import pytest import airflow.cli.commands.backfill_command @@ -30,18 +28,7 @@ from airflow.cli import cli_parser from airflow.models.backfill import ReprocessBehavior -from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs, parse_and_sync_to_db - DEFAULT_DATE = timezone.make_aware(datetime(2015, 1, 1), timezone=timezone.utc) -if pendulum.__version__.startswith("3"): - DEFAULT_DATE_REPR = DEFAULT_DATE.isoformat(sep=" ") -else: - DEFAULT_DATE_REPR = DEFAULT_DATE.isoformat() - -# TODO: Check if tests needs side effects - locally there's missing DAG - -pytestmark = pytest.mark.db_test class TestCliBackfill: @@ -49,33 +36,10 @@ class TestCliBackfill: @classmethod def setup_class(cls): - with conf_vars({("core", "load_examples"): "True"}): - parse_and_sync_to_db(os.devnull) cls.parser = cli_parser.get_parser() - @classmethod - def teardown_class(cls) -> None: - clear_db_runs() - clear_db_dags() - clear_db_backfills() - - def setup_method(self): - clear_db_runs() # clean-up all dag run before start each test - clear_db_dags() - clear_db_backfills() - - @mock.patch("airflow.cli.commands.backfill_command._create_backfill") - @pytest.mark.parametrize( - ("repro", "expected_repro"), - [ - (None, None), - ("none", ReprocessBehavior.NONE), - ("completed", ReprocessBehavior.COMPLETED), - ("failed", ReprocessBehavior.FAILED), - ], - ) - def test_backfill(self, mock_create, repro, expected_repro): - args = [ + def _base_args(self) -> list[str]: + return [ "backfill", "create", "--dag-id", @@ -85,160 +49,73 @@ def test_backfill(self, mock_create, repro, expected_repro): "--to-date", DEFAULT_DATE.isoformat(), ] + + @pytest.mark.parametrize( + ("repro", "expected_repro"), + [ + (None, ReprocessBehavior.NONE), + ("none", ReprocessBehavior.NONE), + ("completed", ReprocessBehavior.COMPLETED), + ("failed", ReprocessBehavior.FAILED), + ], + ) + def test_backfill(self, mock_cli_api_client, repro, expected_repro): + args = self._base_args() if repro is not None: - args.extend( - [ - "--reprocess-behavior", - repro, - ] - ) - # When --run-on-latest-version is not passed, the resolver kicks in. - # With no DAG config and no global config set, the backfill fallback=True applies, - # preserving the historical default. + args.extend(["--reprocess-behavior", repro]) airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) - mock_create.assert_called_once_with( - dag_id="example_bash_operator", - from_date=DEFAULT_DATE, - to_date=DEFAULT_DATE, - max_active_runs=None, - reverse=False, - dag_run_conf=None, - reprocess_behavior=expected_repro, - triggering_user_name="root", - run_on_latest_version=True, - ) - - @mock.patch("airflow.cli.commands.backfill_command._create_backfill") - def test_backfill_with_run_on_latest_version(self, mock_create): - args = [ - "backfill", - "create", - "--dag-id", - "example_bash_operator", - "--from-date", - DEFAULT_DATE.isoformat(), - "--to-date", - DEFAULT_DATE.isoformat(), - "--run-on-latest-version", - ] + mock_cli_api_client.backfills.create.assert_called_once() + mock_cli_api_client.backfills.create_dry_run.assert_not_called() + body = mock_cli_api_client.backfills.create.call_args.kwargs["backfill"] + assert body.dag_id == "example_bash_operator" + assert body.from_date == DEFAULT_DATE + assert body.to_date == DEFAULT_DATE + assert body.run_backwards is False + assert body.dag_run_conf is None + assert body.reprocess_behavior == expected_repro + # Not passed on the command line; the API server resolves the fallback. + assert body.run_on_latest_version is None + + def test_backfill_with_run_on_latest_version(self, mock_cli_api_client): + args = [*self._base_args(), "--run-on-latest-version"] airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) - mock_create.assert_called_once_with( - dag_id="example_bash_operator", - from_date=DEFAULT_DATE, - to_date=DEFAULT_DATE, - max_active_runs=None, - reverse=False, - dag_run_conf=None, - reprocess_behavior=None, - run_on_latest_version=True, - triggering_user_name="root", - ) + body = mock_cli_api_client.backfills.create.call_args.kwargs["backfill"] + assert body.run_on_latest_version is True - @mock.patch("airflow.cli.commands.backfill_command._do_dry_run") - @pytest.mark.parametrize( - "reverse", - [False, True], - ) - def test_backfill_dry_run(self, mock_dry_run, reverse): - args = [ - "backfill", - "create", - "--dag-id", - "example_bash_operator", - "--from-date", - DEFAULT_DATE.isoformat(), - "--to-date", - DEFAULT_DATE.isoformat(), - "--dry-run", - "--reprocess-behavior", - "none", - ] + @pytest.mark.parametrize("reverse", [False, True]) + def test_backfill_dry_run(self, mock_cli_api_client, reverse): + mock_cli_api_client.backfills.create_dry_run.return_value = SimpleNamespace( + backfills=[SimpleNamespace(logical_date=DEFAULT_DATE, partition_key=None, partition_date=None)] + ) + args = [*self._base_args(), "--dry-run", "--reprocess-behavior", "none"] if reverse: args.append("--run-backwards") airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) - mock_dry_run.assert_called_once_with( - dag_id="example_bash_operator", - from_date=DEFAULT_DATE.replace(tzinfo=timezone.utc), - to_date=DEFAULT_DATE.replace(tzinfo=timezone.utc), - reverse=reverse, - reprocess_behavior="none", - session=mock.ANY, - ) + mock_cli_api_client.backfills.create_dry_run.assert_called_once() + mock_cli_api_client.backfills.create.assert_not_called() + body = mock_cli_api_client.backfills.create_dry_run.call_args.kwargs["backfill"] + assert body.run_backwards is reverse + assert body.reprocess_behavior == ReprocessBehavior.NONE - @mock.patch("airflow.cli.commands.backfill_command._create_backfill") - def test_backfill_with_dag_run_conf(self, mock_create): - """Test that dag_run_conf is properly parsed from JSON string.""" - args = [ - "backfill", - "create", - "--dag-id", - "example_bash_operator", - "--from-date", - DEFAULT_DATE.isoformat(), - "--to-date", - DEFAULT_DATE.isoformat(), - "--dag-run-conf", - '{"example_key": "example_value"}', - ] + def test_backfill_with_dag_run_conf(self, mock_cli_api_client): + args = [*self._base_args(), "--dag-run-conf", '{"example_key": "example_value"}'] airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) - mock_create.assert_called_once_with( - dag_id="example_bash_operator", - from_date=DEFAULT_DATE, - to_date=DEFAULT_DATE, - max_active_runs=None, - reverse=False, - dag_run_conf={"example_key": "example_value"}, - reprocess_behavior=None, - triggering_user_name="root", - run_on_latest_version=True, - ) + body = mock_cli_api_client.backfills.create.call_args.kwargs["backfill"] + assert body.dag_run_conf == {"example_key": "example_value"} - def test_backfill_with_invalid_dag_run_conf(self): - """Test that invalid JSON in dag_run_conf raises ValueError.""" - args = [ - "backfill", - "create", - "--dag-id", - "example_bash_operator", - "--from-date", - DEFAULT_DATE.isoformat(), - "--to-date", - DEFAULT_DATE.isoformat(), - "--dag-run-conf", - '{"invalid": json}', # Invalid JSON - ] + def test_backfill_with_invalid_dag_run_conf(self, mock_cli_api_client): + args = [*self._base_args(), "--dag-run-conf", '{"invalid": json}'] with pytest.raises(ValueError, match="Invalid JSON in --dag-run-conf"): airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) + mock_cli_api_client.backfills.create.assert_not_called() - @mock.patch("airflow.cli.commands.backfill_command._create_backfill") - def test_backfill_with_empty_dag_run_conf(self, mock_create): - """Test that empty dag_run_conf is properly parsed.""" - args = [ - "backfill", - "create", - "--dag-id", - "example_bash_operator", - "--from-date", - DEFAULT_DATE.isoformat(), - "--to-date", - DEFAULT_DATE.isoformat(), - "--dag-run-conf", - "{}", - ] + def test_backfill_with_empty_dag_run_conf(self, mock_cli_api_client): + args = [*self._base_args(), "--dag-run-conf", "{}"] airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args)) - mock_create.assert_called_once_with( - dag_id="example_bash_operator", - from_date=DEFAULT_DATE, - to_date=DEFAULT_DATE, - max_active_runs=None, - reverse=False, - dag_run_conf={}, - reprocess_behavior=None, - triggering_user_name="root", - run_on_latest_version=True, - ) + body = mock_cli_api_client.backfills.create.call_args.kwargs["backfill"] + assert body.dag_run_conf == {} diff --git a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py index b4eb6840c9069..2cbc23ca032ad 100644 --- a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py +++ b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py @@ -30,7 +30,7 @@ import pytest -from airflow.cli.commands import asset_command, dag_command, pool_command +from airflow.cli.commands import asset_command, backfill_command, dag_command, pool_command from airflow.exceptions import RemovedInAirflow4Warning # (command callable, argv to parse, expected airflowctl replacement named in the warning) @@ -52,6 +52,11 @@ ["assets", "materialize", "--name=foo"], "airflowctl assets materialize", ), + ( + backfill_command.create_backfill, + ["backfill", "create", "--dag-id", "foo", "--from-date", "2024-01-01", "--to-date", "2024-01-02"], + "airflowctl backfill create", + ), ] diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index e250b66e127dd..91ce014d57166 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -56,6 +56,7 @@ DAGVersionCollectionResponse, DagVersionResponse, DAGWarningCollectionResponse, + DryRunBackfillCollectionResponse, ImportErrorCollectionResponse, ImportErrorResponse, JobCollectionResponse, @@ -360,13 +361,15 @@ def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerRespons except ServerResponseError as e: raise e - def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError: + def create_dry_run( + self, backfill: BackfillPostBody + ) -> DryRunBackfillCollectionResponse | ServerResponseError: """Create a dry run backfill.""" try: self.response = self.client.post( "backfills/dry_run", json=backfill.model_dump(mode="json", exclude_none=True) ) - return BackfillResponse.model_validate_json(self.response.content) + return DryRunBackfillCollectionResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 52faecee73ea0..de2a01ab01830 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -75,6 +75,8 @@ DAGWarningCollectionResponse, DAGWarningResponse, DagWarningType, + DryRunBackfillCollectionResponse, + DryRunBackfillResponse, ImportErrorCollectionResponse, ImportErrorResponse, JobCollectionResponse, @@ -461,6 +463,10 @@ class TestBackfillOperations: backfills=[backfill_response], total_entries=1, ) + dry_run_collection_response = DryRunBackfillCollectionResponse( + backfills=[DryRunBackfillResponse(logical_date=datetime.datetime(2025, 1, 1, 0, 0, 0))], + total_entries=1, + ) def test_create(self): expected_body = self.backfill_body.model_dump(mode="json", exclude_none=True) @@ -482,11 +488,11 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v2/backfills/dry_run" assert request.headers.get("content-type", "").startswith("application/json") assert json.loads(request.content.decode()) == expected_body - return httpx.Response(200, json=json.loads(self.backfill_response.model_dump_json())) + return httpx.Response(200, json=json.loads(self.dry_run_collection_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) response = client.backfills.create_dry_run(backfill=self.backfill_body) - assert response == self.backfill_response + assert response == self.dry_run_collection_response def test_get(self): def handle_request(request: httpx.Request) -> httpx.Response: