@@ -659,6 +659,7 @@ def _dag(
659659 }
660660 snapshots_to_create = snapshots_to_create or set ()
661661 original_snapshots_to_create = snapshots_to_create .copy ()
662+ upstream_dependencies_cache : t .Dict [SnapshotId , t .Set [SchedulingUnit ]] = {}
662663
663664 snapshot_dag = snapshot_dag or snapshots_to_dag (batches )
664665 dag = DAG [SchedulingUnit ]()
@@ -670,12 +671,15 @@ def _dag(
670671 snapshot = self .snapshots_by_name [snapshot_id .name ]
671672 intervals = intervals_per_snapshot .get (snapshot .name , [])
672673
673- upstream_dependencies : t .List [SchedulingUnit ] = []
674+ upstream_dependencies : t .Set [SchedulingUnit ] = set ()
674675
675676 for p_sid in snapshot .parents :
676- upstream_dependencies .extend (
677+ upstream_dependencies .update (
677678 self ._find_upstream_dependencies (
678- p_sid , intervals_per_snapshot , original_snapshots_to_create
679+ p_sid ,
680+ intervals_per_snapshot ,
681+ original_snapshots_to_create ,
682+ upstream_dependencies_cache ,
679683 )
680684 )
681685
@@ -726,29 +730,42 @@ def _find_upstream_dependencies(
726730 parent_sid : SnapshotId ,
727731 intervals_per_snapshot : t .Dict [str , Intervals ],
728732 snapshots_to_create : t .Set [SnapshotId ],
729- ) -> t .List [SchedulingUnit ]:
733+ cache : t .Dict [SnapshotId , t .Set [SchedulingUnit ]] = {},
734+ ) -> t .Set [SchedulingUnit ]:
730735 if parent_sid not in self .snapshots :
731- return []
736+ return set ()
737+ if parent_sid in cache :
738+ return cache [parent_sid ]
732739
733740 p_intervals = intervals_per_snapshot .get (parent_sid .name , [])
734741
742+ parent_node : t .Optional [SchedulingUnit ] = None
735743 if p_intervals :
736744 if len (p_intervals ) > 1 :
737- return [DummyNode (snapshot_name = parent_sid .name )]
738- interval = p_intervals [0 ]
739- return [EvaluateNode (snapshot_name = parent_sid .name , interval = interval , batch_index = 0 )]
740- if parent_sid in snapshots_to_create :
741- return [CreateNode (snapshot_name = parent_sid .name )]
745+ parent_node = DummyNode (snapshot_name = parent_sid .name )
746+ else :
747+ interval = p_intervals [0 ]
748+ parent_node = EvaluateNode (
749+ snapshot_name = parent_sid .name , interval = interval , batch_index = 0
750+ )
751+ elif parent_sid in snapshots_to_create :
752+ parent_node = CreateNode (snapshot_name = parent_sid .name )
753+
754+ if parent_node is not None :
755+ cache [parent_sid ] = {parent_node }
756+ return {parent_node }
757+
742758 # This snapshot has no intervals and doesn't need creation which means
743759 # that it can be a transitive dependency
744- transitive_deps : t .List [SchedulingUnit ] = []
760+ transitive_deps : t .Set [SchedulingUnit ] = set ()
745761 parent_snapshot = self .snapshots [parent_sid ]
746762 for grandparent_sid in parent_snapshot .parents :
747- transitive_deps .extend (
763+ transitive_deps .update (
748764 self ._find_upstream_dependencies (
749- grandparent_sid , intervals_per_snapshot , snapshots_to_create
765+ grandparent_sid , intervals_per_snapshot , snapshots_to_create , cache
750766 )
751767 )
768+ cache [parent_sid ] = transitive_deps
752769 return transitive_deps
753770
754771 def _run_or_audit (
0 commit comments