Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 35 additions & 74 deletions airflow-core/src/airflow/cli/commands/backfill_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,101 +18,62 @@
from __future__ import annotations

import json
import logging
import signal

from tabulate import tabulate

from airflow import settings
from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
from airflow.api_fastapi.core_api.datamodels.backfills import BackfillPostBody
from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowConfigException
from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run
from airflow.cli.utils import deprecated_for_airflowctl
from airflow.models.backfill import ReprocessBehavior
from airflow.utils import cli as cli_utils
from airflow.utils.cli import sigint_handler
from airflow.utils.platform import getuser
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import create_session

log = logging.getLogger(__name__)


@cli_utils.action_cli
@deprecated_for_airflowctl("airflowctl backfill create")
@providers_configuration_loaded
def create_backfill(args) -> None:
"""Create backfill job or dry run for a Dag or list of Dags using regex."""
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
@provide_api_client
def create_backfill(args, api_client: Client = NEW_API_CLIENT) -> None:
"""Create a backfill job, or dry run one, for a Dag through the API server."""
console = AirflowConsole()

if args.reprocess_behavior is not None:
reprocess_behavior = ReprocessBehavior(args.reprocess_behavior)
else:
reprocess_behavior = None

with create_session() as session:
resolved_run_on_latest = resolve_run_on_latest_version(
args.run_on_latest_version,
args.dag_id,
session,
fallback=True,
)

if args.dry_run:
console.print("Performing dry run of backfill.")
console.print("Printing params:")
params = dict(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
max_active_runs=args.max_active_runs,
reverse=args.run_backwards,
dag_run_conf=args.dag_run_conf,
reprocess_behavior=reprocess_behavior,
run_on_latest_version=resolved_run_on_latest,
)
for k, v in params.items():
console.print(f" - {k} = {v}")
with create_session() as session:
infos = _do_dry_run(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
reverse=args.run_backwards,
reprocess_behavior=args.reprocess_behavior,
session=session,
)
console.print("Runs to be attempted:")
rows = [
dict(logical_date=d.logical_date, partition_key=d.partition_key, partition_date=d.partition_date)
for d in infos
]
output = tabulate(rows, tablefmt="grid", headers="keys")
console.print(output)
return

try:
user = getuser()
except AirflowConfigException as e:
log.warning("Failed to get user name from os: %s, not setting the triggering user", e)
user = None

# Parse dag_run_conf if provided
dag_run_conf = None
if args.dag_run_conf:
try:
dag_run_conf = json.loads(args.dag_run_conf)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in --dag-run-conf: {e}")

_create_backfill(
# ``run_on_latest_version`` and the triggering user are resolved server-side; the
# core_api request model is wire-compatible with the airflowctl client's generated model.
body_kwargs: dict = dict(
dag_id=args.dag_id,
from_date=args.from_date,
to_date=args.to_date,
max_active_runs=args.max_active_runs,
reverse=args.run_backwards,
run_backwards=args.run_backwards,
dag_run_conf=dag_run_conf,
triggering_user_name=user,
reprocess_behavior=reprocess_behavior,
run_on_latest_version=resolved_run_on_latest,
run_on_latest_version=args.run_on_latest_version,
)
if args.reprocess_behavior is not None:
body_kwargs["reprocess_behavior"] = ReprocessBehavior(args.reprocess_behavior)
if args.max_active_runs is not None:
body_kwargs["max_active_runs"] = args.max_active_runs
backfill_body = BackfillPostBody(**body_kwargs)

if args.dry_run:
console.print("Performing dry run of backfill.")
dry_run = api_client.backfills.create_dry_run(backfill=backfill_body) # type: ignore[arg-type]
rows = [
dict(
logical_date=run.logical_date,
partition_key=run.partition_key,
partition_date=run.partition_date,
)
for run in dry_run.backfills
]
console.print("Runs to be attempted:")
console.print(tabulate(rows, tablefmt="grid", headers="keys"))
return

api_client.backfills.create(backfill=backfill_body) # type: ignore[arg-type]
Loading
Loading