Skip to content

Commit bd16a2f

Browse files
committed
PR feedback
1 parent 2de2d05 commit bd16a2f

File tree

10 files changed

+70
-69
lines changed

10 files changed

+70
-69
lines changed

sqlmesh/cli/project_init.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,13 @@ def _gen_config(
116116
- invalidselectstarexpansion
117117
- noambiguousprojections
118118
""",
119-
ProjectTemplate.DBT: f"""# --- State ---
120-
# This default configuration ensures that each dbt target gets its own isolated state.
121-
# If this is undesirable, you may configure the state connection manually.
122-
# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/?h=dbt#selecting-a-different-state-connection
123-
state_schema_naming_pattern: sqlmesh_state_@{{dbt_profile_name}}_@{{dbt_target_name}}
119+
ProjectTemplate.DBT: f"""# --- DBT-specific options ---
120+
dbt:
121+
# This configuration ensures that each dbt target gets its own isolated state.
122+
# The inferred state schemas are named "sqlmesh_state_<project name>_<target schema>", eg "sqlmesh_state_jaffle_shop_dev"
123+
# If this is undesirable, you may manually configure the gateway to use a specific state schema name
124+
# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection
125+
infer_state_schema_name: True
124126
125127
# --- Virtual Data Environment Mode ---
126128
# Enable Virtual Data Environments (VDE) for *development* environments.

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@
3636
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
3737
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
3838
from sqlmesh.core.config.plan import PlanConfig as PlanConfig
39-
from sqlmesh.core.config.root import Config as Config, DbtConfigInfo as DbtConfigInfo
39+
from sqlmesh.core.config.root import Config as Config, DbtConfig as DbtConfig
4040
from sqlmesh.core.config.run import RunConfig as RunConfig
4141
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig

sqlmesh/core/config/base.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,3 @@ def update_with(self: T, other: t.Union[t.Dict[str, t.Any], T]) -> T:
140140
setattr(updated, field, value)
141141

142142
return updated
143-
144-
145-
class DbtConfigInfo(PydanticModel):
146-
"""
147-
This is like DbtNodeInfo except it applies to config instead of DAG nodes.
148-
149-
It's intended to capture information from a dbt project loaded by the DbtLoader so that it can be used for things like
150-
variable substitutions in regular project config.
151-
"""
152-
153-
profile_name: str
154-
"""Which profile in the dbt project is being used"""
155-
target_name: str
156-
"""Which target of the specified profile is being used"""

sqlmesh/core/config/dbt.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from sqlmesh.core.config.base import BaseConfig
2+
3+
4+
class DbtConfig(BaseConfig):
5+
"""
6+
Represents dbt-specific options on the SQLMesh root config.
7+
8+
These options are only taken into account for dbt projects and are ignored on native projects
9+
"""
10+
11+
infer_state_schema_name: bool = False
12+
"""If set, indicates to the dbt loader that the state schema should be inferred based on the profile/target
13+
so that each target gets its own isolated state"""

sqlmesh/core/config/gateway.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44

5+
from sqlmesh.core import constants as c
56
from sqlmesh.core.config.base import BaseConfig
67
from sqlmesh.core.config.model import ModelDefaultsConfig
78
from sqlmesh.core.config.common import variables_validator
@@ -32,7 +33,7 @@ class GatewayConfig(BaseConfig):
3233
state_connection: t.Optional[SerializableConnectionConfig] = None
3334
test_connection: t.Optional[SerializableConnectionConfig] = None
3435
scheduler: t.Optional[SchedulerConfig] = None
35-
state_schema: t.Optional[str] = None
36+
state_schema: t.Optional[str] = c.SQLMESH
3637
variables: t.Dict[str, t.Any] = {}
3738
model_defaults: t.Optional[ModelDefaultsConfig] = None
3839

sqlmesh/core/config/loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,16 @@ def load_config_from_paths(
172172
if dbt_project_file:
173173
from sqlmesh.dbt.loader import sqlmesh_config
174174

175+
infer_state_schema_name = False
176+
if dbt := non_python_config.dbt:
177+
infer_state_schema_name = dbt.infer_state_schema_name
178+
175179
dbt_python_config = sqlmesh_config(
176180
project_root=dbt_project_file.parent,
177181
dbt_profile_name=kwargs.pop("profile", None),
178182
dbt_target_name=kwargs.pop("target", None),
179183
variables=variables,
184+
infer_state_schema_name=infer_state_schema_name,
180185
)
181186
if type(dbt_python_config) != config_type:
182187
dbt_python_config = convert_config_type(dbt_python_config, config_type)

sqlmesh/core/config/root.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
TableNamingConvention,
2121
VirtualEnvironmentMode,
2222
)
23-
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy, DbtConfigInfo
23+
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
2424
from sqlmesh.core.config.common import variables_validator, compile_regex_mapping
2525
from sqlmesh.core.config.connection import (
2626
ConnectionConfig,
@@ -37,6 +37,7 @@
3737
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
3838
from sqlmesh.core.config.plan import PlanConfig
3939
from sqlmesh.core.config.run import RunConfig
40+
from sqlmesh.core.config.dbt import DbtConfig
4041
from sqlmesh.core.config.scheduler import (
4142
BuiltInSchedulerConfig,
4243
SchedulerConfig,
@@ -99,8 +100,6 @@ class Config(BaseConfig):
99100
default_test_connection: The default connection to use for tests if one is not specified in a gateway.
100101
default_scheduler: The default scheduler configuration to use if one is not specified in a gateway.
101102
default_gateway: The default gateway.
102-
state_schema_naming_pattern: A pattern supporting variable substitutions to determine the state schema name, rather than just using 'sqlmesh'.
103-
Only applies when the state schema is not explicitly set in the gateway config
104103
notification_targets: The notification targets to use.
105104
project: The project name of this config. Used for multi-repo setups.
106105
snapshot_ttl: The period of time that a model snapshot that is not a part of any environment should exist before being deleted.
@@ -133,7 +132,6 @@ class Config(BaseConfig):
133132
before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands.
134133
after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands.
135134
cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder.
136-
dbt_config_info: Dbt-specific properties (such as profile and target) for dbt projects loaded by the dbt loader
137135
"""
138136

139137
gateways: GatewayDict = {"": GatewayConfig()}
@@ -143,7 +141,6 @@ class Config(BaseConfig):
143141
)
144142
default_scheduler: SchedulerConfig = BuiltInSchedulerConfig()
145143
default_gateway: str = ""
146-
state_schema_naming_pattern: t.Optional[str] = None
147144
notification_targets: t.List[NotificationTarget] = []
148145
project: str = ""
149146
snapshot_ttl: NoPastTTLString = c.DEFAULT_SNAPSHOT_TTL
@@ -180,7 +177,7 @@ class Config(BaseConfig):
180177
linter: LinterConfig = LinterConfig()
181178
janitor: JanitorConfig = JanitorConfig()
182179
cache_dir: t.Optional[str] = None
183-
dbt_config_info: t.Optional[DbtConfigInfo] = None
180+
dbt: t.Optional[DbtConfig] = None
184181

