Skip to content

Commit 268adb9

Browse files
committed
Fix: implement table properties handling for Redshift engine adapter
Signed-off-by: Simen Strand <simen.strand@netcheck.de>
1 parent 3be5bba commit 268adb9

2 files changed

Lines changed: 214 additions & 0 deletions

File tree

sqlmesh/core/engine_adapter/redshift.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import typing as t
55

66
from sqlglot import exp
7+
from sqlglot.helper import ensure_list
78

89
from sqlmesh.core.dialect import to_schema
910
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
@@ -30,6 +31,7 @@
3031

3132
from sqlmesh.core._typing import SchemaName, TableName
3233
from sqlmesh.core.engine_adapter.base import QueryOrDF, Query
34+
from sqlmesh.core.node import IntervalUnit
3335

3436
logger = logging.getLogger(__name__)
3537

@@ -249,6 +251,63 @@ def create_view(
249251
**create_kwargs,
250252
)
251253

254+
def _build_table_properties_exp(
    self,
    catalog_name: t.Optional[str] = None,
    table_format: t.Optional[str] = None,
    storage_format: t.Optional[str] = None,
    partitioned_by: t.Optional[t.List[exp.Expr]] = None,
    partition_interval_unit: t.Optional[IntervalUnit] = None,
    clustered_by: t.Optional[t.List[exp.Expr]] = None,
    table_properties: t.Optional[t.Dict[str, exp.Expr]] = None,
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    table_description: t.Optional[str] = None,
    table_kind: t.Optional[str] = None,
    **kwargs: t.Any,
) -> t.Optional[exp.Properties]:
    """Collect the table-level properties to attach to a Redshift table.

    Properties are gathered in this order: the (truncated) table comment when
    ``table_description`` is set, the creatable type popped from
    ``table_properties``, then ``DISTSTYLE``, ``DISTKEY`` and ``SORTKEY``.
    Keys of ``table_properties`` are upper-cased before lookup, so they are
    matched case-insensitively; the caller's dict is not modified because the
    lookup works on an upper-cased copy.

    Only ``table_properties`` and ``table_description`` are read here; the
    remaining parameters are accepted but unused by this implementation.

    Returns:
        An ``exp.Properties`` node holding the collected properties, or
        ``None`` when nothing was collected.
    """

    def _as_identifier(node: exp.Expr) -> exp.Expr:
        # A string literal (e.g. 'id_file') is converted into an identifier;
        # every other expression is passed on as a copy.
        is_string_literal = isinstance(node, exp.Literal) and node.is_string
        return exp.to_identifier(node.this) if is_string_literal else node.copy()

    collected: t.List[exp.Expr] = []

    if table_description:
        comment = exp.Literal.string(self._truncate_table_comment(table_description))
        collected.append(exp.SchemaCommentProperty(this=comment))

    if table_properties:
        upper_properties = {key.upper(): value for key, value in table_properties.items()}

        collected.extend(
            ensure_list(self._pop_creatable_type_from_properties(upper_properties))
        )

        diststyle = upper_properties.get("DISTSTYLE")
        if diststyle:
            style_name = diststyle.name.upper()
            collected.append(exp.DistStyleProperty(this=exp.var(style_name)))

        distkey = upper_properties.get("DISTKEY")
        if distkey:
            collected.append(exp.DistKeyProperty(this=_as_identifier(distkey)))

        sortkey = upper_properties.get("SORTKEY")
        if sortkey:
            # A multi-column sort key exposes its members via `expressions`;
            # a single column has none and is used as the only member.
            members = sortkey.expressions or [sortkey]
            collected.append(
                exp.SortKeyProperty(
                    this=[_as_identifier(member) for member in members],
                    compound=False,
                )
            )

    if not collected:
        return None
    return exp.Properties(expressions=collected)
