Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion sqlmesh/cli/project_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ def _gen_config(
- invalidselectstarexpansion
- noambiguousprojections
""",
ProjectTemplate.DBT: f"""# --- Virtual Data Environment Mode ---
ProjectTemplate.DBT: f"""# --- DBT-specific options ---
dbt:
# This configuration ensures that each dbt target gets its own isolated state.
# The inferred state schemas are named "sqlmesh_state_<profile name>_<target schema>", eg "sqlmesh_state_jaffle_shop_dev"
# If this is undesirable, you may manually configure the gateway to use a specific state schema name
# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection
infer_state_schema_name: True

# --- Virtual Data Environment Mode ---
# Enable Virtual Data Environments (VDE) for *development* environments.
# Note that the production environment in dbt projects is not virtual by default to maintain compatibility with existing tooling.
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
from sqlmesh.core.config.plan import PlanConfig as PlanConfig
from sqlmesh.core.config.root import Config as Config
from sqlmesh.core.config.root import Config as Config, DbtConfig as DbtConfig
from sqlmesh.core.config.run import RunConfig as RunConfig
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig
13 changes: 13 additions & 0 deletions sqlmesh/core/config/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from sqlmesh.core.config.base import BaseConfig


class DbtConfig(BaseConfig):
"""
Represents dbt-specific options on the SQLMesh root config.
These options are only taken into account for dbt projects and are ignored on native projects
"""

infer_state_schema_name: bool = False
"""If set, indicates to the dbt loader that the state schema should be inferred based on the profile/target
so that each target gets its own isolated state"""
5 changes: 5 additions & 0 deletions sqlmesh/core/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,17 @@ def load_config_from_paths(
if dbt_project_file:
from sqlmesh.dbt.loader import sqlmesh_config

infer_state_schema_name = False
if dbt := non_python_config.dbt:
infer_state_schema_name = dbt.infer_state_schema_name

dbt_python_config = sqlmesh_config(
project_root=dbt_project_file.parent,
dbt_profile_name=kwargs.pop("profile", None),
dbt_target_name=kwargs.pop("target", None),
variables=variables,
threads=kwargs.pop("threads", None),
infer_state_schema_name=infer_state_schema_name,
)
if type(dbt_python_config) != config_type:
dbt_python_config = convert_config_type(dbt_python_config, config_type)
Expand Down
3 changes: 3 additions & 0 deletions sqlmesh/core/config/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
from sqlmesh.core.config.plan import PlanConfig
from sqlmesh.core.config.run import RunConfig
from sqlmesh.core.config.dbt import DbtConfig
from sqlmesh.core.config.scheduler import (
BuiltInSchedulerConfig,
SchedulerConfig,
Expand Down Expand Up @@ -173,6 +174,7 @@ class Config(BaseConfig):
linter: LinterConfig = LinterConfig()
janitor: JanitorConfig = JanitorConfig()
cache_dir: t.Optional[str] = None
dbt: t.Optional[DbtConfig] = None

_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
"gateways": UpdateStrategy.NESTED_UPDATE,
Expand All @@ -191,6 +193,7 @@ class Config(BaseConfig):
"before_all": UpdateStrategy.EXTEND,
"after_all": UpdateStrategy.EXTEND,
"linter": UpdateStrategy.NESTED_UPDATE,
"dbt": UpdateStrategy.NESTED_UPDATE,
}

_connection_config_validator = connection_config_validator
Expand Down
26 changes: 26 additions & 0 deletions sqlmesh/dbt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ConnectionConfig,
GatewayConfig,
ModelDefaultsConfig,
DbtConfig as RootDbtConfig,
)
from sqlmesh.core.environment import EnvironmentStatements
from sqlmesh.core.loader import CacheBase, LoadedProject, Loader
Expand Down Expand Up @@ -51,6 +52,7 @@ def sqlmesh_config(
variables: t.Optional[t.Dict[str, t.Any]] = None,
threads: t.Optional[int] = None,
register_comments: t.Optional[bool] = None,
infer_state_schema_name: bool = False,
**kwargs: t.Any,
) -> Config:
project_root = project_root or Path()
Expand All @@ -72,16 +74,40 @@ def sqlmesh_config(
# the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks
profile.target.threads = threads

gateway_kwargs = {}
if infer_state_schema_name:
profile_name = context.profile_name

# Note: we deliberately isolate state based on the target *schema* and not the target name.
# It is assumed that the project will define a target, eg 'dev', and then in each users own ~/.dbt/profiles.yml the schema
# for the 'dev' target is overriden to something user-specific, rather than making the target name itself user-specific.
# This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas.
target_schema = profile.target.schema_

# dbt-core doesnt allow schema to be undefined, but it does allow an empty string, and then just
# fails at runtime when `CREATE SCHEMA ""` doesnt work
if not target_schema:
raise ConfigError(
f"Target '{profile.target_name}' does not specify a schema.\n"
"A schema is required in order to infer where to store SQLMesh state"
)

inferred_state_schema_name = f"sqlmesh_state_{profile_name}_{target_schema}"
logger.info("Inferring state schema: %s", inferred_state_schema_name)
gateway_kwargs["state_schema"] = inferred_state_schema_name

return Config(
loader=loader,
model_defaults=model_defaults,
variables=variables or {},
dbt=RootDbtConfig(infer_state_schema_name=infer_state_schema_name),
**{
"default_gateway": profile.target_name if "gateways" not in kwargs else "",
"gateways": {
profile.target_name: GatewayConfig(
connection=profile.target.to_sqlmesh(**target_to_sqlmesh_args),
state_connection=state_connection,
**gateway_kwargs,
)
}, # type: ignore
**kwargs,
Expand Down
38 changes: 37 additions & 1 deletion tests/dbt/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sqlmesh.core.dialect import jinja_query
from sqlmesh.core.model import SqlModel
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange
from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
from sqlmesh.dbt.builtin import Api
from sqlmesh.dbt.column import ColumnConfig
from sqlmesh.dbt.common import Dependencies
Expand Down Expand Up @@ -46,7 +47,8 @@
)
from sqlmesh.dbt.test import TestConfig
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.yaml import load as yaml_load
from sqlmesh.utils.yaml import load as yaml_load, dump as yaml_dump
from tests.dbt.conftest import EmptyProjectCreator

pytestmark = pytest.mark.dbt

Expand Down Expand Up @@ -1211,3 +1213,37 @@ def test_empty_vars_config(tmp_path):
# Verify the variables are empty (not causing any issues)
assert project.packages["test_empty_vars"].variables == {}
assert project.context.variables == {}


def test_infer_state_schema_name(create_empty_project: EmptyProjectCreator):
project_dir, _ = create_empty_project("test_foo", "dev")

# infer_state_schema_name defaults to False if omitted
config = sqlmesh_config(project_root=project_dir)
assert config.dbt
assert not config.dbt.infer_state_schema_name
assert config.get_state_schema() == "sqlmesh"

# create_empty_project() uses the default dbt template for sqlmesh yaml config which
# sets infer_state_schema_name=True
ctx = Context(paths=[project_dir])
assert ctx.config.dbt
assert ctx.config.dbt.infer_state_schema_name
assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_main"
assert isinstance(ctx.state_sync, CachingStateSync)
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)
assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_main"

# If the user delberately overrides state_schema then we should respect this choice
config_file = project_dir / "sqlmesh.yaml"
config_yaml = yaml_load(config_file)
config_yaml["gateways"] = {"dev": {"state_schema": "state_override"}}
config_file.write_text(yaml_dump(config_yaml))

ctx = Context(paths=[project_dir])
assert ctx.config.dbt
assert ctx.config.dbt.infer_state_schema_name
assert ctx.config.get_state_schema() == "state_override"
assert isinstance(ctx.state_sync, CachingStateSync)
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)
assert ctx.state_sync.state_sync.schema == "state_override"
50 changes: 49 additions & 1 deletion tests/dbt/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from sqlmesh.core.config.connection import DuckDBConnectionConfig
from sqlmesh.core.engine_adapter import DuckDBEngineAdapter
from sqlmesh.utils.pandas import columns_to_types_from_df
from sqlmesh.utils.yaml import YAML
from sqlmesh.utils.yaml import YAML, load as yaml_load, dump as yaml_dump
from sqlmesh_dbt.operations import init_project_if_required
from tests.utils.pandas import compare_dataframes, create_df

# Some developers had issues with this test freezing locally so we mark it as cicdonly
Expand Down Expand Up @@ -604,3 +605,50 @@ def test_dbt_node_info(jaffle_shop_duckdb_context: Context):
relationship_audit.node.dbt_node_info.name
== "relationships_orders_customer_id__customer_id__ref_customers_"
)


def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path):
profiles_file = jaffle_shop_duckdb / "profiles.yml"

profiles_yml = yaml_load(profiles_file)

# make prod / dev config identical with the exception of a different default schema to simulate using the same warehouse
profiles_yml["jaffle_shop"]["outputs"]["prod"] = {
**profiles_yml["jaffle_shop"]["outputs"]["dev"]
}
profiles_yml["jaffle_shop"]["outputs"]["prod"]["schema"] = "prod_schema"
profiles_yml["jaffle_shop"]["outputs"]["dev"]["schema"] = "dev_schema"

profiles_file.write_text(yaml_dump(profiles_yml))

init_project_if_required(jaffle_shop_duckdb)

# start off with the prod target
prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"})
assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod_schema"
assert all("prod_schema" in fqn for fqn in prod_ctx.models)
assert prod_ctx.plan(auto_apply=True).has_changes
assert not prod_ctx.plan(auto_apply=True).has_changes

# dev target should have changes - new state separate from prod
dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"})
assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
assert all("dev_schema" in fqn for fqn in dev_ctx.models)
assert dev_ctx.plan(auto_apply=True).has_changes
assert not dev_ctx.plan(auto_apply=True).has_changes

# no explicitly specified target should use dev because that's what's set for the default in the profiles.yml
assert profiles_yml["jaffle_shop"]["target"] == "dev"
default_ctx = Context(paths=[jaffle_shop_duckdb])
assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
assert all("dev_schema" in fqn for fqn in default_ctx.models)
assert not default_ctx.plan(auto_apply=True).has_changes

# an explicit state schema override set in `sqlmesh.yaml` should use that
sqlmesh_yaml_file = jaffle_shop_duckdb / "sqlmesh.yaml"
sqlmesh_yaml = yaml_load(sqlmesh_yaml_file)
sqlmesh_yaml["gateways"] = {"dev": {"state_schema": "sqlmesh_dev_state_override"}}
sqlmesh_yaml_file.write_text(yaml_dump(sqlmesh_yaml))
default_ctx = Context(paths=[jaffle_shop_duckdb])
assert default_ctx.config.get_state_schema() == "sqlmesh_dev_state_override"
assert all("dev_schema" in fqn for fqn in default_ctx.models)
6 changes: 5 additions & 1 deletion tests/fixtures/dbt/empty_project/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ empty_project:
target: __DEFAULT_TARGET__

outputs:
duckdb:
__DEFAULT_TARGET__:
type: duckdb
# database is required for dbt < 1.5 where our adapter deliberately doesnt infer the database from the path and
# defaults it to "main", which raises a "project catalog doesnt match context catalog" error
# ref: https://github.com/TobikoData/sqlmesh/pull/1109
database: empty_project
path: 'empty_project.duckdb'
threads: 4