185182
_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
186183
"gateways": UpdateStrategy.NESTED_UPDATE,
@@ -199,6 +196,7 @@ class Config(BaseConfig):
199196
"before_all": UpdateStrategy.EXTEND,
200197
"after_all": UpdateStrategy.EXTEND,
201198
"linter": UpdateStrategy.NESTED_UPDATE,
199+
"dbt": UpdateStrategy.NESTED_UPDATE,
202200
}
203201

204202
_connection_config_validator = connection_config_validator
@@ -352,27 +350,8 @@ def get_test_connection(
352350
def get_scheduler(self, gateway_name: t.Optional[str] = None) -> SchedulerConfig:
353351
return self.get_gateway(gateway_name).scheduler or self.default_scheduler
354352

355-
def get_state_schema(self, gateway_name: t.Optional[str] = None) -> str:
356-
state_schema = self.get_gateway(gateway_name).state_schema
357-
358-
if state_schema is None and self.state_schema_naming_pattern:
359-
substitutions = {}
360-
if dbt := self.dbt_config_info:
361-
# TODO: keeping this simple for now rather than trying to set up a Jinja or SQLMesh Macro rendering context
362-
substitutions.update(
363-
{
364-
"@{dbt_profile_name}": dbt.profile_name,
365-
# TODO @iaroslav: what was the problem with using target name instead of the default schema name again?
366-
"@{dbt_target_name}": dbt.target_name,
367-
}
368-
)
369-
state_schema = self.state_schema_naming_pattern
370-
for pattern, value in substitutions.items():
371-
state_schema = state_schema.replace(pattern, value)
372-
373-
logger.info("Inferring state schema: %s", state_schema)
374-
375-
return state_schema or c.SQLMESH
353+
def get_state_schema(self, gateway_name: t.Optional[str] = None) -> t.Optional[str]:
354+
return self.get_gateway(gateway_name).state_schema
376355

377356
@property
378357
def default_gateway_name(self) -> str:

sqlmesh/dbt/loader.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
ConnectionConfig,
1212
GatewayConfig,
1313
ModelDefaultsConfig,
14-
DbtConfigInfo,
14+
DbtConfig as RootDbtConfig,
1515
)
1616
from sqlmesh.core.environment import EnvironmentStatements
1717
from sqlmesh.core.loader import CacheBase, LoadedProject, Loader
@@ -51,6 +51,7 @@ def sqlmesh_config(
5151
dbt_target_name: t.Optional[str] = None,
5252
variables: t.Optional[t.Dict[str, t.Any]] = None,
5353
register_comments: t.Optional[bool] = None,
54+
infer_state_schema_name: bool = False,
5455
**kwargs: t.Any,
5556
) -> Config:
5657
project_root = project_root or Path()
@@ -68,25 +69,28 @@ def sqlmesh_config(
6869
if not issubclass(loader, DbtLoader):
6970
raise ConfigError("The loader must be a DbtLoader.")
7071

71-
if context.profile_name is None:
72-
# Note: Profile.load() mutates `context` and will have already raised an exception if profile_name is not set,
73-
# but mypy doesnt know this because the field is defined as t.Optional[str]
74-
raise ConfigError(f"profile name must be set")
72+
gateway_kwargs = {}
73+
if infer_state_schema_name:
74+
profile_name = context.profile_name
75+
# Note: we deliberately isolate state based on the target *schema* and not the target name.
76+
# It is assumed that the project will define a target, eg 'dev', and then in each users own ~/.dbt/profiles.yml the schema
77+
# for the 'dev' target is overriden to something user-specific, rather than making the target name itself user-specific.
78+
# This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas.
79+
target_schema = profile.target.schema_
80+
gateway_kwargs["state_schema"] = f"sqlmesh_state_{profile_name}_{target_schema}"
7581

7682
return Config(
7783
loader=loader,
7884
model_defaults=model_defaults,
7985
variables=variables or {},
80-
dbt_config_info=DbtConfigInfo(
81-
profile_name=dbt_profile_name or context.profile_name,
82-
target_name=dbt_target_name or profile.target_name,
83-
),
86+
dbt=RootDbtConfig(infer_state_schema_name=infer_state_schema_name),
8487
**{
8588
"default_gateway": profile.target_name if "gateways" not in kwargs else "",
8689
"gateways": {
8790
profile.target_name: GatewayConfig(
8891
connection=profile.target.to_sqlmesh(**target_to_sqlmesh_args),
8992
state_connection=state_connection,
93+
**gateway_kwargs,
9094
)
9195
}, # type: ignore
9296
**kwargs,

tests/dbt/test_config.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,23 +1215,24 @@ def test_empty_vars_config(tmp_path):
12151215
assert project.context.variables == {}
12161216

12171217

1218-
def test_state_schema_naming_pattern(create_empty_project: EmptyProjectCreator):
1218+
def test_infer_state_schema_name(create_empty_project: EmptyProjectCreator):
12191219
project_dir, _ = create_empty_project("test_foo", "dev")
12201220

1221-
# no state_schema_naming_pattern, creating python config manually doesnt take into account
1222-
# any config yaml files that may be present, so we get the default state schema
1221+
# infer_state_schema_name defaults to False if omitted
12231222
config = sqlmesh_config(project_root=project_dir)
1224-
assert not config.state_schema_naming_pattern
1223+
assert config.dbt
1224+
assert not config.dbt.infer_state_schema_name
12251225
assert config.get_state_schema() == "sqlmesh"
12261226

12271227
# create_empty_project() uses the default dbt template for sqlmesh yaml config which
1228-
# sets state_schema_naming_pattern
1228+
# sets infer_state_schema_name=True
12291229
ctx = Context(paths=[project_dir])
1230-
assert ctx.config.state_schema_naming_pattern
1231-
assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_dev"
1230+
assert ctx.config.dbt
1231+
assert ctx.config.dbt.infer_state_schema_name
1232+
assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_main"
12321233
assert isinstance(ctx.state_sync, CachingStateSync)
12331234
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)
1234-
assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_dev"
1235+
assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_main"
12351236

12361237
# If the user delberately overrides state_schema then we should respect this choice
12371238
config_file = project_dir / "sqlmesh.yaml"
@@ -1240,7 +1241,8 @@ def test_state_schema_naming_pattern(create_empty_project: EmptyProjectCreator):
12401241
config_file.write_text(yaml_dump(config_yaml))
12411242

12421243
ctx = Context(paths=[project_dir])
1243-
assert ctx.config.state_schema_naming_pattern
1244+
assert ctx.config.dbt
1245+
assert ctx.config.dbt.infer_state_schema_name
12441246
assert ctx.config.get_state_schema() == "state_override"
12451247
assert isinstance(ctx.state_sync, CachingStateSync)
12461248
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)

tests/dbt/test_integration.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -625,21 +625,30 @@ def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path):
625625