310+
252311
def replace_query(
253312
self,
254313
table_name: TableName,

tests/core/engine_adapter/test_redshift.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
from sqlglot import expressions as exp
99
from sqlglot import parse_one
1010

11+
import sqlmesh.core.dialect as d
1112
from sqlmesh.core.engine_adapter import RedshiftEngineAdapter
1213
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
14+
from sqlmesh.core.model import load_sql_based_model
15+
from sqlmesh.core.model.definition import SqlModel
1316
from sqlmesh.utils.errors import SQLMeshError
1417
from tests.core.engine_adapter import to_sql_calls
1518

@@ -32,6 +35,158 @@ def test_columns(adapter: t.Callable):
3235
assert resp == {"col": exp.DataType.build("INT")}
3336

3437

38+
def test_create_table_physical_properties(make_mocked_engine_adapter: t.Callable):
    """Column-valued diststyle, distkey and sortkey are rendered on CREATE TABLE."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    columns_to_types = {
        "id_file": exp.DataType.build("INT"),
        "batch_time": exp.DataType.build("TIMESTAMP"),
    }
    physical_properties = {
        "diststyle": exp.column("key"),
        "distkey": exp.to_column("id_file"),
        "sortkey": exp.to_column("batch_time"),
    }
    expected_sql = 'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER, "batch_time" TIMESTAMP) DISTSTYLE KEY DISTKEY("id_file") SORTKEY("batch_time")'

    adapter.create_table(
        "test_schema.test_table",
        columns_to_types,
        table_properties=physical_properties,
    )

    assert to_sql_calls(adapter) == [expected_sql]
57+
58+
59+
@pytest.mark.parametrize(
    ("diststyle", "expected"),
    # Each style is supplied in lower case and expected back in upper case.
    [(style, style.upper()) for style in ("auto", "even", "key", "all")],
)
def test_create_table_physical_properties_diststyle_values(
    make_mocked_engine_adapter: t.Callable,
    diststyle: str,
    expected: str,
):
    """Every supported diststyle value is emitted upper-cased after DISTSTYLE."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    # Only the "key" case is paired with a distkey, both in input and output.
    with_distkey = diststyle == "key"

    table_properties = {"diststyle": exp.column(diststyle)}
    if with_distkey:
        table_properties["distkey"] = exp.to_column("id_file")

    adapter.create_table(
        "test_schema.test_table",
        {"id_file": exp.DataType.build("INT")},
        table_properties=table_properties,
    )

    expected_distkey = ' DISTKEY("id_file")' if with_distkey else ""
    expected_sql = f'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER) DISTSTYLE {expected}{expected_distkey}'
    assert to_sql_calls(adapter) == [expected_sql]
88+
89+
90+
def test_create_table_physical_properties_distkey_without_diststyle(
    make_mocked_engine_adapter: t.Callable,
):
    """A distkey given on its own yields DISTKEY without any DISTSTYLE clause."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    distkey_only = {"distkey": exp.to_column("id_file")}
    expected_sql = 'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER) DISTKEY("id_file")'

    adapter.create_table(
        "test_schema.test_table",
        {"id_file": exp.DataType.build("INT")},
        table_properties=distkey_only,
    )

    assert to_sql_calls(adapter) == [expected_sql]
104+
105+
106+
def test_create_table_physical_properties_multi_column_sortkey(
    make_mocked_engine_adapter: t.Callable,
):
    """A tuple-valued sortkey is rendered as a multi-column SORTKEY clause."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    columns_to_types = {
        "id_file": exp.DataType.build("INT"),
        "batch_time": exp.DataType.build("TIMESTAMP"),
        "event_time": exp.DataType.build("TIMESTAMP"),
    }
    sortkey_columns = [exp.to_column(name) for name in ("batch_time", "event_time")]
    physical_properties = {
        "diststyle": exp.column("key"),
        "distkey": exp.to_column("id_file"),
        "sortkey": exp.Tuple(expressions=sortkey_columns),
    }
    expected_sql = 'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER, "batch_time" TIMESTAMP, "event_time" TIMESTAMP) DISTSTYLE KEY DISTKEY("id_file") SORTKEY("batch_time", "event_time")'

    adapter.create_table(
        "test_schema.test_table",
        columns_to_types,
        table_properties=physical_properties,
    )

    assert to_sql_calls(adapter) == [expected_sql]
130+
131+
132+
def test_create_table_physical_properties_with_string_columns(
    make_mocked_engine_adapter: t.Callable,
):
    """String-literal property values produce the same DDL as column values."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    columns_to_types = {
        "id_file": exp.DataType.build("INT"),
        "batch_time": exp.DataType.build("TIMESTAMP"),
    }
    # All three properties are passed as string literals rather than columns.
    physical_properties = {
        "diststyle": exp.Literal.string("key"),
        "distkey": exp.Literal.string("id_file"),
        "sortkey": exp.Literal.string("batch_time"),
    }
    expected_sql = 'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER, "batch_time" TIMESTAMP) DISTSTYLE KEY DISTKEY("id_file") SORTKEY("batch_time")'

    adapter.create_table(
        "test_schema.test_table",
        columns_to_types,
        table_properties=physical_properties,
    )

    assert to_sql_calls(adapter) == [expected_sql]
153+
154+
155+
def test_create_table_physical_properties_from_model_definition(
    make_mocked_engine_adapter: t.Callable,
):
    """physical_properties declared in a MODEL block reach the CREATE TABLE DDL."""
    adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)

    model_statements = d.parse(
        """
        MODEL (
            name test_schema.test_table,
            kind full,
            physical_properties (
                diststyle = key,
                distkey = "id_file",
                sortkey = "batch_time"
            )
        );
        SELECT id_file::INT, batch_time::TIMESTAMP;
        """
    )
    model: SqlModel = t.cast(SqlModel, load_sql_based_model(model_statements))

    adapter.create_table(
        model.name,
        target_columns_to_types=model.columns_to_types_or_raise,
        table_properties=model.physical_properties,
    )

    expected_sql = 'CREATE TABLE IF NOT EXISTS "test_schema"."test_table" ("id_file" INTEGER, "batch_time" TIMESTAMP) DISTSTYLE KEY DISTKEY("id_file") SORTKEY("batch_time")'
    assert to_sql_calls(adapter) == [expected_sql]
188+
189+
35190
def test_varchar_size_workaround(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
36191
adapter = make_mocked_engine_adapter(RedshiftEngineAdapter)
37192

0 commit comments

Comments
 (0)