From 0864d30a11001afc82218e5fb38bf6a3d1cbcea6 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 15 Jun 2026 09:40:04 +0900 Subject: [PATCH] refactor: [AIP-94] airflowctl jobs: add check command Signed-off-by: PoAn Yang --- .../howto/docker-compose/docker-compose.yaml | 21 +++ .../src/airflow/cli/commands/jobs_command.py | 31 ++--- .../cli/commands/test_command_deprecations.py | 3 +- .../unit/cli/commands/test_jobs_command.py | 123 +++++++----------- airflow-ctl/docs/images/command_hashes.txt | 2 +- airflow-ctl/docs/images/output_jobs.svg | 66 +++++----- airflow-ctl/src/airflowctl/ctl/cli_config.py | 49 +++++++ .../airflowctl/ctl/commands/jobs_command.py | 47 +++++++ .../ctl/commands/test_jobs_command.py | 123 ++++++++++++++++++ chart/templates/_helpers.yaml | 22 +++- .../airflow_core/test_dag_processor.py | 8 +- .../helm_tests/airflow_core/test_scheduler.py | 15 +-- .../helm_tests/airflow_core/test_triggerer.py | 8 +- 13 files changed, 367 insertions(+), 151 deletions(-) create mode 100644 airflow-ctl/src/airflowctl/ctl/commands/jobs_command.py create mode 100644 airflow-ctl/tests/airflow_ctl/ctl/commands/test_jobs_command.py diff --git a/airflow-core/docs/howto/docker-compose/docker-compose.yaml b/airflow-core/docs/howto/docker-compose/docker-compose.yaml index 6a8891f3f58cc..c4682d2f21cec 100644 --- a/airflow-core/docs/howto/docker-compose/docker-compose.yaml +++ b/airflow-core/docs/howto/docker-compose/docker-compose.yaml @@ -139,6 +139,11 @@ services: airflow-scheduler: <<: *airflow-common command: scheduler + # ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI + # at it here rather than globally so the api-server keeps its host-facing base_url. + environment: + <<: *airflow-common-env + AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080 healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"'] interval: 30s @@ -148,12 +153,19 @@ services: restart: always depends_on: <<: *airflow-common-depends-on + airflow-apiserver: + condition: service_healthy airflow-init: condition: service_completed_successfully airflow-dag-processor: <<: *airflow-common command: dag-processor + # ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI + # at it here rather than globally so the api-server keeps its host-facing base_url. + environment: + <<: *airflow-common-env + AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080 healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"'] interval: 30s @@ -163,6 +175,8 @@ services: restart: always depends_on: <<: *airflow-common-depends-on + airflow-apiserver: + condition: service_healthy airflow-init: condition: service_completed_successfully @@ -192,6 +206,11 @@ services: airflow-triggerer: <<: *airflow-common command: triggerer + # ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI + # at it here rather than globally so the api-server keeps its host-facing base_url. + environment: + <<: *airflow-common-env + AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080 healthcheck: test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] interval: 30s @@ -201,6 +220,8 @@ services: restart: always depends_on: <<: *airflow-common-depends-on + airflow-apiserver: + condition: service_healthy airflow-init: condition: service_completed_successfully diff --git a/airflow-core/src/airflow/cli/commands/jobs_command.py b/airflow-core/src/airflow/cli/commands/jobs_command.py index 194d8720db2f1..72fe350201b2d 100644 --- a/airflow-core/src/airflow/cli/commands/jobs_command.py +++ b/airflow-core/src/airflow/cli/commands/jobs_command.py @@ -16,39 +16,28 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING - -from sqlalchemy import select - -from airflow.jobs.job import Job, JobState +from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client +from airflow.cli.utils import deprecated_for_airflowctl +from airflow.utils.cli import suppress_logs_and_warning from airflow.utils.net import get_hostname from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.utils.session import NEW_SESSION, provide_session - -if TYPE_CHECKING: - from sqlalchemy.orm import Session +@deprecated_for_airflowctl("airflowctl jobs check") +@suppress_logs_and_warning @providers_configuration_loaded -@provide_session -def check(args, *, session: Session = NEW_SESSION) -> None: +@provide_api_client +def check(args, api_client: Client = NEW_API_CLIENT) -> None: """Check if job(s) are still alive.""" if args.allow_multiple and args.limit <= 1: raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.") if args.hostname and args.local: raise SystemExit("You can't use --hostname and --local at the same time") - query = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc()) - if args.job_type: - query = query.where(Job.job_type == args.job_type) - if args.hostname: - query = query.where(Job.hostname == args.hostname) - if args.local: - query = query.where(Job.hostname == get_hostname()) + hostname = get_hostname() if args.local else args.hostname + alive_jobs = api_client.jobs.list(job_type=args.job_type, hostname=hostname, is_alive=True).jobs if args.limit > 0: - query = query.limit(args.limit) - - alive_jobs: list[Job] = [job for job in session.scalars(query) if job.is_alive()] + alive_jobs = alive_jobs[: args.limit] count_alive_jobs = len(alive_jobs) if count_alive_jobs == 0: diff --git a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py index b4eb6840c9069..59dee6d8a5223 100644 --- a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py +++ b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py @@ -30,7 +30,7 @@ import pytest -from airflow.cli.commands import asset_command, dag_command, pool_command +from airflow.cli.commands import asset_command, dag_command, jobs_command, pool_command from airflow.exceptions import RemovedInAirflow4Warning # (command callable, argv to parse, expected airflowctl replacement named in the warning) @@ -52,6 +52,7 @@ ["assets", "materialize", "--name=foo"], "airflowctl assets materialize", ), + (jobs_command.check, ["jobs", "check"], "airflowctl jobs check"), ] diff --git a/airflow-core/tests/unit/cli/commands/test_jobs_command.py b/airflow-core/tests/unit/cli/commands/test_jobs_command.py index 9d17bde437cf3..6673092c331e1 100644 --- a/airflow-core/tests/unit/cli/commands/test_jobs_command.py +++ b/airflow-core/tests/unit/cli/commands/test_jobs_command.py @@ -16,55 +16,36 @@ # under the License. from __future__ import annotations +from types import SimpleNamespace +from unittest import mock + import pytest from airflow.cli import cli_parser from airflow.cli.commands import jobs_command -from airflow.jobs.job import Job, JobState -from airflow.jobs.scheduler_job_runner import SchedulerJobRunner -from airflow.utils.session import create_session -from airflow.utils.state import State -from tests_common.test_utils.db import clear_db_jobs + +def _jobs(count: int) -> SimpleNamespace: + """Stand in for the airflowctl ``JobCollectionResponse`` of ``count`` already-alive jobs.""" + return SimpleNamespace(jobs=[SimpleNamespace(id=i) for i in range(count)]) -@pytest.mark.db_test -class TestCliConfigList: +class TestCliJobsCheck: @classmethod def setup_class(cls): cls.parser = cli_parser.get_parser() - def setup_method(self) -> None: - clear_db_jobs() - self.scheduler_job = None - self.job_runner = None - - def teardown_method(self) -> None: - clear_db_jobs() - - def test_should_report_success_for_one_working_scheduler(self, stdout_capture): - with create_session() as session: - self.scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=self.scheduler_job) - self.scheduler_job.state = State.RUNNING - session.add(self.scheduler_job) - session.commit() - self.scheduler_job.heartbeat(heartbeat_callback=self.job_runner.heartbeat_callback) - + def test_reports_one_alive_job(self, mock_cli_api_client, stdout_capture): + mock_cli_api_client.jobs.list.return_value = _jobs(1) with stdout_capture as temp_stdout: jobs_command.check(self.parser.parse_args(["jobs", "check", "--job-type", "SchedulerJob"])) assert "Found one alive job." in temp_stdout.getvalue() + mock_cli_api_client.jobs.list.assert_called_once_with( + job_type="SchedulerJob", hostname=None, is_alive=True + ) - def test_should_report_success_for_one_working_scheduler_with_hostname(self, stdout_capture): - with create_session() as session: - self.scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=self.scheduler_job) - self.scheduler_job.state = State.RUNNING - self.scheduler_job.hostname = "HOSTNAME" - session.add(self.scheduler_job) - session.commit() - self.scheduler_job.heartbeat(heartbeat_callback=self.job_runner.heartbeat_callback) - + def test_forwards_hostname(self, mock_cli_api_client, stdout_capture): + mock_cli_api_client.jobs.list.return_value = _jobs(1) with stdout_capture as temp_stdout: jobs_command.check( self.parser.parse_args( @@ -72,20 +53,12 @@ def test_should_report_success_for_one_working_scheduler_with_hostname(self, std ) ) assert "Found one alive job." in temp_stdout.getvalue() + mock_cli_api_client.jobs.list.assert_called_once_with( + job_type="SchedulerJob", hostname="HOSTNAME", is_alive=True + ) - def test_should_report_success_for_ha_schedulers(self, stdout_capture): - scheduler_jobs = [] - job_runners = [] - with create_session() as session: - for _ in range(3): - scheduler_job = Job() - job_runner = SchedulerJobRunner(job=scheduler_job) - scheduler_job.state = State.RUNNING - session.add(scheduler_job) - scheduler_jobs.append(scheduler_job) - job_runners.append(job_runner) - session.commit() - scheduler_job.heartbeat(heartbeat_callback=job_runner.heartbeat_callback) + def test_reports_multiple_with_allow_multiple(self, mock_cli_api_client, stdout_capture): + mock_cli_api_client.jobs.list.return_value = _jobs(3) with stdout_capture as temp_stdout: jobs_command.check( self.parser.parse_args( @@ -94,38 +67,13 @@ def test_should_report_success_for_ha_schedulers(self, stdout_capture): ) assert "Found 3 alive jobs." in temp_stdout.getvalue() - def test_should_ignore_not_running_jobs(self): - scheduler_jobs = [] - job_runners = [] - with create_session() as session: - for _ in range(3): - scheduler_job = Job() - job_runner = SchedulerJobRunner(job=scheduler_job) - scheduler_job.state = JobState.FAILED - session.add(scheduler_job) - scheduler_jobs.append(scheduler_job) - job_runners.append(job_runner) - session.commit() - # No alive jobs found. + def test_no_alive_jobs(self, mock_cli_api_client): + mock_cli_api_client.jobs.list.return_value = _jobs(0) with pytest.raises(SystemExit, match=r"No alive jobs found."): jobs_command.check(self.parser.parse_args(["jobs", "check"])) - def test_should_raise_exception_for_multiple_scheduler_on_one_host(self): - scheduler_jobs = [] - job_runners = [] - with create_session() as session: - for _ in range(3): - scheduler_job = Job() - job_runner = SchedulerJobRunner(job=scheduler_job) - job_runner.job = scheduler_job - scheduler_job.state = State.RUNNING - scheduler_job.hostname = "HOSTNAME" - session.add(scheduler_job) - scheduler_jobs.append(scheduler_job) - job_runners.append(job_runner) - session.commit() - scheduler_job.heartbeat(heartbeat_callback=job_runner.heartbeat_callback) - + def test_multiple_without_allow_multiple_fails(self, mock_cli_api_client): + mock_cli_api_client.jobs.list.return_value = _jobs(3) with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."): jobs_command.check( self.parser.parse_args( @@ -140,9 +88,30 @@ def test_should_raise_exception_for_multiple_scheduler_on_one_host(self): ) ) - def test_should_raise_exception_for_allow_multiple_and_limit_1(self): + def test_allow_multiple_requires_limit_above_one(self, mock_cli_api_client): with pytest.raises( SystemExit, match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.", ): jobs_command.check(self.parser.parse_args(["jobs", "check", "--allow-multiple"])) + mock_cli_api_client.jobs.list.assert_not_called() + + def test_hostname_and_local_are_mutually_exclusive(self, mock_cli_api_client): + with pytest.raises(SystemExit, match=r"You can't use --hostname and --local at the same time"): + jobs_command.check(self.parser.parse_args(["jobs", "check", "--hostname", "h", "--local"])) + mock_cli_api_client.jobs.list.assert_not_called() + + def test_local_resolves_hostname(self, mock_cli_api_client, stdout_capture): + mock_cli_api_client.jobs.list.return_value = _jobs(1) + with mock.patch("airflow.cli.commands.jobs_command.get_hostname", return_value="local-host"): + jobs_command.check(self.parser.parse_args(["jobs", "check", "--local"])) + mock_cli_api_client.jobs.list.assert_called_once_with( + job_type=None, hostname="local-host", is_alive=True + ) + + def test_limit_is_applied_client_side(self, mock_cli_api_client, stdout_capture): + # The API returns all alive jobs (heartbeat desc); --limit trims to the most recent N. + mock_cli_api_client.jobs.list.return_value = _jobs(5) + with stdout_capture as stdout: + jobs_command.check(self.parser.parse_args(["jobs", "check", "--limit", "1"])) + assert "Found one alive job." in stdout.getvalue() diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 53c93e7546d1e..7bfff72dad531 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -6,7 +6,7 @@ config:a3d936cb15fe3b547bf6c82cf93d923f connections:942f9f88cb908c28bf5c19159fc5065b dags:6b38e6bcd491bc1941e7814b77e63bde dagrun:c32e0011aa9a845456c778786717208e -jobs:a5b644c5da8889443bb40ee10b599270 +jobs:d4af478f28dae48ee18d43b998edc345 pools:19efe105b9515ab1926ebcaf0e028d71 providers:34502fe09dc0b8b0a13e7e46efdffda6 variables:f8fc76d3d398b2780f4e97f7cd816646 diff --git a/airflow-ctl/docs/images/output_jobs.svg b/airflow-ctl/docs/images/output_jobs.svg index 13b31d2caedc4..9ff6f8753c162 100644 --- a/airflow-ctl/docs/images/output_jobs.svg +++ b/airflow-ctl/docs/images/output_jobs.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl jobs [-hCOMMAND... - -Perform Jobs operations - -Positional Arguments: -COMMAND -listList scheduler, triggerer, and other Airflow jobs - -Options: --h--helpshow this help message and exit + + Usage:airflowctl jobs [-hCOMMAND... + +Perform Jobs operations + +Positional Arguments: +COMMAND +checkCheck if job(s) are still alive. +listList scheduler, triggerer, and other Airflow jobs + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 11ff4542e01ef..664aee0ae77ca 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -276,6 +276,35 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: choices=("overwrite", "fail", "skip"), ) +# Jobs command args +ARG_JOB_TYPE_FILTER = Arg( + flags=("--job-type",), + choices=("SchedulerJob", "TriggererJob", "DagProcessorJob"), + help="The type of job(s) that will be checked.", +) +ARG_JOB_HOSTNAME_FILTER = Arg( + flags=("--hostname",), + type=str, + default=None, + help="The hostname of job(s) that will be checked.", +) +ARG_JOB_LOCAL_FILTER = Arg( + flags=("--local",), + action="store_true", + help="If passed, this command will only show jobs from the local host.", +) +ARG_JOB_LIMIT = Arg( + flags=("--limit",), + type=positive_int(allow_zero=True), + default=1, + help="The number of recent jobs that will be checked. To disable limit, set 0.", +) +ARG_JOB_ALLOW_MULTIPLE = Arg( + flags=("--allow-multiple",), + action="store_true", + help="If passed, this command will be successful even if multiple matching alive jobs are found.", +) + # Config arguments ARG_CONFIG_SECTION = Arg( flags=("--section",), @@ -1015,6 +1044,21 @@ def merge_commands( ), ) +JOB_COMMANDS = ( + ActionCommand( + name="check", + help="Check if job(s) are still alive.", + func=lazy_load_command("airflowctl.ctl.commands.jobs_command.check"), + args=( + ARG_JOB_TYPE_FILTER, + ARG_JOB_HOSTNAME_FILTER, + ARG_JOB_LOCAL_FILTER, + ARG_JOB_LIMIT, + ARG_JOB_ALLOW_MULTIPLE, + ), + ), +) + core_commands: list[CLICommand] = [ GroupCommand( name="auth", @@ -1037,6 +1081,11 @@ def merge_commands( help="Manage Airflow Dags", subcommands=DAG_COMMANDS, ), + GroupCommand( + name="jobs", + help="Manage Airflow jobs", + subcommands=JOB_COMMANDS, + ), GroupCommand( name="pools", help="Manage Airflow pools", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/jobs_command.py b/airflow-ctl/src/airflowctl/ctl/commands/jobs_command.py new file mode 100644 index 0000000000000..d15aead0677f5 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/jobs_command.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import socket + +import rich + +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client + + +@provide_api_client(kind=ClientKind.CLI) +def check(args, api_client=NEW_API_CLIENT) -> None: + """Check if job(s) are still alive.""" + if args.allow_multiple and args.limit <= 1: + raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.") + if args.hostname and args.local: + raise SystemExit("You can't use --hostname and --local at the same time") + + hostname = socket.getfqdn() if args.local else args.hostname + alive_jobs = api_client.jobs.list(job_type=args.job_type, hostname=hostname, is_alive=True).jobs + if args.limit > 0: + alive_jobs = alive_jobs[: args.limit] + + count_alive_jobs = len(alive_jobs) + if count_alive_jobs == 0: + raise SystemExit("No alive jobs found.") + if count_alive_jobs > 1 and not args.allow_multiple: + raise SystemExit(f"Found {count_alive_jobs} alive jobs. Expected only one.") + if count_alive_jobs == 1: + rich.print("Found one alive job.") + else: + rich.print(f"Found {count_alive_jobs} alive jobs.") diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_jobs_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_jobs_command.py new file mode 100644 index 0000000000000..93539c50ef8c2 --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_jobs_command.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from types import SimpleNamespace +from unittest import mock + +import pytest + +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import jobs_command + + +def _jobs(count: int) -> SimpleNamespace: + """Stand in for the ``JobCollectionResponse`` of ``count`` already-alive jobs.""" + return SimpleNamespace(jobs=[SimpleNamespace(id=i) for i in range(count)]) + + +class TestCliJobsCheck: + parser = cli_parser.get_parser() + + def test_reports_one_alive_job(self, capsys): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(1) + + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--job-type", "SchedulerJob"]), + api_client=api_client, + ) + + assert "Found one alive job." in capsys.readouterr().out + api_client.jobs.list.assert_called_once_with(job_type="SchedulerJob", hostname=None, is_alive=True) + + def test_forwards_hostname(self, capsys): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(1) + + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--hostname", "HOSTNAME"]), + api_client=api_client, + ) + + api_client.jobs.list.assert_called_once_with(job_type=None, hostname="HOSTNAME", is_alive=True) + + def test_reports_multiple_with_allow_multiple(self, capsys): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(3) + + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--limit", "100", "--allow-multiple"]), + api_client=api_client, + ) + + assert "Found 3 alive jobs." in capsys.readouterr().out + + def test_no_alive_jobs(self): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(0) + + with pytest.raises(SystemExit, match=r"No alive jobs found."): + jobs_command.check(self.parser.parse_args(["jobs", "check"]), api_client=api_client) + + def test_multiple_without_allow_multiple_fails(self): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(3) + + with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."): + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--limit", "100"]), api_client=api_client + ) + + def test_allow_multiple_requires_limit_above_one(self): + api_client = mock.MagicMock() + + with pytest.raises( + SystemExit, + match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.", + ): + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--allow-multiple"]), api_client=api_client + ) + api_client.jobs.list.assert_not_called() + + def test_hostname_and_local_are_mutually_exclusive(self): + api_client = mock.MagicMock() + + with pytest.raises(SystemExit, match=r"You can't use --hostname and --local at the same time"): + jobs_command.check( + self.parser.parse_args(["jobs", "check", "--hostname", "h", "--local"]), + api_client=api_client, + ) + api_client.jobs.list.assert_not_called() + + def test_local_resolves_hostname(self): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(1) + + with mock.patch("airflowctl.ctl.commands.jobs_command.socket.getfqdn", return_value="local-host"): + jobs_command.check(self.parser.parse_args(["jobs", "check", "--local"]), api_client=api_client) + + api_client.jobs.list.assert_called_once_with(job_type=None, hostname="local-host", is_alive=True) + + def test_limit_is_applied_client_side(self, capsys): + api_client = mock.MagicMock() + api_client.jobs.list.return_value = _jobs(5) + + jobs_command.check(self.parser.parse_args(["jobs", "check", "--limit", "1"]), api_client=api_client) + + assert "Found one alive job." in capsys.readouterr().out diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index 2fefc497905d1..4a426a23f8a15 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -770,11 +770,25 @@ server_tls_key_file = /etc/pgbouncer/server.key - --migration-wait-timeout={{ .Values.images.migrationsWaitTimeout }} {{- end }} +{{/* +Internal URL of the api-server. ``airflow jobs check`` now talks to the API server, so the +health/liveness probes point the CLI at it explicitly (the CLI otherwise defaults to +localhost, where no api-server runs inside the scheduler/triggerer/dag-processor pods). +Mirrors the api-server URL the chart computes for ``core.execution_api_server_url``. +*/}} +{{- define "airflow.apiServerInternalUrl" -}} +{{- $basePath := "" -}} +{{- if and .Values.config.api .Values.config.api.base_url -}} +{{- with urlParse .Values.config.api.base_url }}{{ $basePath = (trimSuffix "/" .path) }}{{ end -}} +{{- end -}} +{{- printf "http://%s-api-server:%d%s" (include "airflow.fullname" .) (int .Values.ports.apiServer) $basePath -}} +{{- end -}} + {{- define "scheduler_liveness_check_command" }} - sh - -c - | - CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ + CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR AIRFLOW__API__BASE_URL={{ include "airflow.apiServerInternalUrl" . }} exec /entrypoint \ airflow jobs check --job-type SchedulerJob --local {{- end }} @@ -783,7 +797,7 @@ server_tls_key_file = /etc/pgbouncer/server.key - sh - -c - | - CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ + CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR AIRFLOW__API__BASE_URL={{ include "airflow.apiServerInternalUrl" . }} exec /entrypoint \ airflow jobs check --job-type SchedulerJob --local {{- end }} @@ -791,7 +805,7 @@ server_tls_key_file = /etc/pgbouncer/server.key - sh - -c - | - CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ + CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR AIRFLOW__API__BASE_URL={{ include "airflow.apiServerInternalUrl" . }} exec /entrypoint \ airflow jobs check --job-type TriggererJob --local {{- end }} @@ -799,7 +813,7 @@ server_tls_key_file = /etc/pgbouncer/server.key - sh - -c - | - CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \ + CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR AIRFLOW__API__BASE_URL={{ include "airflow.apiServerInternalUrl" . }} exec /entrypoint \ airflow jobs check --local --job-type DagProcessorJob {{- end }} diff --git a/chart/tests/helm_tests/airflow_core/test_dag_processor.py b/chart/tests/helm_tests/airflow_core/test_dag_processor.py index 9bc1f6ebb2e84..5c82b02b3a310 100644 --- a/chart/tests/helm_tests/airflow_core/test_dag_processor.py +++ b/chart/tests/helm_tests/airflow_core/test_dag_processor.py @@ -429,10 +429,10 @@ def test_livenessprobe_values_are_configurable(self): def test_livenessprobe_command(self): docs = render_chart(show_only=["templates/dag-processor/dag-processor-deployment.yaml"]) - assert ( - "airflow jobs check --local --job-type DagProcessorJob" - in jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] - ) + command = jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] + assert "airflow jobs check --local --job-type DagProcessorJob" in command + # ``airflow jobs check`` talks to the API server, so the probe must point the CLI at it. + assert "AIRFLOW__API__BASE_URL=http://release-name-api-server:8080" in command @pytest.mark.parametrize( ("log_values", "expected_volume"), diff --git a/chart/tests/helm_tests/airflow_core/test_scheduler.py b/chart/tests/helm_tests/airflow_core/test_scheduler.py index 0322e27384004..1aea2cd0cdf4c 100644 --- a/chart/tests/helm_tests/airflow_core/test_scheduler.py +++ b/chart/tests/helm_tests/airflow_core/test_scheduler.py @@ -519,19 +519,18 @@ def test_livenessprobe_command(self): docs = render_chart( show_only=["templates/scheduler/scheduler-deployment.yaml"], ) - assert ( - "airflow jobs check --job-type SchedulerJob --local" - in jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] - ) + command = jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] + assert "airflow jobs check --job-type SchedulerJob --local" in command + # ``airflow jobs check`` talks to the API server, so the probe must point the CLI at it. + assert "AIRFLOW__API__BASE_URL=http://release-name-api-server:8080" in command def test_startupprobe_command_depends_on_airflow_version(self): docs = render_chart( show_only=["templates/scheduler/scheduler-deployment.yaml"], ) - assert ( - "airflow jobs check --job-type SchedulerJob --local" - in jmespath.search("spec.template.spec.containers[0].startupProbe.exec.command", docs[0])[-1] - ) + command = jmespath.search("spec.template.spec.containers[0].startupProbe.exec.command", docs[0])[-1] + assert "airflow jobs check --job-type SchedulerJob --local" in command + assert "AIRFLOW__API__BASE_URL=http://release-name-api-server:8080" in command @pytest.mark.parametrize( ("log_values", "expected_volume"), diff --git a/chart/tests/helm_tests/airflow_core/test_triggerer.py b/chart/tests/helm_tests/airflow_core/test_triggerer.py index 85643149de868..5785772147259 100644 --- a/chart/tests/helm_tests/airflow_core/test_triggerer.py +++ b/chart/tests/helm_tests/airflow_core/test_triggerer.py @@ -485,10 +485,10 @@ def test_livenessprobe_command_depends_on_airflow_version(self): docs = render_chart( show_only=["templates/triggerer/triggerer-deployment.yaml"], ) - assert ( - "airflow jobs check --job-type TriggererJob --local" - in jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] - ) + command = jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1] + assert "airflow jobs check --job-type TriggererJob --local" in command + # ``airflow jobs check`` talks to the API server, so the probe must point the CLI at it. + assert "AIRFLOW__API__BASE_URL=http://release-name-api-server:8080" in command @pytest.mark.parametrize( ("log_values", "expected_volume"),