diff --git a/README.md b/README.md index 537f365..9b35e04 100644 --- a/README.md +++ b/README.md @@ -482,7 +482,6 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg - ## S3ToSnowflakeDag This DAG transfers data from an S3 bucket location into the Snowflake raw data lake. It should be used when data sources are not available from an Ed-Fi ODS but need to be brought into the data warehouse. @@ -516,6 +515,78 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg +
+Example YAML configuration: + +```yaml +s3_to_snowflake_dags__default_args: &s3_to_snowflake_dags__default_args + default_args: *default_task_args + schedule_interval: null + database: raw + schema: external + s3_source_conn_id: 'external_s3' + s3_dest_conn_id: 'data_lake' + transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py' + snowflake_conn_id: 'snowflake' + pool: 'external_pulls' + s3_dest_file_extension: '.json' + full_replace: True + +s3_to_snowflake_dags: + canon_city_sd: + ccr: + 2020: + resource_names: + - cdip_matriculation + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args + 2021: + resource_names: + - cdip_matriculation + - concurrent_enrollment + <<: *s3_to_snowflake_dags__default_args +``` + +
+ +
+Example DAG instantiation: + +```python +import logging +from util import io_helpers +from ea_airflow_util import S3ToSnowflakeDag + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + +# Load variables from Airflow config +configs_dir = '/home/airflow/airflow/configs' +airflow_configs_file = 'airflow_config.yml' +airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file) + +if dag_params := airflow_configs.get('s3_to_snowflake_dags'): + for tenant_code, data_sources in dag_params.items(): + + for data_source, api_year_vars in data_sources.items(): + + for api_year, dag_vars in api_year_vars.items(): + + s3_to_snowflake_dag_id = f"s3_to_snowflake_{tenant_code}_{data_source}_{api_year}" + + s3_to_snowflake_dag = S3ToSnowflakeDag( + dag_id=s3_to_snowflake_dag_id, + tenant_code=tenant_code, + api_year=api_year, + data_source=data_source, + **dag_vars + ) + + s3_to_snowflake_dag.build_s3_to_snowflake_dag() + + globals()[s3_to_snowflake_dag.dag.dag_id] = s3_to_snowflake_dag.dag +``` + +
## SFTPToSnowflakeDag @@ -542,6 +613,83 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg +
+Example YAML configuration: + +```yaml +sftp_to_snowflake_dags__default_args: &sftp_to_snowflake_dags__default_args + default_args: *default_task_args + schedule_interval: '0 7 * * *' + database: raw + schema: external + s3_conn_id: 'data_lake' + full_replace: True + snowflake_conn_id: 'snowflake' + pool: 'external_pulls' + domain: 'ccr' + sftp_conn_id: 'xello' + sftp_filepath: '/GluonaCO/' + local_base_path: '/efs/tmp_storage/' + transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py' + do_delete_from_local: True + +sftp_to_snowflake_dags: + brush_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForBrush*' + <<: *sftp_to_snowflake_dags__default_args + haxtun_sd: + 2025: + xello_career_interests: + file_pattern: '*StudentDataForHax*' + <<: *sftp_to_snowflake_dags__default_args +``` + +
+ +
+
+Example DAG instantiation:
+
+```python
+import logging
+from util import io_helpers
+from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag
+
+logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
+
+# Load variables from Airflow config
+configs_dir = '/home/airflow/airflow/configs'
+airflow_configs_file = 'airflow_config.yml'
+airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file)
+default_args = airflow_configs.get('sftp_to_snowflake_dags__default_args')
+dag_params = airflow_configs.get('sftp_to_snowflake_dags')
+
+sftp_to_snowflake_dag = SFTPToSnowflakeDag(
+    dag_id='sftp_to_snowflake',
+    **default_args
+)
+
+for tenant_code, api_year_vars in dag_params.items():
+
+    for api_year, resource_name_vars in api_year_vars.items():
+
+        for resource_name, dag_vars in resource_name_vars.items():
+
+            # Building the task group registers it on the DAG; no need to
+            # keep a reference to the returned TaskGroup.
+            sftp_to_snowflake_dag.build_tenant_year_resource_taskgroup(
+                tenant_code=tenant_code,
+                api_year=api_year,
+                resource_name=resource_name,
+                **dag_vars
+            )
+
+globals()[sftp_to_snowflake_dag.dag.dag_id] = sftp_to_snowflake_dag.dag
+```
+
+
+ ## AWSParamStoreToAirflowDAG