Skip to content

Commit 94b9ea9

Browse files
committed
Remove CNCF provider part and fix CI Error
1 parent cc33463 commit 94b9ea9

3 files changed

Lines changed: 16 additions & 35 deletions

File tree

airflow-core/tests/unit/models/test_taskinstance.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2511,28 +2511,26 @@ def test_fetch_handle_failure_context_orphaned_task_records_history(
25112511
):
25122512
"""
25132513
Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.
2514-
This scenario occurs when tasks are running but become orphaned due to executor failures
2515-
(e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures).
2514+
This scenario occurs when tasks are running but become orphaned due to executor failures.
25162515
"""
25172516
with dag_maker(dag_id="test_orphaned_task"):
2518-
task = EmptyOperator(task_id="orphaned_task", retries=2) # Allow 2 retries
2517+
task = EmptyOperator(task_id="orphaned_task", retries=2)
25192518

25202519
dr = dag_maker.create_dagrun()
25212520
ti = dr.get_task_instance(task.task_id, session=session)
25222521
ti.task = task
25232522

25242523
# Simulate an orphaned task: state=None but has start_date (was running) and no end_date
25252524
start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
2526-
ti.state = None # State was reset during scheduler restart
2527-
ti.start_date = start_time # Task had started previously
2528-
ti.end_date = None # Task was still running when it became orphaned
2529-
ti.try_number = 1 # First attempt
2530-
ti.max_tries = 3 # 1 original + 2 retries = 3 total attempts
2525+
ti.state = None
2526+
ti.start_date = start_time
2527+
ti.end_date = None
2528+
ti.try_number = 1
2529+
ti.max_tries = 3
25312530

25322531
session.merge(ti)
25332532
session.commit()
25342533

2535-
# Call fetch_handle_failure_context which should detect and handle orphaned tasks
25362534
failure_context = TaskInstance.fetch_handle_failure_context(
25372535
ti=ti,
25382536
error="Test orphaned task error",
@@ -2544,17 +2542,14 @@ def test_fetch_handle_failure_context_orphaned_task_records_history(
25442542
# Verify that TaskInstanceHistory.record_ti was called for the orphaned task
25452543
mock_record_ti.assert_called_once()
25462544
call_args = mock_record_ti.call_args
2547-
recorded_ti = call_args[0][0] # First positional argument (ti)
2545+
recorded_ti = call_args[0][0]
25482546

2549-
# Verify the correct TaskInstance was recorded
25502547
assert recorded_ti.task_id == ti.task_id
25512548
assert recorded_ti.dag_id == ti.dag_id
25522549
assert recorded_ti.run_id == ti.run_id
25532550
assert recorded_ti.start_date == start_time
2554-
2555-
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
25562551
assert ti.state == State.UP_FOR_RETRY
2557-
assert failure_context["ti"] == ti
2552+
assert failure_context == ti
25582553

25592554
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
25602555
def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
@@ -2573,10 +2568,10 @@ def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_histor
25732568

25742569
# Simulate a task that was never started: state=None and no start_date
25752570
ti.state = None
2576-
ti.start_date = None # Task never started
2571+
ti.start_date = None
25772572
ti.end_date = None
25782573
ti.try_number = 1
2579-
ti.max_tries = 3 # Allow retries
2574+
ti.max_tries = 3
25802575

25812576
session.merge(ti)
25822577
session.commit()
@@ -2590,12 +2585,9 @@ def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_histor
25902585
fail_fast=False,
25912586
)
25922587

2593-
# Verify that TaskInstanceHistory.record_ti was NOT called
25942588
mock_record_ti.assert_not_called()
2595-
2596-
# Verify the task instance state is set to UP_FOR_RETRY after failure handling
25972589
assert ti.state == State.UP_FOR_RETRY
2598-
assert failure_context["ti"] == ti
2590+
assert failure_context == ti
25992591

26002592
def test_handle_failure_fail_fast(self, dag_maker, session):
26012593
start_date = timezone.datetime(2016, 6, 1)

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ def sync(self) -> None:
384384
body = {"message": e.body}
385385

386386
retries = self.task_publish_retries[key]
387-
# In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries
387+
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
388388
message = body.get("message", "")
389389
if (
390390
(str(e.status) == "403" and "exceeded quota" in message)
@@ -692,17 +692,6 @@ def adopt_launched_task(
692692
)
693693
except ApiException as e:
694694
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
695-
696-
# Log detailed information for rate limiting errors (429) which can cause task history loss
697-
if str(e.status) == "429":
698-
self.log.warning(
699-
"Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. "
700-
"This may cause task history loss if the task was previously running. "
701-
"Consider implementing rate limiting backoff or increasing API quota.",
702-
pod.metadata.name,
703-
ti_key,
704-
)
705-
706695
return
707696

708697
del tis_to_flush_by_key[ti_key]

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,9 +396,9 @@ def setup_method(self) -> None:
396396
pytest.param(
397397
HTTPResponse(body="Too many requests, please try again later.", status=429),
398398
1,
399-
True,
400-
State.SUCCESS,
401-
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)",
399+
False,
400+
State.FAILED,
401+
id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)",
402402
),
403403
pytest.param(
404404
HTTPResponse(body="", status=429),

0 commit comments

Comments
 (0)