diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index 6b4f6c7a83..e3132a6de3 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -116,7 +116,15 @@ def _gen_config( - invalidselectstarexpansion - noambiguousprojections """, - ProjectTemplate.DBT: f"""# --- Virtual Data Environment Mode --- + ProjectTemplate.DBT: f"""# --- DBT-specific options --- +dbt: + # This configuration ensures that each dbt target gets its own isolated state. + # The inferred state schemas are named "sqlmesh_state_<profile>_<schema>", e.g. "sqlmesh_state_jaffle_shop_dev" + # If this is undesirable, you may manually configure the gateway to use a specific state schema name + # https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection + infer_state_schema_name: True + +# --- Virtual Data Environment Mode --- # Enable Virtual Data Environments (VDE) for *development* environments. # Note that the production environment in dbt projects is not virtual by default to maintain compatibility with existing tooling. 
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index 0dc99c0fd1..42ed82c6e6 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -36,6 +36,6 @@ from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig from sqlmesh.core.config.linter import LinterConfig as LinterConfig from sqlmesh.core.config.plan import PlanConfig as PlanConfig -from sqlmesh.core.config.root import Config as Config +from sqlmesh.core.config.root import Config as Config, DbtConfig as DbtConfig from sqlmesh.core.config.run import RunConfig as RunConfig from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig diff --git a/sqlmesh/core/config/dbt.py b/sqlmesh/core/config/dbt.py new file mode 100644 index 0000000000..e3132c40a4 --- /dev/null +++ b/sqlmesh/core/config/dbt.py @@ -0,0 +1,13 @@ +from sqlmesh.core.config.base import BaseConfig + + +class DbtConfig(BaseConfig): + """ + Represents dbt-specific options on the SQLMesh root config. 
+ + These options are only taken into account for dbt projects and are ignored on native projects + """ + + infer_state_schema_name: bool = False + """If set, indicates to the dbt loader that the state schema should be inferred based on the profile/target + so that each target gets its own isolated state""" diff --git a/sqlmesh/core/config/loader.py b/sqlmesh/core/config/loader.py index 2d202cb276..e05c148b90 100644 --- a/sqlmesh/core/config/loader.py +++ b/sqlmesh/core/config/loader.py @@ -172,12 +172,17 @@ def load_config_from_paths( if dbt_project_file: from sqlmesh.dbt.loader import sqlmesh_config + infer_state_schema_name = False + if dbt := non_python_config.dbt: + infer_state_schema_name = dbt.infer_state_schema_name + dbt_python_config = sqlmesh_config( project_root=dbt_project_file.parent, dbt_profile_name=kwargs.pop("profile", None), dbt_target_name=kwargs.pop("target", None), variables=variables, threads=kwargs.pop("threads", None), + infer_state_schema_name=infer_state_schema_name, ) if type(dbt_python_config) != config_type: dbt_python_config = convert_config_type(dbt_python_config, config_type) diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 9b6fae63e3..211d271b01 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -36,6 +36,7 @@ from sqlmesh.core.config.linter import LinterConfig as LinterConfig from sqlmesh.core.config.plan import PlanConfig from sqlmesh.core.config.run import RunConfig +from sqlmesh.core.config.dbt import DbtConfig from sqlmesh.core.config.scheduler import ( BuiltInSchedulerConfig, SchedulerConfig, @@ -173,6 +174,7 @@ class Config(BaseConfig): linter: LinterConfig = LinterConfig() janitor: JanitorConfig = JanitorConfig() cache_dir: t.Optional[str] = None + dbt: t.Optional[DbtConfig] = None _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { "gateways": UpdateStrategy.NESTED_UPDATE, @@ -191,6 +193,7 @@ class Config(BaseConfig): "before_all": 
UpdateStrategy.EXTEND, "after_all": UpdateStrategy.EXTEND, "linter": UpdateStrategy.NESTED_UPDATE, + "dbt": UpdateStrategy.NESTED_UPDATE, } _connection_config_validator = connection_config_validator diff --git a/sqlmesh/dbt/loader.py b/sqlmesh/dbt/loader.py index 39973776a8..049c761ed1 100644 --- a/sqlmesh/dbt/loader.py +++ b/sqlmesh/dbt/loader.py @@ -11,6 +11,7 @@ ConnectionConfig, GatewayConfig, ModelDefaultsConfig, + DbtConfig as RootDbtConfig, ) from sqlmesh.core.environment import EnvironmentStatements from sqlmesh.core.loader import CacheBase, LoadedProject, Loader @@ -51,6 +52,7 @@ def sqlmesh_config( variables: t.Optional[t.Dict[str, t.Any]] = None, threads: t.Optional[int] = None, register_comments: t.Optional[bool] = None, + infer_state_schema_name: bool = False, **kwargs: t.Any, ) -> Config: project_root = project_root or Path() @@ -72,16 +74,40 @@ def sqlmesh_config( # the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks profile.target.threads = threads + gateway_kwargs = {} + if infer_state_schema_name: + profile_name = context.profile_name + + # Note: we deliberately isolate state based on the target *schema* and not the target name. + # It is assumed that the project will define a target, e.g. 'dev', and then in each user's own ~/.dbt/profiles.yml the schema + # for the 'dev' target is overridden to something user-specific, rather than making the target name itself user-specific. + # This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas. 
+ target_schema = profile.target.schema_ + + # dbt-core doesn't allow schema to be undefined, but it does allow an empty string, and then just + # fails at runtime when `CREATE SCHEMA ""` doesn't work + if not target_schema: + raise ConfigError( + f"Target '{profile.target_name}' does not specify a schema.\n" + "A schema is required in order to infer where to store SQLMesh state" + ) + + inferred_state_schema_name = f"sqlmesh_state_{profile_name}_{target_schema}" + logger.info("Inferring state schema: %s", inferred_state_schema_name) + gateway_kwargs["state_schema"] = inferred_state_schema_name + return Config( loader=loader, model_defaults=model_defaults, variables=variables or {}, + dbt=RootDbtConfig(infer_state_schema_name=infer_state_schema_name), **{ "default_gateway": profile.target_name if "gateways" not in kwargs else "", "gateways": { profile.target_name: GatewayConfig( connection=profile.target.to_sqlmesh(**target_to_sqlmesh_args), state_connection=state_connection, + **gateway_kwargs, ) }, # type: ignore **kwargs, diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index b3ee0c422a..5dccd90ed2 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -15,6 +15,7 @@ from sqlmesh.core.dialect import jinja_query from sqlmesh.core.model import SqlModel from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange +from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync from sqlmesh.dbt.builtin import Api from sqlmesh.dbt.column import ColumnConfig from sqlmesh.dbt.common import Dependencies @@ -46,7 +47,8 @@ ) from sqlmesh.dbt.test import TestConfig from sqlmesh.utils.errors import ConfigError -from sqlmesh.utils.yaml import load as yaml_load +from sqlmesh.utils.yaml import load as yaml_load, dump as yaml_dump +from tests.dbt.conftest import EmptyProjectCreator pytestmark = pytest.mark.dbt @@ -1211,3 +1213,37 @@ def test_empty_vars_config(tmp_path): # Verify the variables are empty (not causing any 
issues) assert project.packages["test_empty_vars"].variables == {} assert project.context.variables == {} + + +def test_infer_state_schema_name(create_empty_project: EmptyProjectCreator): + project_dir, _ = create_empty_project("test_foo", "dev") + + # infer_state_schema_name defaults to False if omitted + config = sqlmesh_config(project_root=project_dir) + assert config.dbt + assert not config.dbt.infer_state_schema_name + assert config.get_state_schema() == "sqlmesh" + + # create_empty_project() uses the default dbt template for sqlmesh yaml config which + # sets infer_state_schema_name=True + ctx = Context(paths=[project_dir]) + assert ctx.config.dbt + assert ctx.config.dbt.infer_state_schema_name + assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_main" + assert isinstance(ctx.state_sync, CachingStateSync) + assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) + assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_main" + + # If the user deliberately overrides state_schema then we should respect this choice + config_file = project_dir / "sqlmesh.yaml" + config_yaml = yaml_load(config_file) + config_yaml["gateways"] = {"dev": {"state_schema": "state_override"}} + config_file.write_text(yaml_dump(config_yaml)) + + ctx = Context(paths=[project_dir]) + assert ctx.config.dbt + assert ctx.config.dbt.infer_state_schema_name + assert ctx.config.get_state_schema() == "state_override" + assert isinstance(ctx.state_sync, CachingStateSync) + assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) + assert ctx.state_sync.state_sync.schema == "state_override" diff --git a/tests/dbt/test_integration.py b/tests/dbt/test_integration.py index e1f051dbcf..ab22bf7826 100644 --- a/tests/dbt/test_integration.py +++ b/tests/dbt/test_integration.py @@ -19,7 +19,8 @@ from sqlmesh.core.config.connection import DuckDBConnectionConfig from sqlmesh.core.engine_adapter import DuckDBEngineAdapter from sqlmesh.utils.pandas import 
columns_to_types_from_df -from sqlmesh.utils.yaml import YAML +from sqlmesh.utils.yaml import YAML, load as yaml_load, dump as yaml_dump +from sqlmesh_dbt.operations import init_project_if_required from tests.utils.pandas import compare_dataframes, create_df # Some developers had issues with this test freezing locally so we mark it as cicdonly @@ -604,3 +605,50 @@ def test_dbt_node_info(jaffle_shop_duckdb_context: Context): relationship_audit.node.dbt_node_info.name == "relationships_orders_customer_id__customer_id__ref_customers_" ) + + +def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path): + profiles_file = jaffle_shop_duckdb / "profiles.yml" + + profiles_yml = yaml_load(profiles_file) + + # make prod / dev config identical with the exception of a different default schema to simulate using the same warehouse + profiles_yml["jaffle_shop"]["outputs"]["prod"] = { + **profiles_yml["jaffle_shop"]["outputs"]["dev"] + } + profiles_yml["jaffle_shop"]["outputs"]["prod"]["schema"] = "prod_schema" + profiles_yml["jaffle_shop"]["outputs"]["dev"]["schema"] = "dev_schema" + + profiles_file.write_text(yaml_dump(profiles_yml)) + + init_project_if_required(jaffle_shop_duckdb) + + # start off with the prod target + prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"}) + assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod_schema" + assert all("prod_schema" in fqn for fqn in prod_ctx.models) + assert prod_ctx.plan(auto_apply=True).has_changes + assert not prod_ctx.plan(auto_apply=True).has_changes + + # dev target should have changes - new state separate from prod + dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"}) + assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema" + assert all("dev_schema" in fqn for fqn in dev_ctx.models) + assert dev_ctx.plan(auto_apply=True).has_changes + assert not dev_ctx.plan(auto_apply=True).has_changes + + # no 
explicitly specified target should use dev because that's what's set for the default in the profiles.yml + assert profiles_yml["jaffle_shop"]["target"] == "dev" + default_ctx = Context(paths=[jaffle_shop_duckdb]) + assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema" + assert all("dev_schema" in fqn for fqn in default_ctx.models) + assert not default_ctx.plan(auto_apply=True).has_changes + + # an explicit state schema override set in `sqlmesh.yaml` should use that + sqlmesh_yaml_file = jaffle_shop_duckdb / "sqlmesh.yaml" + sqlmesh_yaml = yaml_load(sqlmesh_yaml_file) + sqlmesh_yaml["gateways"] = {"dev": {"state_schema": "sqlmesh_dev_state_override"}} + sqlmesh_yaml_file.write_text(yaml_dump(sqlmesh_yaml)) + default_ctx = Context(paths=[jaffle_shop_duckdb]) + assert default_ctx.config.get_state_schema() == "sqlmesh_dev_state_override" + assert all("dev_schema" in fqn for fqn in default_ctx.models) diff --git a/tests/fixtures/dbt/empty_project/profiles.yml b/tests/fixtures/dbt/empty_project/profiles.yml index b352fc5792..adae09e9c6 100644 --- a/tests/fixtures/dbt/empty_project/profiles.yml +++ b/tests/fixtures/dbt/empty_project/profiles.yml @@ -3,7 +3,11 @@ empty_project: target: __DEFAULT_TARGET__ outputs: - duckdb: + __DEFAULT_TARGET__: type: duckdb + # database is required for dbt < 1.5 where our adapter deliberately doesn't infer the database from the path and + # defaults it to "main", which raises a "project catalog doesn't match context catalog" error + # ref: https://github.com/TobikoData/sqlmesh/pull/1109 + database: empty_project path: 'empty_project.duckdb' threads: 4