150 changes: 149 additions & 1 deletion README.md
@@ -482,7 +482,6 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg
</details>



## S3ToSnowflakeDag
This DAG transfers data from an S3 bucket location into the Snowflake raw data lake.
It should be used when data sources are not available from an Ed-Fi ODS but need to be brought into the data warehouse.
@@ -516,6 +515,78 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg

</details>

<details>
<summary>Example YAML configuration:</summary>

```yaml
s3_to_snowflake_dags__default_args: &s3_to_snowflake_dags__default_args
  default_args: *default_task_args
  schedule_interval: null
  database: raw
  schema: external
  s3_source_conn_id: 'external_s3'
  s3_dest_conn_id: 'data_lake'
  transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py'
  snowflake_conn_id: 'snowflake'
  pool: 'external_pulls'
  s3_dest_file_extension: '.json'
  full_replace: True

s3_to_snowflake_dags:
  canon_city_sd:
    ccr:
      2020:
        resource_names:
          - cdip_matriculation
          - concurrent_enrollment
        <<: *s3_to_snowflake_dags__default_args
      2021:
        resource_names:
          - cdip_matriculation
          - concurrent_enrollment
        <<: *s3_to_snowflake_dags__default_args
```

</details>
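
The `<<: *s3_to_snowflake_dags__default_args` merge key in the YAML above copies the anchored defaults into each year block, and keys written directly in a block take precedence over merged ones. A rough Python equivalent of what a single year block resolves to is sketched below. The dictionaries are abbreviated stand-ins for the configuration, and `resolve_year_block` is a hypothetical helper for illustration, not part of `ea_airflow_util`.

```python
# Abbreviated stand-in for the anchored defaults in the YAML example.
defaults = {
    "schedule_interval": None,
    "database": "raw",
    "schema": "external",
    "full_replace": True,
}


def resolve_year_block(local_keys: dict, merged_defaults: dict) -> dict:
    """Mimic a YAML merge key: defaults first, then locally set keys win."""
    return {**merged_defaults, **local_keys}


# What the `2020` block looks like after the merge key is applied.
dag_vars_2020 = resolve_year_block(
    {"resource_names": ["cdip_matriculation", "concurrent_enrollment"]},
    defaults,
)
print(sorted(dag_vars_2020))
```

To override a default for one year only, set that key next to `resource_names` in the year block; the anchored defaults stay untouched for every other block.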

<details>
<summary>Example DAG instantiation:</summary>

```python
import logging
from util import io_helpers
from ea_airflow_util import S3ToSnowflakeDag

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Load variables from Airflow config
configs_dir = '/home/airflow/airflow/configs'
airflow_configs_file = 'airflow_config.yml'
airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file)

if dag_params := airflow_configs.get('s3_to_snowflake_dags'):
    for tenant_code, data_sources in dag_params.items():

        for data_source, api_year_vars in data_sources.items():

            for api_year, dag_vars in api_year_vars.items():

                s3_to_snowflake_dag_id = f"s3_to_snowflake_{tenant_code}_{data_source}_{api_year}"

                s3_to_snowflake_dag = S3ToSnowflakeDag(
                    dag_id=s3_to_snowflake_dag_id,
                    tenant_code=tenant_code,
                    api_year=api_year,
                    data_source=data_source,
                    **dag_vars
                )

                s3_to_snowflake_dag.build_s3_to_snowflake_dag()

                # Register the DAG at module level so Airflow can discover it
                globals()[s3_to_snowflake_dag.dag.dag_id] = s3_to_snowflake_dag.dag
```

</details>
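
The instantiation loop above creates one DAG per tenant, data source, and year. To preview which DAG IDs a configuration will produce without importing Airflow, the same traversal can be run over a plain dictionary. This is a minimal sketch: `iter_dag_ids` is a hypothetical helper, and `dag_params` is a hand-written stand-in for the parsed `s3_to_snowflake_dags` block.

```python
def iter_dag_ids(dag_params: dict):
    """Yield one DAG ID per tenant / data source / year, using the same f-string as the loop."""
    for tenant_code, data_sources in dag_params.items():
        for data_source, api_year_vars in data_sources.items():
            for api_year in api_year_vars:
                yield f"s3_to_snowflake_{tenant_code}_{data_source}_{api_year}"


# Stand-in for the parsed YAML; unquoted years such as 2020 load as integers.
dag_params = {"canon_city_sd": {"ccr": {2020: {}, 2021: {}}}}

dag_ids = list(iter_dag_ids(dag_params))
print(dag_ids)
# ['s3_to_snowflake_canon_city_sd_ccr_2020', 's3_to_snowflake_canon_city_sd_ccr_2021']
```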


## SFTPToSnowflakeDag
@@ -542,6 +613,83 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg

</details>

<details>
<summary>Example YAML configuration:</summary>

```yaml
sftp_to_snowflake_dags__default_args: &sftp_to_snowflake_dags__default_args
  default_args: *default_task_args
  schedule_interval: '0 7 * * *'
  database: raw
  schema: external
  s3_conn_id: 'data_lake'
  full_replace: True
  snowflake_conn_id: 'snowflake'
  pool: 'external_pulls'
  domain: 'ccr'
  sftp_conn_id: 'xello'
  sftp_filepath: '/GluonaCO/'
  local_base_path: '/efs/tmp_storage/'
  transform_script: '/home/airflow/airflow/dags/util/csv_to_json.py'
  do_delete_from_local: True

sftp_to_snowflake_dags:
  brush_sd:
    2025:
      xello_career_interests:
        file_pattern: '*StudentDataForBrush*'
        <<: *sftp_to_snowflake_dags__default_args
  haxtun_sd:
    2025:
      xello_career_interests:
        file_pattern: '*StudentDataForHax*'
        <<: *sftp_to_snowflake_dags__default_args
```

</details>

<details>
<summary>Example DAG instantiation:</summary>

```python
import logging
from util import io_helpers
from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Load variables from Airflow config
configs_dir = '/home/airflow/airflow/configs'
airflow_configs_file = 'airflow_config.yml'
airflow_configs = io_helpers.safe_load_yaml(configs_dir, airflow_configs_file)
default_args = airflow_configs.get('sftp_to_snowflake_dags__default_args')
dag_params = airflow_configs.get('sftp_to_snowflake_dags')

sftp_to_snowflake_dag = SFTPToSnowflakeDag(
    dag_id='sftp_to_snowflake',
    **default_args
)

for tenant_code, api_year_vars in dag_params.items():

    for api_year, resource_name_vars in api_year_vars.items():

        for resource_name, dag_vars in resource_name_vars.items():

            tenant_year_resource_taskgroup = sftp_to_snowflake_dag.build_tenant_year_resource_taskgroup(
                tenant_code=tenant_code,
                api_year=api_year,
                resource_name=resource_name,
                **dag_vars
            )

            tenant_year_resource_taskgroup

globals()[sftp_to_snowflake_dag.dag.dag_id] = sftp_to_snowflake_dag.dag
```

</details>
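
In contrast to the S3 example, the instantiation above builds a single `sftp_to_snowflake` DAG and adds one task group per tenant, year, and resource. The nesting order in the YAML (tenant, then year, then resource name) therefore has to match the loop order. A minimal sketch for listing the combinations a configuration will produce is shown below; `iter_taskgroup_keys` is a hypothetical helper, and `dag_params` is a hand-written stand-in for the parsed `sftp_to_snowflake_dags` block.

```python
def iter_taskgroup_keys(dag_params: dict):
    """Yield (tenant_code, api_year, resource_name) in the same order as the loop."""
    for tenant_code, api_year_vars in dag_params.items():
        for api_year, resource_name_vars in api_year_vars.items():
            for resource_name in resource_name_vars:
                yield (tenant_code, api_year, resource_name)


# Stand-in for the parsed YAML; per-resource settings are left empty here.
dag_params = {
    "brush_sd": {2025: {"xello_career_interests": {}}},
    "haxtun_sd": {2025: {"xello_career_interests": {}}},
}

taskgroup_keys = list(iter_taskgroup_keys(dag_params))
for key in taskgroup_keys:
    print(key)
```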



## AWSParamStoreToAirflowDAG