Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions airflow-core/docs/howto/docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ services:
airflow-scheduler:
<<: *airflow-common
command: scheduler
# ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI
# at it here rather than globally so the api-server keeps its host-facing base_url.
environment:
<<: *airflow-common-env
AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 30s
Expand All @@ -148,12 +153,19 @@ services:
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-apiserver:
condition: service_healthy
airflow-init:
condition: service_completed_successfully

airflow-dag-processor:
<<: *airflow-common
command: dag-processor
# ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI
# at it here rather than globally so the api-server keeps its host-facing base_url.
environment:
<<: *airflow-common-env
AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
interval: 30s
Expand All @@ -163,6 +175,8 @@ services:
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-apiserver:
condition: service_healthy
airflow-init:
condition: service_completed_successfully

Expand Down Expand Up @@ -192,6 +206,11 @@ services:
airflow-triggerer:
<<: *airflow-common
command: triggerer
# ``airflow jobs check`` (run by the healthcheck) talks to the API server; point the CLI
# at it here rather than globally so the api-server keeps its host-facing base_url.
environment:
<<: *airflow-common-env
AIRFLOW__API__BASE_URL: http://airflow-apiserver:8080
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
Expand All @@ -201,6 +220,8 @@ services:
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-apiserver:
condition: service_healthy
airflow-init:
condition: service_completed_successfully

Expand Down
31 changes: 10 additions & 21 deletions airflow-core/src/airflow/cli/commands/jobs_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,28 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import select

from airflow.jobs.job import Job, JobState
from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
from airflow.cli.utils import deprecated_for_airflowctl
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.net import get_hostname
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session


@deprecated_for_airflowctl("airflowctl jobs check")
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def check(args, *, session: Session = NEW_SESSION) -> None:
@provide_api_client
def check(args, api_client: Client = NEW_API_CLIENT) -> None:
"""Check if job(s) are still alive."""
if args.allow_multiple and args.limit <= 1:
raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.")
if args.hostname and args.local:
raise SystemExit("You can't use --hostname and --local at the same time")

query = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
if args.job_type:
query = query.where(Job.job_type == args.job_type)
if args.hostname:
query = query.where(Job.hostname == args.hostname)
if args.local:
query = query.where(Job.hostname == get_hostname())
hostname = get_hostname() if args.local else args.hostname
alive_jobs = api_client.jobs.list(job_type=args.job_type, hostname=hostname, is_alive=True).jobs
if args.limit > 0:
query = query.limit(args.limit)

alive_jobs: list[Job] = [job for job in session.scalars(query) if job.is_alive()]
alive_jobs = alive_jobs[: args.limit]

count_alive_jobs = len(alive_jobs)
if count_alive_jobs == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import pytest

from airflow.cli.commands import asset_command, dag_command, pool_command
from airflow.cli.commands import asset_command, dag_command, jobs_command, pool_command
from airflow.exceptions import RemovedInAirflow4Warning

# (command callable, argv to parse, expected airflowctl replacement named in the warning)
Expand All @@ -52,6 +52,7 @@
["assets", "materialize", "--name=foo"],
"airflowctl assets materialize",
),
(jobs_command.check, ["jobs", "check"], "airflowctl jobs check"),
]


Expand Down
123 changes: 46 additions & 77 deletions airflow-core/tests/unit/cli/commands/test_jobs_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,49 @@
# under the License.
from __future__ import annotations

from types import SimpleNamespace
from unittest import mock

import pytest

from airflow.cli import cli_parser
from airflow.cli.commands import jobs_command
from airflow.jobs.job import Job, JobState
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.utils.session import create_session
from airflow.utils.state import State

from tests_common.test_utils.db import clear_db_jobs

def _jobs(count: int) -> SimpleNamespace:
"""Stand in for the airflowctl ``JobCollectionResponse`` of ``count`` already-alive jobs."""
return SimpleNamespace(jobs=[SimpleNamespace(id=i) for i in range(count)])


@pytest.mark.db_test
class TestCliConfigList:
class TestCliJobsCheck:
@classmethod
def setup_class(cls):
cls.parser = cli_parser.get_parser()

def setup_method(self) -> None:
clear_db_jobs()
self.scheduler_job = None
self.job_runner = None

def teardown_method(self) -> None:
clear_db_jobs()

def test_should_report_success_for_one_working_scheduler(self, stdout_capture):
with create_session() as session:
self.scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=self.scheduler_job)
self.scheduler_job.state = State.RUNNING
session.add(self.scheduler_job)
session.commit()
self.scheduler_job.heartbeat(heartbeat_callback=self.job_runner.heartbeat_callback)

def test_reports_one_alive_job(self, mock_cli_api_client, stdout_capture):
mock_cli_api_client.jobs.list.return_value = _jobs(1)
with stdout_capture as temp_stdout:
jobs_command.check(self.parser.parse_args(["jobs", "check", "--job-type", "SchedulerJob"]))
assert "Found one alive job." in temp_stdout.getvalue()
mock_cli_api_client.jobs.list.assert_called_once_with(
job_type="SchedulerJob", hostname=None, is_alive=True
)

def test_should_report_success_for_one_working_scheduler_with_hostname(self, stdout_capture):
with create_session() as session:
self.scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=self.scheduler_job)
self.scheduler_job.state = State.RUNNING
self.scheduler_job.hostname = "HOSTNAME"
session.add(self.scheduler_job)
session.commit()
self.scheduler_job.heartbeat(heartbeat_callback=self.job_runner.heartbeat_callback)

