Skip to content

Commit 7de8247

Browse files
izeigermanthemisvaltinos
authored andcommitted
Use native Macro Registry APIs (#5452)
1 parent 4c367cc commit 7de8247

File tree

7 files changed

+32
-43
lines changed

7 files changed

+32
-43
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
SnapshotTableCleanupTask,
6868
)
6969
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker
70-
from sqlmesh.utils import random_id, CorrelationId
70+
from sqlmesh.utils import random_id, CorrelationId, AttributeDict
7171
from sqlmesh.utils.concurrency import (
7272
concurrent_apply_to_snapshots,
7373
concurrent_apply_to_values,
@@ -2731,12 +2731,12 @@ def _execute_materialization(
27312731
**kwargs: t.Any,
27322732
) -> None:
27332733
jinja_macros = model.jinja_macros
2734-
existing_globals = jinja_macros.global_objs.copy()
27352734

27362735
# For vdes we need to use the table, since we don't know the schema/table at parse time
27372736
parts = exp.to_table(table_name, dialect=self.adapter.dialect)
27382737

2739-
relation_info = existing_globals.pop("this")
2738+
existing_globals = jinja_macros.global_objs
2739+
relation_info = existing_globals.get("this")
27402740
if isinstance(relation_info, dict):
27412741
relation_info["database"] = parts.catalog
27422742
relation_info["identifier"] = parts.name
@@ -2750,29 +2750,29 @@ def _execute_materialization(
27502750
"identifier": parts.name,
27512751
"target": existing_globals.get("target", {"type": self.adapter.dialect}),
27522752
"execution_dt": kwargs.get("execution_time"),
2753+
"engine_adapter": self.adapter,
2754+
"sql": str(query_or_df),
2755+
"is_first_insert": is_first_insert,
2756+
"create_only": create_only,
2757+
# FIXME: Add support for transaction=False
2758+
"pre_hooks": [
2759+
AttributeDict({"sql": s.this.this, "transaction": True})
2760+
for s in model.pre_statements
2761+
],
2762+
"post_hooks": [
2763+
AttributeDict({"sql": s.this.this, "transaction": True})
2764+
for s in model.post_statements
2765+
],
2766+
"model_instance": model,
2767+
**kwargs,
27532768
}
27542769

2755-
context = jinja_macros._create_builtin_globals(
2756-
{"engine_adapter": self.adapter, **jinja_globals}
2757-
)
2758-
2759-
context.update(
2760-
{
2761-
"sql": str(query_or_df),
2762-
"is_first_insert": is_first_insert,
2763-
"create_only": create_only,
2764-
"pre_hooks": model.render_pre_statements(**render_kwargs),
2765-
"post_hooks": model.render_post_statements(**render_kwargs),
2766-
**kwargs,
2767-
}
2768-
)
2769-
27702770
try:
2771-
jinja_env = jinja_macros.build_environment(**context)
2771+
jinja_env = jinja_macros.build_environment(**jinja_globals)
27722772
template = jinja_env.from_string(self.materialization_template)
27732773

27742774
try:
2775-
template.render(**context)
2775+
template.render()
27762776
except MacroReturnVal as ret:
27772777
# this is a successful return from a macro call (dbt uses this list of Relations to update their relation cache)
27782778
returned_relations = ret.value.get("relations", [])

sqlmesh/dbt/adapter.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,6 @@ def execute(
9999
) -> t.Tuple[AdapterResponse, agate.Table]:
100100
"""Executes the given SQL statement and returns the results as an agate table."""
101101

102-
@abc.abstractmethod
103-
def run_hooks(
104-
self, hooks: t.List[str | exp.Expression], inside_transaction: bool = True
105-
) -> None:
106-
"""Executes the given hooks."""
107-
108102
@abc.abstractmethod
109103
def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
110104
"""Resolves the relation's schema to its physical schema."""
@@ -247,12 +241,6 @@ def execute(
247241
self._raise_parsetime_adapter_call_error("execute SQL")
248242
raise
249243

250-
def run_hooks(
251-
self, hooks: t.List[str | exp.Expression], inside_transaction: bool = True
252-
) -> None:
253-
self._raise_parsetime_adapter_call_error("run hooks")
254-
raise
255-
256244
def resolve_schema(self, relation: BaseRelation) -> t.Optional[str]:
257245
return relation.schema
258246

@@ -463,12 +451,6 @@ def resolve_identifier(self, relation: BaseRelation) -> t.Optional[str]:
463451
identifier = self._map_table_name(self._normalize(self._relation_to_table(relation))).name
464452
return identifier if identifier else None
465453

466-
def run_hooks(
467-
self, hooks: t.List[str | exp.Expression], inside_transaction: bool = True
468-
) -> None:
469-
# inside_transaction not yet supported similarly to transaction
470-
self.engine_adapter.execute([exp.maybe_parse(hook) for hook in hooks])
471-
472454
def _map_table_name(self, table: exp.Table) -> exp.Table:
473455
# Use the default dialect since this is the dialect used to normalize and quote keys in the
474456
# mapping table.

sqlmesh/dbt/builtin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,9 @@ def create_builtin_globals(
544544
"load_result": sql_execution.load_result,
545545
"run_query": sql_execution.run_query,
546546
"statement": sql_execution.statement,
547-
"run_hooks": adapter.run_hooks,
548547
"graph": adapter.graph,
549548
"selected_resources": list(jinja_globals.get("selected_models") or []),
549+
"write": lambda input: None, # We don't support writing yet
550550
}
551551
)
552552

sqlmesh/dbt/manifest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ def _load_models_and_seeds(self) -> None:
395395
dependencies = dependencies.union(
396396
self._extra_dependencies(sql, node.package_name, track_all_model_attrs=True)
397397
)
398+
for hook in [*node_config.get("pre-hook", []), *node_config.get("post-hook", [])]:
399+
dependencies = dependencies.union(
400+
self._extra_dependencies(
401+
hook["sql"], node.package_name, track_all_model_attrs=True
402+
)
403+
)
398404
dependencies = dependencies.union(
399405
self._flatten_dependencies_from_macros(dependencies.macros, node.package_name)
400406
)

sqlmesh/utils/jinja.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ def build_environment(self, **kwargs: t.Any) -> Environment:
369369
context.update(builtin_globals)
370370
context.update(root_macros)
371371
context.update(package_macros)
372+
context["render"] = lambda input: env.from_string(input).render()
372373

373374
env.globals.update(context)
374375
env.filters.update(self._environment.filters)

tests/dbt/test_custom_materializations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_custom_materialization_manifest_loading():
3737
assert custom_incremental.name == "custom_incremental"
3838
assert custom_incremental.adapter == "default"
3939
assert "make_temp_relation(new_relation)" in custom_incremental.definition
40-
assert "run_hooks(pre_hooks, inside_transaction=False)" in custom_incremental.definition
40+
assert "run_hooks(pre_hooks)" in custom_incremental.definition
4141
assert " {{ return({'relations': [new_relation]}) }}" in custom_incremental.definition
4242

4343

tests/fixtures/dbt/sushi_test/macros/materializations/custom_incremental.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
{%- set time_column = config.get('time_column') -%}
2020
{%- set interval_config = config.get('interval') -%}
2121

22-
{{ run_hooks(pre_hooks, inside_transaction=False) }}
22+
{{ run_hooks(pre_hooks) }}
2323

2424
{%- if existing_relation is none -%}
2525
{# The first insert creates new table if it doesn't exist #}
@@ -55,7 +55,7 @@
5555
{%- endcall -%}
5656
{%- endif -%}
5757

58-
{{ run_hooks(post_hooks, inside_transaction=False) }}
58+
{{ run_hooks(post_hooks) }}
5959

6060
{{ return({'relations': [new_relation]}) }}
61-
{%- endmaterialization -%}
61+
{%- endmaterialization -%}

0 commit comments

Comments
 (0)