@@ -2082,7 +2082,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
20822082
20832083def apply_auto_restatements (
20842084 snapshots : t .Dict [SnapshotId , Snapshot ], execution_time : TimeLike
2085- ) -> t .List [SnapshotIntervals ]:
2085+ ) -> t .Tuple [ t . List [SnapshotIntervals ], t . Dict [ SnapshotId , SnapshotId ] ]:
20862086 """Applies auto restatements to the snapshots.
20872087
20882088 This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2097,6 +2097,8 @@ def apply_auto_restatements(
20972097 A list of SnapshotIntervals with **new** intervals that need to be restated.
20982098 """
20992099 dag = snapshots_to_dag (snapshots .values ())
2100+ snapshots_with_auto_restatements : t .List [SnapshotId ] = []
2101+ auto_restatement_triggers : t .Dict [SnapshotId , SnapshotId ] = {}
21002102 auto_restated_intervals_per_snapshot : t .Dict [SnapshotId , Interval ] = {}
21012103 for s_id in dag :
21022104 if s_id not in snapshots :
@@ -2120,6 +2122,23 @@ def apply_auto_restatements(
21202122 )
21212123 auto_restated_intervals .append (next_auto_restated_interval )
21222124
2125+ # auto-restated snapshot is its own trigger
2126+ snapshots_with_auto_restatements .append (s_id )
2127+ auto_restatement_triggers [s_id ] = s_id
2128+ else :
2129+ for parent_s_id in snapshot .parents :
2130+ # first auto-restated parent is the trigger
2131+ if parent_s_id in snapshots_with_auto_restatements :
2132+ auto_restatement_triggers [s_id ] = parent_s_id
2133+ break
2134+ # if no trigger yet and parent has trigger, inherit their trigger
2135+ # - will be overwritten if a different parent is auto-restated
2136+ if (
2137+ parent_s_id in auto_restatement_triggers
2138+ and s_id not in auto_restatement_triggers
2139+ ):
2140+ auto_restatement_triggers [s_id ] = auto_restatement_triggers [parent_s_id ]
2141+
21232142 if auto_restated_intervals :
21242143 auto_restated_interval_start = sys .maxsize
21252144 auto_restated_interval_end = - sys .maxsize
@@ -2149,20 +2168,22 @@ def apply_auto_restatements(
21492168
21502169 snapshot .apply_pending_restatement_intervals ()
21512170 snapshot .update_next_auto_restatement_ts (execution_time )
2152-
2153- return [
2154- SnapshotIntervals (
2155- name = snapshots [s_id ].name ,
2156- identifier = None ,
2157- version = snapshots [s_id ].version ,
2158- dev_version = None ,
2159- intervals = [],
2160- dev_intervals = [],
2161- pending_restatement_intervals = [interval ],
2162- )
2163- for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2164- if s_id in snapshots
2165- ]
2171+ return (
2172+ [
2173+ SnapshotIntervals (
2174+ name = snapshots [s_id ].name ,
2175+ identifier = None ,
2176+ version = snapshots [s_id ].version ,
2177+ dev_version = None ,
2178+ intervals = [],
2179+ dev_intervals = [],
2180+ pending_restatement_intervals = [interval ],
2181+ )
2182+ for s_id , interval in auto_restated_intervals_per_snapshot .items ()
2183+ if s_id in snapshots
2184+ ],
2185+ auto_restatement_triggers ,
2186+ )
21662187
21672188
21682189def parent_snapshots_by_name (
0 commit comments