diff --git a/.circleci/continue_config.yml b/.circleci/continue_config.yml index e21f3d869b..c549c0ae78 100644 --- a/.circleci/continue_config.yml +++ b/.circleci/continue_config.yml @@ -144,8 +144,11 @@ jobs: - halt_unless_core - checkout - run: - name: Run the migration test - command: ./.circleci/test_migration.sh + name: Run the migration test - sushi + command: ./.circleci/test_migration.sh sushi "--gateway duckdb_persistent" + - run: + name: Run the migration test - sushi_dbt + command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config" ui_style: docker: diff --git a/.circleci/test_migration.sh b/.circleci/test_migration.sh index a85d933bd3..9b8fe89e6e 100755 --- a/.circleci/test_migration.sh +++ b/.circleci/test_migration.sh @@ -1,11 +1,6 @@ #!/usr/bin/env bash set -ex -GATEWAY_NAME="duckdb_persistent" -TMP_DIR=$(mktemp -d) -SUSHI_DIR="$TMP_DIR/sushi" - - if [[ -z $(git tag --points-at HEAD) ]]; then # If the current commit is not tagged, we need to find the last tag LAST_TAG=$(git describe --tags --abbrev=0) @@ -14,28 +9,48 @@ else LAST_TAG=$(git tag --sort=-creatordate | head -n 2 | tail -n 1) fi +if [ "$1" == "" ]; then + echo "Usage: $0 <example_name> '<sqlmesh options>'" + echo "eg $0 sushi '--gateway duckdb_persistent'" + exit 1 +fi + + +TMP_DIR=$(mktemp -d) +EXAMPLE_NAME="$1" +SQLMESH_OPTS="$2" +EXAMPLE_DIR="./examples/$EXAMPLE_NAME" +TEST_DIR="$TMP_DIR/$EXAMPLE_NAME" + +echo "Running migration test for '$EXAMPLE_NAME' in '$TEST_DIR' for example project '$EXAMPLE_DIR' using options '$SQLMESH_OPTS'" + git checkout $LAST_TAG # Install dependencies from the previous release. 
make install-dev -cp -r ./examples/sushi $TMP_DIR +cp -r $EXAMPLE_DIR $TEST_DIR + +# this is only needed temporarily until the released tag for $LAST_TAG includes this config +if [ "$EXAMPLE_NAME" == "sushi_dbt" ]; then + echo 'migration_test_config = sqlmesh_config(Path(__file__).parent, dbt_target_name="duckdb")' >> $TEST_DIR/config.py +fi # Run initial plan -pushd $SUSHI_DIR +pushd $TEST_DIR rm -rf ./data/* -sqlmesh --gateway $GATEWAY_NAME plan --no-prompts --auto-apply +sqlmesh $SQLMESH_OPTS plan --no-prompts --auto-apply rm -rf .cache popd -# Switch back to the starting state of the repository +# Switch back to the starting state of the repository git checkout - # Install updated dependencies. make install-dev # Migrate and make sure the diff is empty -pushd $SUSHI_DIR -sqlmesh --gateway $GATEWAY_NAME migrate -sqlmesh --gateway $GATEWAY_NAME diff prod -popd +pushd $TEST_DIR +sqlmesh $SQLMESH_OPTS migrate +sqlmesh $SQLMESH_OPTS diff prod +popd \ No newline at end of file diff --git a/examples/sushi_dbt/config.py b/examples/sushi_dbt/config.py index e7e28c98e4..2305cf79f2 100644 --- a/examples/sushi_dbt/config.py +++ b/examples/sushi_dbt/config.py @@ -5,3 +5,5 @@ config = sqlmesh_config(Path(__file__).parent) test_config = config + +migration_test_config = sqlmesh_config(Path(__file__).parent, dbt_target_name="duckdb") diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index 0790562de7..6b4f6c7a83 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -298,6 +298,7 @@ def init_example_project( dlt_path: t.Optional[str] = None, schema_name: str = "sqlmesh_example", cli_mode: InitCliMode = InitCliMode.DEFAULT, + start: t.Optional[str] = None, ) -> Path: root_path = Path(path) @@ -336,7 +337,6 @@ def init_example_project( models: t.Set[t.Tuple[str, str]] = set() settings = None - start = None if engine_type and template == ProjectTemplate.DLT: project_dialect = dialect or DIALECT_TO_TYPE.get(engine_type) if pipeline and 
project_dialect: diff --git a/sqlmesh/core/audit/definition.py b/sqlmesh/core/audit/definition.py index 561ee539f6..9f470872fe 100644 --- a/sqlmesh/core/audit/definition.py +++ b/sqlmesh/core/audit/definition.py @@ -19,7 +19,7 @@ sorted_python_env_payloads, ) from sqlmesh.core.model.common import make_python_env, single_value_or_tuple, ParsableSql -from sqlmesh.core.node import _Node +from sqlmesh.core.node import _Node, DbtInfoMixin, DbtNodeInfo from sqlmesh.core.renderer import QueryRenderer from sqlmesh.utils.date import TimeLike from sqlmesh.utils.errors import AuditConfigError, SQLMeshError, raise_config_error @@ -120,7 +120,7 @@ def audit_map_validator(cls: t.Type, v: t.Any, values: t.Any) -> t.Dict[str, t.A return {} -class ModelAudit(PydanticModel, AuditMixin, frozen=True): +class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True): """ Audit is an assertion made about your tables. @@ -137,6 +137,7 @@ class ModelAudit(PydanticModel, AuditMixin, frozen=True): expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions") jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry() formatting: t.Optional[bool] = Field(default=None, exclude=True) + dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None) _path: t.Optional[Path] = None @@ -150,6 +151,10 @@ def __str__(self) -> str: path = f": {self._path.name}" if self._path else "" return f"{self.__class__.__name__}<{self.name}{path}>" + @property + def dbt_node_info(self) -> t.Optional[DbtNodeInfo]: + return self.dbt_node_info_ + class StandaloneAudit(_Node, AuditMixin): """ @@ -552,4 +557,5 @@ def _maybe_parse_arg_pair(e: exp.Expression) -> t.Tuple[str, exp.Expression]: "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)), "tags": single_value_or_tuple, "default_catalog": exp.to_identifier, + "dbt_node_info_": lambda value: value.to_expression(), } diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 
d7a2984f3a..437fbd6edd 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1697,9 +1697,9 @@ def plan_builder( console=self.console, user_provided_flags=user_provided_flags, selected_models={ - dbt_name + dbt_unique_id for model in model_selector.expand_model_selections(select_models or "*") - if (dbt_name := snapshots[model].node.dbt_name) + if (dbt_unique_id := snapshots[model].node.dbt_unique_id) }, explain=explain or False, ignore_cron=ignore_cron or False, diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index c9eaa43b3e..974901cb55 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -1197,6 +1197,9 @@ def metadata_hash(self) -> str: for k, v in sorted(args.items()): metadata.append(f"{k}:{gen(v)}") + if self.dbt_node_info: + metadata.append(self.dbt_node_info.json(sort_keys=True)) + metadata.extend(self._additional_metadata) self._metadata_hash = hash_data(metadata) @@ -3019,6 +3022,7 @@ def render_expression( "formatting": str, "optimize_query": str, "virtual_environment_mode": lambda value: exp.Literal.string(value.value), + "dbt_node_info_": lambda value: value.to_expression(), } diff --git a/sqlmesh/core/node.py b/sqlmesh/core/node.py index b04a59a39f..4a3bf2564b 100644 --- a/sqlmesh/core/node.py +++ b/sqlmesh/core/node.py @@ -153,6 +153,101 @@ def milliseconds(self) -> int: return self.seconds * 1000 +class DbtNodeInfo(PydanticModel): + """ + Represents dbt-specific model information set by the dbt loader and intended to be made available at the Snapshot level + (as opposed to hidden within the individual model jinja macro registries). 
+ + This allows for things like injecting implementations of variables / functions into the Jinja context that are compatible with + their dbt equivalents but are backed by the sqlmesh snapshots in any given plan / environment + """ + + unique_id: str + """This is the node/resource name/unique_id that's used as the node key in the dbt manifest. + It's prefixed by the resource type and is exposed in context variables like {{ selected_resources }}. + + Examples: + - test.jaffle_shop.unique_stg_orders_order_id.e3b841c71a + - seed.jaffle_shop.raw_payments + - model.jaffle_shop.stg_orders + """ + + name: str + """Name of this object in the dbt global namespace, used by things like {{ ref() }} calls. + + Examples: + - unique_stg_orders_order_id + - raw_payments + - stg_orders + """ + + fqn: str + """Used for selectors in --select/--exclude. + Takes the filesystem into account so may be structured differently to :unique_id. + + Examples: + - jaffle_shop.staging.unique_stg_orders_order_id + - jaffle_shop.raw_payments + - jaffle_shop.staging.stg_orders + """ + + alias: t.Optional[str] = None + """This is dbt's way of overriding the _physical table_ a model is written to. 
+ + It's used in the following situation: + - Say you have two models, "stg_customers" and "customers" + - You want "stg_customers" to be written to the "staging" schema as eg "staging.customers" - NOT "staging.stg_customers" + - But you cant rename the file to "customers" because it will conflict with your other model file "customers" + - Even if you put it in a different folder, eg "staging/customers.sql" - dbt still has a global namespace so it will conflict + when you try to do something like "{{ ref('customers') }}" + - So dbt's solution to this problem is to keep calling it "stg_customers" at the dbt project/model level, + but allow overriding the physical table to "customers" via something like "{{ config(alias='customers', schema='staging') }}" + + Note that if :alias is set, it does *not* replace :name at the model level and cannot be used interchangably with :name. + It also does not affect the :fqn or :unique_id. It's just used to override :name when it comes time to generate the physical table name. 
+ """ + + @model_validator(mode="after") + def post_init(self) -> Self: + # by default, dbt sets alias to the same as :name + # however, we only want to include :alias if it is actually different / actually providing an override + if self.alias == self.name: + self.alias = None + return self + + def to_expression(self) -> exp.Expression: + """Produce a SQLGlot expression representing this object, for use in things like the model/audit definition renderers""" + return exp.tuple_( + *( + exp.PropertyEQ(this=exp.var(k), expression=exp.Literal.string(v)) + for k, v in sorted(self.model_dump(exclude_none=True).items()) + ) + ) + + +class DbtInfoMixin: + """This mixin encapsulates properties that only exist for dbt compatibility and are otherwise not required + for native projects""" + + @property + def dbt_node_info(self) -> t.Optional[DbtNodeInfo]: + raise NotImplementedError() + + @property + def dbt_unique_id(self) -> t.Optional[str]: + """Used for compatibility with jinja context variables such as {{ selected_resources }}""" + if self.dbt_node_info: + return self.dbt_node_info.unique_id + return None + + @property + def dbt_fqn(self) -> t.Optional[str]: + """Used in the selector engine for compatibility with selectors that select models by dbt fqn""" + if self.dbt_node_info: + return self.dbt_node_info.fqn + return None + + # this must be sorted in descending order INTERVAL_SECONDS = { IntervalUnit.YEAR: 60 * 60 * 24 * 365, @@ -165,7 +260,7 @@ def milliseconds(self) -> int: } -class _Node(PydanticModel): +class _Node(DbtInfoMixin, PydanticModel): """ Node is the core abstraction for entity that can be executed within the scheduler. 
@@ -199,7 +294,7 @@ class _Node(PydanticModel): interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None) tags: t.List[str] = [] stamp: t.Optional[str] = None - dbt_name: t.Optional[str] = None # dbt node name + dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None) _path: t.Optional[Path] = None _data_hash: t.Optional[str] = None _metadata_hash: t.Optional[str] = None @@ -446,6 +541,10 @@ def is_audit(self) -> bool: """Return True if this is an audit node""" return False + @property + def dbt_node_info(self) -> t.Optional[DbtNodeInfo]: + return self.dbt_node_info_ + class NodeType(str, Enum): MODEL = "model" diff --git a/sqlmesh/core/scheduler.py b/sqlmesh/core/scheduler.py index fd2e1cf004..af4d72b165 100644 --- a/sqlmesh/core/scheduler.py +++ b/sqlmesh/core/scheduler.py @@ -839,7 +839,9 @@ def _run_or_audit( run_environment_statements=run_environment_statements, audit_only=audit_only, auto_restatement_triggers=auto_restatement_triggers, - selected_models={s.node.dbt_name for s in merged_intervals if s.node.dbt_name}, + selected_models={ + s.node.dbt_unique_id for s in merged_intervals if s.node.dbt_unique_id + }, ) return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS diff --git a/sqlmesh/dbt/basemodel.py b/sqlmesh/dbt/basemodel.py index 3534b95bc3..4637bbf91c 100644 --- a/sqlmesh/dbt/basemodel.py +++ b/sqlmesh/dbt/basemodel.py @@ -13,6 +13,7 @@ from sqlmesh.core.config.base import UpdateStrategy from sqlmesh.core.config.common import VirtualEnvironmentMode from sqlmesh.core.model import Model +from sqlmesh.core.node import DbtNodeInfo from sqlmesh.dbt.column import ( ColumnConfig, column_descriptions_to_sqlmesh, @@ -120,8 +121,10 @@ class BaseModelConfig(GeneralConfig): grain: t.Union[str, t.List[str]] = [] # DBT configuration fields + unique_id: str = "" name: str = "" package_name: str = "" + fqn: t.List[str] = [] schema_: str = Field("", alias="schema") database: t.Optional[str] = 
None alias: t.Optional[str] = None @@ -273,12 +276,10 @@ def sqlmesh_config_fields(self) -> t.Set[str]: return {"description", "owner", "stamp", "storage_format"} @property - def node_name(self) -> str: - resource_type = getattr(self, "resource_type", "model") - node_name = f"{resource_type}.{self.package_name}.{self.name}" - if self.version: - node_name += f".v{self.version}" - return node_name + def node_info(self) -> DbtNodeInfo: + return DbtNodeInfo( + unique_id=self.unique_id, name=self.name, fqn=".".join(self.fqn), alias=self.alias + ) def sqlmesh_model_kwargs( self, @@ -349,8 +350,8 @@ def to_sqlmesh( def _model_jinja_context( self, context: DbtContext, dependencies: Dependencies ) -> t.Dict[str, t.Any]: - if context._manifest and self.node_name in context._manifest._manifest.nodes: - attributes = context._manifest._manifest.nodes[self.node_name].to_dict() + if context._manifest and self.unique_id in context._manifest._manifest.nodes: + attributes = context._manifest._manifest.nodes[self.unique_id].to_dict() if dependencies.model_attrs.all_attrs: model_node: AttributeDict[str, t.Any] = AttributeDict(attributes) else: diff --git a/sqlmesh/dbt/model.py b/sqlmesh/dbt/model.py index efad5e790b..9386b0b4f8 100644 --- a/sqlmesh/dbt/model.py +++ b/sqlmesh/dbt/model.py @@ -694,7 +694,7 @@ def to_sqlmesh( extract_dependencies_from_query=False, allow_partials=allow_partials, virtual_environment_mode=virtual_environment_mode, - dbt_name=self.node_name, + dbt_node_info=self.node_info, **optional_kwargs, **model_kwargs, ) diff --git a/sqlmesh/dbt/seed.py b/sqlmesh/dbt/seed.py index d6ecc768f9..c0c8186f29 100644 --- a/sqlmesh/dbt/seed.py +++ b/sqlmesh/dbt/seed.py @@ -92,7 +92,7 @@ def to_sqlmesh( audit_definitions=audit_definitions, virtual_environment_mode=virtual_environment_mode, start=self.start or context.sqlmesh_config.model_defaults.start, - dbt_name=self.node_name, + dbt_node_info=self.node_info, **kwargs, ) diff --git a/sqlmesh/dbt/test.py b/sqlmesh/dbt/test.py 
index 5c18ff4d81..747c9d469c 100644 --- a/sqlmesh/dbt/test.py +++ b/sqlmesh/dbt/test.py @@ -8,6 +8,7 @@ from pydantic import Field import sqlmesh.core.dialect as d from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit +from sqlmesh.core.node import DbtNodeInfo from sqlmesh.dbt.common import ( Dependencies, GeneralConfig, @@ -79,8 +80,10 @@ class TestConfig(GeneralConfig): dialect_: t.Optional[str] = Field(None, alias="dialect") # dbt fields + unique_id: str = "" package_name: str = "" alias: t.Optional[str] = None + fqn: t.List[str] = [] schema_: t.Optional[str] = Field("", alias="schema") database: t.Optional[str] = None severity: Severity = Severity.ERROR @@ -155,6 +158,7 @@ def to_sqlmesh(self, context: DbtContext) -> Audit: jinja_macros.add_globals({"this": self.relation_info}) audit = StandaloneAudit( name=self.name, + dbt_node_info=self.node_info, dialect=self.dialect(context), skip=skip, query=query, @@ -171,6 +175,7 @@ def to_sqlmesh(self, context: DbtContext) -> Audit: else: audit = ModelAudit( name=self.name, + dbt_node_info=self.node_info, dialect=self.dialect(context), skip=skip, blocking=blocking, @@ -214,6 +219,12 @@ def relation_info(self) -> AttributeDict: } ) + @property + def node_info(self) -> DbtNodeInfo: + return DbtNodeInfo( + unique_id=self.unique_id, name=self.name, fqn=".".join(self.fqn), alias=self.alias + ) + def _remove_jinja_braces(jinja_str: str) -> str: no_braces = jinja_str diff --git a/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py b/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py new file mode 100644 index 0000000000..c8acd0bafd --- /dev/null +++ b/sqlmesh/migrations/v0098_add_dbt_node_info_in_node.py @@ -0,0 +1,105 @@ +"""Replace 'dbt_name' with 'dbt_node_info' in the snapshot definition""" + +import json +from sqlglot import exp +from sqlmesh.utils.migration import index_text_type, blob_text_type + + +def migrate_schemas(state_sync, **kwargs): # type: ignore + pass + + +def migrate_rows(state_sync, 
**kwargs): # type: ignore + import pandas as pd + + engine_adapter = state_sync.engine_adapter + schema = state_sync.schema + snapshots_table = "_snapshots" + if schema: + snapshots_table = f"{schema}.{snapshots_table}" + + index_type = index_text_type(engine_adapter.dialect) + blob_type = blob_text_type(engine_adapter.dialect) + + new_snapshots = [] + migration_needed = False + + for ( + name, + identifier, + version, + snapshot, + kind_name, + updated_ts, + unpaused_ts, + ttl_ms, + unrestorable, + forward_only, + dev_version, + fingerprint, + ) in engine_adapter.fetchall( + exp.select( + "name", + "identifier", + "version", + "snapshot", + "kind_name", + "updated_ts", + "unpaused_ts", + "ttl_ms", + "unrestorable", + "forward_only", + "dev_version", + "fingerprint", + ).from_(snapshots_table), + quote_identifiers=True, + ): + parsed_snapshot = json.loads(snapshot) + if dbt_name := parsed_snapshot["node"].get("dbt_name"): + parsed_snapshot["node"].pop("dbt_name") + parsed_snapshot["node"]["dbt_node_info"] = { + "unique_id": dbt_name, + # these will get populated as metadata-only changes on the next plan + "name": "", + "fqn": "", + } + migration_needed = True + + new_snapshots.append( + { + "name": name, + "identifier": identifier, + "version": version, + "snapshot": json.dumps(parsed_snapshot), + "kind_name": kind_name, + "updated_ts": updated_ts, + "unpaused_ts": unpaused_ts, + "ttl_ms": ttl_ms, + "unrestorable": unrestorable, + "forward_only": forward_only, + "dev_version": dev_version, + "fingerprint": fingerprint, + } + ) + + if migration_needed and new_snapshots: + engine_adapter.delete_from(snapshots_table, "TRUE") + + engine_adapter.insert_append( + snapshots_table, + pd.DataFrame(new_snapshots), + target_columns_to_types={ + "name": exp.DataType.build(index_type), + "identifier": exp.DataType.build(index_type), + "version": exp.DataType.build(index_type), + "snapshot": exp.DataType.build(blob_type), + "kind_name": exp.DataType.build(index_type), + 
"updated_ts": exp.DataType.build("bigint"), + "unpaused_ts": exp.DataType.build("bigint"), + "ttl_ms": exp.DataType.build("bigint"), + "unrestorable": exp.DataType.build("boolean"), + "forward_only": exp.DataType.build("boolean"), + "dev_version": exp.DataType.build(index_type), + "fingerprint": exp.DataType.build(blob_type), + }, + ) diff --git a/sqlmesh_dbt/console.py b/sqlmesh_dbt/console.py index 3c62adfe68..6bf7a1618f 100644 --- a/sqlmesh_dbt/console.py +++ b/sqlmesh_dbt/console.py @@ -1,6 +1,7 @@ import typing as t from sqlmesh.core.console import TerminalConsole from sqlmesh.core.model import Model +from sqlmesh.core.snapshot.definition import Node from rich.tree import Tree @@ -9,19 +10,26 @@ def print(self, msg: str) -> None: return self._print(msg) def list_models( - self, models: t.List[Model], list_parents: bool = True, list_audits: bool = True + self, + models: t.List[Model], + all_nodes: t.Dict[str, Node], + list_parents: bool = True, + list_audits: bool = True, ) -> None: model_list = Tree("[bold]Models in project:[/bold]") for model in models: - model_tree = model_list.add(model.name) + model_tree = model_list.add(model.dbt_fqn or model.name) if list_parents: - for parent in model.depends_on: - model_tree.add(f"depends_on: {parent}") + for parent_name in model.depends_on: + if parent := all_nodes.get(parent_name): + parent_name = parent.dbt_fqn or parent_name + + model_tree.add(f"depends_on: {parent_name}") if list_audits: - for audit_name in model.audit_definitions: - model_tree.add(f"audit: {audit_name}") + for audit_name, audit in model.audit_definitions.items(): + model_tree.add(f"audit: {audit.dbt_fqn or audit_name}") self._print(model_list) diff --git a/sqlmesh_dbt/operations.py b/sqlmesh_dbt/operations.py index f95d0d931e..e15a2cb93e 100644 --- a/sqlmesh_dbt/operations.py +++ b/sqlmesh_dbt/operations.py @@ -32,7 +32,9 @@ def list_( # - "data tests" (audits) for those models # it also applies selectors which is useful for testing selectors 
selected_models = list(self._selected_models(select, exclude).values()) - self.console.list_models(selected_models) + self.console.list_models( + selected_models, {k: v.node for k, v in self.context.snapshots.items()} + ) def run( self, @@ -260,7 +262,7 @@ def create( return DbtOperations(sqlmesh_context, dbt_project, debug=debug) -def init_project_if_required(project_dir: Path) -> None: +def init_project_if_required(project_dir: Path, start: t.Optional[str] = None) -> None: """ SQLMesh needs a start date to as the starting point for calculating intervals on incremental models, amongst other things @@ -276,4 +278,6 @@ def init_project_if_required(project_dir: Path) -> None: if not any(f.exists() for f in [project_dir / file for file in ALL_CONFIG_FILENAMES]): get_console().log_warning("No existing SQLMesh config detected; creating one") - init_example_project(path=project_dir, engine_type=None, template=ProjectTemplate.DBT) + init_example_project( + path=project_dir, engine_type=None, template=ProjectTemplate.DBT, start=start + ) diff --git a/tests/core/test_audit.py b/tests/core/test_audit.py index ed67975e9e..2ffcbbc4b2 100644 --- a/tests/core/test_audit.py +++ b/tests/core/test_audit.py @@ -5,6 +5,7 @@ from sqlmesh.core import constants as c from sqlmesh.core.config.model import ModelDefaultsConfig from sqlmesh.core.context import Context +from sqlmesh.core.node import DbtNodeInfo from sqlmesh.core.audit import ( ModelAudit, StandaloneAudit, @@ -12,7 +13,7 @@ load_audit, load_multiple_audits, ) -from sqlmesh.core.dialect import parse +from sqlmesh.core.dialect import parse, jinja_query from sqlmesh.core.model import ( FullKind, IncrementalByTimeRangeKind, @@ -730,6 +731,27 @@ def test_render_definition(): assert "def test_macro(evaluator, v):" in format_model_expressions(audit.render_definition()) +def test_render_definition_dbt_node_info(): + node_info = DbtNodeInfo( + unique_id="test.project.my_audit", name="my_audit", fqn="project.my_audit" + ) + + audit = 
StandaloneAudit(name="my_audit", dbt_node_info=node_info, query=jinja_query("select 1")) + + assert ( + audit.render_definition()[0].sql(pretty=True) + == """AUDIT ( + name my_audit, + dbt_node_info ( + fqn := 'project.my_audit', + name := 'my_audit', + unique_id := 'test.project.my_audit' + ), + standalone TRUE +)""" + ) + + def test_text_diff(): expressions = parse( """ diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 00ff48b0d2..726ac52b66 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -61,7 +61,7 @@ from sqlmesh.core.model.common import parse_expression from sqlmesh.core.model.kind import ModelKindName, _model_kind_validator from sqlmesh.core.model.seed import CsvSettings -from sqlmesh.core.node import IntervalUnit, _Node +from sqlmesh.core.node import IntervalUnit, _Node, DbtNodeInfo from sqlmesh.core.signal import signal from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory from sqlmesh.utils.date import TimeLike, to_datetime, to_ds, to_timestamp @@ -2100,6 +2100,33 @@ def test_render_definition_with_virtual_update_statements(): ) +def test_render_definition_dbt_node_info(): + node_info = DbtNodeInfo(unique_id="model.db.table", name="table", fqn="db.table") + model = load_sql_based_model( + d.parse( + f""" + MODEL ( + name db.table, + kind FULL + ); + + select 1 as a; + """ + ), + dbt_node_info=node_info, + ) + + assert model.dbt_node_info + assert ( + model.render_definition()[0].sql(pretty=True) + == """MODEL ( + name db.table, + dbt_node_info (fqn := 'db.table', name := 'table', unique_id := 'model.db.table'), + kind FULL +)""" + ) + + def test_cron(): daily = _Node(name="x", cron="@daily") assert to_datetime(daily.cron_prev("2020-01-01")) == to_datetime("2019-12-31") diff --git a/tests/dbt/cli/conftest.py b/tests/dbt/cli/conftest.py index e555f9144a..26757bf3ab 100644 --- a/tests/dbt/cli/conftest.py +++ b/tests/dbt/cli/conftest.py @@ -1,82 +1,7 @@ import typing as t -from pathlib import Path 
-import os import functools from click.testing import CliRunner, Result -from sqlmesh_dbt.operations import init_project_if_required import pytest -import uuid - - -class EmptyProjectCreator(t.Protocol): - def __call__( - self, project_name: t.Optional[str] = None, target_name: t.Optional[str] = None - ) -> Path: ... - - -@pytest.fixture -def jaffle_shop_duckdb(copy_to_temp_path: t.Callable[..., t.List[Path]]) -> t.Iterable[Path]: - fixture_path = Path(__file__).parent / "fixtures" / "jaffle_shop_duckdb" - assert fixture_path.exists() - - current_path = os.getcwd() - output_path = copy_to_temp_path(paths=fixture_path)[0] - - # so that we can invoke commands from the perspective of a user that is already in the correct directory - os.chdir(output_path) - - yield output_path - - os.chdir(current_path) - - -@pytest.fixture -def create_empty_project( - copy_to_temp_path: t.Callable[..., t.List[Path]], -) -> t.Iterable[t.Callable[..., Path]]: - default_project_name = f"test_{str(uuid.uuid4())[:8]}" - default_target_name = "duckdb" - fixture_path = Path(__file__).parent / "fixtures" / "empty_project" - assert fixture_path.exists() - - current_path = os.getcwd() - - def _create_empty_project( - project_name: t.Optional[str] = None, target_name: t.Optional[str] = None - ) -> Path: - project_name = project_name or default_project_name - target_name = target_name or default_target_name - output_path = copy_to_temp_path(paths=fixture_path)[0] - - dbt_project_yml = output_path / "dbt_project.yml" - profiles_yml = output_path / "profiles.yml" - - assert dbt_project_yml.exists() - assert profiles_yml.exists() - - (output_path / "models").mkdir() - (output_path / "seeds").mkdir() - - dbt_project_yml.write_text( - dbt_project_yml.read_text().replace("empty_project", project_name) - ) - profiles_yml.write_text( - profiles_yml.read_text() - .replace("empty_project", project_name) - .replace("__DEFAULT_TARGET__", target_name) - ) - - init_project_if_required(output_path) - - # so 
that we can invoke commands from the perspective of a user that is already in the correct directory - os.chdir(output_path) - - return output_path - - yield _create_empty_project - - # cleanup - switch cwd back to original - os.chdir(current_path) @pytest.fixture diff --git a/tests/dbt/cli/test_list.py b/tests/dbt/cli/test_list.py index 1bc22ce87e..4d294decc1 100644 --- a/tests/dbt/cli/test_list.py +++ b/tests/dbt/cli/test_list.py @@ -12,10 +12,10 @@ def test_list(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[..., Result]): assert result.exit_code == 0 assert not result.exception - assert "main.orders" in result.output - assert "main.customers" in result.output - assert "main.stg_payments" in result.output - assert "main.raw_orders" in result.output + assert "─ jaffle_shop.orders" in result.output + assert "─ jaffle_shop.customers" in result.output + assert "─ jaffle_shop.staging.stg_payments" in result.output + assert "─ jaffle_shop.raw_orders" in result.output def test_list_select(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[..., Result]): @@ -24,12 +24,12 @@ def test_list_select(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[..., Resul assert result.exit_code == 0 assert not result.exception - assert "main.customers" in result.output - assert "main.stg_customers" in result.output - assert "main.raw_customers" in result.output + assert "─ jaffle_shop.customers" in result.output + assert "─ jaffle_shop.staging.stg_customers" in result.output + assert "─ jaffle_shop.raw_customers" in result.output - assert "main.stg_payments" not in result.output - assert "main.raw_orders" not in result.output + assert "─ jaffle_shop.staging.stg_payments" not in result.output + assert "─ jaffle_shop.raw_orders" not in result.output def test_list_select_exclude(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[..., Result]): @@ -39,13 +39,13 @@ def test_list_select_exclude(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[.. 
assert result.exit_code == 0 assert not result.exception - assert "main.customers" in result.output - assert "main.stg_customers" in result.output - assert "main.raw_customers" in result.output + assert "─ jaffle_shop.customers" in result.output + assert "─ jaffle_shop.staging.stg_customers" in result.output + assert "─ jaffle_shop.raw_customers" in result.output - assert "main.orders" not in result.output - assert "main.stg_payments" not in result.output - assert "main.raw_orders" not in result.output + assert "─ jaffle_shop.orders" not in result.output + assert "─ jaffle_shop.staging.stg_payments" not in result.output + assert "─ jaffle_shop.raw_orders" not in result.output # multiple exclude for args in ( @@ -56,21 +56,26 @@ def test_list_select_exclude(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[.. assert result.exit_code == 0 assert not result.exception - assert "main.stg_orders" in result.output + assert "─ jaffle_shop.staging.stg_orders" in result.output - assert "main.customers" not in result.output - assert "main.orders" not in result.output + assert "─ jaffle_shop.customers" not in result.output + assert "─ jaffle_shop.orders" not in result.output def test_list_with_vars(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[..., Result]): - (jaffle_shop_duckdb / "models" / "aliased_model.sql").write_text(""" - {{ config(alias='model_' + var('foo')) }} - select 1 + ( + jaffle_shop_duckdb / "models" / "vars_model.sql" + ).write_text(""" + select * from {{ ref('custom' + var('foo')) }} """) - result = invoke_cli(["list", "--vars", "foo: bar"]) + result = invoke_cli(["list", "--vars", "foo: ers"]) assert result.exit_code == 0 assert not result.exception - assert "model_bar" in result.output + assert ( + """├── jaffle_shop.vars_model +│ └── depends_on: jaffle_shop.customers""" + in result.output + ) diff --git a/tests/dbt/cli/test_operations.py b/tests/dbt/cli/test_operations.py index e9c4dc0063..769887efe4 100644 --- a/tests/dbt/cli/test_operations.py +++ 
b/tests/dbt/cli/test_operations.py @@ -8,7 +8,7 @@ import time_machine from sqlmesh.core.plan import PlanBuilder from sqlmesh.core.config.common import VirtualEnvironmentMode -from tests.dbt.cli.conftest import EmptyProjectCreator +from tests.dbt.conftest import EmptyProjectCreator pytestmark = pytest.mark.slow @@ -273,7 +273,7 @@ def test_run_option_full_refresh( create_empty_project: EmptyProjectCreator, env_name: str, vde_mode: VirtualEnvironmentMode ): # create config file prior to load - project_path = create_empty_project(project_name="test") + project_path, models_path = create_empty_project(project_name="test") config_path = project_path / "sqlmesh.yaml" config = yaml.load(config_path) @@ -282,8 +282,8 @@ def test_run_option_full_refresh( with config_path.open("w") as f: yaml.dump(config, f) - (project_path / "models" / "model_a.sql").write_text("select 1") - (project_path / "models" / "model_b.sql").write_text("select 2") + (models_path / "model_a.sql").write_text("select 1") + (models_path / "model_b.sql").write_text("select 2") operations = create(project_dir=project_path) diff --git a/tests/dbt/cli/test_run.py b/tests/dbt/cli/test_run.py index 9af1de8561..788a7b04a8 100644 --- a/tests/dbt/cli/test_run.py +++ b/tests/dbt/cli/test_run.py @@ -5,7 +5,7 @@ import time_machine from sqlmesh_dbt.operations import create from tests.cli.test_cli import FREEZE_TIME -from tests.dbt.cli.conftest import EmptyProjectCreator +from tests.dbt.conftest import EmptyProjectCreator pytestmark = pytest.mark.slow @@ -45,13 +45,13 @@ def test_run_with_selectors(jaffle_shop_duckdb: Path, invoke_cli: t.Callable[... 
def test_run_with_changes_and_full_refresh( create_empty_project: EmptyProjectCreator, invoke_cli: t.Callable[..., Result] ): - project_path = create_empty_project(project_name="test") + project_path, models_path = create_empty_project(project_name="test") engine_adapter = create(project_path).context.engine_adapter engine_adapter.execute("create table external_table as select 'foo' as a, 'bar' as b") - (project_path / "models" / "model_a.sql").write_text("select a, b from external_table") - (project_path / "models" / "model_b.sql").write_text("select a, b from {{ ref('model_a') }}") + (models_path / "model_a.sql").write_text("select a, b from external_table") + (models_path / "model_b.sql").write_text("select a, b from {{ ref('model_a') }}") # populate initial env result = invoke_cli(["run"]) diff --git a/tests/dbt/conftest.py b/tests/dbt/conftest.py index 5875d9f575..56d77e7496 100644 --- a/tests/dbt/conftest.py +++ b/tests/dbt/conftest.py @@ -1,6 +1,8 @@ from __future__ import annotations import typing as t +import os +from pathlib import Path import pytest @@ -8,6 +10,17 @@ from sqlmesh.dbt.context import DbtContext from sqlmesh.dbt.project import Project from sqlmesh.dbt.target import PostgresConfig +from sqlmesh_dbt.operations import init_project_if_required +import uuid + + +class EmptyProjectCreator(t.Protocol): + def __call__( + self, + project_name: t.Optional[str] = None, + target_name: t.Optional[str] = None, + start: t.Optional[str] = None, + ) -> t.Tuple[Path, Path]: ... 
@pytest.fixture() @@ -15,6 +28,80 @@ def sushi_test_project(sushi_test_dbt_context: Context) -> Project: return sushi_test_dbt_context._loaders[0]._load_projects()[0] # type: ignore +@pytest.fixture +def create_empty_project( + copy_to_temp_path: t.Callable[..., t.List[Path]], +) -> t.Iterable[EmptyProjectCreator]: + default_project_name = f"test_{str(uuid.uuid4())[:8]}" + default_target_name = "duckdb" + fixture_path = Path(__file__).parent.parent / "fixtures" / "dbt" / "empty_project" + assert fixture_path.exists() + + current_path = os.getcwd() + + def _create_empty_project( + project_name: t.Optional[str] = None, + target_name: t.Optional[str] = None, + start: t.Optional[str] = None, + ) -> t.Tuple[Path, Path]: + project_name = project_name or default_project_name + target_name = target_name or default_target_name + output_path = copy_to_temp_path(paths=fixture_path)[0] + + dbt_project_yml = output_path / "dbt_project.yml" + profiles_yml = output_path / "profiles.yml" + + assert dbt_project_yml.exists() + assert profiles_yml.exists() + + models_path = output_path / "models" + (models_path).mkdir() + (output_path / "seeds").mkdir() + + dbt_project_yml.write_text( + dbt_project_yml.read_text().replace("empty_project", project_name) + ) + profiles_yml.write_text( + profiles_yml.read_text() + .replace("empty_project", project_name) + .replace("__DEFAULT_TARGET__", target_name) + ) + + init_project_if_required(output_path, start) + + # so that we can invoke commands from the perspective of a user that is already in the correct directory + os.chdir(output_path) + + return output_path, models_path + + yield _create_empty_project + + # cleanup - switch cwd back to original + os.chdir(current_path) + + +@pytest.fixture +def jaffle_shop_duckdb(copy_to_temp_path: t.Callable[..., t.List[Path]]) -> t.Iterable[Path]: + fixture_path = Path(__file__).parent.parent / "fixtures" / "dbt" / "jaffle_shop_duckdb" + assert fixture_path.exists() + + current_path = os.getcwd() + 
output_path = copy_to_temp_path(paths=fixture_path)[0] + + # so that we can invoke commands from the perspective of a user that is already in the correct directory + os.chdir(output_path) + + yield output_path + + os.chdir(current_path) + + +@pytest.fixture +def jaffle_shop_duckdb_context(jaffle_shop_duckdb: Path) -> Context: + init_project_if_required(jaffle_shop_duckdb) + return Context(paths=[jaffle_shop_duckdb]) + + @pytest.fixture() def runtime_renderer() -> t.Callable: def create_renderer(context: DbtContext, **kwargs: t.Any) -> t.Callable: diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index fe226d4926..0e96024aa1 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -91,8 +91,10 @@ def test_update(current: t.Dict[str, t.Any], new: t.Dict[str, t.Any], expected: def test_model_to_sqlmesh_fields(dbt_dummy_postgres_config: PostgresConfig): model_config = ModelConfig( + unique_id="model.package.name", name="name", package_name="package", + fqn=["package", "name"], alias="model", schema="custom", database="database", @@ -123,6 +125,8 @@ def test_model_to_sqlmesh_fields(dbt_dummy_postgres_config: PostgresConfig): assert isinstance(model, SqlModel) assert model.name == "database.custom.model" + assert model.dbt_unique_id == "model.package.name" + assert model.dbt_fqn == "package.name" assert model.description == "test model" assert ( model.render_query_or_raise().sql() @@ -185,7 +189,9 @@ def test_model_to_sqlmesh_fields(dbt_dummy_postgres_config: PostgresConfig): def test_test_to_sqlmesh_fields(): sql = "SELECT * FROM FOO WHERE cost > 100" test_config = TestConfig( + unique_id="test.test_package.foo_test", name="foo_test", + fqn=["test_package", "foo_test"], sql=sql, model_name="Foo", column_name="cost", @@ -199,6 +205,8 @@ def test_test_to_sqlmesh_fields(): audit = test_config.to_sqlmesh(context) assert audit.name == "foo_test" + assert audit.dbt_unique_id == "test.test_package.foo_test" + assert audit.dbt_fqn ==
"test_package.foo_test" assert audit.dialect == "duckdb" assert not audit.skip assert audit.blocking diff --git a/tests/dbt/test_integration.py b/tests/dbt/test_integration.py index 5a944d55d4..e1f051dbcf 100644 --- a/tests/dbt/test_integration.py +++ b/tests/dbt/test_integration.py @@ -540,3 +540,67 @@ def test_scd_type_2_by_column( ) df_expected = create_df(expected_table_data, self.target_schema) compare_dataframes(df_actual, df_expected, msg=f"Failed on time {time}") + + +def test_dbt_node_info(jaffle_shop_duckdb_context: Context): + ctx = jaffle_shop_duckdb_context + + customers = ctx.models['"jaffle_shop"."main"."customers"'] + assert customers.dbt_unique_id == "model.jaffle_shop.customers" + assert customers.dbt_fqn == "jaffle_shop.customers" + assert customers.dbt_node_info + assert customers.dbt_node_info.name == "customers" + + orders = ctx.models['"jaffle_shop"."main"."orders"'] + assert orders.dbt_unique_id == "model.jaffle_shop.orders" + assert orders.dbt_fqn == "jaffle_shop.orders" + assert orders.dbt_node_info + assert orders.dbt_node_info.name == "orders" + + stg_customers = ctx.models['"jaffle_shop"."main"."stg_customers"'] + assert stg_customers.dbt_unique_id == "model.jaffle_shop.stg_customers" + assert stg_customers.dbt_fqn == "jaffle_shop.staging.stg_customers" + assert stg_customers.dbt_node_info + assert stg_customers.dbt_node_info.name == "stg_customers" + + stg_orders = ctx.models['"jaffle_shop"."main"."stg_orders"'] + assert stg_orders.dbt_unique_id == "model.jaffle_shop.stg_orders" + assert stg_orders.dbt_fqn == "jaffle_shop.staging.stg_orders" + assert stg_orders.dbt_node_info + assert stg_orders.dbt_node_info.name == "stg_orders" + + raw_customers = ctx.models['"jaffle_shop"."main"."raw_customers"'] + assert raw_customers.dbt_unique_id == "seed.jaffle_shop.raw_customers" + assert raw_customers.dbt_fqn == "jaffle_shop.raw_customers" + assert raw_customers.dbt_node_info + assert raw_customers.dbt_node_info.name == "raw_customers" + + 
raw_orders = ctx.models['"jaffle_shop"."main"."raw_orders"'] + assert raw_orders.dbt_unique_id == "seed.jaffle_shop.raw_orders" + assert raw_orders.dbt_fqn == "jaffle_shop.raw_orders" + assert raw_orders.dbt_node_info + assert raw_orders.dbt_node_info.name == "raw_orders" + + raw_payments = ctx.models['"jaffle_shop"."main"."raw_payments"'] + assert raw_payments.dbt_unique_id == "seed.jaffle_shop.raw_payments" + assert raw_payments.dbt_fqn == "jaffle_shop.raw_payments" + assert raw_payments.dbt_node_info + assert raw_payments.dbt_node_info.name == "raw_payments" + + relationship_audit = ctx.snapshots[ + "relationships_orders_customer_id__customer_id__ref_customers_" + ] + assert relationship_audit.node.is_audit + assert ( + relationship_audit.node.dbt_unique_id + == "test.jaffle_shop.relationships_orders_customer_id__customer_id__ref_customers_.c6ec7f58f2" + ) + assert ( + relationship_audit.node.dbt_fqn + == "jaffle_shop.relationships_orders_customer_id__customer_id__ref_customers_" + ) + assert relationship_audit.node.dbt_node_info + assert ( + relationship_audit.node.dbt_node_info.name + == "relationships_orders_customer_id__customer_id__ref_customers_" + ) diff --git a/tests/dbt/test_model.py b/tests/dbt/test_model.py index 7bcfe98768..a64b29e89d 100644 --- a/tests/dbt/test_model.py +++ b/tests/dbt/test_model.py @@ -1,5 +1,4 @@ import datetime -import typing as t import pytest from pathlib import Path @@ -16,53 +15,11 @@ from sqlmesh.dbt.target import PostgresConfig from sqlmesh.dbt.test import TestConfig from sqlmesh.utils.yaml import YAML +from sqlmesh.utils.date import to_ds pytestmark = pytest.mark.dbt -@pytest.fixture -def create_empty_project(tmp_path: Path) -> t.Callable[[], t.Tuple[Path, Path]]: - def _create_empty_project() -> t.Tuple[Path, Path]: - yaml = YAML() - dbt_project_dir = tmp_path / "dbt" - dbt_project_dir.mkdir() - dbt_model_dir = dbt_project_dir / "models" - dbt_model_dir.mkdir() - dbt_project_config = { - "name": "empty_project", - 
"version": "1.0.0", - "config-version": 2, - "profile": "test", - "model-paths": ["models"], - } - dbt_project_file = dbt_project_dir / "dbt_project.yml" - with open(dbt_project_file, "w", encoding="utf-8") as f: - YAML().dump(dbt_project_config, f) - sqlmesh_config = { - "model_defaults": { - "start": "2025-01-01", - } - } - sqlmesh_config_file = dbt_project_dir / "sqlmesh.yaml" - with open(sqlmesh_config_file, "w", encoding="utf-8") as f: - YAML().dump(sqlmesh_config, f) - dbt_data_dir = tmp_path / "dbt_data" - dbt_data_dir.mkdir() - dbt_data_file = dbt_data_dir / "local.db" - dbt_profile_config = { - "test": { - "outputs": {"duckdb": {"type": "duckdb", "path": str(dbt_data_file)}}, - "target": "duckdb", - } - } - db_profile_file = dbt_project_dir / "profiles.yml" - with open(db_profile_file, "w", encoding="utf-8") as f: - yaml.dump(dbt_profile_config, f) - return dbt_project_dir, dbt_model_dir - - return _create_empty_project - - def test_test_config_is_standalone_behavior() -> None: """Test that TestConfig.is_standalone correctly identifies tests with cross-model references""" @@ -174,7 +131,7 @@ def test_manifest_filters_standalone_tests_from_models( ) -> None: """Integration test that verifies models only contain non-standalone tests after manifest loading.""" yaml = YAML() - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") # Create two models model1_contents = "SELECT 1 as id" @@ -265,7 +222,7 @@ def test_load_invalid_ref_audit_constraints( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: yaml = YAML() - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it full_model_contents = """{{ config(tags=["blah"], tests=[{"blah": {"to": "ref('completely_ignored')", "field": 
"blah2"} }]) }} SELECT 1 as cola""" full_model_file = model_dir / "full_model.sql" @@ -332,7 +289,7 @@ def test_load_invalid_ref_audit_constraints( def test_load_microbatch_all_defined( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it microbatch_contents = """ {{ @@ -373,7 +330,7 @@ def test_load_microbatch_all_defined( def test_load_microbatch_all_defined_diff_values( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it microbatch_contents = """ {{ @@ -415,7 +372,7 @@ def test_load_microbatch_all_defined_diff_values( def test_load_microbatch_required_only( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it microbatch_contents = """ {{ @@ -454,7 +411,7 @@ def test_load_microbatch_required_only( def test_load_incremental_time_range_strategy_required_only( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local", start="2025-01-01") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it incremental_time_range_contents = """ {{ @@ -476,7 
+433,7 @@ def test_load_incremental_time_range_strategy_required_only( snapshot = context.snapshots[snapshot_fqn] model = snapshot.model # Validate model-level attributes - assert model.start == "2025-01-01" + assert to_ds(model.start or "") == "2025-01-01" assert model.interval_unit.is_day # Validate model kind attributes assert isinstance(model.kind, IncrementalByTimeRangeKind) @@ -496,7 +453,7 @@ def test_load_incremental_time_range_strategy_required_only( def test_load_incremental_time_range_strategy_all_defined( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local", start="2025-01-01") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it incremental_time_range_contents = """ {{ @@ -532,7 +489,7 @@ def test_load_incremental_time_range_strategy_all_defined( snapshot = context.snapshots[snapshot_fqn] model = snapshot.model # Validate model-level attributes - assert model.start == "2025-01-01" + assert to_ds(model.start or "") == "2025-01-01" assert model.interval_unit.is_day # Validate model kind attributes assert isinstance(model.kind, IncrementalByTimeRangeKind) @@ -559,7 +516,7 @@ def test_load_incremental_time_range_strategy_all_defined( def test_load_deprecated_incremental_time_column( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local", start="2025-01-01") # add `tests` to model config since this is loaded by dbt and ignored and we shouldn't error when loading it incremental_time_range_contents = """ {{ @@ -580,10 +537,10 @@ def test_load_deprecated_incremental_time_column( context = Context(paths=project_dir) model = context.snapshots[snapshot_fqn].model # Validate model-level 
attributes - assert model.start == "2025-01-01" + assert to_ds(model.start or "") == "2025-01-01" assert model.interval_unit.is_day # Validate model-level attributes - assert model.start == "2025-01-01" + assert to_ds(model.start or "") == "2025-01-01" assert model.interval_unit.is_day # Validate model kind attributes assert isinstance(model.kind, IncrementalByTimeRangeKind) @@ -606,7 +563,7 @@ def test_load_microbatch_with_ref( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: yaml = YAML() - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") source_schema = { "version": 2, "sources": [ @@ -672,7 +629,7 @@ def test_load_microbatch_with_ref_no_filter( tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project ) -> None: yaml = YAML() - project_dir, model_dir = create_empty_project() + project_dir, model_dir = create_empty_project(project_name="local") source_schema = { "version": 2, "sources": [ @@ -749,21 +706,6 @@ def test_load_multiple_snapshots_defined_in_same_file(sushi_test_dbt_context: Co def test_dbt_jinja_macro_undefined_variable_error(create_empty_project): project_dir, model_dir = create_empty_project() - dbt_profile_config = { - "test": { - "outputs": { - "duckdb": { - "type": "duckdb", - "path": str(project_dir.parent / "dbt_data" / "main.db"), - } - }, - "target": "duckdb", - } - } - db_profile_file = project_dir / "profiles.yml" - with open(db_profile_file, "w", encoding="utf-8") as f: - YAML().dump(dbt_profile_config, f) - macros_dir = project_dir / "macros" macros_dir.mkdir() @@ -801,6 +743,8 @@ def test_dbt_jinja_macro_undefined_variable_error(create_empty_project): @pytest.mark.slow def test_node_name_populated_for_dbt_models(dbt_dummy_postgres_config: PostgresConfig) -> None: model_config = ModelConfig( + unique_id="model.test_package.test_model", + fqn=["test_package", "test_model"], name="test_model", 
package_name="test_package", sql="SELECT 1 as id", @@ -815,7 +759,8 @@ def test_node_name_populated_for_dbt_models(dbt_dummy_postgres_config: PostgresC # check after convert to SQLMesh model that node_name is populated correctly sqlmesh_model = model_config.to_sqlmesh(context) - assert sqlmesh_model.dbt_name == "model.test_package.test_model" + assert sqlmesh_model.dbt_unique_id == "model.test_package.test_model" + assert sqlmesh_model.dbt_fqn == "test_package.test_model" @pytest.mark.slow @@ -872,12 +817,15 @@ def test_load_model_dbt_node_name(tmp_path: Path) -> None: # Verify that node_name is the equivalent dbt one model = context.snapshots[model_fqn].model - assert model.dbt_name == "model.test_project.simple_model" + assert model.dbt_unique_id == "model.test_project.simple_model" + assert model.dbt_fqn == "test_project.simple_model" + assert model.dbt_node_info + assert model.dbt_node_info.name == "simple_model" @pytest.mark.slow -def test_jinja_config_no_query(tmp_path, create_empty_project): - project_dir, model_dir = create_empty_project() +def test_jinja_config_no_query(create_empty_project): + project_dir, model_dir = create_empty_project(project_name="local") # model definition contains only a comment and non-SQL jinja model_contents = "/* comment */ {{ config(materialized='table') }}" diff --git a/tests/dbt/cli/fixtures/empty_project/dbt_project.yml b/tests/fixtures/dbt/empty_project/dbt_project.yml similarity index 94% rename from tests/dbt/cli/fixtures/empty_project/dbt_project.yml rename to tests/fixtures/dbt/empty_project/dbt_project.yml index beceadcd33..dab3d1e0e8 100644 --- a/tests/dbt/cli/fixtures/empty_project/dbt_project.yml +++ b/tests/fixtures/dbt/empty_project/dbt_project.yml @@ -1,7 +1,7 @@ name: 'empty_project' +version: '1.0.0' config-version: 2 -version: '0.1' profile: 'empty_project' diff --git a/tests/dbt/cli/fixtures/empty_project/profiles.yml b/tests/fixtures/dbt/empty_project/profiles.yml similarity index 85% rename from 
tests/dbt/cli/fixtures/empty_project/profiles.yml rename to tests/fixtures/dbt/empty_project/profiles.yml index a4f9836b7e..b352fc5792 100644 --- a/tests/dbt/cli/fixtures/empty_project/profiles.yml +++ b/tests/fixtures/dbt/empty_project/profiles.yml @@ -6,4 +6,4 @@ empty_project: duckdb: type: duckdb path: 'empty_project.duckdb' - threads: 4 + threads: 4 diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/dbt_project.yml b/tests/fixtures/dbt/jaffle_shop_duckdb/dbt_project.yml similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/dbt_project.yml rename to tests/fixtures/dbt/jaffle_shop_duckdb/dbt_project.yml diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/customers.sql b/tests/fixtures/dbt/jaffle_shop_duckdb/models/customers.sql similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/customers.sql rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/customers.sql diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/docs.md b/tests/fixtures/dbt/jaffle_shop_duckdb/models/docs.md similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/docs.md rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/docs.md diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/orders.sql b/tests/fixtures/dbt/jaffle_shop_duckdb/models/orders.sql similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/orders.sql rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/orders.sql diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/overview.md b/tests/fixtures/dbt/jaffle_shop_duckdb/models/overview.md similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/overview.md rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/overview.md diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/schema.yml b/tests/fixtures/dbt/jaffle_shop_duckdb/models/schema.yml similarity index 100% rename from 
tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/schema.yml rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/schema.yml diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/schema.yml b/tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/schema.yml similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/schema.yml rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/schema.yml diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_customers.sql b/tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_customers.sql similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_customers.sql rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_customers.sql diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_orders.sql b/tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_orders.sql similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_orders.sql rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_orders.sql diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_payments.sql b/tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_payments.sql similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/models/staging/stg_payments.sql rename to tests/fixtures/dbt/jaffle_shop_duckdb/models/staging/stg_payments.sql diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/profiles.yml b/tests/fixtures/dbt/jaffle_shop_duckdb/profiles.yml similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/profiles.yml rename to tests/fixtures/dbt/jaffle_shop_duckdb/profiles.yml diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/.gitkeep b/tests/fixtures/dbt/jaffle_shop_duckdb/seeds/.gitkeep similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/.gitkeep rename to 
tests/fixtures/dbt/jaffle_shop_duckdb/seeds/.gitkeep diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_customers.csv b/tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_customers.csv similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_customers.csv rename to tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_customers.csv diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_orders.csv b/tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_orders.csv similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_orders.csv rename to tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_orders.csv diff --git a/tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_payments.csv b/tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_payments.csv similarity index 100% rename from tests/dbt/cli/fixtures/jaffle_shop_duckdb/seeds/raw_payments.csv rename to tests/fixtures/dbt/jaffle_shop_duckdb/seeds/raw_payments.csv