Skip to content

Commit 3a10d59

Browse files
authored
Fix: Remove obsolete attributes from the Plan DAGs table (#2229)
1 parent 2c72f44 commit 3a10d59

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Trim irrelevant attributes from the plan DAGs state."""
2+
3+
import json
4+
5+
import pandas as pd
6+
from sqlglot import exp
7+
8+
from sqlmesh.utils.migration import index_text_type
9+
10+
11+
def migrate(state_sync, **kwargs): # type: ignore
12+
engine_adapter = state_sync.engine_adapter
13+
schema = state_sync.schema
14+
plan_dags_table = "_plan_dags"
15+
if schema:
16+
plan_dags_table = f"{schema}.{plan_dags_table}"
17+
18+
new_dag_specs = []
19+
20+
for request_id, dag_id, dag_spec in engine_adapter.fetchall(
21+
exp.select("request_id", "dag_id", "dag_spec").from_(plan_dags_table),
22+
quote_identifiers=True,
23+
):
24+
parsed_dag_spec = json.loads(dag_spec)
25+
for snapshot in parsed_dag_spec.get("new_snapshots", []):
26+
snapshot["node"].pop("hash_raw_query", None)
27+
28+
for indirect_versions in snapshot.get("indirect_versions", {}).values():
29+
for indirect_version in indirect_versions:
30+
# Only keep version and change_category.
31+
version = indirect_version.get("version")
32+
change_category = indirect_version.get("change_category")
33+
indirect_version.clear()
34+
indirect_version["version"] = version
35+
indirect_version["change_category"] = change_category
36+
37+
new_dag_specs.append(
38+
{
39+
"request_id": request_id,
40+
"dag_id": dag_id,
41+
"dag_spec": json.dumps(parsed_dag_spec),
42+
}
43+
)
44+
45+
if new_dag_specs:
46+
engine_adapter.delete_from(plan_dags_table, "TRUE")
47+
48+
index_type = index_text_type(engine_adapter.dialect)
49+
50+
engine_adapter.insert_append(
51+
plan_dags_table,
52+
pd.DataFrame(new_dag_specs),
53+
columns_to_types={
54+
"request_id": exp.DataType.build(index_type),
55+
"dag_id": exp.DataType.build(index_type),
56+
"dag_spec": exp.DataType.build("text"),
57+
},
58+
contains_json=True,
59+
)

0 commit comments

Comments
 (0)