From 3dcec5506c49a6a69d40cb3405b2d87b3a43a49f Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 3 Jun 2026 00:42:35 +0200 Subject: [PATCH 1/2] Restrict BASE_TRIGGER deserialization to BaseTrigger subclasses When loading a serialized DAG, the BASE_TRIGGER deserialization branch imported the stored class path and instantiated it without checking it is a BaseTrigger subclass. Restrict it to BaseTrigger subclasses, matching the encode side which only emits BASE_TRIGGER for BaseTrigger instances. Generated-by: Claude Opus 4.8 (1M context) following the guidelines at https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions --- .../src/airflow/serialization/serialized_objects.py | 2 ++ .../unit/serialization/test_dag_serialization.py | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index d6d45bdcd97e3..79f657c9ef328 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -675,6 +675,8 @@ def deserialize(cls, encoded_var: Any) -> Any: elif type_ == DAT.BASE_TRIGGER: tr_cls_name, kwargs = cls.deserialize(var) tr_cls = import_string(tr_cls_name) + if not (isinstance(tr_cls, type) and issubclass(tr_cls, BaseTrigger)): + raise ValueError(f"{tr_cls_name!r} is not a BaseTrigger subclass") return tr_cls(**kwargs) elif type_ == DAT.SET: return {cls.deserialize(v) for v in var} diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 5bc5b0ae84768..da434128d5856 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -2786,6 +2786,17 @@ def test_create_dagrun_accepts_partition_key_for_partition_at_runtime_dag(self, dr = dag_maker.create_dagrun(partition_key="runtime-key") assert dr.partition_key == "runtime-key" + def test_base_trigger_deserialization_rejects_non_trigger_class(self): + """A serialized BASE_TRIGGER whose class path is not a BaseTrigger subclass is rejected on load.""" + from airflow.serialization.enums import DagAttributeTypes + + encoded = BaseSerialization._encode( + BaseSerialization.serialize(["subprocess.run", {"args": ["true"]}]), + type_=DagAttributeTypes.BASE_TRIGGER, + ) + with pytest.raises(ValueError, match="not a BaseTrigger subclass"): + BaseSerialization.deserialize(encoded) + def test_kubernetes_optional(): """Test that serialization module loads without kubernetes, but deserialization of PODs requires it""" From 233e42a3dcd599d63c7441bb6ce29530bdbe82fb Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 12 Jun 2026 23:37:16 +0200 Subject: [PATCH 2/2] Validate BASE_TRIGGER class path before importing during deserialization Resolve the trigger class through a trusted-namespace allowlist that is checked before import_string runs, rather than importing first and checking the type afterward. A shared _safe_import_for_deserialize helper validates the class-path string against the trusted prefixes, then imports and verifies the subclass. Generated-by: Claude Opus 4.8 (1M context) following the guidelines at https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions --- .../serialization/serialized_objects.py | 31 +++++++++++++++++-- .../serialization/test_dag_serialization.py | 17 ++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 79f657c9ef328..9ebb46cb921e3 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -235,6 +235,33 @@ def _decode_priority_weight_strategy(var: str) -> PriorityWeightStrategy: return priority_weight_strategy_class() +# Module prefixes whose code is trusted to import while deserializing a stored DAG. +# The class path from the serialized blob is validated against these *before* +# import_string runs, so a path outside the trusted namespaces is rejected without +# importing it. Mirrors decode_timetable's prefix gate / the priority-weight registry. +_TRUSTED_DESERIALIZE_PREFIXES = ("airflow.",) + + +def _safe_import_for_deserialize(cls_name: str, base: type, *, allow_builtins: bool = False) -> type: + """ + Resolve ``cls_name`` to a subclass of ``base``, validating the name before importing. + + The check runs on the string before any import, so a class path outside the + trusted namespaces is never imported. A post-import ``issubclass`` check is + kept as a second gate. + """ + module_path = cls_name.rpartition(".")[0] + trusted = cls_name.startswith(_TRUSTED_DESERIALIZE_PREFIXES) or ( + allow_builtins and module_path == "builtins" + ) + if not trusted: + raise ValueError(f"Refusing to deserialize disallowed class path {cls_name!r}") + cls = import_string(cls_name) + if not (isinstance(cls, type) and issubclass(cls, base)): + raise ValueError(f"{cls_name!r} is not a {base.__name__} subclass") + return cls + + def _encode_start_trigger_args(var: StartTriggerArgs) -> dict[str, Any]: """Encode a StartTriggerArgs.""" @@ -674,9 +701,7 @@ def deserialize(cls, encoded_var: Any) -> Any: return exc_cls(*args, **kwargs) elif type_ == DAT.BASE_TRIGGER: tr_cls_name, kwargs = cls.deserialize(var) - tr_cls = import_string(tr_cls_name) - if not (isinstance(tr_cls, type) and issubclass(tr_cls, BaseTrigger)): - raise ValueError(f"{tr_cls_name!r} is not a BaseTrigger subclass") + tr_cls = _safe_import_for_deserialize(tr_cls_name, BaseTrigger) return tr_cls(**kwargs) elif type_ == DAT.SET: return {cls.deserialize(v) for v in var} diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index da434128d5856..6cb580e21aa54 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -2786,14 +2786,27 @@ def test_create_dagrun_accepts_partition_key_for_partition_at_runtime_dag(self, dr = dag_maker.create_dagrun(partition_key="runtime-key") assert dr.partition_key == "runtime-key" - def test_base_trigger_deserialization_rejects_non_trigger_class(self): - """A serialized BASE_TRIGGER whose class path is not a BaseTrigger subclass is rejected on load.""" + def test_base_trigger_deserialization_rejects_disallowed_class_path(self): + """A BASE_TRIGGER class path outside the trusted namespaces is rejected before import.""" from airflow.serialization.enums import DagAttributeTypes + # subprocess.run is importable; asserting the pre-import message (not the + # subclass message) proves the import is never attempted. encoded = BaseSerialization._encode( BaseSerialization.serialize(["subprocess.run", {"args": ["true"]}]), type_=DagAttributeTypes.BASE_TRIGGER, ) + with pytest.raises(ValueError, match="Refusing to deserialize disallowed class path"): + BaseSerialization.deserialize(encoded) + + def test_base_trigger_deserialization_rejects_non_trigger_class(self): + """A trusted-namespace class that is not a BaseTrigger subclass is still rejected.""" + from airflow.serialization.enums import DagAttributeTypes + + encoded = BaseSerialization._encode( + BaseSerialization.serialize(["airflow.models.dag.DAG", {}]), + type_=DagAttributeTypes.BASE_TRIGGER, + ) with pytest.raises(ValueError, match="not a BaseTrigger subclass"): BaseSerialization.deserialize(encoded)