From 056d806bb5817c208a447861a11c95714b76f9bd Mon Sep 17 00:00:00 2001
From: aga60 <54966016+aga60@users.noreply.github.com>
Date: Tue, 1 Nov 2022 14:51:52 -0300
Subject: [PATCH 01/13] Add files via upload

---
 prueba_subida_Aldo.txt | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 prueba_subida_Aldo.txt

diff --git a/prueba_subida_Aldo.txt b/prueba_subida_Aldo.txt
new file mode 100644
index 00000000..d3f5a12f
--- /dev/null
+++ b/prueba_subida_Aldo.txt
@@ -0,0 +1 @@
+

From bdc7a6e39f8c740debd2b394e2a909fafc506591 Mon Sep 17 00:00:00 2001
From: juniors90
Date: Tue, 1 Nov 2022 20:33:07 -0300
Subject: [PATCH 02/13] first commit

---
 .astro/config.yaml                   |   2 +
 .astro/test_dag_integrity_default.py |  91 ++++++++++++
 .dockerignore                        |   5 +
 .gitignore                           |   4 +
 Dockerfile                           |   1 +
 dags/example_dag_advanced.py         | 200 +++++++++++++++++++++++++
 dags/example_dag_basic.py            |  72 ++++++++++
 packages.txt                         |   0
 requirements.txt                     |   2 +
 tests/dags/test_dag_integrity.py     |  83 +++++++++++
 10 files changed, 460 insertions(+)
 create mode 100644 .astro/config.yaml
 create mode 100644 .astro/test_dag_integrity_default.py
 create mode 100644 .dockerignore
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 dags/example_dag_advanced.py
 create mode 100644 dags/example_dag_basic.py
 create mode 100644 packages.txt
 create mode 100644 requirements.txt
 create mode 100644 tests/dags/test_dag_integrity.py

diff --git a/.astro/config.yaml b/.astro/config.yaml
new file mode 100644
index 00000000..7f54315b
--- /dev/null
+++ b/.astro/config.yaml
@@ -0,0 +1,2 @@
+project:
+  name: skill-up-da-c-python
diff --git a/.astro/test_dag_integrity_default.py b/.astro/test_dag_integrity_default.py
new file mode 100644
index 00000000..09e537b9
--- /dev/null
+++ b/.astro/test_dag_integrity_default.py
@@ -0,0 +1,91 @@
+"""Test the validity of all DAGs.
+**USED BY DEV PARSE COMMAND DO NOT EDIT**"""
+from contextlib import contextmanager
+import logging
+import os
+
+import pytest
+
+from airflow.models import DagBag, Variable, Connection
+from airflow.hooks.base import BaseHook
+
+
+# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables
+
+# =========== MONKEYPATCH BaseHook.get_connection() ===========
+def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
+    print(f"Attempted to fetch connection during parse, returning an empty Connection object for {key}")
+    return Connection(key)
+
+
+BaseHook.get_connection = basehook_get_connection_monkeypatch
+# # =========== /MONKEYPATCH BASEHOOK.GET_CONNECTION() ===========
+
+# =========== MONKEYPATCH OS.GETENV() ===========
+def os_getenv_monkeypatch(key: str, *args, default=None, **kwargs):
+    print(f"Attempted to fetch os environment variable during parse, returning a mocked value for {key}")
+    if key == 'JENKINS_HOME' and default is None:  # fix https://github.com/astronomer/astro-cli/issues/601
+        return None
+    if default:
+        return default
+    return "NON_DEFAULT_OS_ENV_VALUE"
+
+
+os.getenv = os_getenv_monkeypatch
+# # =========== /MONKEYPATCH OS.GETENV() ===========
+
+# =========== MONKEYPATCH VARIABLE.GET() ===========
+
+class magic_dict(dict):
+    def __init__(self, *args, **kwargs):
+        self.update(*args, **kwargs)
+
+    def __getitem__(self, key):
+        return {}.get(key, 'MOCKED_KEY_VALUE')
+
+
+def variable_get_monkeypatch(key: str, default_var=None, deserialize_json=False):
+    print(f"Attempted to get Variable value during parse, returning a mocked value for {key}")
+
+    if default_var:
+        return default_var
+    if deserialize_json:
+        return magic_dict()
+    return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"
+
+
+Variable.get = variable_get_monkeypatch
+# # =========== /MONKEYPATCH VARIABLE.GET() ===========
+
+
+@contextmanager
+def suppress_logging(namespace):
+    """
+    Suppress logging within a specific namespace to keep tests "clean" during build
+    """
+    logger = logging.getLogger(namespace)
+    old_value = logger.disabled
+    logger.disabled = True
+    try:
+        yield
+    finally:
+        logger.disabled = old_value
+
+
+def get_import_errors():
+    """
+    Generate a tuple for import errors in the dag bag
+    """
+    with suppress_logging('airflow'):
+        dag_bag = DagBag(include_examples=False)
+
+        def strip_path_prefix(path):
+            return os.path.relpath(path, os.environ.get('AIRFLOW_HOME'))
+
+        # we prepend "(None, None)" to ensure that a test object is always created even if it's a no-op.
+        return [(None, None)] + [(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()]
+
+
+@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
+def test_file_imports(rel_path, rv):
+    """Test for import errors on a file"""
+    if rel_path and rv:  # Make sure our no-op test doesn't raise an error
+        raise Exception(f"{rel_path} failed to import with message \n {rv}")
\ No newline at end of file
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 00000000..f4f282c9
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,5 @@
+astro
+.git
+.env
+airflow_settings.yaml
+logs/
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..0ddee746
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.git
+.env
+airflow_settings.yaml
+astro
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..c8b1655a
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1 @@
+FROM quay.io/astronomer/astro-runtime:6.0.3
\ No newline at end of file
diff --git a/dags/example_dag_advanced.py b/dags/example_dag_advanced.py
new file mode 100644
index 00000000..a128ef80
--- /dev/null
+++ b/dags/example_dag_advanced.py
@@ -0,0 +1,200 @@
+from datetime import datetime, timedelta
+from typing import Dict
+
+# Airflow operators are templates for tasks and encompass the logic that your DAG will actually execute.
+# To use an operator in your DAG, you first have to import it.
+# To learn more about operators, see: https://registry.astronomer.io/.
+
+from airflow.decorators import dag, task  # DAG and task decorators for interfacing with the TaskFlow API
+from airflow.models.baseoperator import chain  # A function that sets sequential dependencies between tasks, including lists of tasks.
+from airflow.operators.bash import BashOperator
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.email import EmailOperator
+from airflow.operators.python import BranchPythonOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils.edgemodifier import Label  # Used to label node edges in the Airflow UI
+from airflow.utils.task_group import TaskGroup  # Used to group tasks together in the Graph view of the Airflow UI
+from airflow.utils.trigger_rule import TriggerRule  # Used to change how an Operator is triggered
+from airflow.utils.weekday import WeekDay  # Used to determine what day of the week it is
+
+
+"""
+This DAG is intended to demonstrate a number of core Apache Airflow concepts that are central to the pipeline
+authoring experience, including the TaskFlow API, Edge Labels, Jinja templating, branching,
+dynamic task generation, Task Groups, and Trigger Rules.
+
+First, this DAG checks if the current day is a weekday or weekend. Next, the DAG checks which day of the week
+it is. Lastly, the DAG prints out a bash statement based on which day it is. On Tuesday, for example, the DAG
+prints "It's Tuesday and I'm busy with studying".
+
+This DAG uses the following operators:
+
+BashOperator -
+    Executes a bash script or bash command.
+
+    See more info about this operator here:
+    https://registry.astronomer.io/providers/apache-airflow/modules/bashoperator
+
+DummyOperator -
+    Does nothing but can be used to group tasks in a DAG
+
+    See more info about this operator here:
+    https://registry.astronomer.io/providers/apache-airflow/modules/dummyoperator
+
+EmailOperator -
+    Used to send emails
+
+    See more info about this operator here:
+    https://registry.astronomer.io/providers/apache-airflow/modules/emailoperator
+
+BranchPythonOperator -
+    Allows a workflow to “branch” after a task based on the result of a Python function
+
+    See more info about this operator here:
+    https://registry.astronomer.io/providers/apache-airflow/modules/branchpythonoperator
+
+BranchDayOfWeekOperator -
+    Branches into one of two lists of tasks depending on the current day
+
+    See more info about this operator here:
+    https://registry.astronomer.io/providers/apache-airflow/modules/branchdayofweekoperator
+"""
+
+# Reference data that defines "weekday" as well as the activity assigned to each day of the week.
+DAY_ACTIVITY_MAPPING = {
+    "monday": {"is_weekday": True, "activity": "guitar lessons"},
+    "tuesday": {"is_weekday": True, "activity": "studying"},
+    "wednesday": {"is_weekday": True, "activity": "soccer practice"},
+    "thursday": {"is_weekday": True, "activity": "contributing to Airflow"},
+    "friday": {"is_weekday": True, "activity": "family dinner"},
+    "saturday": {"is_weekday": False, "activity": "going to the beach"},
+    "sunday": {"is_weekday": False, "activity": "sleeping in"},
+}
+
+
+@task(multiple_outputs=True)  # multiple_outputs=True unrolls dictionaries into separate XCom values
+def _going_to_the_beach() -> Dict:
+    return {
+        "subject": "Beach day!",
+        "body": "It's Saturday and I'm heading to the beach.<br>Come join me!",
+    }
+
+
+# This function gets the activity from the "DAY_ACTIVITY_MAPPING" dictionary
+def _get_activity(day_name) -> str:
+    activity_id = DAY_ACTIVITY_MAPPING[day_name]["activity"].replace(" ", "_")
+
+    if DAY_ACTIVITY_MAPPING[day_name]["is_weekday"]:
+        return f"weekday_activities.{activity_id}"
+
+    return f"weekend_activities.{activity_id}"
+
+
+# When using the DAG decorator, the "dag" argument doesn't need to be specified for each task.
+# The "dag_id" value defaults to the name of the function it is decorating if not explicitly set.
+# In this example, the "dag_id" value would be "example_dag_advanced".
+@dag(
+    # This DAG is set to run for the first time on June 11, 2021. Best practice is to use a static start_date.
+    # Subsequent DAG runs are instantiated based on the schedule_interval below.
+    start_date=datetime(2021, 6, 11),
+    # This defines how many instantiations of this DAG (DAG Runs) can execute concurrently. In this case,
+    # we're only allowing 1 DAG run at any given time, as opposed to allowing multiple overlapping DAG runs.
+    max_active_runs=1,
+    # This defines how often your DAG will run, or the schedule by which DAG runs are created. It can be
+    # defined as a cron expression or custom timetable. This DAG will run daily.
+    schedule_interval="@daily",
+    # Default settings applied to all tasks within the DAG; can be overwritten at the task level.
+    default_args={
+        "owner": "community",  # This defines the value of the "owner" column in the DAG view of the Airflow UI
+        "retries": 2,  # If a task fails, it will retry 2 times.
+        "retry_delay": timedelta(minutes=3),  # A task that fails will wait 3 minutes to retry.
+    },
+    default_view="graph",  # This defines the default view for this DAG in the Airflow UI
+    # When catchup=False, your DAG will only run for the latest schedule interval. In this case, this means
+    # that tasks will not be run between June 11, 2021 and 1 day ago. When turned on, this DAG's first run
+    # will be for today, per the @daily schedule interval
+    catchup=False,
+    tags=["example"],  # If set, this tag is shown in the DAG view of the Airflow UI
+)
+def example_dag_advanced():
+    # DummyOperator placeholder for first task
+    begin = DummyOperator(task_id="begin")
+    # Last task will only trigger if no previous task failed
+    end = DummyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED)
+
+    # This task checks which day of the week it is
+    check_day_of_week = BranchDayOfWeekOperator(
+        task_id="check_day_of_week",
+        week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},  # This checks day of week
+        follow_task_ids_if_true="weekend",  # Next task if criteria is met
+        follow_task_ids_if_false="weekday",  # Next task if criteria is not met
+        use_task_execution_day=True,  # If True, uses task’s execution day to compare with is_today
+    )
+
+    weekend = DummyOperator(task_id="weekend")  # "weekend" placeholder task
+    weekday = DummyOperator(task_id="weekday")  # "weekday" placeholder task
+
+    # Templated value for determining the name of the day of week based on the start date of the DAG Run
+    day_name = "{{ dag_run.start_date.strftime('%A').lower() }}"
+
+    # Begin weekday tasks.
+    # Tasks within this TaskGroup (weekday tasks) will be grouped together in the Airflow UI
+    with TaskGroup("weekday_activities") as weekday_activities:
+        which_weekday_activity_day = BranchPythonOperator(
+            task_id="which_weekday_activity_day",
+            python_callable=_get_activity,  # Python function called when task executes
+            op_args=[day_name],
+        )
+
+        for day, day_info in DAY_ACTIVITY_MAPPING.items():
+            if day_info["is_weekday"]:
+                day_of_week = Label(label=day)
+                activity = day_info["activity"]
+
+                # This task prints the weekday activity to bash
+                do_activity = BashOperator(
+                    task_id=activity.replace(" ", "_"),
+                    bash_command=f"echo It's {day.capitalize()} and I'm busy with {activity}.",  # This is the bash command to run
+                )
+
+                # Declaring task dependencies within the "TaskGroup" via the classic bitshift operator.
+                which_weekday_activity_day >> day_of_week >> do_activity
+
+    # Begin weekend tasks
+    # Tasks within this TaskGroup will be grouped together in the UI
+    with TaskGroup("weekend_activities") as weekend_activities:
+        which_weekend_activity_day = BranchPythonOperator(
+            task_id="which_weekend_activity_day",
+            python_callable=_get_activity,  # Python function called when task executes
+            op_args=[day_name],
+        )
+
+        # Labels that will appear in the Graph view of the Airflow UI
+        saturday = Label(label="saturday")
+        sunday = Label(label="sunday")
+
+        # This task simulates the Sunday activity by sleeping for a random number of seconds
+        sleeping_in = BashOperator(task_id="sleeping_in", bash_command="sleep $[ ( $RANDOM % 30 ) + 1 ]s")
+
+        going_to_the_beach = _going_to_the_beach()  # Calling the taskflow function
+
+        # Because the "_going_to_the_beach()" function has "multiple_outputs" enabled, each dict key is
+        # accessible as its own "XCom" key.
+        inviting_friends = EmailOperator(
+            task_id="inviting_friends",
+            to="friends@community.com",  # Address to send the email to
+            subject=going_to_the_beach["subject"],  # Email subject
+            html_content=going_to_the_beach["body"],  # Email body content
+        )
+
+        # Using "chain()" here for list-to-list dependencies, which are not supported by the bitshift
+        # operator, and to simplify the notation for the desired dependency structure.
+        chain(which_weekend_activity_day, [saturday, sunday], [going_to_the_beach, sleeping_in])
+
+    # High-level dependencies between tasks
+    chain(begin, check_day_of_week, [weekday, weekend], [weekday_activities, weekend_activities], end)
+
+    # Task dependency created by XComArgs:
+    # going_to_the_beach >> inviting_friends
+
+
+dag = example_dag_advanced()
\ No newline at end of file
diff --git a/dags/example_dag_basic.py b/dags/example_dag_basic.py
new file mode 100644
index 00000000..af15c35a
--- /dev/null
+++ b/dags/example_dag_basic.py
@@ -0,0 +1,72 @@
+import json
+from datetime import datetime, timedelta
+
+from airflow.decorators import dag, task  # DAG and task decorators for interfacing with the TaskFlow API
+
+
+@dag(
+    # This defines how often your DAG will run, or the schedule by which your DAG runs. In this case, this DAG
+    # will run daily
+    schedule_interval="@daily",
+    # This DAG is set to run for the first time on January 1, 2021. Best practice is to use a static
+    # start_date. Subsequent DAG runs are instantiated based on the schedule_interval
+    start_date=datetime(2021, 1, 1),
+    # When catchup=False, your DAG will only run for the latest schedule_interval. In this case, this means
+    # that tasks will not be run between January 1, 2021 and yesterday. When turned on, this DAG's first
+    # run will be for the current day, per the @daily schedule_interval
+    catchup=False,
+    default_args={
+        "retries": 2,  # If a task fails, it will retry 2 times.
+    },
+    tags=['example'])  # If set, this tag is shown in the DAG view of the Airflow UI
+def example_dag_basic():
+    """
+    ### Basic ETL DAG
+    This is a simple ETL data pipeline example that demonstrates the use of
+    the TaskFlow API using three simple tasks for extract, transform, and load.
+    For more information on Airflow's TaskFlow API, reference documentation here:
+    https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
+    """
+
+    @task()
+    def extract():
+        """
+        #### Extract task
+        A simple "extract" task to get data ready for the rest of the
+        pipeline. In this case, getting data is simulated by reading from a
+        hardcoded JSON string.
+        """
+        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+        order_data_dict = json.loads(data_string)
+        return order_data_dict
+
+    @task(multiple_outputs=True)  # multiple_outputs=True unrolls dictionaries into separate XCom values
+    def transform(order_data_dict: dict):
+        """
+        #### Transform task
+        A simple "transform" task which takes in the collection of order data and
+        computes the total order value.
+        """
+        total_order_value = 0
+
+        for value in order_data_dict.values():
+            total_order_value += value
+
+        return {"total_order_value": total_order_value}
+
+    @task()
+    def load(total_order_value: float):
+        """
+        #### Load task
+        A simple "load" task that takes in the result of the "transform" task and prints it out,
+        instead of saving it, for end-user review
+        """
+
+        print(f"Total order value is: {total_order_value:.2f}")
+
+    order_data = extract()
+    order_summary = transform(order_data)
+    load(order_summary["total_order_value"])
+
+
+example_dag_basic = example_dag_basic()
\ No newline at end of file
diff --git a/packages.txt b/packages.txt
new file mode 100644
index 00000000..e69de29b
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 00000000..f8aa7e82
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+pytest
+apache-airflow
\ No newline at end of file
diff --git a/tests/dags/test_dag_integrity.py b/tests/dags/test_dag_integrity.py
new file mode 100644
index 00000000..6b68ce4e
--- /dev/null
+++ b/tests/dags/test_dag_integrity.py
@@ -0,0 +1,83 @@
+"""Test the validity of all DAGs. This test ensures that all DAGs have tags, retries set to two, and no import errors. Feel free to add and remove tests."""
+
+import os
+import logging
+from contextlib import contextmanager
+import pytest
+from airflow.models import DagBag
+
+
+@contextmanager
+def suppress_logging(namespace):
+    logger = logging.getLogger(namespace)
+    old_value = logger.disabled
+    logger.disabled = True
+    try:
+        yield
+    finally:
+        logger.disabled = old_value
+
+
+def get_import_errors():
+    """
+    Generate a tuple for import errors in the dag bag
+    """
+    with suppress_logging("airflow"):
+        dag_bag = DagBag(include_examples=False)
+
+        def strip_path_prefix(path):
+            return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
+
+        # we prepend "(None, None)" to ensure that a test object is always created even if it's a no-op.
+        return [(None, None)] + [
+            (strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()
+        ]
+
+
+def get_dags():
+    """
+    Generate a tuple of dag_id, <DAG objects> in the DagBag
+    """
+    with suppress_logging("airflow"):
+        dag_bag = DagBag(include_examples=False)
+
+    def strip_path_prefix(path):
+        return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
+
+    return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]
+
+
+@pytest.mark.parametrize(
+    "rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()]
+)
+def test_file_imports(rel_path, rv):
+    """Test for import errors on a file"""
+    if rel_path and rv:
+        raise Exception(f"{rel_path} failed to import with message \n {rv}")
+
+
+APPROVED_TAGS = {}
+
+
+@pytest.mark.parametrize(
+    "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]
+)
+def test_dag_tags(dag_id, dag, fileloc):
+    """
+    test if a DAG is tagged and if those TAGs are in the approved list
+    """
+    assert dag.tags, f"{dag_id} in {fileloc} has no tags"
+    if APPROVED_TAGS:
+        assert not set(dag.tags) - APPROVED_TAGS
+
+
+@pytest.mark.parametrize(
+    "dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()]
+)
+def test_dag_retries(dag_id, dag, fileloc):
+    """
+    test if a DAG has retries set
+    """
+    assert (
+        dag.default_args.get("retries", 0) >= 2
+    ), f"{dag_id} in {fileloc} does not have retries set to at least 2."
\ No newline at end of file
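
Both test modules in this patch drive everything through Airflow's DagBag: parse the dags/ folder, then collect any import errors. A minimal standalone sketch of that core check, assuming apache-airflow is installed and AIRFLOW_HOME points at the project root:

    # Sketch: surface DAG import errors the same way the tests above do.
    import os

    from airflow.models import DagBag

    dag_bag = DagBag(include_examples=False)  # parse only this project's dags/ folder

    for path, message in dag_bag.import_errors.items():
        rel_path = os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
        print(f"{rel_path} failed to import:\n{message}")

    print(f"{len(dag_bag.dags)} DAGs parsed without errors")

The (None, None) entry prepended in get_import_errors() exists so that pytest.mark.parametrize always receives at least one case even when the DagBag is clean; test_file_imports() skips it because both fields are falsy.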

From 4eec45898b449fc4d8e8769226c41b4e1d32c318 Mon Sep 17 00:00:00 2001
From: Leandro Pardo
Date: Tue, 1 Nov 2022 21:34:07 -0300
Subject: [PATCH 03/13] template .env

---
 .env                                             |   6 ++++++
 .gitignore                                       |   2 +-
 .../example_dag_advanced.cpython-39.pyc          | Bin 0 -> 3431 bytes
 .../__pycache__/example_dag_basic.cpython-39.pyc | Bin 0 -> 2220 bytes
 plugins/cfg.py                                   |  14 ++++++++++++++
 prueba_subida_Aldo.txt                           |   1 -
 requirements.txt                                 |   3 +--
 7 files changed, 22 insertions(+), 4 deletions(-)
 create mode 100644 .env
 create mode 100644 dags/__pycache__/example_dag_advanced.cpython-39.pyc
 create mode 100644 dags/__pycache__/example_dag_basic.cpython-39.pyc
 create mode 100644 plugins/cfg.py
 delete mode 100644 prueba_subida_Aldo.txt

diff --git a/.env b/.env
new file mode 100644
index 00000000..8e07df3f
--- /dev/null
+++ b/.env
@@ -0,0 +1,6 @@
+
+DB_USERNAME =
+DB_HOST =
+DB_PORT =
+DB_NAME =
+DB_PASSWORD =
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 0ddee746..e58b88b8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,4 @@
 .git
-.env
+
 airflow_settings.yaml
 astro
\ No newline at end of file
diff --git a/dags/__pycache__/example_dag_advanced.cpython-39.pyc b/dags/__pycache__/example_dag_advanced.cpython-39.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..f76336b09e1e4da1471d6f093a5410b44349ec58
GIT binary patch
literal 3431
[base85 binary payload omitted]

literal 0
HcmV?d00001

diff --git a/dags/__pycache__/example_dag_basic.cpython-39.pyc b/dags/__pycache__/example_dag_basic.cpython-39.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..32ec764beb3543cc024535e54eb457618b298135
GIT binary patch
literal 2220
[base85 binary payload omitted]

literal 0
HcmV?d00001

diff --git a/plugins/cfg.py b/plugins/cfg.py
new file mode 100644
index 00000000..0564134d
--- /dev/null
+++ b/plugins/cfg.py
@@ -0,0 +1,14 @@
+from decouple import config
+
+DB_USERNAME = config("DB_USERNAME")
+DB_HOST = config("DB_HOST")
+DB_PORT = config("DB_PORT")
+DB_NAME = config("DB_NAME")
+DB_PASSWORD = config("DB_PASSWORD")
+
+
+print(DB_USERNAME)
+print(DB_HOST)
+print(DB_PORT)
+print(DB_NAME)
+print(DB_PASSWORD)
diff --git a/prueba_subida_Aldo.txt b/prueba_subida_Aldo.txt
deleted file mode 100644
index d3f5a12f..00000000
--- a/prueba_subida_Aldo.txt
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/requirements.txt b/requirements.txt
index f8aa7e82..a4dbc9a6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1 @@
-pytest
-apache-airflow
\ No newline at end of file
+python-decouple==3.6
\ No newline at end of file

From e0f4350f86f8acf004501c17c78c1ee4ad082f9a Mon Sep 17 00:00:00 2001
From: Leandro Pardo <99141620+leo9952011@users.noreply.github.com>
Date: Tue, 1 Nov 2022 21:36:57 -0300
Subject: [PATCH 04/13] Update cfg.py

---
 plugins/cfg.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/plugins/cfg.py b/plugins/cfg.py
index 0564134d..d0b4823c 100644
--- a/plugins/cfg.py
+++ b/plugins/cfg.py
@@ -5,10 +5,3 @@
 DB_PORT = config("DB_PORT")
 DB_NAME = config("DB_NAME")
 DB_PASSWORD = config("DB_PASSWORD")
-
-
-print(DB_USERNAME)
-print(DB_HOST)
-print(DB_PORT)
-print(DB_NAME)
-print(DB_PASSWORD)

From b2f9d775e21888061997aea51a5b1c3a5ffc43e9 Mon Sep 17 00:00:00 2001
From: Leandro Pardo <99141620+leo9952011@users.noreply.github.com>
Date: Tue, 1 Nov 2022 21:44:27 -0300
Subject: [PATCH 05/13] Delete .env

---
 .env | 6 ------
 1 file changed, 6 deletions(-)
 delete mode 100644 .env

diff --git a/.env b/.env
deleted file mode 100644
index 8e07df3f..00000000
--- a/.env
+++ /dev/null
@@ -1,6 +0,0 @@
-
-DB_USERNAME =
-DB_HOST =
-DB_PORT =
-DB_NAME =
-DB_PASSWORD =
\ No newline at end of file

From c7b4b55edd7e235a76ca09bef33e5e2e18658603 Mon Sep 17 00:00:00 2001
From: Leandro Pardo
Date: Tue, 1 Nov 2022 21:44:52 -0300
Subject: [PATCH 06/13] env.example

---
 .env.example | 6 ++++++
 .gitignore   | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)
 create mode 100644 .env.example

diff --git a/.env.example b/.env.example
new file mode 100644
index 00000000..8e07df3f
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,6 @@
+
+DB_USERNAME =
+DB_HOST =
+DB_PORT =
+DB_NAME =
+DB_PASSWORD =
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index e58b88b8..0ddee746 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,4 @@
 .git
-
+.env
 airflow_settings.yaml
 astro
\ No newline at end of file

From 76e6ba4c45ccb3453463501fc6e0841aad8b4b99 Mon Sep 17 00:00:00 2001
From: Leandro Pardo <99141620+leo9952011@users.noreply.github.com>
Date: Wed, 2 Nov 2022 10:04:53 -0300
Subject: [PATCH 07/13] Update requirements.txt

---
 requirements.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index a4dbc9a6..8cd1d65e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
-python-decouple==3.6
\ No newline at end of file
+python-decouple==3.6
+apache-airflow-providers-postgres==5.2.2

From 311e9527d78b73e73cb62a4af118a92a63c4111f Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 12:35:49 -0300
Subject: [PATCH 08/13] sql scripts

---
 C-UNjujuy.sql   | 4 ++++
 C-UNpalermo.sql | 4 ++++
 2 files changed, 8 insertions(+)
 create mode 100644 C-UNjujuy.sql
 create mode 100644 C-UNpalermo.sql

diff --git a/C-UNjujuy.sql b/C-UNjujuy.sql
new file mode 100644
index 00000000..c019ebcb
--- /dev/null
+++ b/C-UNjujuy.sql
@@ -0,0 +1,4 @@
+select nombre, sexo, direccion, email, birth_date, university, inscription_date, career, "location"
+FROM public.jujuy_utn
+where university = 'universidad nacional de jujuy' and
+to_date(inscription_date, 'YYYY/MM/DD') between '2020-09-01' and '2021-02-01';
diff --git a/C-UNpalermo.sql b/C-UNpalermo.sql
new file mode 100644
index 00000000..1ccbd8dc
--- /dev/null
+++ b/C-UNpalermo.sql
@@ -0,0 +1,4 @@
+SELECT universidad, careers, fecha_de_inscripcion, names, sexo, birth_dates, codigo_postal, direcciones, correos_electronicos
+FROM public.palermo_tres_de_febrero
+where universidad = '_universidad_de_palermo' and
+to_date(fecha_de_inscripcion, 'DD/Mon/YY') between '2020-09-01' and '2021-02-01';
\ No newline at end of file
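
The two scripts select the same student fields from differently shaped source tables. A rough sketch of how one of them could be executed from Python using the Postgres provider pinned in PATCH 07 — the connection id alkemy_db and the output path are hypothetical:

    # Sketch: run a .sql script through PostgresHook and dump the result to CSV.
    from pathlib import Path

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def extract_university(sql_path: str, out_csv: str) -> None:
        hook = PostgresHook(postgres_conn_id="alkemy_db")  # hypothetical connection id
        df = hook.get_pandas_df(sql=Path(sql_path).read_text())  # runs the SELECT, returns a DataFrame
        df.to_csv(out_csv, index=False)

    extract_university("C-UNjujuy.sql", "files/jujuy.csv")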

From 0a180ca935dea15104c28f46f99ef2fb79631264 Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 12:37:36 -0300
Subject: [PATCH 09/13] move

---
 C-UNjujuy.sql => include/C-UNjujuy.sql     | 0
 C-UNpalermo.sql => include/C-UNpalermo.sql | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename C-UNjujuy.sql => include/C-UNjujuy.sql (100%)
 rename C-UNpalermo.sql => include/C-UNpalermo.sql (100%)

diff --git a/C-UNjujuy.sql b/include/C-UNjujuy.sql
similarity index 100%
rename from C-UNjujuy.sql
rename to include/C-UNjujuy.sql
diff --git a/C-UNpalermo.sql b/include/C-UNpalermo.sql
similarity index 100%
rename from C-UNpalermo.sql
rename to include/C-UNpalermo.sql

From 6f191273046ba2ceb13873bf8896e41a89d204ed Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 20:12:14 -0300
Subject: [PATCH 10/13] Create dag_prueba.py

---
 airflow2/dags/dag_prueba.py | 72 +++++++++++++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 airflow2/dags/dag_prueba.py

diff --git a/airflow2/dags/dag_prueba.py b/airflow2/dags/dag_prueba.py
new file mode 100644
index 00000000..b424485d
--- /dev/null
+++ b/airflow2/dags/dag_prueba.py
@@ -0,0 +1,72 @@
+import json
+from datetime import datetime, timedelta
+
+from airflow.decorators import dag, task  # DAG and task decorators for interfacing with the TaskFlow API
+
+
+@dag(
+    # This defines how often your DAG will run, or the schedule by which your DAG runs. In this case, this DAG
+    # will run daily
+    schedule_interval="@daily",
+    # This DAG is set to run for the first time on January 1, 2021. Best practice is to use a static
+    # start_date. Subsequent DAG runs are instantiated based on the schedule_interval
+    start_date=datetime(2021, 1, 1),
+    # When catchup=False, your DAG will only run for the latest schedule_interval. In this case, this means
+    # that tasks will not be run between January 1, 2021 and yesterday. When turned on, this DAG's first
+    # run will be for the current day, per the @daily schedule_interval
+    catchup=False,
+    default_args={
+        "retries": 5,  # If a task fails, it will retry 5 times.
+    },
+    tags=['example'])  # If set, this tag is shown in the DAG view of the Airflow UI
+def example_dag_basic2():
+    """
+    ### Basic ETL DAG
+    This is a simple ETL data pipeline example that demonstrates the use of
+    the TaskFlow API using three simple tasks for extract, transform, and load.
+    For more information on Airflow's TaskFlow API, reference documentation here:
+    https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
+    """
+
+    @task()
+    def extract():
+        """
+        #### Extract task
+        A simple "extract" task to get data ready for the rest of the
+        pipeline. In this case, getting data is simulated by reading from a
+        hardcoded JSON string.
+        """
+        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
+
+        order_data_dict = json.loads(data_string)
+        return order_data_dict
+
+    @task(multiple_outputs=True)  # multiple_outputs=True unrolls dictionaries into separate XCom values
+    def transform(order_data_dict: dict):
+        """
+        #### Transform task
+        A simple "transform" task which takes in the collection of order data and
+        computes the total order value.
+        """
+        total_order_value = 0
+
+        for value in order_data_dict.values():
+            total_order_value += value
+
+        return {"total_order_value": total_order_value}
+
+    @task()
+    def load(total_order_value: float):
+        """
+        #### Load task
+        A simple "load" task that takes in the result of the "transform" task and prints it out,
+        instead of saving it, for end-user review
+        """
+
+        print(f"Total order value is: {total_order_value:.2f}")
+
+    order_data = extract()
+    order_summary = transform(order_data)
+    load(order_summary["total_order_value"])
+
+
+example_dag_basic2 = example_dag_basic2()
\ No newline at end of file

From ea02005a037004910f39e1e5dc738a4e660858bf Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 20:31:41 -0300
Subject: [PATCH 11/13] .

---
 .../__pycache__/dag_prueba.cpython-39.pyc | Bin 0 -> 2217 bytes
 airflow2/dags/dag_prueba.py               |  89 ++++--------------
 2 files changed, 19 insertions(+), 70 deletions(-)
 create mode 100644 airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc

diff --git a/airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc b/airflow2/dags/__pycache__/dag_prueba.cpython-39.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..81f15a11c06fbe982c6f102f18597d924525b196
GIT binary patch
literal 2217
[base85 binary payload omitted]

literal 0
HcmV?d00001

diff --git a/airflow2/dags/dag_prueba.py b/airflow2/dags/dag_prueba.py
[89-line rewrite hunk lost; only its closing line survives]
+[task_1, task_2] >> task_3 >> task_4
\ No newline at end of file
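
The DummyOperator placeholders in dag_prueba.py carry commented-out PostgresOperator and PythonOperator calls with the connection id still left blank. A sketch of what a filled-in extract task might look like, assuming the DAG sets template_searchpath to the include/ folder and that an Airflow connection named alkemy_db exists (both of which are assumptions):

    # Sketch: a filled-in version of the commented-out extract placeholder.
    from airflow.providers.postgres.operators.postgres import PostgresOperator

    palermo_extract = PostgresOperator(
        task_id="palermo_extract",
        postgres_conn_id="alkemy_db",  # hypothetical; configure in the Airflow UI
        sql="C-UNpalermo.sql",  # resolved via the DAG's template_searchpath
        retries=5,
    )

Because the sql argument ends in .sql, Airflow treats it as a template file and loads it from template_searchpath at runtime, which is why PATCH 09 moved the scripts into include/.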

From 558903746ab88a8f17b3b76d7cccecedf47d6742 Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 22:08:27 -0300
Subject: [PATCH 12/13] change sql scripts

---
 airflow2/dags/dag_prueba.py | 22 ++++++++++++----------
 include/C-UNjujuy.sql       |  4 ++--
 include/C-UNpalermo.sql     |  2 +-
 3 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/airflow2/dags/dag_prueba.py b/airflow2/dags/dag_prueba.py
index dcd4b92a..3e34cf37 100644
--- a/airflow2/dags/dag_prueba.py
+++ b/airflow2/dags/dag_prueba.py
@@ -6,16 +6,18 @@
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
 
-with DAG(
+@dag(
     'empty_dag',
     description = 'DAG with DummyOperator placeholder tasks; the commented-out operators will be used later',
-    scheduled_interval = @hourly,
-    start_date = datetime(2022, 11, 3),
-    retries = 5
-    ) as dag:
-    task_1 = DummyOperator(task_id='task_1') #PostgresOperator(task_id=palermo_extract, sql=C-UNpalermo.sql, postgres_conn_id=)
-    task_2 = DummyOperator(task_id='task_2') #PostgresOperator(task_id=jujuy_extract, sql=C-UNjujuy.sql, postgres_conn_id=)
-    task_3 = DummyOperator(task_id='task_3') #PythonOperator(task_id=transform, python_callable=transform_data)
-    task_4 = DummyOperator(task_id='task_4') #PythonOperator(task_id=load, python_callable=s3_load)
+    schedule_interval = "@hourly",
+    start_date = datetime(2022, 11, 3)
+)
+def empty_dag():
+    task_1 = DummyOperator(task_id='task_1') #PostgresOperator(task_id=palermo_extract, sql=C-UNpalermo.sql, retries=5, postgres_conn_id=)
+    task_2 = DummyOperator(task_id='task_2') #PostgresOperator(task_id=jujuy_extract, sql=C-UNjujuy.sql, retries=5, postgres_conn_id=)
+    task_3 = DummyOperator(task_id='task_3') #PythonOperator(task_id=transform, retries=5, python_callable=transform_data)
+    task_4 = DummyOperator(task_id='task_4') #PythonOperator(task_id=load, retries=5, python_callable=s3_load)
 
-[task_1, task_2] >> task_3 >> task_4
\ No newline at end of file
+    [task_1, task_2] >> task_3 >> task_4
+
+dag = empty_dag()
\ No newline at end of file
diff --git a/include/C-UNjujuy.sql b/include/C-UNjujuy.sql
index c019ebcb..ac4bc32c 100644
--- a/include/C-UNjujuy.sql
+++ b/include/C-UNjujuy.sql
@@ -1,4 +1,4 @@
-select nombre, sexo, direccion, email, birth_date, university, inscription_date, career, "location"
+select university, career, inscription_date, null as first_name, nombre as last_name, sexo as gender, birth_date, null as postal_code, direccion as "location", email
 FROM public.jujuy_utn
 where university = 'universidad nacional de jujuy' and
-to_date(inscription_date, 'YYYY/MM/DD') between '2020-09-01' and '2021-02-01';
+to_date(inscription_date, 'YYYY/MM/DD') between '2020-09-01' and '2021-02-01';
\ No newline at end of file
diff --git a/include/C-UNpalermo.sql b/include/C-UNpalermo.sql
index 1ccbd8dc..123ee730 100644
--- a/include/C-UNpalermo.sql
+++ b/include/C-UNpalermo.sql
@@ -1,4 +1,4 @@
-SELECT universidad, careers, fecha_de_inscripcion, names, sexo, birth_dates, codigo_postal, direcciones, correos_electronicos
+SELECT universidad as university, careers, fecha_de_inscripcion as inscription_date, null as first_name, names AS last_name, sexo as gender, birth_dates, codigo_postal as postal_code, direcciones as "location", correos_electronicos as email
 FROM public.palermo_tres_de_febrero
 where universidad = '_universidad_de_palermo' and
 to_date(fecha_de_inscripcion, 'DD/Mon/YY') between '2020-09-01' and '2021-02-01';
\ No newline at end of file

From 0a20d034105aad175465816075c5009cb1501e4f Mon Sep 17 00:00:00 2001
From: MauMN <81426396+MauMN@users.noreply.github.com>
Date: Wed, 2 Nov 2022 23:30:49 -0300
Subject: [PATCH 13/13] null age

---
 include/C-UNjujuy.sql   | 2 +-
 include/C-UNpalermo.sql | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/include/C-UNjujuy.sql b/include/C-UNjujuy.sql
index ac4bc32c..2892a1b7 100644
--- a/include/C-UNjujuy.sql
+++ b/include/C-UNjujuy.sql
@@ -1,4 +1,4 @@
-select university, career, inscription_date, null as first_name, nombre as last_name, sexo as gender, birth_date, null as postal_code, direccion as "location", email
+select university, career, inscription_date, null as first_name, nombre as last_name, sexo as gender, birth_date, null as age, null as postal_code, direccion as "location", email
 FROM public.jujuy_utn
 where university = 'universidad nacional de jujuy' and
 to_date(inscription_date, 'YYYY/MM/DD') between '2020-09-01' and '2021-02-01';
\ No newline at end of file
diff --git a/include/C-UNpalermo.sql b/include/C-UNpalermo.sql
index 123ee730..5e816ff2 100644
--- a/include/C-UNpalermo.sql
+++ b/include/C-UNpalermo.sql
@@ -1,4 +1,4 @@
-SELECT universidad as university, careers, fecha_de_inscripcion as inscription_date, null as first_name, names AS last_name, sexo as gender, birth_dates, codigo_postal as postal_code, direcciones as "location", correos_electronicos as email
+SELECT universidad as university, careers, fecha_de_inscripcion as inscription_date, null as first_name, names AS last_name, sexo as gender, birth_dates, null as age, codigo_postal as postal_code, direcciones as "location", correos_electronicos as email
 FROM public.palermo_tres_de_febrero
 where universidad = '_universidad_de_palermo' and
 to_date(fecha_de_inscripcion, 'DD/Mon/YY') between '2020-09-01' and '2021-02-01';
\ No newline at end of file
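
The one placeholder the series leaves untouched is task_4's s3_load. A rough sketch of where that step could go, assuming apache-airflow-providers-amazon is installed and that the connection id, bucket, and file paths (all hypothetical) have been set up:

    # Sketch: a possible s3_load callable for task_4.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    def s3_load() -> None:
        hook = S3Hook(aws_conn_id="alkemy_s3")  # hypothetical connection id
        for name in ("palermo", "jujuy"):
            hook.load_file(
                filename=f"files/{name}.csv",   # assumed output of the transform step
                key=f"processed/{name}.csv",
                bucket_name="alkemy-bucket",    # hypothetical bucket
                replace=True,  # overwrite on re-runs so retries stay idempotent
            )

Wiring this callable into the commented-out PythonOperator(task_id=load, python_callable=s3_load) would complete the extract → transform → load chain that dag_prueba.py stubs out with DummyOperator tasks.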