Skip to content

Commit 02e99f9

Browse files
committed
fix(api): handle concurrent RTIF writes with IntegrityError retry
When multiple workers try to write rendered task instance fields for the same task instance simultaneously, a race condition in session.merge() can cause an IntegrityError (unique constraint violation). This happens because both workers SELECT (find no record), then both try to INSERT. Fix the ti_put_rtif endpoint to catch IntegrityError, rollback the failed transaction, re-fetch the task instance, and retry the write. The retry succeeds because merge() now finds the existing record and performs an UPDATE instead of INSERT. Closes: #61705
1 parent 0f68191 commit 02e99f9

File tree

2 files changed

+90
-3
lines changed

2 files changed

+90
-3
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pydantic import JsonValue
3333
from sqlalchemy import and_, func, or_, tuple_, update
3434
from sqlalchemy.engine import CursorResult
35-
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
35+
from sqlalchemy.exc import IntegrityError, NoResultFound, SQLAlchemyError
3636
from sqlalchemy.orm import joinedload
3737
from sqlalchemy.sql import select
3838
from structlog.contextvars import bind_contextvars
@@ -729,8 +729,19 @@ def ti_put_rtif(
729729
raise HTTPException(
730730
status_code=status.HTTP_404_NOT_FOUND,
731731
)
732-
task_instance.update_rtif(put_rtif_payload, session)
733-
log.debug("RenderedTaskInstanceFields updated successfully")
732+
try:
733+
task_instance.update_rtif(put_rtif_payload, session)
734+
except IntegrityError:
735+
session.rollback()
736+
# Re-fetch the task instance after rollback since the previous one is detached
737+
task_instance = session.scalar(select(TI).where(TI.id == task_instance_id))
738+
if task_instance:
739+
# Retry: the record now exists from the concurrent request,
740+
# so merge will find it and update rather than insert.
741+
task_instance.update_rtif(put_rtif_payload, session)
742+
log.info("RenderedTaskInstanceFields updated after concurrent write conflict")
743+
else:
744+
log.debug("RenderedTaskInstanceFields updated successfully")
734745

735746
return {"message": "Rendered task instance fields successfully set"}
736747

airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,6 +1953,82 @@ def test_ti_put_rtif_missing_ti(self, client, session, create_task_instance):
19531953
assert response.json()["detail"] == "Not Found"
19541954

19551955

1956+
def test_ti_put_rtif_concurrent_write(self, client, session, create_task_instance):
1957+
"""Test that concurrent RTIF writes don't cause 409 errors.
1958+
1959+
When two workers try to write rendered fields for the same task instance
1960+
simultaneously, the second write should succeed by updating the existing record
1961+
rather than failing with a unique constraint violation.
1962+
"""
1963+
ti = create_task_instance(
1964+
task_id="test_ti_put_rtif_concurrent",
1965+
state=State.RUNNING,
1966+
session=session,
1967+
)
1968+
session.commit()
1969+
1970+
payload1 = {"field1": "value1"}
1971+
payload2 = {"field1": "value2"}
1972+
1973+
# First write should succeed
1974+
response1 = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload1)
1975+
assert response1.status_code == 201
1976+
1977+
# Second write (simulating concurrent update) should also succeed by merging
1978+
response2 = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload2)
1979+
assert response2.status_code == 201
1980+
1981+
session.expire_all()
1982+
rtifs = session.scalars(select(RenderedTaskInstanceFields)).all()
1983+
assert len(rtifs) == 1
1984+
assert rtifs[0].rendered_fields == payload2
1985+
1986+
def test_ti_put_rtif_integrity_error_handled(self, client, session, create_task_instance):
1987+
"""Test that IntegrityError from a race condition is handled gracefully.
1988+
1989+
Simulates the race condition where the first update_rtif call raises
1990+
IntegrityError (as if another concurrent request already inserted the record),
1991+
and verifies the endpoint retries successfully.
1992+
"""
1993+
from unittest.mock import patch
1994+
1995+
from sqlalchemy.exc import IntegrityError
1996+
1997+
from airflow.models.taskinstance import TaskInstance
1998+
1999+
ti = create_task_instance(
2000+
task_id="test_ti_put_rtif_integrity",
2001+
state=State.RUNNING,
2002+
session=session,
2003+
)
2004+
session.commit()
2005+
2006+
payload = {"field1": "rendered_value1"}
2007+
2008+
original_update_rtif = TaskInstance.update_rtif
2009+
call_count = 0
2010+
2011+
def mock_update_rtif(self_ti, rendered_fields, session):
2012+
nonlocal call_count
2013+
call_count += 1
2014+
if call_count == 1:
2015+
raise IntegrityError(
2016+
statement="INSERT INTO rendered_task_instance_fields",
2017+
params={},
2018+
orig=Exception(
2019+
'duplicate key value violates unique constraint "rendered_task_instance_fields_pkey"'
2020+
),
2021+
)
2022+
return original_update_rtif(self_ti, rendered_fields, session)
2023+
2024+
with patch.object(TaskInstance, "update_rtif", mock_update_rtif):
2025+
response = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload)
2026+
2027+
assert response.status_code == 201
2028+
assert response.json() == {"message": "Rendered task instance fields successfully set"}
2029+
assert call_count == 2 # First call raises, second succeeds
2030+
2031+
19562032
class TestPreviousDagRun:
19572033
def setup_method(self):
19582034
clear_db_runs()

0 commit comments

Comments
 (0)