Skip to content

Commit c1e8015

Browse files
authored
Fix: Include catalog name in schema creation (#1249)
* include catalog schema creation * update tests * style
1 parent edbf96e commit c1e8015

File tree

7 files changed

+32
-15
lines changed

7 files changed

+32
-15
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,13 +457,19 @@ def create_view(
457457
)
458458

459459
def create_schema(
460-
self, schema_name: str, ignore_if_exists: bool = True, warn_on_error: bool = True
460+
self,
461+
schema_name: str,
462+
catalog_name: t.Optional[str] = None,
463+
ignore_if_exists: bool = True,
464+
warn_on_error: bool = True,
461465
) -> None:
462466
"""Create a schema from a name or qualified table name."""
467+
if catalog_name:
468+
schema_name = f"{catalog_name}.{schema_name}"
463469
try:
464470
self.execute(
465471
exp.Create(
466-
this=exp.to_identifier(schema_name.split(".")[0]),
472+
this=exp.to_table(schema_name),
467473
kind="SCHEMA",
468474
exists=ignore_if_exists,
469475
)

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,21 @@ def _is_session_active(self) -> bool:
103103
return self._session_id is not None
104104

105105
def create_schema(
106-
self, schema_name: str, ignore_if_exists: bool = True, warn_on_error: bool = True
106+
self,
107+
schema_name: str,
108+
catalog_name: t.Optional[str] = None,
109+
ignore_if_exists: bool = True,
110+
warn_on_error: bool = True,
107111
) -> None:
108112
"""Create a schema from a name or qualified table name."""
109113
from google.api_core.exceptions import Conflict
110114

111115
try:
112116
super().create_schema(
113-
schema_name, ignore_if_exists=ignore_if_exists, warn_on_error=False
117+
schema_name,
118+
catalog_name=catalog_name,
119+
ignore_if_exists=ignore_if_exists,
120+
warn_on_error=False,
114121
)
115122
except Exception as e:
116123
is_already_exists_error = isinstance(e, Conflict) and "Already Exists:" in str(e)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ def promote(
700700
) -> None:
701701
schema = view_name.schema_for_environment(environment_naming_info=environment_naming_info)
702702
if schema is not None:
703-
self.adapter.create_schema(schema)
703+
self.adapter.create_schema(schema, catalog_name=view_name.catalog)
704704

705705
target_name = view_name.for_environment(environment_naming_info)
706706
logger.info("Updating view '%s' to point at table '%s'", target_name, table_name)
@@ -736,7 +736,8 @@ def create(
736736
name: str,
737737
**render_kwargs: t.Any,
738738
) -> None:
739-
self.adapter.create_schema(exp.to_table(name).db)
739+
table = exp.to_table(name)
740+
self.adapter.create_schema(table.db, catalog_name=table.catalog)
740741

741742
logger.info("Creating table '%s'", name)
742743
if model.annotated:
@@ -913,7 +914,8 @@ def create(
913914
name: str,
914915
**render_kwargs: t.Any,
915916
) -> None:
916-
self.adapter.create_schema(exp.to_table(name).db)
917+
table = exp.to_table(name)
918+
self.adapter.create_schema(table.db, catalog_name=table.catalog)
917919

918920
logger.info("Creating view '%s'", name)
919921
self.adapter.create_view(

sqlmesh/core/test/definition.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,18 @@ def __init__(
6565

6666
def setUp(self) -> None:
6767
"""Load all input tables"""
68-
for table, rows in self.body.get("inputs", {}).items():
68+
for table_name, rows in self.body.get("inputs", {}).items():
6969
df = pd.DataFrame.from_records(rows) # noqa
7070
columns_to_types: dict[str, exp.DataType] = {}
7171

7272
for i, v in rows[0].items():
7373
# convert ruamel into python
7474
v = v.real if hasattr(v, "real") else v
7575
columns_to_types[i] = parse_one(type(v).__name__, into=exp.DataType)
76-
77-
self.engine_adapter.create_schema(table)
78-
self.engine_adapter.create_view(_test_fixture_name(table), df, columns_to_types)
76+
table = exp.to_table(table_name)
77+
if table.db:
78+
self.engine_adapter.create_schema(table.db, catalog_name=table.catalog)
79+
self.engine_adapter.create_view(_test_fixture_name(table_name), df, columns_to_types)
7980

8081
def tearDown(self) -> None:
8182
"""Drop all input tables"""

tests/core/engine_adapter/test_base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ def test_create_schema(make_mocked_engine_adapter: t.Callable):
4646
adapter = make_mocked_engine_adapter(EngineAdapter)
4747
adapter.create_schema("test_schema")
4848
adapter.create_schema("test_schema", ignore_if_exists=False)
49+
adapter.create_schema("test_schema", catalog_name="test_catalog")
4950

5051
adapter.cursor.execute.assert_has_calls(
5152
[
5253
call('CREATE SCHEMA IF NOT EXISTS "test_schema"'),
5354
call('CREATE SCHEMA "test_schema"'),
55+
call('CREATE SCHEMA IF NOT EXISTS "test_catalog"."test_schema"'),
5456
]
5557
)
5658

tests/core/engine_adapter/test_duckdb.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ def test_create_view(adapter: EngineAdapter, duck_conn):
2121

2222

2323
def test_create_schema(adapter: EngineAdapter, duck_conn):
24-
adapter.create_schema("test_schema")
2524
adapter.create_schema("test_schema")
2625
assert duck_conn.execute(
2726
"SELECT 1 FROM information_schema.schemata WHERE schema_name = 'test_schema'"

tests/core/test_snapshot_evaluator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def x(evaluator, y=None) -> None:
137137

138138
adapter_mock.create_schema.assert_has_calls(
139139
[
140-
call("sqlmesh__test_schema"),
140+
call("sqlmesh__test_schema", catalog_name=""),
141141
]
142142
)
143143

@@ -189,7 +189,7 @@ def test_promote(mocker: MockerFixture, adapter_mock, make_snapshot):
189189

190190
evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env"))
191191

192-
adapter_mock.create_schema.assert_called_once_with("test_schema__test_env")
192+
adapter_mock.create_schema.assert_called_once_with("test_schema__test_env", catalog_name=None)
193193
adapter_mock.create_view.assert_called_once_with(
194194
"test_schema__test_env.test_model",
195195
parse_one(
@@ -359,7 +359,7 @@ def test_promote_model_info(mocker: MockerFixture):
359359
EnvironmentNamingInfo(name="test_env"),
360360
)
361361

362-
adapter_mock.create_schema.assert_called_once_with("test_schema__test_env")
362+
adapter_mock.create_schema.assert_called_once_with("test_schema__test_env", catalog_name=None)
363363
adapter_mock.create_view.assert_called_once_with(
364364
"test_schema__test_env.test_model",
365365
parse_one("SELECT * FROM physical_schema.test_schema__test_model__1"),

0 commit comments

Comments
 (0)