From f02f5099d68157f24a9180da0912fa4514b8c308 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Mon, 20 Apr 2026 18:32:20 +0900 Subject: [PATCH] feat: add `skip_merge_on_empty_source` incremental config Adds an opt-in incremental config that bypasses MERGE and all associated metadata queries (DESCRIBE, SHOW TBLPROPERTIES, constraint/tag/mask lookups) when the compiled source SELECT returns zero rows. Motivating case: customers who run `dbt run` on a schedule against source tables that receive deltas sporadically. Today, each incremental model still pays ~4-7s per run on temp view creation + metadata queries + MERGE planning even when there is nothing to merge. With `skip_merge_on_empty_source: true`, the materialization runs a cheap `SELECT 1 FROM (...compiled SELECT...) LIMIT 1` probe and, if empty, returns early after firing pre/post hooks and a no-op `main` statement. Scope: - V1 (`use_materialization_v2: false`) and V2 paths both honor the flag - Default is `false` (opt-in, no behavior change for existing projects) - SQL language only (Python models fall through to the standard path) Files: - `dbt/adapters/databricks/impl.py`: new `skip_merge_on_empty_source` field on `DatabricksConfig` - `dbt/include/databricks/macros/materializations/incremental/incremental.sql`: two helper macros (`source_has_rows`, `should_skip_merge_on_empty_source`) and short-circuit calls in the V1/V2 merge branches - `tests/functional/adapter/incremental/test_incremental_skip_on_empty_source.py`: functional tests covering the short-circuit path and the default-off behavior under both V1 and V2 - `CHANGELOG.md`: Features entry Co-authored-by: Isaac --- CHANGELOG.md | 6 ++ dbt/adapters/databricks/impl.py | 1 + .../incremental/incremental.sql | 58 ++++++++++++- .../test_incremental_skip_on_empty_source.py | 84 +++++++++++++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 tests/functional/adapter/incremental/test_incremental_skip_on_empty_source.py diff --git a/CHANGELOG.md 
b/CHANGELOG.md index 4c5767bf9..e45b2ddd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## dbt-databricks next + +### Features + +- Add `skip_merge_on_empty_source` incremental config to bypass MERGE and associated metadata queries when the compiled source SELECT returns no rows, significantly reducing no-op incremental run time. + ## dbt-databricks 1.11.7 (Apr 17, 2026) ### Features diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 045f4b106..963e31510 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -192,6 +192,7 @@ class DatabricksConfig(AdapterConfig): use_safer_relation_operations: Optional[bool] = None incremental_apply_config_changes: Optional[bool] = None view_update_via_alter: Optional[bool] = None + skip_merge_on_empty_source: Optional[bool] = None def get_identifier_list_string(table_names: set[str]) -> str: diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ac717ae45..06ce03449 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -53,6 +53,16 @@ {% endif %} {%- else -%} {{ log("Existing relation found, proceeding with incremental work")}} + {#-- Short-circuit when `skip_merge_on_empty_source=true` and the source is empty. + This must come after the intermediate relation was created (so pre-hooks and + any side-effect SQL inside `compiled_code` still execute) but before we pay + for schema/config metadata queries, strategy planning, and the MERGE itself. + The `run_query` below issues `SELECT 1 FROM (...compiled SELECT...) LIMIT 1`; when + the source SELECT has no rows, we skip the remainder and return. 
--#} + {%- if should_skip_merge_on_empty_source(target_relation, existing_relation, compiled_code, grant_config, full_refresh_mode) -%} + {{ run_post_hooks() }} + {{ return({'relations': [target_relation]}) }} + {%- endif -%} {#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#} {%- if incremental_strategy == 'insert_overwrite' and partition_by -%} {{ set_overwrite_mode('DYNAMIC') }} @@ -128,6 +138,13 @@ {% do apply_tags(target_relation, tags) %} {% do persist_docs(target_relation, model, for_relation=language=='python') %} {%- else -%} + {#-- Short-circuit when `skip_merge_on_empty_source=true` and the source is empty. + Placed before `get_relation_config` / `create_temp_relation` so we skip all + downstream metadata queries and the MERGE itself when there are no deltas. --#} + {%- if should_skip_merge_on_empty_source(target_relation, existing_relation, compiled_code, grant_config, full_refresh_mode) -%} + {{ run_hooks(post_hooks) }} + {{ return({'relations': [target_relation]}) }} + {%- endif -%} {#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#} {%- if incremental_strategy == 'insert_overwrite' and partition_by -%} {{ set_overwrite_mode('DYNAMIC') }} @@ -242,4 +259,43 @@ {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {{ apply_config_changeset(target_relation, model, configuration_changes) }} {% endif %} -{% endmacro %} \ No newline at end of file +{% endmacro %} + +{#-- Returns true iff the compiled source SELECT produces at least one row. + Used by the `skip_merge_on_empty_source` incremental config to avoid + unnecessary MERGE / temp view / metadata queries when the delta is empty. 
--#} +{% macro source_has_rows(compiled_code) %} + {%- set check_sql -%} + select 1 from ({{ compiled_code }}) as __dbt_empty_source_check limit 1 + {%- endset -%} + {%- set result = run_query(check_sql) -%} + {{ return(result is not none and (result | length) > 0) }} +{% endmacro %} + +{#-- Short-circuit helper: if `skip_merge_on_empty_source` is true and the + compiled source SELECT is empty, perform the minimal work required by dbt + (a no-op `main` statement + grants; hooks are NOT run here, the call sites fire post-hooks) and return early. + + Returns true if the materialization should short-circuit (caller should + then `{{ return({'relations': [target_relation]}) }}`), false otherwise. Returns false without probing the source when the flag is unset, when not in execute mode, or when the model language is not 'sql'. --#} +{% macro should_skip_merge_on_empty_source(target_relation, existing_relation, compiled_code, grant_config, full_refresh_mode) %} + {%- set skip_flag = config.get('skip_merge_on_empty_source', False) | as_bool -%} + {%- if not skip_flag -%} + {{ return(false) }} + {%- endif -%} + {%- if not execute -%} + {{ return(false) }} + {%- endif -%} + {%- if model['language'] != 'sql' -%} + {{ return(false) }} + {%- endif -%} + {%- if source_has_rows(compiled_code) -%} + {{ return(false) }} + {%- endif -%} + {{ log("[skip_merge_on_empty_source] " ~ target_relation ~ ": empty source, skipping MERGE", info=True) }} + {%- call statement('main') -%} + select 1 as __dbt_skip_merge_noop where false + {%- endcall -%} + {% do apply_grants(target_relation, grant_config, should_revoke(existing_relation, full_refresh_mode)) %} + {{ return(true) }} +{% endmacro %} diff --git a/tests/functional/adapter/incremental/test_incremental_skip_on_empty_source.py b/tests/functional/adapter/incremental/test_incremental_skip_on_empty_source.py new file mode 100644 index 000000000..32ec75a19 --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_skip_on_empty_source.py @@ -0,0 +1,84 @@ +import pytest +from dbt.tests.util import check_relations_equal, run_dbt + +from tests.functional.adapter.fixtures import 
MaterializationV2Mixin + +_MODEL_SQL = """ +{{ config( + materialized='incremental', + unique_key='id', + skip_merge_on_empty_source=true, +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg +union all +select cast(2 as bigint) as id, 'goodbye' as msg + +{% else %} + +-- Delta filter: only rows with id greater than existing max (=> empty on 2nd run) +select cast(id as bigint) as id, msg from ( + select 1 as id, 'hello' as msg + union all + select 2 as id, 'goodbye' as msg +) src +where id > (select max(id) from {{ this }}) + +{% endif %} +""" + +_SEED_AFTER_FIRST_RUN = """id,msg +1,hello +2,goodbye +""" + + +class TestSkipMergeOnEmptySource: + @pytest.fixture(scope="class") + def models(self): + return {"skip_merge_model.sql": _MODEL_SQL} + + @pytest.fixture(scope="class") + def seeds(self): + return {"expected.csv": _SEED_AFTER_FIRST_RUN} + + def test_skip_merge_when_source_empty(self, project): + # Load the `expected` seed, then do the 1st run, which builds the target with 2 rows + results = run_dbt(["seed"]) + assert len(results) == 1 + results = run_dbt(["run"]) + assert len(results) == 1 + + # 2nd run: incremental with empty delta -> short-circuit should trigger + results = run_dbt(["run"]) + assert len(results) == 1 + # Data must be unchanged (table same as after 1st run). NOTE(review): equality alone does not prove the MERGE was skipped; consider asserting on the "[skip_merge_on_empty_source]" log line + check_relations_equal(project.adapter, ["skip_merge_model", "expected"]) + + +class TestSkipMergeOnEmptySourceV2(MaterializationV2Mixin, TestSkipMergeOnEmptySource): + """Same behavior under V2 materialization path.""" + + +class TestSkipMergeDefaultDisabled: + """When `skip_merge_on_empty_source` is not set, behavior is unchanged + (MERGE runs as before, even if source is empty).""" + + @pytest.fixture(scope="class") + def models(self): + # Same model, with the `skip_merge_on_empty_source=true,` config line stripped out + return {"default_model.sql": _MODEL_SQL.replace("skip_merge_on_empty_source=true,", "")} + + @pytest.fixture(scope="class") + def seeds(self): + return {"expected.csv": _SEED_AFTER_FIRST_RUN} + + def 
test_default_no_skip(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + # 2nd run without the flag still succeeds (MERGE with empty source) + results = run_dbt(["run"]) + assert len(results) == 1 + check_relations_equal(project.adapter, ["default_model", "expected"])