Skip to content

Commit 4686e4c

Browse files
committed
fix(api): handle concurrent RTIF writes with IntegrityError retry
When multiple workers try to write rendered task instance fields (RTIF) for the same task instance simultaneously, a race condition in session.merge() can cause an IntegrityError (unique constraint violation). This happens because both workers SELECT (and find no record), then both try to INSERT. Fix the ti_put_rtif endpoint to catch IntegrityError, roll back the failed transaction, re-fetch the task instance, and retry the write. The retry succeeds because merge() now finds the existing record and performs an UPDATE instead of an INSERT. Closes: #61705
1 parent b24de01 commit 4686e4c

2 files changed

Lines changed: 90 additions & 3 deletions

File tree

airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pydantic import JsonValue
3333
from sqlalchemy import and_, func, or_, tuple_, update
3434
from sqlalchemy.engine import CursorResult
35-
from sqlalchemy.exc import NoResultFound, SQLAlchemyError
35+
from sqlalchemy.exc import IntegrityError, NoResultFound, SQLAlchemyError
3636
from sqlalchemy.orm import joinedload
3737
from sqlalchemy.sql import select
3838
from structlog.contextvars import bind_contextvars
@@ -711,8 +711,19 @@ def ti_put_rtif(
711711
raise HTTPException(
712712
status_code=status.HTTP_404_NOT_FOUND,
713713
)
714-
task_instance.update_rtif(put_rtif_payload, session)
715-
log.debug("RenderedTaskInstanceFields updated successfully")
714+
try:
715+
task_instance.update_rtif(put_rtif_payload, session)
716+
except IntegrityError:
717+
session.rollback()
718+
# Re-fetch the task instance after rollback, because rollback expires the previously loaded instance
719+
task_instance = session.scalar(select(TI).where(TI.id == task_instance_id))
720+
if task_instance:
721+
# Retry: the record now exists from the concurrent request,
722+
# so merge will find it and update rather than insert.
723+
task_instance.update_rtif(put_rtif_payload, session)
724+
log.info("RenderedTaskInstanceFields updated after concurrent write conflict")
725+
else:
726+
log.debug("RenderedTaskInstanceFields updated successfully")
716727

717728
return {"message": "Rendered task instance fields successfully set"}
718729

airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,82 @@ def test_ti_put_rtif_missing_ti(self, client, session, create_task_instance):
18641864
assert response.json()["detail"] == "Not Found"
18651865

18661866

1867+
def test_ti_put_rtif_concurrent_write(self, client, session, create_task_instance):
1868+
"""Test that concurrent RTIF writes don't cause 409 errors.
1869+
1870+
When two workers try to write rendered fields for the same task instance
1871+
simultaneously, the second write should succeed by updating the existing record
1872+
rather than failing with a unique constraint violation.
1873+
"""
1874+
ti = create_task_instance(
1875+
task_id="test_ti_put_rtif_concurrent",
1876+
state=State.RUNNING,
1877+
session=session,
1878+
)
1879+
session.commit()
1880+
1881+
payload1 = {"field1": "value1"}
1882+
payload2 = {"field1": "value2"}
1883+
1884+
# First write should succeed
1885+
response1 = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload1)
1886+
assert response1.status_code == 201
1887+
1888+
# Second write (simulating concurrent update) should also succeed by merging
1889+
response2 = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload2)
1890+
assert response2.status_code == 201
1891+
1892+
session.expire_all()
1893+
rtifs = session.scalars(select(RenderedTaskInstanceFields)).all()
1894+
assert len(rtifs) == 1
1895+
assert rtifs[0].rendered_fields == payload2
1896+
1897+
def test_ti_put_rtif_integrity_error_handled(self, client, session, create_task_instance):
1898+
"""Test that IntegrityError from a race condition is handled gracefully.
1899+
1900+
Simulates the race condition where the first update_rtif call raises
1901+
IntegrityError (as if another concurrent request already inserted the record),
1902+
and verifies the endpoint retries successfully.
1903+
"""
1904+
from unittest.mock import patch
1905+
1906+
from sqlalchemy.exc import IntegrityError
1907+
1908+
from airflow.models.taskinstance import TaskInstance
1909+
1910+
ti = create_task_instance(
1911+
task_id="test_ti_put_rtif_integrity",
1912+
state=State.RUNNING,
1913+
session=session,
1914+
)
1915+
session.commit()
1916+
1917+
payload = {"field1": "rendered_value1"}
1918+
1919+
original_update_rtif = TaskInstance.update_rtif
1920+
call_count = 0
1921+
1922+
def mock_update_rtif(self_ti, rendered_fields, session):
1923+
nonlocal call_count
1924+
call_count += 1
1925+
if call_count == 1:
1926+
raise IntegrityError(
1927+
statement="INSERT INTO rendered_task_instance_fields",
1928+
params={},
1929+
orig=Exception(
1930+
'duplicate key value violates unique constraint "rendered_task_instance_fields_pkey"'
1931+
),
1932+
)
1933+
return original_update_rtif(self_ti, rendered_fields, session)
1934+
1935+
with patch.object(TaskInstance, "update_rtif", mock_update_rtif):
1936+
response = client.put(f"/execution/task-instances/{ti.id}/rtif", json=payload)
1937+
1938+
assert response.status_code == 201
1939+
assert response.json() == {"message": "Rendered task instance fields successfully set"}
1940+
assert call_count == 2 # First call raises, second succeeds
1941+
1942+
18671943
class TestPreviousDagRun:
18681944
def setup_method(self):
18691945
clear_db_runs()

0 commit comments

Comments
 (0)