From 39fabe123db789b72bb47305905824dd47f4cc06 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Mon, 6 Oct 2025 03:10:58 +0000 Subject: [PATCH 1/5] Feat: Add 'state_schema_naming_pattern' to infer the state schema per dbt target --- sqlmesh/cli/project_init.py | 8 +++- sqlmesh/core/config/__init__.py | 2 +- sqlmesh/core/config/base.py | 14 +++++++ sqlmesh/core/config/gateway.py | 3 +- sqlmesh/core/config/root.py | 33 +++++++++++++-- sqlmesh/dbt/loader.py | 10 +++++ tests/dbt/test_config.py | 36 +++++++++++++++- tests/dbt/test_integration.py | 41 ++++++++++++++++++- tests/fixtures/dbt/empty_project/profiles.yml | 2 +- 9 files changed, 139 insertions(+), 10 deletions(-) diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index 6b4f6c7a83..b864d02ec3 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -116,7 +116,13 @@ def _gen_config( - invalidselectstarexpansion - noambiguousprojections """, - ProjectTemplate.DBT: f"""# --- Virtual Data Environment Mode --- + ProjectTemplate.DBT: f"""# --- State --- +# This default configuration ensures that each dbt target gets its own isolated state. +# If this is undesirable, you may configure the state connection manually. +# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/?h=dbt#selecting-a-different-state-connection +state_schema_naming_pattern: sqlmesh_state_@{{dbt_profile_name}}_@{{dbt_target_name}} + +# --- Virtual Data Environment Mode --- # Enable Virtual Data Environments (VDE) for *development* environments. # Note that the production environment in dbt projects is not virtual by default to maintain compatibility with existing tooling. 
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index 0dc99c0fd1..e829ff7df2 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -36,6 +36,6 @@ from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig from sqlmesh.core.config.linter import LinterConfig as LinterConfig from sqlmesh.core.config.plan import PlanConfig as PlanConfig -from sqlmesh.core.config.root import Config as Config +from sqlmesh.core.config.root import Config as Config, DbtConfigInfo as DbtConfigInfo from sqlmesh.core.config.run import RunConfig as RunConfig from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig diff --git a/sqlmesh/core/config/base.py b/sqlmesh/core/config/base.py index 0da36e4754..1aaf8da4db 100644 --- a/sqlmesh/core/config/base.py +++ b/sqlmesh/core/config/base.py @@ -140,3 +140,17 @@ def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T: setattr(updated, field, value) return updated + + +class DbtConfigInfo(PydanticModel): + """ + This is like DbtNodeInfo except it applies to config instead of DAG nodes. + + It's intended to capture information from a dbt project loaded by the DbtLoader so that it can be used for things like + variable substitutions in regular project config. 
+ """ + + profile_name: str + """Which profile in the dbt project is being used""" + target_name: str + """Which target of the specified profile is being used""" diff --git a/sqlmesh/core/config/gateway.py b/sqlmesh/core/config/gateway.py index a51557c4d7..05551a6089 100644 --- a/sqlmesh/core/config/gateway.py +++ b/sqlmesh/core/config/gateway.py @@ -2,7 +2,6 @@ import typing as t -from sqlmesh.core import constants as c from sqlmesh.core.config.base import BaseConfig from sqlmesh.core.config.model import ModelDefaultsConfig from sqlmesh.core.config.common import variables_validator @@ -33,7 +32,7 @@ class GatewayConfig(BaseConfig): state_connection: t.Optional[SerializableConnectionConfig] = None test_connection: t.Optional[SerializableConnectionConfig] = None scheduler: t.Optional[SchedulerConfig] = None - state_schema: t.Optional[str] = c.SQLMESH + state_schema: t.Optional[str] = None variables: t.Dict[str, t.Any] = {} model_defaults: t.Optional[ModelDefaultsConfig] = None diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 9b6fae63e3..4ce1568b85 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -4,6 +4,7 @@ import re import typing as t import zlib +import logging from pydantic import Field from pydantic.functional_validators import BeforeValidator @@ -19,7 +20,7 @@ TableNamingConvention, VirtualEnvironmentMode, ) -from sqlmesh.core.config.base import BaseConfig, UpdateStrategy +from sqlmesh.core.config.base import BaseConfig, UpdateStrategy, DbtConfigInfo from sqlmesh.core.config.common import variables_validator, compile_regex_mapping from sqlmesh.core.config.connection import ( ConnectionConfig, @@ -49,6 +50,8 @@ from sqlmesh.utils.errors import ConfigError from sqlmesh.utils.pydantic import model_validator +logger = logging.getLogger(__name__) + def validate_no_past_ttl(v: str) -> str: current_time = now() @@ -96,6 +99,8 @@ class Config(BaseConfig): default_test_connection: The default connection to use 
for tests if one is not specified in a gateway. default_scheduler: The default scheduler configuration to use if one is not specified in a gateway. default_gateway: The default gateway. + state_schema_naming_pattern: A pattern supporting variable substitutions to determine the state schema name, rather than just using 'sqlmesh'. + Only applies when the state schema is not explicitly set in the gateway config notification_targets: The notification targets to use. project: The project name of this config. Used for multi-repo setups. snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted. @@ -128,6 +133,7 @@ class Config(BaseConfig): before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands. after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands. cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder. 
+ dbt_config_info: Dbt-specific properties (such as profile and target) for dbt projects loaded by the dbt loader """ gateways: GatewayDict = {"": GatewayConfig()} @@ -137,6 +143,7 @@ class Config(BaseConfig): ) default_scheduler: SchedulerConfig = BuiltInSchedulerConfig() default_gateway: str = "" + state_schema_naming_pattern: t.Optional[str] = None notification_targets: t.List[NotificationTarget] = [] project: str = "" snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL @@ -173,6 +180,7 @@ class Config(BaseConfig): linter: LinterConfig = LinterConfig() janitor: JanitorConfig = JanitorConfig() cache_dir: t.Optional[str] = None + dbt_config_info: t.Optional[DbtConfigInfo] = None _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { "gateways": UpdateStrategy.NESTED_UPDATE, @@ -344,8 +352,27 @@ def get_test_connection( def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig: return self.get_gateway(gateway_name).scheduler or self.default_scheduler - def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]: - return self.get_gateway(gateway_name).state_schema + def get_state_schema(self, gateway_name: t.Optional[str] = None) -> str: + state_schema = self.get_gateway(gateway_name).state_schema + + if state_schema is None and self.state_schema_naming_pattern: + substitutions = {} + if dbt := self.dbt_config_info: + # TODO: keeping this simple for now rather than trying to set up a Jinja or SQLMesh Macro rendering context + substitutions.update( + { + "@{dbt_profile_name}": dbt.profile_name, + # TODO @iaroslav: what was the problem with using target name instead of the default schema name again? 
+ "@{dbt_target_name}": dbt.target_name, + } + ) + state_schema = self.state_schema_naming_pattern + for pattern, value in substitutions.items(): + state_schema = state_schema.replace(pattern, value) + + logger.info("Inferring state schema: %s", state_schema) + + return state_schema or c.SQLMESH @property def default_gateway_name(self) -> str: diff --git a/sqlmesh/dbt/loader.py b/sqlmesh/dbt/loader.py index 39973776a8..1386b9d9ab 100644 --- a/sqlmesh/dbt/loader.py +++ b/sqlmesh/dbt/loader.py @@ -11,6 +11,7 @@ ConnectionConfig, GatewayConfig, ModelDefaultsConfig, + DbtConfigInfo, ) from sqlmesh.core.environment import EnvironmentStatements from sqlmesh.core.loader import CacheBase, LoadedProject, Loader @@ -71,11 +72,20 @@ def sqlmesh_config( if threads is not None: # the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks profile.target.threads = threads + + if context.profile_name is None: + # Note: Profile.load() mutates `context` and will have already raised an exception if profile_name is not set, + # but mypy doesnt know this because the field is defined as t.Optional[str] + raise ConfigError(f"profile name must be set") return Config( loader=loader, model_defaults=model_defaults, variables=variables or {}, + dbt_config_info=DbtConfigInfo( + profile_name=dbt_profile_name or context.profile_name, + target_name=dbt_target_name or profile.target_name, + ), **{ "default_gateway": profile.target_name if "gateways" not in kwargs else "", "gateways": { diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index b3ee0c422a..82bc7fca88 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -15,6 +15,7 @@ from sqlmesh.core.dialect import jinja_query from sqlmesh.core.model import SqlModel from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange +from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync from sqlmesh.dbt.builtin import Api from sqlmesh.dbt.column import ColumnConfig 
from sqlmesh.dbt.common import Dependencies @@ -46,7 +47,8 @@ ) from sqlmesh.dbt.test import TestConfig from sqlmesh.utils.errors import ConfigError -from sqlmesh.utils.yaml import load as yaml_load +from sqlmesh.utils.yaml import load as yaml_load, dump as yaml_dump +from tests.dbt.conftest import EmptyProjectCreator pytestmark = pytest.mark.dbt @@ -1211,3 +1213,35 @@ def test_empty_vars_config(tmp_path): # Verify the variables are empty (not causing any issues) assert project.packages["test_empty_vars"].variables == {} assert project.context.variables == {} + + +def test_state_schema_naming_pattern(create_empty_project: EmptyProjectCreator): + project_dir, _ = create_empty_project("test_foo", "dev") + + # no state_schema_naming_pattern, creating python config manually doesnt take into account + # any config yaml files that may be present, so we get the default state schema + config = sqlmesh_config(project_root=project_dir) + assert not config.state_schema_naming_pattern + assert config.get_state_schema() == "sqlmesh" + + # create_empty_project() uses the default dbt template for sqlmesh yaml config which + # sets state_schema_naming_pattern + ctx = Context(paths=[project_dir]) + assert ctx.config.state_schema_naming_pattern + assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_dev" + assert isinstance(ctx.state_sync, CachingStateSync) + assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) + assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_dev" + + # If the user delberately overrides state_schema then we should respect this choice + config_file = project_dir / "sqlmesh.yaml" + config_yaml = yaml_load(config_file) + config_yaml["gateways"] = {"dev": {"state_schema": "state_override"}} + config_file.write_text(yaml_dump(config_yaml)) + + ctx = Context(paths=[project_dir]) + assert ctx.config.state_schema_naming_pattern + assert ctx.config.get_state_schema() == "state_override" + assert isinstance(ctx.state_sync, 
CachingStateSync) + assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) + assert ctx.state_sync.state_sync.schema == "state_override" diff --git a/tests/dbt/test_integration.py b/tests/dbt/test_integration.py index e1f051dbcf..2354c9d609 100644 --- a/tests/dbt/test_integration.py +++ b/tests/dbt/test_integration.py @@ -19,7 +19,8 @@ from sqlmesh.core.config.connection import DuckDBConnectionConfig from sqlmesh.core.engine_adapter import DuckDBEngineAdapter from sqlmesh.utils.pandas import columns_to_types_from_df -from sqlmesh.utils.yaml import YAML +from sqlmesh.utils.yaml import YAML, load as yaml_load, dump as yaml_dump +from sqlmesh_dbt.operations import init_project_if_required from tests.utils.pandas import compare_dataframes, create_df # Some developers had issues with this test freezing locally so we mark it as cicdonly @@ -604,3 +605,41 @@ def test_dbt_node_info(jaffle_shop_duckdb_context: Context): relationship_audit.node.dbt_node_info.name == "relationships_orders_customer_id__customer_id__ref_customers_" ) + + +def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path): + profiles_file = jaffle_shop_duckdb / "profiles.yml" + + profiles_yml = yaml_load(profiles_file) + + # make prod / dev config identical with the exception of a different default schema to simulate using the same warehouse + profiles_yml["jaffle_shop"]["outputs"]["prod"] = { + **profiles_yml["jaffle_shop"]["outputs"]["dev"] + } + profiles_yml["jaffle_shop"]["outputs"]["prod"]["schema"] = "prod_schema" + profiles_yml["jaffle_shop"]["outputs"]["dev"]["schema"] = "dev_schema" + + profiles_file.write_text(yaml_dump(profiles_yml)) + + init_project_if_required(jaffle_shop_duckdb) + + # start off with the prod target + prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"}) + assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod" + assert all("prod_schema" in fqn for fqn in prod_ctx.models) + assert 
prod_ctx.plan(auto_apply=True).has_changes + assert not prod_ctx.plan(auto_apply=True).has_changes + + # dev target should have changes - new state separate from prod + dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"}) + assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev" + assert all("dev_schema" in fqn for fqn in dev_ctx.models) + assert dev_ctx.plan(auto_apply=True).has_changes + assert not dev_ctx.plan(auto_apply=True).has_changes + + # no explicitly specified target should use dev because that's what's set for the default in the profiles.yml + assert profiles_yml["jaffle_shop"]["target"] == "dev" + default_ctx = Context(paths=[jaffle_shop_duckdb]) + assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev" + assert all("dev_schema" in fqn for fqn in default_ctx.models) + assert not default_ctx.plan(auto_apply=True).has_changes diff --git a/tests/fixtures/dbt/empty_project/profiles.yml b/tests/fixtures/dbt/empty_project/profiles.yml index b352fc5792..83ffab163b 100644 --- a/tests/fixtures/dbt/empty_project/profiles.yml +++ b/tests/fixtures/dbt/empty_project/profiles.yml @@ -3,7 +3,7 @@ empty_project: target: __DEFAULT_TARGET__ outputs: - duckdb: + __DEFAULT_TARGET__: type: duckdb path: 'empty_project.duckdb' threads: 4 From 591b81b614f350436ec213ac05cc2270935d72d0 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Tue, 7 Oct 2025 00:54:33 +0000 Subject: [PATCH 2/5] PR feedback --- sqlmesh/cli/project_init.py | 12 +++++++----- sqlmesh/core/config/__init__.py | 2 +- sqlmesh/core/config/base.py | 14 -------------- sqlmesh/core/config/dbt.py | 13 +++++++++++++ sqlmesh/core/config/gateway.py | 3 ++- sqlmesh/core/config/loader.py | 5 +++++ sqlmesh/core/config/root.py | 33 ++++++--------------------------- sqlmesh/dbt/loader.py | 22 +++++++++++++--------- tests/dbt/test_config.py | 20 +++++++++++--------- tests/dbt/test_integration.py | 15 ++++++++++++--- 10 files changed, 70 
insertions(+), 69 deletions(-) create mode 100644 sqlmesh/core/config/dbt.py diff --git a/sqlmesh/cli/project_init.py b/sqlmesh/cli/project_init.py index b864d02ec3..27f2326545 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -116,11 +116,13 @@ def _gen_config( - invalidselectstarexpansion - noambiguousprojections """, - ProjectTemplate.DBT: f"""# --- State --- -# This default configuration ensures that each dbt target gets its own isolated state. -# If this is undesirable, you may configure the state connection manually. -# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/?h=dbt#selecting-a-different-state-connection -state_schema_naming_pattern: sqlmesh_state_@{{dbt_profile_name}}_@{{dbt_target_name}} + ProjectTemplate.DBT: f"""# --- DBT-specific options --- +dbt: + # This configuration ensures that each dbt target gets its own isolated state. + # The inferred state schemas are named "sqlmesh_state__", eg "sqlmesh_state_jaffle_shop_dev" + # If this is undesirable, you may manually configure the gateway to use a specific state schema name + # https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection + infer_state_schema_name: True # --- Virtual Data Environment Mode --- # Enable Virtual Data Environments (VDE) for *development* environments. 
diff --git a/sqlmesh/core/config/__init__.py b/sqlmesh/core/config/__init__.py index e829ff7df2..42ed82c6e6 100644 --- a/sqlmesh/core/config/__init__.py +++ b/sqlmesh/core/config/__init__.py @@ -36,6 +36,6 @@ from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig from sqlmesh.core.config.linter import LinterConfig as LinterConfig from sqlmesh.core.config.plan import PlanConfig as PlanConfig -from sqlmesh.core.config.root import Config as Config, DbtConfigInfo as DbtConfigInfo +from sqlmesh.core.config.root import Config as Config, DbtConfig as DbtConfig from sqlmesh.core.config.run import RunConfig as RunConfig from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig diff --git a/sqlmesh/core/config/base.py b/sqlmesh/core/config/base.py index 1aaf8da4db..0da36e4754 100644 --- a/sqlmesh/core/config/base.py +++ b/sqlmesh/core/config/base.py @@ -140,17 +140,3 @@ def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T: setattr(updated, field, value) return updated - - -class DbtConfigInfo(PydanticModel): - """ - This is like DbtNodeInfo except it applies to config instead of DAG nodes. - - It's intended to capture information from a dbt project loaded by the DbtLoader so that it can be used for things like - variable substitutions in regular project config. - """ - - profile_name: str - """Which profile in the dbt project is being used""" - target_name: str - """Which target of the specified profile is being used""" diff --git a/sqlmesh/core/config/dbt.py b/sqlmesh/core/config/dbt.py new file mode 100644 index 0000000000..e3132c40a4 --- /dev/null +++ b/sqlmesh/core/config/dbt.py @@ -0,0 +1,13 @@ +from sqlmesh.core.config.base import BaseConfig + + +class DbtConfig(BaseConfig): + """ + Represents dbt-specific options on the SQLMesh root config. 
+ + These options are only taken into account for dbt projects and are ignored on native projects + """ + + infer_state_schema_name: bool = False + """If set, indicates to the dbt loader that the state schema should be inferred based on the profile/target + so that each target gets its own isolated state""" diff --git a/sqlmesh/core/config/gateway.py b/sqlmesh/core/config/gateway.py index 05551a6089..a51557c4d7 100644 --- a/sqlmesh/core/config/gateway.py +++ b/sqlmesh/core/config/gateway.py @@ -2,6 +2,7 @@ import typing as t +from sqlmesh.core import constants as c from sqlmesh.core.config.base import BaseConfig from sqlmesh.core.config.model import ModelDefaultsConfig from sqlmesh.core.config.common import variables_validator @@ -32,7 +33,7 @@ class GatewayConfig(BaseConfig): state_connection: t.Optional[SerializableConnectionConfig] = None test_connection: t.Optional[SerializableConnectionConfig] = None scheduler: t.Optional[SchedulerConfig] = None - state_schema: t.Optional[str] = None + state_schema: t.Optional[str] = c.SQLMESH variables: t.Dict[str, t.Any] = {} model_defaults: t.Optional[ModelDefaultsConfig] = None diff --git a/sqlmesh/core/config/loader.py b/sqlmesh/core/config/loader.py index 2d202cb276..e05c148b90 100644 --- a/sqlmesh/core/config/loader.py +++ b/sqlmesh/core/config/loader.py @@ -172,12 +172,17 @@ def load_config_from_paths( if dbt_project_file: from sqlmesh.dbt.loader import sqlmesh_config + infer_state_schema_name = False + if dbt := non_python_config.dbt: + infer_state_schema_name = dbt.infer_state_schema_name + dbt_python_config = sqlmesh_config( project_root=dbt_project_file.parent, dbt_profile_name=kwargs.pop("profile", None), dbt_target_name=kwargs.pop("target", None), variables=variables, threads=kwargs.pop("threads", None), + infer_state_schema_name=infer_state_schema_name, ) if type(dbt_python_config) != config_type: dbt_python_config = convert_config_type(dbt_python_config, config_type) diff --git a/sqlmesh/core/config/root.py 
b/sqlmesh/core/config/root.py index 4ce1568b85..6108cc0387 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -20,7 +20,7 @@ TableNamingConvention, VirtualEnvironmentMode, ) -from sqlmesh.core.config.base import BaseConfig, UpdateStrategy, DbtConfigInfo +from sqlmesh.core.config.base import BaseConfig, UpdateStrategy from sqlmesh.core.config.common import variables_validator, compile_regex_mapping from sqlmesh.core.config.connection import ( ConnectionConfig, @@ -37,6 +37,7 @@ from sqlmesh.core.config.linter import LinterConfig as LinterConfig from sqlmesh.core.config.plan import PlanConfig from sqlmesh.core.config.run import RunConfig +from sqlmesh.core.config.dbt import DbtConfig from sqlmesh.core.config.scheduler import ( BuiltInSchedulerConfig, SchedulerConfig, @@ -99,8 +100,6 @@ class Config(BaseConfig): default_test_connection: The default connection to use for tests if one is not specified in a gateway. default_scheduler: The default scheduler configuration to use if one is not specified in a gateway. default_gateway: The default gateway. - state_schema_naming_pattern: A pattern supporting variable substitutions to determine the state schema name, rather than just using 'sqlmesh'. - Only applies when the state schema is not explicitly set in the gateway config notification_targets: The notification targets to use. project: The project name of this config. Used for multi-repo setups. snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted. @@ -133,7 +132,6 @@ class Config(BaseConfig): before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands. after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands. cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder. 
- dbt_config_info: Dbt-specific properties (such as profile and target) for dbt projects loaded by the dbt loader """ gateways: GatewayDict = {"": GatewayConfig()} @@ -143,7 +141,6 @@ class Config(BaseConfig): ) default_scheduler: SchedulerConfig = BuiltInSchedulerConfig() default_gateway: str = "" - state_schema_naming_pattern: t.Optional[str] = None notification_targets: t.List[NotificationTarget] = [] project: str = "" snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL @@ -180,7 +177,7 @@ class Config(BaseConfig): linter: LinterConfig = LinterConfig() janitor: JanitorConfig = JanitorConfig() cache_dir: t.Optional[str] = None - dbt_config_info: t.Optional[DbtConfigInfo] = None + dbt: t.Optional[DbtConfig] = None _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { "gateways": UpdateStrategy.NESTED_UPDATE, @@ -199,6 +196,7 @@ class Config(BaseConfig): "before_all": UpdateStrategy.EXTEND, "after_all": UpdateStrategy.EXTEND, "linter": UpdateStrategy.NESTED_UPDATE, + "dbt": UpdateStrategy.NESTED_UPDATE, } _connection_config_validator = connection_config_validator @@ -352,27 +350,8 @@ def get_test_connection( def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig: return self.get_gateway(gateway_name).scheduler or self.default_scheduler - def get_state_schema(self, gateway_name: t.Optional[str] = None) -> str: - state_schema = self.get_gateway(gateway_name).state_schema - - if state_schema is None and self.state_schema_naming_pattern: - substitutions = {} - if dbt := self.dbt_config_info: - # TODO: keeping this simple for now rather than trying to set up a Jinja or SQLMesh Macro rendering context - substitutions.update( - { - "@{dbt_profile_name}": dbt.profile_name, - # TODO @iaroslav: what was the problem with using target name instead of the default schema name again? 
- "@{dbt_target_name}": dbt.target_name, - } - ) - state_schema = self.state_schema_naming_pattern - for pattern, value in substitutions.items(): - state_schema = state_schema.replace(pattern, value) - - logger.info("Inferring state schema: %s", state_schema) - - return state_schema or c.SQLMESH + def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]: + return self.get_gateway(gateway_name).state_schema @property def default_gateway_name(self) -> str: diff --git a/sqlmesh/dbt/loader.py b/sqlmesh/dbt/loader.py index 1386b9d9ab..e1f3562789 100644 --- a/sqlmesh/dbt/loader.py +++ b/sqlmesh/dbt/loader.py @@ -11,7 +11,7 @@ ConnectionConfig, GatewayConfig, ModelDefaultsConfig, - DbtConfigInfo, + DbtConfig as RootDbtConfig, ) from sqlmesh.core.environment import EnvironmentStatements from sqlmesh.core.loader import CacheBase, LoadedProject, Loader @@ -52,6 +52,7 @@ def sqlmesh_config( variables: t.Optional[t.Dict[str, t.Any]] = None, threads: t.Optional[int] = None, register_comments: t.Optional[bool] = None, + infer_state_schema_name: bool = False, **kwargs: t.Any, ) -> Config: project_root = project_root or Path() @@ -73,25 +74,28 @@ def sqlmesh_config( # the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks profile.target.threads = threads - if context.profile_name is None: - # Note: Profile.load() mutates `context` and will have already raised an exception if profile_name is not set, - # but mypy doesnt know this because the field is defined as t.Optional[str] - raise ConfigError(f"profile name must be set") + gateway_kwargs = {} + if infer_state_schema_name: + profile_name = context.profile_name + # Note: we deliberately isolate state based on the target *schema* and not the target name. 
+ # It is assumed that the project will define a target, eg 'dev', and then in each users own ~/.dbt/profiles.yml the schema + # for the 'dev' target is overriden to something user-specific, rather than making the target name itself user-specific. + # This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas. + target_schema = profile.target.schema_ + gateway_kwargs["state_schema"] = f"sqlmesh_state_{profile_name}_{target_schema}" return Config( loader=loader, model_defaults=model_defaults, variables=variables or {}, - dbt_config_info=DbtConfigInfo( - profile_name=dbt_profile_name or context.profile_name, - target_name=dbt_target_name or profile.target_name, - ), + dbt=RootDbtConfig(infer_state_schema_name=infer_state_schema_name), **{ "default_gateway": profile.target_name if "gateways" not in kwargs else "", "gateways": { profile.target_name: GatewayConfig( connection=profile.target.to_sqlmesh(**target_to_sqlmesh_args), state_connection=state_connection, + **gateway_kwargs, ) }, # type: ignore **kwargs, diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index 82bc7fca88..5dccd90ed2 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -1215,23 +1215,24 @@ def test_empty_vars_config(tmp_path): assert project.context.variables == {} -def test_state_schema_naming_pattern(create_empty_project: EmptyProjectCreator): +def test_infer_state_schema_name(create_empty_project: EmptyProjectCreator): project_dir, _ = create_empty_project("test_foo", "dev") - # no state_schema_naming_pattern, creating python config manually doesnt take into account - # any config yaml files that may be present, so we get the default state schema + # infer_state_schema_name defaults to False if omitted config = sqlmesh_config(project_root=project_dir) - assert not config.state_schema_naming_pattern + assert config.dbt + assert not config.dbt.infer_state_schema_name assert 
config.get_state_schema() == "sqlmesh" # create_empty_project() uses the default dbt template for sqlmesh yaml config which - # sets state_schema_naming_pattern + # sets infer_state_schema_name=True ctx = Context(paths=[project_dir]) - assert ctx.config.state_schema_naming_pattern - assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_dev" + assert ctx.config.dbt + assert ctx.config.dbt.infer_state_schema_name + assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_main" assert isinstance(ctx.state_sync, CachingStateSync) assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) - assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_dev" + assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_main" # If the user delberately overrides state_schema then we should respect this choice config_file = project_dir / "sqlmesh.yaml" @@ -1240,7 +1241,8 @@ def test_state_schema_naming_pattern(create_empty_project: EmptyProjectCreator): config_file.write_text(yaml_dump(config_yaml)) ctx = Context(paths=[project_dir]) - assert ctx.config.state_schema_naming_pattern + assert ctx.config.dbt + assert ctx.config.dbt.infer_state_schema_name assert ctx.config.get_state_schema() == "state_override" assert isinstance(ctx.state_sync, CachingStateSync) assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync) diff --git a/tests/dbt/test_integration.py b/tests/dbt/test_integration.py index 2354c9d609..ab22bf7826 100644 --- a/tests/dbt/test_integration.py +++ b/tests/dbt/test_integration.py @@ -625,14 +625,14 @@ def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path): # start off with the prod target prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"}) - assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod" + assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod_schema" assert all("prod_schema" in fqn for fqn in prod_ctx.models) 
assert prod_ctx.plan(auto_apply=True).has_changes assert not prod_ctx.plan(auto_apply=True).has_changes # dev target should have changes - new state separate from prod dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"}) - assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev" + assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema" assert all("dev_schema" in fqn for fqn in dev_ctx.models) assert dev_ctx.plan(auto_apply=True).has_changes assert not dev_ctx.plan(auto_apply=True).has_changes @@ -640,6 +640,15 @@ def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path): # no explicitly specified target should use dev because that's what's set for the default in the profiles.yml assert profiles_yml["jaffle_shop"]["target"] == "dev" default_ctx = Context(paths=[jaffle_shop_duckdb]) - assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev" + assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema" assert all("dev_schema" in fqn for fqn in default_ctx.models) assert not default_ctx.plan(auto_apply=True).has_changes + + # an explicit state schema override set in `sqlmesh.yaml` should use that + sqlmesh_yaml_file = jaffle_shop_duckdb / "sqlmesh.yaml" + sqlmesh_yaml = yaml_load(sqlmesh_yaml_file) + sqlmesh_yaml["gateways"] = {"dev": {"state_schema": "sqlmesh_dev_state_override"}} + sqlmesh_yaml_file.write_text(yaml_dump(sqlmesh_yaml)) + default_ctx = Context(paths=[jaffle_shop_duckdb]) + assert default_ctx.config.get_state_schema() == "sqlmesh_dev_state_override" + assert all("dev_schema" in fqn for fqn in default_ctx.models) From 1921e08ecc7a945a13bb42edc0ff21ef6c143b5d Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Tue, 7 Oct 2025 00:59:39 +0000 Subject: [PATCH 3/5] update wording --- sqlmesh/cli/project_init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/cli/project_init.py 
b/sqlmesh/cli/project_init.py index 27f2326545..e3132a6de3 100644 --- a/sqlmesh/cli/project_init.py +++ b/sqlmesh/cli/project_init.py @@ -119,7 +119,7 @@ def _gen_config( ProjectTemplate.DBT: f"""# --- DBT-specific options --- dbt: # This configuration ensures that each dbt target gets its own isolated state. - # The inferred state schemas are named "sqlmesh_state_<profile name>_<target name>", eg "sqlmesh_state_jaffle_shop_dev" + # The inferred state schemas are named "sqlmesh_state_<profile name>_<target schema>", eg "sqlmesh_state_jaffle_shop_dev" # If this is undesirable, you may manually configure the gateway to use a specific state schema name # https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection infer_state_schema_name: True From 783d18ca47569952455aa66e7440a1a1cbb1f3e5 Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Tue, 7 Oct 2025 02:18:28 +0000 Subject: [PATCH 4/5] Fix test on dbt<1.5 --- sqlmesh/core/config/root.py | 3 --- tests/fixtures/dbt/empty_project/profiles.yml | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 6108cc0387..211d271b01 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -4,7 +4,6 @@ import re import typing as t import zlib -import logging from pydantic import Field from pydantic.functional_validators import BeforeValidator @@ -51,8 +50,6 @@ from sqlmesh.utils.errors import ConfigError from sqlmesh.utils.pydantic import model_validator -logger = logging.getLogger(__name__) - def validate_no_past_ttl(v: str) -> str: current_time = now() diff --git a/tests/fixtures/dbt/empty_project/profiles.yml b/tests/fixtures/dbt/empty_project/profiles.yml index 83ffab163b..adae09e9c6 100644 --- a/tests/fixtures/dbt/empty_project/profiles.yml +++ b/tests/fixtures/dbt/empty_project/profiles.yml @@ -5,5 +5,9 @@ empty_project: outputs: __DEFAULT_TARGET__: type: duckdb + # database is required for dbt < 1.5 where our adapter deliberately doesnt infer 
the database from the path and + # defaults it to "main", which raises a "project catalog doesnt match context catalog" error + # ref: https://github.com/TobikoData/sqlmesh/pull/1109 + database: empty_project path: 'empty_project.duckdb' threads: 4 From 9cb3bce16ac53b5e00c7c79eaa8a826cb88da93f Mon Sep 17 00:00:00 2001 From: Erin Drummond Date: Tue, 7 Oct 2025 23:20:08 +0000 Subject: [PATCH 5/5] Raise error if schema is an empty string --- sqlmesh/dbt/loader.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/sqlmesh/dbt/loader.py b/sqlmesh/dbt/loader.py index e1f3562789..049c761ed1 100644 --- a/sqlmesh/dbt/loader.py +++ b/sqlmesh/dbt/loader.py @@ -73,16 +73,28 @@ def sqlmesh_config( if threads is not None: # the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks profile.target.threads = threads - + gateway_kwargs = {} if infer_state_schema_name: profile_name = context.profile_name + # Note: we deliberately isolate state based on the target *schema* and not the target name. # It is assumed that the project will define a target, eg 'dev', and then in each users own ~/.dbt/profiles.yml the schema # for the 'dev' target is overriden to something user-specific, rather than making the target name itself user-specific. # This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas. 
target_schema = profile.target.schema_ - gateway_kwargs["state_schema"] = f"sqlmesh_state_{profile_name}_{target_schema}" + + # dbt-core doesn't allow schema to be undefined, but it does allow an empty string, and then just + # fails at runtime when `CREATE SCHEMA ""` doesn't work + if not target_schema: + raise ConfigError( + f"Target '{profile.target_name}' does not specify a schema.\n" + "A schema is required in order to infer where to store SQLMesh state" + ) + + inferred_state_schema_name = f"sqlmesh_state_{profile_name}_{target_schema}" + logger.info("Inferring state schema: %s", inferred_state_schema_name) + gateway_kwargs["state_schema"] = inferred_state_schema_name return Config( loader=loader,