2,007 changes: 2,007 additions & 0 deletions assets/notebooks/GrupoA.ipynb

Large diffs are not rendered by default.

118 changes: 118 additions & 0 deletions dags/GAUNFlores_dag_etl.py
@@ -0,0 +1,118 @@
import logging
import os
from airflow import DAG
from datetime import datetime
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np


##################### Connection #####################

POSTGRES_CONN_ID = "alkemy_db"
AWS_CONN_ID = "aws_s3_bucket"

##################### Logging Config #################

LOGS_DIR = '/usr/local/airflow/tests/'
LOGGER = logging.getLogger('GAUFlores')
LOGGER.setLevel(logging.INFO)

FORMATTER = logging.Formatter('%(asctime)s - %(name)s - %(message)s', '%Y-%m-%d')

HANDLER = logging.FileHandler(os.path.join(LOGS_DIR, 'GAUFlores.log'))
HANDLER.setFormatter(FORMATTER)

LOGGER.addHandler(HANDLER)

######################### DAG ########################

def pg_extract2csv():

LOGGER.info("Starting dag ETL process...")
LOGGER.info("Loading data...")

pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    with open('/usr/local/airflow/include/GAUFlores.sql', 'r') as sqlfile:
        sql_stm = sqlfile.read()

    df = pg_hook.get_pandas_df(sql=sql_stm)

df.to_csv('/usr/local/airflow/tests/GAUFlores_select.csv')

LOGGER.info("Data successfully loaded")

def pd_transform2txt():

LOGGER.info("Transforming data")

df = pd.read_csv('/usr/local/airflow/tests/GAUFlores_select.csv', index_col=0)

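    # Normalize text fields, strip honorifics from last names, map gender codes, and parse dates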
df['university'] = df['university'].str.lower().str.strip()
df['career'] = df['career'].str.lower().str.strip()
    df['last_name'] = df['last_name'].str.lower().str.strip().str.replace(r'(m[rs]|[.])|(\smd\s)', '', regex=True)
df['email'] = df['email'].str.lower().str.strip()
df['gender'] = df['gender'].map({'F': 'female', 'M': 'male'})
df['inscription_date'] = pd.to_datetime(df['inscription_date'], format='%Y/%m/%d')
df['fecha_nacimiento'] = pd.to_datetime(df['fecha_nacimiento'])

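    # Approximate age from the birth date; ages outside (18, 80) are replaced with 21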
today = datetime.now()
df['age'] = np.floor((today - df['fecha_nacimiento']).dt.days / 365)
df['age'] = df['age'].apply(lambda x: x if (x > 18.0) and (x < 80) else -1)
df['age'] = np.where(df['age']== -1, 21, df['age'])
df['age'] = df['age'].astype(int)

df = df.drop(columns='fecha_nacimiento')

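    # Fill in the missing postal_code/location column from the external lookup table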
cp_loc_df = pd.read_csv('/usr/local/airflow/tests/codigos_postales.csv')
cp_loc_df = cp_loc_df.rename(columns={'codigo_postal':'postal_code', 'localidad':'location'})
cp_loc_df['location'] = cp_loc_df['location'].str.lower().str.strip().str.replace('_',' ')

if 'location' in df.columns.to_list():
        cp_loc_df = cp_loc_df.drop_duplicates(subset='location', keep='first')
df = df.merge(cp_loc_df, how='left', on='location')
else:
df = df.merge(cp_loc_df, how='left', on='postal_code')


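    # Keep the expected column order and write a tab-separated output file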
df = df[['university', 'career', 'inscription_date', 'last_name', 'gender', 'age', 'postal_code', 'location', 'email']]

df.to_csv('/usr/local/airflow/tests/GAUFlores_process.txt', sep='\t', index=False)

    LOGGER.info("Data successfully transformed")
LOGGER.info("Loading data to s3 bucket...")


with DAG(
"GAUFlores_ETL",
start_date=datetime(2022, 3, 11),
schedule_interval="@hourly",
default_args={
"retries": 5
},
catchup=False,
) as dag:

extract = PythonOperator(
task_id="extract_task",
python_callable=pg_extract2csv
)

transform = PythonOperator(
task_id="transform_task",
python_callable = pd_transform2txt
)

load = LocalFilesystemToS3Operator(
task_id='load_to_s3_task',
filename='/usr/local/airflow/tests/GAUFlores_process.txt',
dest_key='GAUFlores_process.txt',
dest_bucket='dipa-s3',
aws_conn_id=AWS_CONN_ID,
replace=True,
)

extract >> transform >> load
116 changes: 116 additions & 0 deletions dags/GAUNVillaMaria_dag_etl.py
@@ -0,0 +1,116 @@
import logging
import os
from airflow import DAG
from datetime import datetime
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np


##################### Connection #####################

POSTGRES_CONN_ID = "alkemy_db"
AWS_CONN_ID = "aws_s3_bucket"

##################### Logging Config #################

LOGS_DIR = '/usr/local/airflow/tests/'
LOGGER = logging.getLogger('GAUNVillaMaria')
LOGGER.setLevel(logging.INFO)

FORMATTER = logging.Formatter('%(asctime)s - %(name)s - %(message)s', '%Y-%m-%d')

HANDLER = logging.FileHandler(os.path.join(LOGS_DIR, 'GAUNVillaMaria.log'))
HANDLER.setFormatter(FORMATTER)

LOGGER.addHandler(HANDLER)

######################### DAG ########################

def pg_extract2csv():

LOGGER.info("Starting dag ETL process...")
LOGGER.info("Loading data...")

pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    with open('/usr/local/airflow/include/GAUNVillaMaria.sql', 'r') as sqlfile:
        sql_stm = sqlfile.read()

    df = pg_hook.get_pandas_df(sql=sql_stm)

df.to_csv('/usr/local/airflow/tests/GAUNVillaMaria_select.csv')

LOGGER.info("Data successfully loaded")

def pd_transform2txt():

LOGGER.info("Transforming data")

df = pd.read_csv('/usr/local/airflow/tests/GAUNVillaMaria_select.csv', index_col=0)

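    # Normalize text fields, strip honorifics from last names, map gender codes, and parse dates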
df['university'] = df['university'].str.lower().str.replace('_',' ').str.strip()
df['career'] = df['career'].str.lower().str.replace('_',' ').str.strip()
    df['last_name'] = df['last_name'].str.lower().str.replace('_', ' ').str.strip().str.replace(r'(m[rs]|[.])|(\smd\s)', '', regex=True)
df['email'] = df['email'].str.lower().str.replace('_',' ').str.strip()
df['location'] = df['location'].str.lower().str.replace('_',' ').str.strip()
df['gender'] = df['gender'].map({'F': 'female', 'M': 'male'})
df['inscription_date'] = pd.to_datetime(df['inscription_date'], format='%Y/%m/%d')
df['fecha_nacimiento'] = pd.to_datetime(df['fecha_nacimiento'])

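    # Approximate age from the birth date; out-of-range values are replaced with the median valid age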
today = datetime.now()
df['age'] = np.floor((today - df['fecha_nacimiento']).dt.days / 365)
df['age'] = df['age'].apply(lambda x: x if (x > 18.0) and (x < 80.0) else -1)
df['age'] = np.where(df['age'] == -1, df.loc[df['age'] !=-1, 'age'].median(), df['age'])
df['age'] = df['age'].astype(int)

df = df.drop(columns='fecha_nacimiento')

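    # Fill in the missing postal_code/location column from the external lookup table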
cp_loc_df = pd.read_csv('/usr/local/airflow/tests/codigos_postales.csv')
cp_loc_df = cp_loc_df.rename(columns={'codigo_postal':'postal_code', 'localidad':'location'})
cp_loc_df['location'] = cp_loc_df['location'].str.lower().str.strip().str.replace('_',' ')

if 'location' in df.columns.to_list():
        cp_loc_df = cp_loc_df.drop_duplicates(subset='location', keep='first')
df = df.merge(cp_loc_df, how='left', on='location')
else:
df = df.merge(cp_loc_df, how='left', on='postal_code')

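    # Keep the expected column order and write a tab-separated output file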
df = df[['university', 'career', 'inscription_date', 'last_name', 'gender', 'age', 'postal_code', 'location', 'email']]

df.to_csv('/usr/local/airflow/tests/GAUNVillaMaria_process.txt', sep='\t', index=False)

    LOGGER.info("Data successfully transformed")
LOGGER.info("Loading data to s3 bucket...")

with DAG(
"GAUNVillaMaria_ETL",
start_date=datetime(2022, 3, 11),
schedule_interval="@hourly",
default_args={
"retries": 5,
},
catchup=False,
) as dag:
extract = PythonOperator(
task_id="extract_task",
python_callable=pg_extract2csv
)

transform = PythonOperator(
task_id="transform_task",
python_callable = pd_transform2txt
)

load = LocalFilesystemToS3Operator(
task_id='load_to_s3_task',
filename='/usr/local/airflow/tests/GAUNVillaMaria_process.txt',
dest_key='GAUNVillaMaria_process.txt',
dest_bucket='dipa-s3',
aws_conn_id=AWS_CONN_ID,
replace=True,
)

extract >> transform >> load
6 changes: 6 additions & 0 deletions dags/dags_dynamic/grupoA/config_GAUFlores.yaml
@@ -0,0 +1,6 @@
nombre : 'GAUFlores'
start_date : '2022, 3, 11'
max_active_runs : 3
schedule_interval: '@daily'
retries : 1
bucket : 'dipa-s3'
5 changes: 5 additions & 0 deletions dags/dags_dynamic/grupoA/config_GAUNVillaMaria.yaml
@@ -0,0 +1,5 @@
nombre : 'GAUNVillaMaria'
start_date : '2022, 3, 11'
schedule_interval: '@hourly'
retries : 5
bucket : 'dipa-s3'
15 changes: 15 additions & 0 deletions dags/dags_dynamic/grupoA/generator.py
@@ -0,0 +1,15 @@
from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template("template_dags_GRUPOA.jinja2")

print(file_dir)
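# Render one DAG module per YAML config found in this directory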
for filename in os.listdir(file_dir):
if filename.endswith(".yaml"):
with open(f"{file_dir}/{filename}", 'r') as configfile:
config = yaml.safe_load(configfile)
with open(f"{file_dir}/{config['nombre']}_dag_etl.py", "w") as f:
f.write(template.render(config))
105 changes: 105 additions & 0 deletions dags/dags_dynamic/grupoA/template_dags_GRUPOA.jinja2
@@ -0,0 +1,105 @@
import logging
import logging.config
from os import path
from pathlib import Path
from airflow import DAG
from datetime import datetime
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np

#logging.config.fileConfig('usr/local/airflow/plugins/logging.conf')
#log_file_path = path.join(path.dirname(path.abspath(__file__)), 'logging.conf')
#logging.config.fileConfig(log_file_path)
#LOGGING_CONFIG = Path(__file__).parent/'logging.conf'
#logging.config.fileConfig(LOGGING_CONFIG, disable_existing_loggers=True)
#LOGGER = logging.getLogger("GBUNComahue_dag_elt")

POSTGRES_CONN_ID = "alkemy_db"
AWS_CONN_ID = "aws_s3_bucket"


def pg_extract2csv():

pg_hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    with open('/usr/local/airflow/include/{{ nombre }}.sql', 'r') as sqlfile:
        sql_stm = sqlfile.read()

    df = pg_hook.get_pandas_df(sql=sql_stm)

df.to_csv('/usr/local/airflow/tests/{{ nombre }}_select.csv')

def pd_transform2txt():

# load input file
df = pd.read_csv('/usr/local/airflow/tests/{{ nombre }}_select.csv', index_col=0)

    # column style and type conversion
df['university'] = df['university'].str.lower().str.replace('_',' ').str.strip()
df['career'] = df['career'].str.lower().str.replace('_',' ').str.strip()
df['last_name'] = df['last_name'].str.lower().str.replace('_',' ').str.strip()
df['email'] = df['email'].str.lower().str.replace('_',' ').str.strip()
df['location'] = df['location'].str.lower().str.replace('_',' ').str.strip()
df['gender'] = df['gender'].map({'F': 'female', 'M': 'male'})
df['inscription_date'] = pd.to_datetime(df['inscription_date'], format='%Y/%m/%d')
df['fecha_nacimiento'] = pd.to_datetime(df['fecha_nacimiento'])

# calculate column Age
today = datetime.now()
df['age'] = np.floor((today - df['fecha_nacimiento']).dt.days / 365)
df['age'] = df['age'].apply(lambda x: int(x) if (x > 16.0) and (x < 80) else -1)

df = df.drop(columns='fecha_nacimiento')

    # Get column location or postal_code from external file
cp_loc_df = pd.read_csv('/usr/local/airflow/tests/codigos_postales.csv')
cp_loc_df = cp_loc_df.rename(columns={'codigo_postal':'postal_code', 'localidad':'location'})
cp_loc_df['location'] = cp_loc_df['location'].str.lower().str.strip().str.replace('_',' ')

    if 'location' in df.columns.to_list():
df = df.merge(cp_loc_df, how='inner', on='location')
else:
df = df.merge(cp_loc_df, how='inner', on='postal_code')

#order columns
df = df[['university', 'career', 'inscription_date', 'last_name', 'gender', 'age', 'postal_code', 'location', 'email']]

# save processed file
    df.to_csv('/usr/local/airflow/tests/{{ nombre }}_process.txt', sep='\t', index=False)

# Instantiate DAG
with DAG(
"{{ nombre }}_ETL",
start_date=datetime({{ start_date }}),
    max_active_runs={{ max_active_runs | default(16) }},  {# mirrors Airflow's default when the config omits this key #}
schedule_interval= "{{ schedule_interval }}",
default_args={
"retries": {{ retries }},
# "retry_delay": timedelta(minutes=1),
},
catchup=False,
#template_searchpath="/usr/local/airflow/include/"
) as dag:
extract = PythonOperator(
task_id="extract_task",
python_callable=pg_extract2csv
)

transform = PythonOperator(
task_id="transform_task",
python_callable = pd_transform2txt
)

load = LocalFilesystemToS3Operator(
task_id='load_to_s3_task',
filename='/usr/local/airflow/tests/{{ nombre }}_process.txt',
dest_key='{{ nombre }}_process.txt',
dest_bucket="{{ bucket }}",
aws_conn_id=AWS_CONN_ID,
replace=True,
)

extract >> transform >> load
6 changes: 6 additions & 0 deletions dags/logs/GAUFlores.log
@@ -0,0 +1,6 @@
2022-11-11 - GAUFlores - Starting dag ETL process...
2022-11-11 - GAUFlores - Loading data...
2022-11-11 - GAUFlores - Data successfully loaded
2022-11-11 - GAUFlores - Transforming data
2022-11-11 - GAUFlores - Data successfully transformed
2022-11-11 - GAUFlores - Loading data to s3 bucket...
6 changes: 6 additions & 0 deletions dags/logs/GAUNVillaMaria.log
@@ -0,0 +1,6 @@
2022-11-11 - GAUNVillaMaria - Starting dag ETL process...
2022-11-11 - GAUNVillaMaria - Loading data...
2022-11-11 - GAUNVillaMaria - Data successfully loaded
2022-11-11 - GAUNVillaMaria - Transforming data
2022-11-11 - GAUNVillaMaria - Data successfully transformed
2022-11-11 - GAUNVillaMaria - Loading data to s3 bucket...