From b466c9c5d2591eb1926783d522d718dd1dec355e Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Tue, 21 Apr 2026 17:26:05 +0900 Subject: [PATCH] perf: reuse columns from process_schema_changes in incremental materialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Incremental materialization previously discarded the return value of `process_schema_changes`, causing each strategy macro (`merge`, `append`, `delete+insert`) to issue a second `DESCRIBE TABLE EXTENDED` on the target relation even though `check_for_schema_changes` had just DESCRIBEd it. This change: - captures the columns returned by `process_schema_changes` in both V1 and V2 paths - falls back to a single `adapter.get_columns_in_relation(existing_relation)` when `on_schema_change == 'ignore'` - threads the result through `strategy_arg_dict['dest_columns']` - teaches `databricks__get_merge_sql`, `get_delete_insert_sql`, and `get_insert_into_sql` to honor a pre-supplied `dest_columns` and skip their own `DESCRIBE` when provided Net effect: one fewer `DESCRIBE TABLE EXTENDED … AS JSON` round-trip per incremental model, per run. Verified on a project with 9 incremental stg models (V1 path, `on_schema_change: 'fail'`): target DESCRIBE count drops from 2 to 1 per model across merge, append, and delete+insert strategies. Resolves #1411 Co-authored-by: Isaac --- CHANGELOG.md | 6 ++++ .../incremental/incremental.sql | 26 +++++++++++++---- .../incremental/strategies.sql | 28 +++++++++++++++---- 3 files changed, 48 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c5767bf9..bf2e33dc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## dbt-databricks next + +### Under the Hood + +- Reuse the columns returned by `process_schema_changes` in the incremental materialization so the downstream strategy macros (`merge`, `append`, `delete+insert`) no longer re-issue `DESCRIBE TABLE EXTENDED` on the target. 
Saves one metadata round-trip per incremental model. Mirrors the existing `dbt-snowflake` pattern. + ## dbt-databricks 1.11.7 (Apr 17, 2026) ### Features diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index ac717ae45..48297b0ed 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -58,9 +58,16 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- do process_schema_changes(on_schema_change, intermediate_relation, existing_relation) -%} + {#-- Reuse the columns returned by `process_schema_changes` so the downstream + merge strategy macro doesn't have to re-issue DESCRIBE on the target. + When `on_schema_change == 'ignore'`, the macro returns `{}` and we fall + back to a single DESCRIBE on the existing relation. --#} + {%- set dest_columns = process_schema_changes(on_schema_change, intermediate_relation, existing_relation) -%} + {%- if not dest_columns -%} + {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} + {%- endif -%} {{ process_config_changes(target_relation) }} - {% set build_sql = get_build_sql(incremental_strategy, target_relation, intermediate_relation) %} + {% set build_sql = get_build_sql(incremental_strategy, target_relation, intermediate_relation, dest_columns) %} {%- if language == 'sql' -%} {#-- Check if build_sql is a list (multi-statement strategy) or a string (single statement) --#} {%- if build_sql is sequence and build_sql is not string -%} @@ -139,13 +146,20 @@ {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} {%- endcall -%} - {%- do process_schema_changes(on_schema_change, temp_relation, existing_relation) -%} + {#-- Reuse the columns returned by 
`process_schema_changes` so the downstream + incremental strategy macro doesn't have to re-issue DESCRIBE on the target. + When `on_schema_change == 'ignore'`, the macro returns `{}` and we fall + back to a single DESCRIBE on the existing relation. --#} + {%- set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) -%} + {%- if not dest_columns -%} + {%- set dest_columns = adapter.get_columns_in_relation(existing_relation) -%} + {%- endif -%} {%- set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) -%} {%- set strategy_arg_dict = ({ 'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, - 'dest_columns': none, + 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates}) -%} {%- set build_sql = strategy_sql_macro_func(strategy_arg_dict) -%} {%- if language == 'sql' -%} @@ -221,7 +235,7 @@ {% endif %} {% endmacro %} -{% macro get_build_sql(incremental_strategy, target_relation, intermediate_relation) %} +{% macro get_build_sql(incremental_strategy, target_relation, intermediate_relation, dest_columns=none) %} {%- set unique_key = config.get('unique_key') -%} {%- set incremental_predicates = config.get('predicates') or config.get('incremental_predicates') -%} {%- set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) -%} @@ -229,7 +243,7 @@ 'target_relation': target_relation, 'temp_relation': intermediate_relation, 'unique_key': unique_key, - 'dest_columns': none, + 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates}) -%} {% do return(strategy_sql_macro_func(strategy_arg_dict)) %} {% endmacro %} diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index cd2805096..e98093133 100644 --- 
a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -8,7 +8,7 @@ {% endmacro %} {% macro databricks__get_incremental_append_sql(arg_dict) %} - {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %} + {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"], arg_dict.get("dest_columns"))) %} {% endmacro %} {% macro databricks__get_incremental_replace_where_sql(arg_dict) %} @@ -138,7 +138,13 @@ INSERT INTO {{ target_relation.render() }} {%- set source_relation = arg_dict.get('temp_relation') -%} {%- set target_relation = arg_dict.get('target_relation') -%} {%- set incremental_predicates = config.get('incremental_predicates') -%} - {%- set target_columns = (adapter.get_columns_in_relation(target_relation) | map(attribute='quoted') | list) -%} + {#-- Reuse dest_columns from the materialization (obtained via `process_schema_changes`) + when provided, otherwise fall back to a fresh DESCRIBE. 
--#} + {%- set dest_columns = arg_dict.get('dest_columns') -%} + {%- if dest_columns is none -%} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- endif -%} + {%- set target_columns = (dest_columns | map(attribute='quoted') | list) -%} {%- set unique_key = config.require('unique_key') -%} {% do return(delete_insert_sql_impl(source_relation, target_relation, target_columns, unique_key, incremental_predicates)) %} {% endmacro %} @@ -219,10 +225,15 @@ where {{ incremental_predicates }} {% endmacro %} -{% macro get_insert_into_sql(source_relation, target_relation) %} +{% macro get_insert_into_sql(source_relation, target_relation, dest_columns=none) %} {%- set source_columns = adapter.get_columns_in_relation(source_relation) | map(attribute="name") | list -%} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) | map(attribute="name") | list -%} - {{ insert_into_sql_impl(target_relation, dest_columns, source_relation, source_columns) }} + {#-- Reuse dest_columns from the materialization when provided; otherwise DESCRIBE. 
--#} + {%- if dest_columns is none -%} + {%- set dest_cols_list = adapter.get_columns_in_relation(target_relation) | map(attribute="name") | list -%} + {%- else -%} + {%- set dest_cols_list = dest_columns | map(attribute="name") | list -%} + {%- endif -%} + {{ insert_into_sql_impl(target_relation, dest_cols_list, source_relation, source_columns) }} {% endmacro %} {% macro insert_into_sql_impl(target_relation, dest_columns, source_relation, source_columns) %} @@ -273,7 +284,12 @@ where {{ incremental_predicates }} {%- set source_alias = config.get('source_alias', 'DBT_INTERNAL_SOURCE') -%} {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} - {%- set dest_columns = adapter.get_columns_in_relation(target) -%} + {#-- Prefer the `dest_columns` passed in by the materialization (obtained via + `process_schema_changes` or a single DESCRIBE on the existing relation). + Only issue a fresh DESCRIBE when no columns were provided. --#} + {%- if dest_columns is none -%} + {%- set dest_columns = adapter.get_columns_in_relation(target) -%} + {%- endif -%} {%- set source_columns = (adapter.get_columns_in_relation(source) | map(attribute='name') | list)-%} {%- set merge_update_columns = config.get('merge_update_columns') -%} {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}