@@ -148,16 +148,14 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
148148 plan_dag_spec .environment .name ,
149149 )
150150
151- environment = plan_dag_spec .environment
152-
153151 all_snapshots = {
154152 ** {s .snapshot_id : s for s in plan_dag_spec .new_snapshots },
155- ** self ._state_reader .get_snapshots (environment .snapshots ),
153+ ** self ._state_reader .get_snapshots (plan_dag_spec . environment .snapshots ),
156154 }
157155
158156 snapshots_to_create = [
159157 all_snapshots [snapshot .snapshot_id ]
160- for snapshot in environment .snapshots
158+ for snapshot in plan_dag_spec . environment .snapshots
161159 if snapshot .snapshot_id in all_snapshots
162160 and (
163161 plan_dag_spec .models_to_backfill is None
@@ -216,22 +214,36 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
216214 (
217215 promote_start_task ,
218216 promote_end_task ,
219- ) = self ._create_promotion_demotion_tasks (plan_dag_spec , environment , all_snapshots )
220-
221- update_views_task_pair = self ._create_update_views_tasks (plan_dag_spec , all_snapshots )
222-
223- finalize_task = self ._create_finalize_task (environment )
217+ ) = self ._create_promotion_demotion_tasks (plan_dag_spec , all_snapshots )
224218
225219 start_task >> create_start_task
226220 create_end_task >> backfill_before_promote_start_task
227221 backfill_before_promote_end_task >> promote_start_task
228- promote_end_task >> backfill_after_promote_start_task
229222
223+ update_views_task_pair = self ._create_update_views_tasks (plan_dag_spec , all_snapshots )
230224 if update_views_task_pair :
231225 backfill_after_promote_end_task >> update_views_task_pair [0 ]
232- update_views_task_pair [1 ] >> finalize_task
226+ before_finalize_task = update_views_task_pair [1 ]
233227 else :
234- backfill_after_promote_end_task >> finalize_task
228+ before_finalize_task = backfill_after_promote_end_task
229+
230+ unpause_snapshots_task = self ._create_unpause_snapshots_task (plan_dag_spec )
231+ if unpause_snapshots_task :
232+ if not plan_dag_spec .ensure_finalized_snapshots :
233+ # Only unpause right after updatign the environment record if we don't
234+ # have to use the finalized snapshots for subsequent plan applications.
235+ promote_end_task >> unpause_snapshots_task
236+ unpause_snapshots_task >> backfill_after_promote_start_task
237+ else :
238+ # Otherwise, unpause right before finalizing the environment.
239+ promote_end_task >> backfill_after_promote_start_task
240+ before_finalize_task >> unpause_snapshots_task
241+ before_finalize_task = unpause_snapshots_task
242+ else :
243+ promote_end_task >> backfill_after_promote_start_task
244+
245+ finalize_task = self ._create_finalize_task (plan_dag_spec .environment )
246+ before_finalize_task >> finalize_task
235247
236248 self ._add_notification_target_tasks (plan_dag_spec , start_task , end_task , finalize_task )
237249 return dag
@@ -310,51 +322,52 @@ def _create_creation_tasks(
310322 def _create_promotion_demotion_tasks (
311323 self ,
312324 request : common .PlanDagSpec ,
313- environment : Environment ,
314325 snapshots : t .Dict [SnapshotId , Snapshot ],
315326 ) -> t .Tuple [BaseOperator , BaseOperator ]:
316327 update_state_task = PythonOperator (
317328 task_id = "snapshot_promotion_update_state" ,
318329 python_callable = promotion_update_state_task ,
319330 op_kwargs = {
320- "environment" : environment ,
331+ "environment" : request . environment ,
321332 "no_gaps_snapshot_names" : (
322333 request .no_gaps_snapshot_names if request .no_gaps else set ()
323334 ),
324335 },
325336 )
326337
327338 start_task = update_state_task
328- end_task = update_state_task
329-
330- if request .environment .promoted_snapshots :
331- if not request .is_dev and request .unpaused_dt :
332- migrate_tables_task = self ._create_snapshot_migrate_tables_operator (
333- [
334- snapshots [s .snapshot_id ]
335- for s in request .environment .promoted_snapshots
336- if snapshots [s .snapshot_id ].is_paused
337- ],
338- request .ddl_concurrent_tasks ,
339- "snapshot_promotion_migrate_tables" ,
340- )
341-
342- unpause_snapshots_task = PythonOperator (
343- task_id = "snapshot_promotion_unpause_snapshots" ,
344- python_callable = promotion_unpause_snapshots_task ,
345- op_kwargs = {
346- "environment" : environment ,
347- "unpaused_dt" : request .unpaused_dt ,
348- },
349- trigger_rule = "none_failed" ,
350- )
351-
352- update_state_task >> migrate_tables_task
353- migrate_tables_task >> unpause_snapshots_task
354- end_task = unpause_snapshots_task
339+ end_task : BaseOperator = update_state_task
340+
341+ if request .environment .promoted_snapshots and not request .is_dev and request .unpaused_dt :
342+ migrate_tables_task = self ._create_snapshot_migrate_tables_operator (
343+ [
344+ snapshots [s .snapshot_id ]
345+ for s in request .environment .promoted_snapshots
346+ if snapshots [s .snapshot_id ].is_paused
347+ ],
348+ request .ddl_concurrent_tasks ,
349+ "snapshot_promotion_migrate_tables" ,
350+ )
351+ update_state_task >> migrate_tables_task
352+ end_task = migrate_tables_task
355353
356354 return (start_task , end_task )
357355
356+ def _create_unpause_snapshots_task (
357+ self , request : common .PlanDagSpec
358+ ) -> t .Optional [BaseOperator ]:
359+ if request .is_dev or not request .unpaused_dt :
360+ return None
361+ return PythonOperator (
362+ task_id = "snapshot_promotion_unpause_snapshots" ,
363+ python_callable = promotion_unpause_snapshots_task ,
364+ op_kwargs = {
365+ "environment" : request .environment ,
366+ "unpaused_dt" : request .unpaused_dt ,
367+ },
368+ trigger_rule = "none_failed" ,
369+ )
370+
358371 def _create_update_views_tasks (
359372 self , request : common .PlanDagSpec , snapshots : t .Dict [SnapshotId , Snapshot ]
360373 ) -> t .Optional [t .Tuple [BaseOperator , BaseOperator ]]:
0 commit comments