Skip to content

Commit 0c59f7e

Browse files
committed
Split original PR into cncf & taskinstance parts
- Remove taskinstance part
1 parent 2ba99e2 commit 0c59f7e

2 files changed

Lines changed: 0 additions & 121 deletions

File tree

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 0 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -1512,35 +1512,6 @@ def fetch_handle_failure_context(
1512 1512
if not test_mode:
1513 1513
ti.refresh_from_db(session)
1514 1514

1515-
# Check for orphaned task before setting end_date
1516-
if (
1517-
ti.is_eligible_to_retry()
1518-
and ti.state is None
1519-
and ti.start_date is not None
1520-
and ti.end_date is None
1521-
):
1522-
# If the task instance state is None but has a start_date without end_date,
1523-
# it likely means the task was running but became orphaned and its state was reset.
1524-
# This can happen during scheduler restarts when executors fail to adopt running tasks
1525-
# (e.g., due to Kubernetes API 429 errors). We should still record the task instance
1526-
# history to maintain complete log history for troubleshooting.
1527-
from airflow.models.taskinstancehistory import TaskInstanceHistory
1528-
1529-
log.info(
1530-
"Recording task instance history for orphaned task %s that was previously running "
1531-
"(start_date: %s, state reset to None)",
1532-
ti.key,
1533-
ti.start_date,
1534-
)
1535-
# Temporarily set state to RUNNING to trigger proper history recording
1536-
original_state = ti.state
1537-
ti.state = TaskInstanceState.RUNNING
1538-
try:
1539-
TaskInstanceHistory.record_ti(ti, session=session)
1540-
finally:
1541-
# Restore the original state
1542-
ti.state = original_state
1543-
1544 1515
ti.end_date = timezone.utcnow()
1545 1516
ti.set_duration()
1546 1517

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 0 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -2424,98 +2424,6 @@ def test_handle_failure_task_undefined(self, create_task_instance):
2424 2424
del ti.task
2425 2425
ti.handle_failure("test ti.task undefined")
2426 2426

2427-
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
2428-
def test_fetch_handle_failure_context_orphaned_task_records_history(
2429-
self, mock_record_ti, dag_maker, session
2430-
):
2431-
"""
2432-
Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.
2433-
This scenario occurs when tasks are running but become orphaned due to executor failures
2434-
(e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures).
2435-
"""
2436-
with dag_maker(dag_id="test_orphaned_task"):
2437-
task = EmptyOperator(task_id="orphaned_task", retries=2) # Allow 2 retries
2438-
2439-
dr = dag_maker.create_dagrun()
2440-
ti = dr.get_task_instance(task.task_id, session=session)
2441-
ti.task = task
2442-
2443-
# Simulate an orphaned task: state=None but has start_date (was running) and no end_date
2444-
start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
2445-
ti.state = None # State was reset during scheduler restart
2446-
ti.start_date = start_time # Task had started previously
2447-
ti.end_date = None # Task was still running when it became orphaned
2448-
ti.try_number = 1 # First attempt
2449-
ti.max_tries = 3 # 1 original + 2 retries = 3 total attempts
2450-
2451-
session.merge(ti)
2452-
session.commit()
2453-
2454-
# Call fetch_handle_failure_context which should detect and handle orphaned tasks
2455-
failure_context = TaskInstance.fetch_handle_failure_context(
2456-
ti=ti,
2457-
error="Test orphaned task error",
2458-
test_mode=False,
2459-
session=session,
2460-
fail_fast=False,
2461-
)
2462-
2463-
# Verify that TaskInstanceHistory.record_ti was called for the orphaned task
2464-
mock_record_ti.assert_called_once()
2465-
call_args = mock_record_ti.call_args
2466-
recorded_ti = call_args[0][0] # First positional argument (ti)
2467-
2468-
# Verify the correct TaskInstance was recorded
2469-
assert recorded_ti.task_id == ti.task_id
2470-
assert recorded_ti.dag_id == ti.dag_id
2471-
assert recorded_ti.run_id == ti.run_id
2472-
assert recorded_ti.start_date == start_time
2473-
2474-
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
2475-
assert ti.state == State.UP_FOR_RETRY
2476-
assert failure_context["ti"] == ti
2477-
2478-
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
2479-
def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
2480-
self, mock_record_ti, dag_maker, session
2481-
):
2482-
"""
2483-
Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording.
2484-
This ensures we only record history for tasks that were actually running.
2485-
"""
2486-
with dag_maker(dag_id="test_not_orphaned_task"):
2487-
task = EmptyOperator(task_id="not_orphaned_task", retries=2)
2488-
2489-
dr = dag_maker.create_dagrun()
2490-
ti = dr.get_task_instance(task.task_id, session=session)
2491-
ti.task = task
2492-
2493-
# Simulate a task that was never started: state=None and no start_date
2494-
ti.state = None
2495-
ti.start_date = None # Task never started
2496-
ti.end_date = None
2497-
ti.try_number = 1
2498-
ti.max_tries = 3 # Allow retries
2499-
2500-
session.merge(ti)
2501-
session.commit()
2502-
2503-
# Call fetch_handle_failure_context
2504-
failure_context = TaskInstance.fetch_handle_failure_context(
2505-
ti=ti,
2506-
error="Test non-orphaned task error",
2507-
test_mode=False,
2508-
session=session,
2509-
fail_fast=False,
2510-
)
2511-
2512-
# Verify that TaskInstanceHistory.record_ti was NOT called
2513-
mock_record_ti.assert_not_called()
2514-
2515-
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
2516-
assert ti.state == State.UP_FOR_RETRY
2517-
assert failure_context["ti"] == ti
2518-
2519 2427
def test_handle_failure_fail_fast(self, dag_maker, session):
2520 2428
start_date = timezone.datetime(2016, 6, 1)
2521 2429

0 commit comments

Comments
 (0)