diff --git a/.astro/config.yaml b/.astro/config.yaml new file mode 100644 index 00000000..7f54315b --- /dev/null +++ b/.astro/config.yaml @@ -0,0 +1,2 @@ +project: + name: skill-up-da-c-python diff --git a/.astro/test_dag_integrity_default.py b/.astro/test_dag_integrity_default.py new file mode 100644 index 00000000..09e537b9 --- /dev/null +++ b/.astro/test_dag_integrity_default.py @@ -0,0 +1,91 @@ +"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**""" +from contextlib import contextmanager +import logging +import os + +import pytest + +from airflow.models import DagBag, Variable, Connection +from airflow.hooks.base import BaseHook + + +# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables + +# =========== MONKEYPATCH BaseHook.get_connection() =========== +def basehook_get_connection_monkeypatch(key: str,*args, **kwargs): + print(f"Attempted to fetch connection during parse returning an empty Connection object for {key}") + return Connection(key) + + +BaseHook.get_connection = basehook_get_connection_monkeypatch +# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() =========== + +# =========== MONKEYPATCH OS.GETENV() =========== +def os_getenv_monkeypatch(key: str, *args, default=None, **kwargs): + print(f"Attempted to fetch os environment variable during parse, returning a mocked value for {key}") + if key == 'JENKINS_HOME' and default is None: # fix https://github.com/astronomer/astro-cli/issues/601 + return None + if default: + return default + return "NON_DEFAULT_OS_ENV_VALUE" + + +os.getenv = os_getenv_monkeypatch +# # =========== /MONKEYPATCH OS.GETENV() =========== + +# =========== MONKEYPATCH VARIABLE.GET() =========== + +class magic_dict(dict): + def __init__(self,*args,**kwargs): + self.update(*args,**kwargs) + + def __getitem__(self,key): + return {}.get(key, 'MOCKED_KEY_VALUE') + + +def variable_get_monkeypatch(key: str, default_var=None, deserialize_json=False): + print(f"Attempted to get Variable value during parse, returning a mocked value for {key}") + + if default_var: + return default_var + if deserialize_json: + return magic_dict() + return "NON_DEFAULT_MOCKED_VARIABLE_VALUE" + +Variable.get = variable_get_monkeypatch +# # =========== /MONKEYPATCH VARIABLE.GET() =========== + + +@contextmanager +def suppress_logging(namespace): + """ + Suppress logging within a specific namespace to keep tests "clean" during build + """ + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag + """ + with suppress_logging('airflow') : + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path ,os.environ.get('AIRFLOW_HOME')) + + + # we prepend "(None,None)" to ensure that a test object is always created even if its a no op. 
+ return [(None,None)] +[ ( strip_path_prefix(k) , v.strip() ) for k,v in dag_bag.import_errors.items()] + + +@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]) +def test_file_imports(rel_path,rv): + """ Test for import errors on a file """ + if rel_path and rv : # Make sure our no-op test doesn't raise an error + raise Exception(f"{rel_path} failed to import with message \n {rv}") \ No newline at end of file diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..f4f282c9 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +astro +.git +.env +airflow_settings.yaml +logs/ \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..8e07df3f --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ + +DB_USERNAME = +DB_HOST = +DB_PORT = +DB_NAME = +DB_PASSWORD = \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..0ddee746 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.git +.env +airflow_settings.yaml +astro \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..c8b1655a --- /dev/null +++ b/Dockerfile @@ -0,0 +1 @@ +FROM quay.io/astronomer/astro-runtime:6.0.3 \ No newline at end of file diff --git a/airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc b/airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc new file mode 100644 index 00000000..81f15a11 Binary files /dev/null and b/airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc differ diff --git a/airflow2/dags/dag_prueba.py b/airflow2/dags/dag_prueba.py new file mode 100644 index 00000000..3e34cf37 --- /dev/null +++ b/airflow2/dags/dag_prueba.py @@ -0,0 +1,23 @@ +from datetime import datetime, timedelta + +from airflow.decorators import dag, task +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.python import PythonOperator + +@dag( + 'empty_dag', + description = 'DAG with DummyOperator placeholder tasks; the commented-out operators will be used later', + schedule_interval = "@hourly", + start_date = datetime(2022, 11, 3) +) +def empty_dag(): + task_1 = DummyOperator(task_id='task_1') # later: PostgresOperator(task_id='palermo_extract', sql='C-UNpalermo.sql', retries=5, postgres_conn_id=...) + task_2 = DummyOperator(task_id='task_2') # later: PostgresOperator(task_id='jujuy_extract', sql='C-UNjujuy.sql', retries=5, postgres_conn_id=...) + task_3 = DummyOperator(task_id='task_3') # later: PythonOperator(task_id='transform', retries=5, python_callable=transform_data) + task_4 = DummyOperator(task_id='task_4') # later: PythonOperator(task_id='load', retries=5, python_callable=s3_load) + + [task_1, task_2] >> task_3 >> task_4 + +dag = empty_dag() \ No newline at end of file diff --git a/dags/__pycache__/example_dag_advanced.cpython-39.pyc b/dags/__pycache__/example_dag_advanced.cpython-39.pyc new file mode 100644 index 00000000..f76336b0 Binary files /dev/null and b/dags/__pycache__/example_dag_advanced.cpython-39.pyc differ diff --git a/dags/__pycache__/example_dag_basic.cpython-39.pyc b/dags/__pycache__/example_dag_basic.cpython-39.pyc new file mode 100644 index 00000000..32ec764b Binary files /dev/null and b/dags/__pycache__/example_dag_basic.cpython-39.pyc differ diff --git a/dags/example_dag_advanced.py b/dags/example_dag_advanced.py new file mode 100644 index 00000000..a128ef80 --- /dev/null +++ b/dags/example_dag_advanced.py @@ -0,0 +1,200 @@ +from
datetime import datetime, timedelta +from typing import Dict + +# Airflow operators are templates for tasks and encompass the logic that your DAG will actually execute. +# To use an operator in your DAG, you first have to import it. +# To learn more about operators, see: https://registry.astronomer.io/. + +from airflow.decorators import dag, task # DAG and task decorators for interfacing with the TaskFlow API +from airflow.models.baseoperator import chain # A function that sets sequential dependencies between tasks including lists of tasks. +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator +from airflow.operators.email import EmailOperator +from airflow.operators.python import BranchPythonOperator +from airflow.operators.weekday import BranchDayOfWeekOperator +from airflow.utils.edgemodifier import Label # Used to label node edges in the Airflow UI +from airflow.utils.task_group import TaskGroup # Used to group tasks together in the Graph view of the Airflow UI +from airflow.utils.trigger_rule import TriggerRule # Used to change how an Operator is triggered +from airflow.utils.weekday import WeekDay # Used to determine what day of the week it is + + +""" +This DAG is intended to demonstrate a number of core Apache Airflow concepts that are central to the pipeline +authoring experience, including the TaskFlow API, Edge Labels, Jinja templating, branching, +dynamic task generation, Task Groups, and Trigger Rules. + +First, this DAG checks if the current day is a weekday or weekend. Next, the DAG checks which day of the week +it is. Lastly, the DAG prints out a bash statement based on which day it is. On Tuesday, for example, the DAG +prints "It's Tuesday and I'm busy with studying". + +This DAG uses the following operators: + +BashOperator - + Executes a bash script or bash command. + + See more info about this operator here: + https://registry.astronomer.io/providers/apache-airflow/modules/bashoperator + +DummyOperator - + Does nothing but can be used to group tasks in a DAG + + See more info about this operator here: + https://registry.astronomer.io/providers/apache-airflow/modules/dummyoperator + +EmailOperator - + Used to send emails + + See more info about this operator here: + https://registry.astronomer.io/providers/apache-airflow/modules/emailoperator + +BranchPythonOperator - + Allows a workflow to “branch” after a task based on the result of a Python function + + See more info about this operator here: + https://registry.astronomer.io/providers/apache-airflow/modules/branchpythonoperator + +BranchDayOfWeekOperator - + Branches into one of two lists of tasks depending on the current day + + See more info about this operator here: + https://registry.astronomer.io/providers/apache-airflow/modules/branchdayofweekoperator +""" + +# Reference data that defines "weekday" as well as the activity assigned to each day of the week. 
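+# Task IDs for the per-day BashOperator tasks below are derived from these "activity" values (spaces replaced with underscores), and _get_activity() uses them to pick which branch to follow.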
+DAY_ACTIVITY_MAPPING = { + "monday": {"is_weekday": True, "activity": "guitar lessons"}, + "tuesday": {"is_weekday": True, "activity": "studying"}, + "wednesday": {"is_weekday": True, "activity": "soccer practice"}, + "thursday": {"is_weekday": True, "activity": "contributing to Airflow"}, + "friday": {"is_weekday": True, "activity": "family dinner"}, + "saturday": {"is_weekday": False, "activity": "going to the beach"}, + "sunday": {"is_weekday": False, "activity": "sleeping in"}, +} + + +@task(multiple_outputs=True) # multiple_outputs=True unrolls dictionaries into separate XCom values +def _going_to_the_beach() -> Dict: + return { + "subject": "Beach day!", + "body": "It's Saturday and I'm heading to the beach.<br><br>Come join me!", + } + + +# This function gets the activity from the "DAY_ACTIVITY_MAPPING" dictionary +def _get_activity(day_name) -> str: + activity_id = DAY_ACTIVITY_MAPPING[day_name]["activity"].replace(" ", "_") + + if DAY_ACTIVITY_MAPPING[day_name]["is_weekday"]: + return f"weekday_activities.{activity_id}" + + return f"weekend_activities.{activity_id}" + + +# When using the DAG decorator, the "dag" argument doesn't need to be specified for each task. +# The "dag_id" value defaults to the name of the function it is decorating if not explicitly set. +# In this example, the "dag_id" value would be "example_dag_advanced". +@dag( + # This DAG is set to run for the first time on June 11, 2021. Best practice is to use a static start_date. + # Subsequent DAG runs are instantiated based on schedule_interval below. + start_date=datetime(2021, 6, 11), + # This defines how many instantiations of this DAG (DAG Runs) can execute concurrently. In this case, + # we're only allowing 1 DAG run at any given time, as opposed to allowing multiple overlapping DAG runs. + max_active_runs=1, + # This defines how often your DAG will run, or the schedule by which DAG runs are created. It can be + # defined as a cron expression or custom timetable. This DAG will run daily. + schedule_interval="@daily", + # Default settings applied to all tasks within the DAG; can be overwritten at the task level. + default_args={ + "owner": "community", # This defines the value of the "owner" column in the DAG view of the Airflow UI + "retries": 2, # If a task fails, it will retry 2 times. + "retry_delay": timedelta(minutes=3), # A task that fails will wait 3 minutes to retry. + }, + default_view="graph", # This defines the default view for this DAG in the Airflow UI + # When catchup=False, your DAG will only run for the latest schedule interval. In this case, this means + # that tasks will not be run between June 11, 2021 and 1 day ago. When turned on, this DAG's first run + # will be for today, per the @daily schedule interval + catchup=False, + tags=["example"], # If set, this tag is shown in the DAG view of the Airflow UI +) +def example_dag_advanced(): + # DummyOperator placeholder for first task + begin = DummyOperator(task_id="begin") + # Last task will only trigger if no previous task failed + end = DummyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED) + + # This task checks which day of the week it is + check_day_of_week = BranchDayOfWeekOperator( + task_id="check_day_of_week", + week_day={WeekDay.SATURDAY, WeekDay.SUNDAY}, # This checks day of week + follow_task_ids_if_true="weekend", # Next task if criteria is met + follow_task_ids_if_false="weekday", # Next task if criteria is not met + use_task_execution_day=True, # If True, uses task’s execution day to compare with is_today + ) + + weekend = DummyOperator(task_id="weekend") # "weekend" placeholder task + weekday = DummyOperator(task_id="weekday") # "weekday" placeholder task + + # Templated value for determining the name of the day of week based on the start date of the DAG Run + day_name = "{{ dag_run.start_date.strftime('%A').lower() }}" + + # Begin weekday tasks.
+ # Tasks within this TaskGroup (weekday tasks) will be grouped together in the Airflow UI + with TaskGroup("weekday_activities") as weekday_activities: + which_weekday_activity_day = BranchPythonOperator( + task_id="which_weekday_activity_day", + python_callable=_get_activity, # Python function called when task executes + op_args=[day_name], + ) + + for day, day_info in DAY_ACTIVITY_MAPPING.items(): + if day_info["is_weekday"]: + day_of_week = Label(label=day) + activity = day_info["activity"] + + # This task prints the weekday activity to bash + do_activity = BashOperator( + task_id=activity.replace(" ", "_"), + bash_command=f"echo It's {day.capitalize()} and I'm busy with {activity}.", # This is the bash command to run + ) + + # Declaring task dependencies within the "TaskGroup" via the classic bitshift operator. + which_weekday_activity_day >> day_of_week >> do_activity + + # Begin weekend tasks + # Tasks within this TaskGroup will be grouped together in the UI + with TaskGroup("weekend_activities") as weekend_activities: + which_weekend_activity_day = BranchPythonOperator( + task_id="which_weekend_activity_day", + python_callable=_get_activity, # Python function called when task executes + op_args=[day_name], + ) + + # Labels that will appear in the Graph view of the Airflow UI + saturday = Label(label="saturday") + sunday = Label(label="sunday") + + # This task prints the Sunday activity to bash + sleeping_in = BashOperator(task_id="sleeping_in", bash_command="sleep $[ ( $RANDOM % 30 ) + 1 ]s") + + going_to_the_beach = _going_to_the_beach() # Calling the taskflow function + + # Because the "_going_to_the_beach()" function has "multiple_outputs" enabled, each dict key is + # accessible as its own "XCom" key. + inviting_friends = EmailOperator( + task_id="inviting_friends", + to="friends@community.com", # Email to send email to + subject=going_to_the_beach["subject"], # Email subject + html_content=going_to_the_beach["body"], # Email body content + ) + + # Using "chain()" here for list-to-list dependencies which are not supported by the bitshift + # operator and to simplify the notation for the desired dependency structure. + chain(which_weekend_activity_day, [saturday, sunday], [going_to_the_beach, sleeping_in]) + + # High-level dependencies between tasks + chain(begin, check_day_of_week, [weekday, weekend], [weekday_activities, weekend_activities], end) + + # Task dependency created by XComArgs: + # going_to_the_beach >> inviting_friends + +dag = example_dag_advanced() \ No newline at end of file diff --git a/dags/example_dag_basic.py b/dags/example_dag_basic.py new file mode 100644 index 00000000..af15c35a --- /dev/null +++ b/dags/example_dag_basic.py @@ -0,0 +1,72 @@ +import json +from datetime import datetime, timedelta + +from airflow.decorators import dag, task # DAG and task decorators for interfacing with the TaskFlow API + + +@dag( + # This defines how often your DAG will run, or the schedule by which your DAG runs. In this case, this DAG + # will run daily + schedule_interval="@daily", + # This DAG is set to run for the first time on January 1, 2021. Best practice is to use a static + # start_date. Subsequent DAG runs are instantiated based on schedule_interval + start_date=datetime(2021, 1, 1), + # When catchup=False, your DAG will only run for the latest schedule_interval. In this case, this means + # that tasks will not be run between January 1, 2021 and 1 day ago.
When turned on, this DAG's first + # run will be for today, per the @daily schedule_interval + catchup=False, + default_args={ + "retries": 2, # If a task fails, it will retry 2 times. + }, + tags=['example']) # If set, this tag is shown in the DAG view of the Airflow UI +def example_dag_basic(): + """ + ### Basic ETL Dag + This is a simple ETL data pipeline example that demonstrates the use of + the TaskFlow API using three simple tasks for extract, transform, and load. + For more information on Airflow's TaskFlow API, reference documentation here: + https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html + """ + + @task() + def extract(): + """ + #### Extract task + A simple "extract" task to get data ready for the rest of the + pipeline. In this case, getting data is simulated by reading from a + hardcoded JSON string. + """ + data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' + + order_data_dict = json.loads(data_string) + return order_data_dict + + @task(multiple_outputs=True) # multiple_outputs=True unrolls dictionaries into separate XCom values + def transform(order_data_dict: dict): + """ + #### Transform task + A simple "transform" task which takes in the collection of order data and + computes the total order value. + """ + total_order_value = 0 + + for value in order_data_dict.values(): + total_order_value += value + + return {"total_order_value": total_order_value} + + @task() + def load(total_order_value: float): + """ + #### Load task + A simple "load" task that takes in the result of the "transform" task and prints it out + instead of saving it for end-user review. + """ + + print(f"Total order value is: {total_order_value:.2f}") + + order_data = extract() + order_summary = transform(order_data) + load(order_summary["total_order_value"]) + +example_dag_basic = example_dag_basic() \ No newline at end of file diff --git a/include/C-UNjujuy.sql b/include/C-UNjujuy.sql new file mode 100644 index 00000000..2892a1b7 --- /dev/null +++ b/include/C-UNjujuy.sql @@ -0,0 +1,4 @@ +select university, career, inscription_date, null as first_name, nombre as last_name, sexo as gender, birth_date, null as age, null as postal_code, direccion as "location", email +FROM public.jujuy_utn +where university = 'universidad nacional de jujuy' and +to_date(inscription_date, 'YYYY/MM/DD') between '2020-09-01' and '2021-02-01'; \ No newline at end of file diff --git a/include/C-UNpalermo.sql b/include/C-UNpalermo.sql new file mode 100644 index 00000000..5e816ff2 --- /dev/null +++ b/include/C-UNpalermo.sql @@ -0,0 +1,4 @@ +SELECT universidad as university, careers, fecha_de_inscripcion as inscription_date, null as first_name, names AS last_name, sexo as gender, birth_dates, null as age, codigo_postal as postal_code, direcciones as "location", correos_electronicos as email +FROM public.palermo_tres_de_febrero +where universidad = '_universidad_de_palermo' and +to_date(fecha_de_inscripcion, 'DD/Mon/YY') between '2020-09-01' and '2021-02-01'; \ No newline at end of file diff --git a/packages.txt b/packages.txt new file mode 100644 index 00000000..e69de29b diff --git a/plugins/cfg.py b/plugins/cfg.py new file mode 100644 index 00000000..d0b4823c --- /dev/null +++ b/plugins/cfg.py @@ -0,0 +1,7 @@ +from decouple import config + +DB_USERNAME = config("DB_USERNAME") +DB_HOST = config("DB_HOST") +DB_PORT = config("DB_PORT") +DB_NAME = config("DB_NAME") +DB_PASSWORD = config("DB_PASSWORD") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index
00000000..8cd1d65e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +python-decouple==3.6 +apache-airflow-providers-postgres==5.2.2 diff --git a/tests/dags/test_dag_integrity.py b/tests/dags/test_dag_integrity.py new file mode 100644 index 00000000..6b68ce4e --- /dev/null +++ b/tests/dags/test_dag_integrity.py @@ -0,0 +1,83 @@ +"""Test the validity of all DAGs. This test ensures that all DAGs have tags, retries set to two, and no import errors. Feel free to add and remove tests.""" + +import os +import logging +from contextlib import contextmanager +import pytest +from airflow.models import DagBag + + +@contextmanager +def suppress_logging(namespace): + logger = logging.getLogger(namespace) + old_value = logger.disabled + logger.disabled = True + try: + yield + finally: + logger.disabled = old_value + + +def get_import_errors(): + """ + Generate a tuple for import errors in the dag bag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + # we prepend "(None,None)" to ensure that a test object is always created even if it's a no-op. + return [(None, None)] + [ + (strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items() + ] + + +def get_dags(): + """ + Generate a tuple of dag_id, dag object, and fileloc for each DAG in the DagBag + """ + with suppress_logging("airflow"): + dag_bag = DagBag(include_examples=False) + + def strip_path_prefix(path): + return os.path.relpath(path, os.environ.get("AIRFLOW_HOME")) + + return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()] + + +@pytest.mark.parametrize( + "rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()] +) +def test_file_imports(rel_path, rv): + """Test for import errors on a file""" + if rel_path and rv: + raise Exception(f"{rel_path} failed to import with message \n {rv}") + + +APPROVED_TAGS = {} + + +@pytest.mark.parametrize( + "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()] +) +def test_dag_tags(dag_id, dag, fileloc): + """ + test if a DAG is tagged and if those TAGs are in the approved list + """ + assert dag.tags, f"{dag_id} in {fileloc} has no tags" + if APPROVED_TAGS: + assert not set(dag.tags) - APPROVED_TAGS + + +@pytest.mark.parametrize( + "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()] +) +def test_dag_retries(dag_id, dag, fileloc): + """ + test if a DAG has retries set + """ + assert ( + dag.default_args.get("retries", 0) >= 2 # default to 0 so a missing value fails the assertion cleanly + ), f"{dag_id} in {fileloc} does not have retries set to at least 2." \ No newline at end of file