-
Notifications
You must be signed in to change notification settings - Fork 377
Expand file tree
/
Copy pathtest_postgres.py
More file actions
179 lines (141 loc) · 6.44 KB
/
test_postgres.py
File metadata and controls
179 lines (141 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import typing as t
import pytest
from pytest_mock import MockFixture
from pytest_mock.plugin import MockerFixture
from sqlglot import exp, parse_one
from sqlglot.helper import ensure_list
from sqlmesh.core.engine_adapter import PostgresEngineAdapter
from sqlmesh.utils.errors import SQLMeshError
from tests.core.engine_adapter import to_sql_calls
# Module-level pytest marks: every test in this file carries the `engine` and `postgres` markers.
pytestmark = [pytest.mark.engine, pytest.mark.postgres]
@pytest.mark.parametrize(
    "kwargs, expected",
    [
        pytest.param(
            {"schema_name": "test_schema"},
            'DROP SCHEMA IF EXISTS "test_schema"',
        ),
        pytest.param(
            {"schema_name": "test_schema", "ignore_if_not_exists": False},
            'DROP SCHEMA "test_schema"',
        ),
        pytest.param(
            {"schema_name": "test_schema", "cascade": True},
            'DROP SCHEMA IF EXISTS "test_schema" CASCADE',
        ),
        pytest.param(
            {"schema_name": "test_schema", "cascade": True, "ignore_if_not_exists": False},
            'DROP SCHEMA "test_schema" CASCADE',
        ),
    ],
)
def test_drop_schema(kwargs, expected, make_mocked_engine_adapter: t.Callable):
    """Check the DROP SCHEMA statement produced for each flag combination.

    Args:
        kwargs: Keyword arguments forwarded to ``drop_schema``.
        expected: The SQL statement the adapter is expected to execute.
        make_mocked_engine_adapter: Fixture that builds a mocked adapter of the given class.
    """
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)
    postgres.drop_schema(**kwargs)

    executed_sql = to_sql_calls(postgres)
    assert executed_sql == ensure_list(expected)
def test_drop_schema_with_catalog(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
    """Check that dropping a schema qualified with a different catalog is rejected.

    The adapter reports ``other_catalog`` as its current catalog, so a request
    against ``test_catalog.test_schema`` must raise ``SQLMeshError``.

    Args:
        make_mocked_engine_adapter: Fixture that builds a mocked adapter of the given class.
        mocker: pytest-mock fixture used to create the catalog stub.
    """
    adapter = make_mocked_engine_adapter(PostgresEngineAdapter)
    # Stub the current catalog so it differs from the one named in the schema.
    adapter.get_current_catalog = mocker.MagicMock(return_value="other_catalog")

    with pytest.raises(
        SQLMeshError, match="requires that all catalog operations be against a single catalog"
    ):
        adapter.drop_schema("test_catalog.test_schema")
def test_comments(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
    """Check the COMMENT statements issued when a table is created with descriptions.

    Both the table description and the description of column ``a`` are a single
    backslash; the expected COMMENT statements carry that backslash unchanged.
    """
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)

    backslash = "\\"
    int_columns = {column: exp.DataType.build("INT") for column in ("a", "b")}
    postgres.create_table(
        "test_table",
        int_columns,
        table_description=backslash,
        column_descriptions={"a": backslash},
    )

    assert to_sql_calls(postgres) == [
        'CREATE TABLE IF NOT EXISTS "test_table" ("a" INT, "b" INT)',
        """COMMENT ON TABLE "test_table" IS '\\'""",
        """COMMENT ON COLUMN "test_table"."a" IS '\\'""",
    ]
def test_create_table_like(make_mocked_engine_adapter: t.Callable):
    """Check that ``create_table_like`` executes exactly one ``LIKE ... INCLUDING ALL`` statement."""
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)
    postgres.create_table_like("target_table", "source_table")

    expected_sql = 'CREATE TABLE IF NOT EXISTS "target_table" (LIKE "source_table" INCLUDING ALL)'
    postgres.cursor.execute.assert_called_once_with(expected_sql)
