From a5d901e931264432bb4a31b257ee28f3d7384e25 Mon Sep 17 00:00:00 2001 From: rlittle08 Date: Wed, 19 Nov 2025 09:31:41 -0600 Subject: [PATCH 1/4] add deps to dag --- ea_airflow_util/dags/run_dbt_airflow_dag.py | 23 +++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index eca5ab6..9f7f8a4 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -11,7 +11,7 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.task_group import TaskGroup -from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator +from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator, DbtDepsOperator from ea_airflow_util.dags.ea_custom_dag import EACustomDAG from ea_airflow_util.callables.variable import check_variable, update_variable @@ -51,6 +51,7 @@ def __init__(self, full_refresh: bool = False, full_refresh_schedule: Optional[str] = None, + deps_vars: Optional[dict] = None, seed_vars: Optional[dict] = None, run_vars: Optional[dict] = None, test_vars: Optional[dict] = None, @@ -77,6 +78,7 @@ def __init__(self, self.full_refresh_schedule = full_refresh_schedule # run-time vars + self.deps_vars = deps_vars self.seed_vars = seed_vars self.run_vars = run_vars self.test_vars = run_vars @@ -147,8 +149,9 @@ def __init__(self, # build function for tasks def build_dbt_run(self, on_success_callback=None, **kwargs): """ - four tasks defined here: + five tasks defined here: + dbt deps: dbt seed: dbt run: dbt test: @@ -168,6 +171,18 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) as dbt_task_group: + dbt_deps = DbtDepsOperator( + task_id= f'dbt_deps_{self.environment}', + dir = self.dbt_repo_path, + target = self.dbt_target_name, + dbt_bin= self.dbt_bin_path, + trigger_rule='all_success', + full_refresh=True, + vars=self.deps_vars, + dag=self.dag + + ) + dbt_seed = DbtSeedOperator( task_id= f'dbt_seed_{self.environment}', dir = self.dbt_repo_path, @@ -198,7 +213,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_seed >> dbt_run >> dbt_test + dbt_deps >> dbt_seed >> dbt_run >> dbt_test # bluegreen operator @@ -252,7 +267,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) - dbt_build_artifact_tables >> dbt_seed + dbt_build_artifact_tables >> dbt_deps # Trigger downstream DAG when `dbt run` succeeds if self.external_dags: From 696bf46784d7d74d5090a1ccc71bcf40260bfd44 Mon Sep 17 00:00:00 2001 From: rlittle08 Date: Wed, 19 Nov 2025 09:38:47 -0600 Subject: [PATCH 2/4] deps upgrade op --- ea_airflow_util/providers/dbt/operators/dbt.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/ea_airflow_util/providers/dbt/operators/dbt.py b/ea_airflow_util/providers/dbt/operators/dbt.py index 622ba53..49610c5 100644 --- a/ea_airflow_util/providers/dbt/operators/dbt.py +++ b/ea_airflow_util/providers/dbt/operators/dbt.py @@ -7,7 +7,7 @@ from typing import Optional -from airflow_dbt.operators.dbt_operator import DbtBaseOperator +from airflow_dbt.operators.dbt_operator import DbtBaseOperator, DbtDepsOperator class DbtRunOperationOperator(DbtBaseOperator): @@ -29,3 +29,17 @@ def execute(self, **context): cmd_pieces.extend(["--args", f'{json.dumps(self.arguments)}']) self.create_hook().run_cli(*cmd_pieces) + +class DbtDepsUpgradeOperator(DbtDepsOperator): + """ + Without forking the hook code, we don't have a way to pass the --upgrade flag to deps. + """ + def __init__(self, upgrade=False, *args, **kwargs): + super().__init__(*args, **kwargs) + self.upgrade = upgrade + + def execute(self, context): + if self.upgrade: + self.create_hook().run_cli('deps', '--upgrade') + else: + self.create_hook().run_cli('deps') From 69cec3965774dfd2967f3db1fe4ebf8123aaf3c8 Mon Sep 17 00:00:00 2001 From: rlittle08 Date: Wed, 19 Nov 2025 09:40:08 -0600 Subject: [PATCH 3/4] swap in forked deps upgrade op --- ea_airflow_util/dags/run_dbt_airflow_dag.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index 9f7f8a4..1036dd5 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -11,11 +11,11 @@ from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.task_group import TaskGroup -from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator, DbtDepsOperator +from airflow_dbt.operators.dbt_operator import DbtRunOperator, DbtSeedOperator, DbtTestOperator from ea_airflow_util.dags.ea_custom_dag import EACustomDAG from ea_airflow_util.callables.variable import check_variable, update_variable -from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator +from ea_airflow_util.providers.dbt.operators.dbt import DbtRunOperationOperator, DbtDepsUpgradeOperator class RunDbtDag: @@ -56,6 +56,8 @@ def __init__(self, run_vars: Optional[dict] = None, test_vars: Optional[dict] = None, + deps_upgrade: bool = True, + opt_swap: bool = False, opt_dest_schema: Optional[str] = None, opt_swap_target: Optional[str] = None, @@ -83,6 +85,9 @@ def __init__(self, self.run_vars = run_vars self.test_vars = run_vars + # whether to attach --upgrade when running dbt deps + self.deps_upgrade = deps_upgrade + # bluegreen self.opt_swap = opt_swap self.opt_dest_schema = opt_dest_schema @@ -171,7 +176,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): dag=self.dag ) as dbt_task_group: - dbt_deps = DbtDepsOperator( + dbt_deps = DbtDepsUpgradeOperator( task_id= f'dbt_deps_{self.environment}', dir = self.dbt_repo_path, target = self.dbt_target_name, @@ -179,6 +184,7 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): trigger_rule='all_success', full_refresh=True, vars=self.deps_vars, + upgrade=self.deps_upgrade, dag=self.dag ) From 378615b58311aaebe4a2e901bdc22dd7cb0686ea Mon Sep 17 00:00:00 2001 From: rlittle08 Date: Wed, 19 Nov 2025 09:53:28 -0600 Subject: [PATCH 4/4] no fr on deps --- ea_airflow_util/dags/run_dbt_airflow_dag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ea_airflow_util/dags/run_dbt_airflow_dag.py b/ea_airflow_util/dags/run_dbt_airflow_dag.py index 1036dd5..404049d 100644 --- a/ea_airflow_util/dags/run_dbt_airflow_dag.py +++ b/ea_airflow_util/dags/run_dbt_airflow_dag.py @@ -182,7 +182,6 @@ def build_dbt_run(self, on_success_callback=None, **kwargs): target = self.dbt_target_name, dbt_bin= self.dbt_bin_path, trigger_rule='all_success', - full_refresh=True, vars=self.deps_vars, upgrade=self.deps_upgrade, dag=self.dag