626626
# start off with the prod target
627627
prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"})
628-
assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod"
628+
assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod_schema"
629629
assert all("prod_schema" in fqn for fqn in prod_ctx.models)
630630
assert prod_ctx.plan(auto_apply=True).has_changes
631631
assert not prod_ctx.plan(auto_apply=True).has_changes
632632

633633
# dev target should have changes - new state separate from prod
634634
dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"})
635-
assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev"
635+
assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
636636
assert all("dev_schema" in fqn for fqn in dev_ctx.models)
637637
assert dev_ctx.plan(auto_apply=True).has_changes
638638
assert not dev_ctx.plan(auto_apply=True).has_changes
639639

640640
# no explicitly specified target should use dev because that's what's set for the default in the profiles.yml
641641
assert profiles_yml["jaffle_shop"]["target"] == "dev"
642642
default_ctx = Context(paths=[jaffle_shop_duckdb])
643-
assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev"
643+
assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
644644
assert all("dev_schema" in fqn for fqn in default_ctx.models)
645645
assert not default_ctx.plan(auto_apply=True).has_changes
646+
647+
# an explicit state schema override set in `sqlmesh.yaml` should use that
648+
sqlmesh_yaml_file = jaffle_shop_duckdb / "sqlmesh.yaml"
649+
sqlmesh_yaml = yaml_load(sqlmesh_yaml_file)
650+
sqlmesh_yaml["gateways"] = {"dev": {"state_schema": "sqlmesh_dev_state_override"}}
651+
sqlmesh_yaml_file.write_text(yaml_dump(sqlmesh_yaml))
652+
default_ctx = Context(paths=[jaffle_shop_duckdb])
653+
assert default_ctx.config.get_state_schema() == "sqlmesh_dev_state_override"
654+
assert all("dev_schema" in fqn for fqn in default_ctx.models)

0 commit comments

Comments
 (0)