Skip to content

Commit 8fab5a6

Browse files
committed
Fix XCom PATCH/POST to store native values (instead of json.dumps strings)
1 parent eca09a0 commit 8fab5a6

2 files changed

Lines changed: 37 additions & 24 deletions

File tree

  • airflow-core
    • src/airflow/api_fastapi/core_api/routes/public
    • tests/unit/api_fastapi/core_api/routes/public

airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from __future__ import annotations
1818

1919
import copy
20-
import json
2120
from typing import Annotated
2221

2322
from fastapi import Depends, HTTPException, Query, status
@@ -270,27 +269,24 @@ def create_xcom_entry(
270269
)
271270

272271
try:
273-
value = json.dumps(request_body.value)
274-
except (ValueError, TypeError):
272+
XComModel.set(
273+
key=request_body.key,
274+
value=request_body.value,
275+
dag_id=dag_id,
276+
task_id=task_id,
277+
run_id=dag_run_id,
278+
map_index=request_body.map_index,
279+
serialize=False,
280+
session=session,
281+
)
282+
except (ValueError, TypeError) as e:
275283
raise HTTPException(
276284
status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{request_body.key}`"
277-
)
278-
279-
new = XComModel(
280-
dag_run_id=dag_run.id,
281-
key=request_body.key,
282-
value=value,
283-
run_id=dag_run_id,
284-
task_id=task_id,
285-
dag_id=dag_id,
286-
map_index=request_body.map_index,
287-
)
288-
session.add(new)
289-
session.flush()
285+
) from e
290286

291287
xcom = session.scalar(
292288
select(XComModel)
293-
.filter(
289+
.where(
294290
XComModel.dag_id == dag_id,
295291
XComModel.task_id == task_id,
296292
XComModel.run_id == dag_run_id,
@@ -324,11 +320,12 @@ def update_xcom_entry(
324320
dag_run_id: str,
325321
xcom_key: str,
326322
patch_body: XComUpdateBody,
323+
*,
327324
session: SessionDep,
328325
) -> XComResponseNative:
329326
"""Update an existing XCom entry."""
330327
# Check if XCom entry exists
331-
xcom_entry = session.scalar(
328+
xcom_query = (
332329
select(XComModel)
333330
.where(
334331
XComModel.dag_id == dag_id,
@@ -340,16 +337,32 @@ def update_xcom_entry(
340337
.limit(1)
341338
.options(joinedload(XComModel.task), joinedload(XComModel.dag_run).joinedload(DR.dag_model))
342339
)
340+
xcom_entry = session.scalar(xcom_query)
343341

344342
if not xcom_entry:
345343
raise HTTPException(
346344
status.HTTP_404_NOT_FOUND,
347345
f"The XCom with key: `{xcom_key}` with mentioned task instance doesn't exist.",
348346
)
349347

350-
# Update XCom entry
351-
xcom_entry.value = json.dumps(patch_body.value)
348+
try:
349+
XComModel.set(
350+
key=xcom_key,
351+
value=patch_body.value,
352+
dag_id=dag_id,
353+
task_id=task_id,
354+
run_id=dag_run_id,
355+
map_index=patch_body.map_index,
356+
serialize=False,
357+
session=session,
358+
)
359+
except (ValueError, TypeError) as e:
360+
raise HTTPException(
361+
status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{xcom_key}`"
362+
) from e
352363

364+
# Re-fetch after set (delete + insert) to get fresh object for response
365+
xcom_entry = session.scalar(xcom_query)
353366
return XComResponseNative.model_validate(xcom_entry)
354367

355368

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ def test_create_xcom_entry(
680680
# Validate the created XCom response
681681
current_data = response.json()
682682
assert current_data["key"] == request_body.key
683-
assert current_data["value"] == XComModel.serialize_value(request_body.value)
683+
assert current_data["value"] == request_body.value
684684
assert current_data["dag_id"] == dag_id
685685
assert current_data["task_id"] == task_id
686686
assert current_data["run_id"] == dag_run_id
@@ -716,7 +716,7 @@ def test_create_xcom_entry_with_slash_key(self, test_client):
716716
)
717717
assert get_resp.status_code == 200
718718
assert get_resp.json()["key"] == slash_key
719-
assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
719+
assert get_resp.json()["value"] == TEST_XCOM_VALUE
720720

721721

722722
class TestDeleteXComEntry(TestXComEndpoint):
@@ -814,7 +814,7 @@ def test_patch_xcom_entry(self, key, patch_body, expected_status, expected_detai
814814
assert response.status_code == expected_status
815815

816816
if expected_status == 200:
817-
assert response.json()["value"] == json.dumps(patch_body["value"])
817+
assert response.json()["value"] == patch_body["value"]
818818
else:
819819
assert response.json()["detail"] == expected_detail
820820
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)
@@ -843,5 +843,5 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session):
843843
)
844844
assert response.status_code == 200
845845
assert response.json()["key"] == slash_key
846-
assert response.json()["value"] == json.dumps(new_value)
846+
assert response.json()["value"] == new_value
847847
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)

0 commit comments

Comments
 (0)