From 61a875a3c3857162947899dc3b015eb3242aaa10 Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Thu, 26 Mar 2026 01:27:32 +0800 Subject: [PATCH 1/2] Fix XCom PATCH/POST to store native values (instead of json.dumps strings) --- .../core_api/routes/public/xcom.py | 53 ++++++++++++------- .../core_api/routes/public/test_xcom.py | 8 +-- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py index 7bf64592aa640..ea45c02be1eb8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -17,7 +17,6 @@ from __future__ import annotations import copy -import json from typing import Annotated from fastapi import Depends, HTTPException, Query, status @@ -270,27 +269,24 @@ def create_xcom_entry( ) try: - value = json.dumps(request_body.value) - except (ValueError, TypeError): + XComModel.set( + key=request_body.key, + value=request_body.value, + dag_id=dag_id, + task_id=task_id, + run_id=dag_run_id, + map_index=request_body.map_index, + serialize=False, + session=session, + ) + except (ValueError, TypeError) as e: raise HTTPException( status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{request_body.key}`" - ) - - new = XComModel( - dag_run_id=dag_run.id, - key=request_body.key, - value=value, - run_id=dag_run_id, - task_id=task_id, - dag_id=dag_id, - map_index=request_body.map_index, - ) - session.add(new) - session.flush() + ) from e xcom = session.scalar( select(XComModel) - .filter( + .where( XComModel.dag_id == dag_id, XComModel.task_id == task_id, XComModel.run_id == dag_run_id, @@ -324,11 +320,12 @@ def update_xcom_entry( dag_run_id: str, xcom_key: str, patch_body: XComUpdateBody, + *, session: SessionDep, ) -> XComResponseNative: """Update an existing XCom entry.""" # Check if XCom entry exists - xcom_entry = session.scalar( + xcom_query = ( select(XComModel) .where( XComModel.dag_id == dag_id, @@ -340,6 +337,7 @@ def update_xcom_entry( .limit(1) .options(joinedload(XComModel.task), joinedload(XComModel.dag_run).joinedload(DR.dag_model)) ) + xcom_entry = session.scalar(xcom_query) if not xcom_entry: raise HTTPException( @@ -347,9 +345,24 @@ def update_xcom_entry( f"The XCom with key: `{xcom_key}` with mentioned task instance doesn't exist.", ) - # Update XCom entry - xcom_entry.value = json.dumps(patch_body.value) + try: + XComModel.set( + key=xcom_key, + value=patch_body.value, + dag_id=dag_id, + task_id=task_id, + run_id=dag_run_id, + map_index=patch_body.map_index, + serialize=False, + session=session, + ) + except (ValueError, TypeError) as e: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{xcom_key}`" + ) from e + # Re-fetch after set (delete + insert) to get fresh object for response + xcom_entry = session.scalar(xcom_query) return XComResponseNative.model_validate(xcom_entry) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py index c51469d944ecb..c5401163eea48 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py @@ -680,7 +680,7 @@ def test_create_xcom_entry( # Validate the created XCom response current_data = response.json() assert current_data["key"] == request_body.key - assert current_data["value"] == XComModel.serialize_value(request_body.value) + assert current_data["value"] == request_body.value assert current_data["dag_id"] == dag_id assert current_data["task_id"] == task_id assert current_data["run_id"] == dag_run_id @@ -716,7 +716,7 @@ def test_create_xcom_entry_with_slash_key(self, test_client): ) assert get_resp.status_code == 200 assert get_resp.json()["key"] == slash_key - assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE) + assert get_resp.json()["value"] == TEST_XCOM_VALUE @pytest.mark.parametrize( ("key", "value"), @@ -833,7 +833,7 @@ def test_patch_xcom_entry(self, key, patch_body, expected_status, expected_detai assert response.status_code == expected_status if expected_status == 200: - assert response.json()["value"] == json.dumps(patch_body["value"]) + assert response.json()["value"] == patch_body["value"] else: assert response.json()["detail"] == expected_detail check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None) @@ -862,5 +862,5 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session): ) assert response.status_code == 200 assert response.json()["key"] == slash_key - assert response.json()["value"] == json.dumps(new_value) + assert response.json()["value"] == new_value check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None) From e02110923d2bcd1ea82db7906a818e34d770c3c7 Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Fri, 27 Mar 2026 18:26:06 +0800 Subject: [PATCH 2/2] Add unit test --- .../api_fastapi/core_api/routes/public/xcom.py | 2 +- .../core_api/routes/public/test_xcom.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py index ea45c02be1eb8..06c67367ff80a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -361,7 +361,7 @@ def update_xcom_entry( status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with key: `{xcom_key}`" ) from e - # Re-fetch after set (delete + insert) to get fresh object for response + # Fetch after setting, to get fresh object for response xcom_entry = session.scalar(xcom_query) return XComResponseNative.model_validate(xcom_entry) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py index c5401163eea48..97d3bb9c5527b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py @@ -864,3 +864,21 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session): assert response.json()["key"] == slash_key assert response.json()["value"] == new_value check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None) + + def test_patch_xcom_preserves_int_type(self, test_client, session): + """Test scenario described in #59032: if existing XCom value type is int, + after patching with different value, it should still be int in the API response. + """ + key = "int_type_xcom" + # Create with int value + self._create_xcom(key, 42) + patch_value = 100 + response = test_client.patch( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}", + json={"value": patch_value}, + ) + assert response.status_code == 200 + data = response.json() + assert data["value"] == patch_value + assert isinstance(data["value"], int), f"Expected int type but got {type(data['value'])}" + check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)