From 131f734b279eb654ede1a9d94a43a96f4a64bb4c Mon Sep 17 00:00:00 2001 From: oscarrezab Date: Tue, 17 Jun 2025 16:25:36 -0500 Subject: [PATCH 1/2] Add Example YAML configuration and Example DAG instantiation for S3ToSnowflakeDag and SFTPToSnowflakeDag --- README.md | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 171 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 537f365..50f5d4d 100644 --- a/README.md +++ b/README.md @@ -482,7 +482,6 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg - ## S3ToSnowflakeDag This DAG transfers data from an S3 bucket location into the Snowflake raw data lake. It should be used when data sources are not available from an Ed-Fi ODS but need to be brought into the data warehouse. @@ -516,6 +515,89 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg +
+Example YAML configuration: + +```yaml +s3_to_snowflake_dags__default_args: &s3_to_snowflake_dags__default_args + default_args: *default_task_args + schedule_interval: null + start_date: '2022-08-01' + database: raw + schema: external + s3_source_conn_id: 'external_s3' + s3_dest_conn_id: 'data_lake' + transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py' + slack_conn_id: null + snowflake_conn_id: 'snowflake' + pool: 'external_pulls' + s3_dest_file_extension: '.json' + full_replace: True + +s3_to_snowflake_dags: + canon_city_sd: + ccr: + 2020: + resource_names: + - cdip_matriculation + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args + 2021: + resource_names: + - cdip_matriculation + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args + 2022: + resource_names: + - cdip_matriculation + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args + 2023: + resource_names: + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args +``` + +
+ +
+Example DAG instantiation: + +```python +import logging +from util import io_helpers +from ea_airflow_util import S3ToSnowflakeDag + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + +# Load variables from Airflow config +configs_dir = '/home/airflow/airflow/configs' +airflow_configs_file = 'airflow_config.yml' +airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file) + +if dag_params := airflow_configs.get('s3_to_snowflake_dags'): + for tenant_code, data_sources in dag_params.items(): + + for data_source, api_year_vars in data_sources.items(): + + for api_year, dag_vars in api_year_vars.items(): + + s3_to_snowflake_dag_id = f"s3_to_snowflake_{tenant_code}_{data_source}_{api_year}" + + s3_to_snowflake_dag = S3ToSnowflakeDag( + dag_id=s3_to_snowflake_dag_id, + tenant_code=tenant_code, + api_year=api_year, + data_source=data_source, + **dag_vars + ) + + s3_to_snowflake_dag.build_s3_to_snowflake_dag() + + globals()[s3_to_snowflake_dag.dag.dag_id] = s3_to_snowflake_dag.dag +``` + +
## SFTPToSnowflakeDag @@ -542,6 +624,94 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg +
+Example YAML configuration: + +```yaml +sftp_to_snowflake_dags__default_args: &sftp_to_snowflake_dags__default_args + default_args: *default_task_args + schedule_interval: '0 7 * * *' + database: raw + schema: external + s3_conn_id: 'data_lake' + full_replace: True + slack_conn_id: null + snowflake_conn_id: 'snowflake' + pool: 'external_pulls' + domain: 'ccr' + sftp_conn_id: 'xello' + sftp_filepath: '/GluonaCO/' + local_base_path: '/efs/tmp_storage/' + transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py' + do_delete_from_local: True + +sftp_to_snowflake_dags: + brush_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForBrush*' + <<: *sftp_to_snowflake_dags__default_args + haxtun_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForHax*' + <<: *sftp_to_snowflake_dags__default_args + lone_star_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForLone*' + <<: *sftp_to_snowflake_dags__default_args + wiggins_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForWiggins*' + <<: *sftp_to_snowflake_dags__default_args +``` + +
+ +
+Example DAG instantiation: + +```python +import logging +from util import io_helpers +from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + +# Load variables from Airflow config +configs_dir = '/home/airflow/airflow/configs' +airflow_configs_file = 'airflow_config.yml' +airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file) +default_args = airflow_configs.get('sftp_to_snowflake_dags__default_args') +dag_params = airflow_configs.get('sftp_to_snowflake_dags') + +sftp_to_snowflake_dag = SFTPToSnowflakeDag( + dag_id='sftp_to_snowflake', + **default_args +) + +for tenant_code, api_year_vars in dag_params.items(): + + for api_year, resource_name_vars in api_year_vars.items(): + + for resource_name, dag_vars in resource_name_vars.items(): + + tenant_year_resource_taskgroup = sftp_to_snowflake_dag.build_tenant_year_resource_taskgroup( + tenant_code=tenant_code, + api_year=api_year, + resource_name=resource_name, + **dag_vars + ) + + tenant_year_resource_taskgroup + +globals()[sftp_to_snowflake_dag.dag.dag_id] = sftp_to_snowflake_dag.dag +``` + +
+ ## AWSParamStoreToAirflowDAG From f2ada01f1210a4bed67bb40d853d4122e7174778 Mon Sep 17 00:00:00 2001 From: oscarrezab Date: Thu, 19 Jun 2025 15:36:00 -0500 Subject: [PATCH 2/2] Shorten yaml configs for both S3ToSnowflake and SFTPToSnowflake DAGs --- README.md | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/README.md b/README.md index 50f5d4d..9b35e04 100644 --- a/README.md +++ b/README.md @@ -522,13 +522,11 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg s3_to_snowflake_dags__default_args: &s3_to_snowflake_dags__default_args default_args: *default_task_args schedule_interval: null - start_date: '2022-08-01' database: raw schema: external s3_source_conn_id: 'external_s3' s3_dest_conn_id: 'data_lake' transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py' - slack_conn_id: null snowflake_conn_id: 'snowflake' pool: 'external_pulls' s3_dest_file_extension: '.json' @@ -546,15 +544,6 @@ s3_to_snowflake_dags: resource_names: - cdip_matriculation - concurrent_enrollment - <<: *s3_to_snowflake_dags__default_args - 2022: - resource_names: - - cdip_matriculation - - concurrent_enrollment - <<: *s3_to_snowflake_dags__default_args - 2023: - resource_names: - - concurrent_enrollment <<: *s3_to_snowflake_dags__default_args ``` @@ -635,7 +624,6 @@ sftp_to_snowflake_dags__default_args: &sftp_to_snowflake_dags__default_args schema: external s3_conn_id: 'data_lake' full_replace: True - slack_conn_id: null snowflake_conn_id: 'snowflake' pool: 'external_pulls' domain: 'ccr' @@ -655,17 +643,7 @@ sftp_to_snowflake_dags: 2025: xello_career_interests: file_pattern: '*StudentDataForHax*' - <<: *sftp_to_snowflake_dags__default_args - lone_star_sd: - 2025: - xello_career_interests: - file_pattern: '*StudentDataForLone*' - <<: *sftp_to_snowflake_dags__default_args - wiggins_sd: - 2025: - xello_career_interests: - file_pattern: '*StudentDataForWiggins*' - <<: *sftp_to_snowflake_dags__default_args + <<: *sftp_to_snowflake_dags__default_args ```