-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathv0001_init.py
More file actions
64 lines (51 loc) · 2.07 KB
/
v0001_init.py
File metadata and controls
64 lines (51 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""All migrations should be named vXXXX_<name>.py (e.g. v0001_init.py); they are executed sequentially.

If a migration alters the payload of any pydantic models, do not use those models inside the
migration, because the running model may not be able to load the stored payloads. Make sure that
these migration files are standalone.
"""
from sqlglot import exp
from sqlmesh.utils.migration import index_text_type
def migrate_schemas(state_sync, **kwargs):  # type: ignore
    """Create the initial state tables: snapshots, environments and versions.

    When ``state_sync.schema`` is truthy, the schema is created first and every
    table name is qualified as ``<schema>.<table>``; otherwise the bare table
    names are used.

    Args:
        state_sync: Object exposing ``engine_adapter`` and ``schema``.
        **kwargs: Accepted and ignored.
    """
    adapter = state_sync.engine_adapter
    schema = state_sync.schema

    # Qualify table names only when a schema is configured.
    if schema:
        adapter.create_schema(schema)
        prefix = f"{schema}."
    else:
        prefix = ""

    # Type for columns that participate in a primary key or an index.
    indexed = index_text_type(adapter.dialect)

    def columns(spec):
        # Build a fresh DataType per column, preserving column order.
        return {column: exp.DataType.build(type_name) for column, type_name in spec.items()}

    snapshots = f"{prefix}_snapshots"
    adapter.create_state_table(
        snapshots,
        columns(
            {
                "name": indexed,
                "identifier": indexed,
                "version": indexed,
                "snapshot": "text",
            }
        ),
        primary_key=("name", "identifier"),
    )
    adapter.create_index(snapshots, "name_version_idx", ("name", "version"))

    adapter.create_state_table(
        f"{prefix}_environments",
        columns(
            {
                "name": indexed,
                "snapshots": "text",
                "start_at": "text",
                "end_at": "text",
                "plan_id": "text",
                "previous_plan_id": "text",
                "expiration_ts": "bigint",
            }
        ),
        primary_key=("name",),
    )

    adapter.create_state_table(
        f"{prefix}_versions",
        columns(
            {
                "schema_version": "int",
                "sqlglot_version": "text",
            }
        ),
    )
def migrate_rows(state_sync, **kwargs):  # type: ignore
    """Do nothing: this migration makes no row-level changes.

    Args:
        state_sync: Unused.
        **kwargs: Accepted and ignored.
    """
    return None