Skip to content

Commit e9dbba6

Browse files
HsiuChuanHsu authored and eladkal committed
fix: Record history for orphaned tasks during K8s executor failures
Move orphaned task detection before end_date assignment to ensure TaskInstanceHistory is recorded for tasks that become detached during scheduler restarts due to Kubernetes API 429 errors.
1 parent 78d4b6d commit e9dbba6

2 files changed

Lines changed: 121 additions & 22 deletions

File tree

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 29 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -1512,6 +1512,35 @@ def fetch_handle_failure_context(
15121512
if not test_mode:
15131513
ti.refresh_from_db(session)
15141514

1515+
# Check for orphaned task before setting end_date
1516+
if (
1517+
ti.is_eligible_to_retry()
1518+
and ti.state is None
1519+
and ti.start_date is not None
1520+
and ti.end_date is None
1521+
):
1522+
# If the task instance state is None but has a start_date without end_date,
1523+
# it likely means the task was running but became orphaned and its state was reset.
1524+
# This can happen during scheduler restarts when executors fail to adopt running tasks
1525+
# (e.g., due to Kubernetes API 429 errors). We should still record the task instance
1526+
# history to maintain complete log history for troubleshooting.
1527+
from airflow.models.taskinstancehistory import TaskInstanceHistory
1528+
1529+
log.info(
1530+
"Recording task instance history for orphaned task %s that was previously running "
1531+
"(start_date: %s, state reset to None)",
1532+
ti.key,
1533+
ti.start_date,
1534+
)
1535+
# Temporarily set state to RUNNING to trigger proper history recording
1536+
original_state = ti.state
1537+
ti.state = TaskInstanceState.RUNNING
1538+
try:
1539+
TaskInstanceHistory.record_ti(ti, session=session)
1540+
finally:
1541+
# Restore the original state
1542+
ti.state = original_state
1543+
15151544
ti.end_date = timezone.utcnow()
15161545
ti.set_duration()
15171546

@@ -1547,28 +1576,6 @@ def fetch_handle_failure_context(
15471576
# about to retry so we record the task instance history. For other states, the task
15481577
# instance was cleared and already recorded in the task instance history.
15491578
ti.prepare_db_for_next_try(session)
1550-
elif ti.state is None and ti.start_date is not None and ti.end_date is None:
1551-
# If the task instance state is None but has a start_date without end_date,
1552-
# it likely means the task was running but became orphaned and its state was reset.
1553-
# This can happen during scheduler restarts when executors fail to adopt running tasks
1554-
# (e.g., due to Kubernetes API 429 errors). We should still record the task instance
1555-
# history to maintain complete log history for troubleshooting.
1556-
from airflow.models.taskinstancehistory import TaskInstanceHistory
1557-
1558-
log.info(
1559-
"Recording task instance history for orphaned task %s that was previously running "
1560-
"(start_date: %s, state reset to None)",
1561-
ti.key,
1562-
ti.start_date,
1563-
)
1564-
# Temporarily set state to RUNNING to trigger proper history recording
1565-
original_state = ti.state
1566-
ti.state = TaskInstanceState.RUNNING
1567-
try:
1568-
TaskInstanceHistory.record_ti(ti, session=session)
1569-
finally:
1570-
# Restore the original state
1571-
ti.state = original_state
15721579

15731580
ti.state = State.UP_FOR_RETRY
15741581

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 92 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2424,6 +2424,98 @@ def test_handle_failure_task_undefined(self, create_task_instance):
24242424
del ti.task
24252425
ti.handle_failure("test ti.task undefined")
24262426

2427+
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
2428+
def test_fetch_handle_failure_context_orphaned_task_records_history(
2429+
self, mock_record_ti, dag_maker, session
2430+
):
2431+
"""
2432+
Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.
2433+
This scenario occurs when tasks are running but become orphaned due to executor failures
2434+
(e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures).
2435+
"""
2436+
with dag_maker(dag_id="test_orphaned_task"):
2437+
task = EmptyOperator(task_id="orphaned_task", retries=2) # Allow 2 retries
2438+
2439+
dr = dag_maker.create_dagrun()
2440+
ti = dr.get_task_instance(task.task_id, session=session)
2441+
ti.task = task
2442+
2443+
# Simulate an orphaned task: state=None but has start_date (was running) and no end_date
2444+
start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
2445+
ti.state = None # State was reset during scheduler restart
2446+
ti.start_date = start_time # Task had started previously
2447+
ti.end_date = None # Task was still running when it became orphaned
2448+
ti.try_number = 1 # First attempt
2449+
ti.max_tries = 3 # 1 original + 2 retries = 3 total attempts
2450+
2451+
session.merge(ti)
2452+
session.commit()
2453+
2454+
# Call fetch_handle_failure_context which should detect and handle orphaned tasks
2455+
failure_context = TaskInstance.fetch_handle_failure_context(
2456+
ti=ti,
2457+
error="Test orphaned task error",
2458+
test_mode=False,
2459+
session=session,
2460+
fail_fast=False,
2461+
)
2462+
2463+
# Verify that TaskInstanceHistory.record_ti was called for the orphaned task
2464+
mock_record_ti.assert_called_once()
2465+
call_args = mock_record_ti.call_args
2466+
recorded_ti = call_args[0][0] # First positional argument (ti)
2467+
2468+
# Verify the correct TaskInstance was recorded
2469+
assert recorded_ti.task_id == ti.task_id
2470+
assert recorded_ti.dag_id == ti.dag_id
2471+
assert recorded_ti.run_id == ti.run_id
2472+
assert recorded_ti.start_date == start_time
2473+
2474+
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
2475+
assert ti.state == State.UP_FOR_RETRY
2476+
assert failure_context["ti"] == ti
2477+
2478+
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
2479+
def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
2480+
self, mock_record_ti, dag_maker, session
2481+
):
2482+
"""
2483+
Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording.
2484+
This ensures we only record history for tasks that were actually running.
2485+
"""
2486+
with dag_maker(dag_id="test_not_orphaned_task"):
2487+
task = EmptyOperator(task_id="not_orphaned_task", retries=2)
2488+
2489+
dr = dag_maker.create_dagrun()
2490+
ti = dr.get_task_instance(task.task_id, session=session)
2491+
ti.task = task
2492+
2493+
# Simulate a task that was never started: state=None and no start_date
2494+
ti.state = None
2495+
ti.start_date = None # Task never started
2496+
ti.end_date = None
2497+
ti.try_number = 1
2498+
ti.max_tries = 3 # Allow retries
2499+
2500+
session.merge(ti)
2501+
session.commit()
2502+
2503+
# Call fetch_handle_failure_context
2504+
failure_context = TaskInstance.fetch_handle_failure_context(
2505+
ti=ti,
2506+
error="Test non-orphaned task error",
2507+
test_mode=False,
2508+
session=session,
2509+
fail_fast=False,
2510+
)
2511+
2512+
# Verify that TaskInstanceHistory.record_ti was NOT called
2513+
mock_record_ti.assert_not_called()
2514+
2515+
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
2516+
assert ti.state == State.UP_FOR_RETRY
2517+
assert failure_context["ti"] == ti
2518+
24272519
def test_handle_failure_fail_fast(self, dag_maker, session):
24282520
start_date = timezone.datetime(2016, 6, 1)
24292521

0 commit comments

Comments
 (0)