diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index eca5ab6..404049d 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -15,7 +15,7 @@ from ea_airflow_util.dags.ea_custom_dag import EACustomDAG from ea_airflow_util.callables.variable import check_variable, update_variable -from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator +from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator, DbtDepsUpgradeOperator class RunDbtDag: @@ -51,10 +51,13 @@ def __init__(self, full_refresh: bool = False, full_refresh_schedule: Optional[str] = None, + deps_vars: Optional[dict] = None, seed_vars: Optional[dict] = None, run_vars: Optional[dict] = None, test_vars: Optional[dict] = None, + deps_upgrade: bool = True, + opt_swap: bool = False, opt_dest_schema: Optional[str] = None, opt_swap_target: Optional[str] = None, @@ -77,10 +80,14 @@ def __init__(self, self.full_refresh_schedule = full_refresh_schedule # run-time vars + self.deps_vars = deps_vars self.seed_vars = seed_vars self.run_vars = run_vars self.test_vars = run_vars + # whether to attach --upgrade when running dbt deps + self.deps_upgrade = deps_upgrade + # bluegreen self.opt_swap = opt_swap self.opt_dest_schema = opt_dest_schema @@ -147,8 +154,9 @@ def __init__(self, # build function for tasks def build_dbt_run(self, on_success_callback=None, **kwargs): """ - four tasks defined here: + five tasks defined here: + dbt deps: dbt seed: dbt run: dbt test: @@ -168,6 +176,18 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) as dbt_task_group: + dbt_deps = DbtDepsUpgradeOperator( + task_id= f'dbt_deps_{self.environment}', + dir = self.dbt_repo_path, + target = self.dbt_target_name, + dbt_bin= self.dbt_bin_path, + trigger_rule='all_success', + vars=self.deps_vars, + upgrade=self.deps_upgrade, + dag=self.dag + + ) + dbt_seed = DbtSeedOperator( task_id= f'dbt_seed_{self.environment}', dir = self.dbt_repo_path, @@ -198,7 +218,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_seed >> dbt_run >> dbt_test + dbt_deps >> dbt_seed >> dbt_run >> dbt_test # bluegreen operator @@ -252,7 +272,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_build_artifact_tables >> dbt_seed + dbt_build_artifact_tables >> dbt_deps # Trigger downstream DAG when `dbt run` succeeds if self.external_dags: diff --git a/ea_airflow_util/providers/dbt/operators/dbt.py b/ea_airflow_util/providers/dbt/operators/dbt.py index 622ba53..49610c5 100644 --- a/ea_airflow_util/providers/dbt/operators/dbt.py +++ b/ea_airflow_util/providers/dbt/operators/dbt.py @@ -7,7 +7,7 @@ from typing import Optional -from airflow_dbt.operators.dbt_operator import DbtBaseOperator +from airflow_dbt.operators.dbt_operator import DbtBaseOperator, DbtDepsOperator class DbtRunOperationOperator(DbtBaseOperator): @@ -29,3 +29,17 @@ def execute(self, **context): cmd_pieces.extend(["--args", f'{json.dumps(self.arguments)}']) self.create_hook().run_cli(*cmd_pieces) + +class DbtDepsUpgradeOperator(DbtDepsOperator): + """ + Without forking the hook code, we don't have a way to pass the --upgrade flag to deps. + """ + def __init__(self, upgrade=False, *args, **kwargs): + super().__init__(*args, **kwargs) + self.upgrade = upgrade + + def execute(self, context): + if self.upgrade: + self.create_hook().run_cli('deps', '--upgrade') + else: + self.create_hook().run_cli('deps')