1+ # ty: ignore[unresolved-import]
2+
13from datetime import timedelta
2- import yaml
34
5+ from airflow import DAG
46from airflow .models .baseoperator import chain
57from airflow .operators .dummy import DummyOperator
6- from airflow .utils .dates import days_ago
7- from airflow import DAG
8-
9- from kubernetes .client import models as k8s
8+ from airflow .operators .python import PythonOperator
109from airflow .providers .cncf .kubernetes .operators .pod import (
1110 KubernetesPodOperator ,
1211)
13-
14-
15- from airflow .operators .python import PythonOperator
16-
12+ from airflow .utils .dates import days_ago
13+ from airflow_basics .config_loader import load_config
1714from airflow_basics .datacatalog_update_bq import update_bq_entry
1815from airflow_basics .datacatalog_update_dataplex import update_dataplex
19- from airflow_basics .failure_reporting import failure_reporter , check_failures
20- from airflow_basics .config_loader import load_config
21-
22-
16+ from airflow_basics .failure_reporting import check_failures , failure_reporter
17+ from kubernetes .client import models as k8s
2318
# Identifier for this DAG; used as the dag_id shown in the Airflow UI and
# (per the surrounding code) as the key for loading this DAG's configuration.
dag_name = "boundaries"

4742 is_paused_upon_creation = dag_metadata ["is_paused_upon_creation" ],
4843 start_date = days_ago (dag_metadata ["start_date" ]),
4944) as dag :
50-
5145 # dummy operators for flexibility
52- pipeline_start = DummyOperator (
53- task_id = "pipeline_start"
54- )
55- pipeline_end = DummyOperator (
56- task_id = "pipeline_end" ,
57- trigger_rule = "all_done"
58- )
46+ pipeline_start = DummyOperator (task_id = "pipeline_start" )
47+ pipeline_end = DummyOperator (task_id = "pipeline_end" , trigger_rule = "all_done" )
5948 failure_report_task = PythonOperator (
6049 task_id = "failure_report" ,
6150 python_callable = failure_reporter ,
6251 trigger_rule = "all_done" ,
63- op_kwargs = {' disable_reporting' : True },
52+ op_kwargs = {" disable_reporting" : True },
6453 provide_context = True ,
6554 )
66- trigger_task_list = []
6755 task_list = []
6856 for task in dag_tasks :
6957 current_task = KubernetesPodOperator (
8371 ],
8472 )
8573 task_list .append (current_task )
86-
74+
8775 # tasks are run sequentially depending on the order they are defined in configuration.yaml
8876 chain (* task_list )
8977 if dataplex_metadata .get ("dataset_id" ):
113101 print (
114102 "No dataset_id provided in the configuration, skipping update_dataplex and update_bq"
115103 )
116-
117- (
118- pipeline_start
119- >> task_list
120- >> pipeline_end
121- )
122- pipeline_end >> failure_report_task
104+
105+ (pipeline_start >> task_list >> pipeline_end )
106+ pipeline_end >> failure_report_task