2525from unittest import mock
2626from unittest .mock import MagicMock
2727
28+ import msgspec
2829import pendulum
2930import pytest
3031import time_machine
4243from airflow .providers .standard .triggers .temporal import DateTimeTrigger , TimeDeltaTrigger
4344from airflow .sdk import BaseOperator , task
4445from airflow .sdk .definitions .dag import _run_inline_trigger
46+ from airflow .sdk .execution_time .comms import _RequestFrame , _ResponseFrame
4547from airflow .serialization .serialized_objects import DagSerialization
4648from airflow .triggers .base import TriggerEvent
4749from airflow .utils .session import create_session
@@ -1029,7 +1031,8 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu
10291031 assert serialized_dag_ids == {"test_example_bash_operator" , "test_sensor" }
10301032
10311033 @conf_vars ({("core" , "load_examples" ): "false" })
1032- def test_reserialize_should_make_equal_hash (self , configure_dag_bundles , session ):
1034+ def test_reserialize_should_make_equal_hash_with_dag_processor (self , configure_dag_bundles , session ):
1035+ from airflow .dag_processing .processor import DagFileParsingResult , DagFileProcessorProcess
10331036 from airflow .serialization .serialized_objects import LazyDeserializedDAG
10341037
10351038 with configure_dag_bundles (self .test_bundles_config ):
@@ -1038,10 +1041,19 @@ def test_reserialize_should_make_equal_hash(self, configure_dag_bundles, session
10381041 )
10391042
10401043 dagbag = DagBag (self .test_bundles_config ["bundle4" ], bundle_path = self .test_bundles_config ["bundle4" ])
1041- dag_hashes = set (
1042- [LazyDeserializedDAG (data = DagSerialization .to_dict (dag )).hash for dag in dagbag .dags .values ()]
1044+ dag_parsing_result = DagFileParsingResult (
1045+ fileloc = self .test_bundles_config ["bundle4" ].name ,
1046+ serialized_dags = [
1047+ LazyDeserializedDAG (data = DagSerialization .to_dict (dag )) for dag in dagbag .dags .values ()
1048+ ],
10431049 )
10441050
1045- serialized_dag_hash = set (session .execute (select (SerializedDagModel .dag_hash )).scalars ())
1051+ frame = _ResponseFrame (id = 0 , body = dag_parsing_result .model_dump ()).as_bytes ()
1052+ request_frame = msgspec .msgpack .Decoder [_RequestFrame ](_RequestFrame ).decode (frame [4 :])
1053+ dag_processor_parsing_result = DagFileProcessorProcess .decoder .validate_python (request_frame .body )
10461054
1047- assert dag_hashes == serialized_dag_hash
1055+ serialized_dag_hash = list (session .execute (select (SerializedDagModel .dag_hash )).scalars ())
1056+
1057+ assert len (dag_processor_parsing_result .serialized_dags ) == 1
1058+ assert len (serialized_dag_hash ) == 1
1059+ assert dag_processor_parsing_result .serialized_dags [0 ].hash == serialized_dag_hash [0 ]
0 commit comments