From 2665c2c15df455040d8f4170a7fa57d9b504bd1f Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 15 Apr 2026 11:34:48 +0900 Subject: [PATCH 1/3] perf: allow V1 incremental path to skip config change metadata queries (#1402) Replace inline config change detection/application in the V1 incremental path with the existing `process_config_changes()` macro already used by V2. This allows users to set `incremental_apply_config_changes: false` to skip 8 unnecessary information_schema queries per incremental model execution. Co-authored-by: Isaac --- CHANGELOG.md | 1 + .../incremental/incremental.sql | 28 +------------------ 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e65bcec1..df90ede78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Under the Hood - **BREAKING:** `databricks_tags` defined at different hierarchy levels (e.g. project-level and model-level) now merge additively instead of the child config completely replacing the parent. +- Allow V1 incremental path to skip unnecessary metadata queries by respecting `incremental_apply_config_changes` config flag, matching V2 behavior ([#1402](https://github.com/databricks/dbt-databricks/issues/1402)) ## dbt-databricks 1.11.8 (TBD) diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index c8f09220f..65930984d 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -133,9 +133,6 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- set _existing_config = adapter.get_relation_config(existing_relation) -%} - {%- set model_config = adapter.get_config_from_model(config.model) -%} - {%- set _configuration_changes = model_config.get_changeset(_existing_config) -%} {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} {%- endcall -%} @@ -173,30 +170,7 @@ Also, why does not either drop_relation or adapter.drop_relation work here?! 
--#} {%- endif -%} - {% if _configuration_changes is not none %} - {% set tags = _configuration_changes.changes.get("tags", None) %} - {% set tblproperties = _configuration_changes.changes.get("tblproperties", None) %} - {% set liquid_clustering = _configuration_changes.changes.get("liquid_clustering") %} - {% set row_filter = _configuration_changes.changes.get("row_filter") %} - {% set constraints = _configuration_changes.changes.get("constraints") %} - {% if tags is not none %} - {% do apply_tags(target_relation, tags.set_tags) %} - {%- endif -%} - {% if tblproperties is not none %} - {% do apply_tblproperties(target_relation, tblproperties.tblproperties) %} - {%- endif -%} - {% if liquid_clustering is not none %} - {% do apply_liquid_clustered_cols(target_relation, liquid_clustering) %} - {% endif %} - {% if row_filter is not none %} - {{ apply_row_filter(target_relation, row_filter) }} - {% endif %} - {#- Incremental constraint application requires information_schema access (see fetch_*_constraints macros) -#} - {% set contract_config = config.get('contract') %} - {% if constraints and contract_config and contract_config.enforced and not target_relation.is_hive_metastore() %} - {{ apply_constraints(target_relation, constraints) }} - {% endif %} - {%- endif -%} + {{ process_config_changes(target_relation) }} {% do persist_docs(target_relation, model, for_relation=True) %} {%- endif -%} From 13736dd8c20c6b4edec3d4b30f1951ad1a9f1025 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Thu, 23 Apr 2026 09:31:00 +0900 Subject: [PATCH 2/3] address review: remove redundant persist_docs, add V1 functional tests - Remove persist_docs call from V1 incremental merge path since process_config_changes -> apply_config_changeset already handles comment and column_comments changes (aligns with V2 behavior) - Add V1 functional tests for column_tags and column_masks changes - Add functional test verifying incremental_apply_config_changes=false skips all metadata fetch queries (fetch_tags, fetch_column_tags, fetch_non_null_constraint_columns, fetch_primary_key_constraints, fetch_foreign_key_constraints, fetch_column_masks) Co-authored-by: Isaac --- .../incremental/incremental.sql | 1 - .../adapter/incremental/fixtures.py | 90 ++++++++++++ .../test_v1_incremental_config_changes.py | 129 ++++++++++++++++++ 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 tests/functional/adapter/incremental/test_v1_incremental_config_changes.py diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index 65930984d..77c0f1be4 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -171,7 +171,6 @@ --#} {%- endif -%} {{ process_config_changes(target_relation) }} - {% do persist_docs(target_relation, model, for_relation=True) %} {%- endif -%} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index d50fa71f3..add74e99b 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -1118,6 +1118,96 @@ def model(dbt, spark): return spark.createDataFrame(data, schema=['id', 'msg', 'color']) """ +v1_config_changes_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', + 
merge_update_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color + +{% else %} + +select cast(1 as bigint) as id, 'updated' as msg, 'blue' as color + +{% endif %} +""" + +v1_skip_config_changes_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_apply_config_changes = false, +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg + +{% else %} + +select cast(1 as bigint) as id, 'updated' as msg + +{% endif %} +""" + +v1_column_tags_a = """ +version: 2 + +models: + - name: v1_config_changes_sql + columns: + - name: id + databricks_tags: + pii: "false" + - name: msg + - name: color +""" + +v1_column_tags_b = """ +version: 2 + +models: + - name: v1_config_changes_sql + columns: + - name: id + databricks_tags: + pii: "true" + - name: msg + databricks_tags: + source: "app" + - name: color +""" + +fail_if_metadata_fetched_macros = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called when incremental_apply_config_changes is false") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} + +{% macro fetch_non_null_constraint_columns(relation) %} + {{ exceptions.raise_compiler_error("fetch_non_null_constraint_columns should not be called") }} +{% endmacro %} + +{% macro fetch_primary_key_constraints(relation) %} + {{ exceptions.raise_compiler_error("fetch_primary_key_constraints should not be called") }} +{% endmacro %} + +{% macro fetch_foreign_key_constraints(relation) %} + {{ exceptions.raise_compiler_error("fetch_foreign_key_constraints should not be called") }} +{% endmacro %} + +{% macro fetch_column_masks(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_masks should not be called") }} +{% endmacro %} +""" + warn_unenforced_override_sql = """ select "abc" as id """ diff --git a/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py new file mode 100644 index 000000000..eaa13e719 --- /dev/null +++ b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py @@ -0,0 +1,129 @@ +import pytest +from dbt.tests import util + +from tests.functional.adapter.incremental import fixtures + + +@pytest.mark.skip_profile("databricks_cluster") +class TestV1IncrementalColumnTags: + """Test that V1 incremental path applies column tag changes via process_config_changes.""" + + @pytest.fixture(scope="class") + def models(self): + return { + "v1_config_changes_sql.sql": fixtures.v1_config_changes_sql, + "schema.yml": fixtures.v1_column_tags_a, + } + + def test_changing_column_tags(self, project): + # First run creates the table + util.run_dbt(["run"]) + + # Update column tags + util.write_file(fixtures.v1_column_tags_b, "models", "schema.yml") + util.run_dbt(["run"]) + + # Verify column tags were applied + results = project.run_sql( + f""" + select column_name, tag_name, tag_value + from `system`.`information_schema`.`column_tags` + where schema_name = '{project.test_schema}' + and table_name = 'v1_config_changes_sql' + order by column_name, tag_name + """, + fetch="all", + ) + + tags_dict = {} + for row in results: + col = row.column_name + if col not in tags_dict: + tags_dict[col] = {} + tags_dict[col][row.tag_name] = row.tag_value + + # Verify expected final state + expected_tags = { + 
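            # "color" is untagged in both schema versions; untagged columns do not
            # appear in information_schema.column_tags, so it is omitted here.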
"id": {"pii": "true"}, + "msg": {"source": "app"}, + } + assert tags_dict == expected_tags + + +@pytest.mark.skip_profile("databricks_cluster") +class TestV1IncrementalColumnMasks: + """Test that V1 incremental path applies column mask changes via process_config_changes.""" + + @pytest.fixture(scope="class") + def models(self): + return { + "column_mask_sql.sql": fixtures.column_mask_sql, + "schema.yml": fixtures.column_mask_base, + } + + def test_changing_column_masks(self, project): + # Create mask functions + project.run_sql( + f""" + CREATE OR REPLACE FUNCTION + {project.database}.{project.test_schema}.full_mask(password STRING) + RETURNS STRING + RETURN '*****'; + """ + ) + project.run_sql( + f""" + CREATE OR REPLACE FUNCTION + {project.database}.{project.test_schema}.email_mask(value STRING) + RETURNS STRING + RETURN CONCAT( + REPEAT('*', POSITION('@' IN value) - 1), + SUBSTR(value, POSITION('@' IN value)) + ); + """ + ) + + # First run with initial masks + util.run_dbt(["run"]) + masks = project.run_sql( + "SELECT id, name, email, password FROM column_mask_sql", + fetch="all", + ) + assert len(masks) == 1 + assert masks[0][1] == "*****" # name (masked) + assert masks[0][3] == "password123" # password (unmasked) + + # Update masks and verify changes + util.write_file(fixtures.column_mask_valid_mask_updates, "models", "schema.yml") + util.run_dbt(["run"]) + + result = project.run_sql( + "SELECT id, name, email, password FROM column_mask_sql", fetch="all" + ) + assert len(result) == 1 + assert result[0][1] == "hello" # name (unmasked) + assert result[0][3] == "*****" # password (masked) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestV1IncrementalSkipConfigChanges: + """Test that incremental_apply_config_changes=false skips metadata fetch queries.""" + + @pytest.fixture(scope="class") + def models(self): + return { + "v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros, + } + + def test_incremental_run_skips_metadata_queries(self, project): + # First run creates the table + util.run_dbt(["run"]) + # Second run exercises the incremental merge path. + # If metadata fetch macros are called, they will raise errors and the run will fail. 
+ util.run_dbt(["run"]) From cd551aeb13b71f4932f56bc8792b8a089a664cb9 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Thu, 30 Apr 2026 09:15:26 +0900 Subject: [PATCH 3/3] address review: guard column_masks to V2 only, add security test - Add V2 guard to apply_column_masks in apply_config_changeset to prevent unmasked data exposure in V1 (CTAS writes data before masks can be applied, whereas V2 creates an empty table first) - Add TestV1IncrementalColumnMasksNotApplied: overrides apply_column_masks to raise error, verifying it is never called in V1 incremental path - Remove V1 column_tags/column_masks tests that are no longer applicable - Keep TestV1IncrementalSkipConfigChanges for metadata skip validation Co-authored-by: Isaac --- .../macros/relations/table/alter.sql | 4 +- .../adapter/incremental/fixtures.py | 73 ++++------ .../test_v1_incremental_config_changes.py | 125 +++++------------- 3 files changed, 62 insertions(+), 140 deletions(-) diff --git a/dbt/include/databricks/macros/relations/table/alter.sql b/dbt/include/databricks/macros/relations/table/alter.sql index 095de9024..308e92b53 100644 --- a/dbt/include/databricks/macros/relations/table/alter.sql +++ b/dbt/include/databricks/macros/relations/table/alter.sql @@ -31,7 +31,9 @@ {% if constraints %} {{ apply_constraints(target_relation, constraints) }} {% endif %} - {% if column_masks %} + {#-- Column masks are only applied in V2 to avoid a window where data is unmasked (CTAS in V1 + writes data before masks can be applied, whereas V2 creates an empty table first) --#} + {% if column_masks and adapter.behavior.use_materialization_v2 %} {{ apply_column_masks(target_relation, column_masks) }} {% endif %} {% if row_filter %} diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index add74e99b..8f70c4a61 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -1118,24 +1118,6 @@ def model(dbt, spark): return spark.createDataFrame(data, schema=['id', 'msg', 'color']) """ -v1_config_changes_sql = """ -{{ config( - materialized = 'incremental', - unique_key = 'id', - merge_update_columns = ['msg'], -) }} - -{% if not is_incremental() %} - -select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color - -{% else %} - -select cast(1 as bigint) as id, 'updated' as msg, 'blue' as color - -{% endif %} -""" - v1_skip_config_changes_sql = """ {{ config( materialized = 'incremental', @@ -1154,34 +1136,6 @@ def model(dbt, spark): {% endif %} """ -v1_column_tags_a = """ -version: 2 - -models: - - name: v1_config_changes_sql - columns: - - name: id - databricks_tags: - pii: "false" - - name: msg - - name: color -""" - -v1_column_tags_b = """ -version: 2 - -models: - - name: v1_config_changes_sql - columns: - - name: id - databricks_tags: - pii: "true" - - name: msg - databricks_tags: - source: "app" - - name: color -""" - fail_if_metadata_fetched_macros = """ {% macro fetch_tags(relation) %} {{ exceptions.raise_compiler_error("fetch_tags should not be called when incremental_apply_config_changes is false") }} @@ -1208,6 +1162,33 @@ def model(dbt, spark): {% endmacro %} """ +fail_if_column_masks_applied_macro = """ +{% macro apply_column_masks(relation, column_masks) %} + {{ exceptions.raise_compiler_error("apply_column_masks should not be called in V1 — column masks must only be applied in V2 to prevent unmasked data exposure") }} +{% endmacro %} +""" + +v1_column_mask_model_sql = """ +{{ config( + materialized = 
'incremental', + unique_key = 'id', +) }} + +select cast(1 as bigint) as id, 'hello' as name +""" + +v1_column_mask_schema = """ +version: 2 + +models: + - name: v1_column_mask_model_sql + columns: + - name: id + - name: name + column_mask: + function: full_mask +""" + warn_unenforced_override_sql = """ select "abc" as id """ diff --git a/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py index eaa13e719..99dbb8c00 100644 --- a/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py +++ b/tests/functional/adapter/incremental/test_v1_incremental_config_changes.py @@ -5,125 +5,64 @@ @pytest.mark.skip_profile("databricks_cluster") -class TestV1IncrementalColumnTags: - """Test that V1 incremental path applies column tag changes via process_config_changes.""" +class TestV1IncrementalSkipConfigChanges: + """Test that incremental_apply_config_changes=false skips metadata fetch queries in V1 path.""" @pytest.fixture(scope="class") def models(self): return { - "v1_config_changes_sql.sql": fixtures.v1_config_changes_sql, - "schema.yml": fixtures.v1_column_tags_a, - } - - def test_changing_column_tags(self, project): - # First run creates the table - util.run_dbt(["run"]) - - # Update column tags - util.write_file(fixtures.v1_column_tags_b, "models", "schema.yml") - util.run_dbt(["run"]) - - # Verify column tags were applied - results = project.run_sql( - f""" - select column_name, tag_name, tag_value - from `system`.`information_schema`.`column_tags` - where schema_name = '{project.test_schema}' - and table_name = 'v1_config_changes_sql' - order by column_name, tag_name - """, - fetch="all", - ) - - tags_dict = {} - for row in results: - col = row.column_name - if col not in tags_dict: - tags_dict[col] = {} - tags_dict[col][row.tag_name] = row.tag_value - - # Verify expected final state - expected_tags = { - "id": {"pii": "true"}, - "msg": {"source": "app"}, + "v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql, } - assert tags_dict == expected_tags - - -@pytest.mark.skip_profile("databricks_cluster") -class TestV1IncrementalColumnMasks: - """Test that V1 incremental path applies column mask changes via process_config_changes.""" @pytest.fixture(scope="class") - def models(self): + def macros(self): return { - "column_mask_sql.sql": fixtures.column_mask_sql, - "schema.yml": fixtures.column_mask_base, + "fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros, } - def test_changing_column_masks(self, project): - # Create mask functions - project.run_sql( - f""" - CREATE OR REPLACE FUNCTION - {project.database}.{project.test_schema}.full_mask(password STRING) - RETURNS STRING - RETURN '*****'; - """ - ) - project.run_sql( - f""" - CREATE OR REPLACE FUNCTION - {project.database}.{project.test_schema}.email_mask(value STRING) - RETURNS STRING - RETURN CONCAT( - REPEAT('*', POSITION('@' IN value) - 1), - SUBSTR(value, POSITION('@' IN value)) - ); - """ - ) - - # First run with initial masks + def test_incremental_run_skips_metadata_queries(self, project): + # First run creates the table util.run_dbt(["run"]) - masks = project.run_sql( - "SELECT id, name, email, password FROM column_mask_sql", - fetch="all", - ) - assert len(masks) == 1 - assert masks[0][1] == "*****" # name (masked) - assert masks[0][3] == "password123" # password (unmasked) - - # Update masks and verify changes - util.write_file(fixtures.column_mask_valid_mask_updates, "models", 
"schema.yml") + # Second run exercises the incremental merge path. + # If metadata fetch macros are called, they will raise errors and the run will fail. util.run_dbt(["run"]) - result = project.run_sql( - "SELECT id, name, email, password FROM column_mask_sql", fetch="all" - ) - assert len(result) == 1 - assert result[0][1] == "hello" # name (unmasked) - assert result[0][3] == "*****" # password (masked) - @pytest.mark.skip_profile("databricks_cluster") -class TestV1IncrementalSkipConfigChanges: - """Test that incremental_apply_config_changes=false skips metadata fetch queries.""" +class TestV1IncrementalColumnMasksNotApplied: + """Test that column masks are NOT applied in V1 incremental path. + + Column masks must only be applied in V2 where the empty table is created before data + arrives. In V1 (CTAS), data is written immediately, so applying masks after the fact + would leave a window where data is unmasked — a security/privacy vulnerability. + """ @pytest.fixture(scope="class") def models(self): return { - "v1_skip_config_changes_sql.sql": fixtures.v1_skip_config_changes_sql, + "v1_column_mask_model_sql.sql": fixtures.v1_column_mask_model_sql, + "schema.yml": fixtures.v1_column_mask_schema, } @pytest.fixture(scope="class") def macros(self): return { - "fail_if_metadata_fetched.sql": fixtures.fail_if_metadata_fetched_macros, + "fail_if_column_masks_applied.sql": fixtures.fail_if_column_masks_applied_macro, } - def test_incremental_run_skips_metadata_queries(self, project): + def test_column_masks_not_applied_in_v1(self, project): + # Create the mask function so the model config is valid + project.run_sql( + f""" + CREATE OR REPLACE FUNCTION + {project.database}.{project.test_schema}.full_mask(val STRING) + RETURNS STRING + RETURN '*****'; + """ + ) + # First run creates the table util.run_dbt(["run"]) - # Second run exercises the incremental merge path. - # If metadata fetch macros are called, they will raise errors and the run will fail. + # Second run exercises the incremental merge path with config change detection. + # If apply_column_masks is called, the overridden macro raises an error. util.run_dbt(["run"])