Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions ea_airflow_util/dags/run_dbt_airflow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from ea_airflow_util.dags.ea_custom_dag import EACustomDAG
from ea_airflow_util.callables.variable import check_variable, update_variable
from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator
from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator, DbtDepsUpgradeOperator


class RunDbtDag:
Expand Down Expand Up @@ -51,10 +51,13 @@ def __init__(self,
full_refresh: bool = False,
full_refresh_schedule: Optional[str] = None,

deps_vars: Optional[dict] = None,
seed_vars: Optional[dict] = None,
run_vars: Optional[dict] = None,
test_vars: Optional[dict] = None,

deps_upgrade: bool = True,

opt_swap: bool = False,
opt_dest_schema: Optional[str] = None,
opt_swap_target: Optional[str] = None,
Expand All @@ -77,10 +80,14 @@ def __init__(self,
self.full_refresh_schedule = full_refresh_schedule

# run-time vars
self.deps_vars = deps_vars
self.seed_vars = seed_vars
self.run_vars = run_vars
self.test_vars = run_vars

# whether to attach --upgrade when running dbt deps
self.deps_upgrade = deps_upgrade

# bluegreen
self.opt_swap = opt_swap
self.opt_dest_schema = opt_dest_schema
Expand Down Expand Up @@ -147,8 +154,9 @@ def __init__(self,
# build function for tasks
def build_dbt_run(self, on_success_callback=None, **kwargs):
"""
four tasks defined here:
five tasks defined here:

dbt deps:
dbt seed:
dbt run:
dbt test:
Expand All @@ -168,6 +176,18 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
) as dbt_task_group:

dbt_deps = DbtDepsUpgradeOperator(
task_id= f'dbt_deps_{self.environment}',
dir = self.dbt_repo_path,
target = self.dbt_target_name,
dbt_bin= self.dbt_bin_path,
trigger_rule='all_success',
vars=self.deps_vars,
upgrade=self.deps_upgrade,
dag=self.dag

)

dbt_seed = DbtSeedOperator(
task_id= f'dbt_seed_{self.environment}',
dir = self.dbt_repo_path,
Expand Down Expand Up @@ -198,7 +218,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
)

dbt_seed >> dbt_run >> dbt_test
dbt_deps >> dbt_seed >> dbt_run >> dbt_test


# bluegreen operator
Expand Down Expand Up @@ -252,7 +272,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs):
dag=self.dag
)

dbt_build_artifact_tables >> dbt_seed
dbt_build_artifact_tables >> dbt_deps

# Trigger downstream DAG when `dbt run` succeeds
if self.external_dags:
Expand Down
16 changes: 15 additions & 1 deletion ea_airflow_util/providers/dbt/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from typing import Optional

from airflow_dbt.operators.dbt_operator import DbtBaseOperator
from airflow_dbt.operators.dbt_operator import DbtBaseOperator, DbtDepsOperator


class DbtRunOperationOperator(DbtBaseOperator):
Expand All @@ -29,3 +29,17 @@ def execute(self, **context):
cmd_pieces.extend(["--args", f'{json.dumps(self.arguments)}'])

self.create_hook().run_cli(*cmd_pieces)

class DbtDepsUpgradeOperator(DbtDepsOperator):
"""
Without forking the hook code, we don't have a way to pass the --upgrade flag to deps.
"""
def __init__(self, upgrade=False, *args, **kwargs):
super().__init__(*args, **kwargs)
self.upgrade = upgrade

def execute(self, context):
if self.upgrade:
self.create_hook().run_cli('deps', '--upgrade')
else:
self.create_hook().run_cli('deps')