Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ def state_sync(self) -> StateSync:
self._state_sync = self._new_state_sync()

if self._state_sync.get_versions(validate=False).schema_version == 0:
self.console.log_status_update("Initializing new project state...")
self._state_sync.migrate(default_catalog=self.default_catalog)
self._state_sync.get_versions()
self._state_sync = CachingStateSync(self._state_sync) # type: ignore
Expand Down
10 changes: 6 additions & 4 deletions sqlmesh/core/state_sync/db/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ def get_environment_statements(self, environment: str) -> t.List[EnvironmentStat
return []

def _environment_from_row(self, row: t.Tuple[str, ...]) -> Environment:
return Environment(**{field: row[i] for i, field in enumerate(Environment.all_fields())})
return Environment(
**{field: row[i] for i, field in enumerate(sorted(Environment.all_fields()))}
)

def _environment_summmary_from_row(self, row: t.Tuple[str, ...]) -> EnvironmentSummary:
return EnvironmentSummary(
**{field: row[i] for i, field in enumerate(EnvironmentSummary.all_fields())}
**{field: row[i] for i, field in enumerate(sorted(EnvironmentSummary.all_fields()))}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is sorting needed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all_fields() returns a set

)

def _environments_query(
Expand All @@ -298,7 +300,7 @@ def _environments_query(
lock_for_update: bool = False,
required_fields: t.Optional[t.List[str]] = None,
) -> exp.Select:
query_fields = required_fields if required_fields else Environment.all_fields()
query_fields = required_fields if required_fields else sorted(Environment.all_fields())
query = (
exp.select(*(exp.to_identifier(field) for field in query_fields))
.from_(self.environments_table)
Expand Down Expand Up @@ -328,7 +330,7 @@ def _fetch_environment_summaries(
self.engine_adapter,
self._environments_query(
where=where,
required_fields=list(EnvironmentSummary.all_fields()),
required_fields=sorted(EnvironmentSummary.all_fields()),
),
)
]
Expand Down
7 changes: 6 additions & 1 deletion sqlmesh/core/state_sync/db/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,14 @@ def _apply_migrations(

snapshot_count_before = self.snapshot_state.count() if versions.schema_version else None

state_table_exist = any(self.engine_adapter.table_exists(t) for t in self._state_tables)

for migration in migrations:
logger.info(f"Applying migration {migration}")
migration.migrate(state_sync, default_catalog=default_catalog)
migration.migrate_ddl(state_sync, default_catalog=default_catalog)
if state_table_exist:
# No need to run DML for the initial migration since all tables are empty
migration.migrate_dml(state_sync, default_catalog=default_catalog)

snapshot_count_after = self.snapshot_state.count()

Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0001_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand Down Expand Up @@ -58,3 +58,7 @@ def migrate(state_sync, **kwargs): # type: ignore
"sqlglot_version": exp.DataType.build("text"),
},
)


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0002_remove_identify.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Remove identify=True kwarg for rendering sql"""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0003_move_batch_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
snapshots_table = "_snapshots"
if state_sync.schema:
snapshots_table = f"{state_sync.schema}.{snapshots_table}"
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0004_environmnent_add_finalized_at.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -21,3 +21,7 @@ def migrate(state_sync, **kwargs): # type: ignore
)

engine_adapter.execute(alter_table_exp)


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0005_create_seed_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
seeds_table = "_seeds"
if state_sync.schema:
Expand All @@ -22,3 +22,7 @@ def migrate(state_sync, **kwargs): # type: ignore
},
primary_key=("name", "identifier"),
)


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0006_change_seed_hash.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Seed hashes moved from to_string to to_json for performance."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0007_env_table_info_to_kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ def _hash(data): # type: ignore
return str(zlib.crc32(";".join("" if d is None else d for d in data).encode("utf-8")))


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0008_create_intervals_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
intervals_table = "_intervals"
if state_sync.schema:
Expand Down Expand Up @@ -36,3 +36,7 @@ def migrate(state_sync, **kwargs): # type: ignore
engine_adapter.create_index(
intervals_table, "name_identifier_idx", ("name", "identifier", "created_ts")
)


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0009_remove_pre_post_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0010_seed_hash_batch_size.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Seed metadata hashes now correctly include the batch_size."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
16 changes: 13 additions & 3 deletions sqlmesh/migrations/v0011_add_model_kind_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
import pandas as pd

def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand All @@ -30,6 +28,18 @@ def migrate(state_sync, **kwargs): # type: ignore
)
engine_adapter.execute(alter_table_exp)


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
if schema:
snapshots_table = f"{schema}.{snapshots_table}"

index_type = index_text_type(engine_adapter.dialect)

new_snapshots = []

for name, identifier, version, snapshot in engine_adapter.fetchall(
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0012_update_jinja_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0013_serde_using_model_dialects.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0014_fix_dev_intervals.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Fix snapshot intervals that have been erroneously marked as dev."""


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
schema = state_sync.schema
intervals_table = "_intervals"
if schema:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sqlmesh.utils.migration import blob_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -24,3 +24,7 @@ def migrate(state_sync, **kwargs): # type: ignore
)

engine_adapter.execute(alter_table_exp)


def migrate_dml(state_sync, **kwargs): # type: ignore
pass
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0016_fix_windows_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0017_fix_windows_seed_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0018_rename_snapshot_model_to_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
9 changes: 8 additions & 1 deletion sqlmesh/migrations/v0019_add_env_suffix_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlglot import exp


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
Expand All @@ -21,6 +21,13 @@ def migrate(state_sync, **kwargs): # type: ignore
)
engine_adapter.execute(alter_table_exp)


def migrate_dml(state_sync, **kwargs): # type: ignore
engine_adapter = state_sync.engine_adapter
environments_table = "_environments"
if state_sync.schema:
environments_table = f"{state_sync.schema}.{environments_table}"

state_sync.engine_adapter.update_table(
environments_table,
{"suffix_target": "schema"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0021_fix_table_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
6 changes: 5 additions & 1 deletion sqlmesh/migrations/v0022_move_project_to_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from sqlmesh.utils.migration import index_text_type


def migrate(state_sync, **kwargs): # type: ignore
def migrate_ddl(state_sync, **kwargs): # type: ignore
pass


def migrate_dml(state_sync, **kwargs): # type: ignore
import pandas as pd

engine_adapter = state_sync.engine_adapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from sqlmesh.utils.dag import DAG


def migrate(state_sync: t.Any, **kwargs) -> None: # type: ignore
def migrate_ddl(state_sync: t.Any, **kwargs) -> None: # type: ignore
pass


def migrate_dml(state_sync: t.Any, **kwargs) -> None: # type: ignore
engine_adapter = state_sync.engine_adapter
schema = state_sync.schema
snapshots_table = "_snapshots"
Expand Down
Loading
Loading