diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index ae4c1249cab09..a1f16ac9bfb49 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -70,7 +70,6 @@ class DagAttributeTypes(str, Enum): TIMEDELTA = "timedelta" TIMEZONE = "timezone" RELATIVEDELTA = "relativedelta" - BASE_TRIGGER = "base_trigger" AIRFLOW_EXC_SER = "airflow_exc_ser" BASE_EXC_SER = "base_exc_ser" DICT = "dict" diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index d6d45bdcd97e3..14bbd34335971 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -106,7 +106,7 @@ validate_and_load_priority_weight_strategy, ) from airflow.timetables.base import DagRunInfo, Timetable -from airflow.triggers.base import BaseTrigger, StartTriggerArgs +from airflow.triggers.base import StartTriggerArgs from airflow.utils.code_utils import get_python_source from airflow.utils.db import LazySelectSequence @@ -470,7 +470,6 @@ def serialize( :meta private: """ from airflow.sdk.definitions._internal.types import is_arg_set - from airflow.sdk.exceptions import TaskDeferred if not is_arg_set(var): return cls._encode(None, type_=DAT.ARG_NOT_SET) @@ -535,7 +534,7 @@ def serialize( var._asdict(), type_=DAT.TASK_INSTANCE_KEY, ) - elif isinstance(var, (AirflowException, TaskDeferred)) and hasattr(var, "serialize"): + elif isinstance(var, AirflowException) and hasattr(var, "serialize"): exc_cls_name, args, kwargs = var.serialize() return cls._encode( cls.serialize( @@ -556,14 +555,6 @@ def serialize( ), type_=DAT.BASE_EXC_SER, ) - elif isinstance(var, BaseTrigger): - return cls._encode( - cls.serialize( - var.serialize(), - strict=strict, - ), - type_=DAT.BASE_TRIGGER, - ) elif callable(var): return str(get_python_source(var)) elif isinstance(var, set): @@ -672,10 +663,6 @@ def deserialize(cls, encoded_var: Any) -> Any: else: exc_cls = import_string(f"builtins.{exc_cls_name}") return exc_cls(*args, **kwargs) - elif type_ == DAT.BASE_TRIGGER: - tr_cls_name, kwargs = cls.deserialize(var) - tr_cls = import_string(tr_cls_name) - return tr_cls(**kwargs) elif type_ == DAT.SET: return {cls.deserialize(v) for v in var} elif type_ == DAT.TUPLE: diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 58cb1f7790024..2d2119095d3aa 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -39,7 +39,6 @@ AirflowFailException, AirflowRescheduleException, SerializationError, - TaskDeferred, ) from airflow.models.connection import Connection from airflow.models.dag import DAG @@ -104,7 +103,6 @@ LazyDeserializedDAG, _has_kubernetes, ) -from airflow.triggers.base import BaseTrigger from airflow.utils.db import LazySelectSequence from unit.models import DEFAULT_DATE @@ -570,42 +568,14 @@ def test_ser_of_asset_event_accessor(): assert d[Asset(name="yo", uri="test://yo")].extra == {"this": "that", "the": "other"} -class MyTrigger(BaseTrigger): - def __init__(self, hi): - self.hi = hi - - def serialize(self): - return "unit.serialization.test_serialized_objects.MyTrigger", {"hi": self.hi} - - async def run(self): - yield - - def test_roundtrip_exceptions(): - """ - This is for AIP-44 when we need to send certain non-error exceptions - as part of an RPC call e.g. TaskDeferred or AirflowRescheduleException. - """ + """Non-error AirflowExceptions (e.g. AirflowRescheduleException) round-trip through BaseSerialization.""" some_date = pendulum.now() resched_exc = AirflowRescheduleException(reschedule_date=some_date) ser = BaseSerialization.serialize(resched_exc) deser = BaseSerialization.deserialize(ser) assert isinstance(deser, AirflowRescheduleException) assert deser.reschedule_date == some_date - del ser - del deser - exc = TaskDeferred( - trigger=MyTrigger(hi="yo"), - method_name="meth_name", - kwargs={"have": "pie"}, - timeout=timedelta(seconds=30), - ) - ser = BaseSerialization.serialize(exc) - deser = BaseSerialization.deserialize(ser) - assert deser.trigger.hi == "yo" - assert deser.method_name == "meth_name" - assert deser.kwargs == {"have": "pie"} - assert deser.timeout == timedelta(seconds=30) @pytest.mark.parametrize(