48 commits
6fe9c36
test first class method
mberrien-fitzsimons Mar 6, 2025
8ce7eb9
updated variable inputs to init and individual methods
mberrien-fitzsimons Mar 7, 2025
40cac6f
added doc strings to all methods
mberrien-fitzsimons Mar 7, 2025
d550d0c
updated local path organization
mberrien-fitzsimons Mar 10, 2025
36d9651
updated way that dag is initialized
mberrien-fitzsimons Mar 10, 2025
7c4734f
added global to dag
mberrien-fitzsimons Mar 10, 2025
5d590b9
updated dag call structure
mberrien-fitzsimons Mar 10, 2025
efb510d
updated way params is called within Dag instantiation
mberrien-fitzsimons Mar 10, 2025
db969c8
small update
mberrien-fitzsimons Mar 10, 2025
b335d66
updated file_sources to include .keys
mberrien-fitzsimons Mar 10, 2025
2fa3399
updated dag to use eacustomdag
mberrien-fitzsimons Mar 10, 2025
58a0332
updated param type to array
mberrien-fitzsimons Mar 10, 2025
5ec70e4
updated sharefile to snowflake dag builder script to include global e…
mberrien-fitzsimons Mar 11, 2025
929d9ad
removed global function because it did not work for exposing dag id
mberrien-fitzsimons Mar 11, 2025
f7178ac
updated s3.py in attempt to fix metadata_column bug
mberrien-fitzsimons Mar 11, 2025
9ef191f
updated class so that it would construct required folder structure
mberrien-fitzsimons Mar 12, 2025
111dc3b
updated input variable name
mberrien-fitzsimons Mar 12, 2025
248f4d6
updated way that local path is structured
mberrien-fitzsimons Mar 12, 2025
89a340b
updated where date and timestamp are instantiated
mberrien-fitzsimons Mar 12, 2025
85f91eb
updated timestamp code to use airflow runtime instead of datetime now…
mberrien-fitzsimons Mar 12, 2025
9882e64
updated timestamp to go back to original form
mberrien-fitzsimons Mar 12, 2025
9644caf
updated way filepath is created
mberrien-fitzsimons Mar 12, 2025
74e8615
save final changes to sharefile to snowflake dag before complete refa…
mberrien-fitzsimons Mar 12, 2025
01e8438
updated sharefile to snowflake dag builder class to remove unnecessary…
mberrien-fitzsimons Mar 12, 2025
9a90112
updated sharefile sources
mberrien-fitzsimons Mar 13, 2025
bd8408e
re-ordered init arguments
mberrien-fitzsimons Mar 14, 2025
bd50225
update to readme documentation
mberrien-fitzsimons Mar 20, 2025
6eff93a
updated readme with yaml file showing file_sources
mberrien-fitzsimons Mar 20, 2025
df7cc14
added additional information to yaml section of readme
mberrien-fitzsimons Mar 20, 2025
32b37ff
updated s3 to snowflake task
mberrien-fitzsimons Mar 21, 2025
e791ca2
updated sharefile to snowflake dag builder
mberrien-fitzsimons Mar 21, 2025
0f4ef7c
added print statement for folder ID to see what is happening
mberrien-fitzsimons Mar 24, 2025
6f57bf6
updated transfer_to_s3
mberrien-fitzsimons Mar 24, 2025
344ada0
removed partial method
mberrien-fitzsimons Mar 24, 2025
950ca35
added control flow for choosing transfer from s3 to snowflake step
mberrien-fitzsimons Mar 25, 2025
3dab30f
updated sharefile to working state
mberrien-fitzsimons Mar 27, 2025
f76a18a
updated file to handle both single file and folder. testing in TX dev…
mberrien-fitzsimons Mar 27, 2025
47b0166
This was done in order to access changes to the sharefiletodiskoperato…
mberrien-fitzsimons Mar 27, 2025
5baa75b
fixed mistake in imports
mberrien-fitzsimons Mar 27, 2025
2e53992
Merge branch 'main' into feature/sharefile_to_snowflake_dag_builder
Jul 9, 2025
7ed9fa0
Clean up dag and add to init.
Aug 1, 2025
1c588db
Update SharefileToSnowflakeDag.
Aug 11, 2025
5d3eea4
Make local file path more configurable.
Aug 18, 2025
4de45e3
Refactor sharefile_to_snowflake_dag and create ea_csv helper functions.
Aug 26, 2025
623b7d7
Make move_to_processed optional and allow numbered txt columns.
Sep 10, 2025
ee79a5f
Update conditional move_to_processed task handling.
Sep 10, 2025
908307c
Update SharefileToSnowflakeDag and ea_csv docs.
Sep 16, 2025
f3fe13e
Add csv_encoding argument to translate_csv_to_jsonl.
Sep 22, 2025
167 changes: 167 additions & 0 deletions README.md
@@ -58,6 +58,67 @@ Raise an error if a name-collision occurs after formatting.
</details>


## ea_csv
Helpers for working with CSV files.

<details>
<summary>See more:</summary>

-----

### txt_to_csv
Convert a txt file to a csv.

Args:
- file_in (str): A path to a txt file.
- file_out (str): A path to a csv file. If None, the input file path is reused with a .csv extension.
- delimiter (str): A txt file delimiter.
- has_header (bool): If True, use the first row of the txt file as a column
header. If False, insert a column header using the column_names arg.
Default is True.
- column_names (list[str]): An ordered list of column names to use in the
output csv. If 'None' and has_header is False, insert an ordered,
integer column header (e.g. 1, 2, ..., n where n is the number of
columns).
- delete_txt (bool): If True, delete the input txt file.

Returns:
- file_out (str): A csv file path.

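The headerless branch described above can be sketched standalone with pandas; the sample data, file paths, and pipe delimiter below are illustrative, not part of the library:

```python
import os
import tempfile

import pandas as pd

# Create a throwaway headerless txt file to convert.
tmpdir = tempfile.mkdtemp()
txt_path = os.path.join(tmpdir, "students.txt")
with open(txt_path, "w") as f:
    f.write("1001|Ada|Lovelace\n1002|Alan|Turing\n")

# Mirror the has_header=False, column_names=None behavior: read as str so
# pandas does not re-type values, then label columns 1..n.
df = pd.read_csv(txt_path, delimiter="|", dtype=str, header=None)
df.columns = df.columns + 1
csv_path = os.path.splitext(txt_path)[0] + ".csv"
df.to_csv(csv_path, index=False)
```

With `has_header=True` the first row of the file would instead be kept as the header, and passing `column_names` replaces the integer labels with explicit names.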
### txt_files_to_csv
Convert all txt files in a directory to csv files. Also works with a
single txt file path and can be optionally configured to process files in
all subdirectories.

Args:
- path_in (str): A file or directory path containing zero or more txt files.
- path_out (str): A file or directory path to write csv file(s) to. If
None, the input path is used. Note that a file retains its original
name except with a .csv extension.
- delimiter (str): A txt file delimiter. Note that this function assumes
that all txt files in an input directory use the same delimiter.
- has_header (bool): If True, use the first row of the txt file(s) as a
column header. If False, insert a column header using the column_names
arg. Default is True.
- column_names (list[str]): An ordered list of column names to use in the
output csv(s). If 'None' and has_header is False, insert an ordered,
integer column header (e.g. 1, 2, ..., n where n is the number of
columns).
- delete_txt (bool): If True, delete all of the input txt files.
- include_subdirs (bool): If True, process all files in all subdirectories.
If False, only process files in the top level of the specified
directory. Default is False.

Returns:
- path_out (str): A file or directory path containing the output csv
file(s).

-----

</details>


## ftp
FTP- and SFTP-utility helpers

@@ -623,7 +684,113 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w

</details>

## SharefileToSnowflakeDag
`SharefileToSnowflakeDag` is an Airflow DAG that automates the process of
transferring files from ShareFile to Snowflake. The DAG retrieves txt and csv
files from a specified ShareFile location, transforms them into JSONL format,
uploads the files to an S3 bucket, and finally loads the data into a Snowflake
database.

<details>
<summary>Arguments:</summary>

-----

| Argument | Description |
|-------------------------|--------------------------------------------------------------------------|
| sharefile_conn_id | A Sharefile connection ID. |
| local_base_path | A base local path for downloading files. |
| s3_conn_id | An Airflow connection ID for AWS S3. |
| s3_bucket               | An S3 bucket in which to stage files.                                     |
| snowflake_conn_id | An Airflow connection ID for Snowflake. |
| snowflake_database | A Snowflake database name. |
| snowflake_schema | A Snowflake schema name. |
| **kwargs | Additional arguments to pass to the Airflow DAG. |

-----

</details>

<details>
<summary>Methods:</summary>

-----

**build_task_group()**

Builds a task group to load data from csv and txt files in a
Sharefile directory to a Snowflake table.

Note that the arguments specified here are relative to the class
arguments provided at instantiation. For example, the sharefile_path
argument is relative to the sharefile_conn_id specified at the class
level.

| Argument | Description |
|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| group_id | A name for the Airflow task group. |
| sharefile_source_path | A Sharefile path to extract data from. |
| sharefile_processed_path | A Sharefile path to move files to after they have been processed. If None, do not move processed files. Default is None. |
| local_rel_path | A local relative path to stage data in with respect to the class's local_base_path. This is also used to determine the staging S3 destination relative to the class's S3 bucket. |
| snowflake_table | A Snowflake table name to write data to. |
| txt_delimiter | A text delimiter used in the txt files to load. Default is ','. |
| txt_has_header           | If True, uses the first row of the txt file as a column header. If False, inserts a column header based on the txt_columns arg. Default is True. |
| txt_columns | An ordered list of column names in the txt files to load. If None and txt_has_header is False, then columns are labeled using integers (i.e. 1, 2, 3, ..., n, where n is the number of columns). Default is None. |
| custom_metadata | A mapping of metadata field names to values to include in the target Snowflake table. |
| full_refresh | If True, performs a full refresh load in Snowflake. Default is False. |
| csv_encoding | Optional encoding to use for csv files. Default is 'utf-8'. |
| **kwargs | Additional keyword arguments to pass to the task group. |

-----

</details>

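Putting the two tables above together, a DAG definition file might look roughly like the following sketch. Every value is a placeholder, and passing `dag_id` through `**kwargs` to the underlying Airflow DAG is an assumption rather than a documented parameter:

```python
from ea_airflow_util import SharefileToSnowflakeDag

# Hypothetical values throughout; substitute real connection IDs, paths, and names.
dag_builder = SharefileToSnowflakeDag(
    sharefile_conn_id="sharefile_default",
    local_base_path="/tmp/sharefile_staging",
    s3_conn_id="aws_default",
    s3_bucket="my-staging-bucket",
    snowflake_conn_id="snowflake_default",
    snowflake_database="RAW",
    snowflake_schema="SHAREFILE",
)

dag_builder.build_task_group(
    group_id="resource_1",
    sharefile_source_path="path/to/folder",
    sharefile_processed_path=None,  # None: processed files stay where they are
    local_rel_path="resource_1",
    snowflake_table="RESOURCE_1",
    txt_delimiter="|",
    txt_has_header=False,
    txt_columns=None,  # columns labeled 1..n
)
```
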
<details>
<summary>Example Yaml File:</summary>

```yaml
default_args: &default_args
owner:
run_as_user:
depends_on_past:
start_date:
email:
email_on_failure: False
retries: 0
trigger_rule:
retry_delay:
execution_timeout:
sla:

### Sharefile to Snowflake DAGs
sharefile_to_snowflake_dags__default_args: &sharefile_to_snowflake_dags__default_args
sharefile_conn_id:
# Null here means processed files will not be moved in Sharefile
sharefile_processed_dir:

local_base_path:

s3_conn_id:
s3_bucket:

snowflake_conn_id:
snowflake_database:
snowflake_schema:

airflow_default_args: *default_args
schedule_interval: null

sharefile_to_snowflake_dags:
resource_1:
<<: *sharefile_to_snowflake_dags__default_args
sharefile_base_path: path/to/folder
snowflake_table:
resource_2:
<<: *sharefile_to_snowflake_dags__default_args
sharefile_base_path: path/to/folder
snowflake_table:
```
</details>

# Providers
Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators.
1 change: 1 addition & 0 deletions ea_airflow_util/__init__.py
@@ -8,6 +8,7 @@
from ea_airflow_util.dags.dbt_snapshot_dag import DbtSnapshotDag
from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag
from ea_airflow_util.dags.sharefile_custom_users_dag import LoadSharefileCustomUsersDag
from ea_airflow_util.dags.sharefile_to_snowflake_dag import SharefileToSnowflakeDag

from ea_airflow_util.callables.airflow import xcom_pull_template
from ea_airflow_util.callables import slack as slack_callbacks
128 changes: 128 additions & 0 deletions ea_airflow_util/callables/ea_csv.py
@@ -0,0 +1,128 @@
import os
import pandas as pd


def txt_to_csv(
    file_in,
    file_out=None,
    delimiter=',',
    has_header=True,
    column_names=None,
    delete_txt=False
):
    """Convert a txt file to a csv.

    Args:
    - file_in (str): A path to a txt file.
    - file_out (str): A path to a csv file. If None, the input file path is
      reused with a .csv extension.
    - delimiter (str): A txt file delimiter.
    - has_header (bool): If True, use the first row of the txt file as a column
      header. If False, insert a column header using the column_names arg.
      Default is True.
    - column_names (list[str]): An ordered list of column names to use in the
      output csv. If None and has_header is False, insert an ordered, integer
      column header (e.g. 1, 2, ..., n where n is the number of columns).
    - delete_txt (bool): If True, delete the input txt file.

    Returns:
    - file_out (str): A csv file path.
    """

    if has_header:
        # Force str dtype, otherwise pandas will do things like cast ints to floats
        df = pd.read_csv(file_in, delimiter=delimiter, dtype=str)
    else:
        df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None)
        if column_names is not None:
            df.columns = column_names
        else:
            # 1-indexed column labels can simplify downstream processing
            df.columns = df.columns + 1

    if file_out is None:
        file_out = os.path.splitext(file_in)[0] + '.csv'

    df.to_csv(file_out, index=False)

    if delete_txt:
        os.remove(file_in)

    return file_out


def txt_files_to_csv(
    path_in,
    path_out=None,
    delimiter=',',
    has_header=True,
    column_names=None,
    delete_txt=False,
    include_subdirs=False
):
    """Convert all txt files in a directory to csv files. Also works with a
    single txt file path and can be optionally configured to process files in
    all subdirectories.

    Args:
    - path_in (str): A file or directory path containing zero or more txt files.
    - path_out (str): A file or directory path to write csv file(s) to. If
      None, the input path is used. Note that a file retains its original
      name except with a .csv extension.
    - delimiter (str): A txt file delimiter. Note that this function assumes
      that all txt files in an input directory use the same delimiter.
    - has_header (bool): If True, use the first row of the txt file(s) as a
      column header. If False, insert a column header using the column_names
      arg. Default is True.
    - column_names (list[str]): An ordered list of column names to use in the
      output csv(s). If None and has_header is False, insert an ordered,
      integer column header (e.g. 1, 2, ..., n where n is the number of
      columns).
    - delete_txt (bool): If True, delete all of the input txt files.
    - include_subdirs (bool): If True, process all files in all subdirectories.
      If False, only process files in the top level of the specified
      directory. Default is False.

    Returns:
    - path_out (str): A file or directory path containing the output csv
      file(s).
    """

    # os.walk yields nothing for a plain file path, so handle a single txt
    # file directly.
    if os.path.isfile(path_in):
        return txt_to_csv(
            file_in=path_in,
            file_out=path_out,
            delimiter=delimiter,
            has_header=has_header,
            column_names=column_names,
            delete_txt=delete_txt
        )

    for root, _, files in os.walk(path_in):

        for file in files:

            # Only process txt files
            if not file.endswith('.txt'):
                continue

            filepath_in = os.path.join(root, file)
            dir_out = root if path_out is None else path_out
            filename_out = os.path.splitext(file)[0] + '.csv'
            filepath_out = os.path.join(dir_out, filename_out)

            txt_to_csv(
                file_in=filepath_in,
                file_out=filepath_out,
                delimiter=delimiter,
                has_header=has_header,
                column_names=column_names,
                delete_txt=delete_txt
            )

        if not include_subdirs:
            break

    if path_out is None:
        path_out = path_in

    return path_out
3 changes: 2 additions & 1 deletion ea_airflow_util/callables/jsonl.py
@@ -60,6 +60,7 @@ def translate_csv_file_to_jsonl(
delete_csv : bool = False,
metadata_dict: Optional[dict] = None,
to_snake_case: bool = False,
csv_encoding: str = 'utf-8',
**kwargs
):
"""
@@ -95,7 +96,7 @@
output_path_new = output_path

try:
with open(full_local_path, 'r') as reader:
with open(full_local_path, 'r', encoding=csv_encoding) as reader:
json_records = csv.DictReader(reader)
serialize_json_records_to_disk(json_records, output_path_new, "w", metadata_dict, to_snake_case, **kwargs)
except UnicodeDecodeError:
2 changes: 1 addition & 1 deletion ea_airflow_util/dags/s3_to_snowflake_dag.py
@@ -36,7 +36,7 @@ def __init__(self,
is_manual_upload: bool = False,

pool: str,
full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level
full_replace: bool = False,

do_delete_from_source: bool = True,
**kwargs