Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions task-sdk/src/airflow/sdk/definitions/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def set(cls, key: str, value: Any, description: str | None = None, serialize_jso
return _set_variable(key, value, description, serialize_json=serialize_json)
except AirflowRuntimeError as e:
log.exception(e)
raise

@classmethod
def keys(cls, prefix: str | None = None) -> Sequence[str]:
Expand Down Expand Up @@ -101,3 +102,4 @@ def delete(cls, key: str) -> None:
_delete_variable(key=key)
except AirflowRuntimeError as e:
log.exception(e)
raise
35 changes: 34 additions & 1 deletion task-sdk/tests/task_sdk/definitions/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@

from airflow.sdk import Variable
from airflow.sdk.configuration import initialize_secrets_backends
from airflow.sdk.execution_time.comms import GetVariableKeys, PutVariable, VariableKeysResult, VariableResult
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time.comms import (
DeleteVariable,
ErrorResponse,
GetVariableKeys,
PutVariable,
VariableKeysResult,
VariableResult,
)
from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

from tests_common.test_utils.config import conf_vars
Expand Down Expand Up @@ -89,6 +97,31 @@ def test_var_set(self, key, value, description, serialize_json, mock_supervisor_
),
)

def test_var_set_raises_on_runtime_error(self, mock_supervisor_comms):
with mock.patch(
"airflow.sdk.execution_time.context._set_variable",
side_effect=AirflowRuntimeError(
ErrorResponse(error=ErrorType.GENERIC_ERROR, detail={"message": "API_SERVER_ERROR"})
),
):
with pytest.raises(AirflowRuntimeError):
Variable.set(key="my_key", value="my_value")

def test_var_delete(self, mock_supervisor_comms):
Variable.delete(key="my_key")

mock_supervisor_comms.send.assert_called_once_with(msg=DeleteVariable(key="my_key"))

def test_var_delete_raises_on_runtime_error(self, mock_supervisor_comms):
with mock.patch(
"airflow.sdk.execution_time.context._delete_variable",
side_effect=AirflowRuntimeError(
ErrorResponse(error=ErrorType.GENERIC_ERROR, detail={"message": "API_SERVER_ERROR"})
),
):
with pytest.raises(AirflowRuntimeError):
Variable.delete(key="my_key")


class TestVariableKeys:
@pytest.mark.parametrize(
Expand Down
Loading