def test_merge_version_gte_15(make_mocked_engine_adapter: t.Callable):
    """Check that ``merge`` emits a single MERGE statement when the server version is (15, 0)."""
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)
    postgres.server_version = (15, 0)

    source_query = t.cast(exp.Select, parse_one('SELECT "ID", ts, val FROM source'))
    column_types = {
        "ID": exp.DataType.build("int"),
        "ts": exp.DataType.build("timestamp"),
        "val": exp.DataType.build("int"),
    }
    merge_key = [exp.to_identifier("ID", quoted=True)]

    postgres.merge(
        target_table="target",
        source_table=source_query,
        columns_to_types=column_types,
        unique_key=merge_key,
    )

    executed_sql = to_sql_calls(postgres)
    assert executed_sql == [
        """MERGE INTO "target" AS "__MERGE_TARGET__" USING (SELECT "ID", "ts", "val" FROM "source") AS "__MERGE_SOURCE__" ON "__MERGE_TARGET__"."ID" = "__MERGE_SOURCE__"."ID" WHEN MATCHED THEN UPDATE SET "ID" = "__MERGE_SOURCE__"."ID", "ts" = "__MERGE_SOURCE__"."ts", "val" = "__MERGE_SOURCE__"."val" WHEN NOT MATCHED THEN INSERT ("ID", "ts", "val") VALUES ("__MERGE_SOURCE__"."ID", "__MERGE_SOURCE__"."ts", "__MERGE_SOURCE__"."val")"""
    ]
def test_merge_version_lt_15(
    make_mocked_engine_adapter: t.Callable, make_temp_table_name: t.Callable, mocker: MockerFixture
):
    """Check the statements ``merge`` emits when the server version is (14, 0).

    The expected sequence is: create a temp table from the source query, delete
    matching rows from the target, insert from the temp table, drop the temp table.
    """
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)
    postgres.server_version = (14, 0)

    # Pin the temp table name so the expected SQL below is deterministic.
    get_temp_table = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
    get_temp_table.return_value = make_temp_table_name("test", "abcdefgh")

    source_query = t.cast(exp.Select, parse_one('SELECT "ID", ts, val FROM source'))
    column_types = {
        "ID": exp.DataType.build("int"),
        "ts": exp.DataType.build("timestamp"),
        "val": exp.DataType.build("int"),
    }
    merge_key = [exp.to_identifier("ID", quoted=True)]

    postgres.merge(
        target_table="target",
        source_table=source_query,
        columns_to_types=column_types,
        unique_key=merge_key,
    )

    executed_sql = to_sql_calls(postgres)
    assert executed_sql == [
        'CREATE TABLE "__temp_test_abcdefgh" AS SELECT CAST("ID" AS INT) AS "ID", CAST("ts" AS TIMESTAMP) AS "ts", CAST("val" AS INT) AS "val" FROM (SELECT "ID", "ts", "val" FROM "source") AS "_subquery"',
        'DELETE FROM "target" WHERE "ID" IN (SELECT "ID" FROM "__temp_test_abcdefgh")',
        'INSERT INTO "target" ("ID", "ts", "val") SELECT DISTINCT ON ("ID") "ID", "ts", "val" FROM "__temp_test_abcdefgh"',
        'DROP TABLE IF EXISTS "__temp_test_abcdefgh"',
    ]
def test_alter_table_drop_column_cascade(make_mocked_engine_adapter: t.Callable):
    """Check that removing a column is expressed as ``DROP COLUMN ... CASCADE``."""
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)

    existing_table = "test_table"
    desired_table = "target_table"

    def stub_columns(table_name: str) -> t.Dict[str, exp.DataType]:
        # Only the existing table has the extra column; any other table has just "id".
        schema = {"id": exp.DataType.build("int")}
        if table_name == existing_table:
            schema["test_column"] = exp.DataType.build("int")
        return schema

    postgres.columns = stub_columns

    alter_expressions = postgres.get_alter_expressions(existing_table, desired_table)
    postgres.alter_table(alter_expressions)

    executed_sql = to_sql_calls(postgres)
    assert executed_sql == [
        'ALTER TABLE "test_table" DROP COLUMN "test_column" CASCADE',
    ]
def test_server_version(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
    """Check that ``server_version`` yields a ``(major, minor)`` tuple for several version strings."""
    postgres = make_mocked_engine_adapter(PostgresEngineAdapter)
    fetchone = mocker.patch.object(postgres, "fetchone")

    cases = [
        ("14.0", (14, 0)),
        ("15.8", (15, 8)),
        ("15.13 (Debian 15.13-1.pgdg120+1)", (15, 13)),
    ]
    for position, (reported, parsed) in enumerate(cases):
        if position:
            # Discard the value from the previous case before the next read.
            # NOTE(review): presumably server_version is a cached property — confirm on the adapter.
            del postgres.server_version
        fetchone.return_value = (reported,)
        assert postgres.server_version == parsed