Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 80 additions & 19 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
StreamingTableConfig,
)
from dbt.adapters.databricks.relation_configs.table_format import TableFormat
from dbt.adapters.databricks.relation_configs.tags import (
TagsProcessor,
)
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig
from dbt.adapters.databricks.relation_configs.view import ViewConfig
from dbt.adapters.databricks.utils import (
Expand Down Expand Up @@ -964,15 +967,19 @@ def parse_columns_and_constraints(
return enriched_columns, parsed_constraints

@available.parse(lambda *a, **k: {})
def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase:
def get_relation_config(
Comment thread
sd-db marked this conversation as resolved.
self,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> DatabricksRelationConfigBase:
if relation.type == DatabricksRelationType.MaterializedView:
return MaterializedViewAPI.get_from_relation(self, relation)
return MaterializedViewAPI.get_from_relation(self, relation, model_config)
elif relation.type == DatabricksRelationType.StreamingTable:
return StreamingTableAPI.get_from_relation(self, relation)
return StreamingTableAPI.get_from_relation(self, relation, model_config)
elif relation.type == DatabricksRelationType.Table:
return IncrementalTableAPI.get_from_relation(self, relation)
return IncrementalTableAPI.get_from_relation(self, relation, model_config)
elif relation.type == DatabricksRelationType.View:
return ViewAPI.get_from_relation(self, relation)
return ViewAPI.get_from_relation(self, relation, model_config)
else:
raise NotImplementedError(f"Relation type {relation.type} is not supported.")

Expand Down Expand Up @@ -1053,12 +1060,15 @@ def config_type(cls) -> type[DatabricksRelationConfig]:

@classmethod
def get_from_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> DatabricksRelationConfig:
"""Get the relation config from the relation."""

assert relation.type == cls.relation_type
results = cls._describe_relation(adapter, relation)
results = cls._describe_relation(adapter, relation, model_config)
return cls.config_type().from_results(results)

@classmethod
Expand All @@ -1070,7 +1080,10 @@ def get_from_relation_config(cls, relation_config: RelationConfig) -> Databricks
@classmethod
@abstractmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> RelationResults:
"""Describe the relation and return the results."""

Expand All @@ -1080,11 +1093,14 @@ def _describe_relation(
class DeltaLiveTableAPIBase(RelationAPIBase[DatabricksRelationConfig]):
@classmethod
def get_from_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> DatabricksRelationConfig:
"""Get the relation config from the relation."""

relation_config = super().get_from_relation(adapter, relation)
Comment thread
sd-db marked this conversation as resolved.
relation_config = super().get_from_relation(adapter, relation, model_config)

# Ensure any current refreshes are completed before returning the relation config
tblproperties = cast(TblPropertiesConfig, relation_config.config["tblproperties"])
Expand All @@ -1104,7 +1120,10 @@ def config_type(cls) -> type[MaterializedViewConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> RelationResults:
kwargs = {"table_name": relation}
results: RelationResults = dict()
Expand All @@ -1116,8 +1135,15 @@ def _describe_relation(
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

# To be backward compatible model_config can be None. In that case, tags should be fetched
# to maintain backward compatibility.
table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to model_table_tag_config we should differentiate between desired(model) configs and existing (relation) configs better

if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff():
Comment thread
sd-db marked this conversation as resolved.
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
else:
results["information_schema.tags"] = None
return results


Expand All @@ -1130,7 +1156,10 @@ def config_type(cls) -> type[StreamingTableConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> RelationResults:
kwargs = {"table_name": relation}
results: RelationResults = dict()
Expand All @@ -1153,16 +1182,37 @@ def config_type(cls) -> type[IncrementalTableConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> RelationResults:
results = {}
kwargs = {"relation": relation}

if not relation.is_hive_metastore():
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
# To be backward compatible model_config can be None. In that case, tags should be fetched
# to maintain backward compatibility.
table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None
Comment thread
sd-db marked this conversation as resolved.
if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff():
Comment thread
sd-db marked this conversation as resolved.
results["information_schema.tags"] = adapter.execute_macro(
"fetch_tags", kwargs=kwargs
)
else:
results["information_schema.tags"] = None

# To be backward compatible model_config can be None. In that case, tags should be fetched
# to maintain backward compatibility.
column_tag_config = (
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comments on naming + checks on desried_column_tag_config

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suffix _config and surrounding code makes it obvious that its coming from the user configuration.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable naming should still be intentional and descriptive, we should update it to model_column_tag_config

model_config.config.get(ColumnTagsProcessor.name) if model_config else None
)
if column_tag_config is None or column_tag_config.requires_server_metadata_for_diff():
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
)
else:
results["information_schema.column_tags"] = None

results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
Expand Down Expand Up @@ -1191,15 +1241,26 @@ def config_type(cls) -> type[ViewConfig]:

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
cls,
Comment thread
sd-db marked this conversation as resolved.
adapter: DatabricksAdapter,
relation: DatabricksRelation,
model_config: Optional[DatabricksRelationConfigBase] = None,
) -> RelationResults:
results = {}
kwargs = {"relation": relation}

results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)

# To be backward compatible model_config can be None. In that case, tags should be fetched
# to maintain backward compatibility.
table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None
if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff():
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
else:
results["information_schema.tags"] = None

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

kwargs = {"table_name": relation}
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ def get_diff(self, other: Self) -> Optional[Self]:
return self
return None

def requires_server_metadata_for_diff(self) -> bool:
"""
Indicates whether server metadata is required to compute the diff for this component.
"""
return True


class DatabricksRelationChangeSet(BaseModel):
"""Class for encapsulating the changes that need to be applied to a Databricks relation."""
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/relation_configs/column_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def get_diff(self, other: "ColumnTagsConfig") -> Optional["ColumnTagsConfig"]:
return ColumnTagsConfig(set_column_tags=set_column_tags)
return None

def requires_server_metadata_for_diff(self) -> bool:
"""
Indicates whether server metadata is required to compute the diff for this component.
"""
return self.set_column_tags is not None and len(self.set_column_tags) > 0


class ColumnTagsProcessor(DatabricksComponentProcessor[ColumnTagsConfig]):
name: ClassVar[str] = "column_tags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class StreamingTableConfig(DatabricksRelationConfigBase):
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
TagsProcessor,
DescribeQueryProcessor,
TagsProcessor,
]
Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/databricks/relation_configs/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ def get_diff(self, other: "TagsConfig") -> Optional["TagsConfig"]:
return TagsConfig(set_tags=self.set_tags)
return None

def requires_server_metadata_for_diff(self) -> bool:
"""
Indicates whether server metadata is required to compute the diff for this component.
"""
return self.set_tags is not None and len(self.set_tags) > 0


class TagsProcessor(DatabricksComponentProcessor[TagsConfig]):
name: ClassVar[str] = "tags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@
{{ set_overwrite_mode('DYNAMIC') }}
{%- endif -%}
{#-- Relation must be merged --#}
{%- set _existing_config = adapter.get_relation_config(existing_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set _existing_config = adapter.get_relation_config(existing_relation, model_config) -%}
{%- set _configuration_changes = model_config.get_changeset(_existing_config) -%}
{%- call statement('create_temp_relation', language=language) -%}
{{ create_table_as(True, temp_relation, compiled_code, language) }}
Expand Down Expand Up @@ -237,8 +237,8 @@
{% macro process_config_changes(target_relation) %}
{% set apply_config_changes = config.get('incremental_apply_config_changes', True) | as_bool %}
{% if apply_config_changes %}
{%- set existing_config = adapter.get_relation_config(target_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set existing_config = adapter.get_relation_config(target_relation, model_config) -%}
{%- set configuration_changes = model_config.get_changeset(existing_config) -%}
{{ apply_config_changeset(target_relation, model, configuration_changes) }}
{% endif %}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/databricks/macros/relations/config.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{%- macro get_configuration_changes(existing_relation) -%}
{%- set existing_config = adapter.get_relation_config(existing_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
{%- set existing_config = adapter.get_relation_config(existing_relation, model_config) -%}
{%- set configuration_changes = model_config.get_changeset(existing_config) -%}
{% do return(configuration_changes) %}
{%- endmacro -%}
44 changes: 44 additions & 0 deletions tests/functional/adapter/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from dbt.adapters.base.relation import BaseRelation
from dbt.tests.util import get_manifest

FAIL_IF_TAG_FETCH_CALLED_MACROS = """
{% macro fetch_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_tags should not be called") }}
{% endmacro %}
"""

FAIL_IF_TAG_AND_COLUMN_TAG_FETCH_CALLED_MACROS = """
{% macro fetch_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_tags should not be called") }}
{% endmacro %}

{% macro fetch_column_tags(relation) %}
{{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }}
{% endmacro %}
"""


def get_model_config(project, relation: BaseRelation):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this ? Since model_config is optional in the tests we can just not pass it and that also tests for other behaviour overall. Look at the places where this gets called it is most a no-op. I think this is mostly an artifact of the config not being Optional earlier. Since now it is we can look to remove as well.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this is adding extra complexity while mostly being no-op

"""Return the parsed dbt model config for the given relation fixture."""
manifest = get_manifest(project.project_root)
model_nodes = [
node
for node in manifest.nodes.values()
if getattr(node, "resource_type", None) == "model"
and getattr(node, "alias", None) == relation.identifier
]
assert len(model_nodes) == 1, (
f"Expected exactly one model node for relation {relation.identifier}, "
f"found {len(model_nodes)}"
)
model_node = model_nodes[0]

# The partial-parse manifest only stores `raw_code`; `compiled_code` is set
# later during compile/run. `get_config_from_model` requires it for MV/ST,
# so backfill from `raw_code` (only used for equality checks here). If a
# caller needs the actually-compiled SQL, run `run_dbt(["compile"])` first
# and read `target/manifest.json` instead.
if model_node.compiled_code is None:
model_node.compiled_code = model_node.raw_code

return project.adapter.get_config_from_model(model_node)
41 changes: 41 additions & 0 deletions tests/functional/adapter/incremental/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,47 @@
- name: color
"""

metadata_fetch_incremental_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
) }}

select cast(1 as bigint) as id
"""

metadata_fetch_no_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
columns:
- name: id
"""

metadata_fetch_table_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
config:
databricks_tags:
classification: internal
columns:
- name: id
"""

metadata_fetch_column_tags_schema = """
version: 2

models:
- name: metadata_fetch_incremental
columns:
- name: id
databricks_tags:
classification: internal
"""

tblproperties_a = """
version: 2

Expand Down
Loading
Loading