Skip to content

Fix Task SDK silently ignoring AirflowRuntimeError when Variable.set() or Variable.delete() fails#68542

Open
jayachandrakasarla wants to merge 1 commit into
apache:mainfrom
jayachandrakasarla:fix-68537-task-sdk-ignoring-exceptions
Open

Fix Task SDK silently ignoring AirflowRuntimeError when Variable.set() or Variable.delete() fails#68542
jayachandrakasarla wants to merge 1 commit into
apache:mainfrom
jayachandrakasarla:fix-68537-task-sdk-ignoring-exceptions

Conversation

@jayachandrakasarla

@jayachandrakasarla jayachandrakasarla commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Closes #68537

As per the issue, Variable.set() and Variable.delete() in the Task SDK were silently ignoring AirflowRuntimeError, logging the exception but not re-raising, causing tasks to report success even when a variable write was rejected by the Execution API.

Added raise to both except blocks so errors now propagate and fail the task, consistent with the existing behavior of Variable.get().

Also added the unit tests:

  • test_var_set_raises_on_runtime_error - verifies Variable.set() re-raises AirflowRuntimeError when the underlying call fails (e.g. a 403 from the API server).
  • test_var_delete_raises_on_runtime_error - same for Variable.delete().
  • test_var_delete - basic success-path coverage for Variable.delete(), which was previously untested.

Here's a DAG for testing:

import datetime
from airflow.sdk import dag, task

@dag(
    dag_id="test_variable_error_propagation",
    start_date=datetime.datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
)
def test_variable_error_propagation():
    @task
    def test_variable_set_and_delete_success():
        from airflow.sdk import Variable

        Variable.set("abc", "efg")
        Variable.delete("abc")
        print("Variable.delete succeeded")

    t1 = test_variable_set_and_delete_success()

    t1


test_variable_error_propagation()

Use the following middleware plugin with breeze to test 403 API errors

from __future__ import annotations

import json

from airflow.plugins_manager import AirflowPlugin


class RejectVariableWritesMiddleware:
    """Intercept PUT/DELETE /execution/variables/* and return 403."""

    def __init__(self, app) -> None:
        self.app = app

    async def __call__(self, scope, receive, send) -> None:
        if scope["type"] == "http":
            path: str = scope.get("path", "")
            method: str = scope.get("method", "")
            if method in ("PUT", "DELETE") and path.startswith("/execution/variables/"):
                body = json.dumps({"detail": "Variable writes rejected by test middleware"}).encode()
                await send(
                    {
                        "type": "http.response.start",
                        "status": 403,
                        "headers": [
                            [b"content-type", b"application/json"],
                            [b"content-length", str(len(body)).encode()],
                        ],
                    }
                )
                await send({"type": "http.response.body", "body": body})
                return
        await self.app(scope, receive, send)


class RejectVariableWritesPlugin(AirflowPlugin):
    name = "reject_variable_writes"
    fastapi_root_middlewares = [
        {
            "name": "RejectVariableWritesMiddleware",
            "middleware": RejectVariableWritesMiddleware,
        }
    ]

Was generative AI tooling used to co-author this PR?

[X] Yes

Used Claude to generate the middleware plugin to test 403 API errors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Task SDK: Variable.set/delete swallow API errors — a rejected write does not fail the task

1 participant