def test_forwards_hostname(self, mock_cli_api_client, stdout_capture):
mock_cli_api_client.jobs.list.return_value = _jobs(1)
with stdout_capture as temp_stdout:
jobs_command.check(
self.parser.parse_args(
["jobs", "check", "--job-type", "SchedulerJob", "--hostname", "HOSTNAME"]
)
)
assert "Found one alive job." in temp_stdout.getvalue()
mock_cli_api_client.jobs.list.assert_called_once_with(
job_type="SchedulerJob", hostname="HOSTNAME", is_alive=True
)

def test_should_report_success_for_ha_schedulers(self, stdout_capture):
scheduler_jobs = []
job_runners = []
with create_session() as session:
for _ in range(3):
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job)
scheduler_job.state = State.RUNNING
session.add(scheduler_job)
scheduler_jobs.append(scheduler_job)
job_runners.append(job_runner)
session.commit()
scheduler_job.heartbeat(heartbeat_callback=job_runner.heartbeat_callback)
def test_reports_multiple_with_allow_multiple(self, mock_cli_api_client, stdout_capture):
mock_cli_api_client.jobs.list.return_value = _jobs(3)
with stdout_capture as temp_stdout:
jobs_command.check(
self.parser.parse_args(
Expand All @@ -94,38 +67,13 @@ def test_should_report_success_for_ha_schedulers(self, stdout_capture):
)
assert "Found 3 alive jobs." in temp_stdout.getvalue()

def test_should_ignore_not_running_jobs(self):
scheduler_jobs = []
job_runners = []
with create_session() as session:
for _ in range(3):
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job)
scheduler_job.state = JobState.FAILED
session.add(scheduler_job)
scheduler_jobs.append(scheduler_job)
job_runners.append(job_runner)
session.commit()
# No alive jobs found.
def test_no_alive_jobs(self, mock_cli_api_client):
mock_cli_api_client.jobs.list.return_value = _jobs(0)
with pytest.raises(SystemExit, match=r"No alive jobs found."):
jobs_command.check(self.parser.parse_args(["jobs", "check"]))

def test_should_raise_exception_for_multiple_scheduler_on_one_host(self):
scheduler_jobs = []
job_runners = []
with create_session() as session:
for _ in range(3):
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job)
job_runner.job = scheduler_job
scheduler_job.state = State.RUNNING
scheduler_job.hostname = "HOSTNAME"
session.add(scheduler_job)
scheduler_jobs.append(scheduler_job)
job_runners.append(job_runner)
session.commit()
scheduler_job.heartbeat(heartbeat_callback=job_runner.heartbeat_callback)

def test_multiple_without_allow_multiple_fails(self, mock_cli_api_client):
mock_cli_api_client.jobs.list.return_value = _jobs(3)
with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."):
jobs_command.check(
self.parser.parse_args(
Expand All @@ -140,9 +88,30 @@ def test_should_raise_exception_for_multiple_scheduler_on_one_host(self):
)
)

def test_should_raise_exception_for_allow_multiple_and_limit_1(self):
def test_allow_multiple_requires_limit_above_one(self, mock_cli_api_client):
with pytest.raises(
SystemExit,
match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.",
):
jobs_command.check(self.parser.parse_args(["jobs", "check", "--allow-multiple"]))
mock_cli_api_client.jobs.list.assert_not_called()

def test_hostname_and_local_are_mutually_exclusive(self, mock_cli_api_client):
with pytest.raises(SystemExit, match=r"You can't use --hostname and --local at the same time"):
jobs_command.check(self.parser.parse_args(["jobs", "check", "--hostname", "h", "--local"]))
mock_cli_api_client.jobs.list.assert_not_called()

def test_local_resolves_hostname(self, mock_cli_api_client, stdout_capture):
mock_cli_api_client.jobs.list.return_value = _jobs(1)
with mock.patch("airflow.cli.commands.jobs_command.get_hostname", return_value="local-host"):
jobs_command.check(self.parser.parse_args(["jobs", "check", "--local"]))
mock_cli_api_client.jobs.list.assert_called_once_with(
job_type=None, hostname="local-host", is_alive=True
)

def test_limit_is_applied_client_side(self, mock_cli_api_client, stdout_capture):
# The API returns all alive jobs (heartbeat desc); --limit trims to the most recent N.
mock_cli_api_client.jobs.list.return_value = _jobs(5)
with stdout_capture as stdout:
jobs_command.check(self.parser.parse_args(["jobs", "check", "--limit", "1"]))
assert "Found one alive job." in stdout.getvalue()
2 changes: 1 addition & 1 deletion airflow-ctl/docs/images/command_hashes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ config:a3d936cb15fe3b547bf6c82cf93d923f
connections:942f9f88cb908c28bf5c19159fc5065b
dags:6b38e6bcd491bc1941e7814b77e63bde
dagrun:c32e0011aa9a845456c778786717208e
jobs:a5b644c5da8889443bb40ee10b599270
jobs:d4af478f28dae48ee18d43b998edc345
pools:19efe105b9515ab1926ebcaf0e028d71
providers:34502fe09dc0b8b0a13e7e46efdffda6
variables:f8fc76d3d398b2780f4e97f7cd816646
Expand Down
Loading
Loading