Skip to content

Commit 4f29141

Browse files
EhabEaseeclaude
andcommitted
feat: support CLUSTER BY AUTO and CLUSTER BY NONE for Databricks liquid clustering
Adds parser, validator, and Databricks adapter support for the keyword forms of liquid clustering. Bare AUTO/NONE (unquoted VAR tokens) are recognised as keywords; backtick-quoted `auto`/`none` and parenthesised forms remain real column references. - Add LIQUID_CLUSTERING_KEYWORDS constant to avoid repeating the sentinel set across dialect, meta, definition, and adapter - Parser (dialect.py): detect VAR-token AUTO/NONE on clustered_by; strip Paren from single-column clustered_by to match partitioned_by normalisation - Validator (meta.py): normalise single string input to list; restore keyword sentinels from JSON strings on deserialisation; skip column-count check for keywords, gated on clustered_by + databricks - validate_definition (definition.py): skip keyword sentinels in the column-existence check, same gate - Adapter (databricks.py): emit CLUSTER BY AUTO / CLUSTER BY NONE without a tuple wrapper; raise ValueError on unexpected bare Var - Tests: parser round-trips, Python API (exp.Var and plain string), backtick-quoted columns, render_definition, JSON round-trip, non-Databricks rejection, mixed-list behaviour, adapter SQL emission Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: EhabEasee <ehab.elbadrawi@easee.com>
1 parent b44fdf6 commit 4f29141

8 files changed

Lines changed: 470 additions & 10 deletions

File tree

sqlmesh/core/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,5 @@
9696
HYBRID = "hybrid"
9797

9898
DISABLE_SQLMESH_STATE_MIGRATION = "SQLMESH__AIRFLOW__DISABLE_STATE_MIGRATION"
99+
100+
LIQUID_CLUSTERING_KEYWORDS: frozenset = frozenset({"AUTO", "NONE"})

sqlmesh/core/dialect.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from sqlglot.schema import MappingSchema
2525
from sqlglot.tokens import Token
2626

27-
from sqlmesh.core.constants import MAX_MODEL_DEFINITION_SIZE
27+
from sqlmesh.core.constants import LIQUID_CLUSTERING_KEYWORDS, MAX_MODEL_DEFINITION_SIZE
2828
from sqlmesh.utils import get_source_columns_to_types
2929
from sqlmesh.utils.errors import SQLMeshError, ConfigError
3030
from sqlmesh.utils.pandas import columns_to_types_from_df
@@ -663,6 +663,27 @@ def parse(self: Parser) -> t.Optional[exp.Expr]:
663663
value = exp.tuple_(*partitioned_by.this.expressions)
664664
else:
665665
value = partitioned_by.this
666+
elif key == "clustered_by":
667+
# Bare AUTO / NONE are Databricks liquid clustering keywords, not column refs.
668+
# Detect keywords by token type: unquoted bare identifiers arrive as VAR tokens.
669+
# Backtick-quoted identifiers (e.g. `auto`) have IDENTIFIER token type and are
670+
# treated as real column names.
671+
if (
672+
self._curr is not None
673+
and self._curr.token_type == TokenType.VAR
674+
and self._curr.text.upper() in LIQUID_CLUSTERING_KEYWORDS
675+
):
676+
value = exp.Var(this=self._curr.text.upper())
677+
self._advance()
678+
else:
679+
parsed = self._parse_bracket(self._parse_field(any_token=True))
680+
# Unwrap Paren wrapping a bare column to match partitioned_by normalisation:
681+
# clustered_by (a) → stored as Column(a), not Paren(Column(a)).
682+
# Preserve parens around function expressions: (TO_DATE(col)) stays as-is.
683+
if isinstance(parsed, exp.Paren) and isinstance(parsed.this, exp.Column):
684+
value = parsed.unnest()
685+
else:
686+
value = parsed
666687
else:
667688
value = self._parse_bracket(self._parse_field(any_token=True))
668689

@@ -1096,8 +1117,9 @@ def extend_sqlglot() -> None:
10961117
DColonCast: lambda self, e: f"{self.sql(e, 'this')}::{self.sql(e, 'to')}",
10971118
Jinja: lambda self, e: e.name,
10981119
JinjaQuery: lambda self, e: f"{JINJA_QUERY_BEGIN};\n{e.name}\n{JINJA_END};",
1099-
JinjaStatement: lambda self,
1100-
e: f"{JINJA_STATEMENT_BEGIN};\n{e.name}\n{JINJA_END};",
1120+
JinjaStatement: lambda self, e: (
1121+
f"{JINJA_STATEMENT_BEGIN};\n{e.name}\n{JINJA_END};"
1122+
),
11011123
VirtualUpdateStatement: lambda self, e: _on_virtual_update_sql(self, e),
11021124
MacroDef: lambda self, e: f"@DEF({self.sql(e.this)}, {self.sql(e.expression)})",
11031125
MacroFunc: _macro_func_sql,

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from sqlglot import exp
88

9+
from sqlmesh.core.constants import LIQUID_CLUSTERING_KEYWORDS
910
from sqlmesh.core.dialect import to_schema
1011
from sqlmesh.core.engine_adapter.mixins import GrantsFromInfoSchemaMixin
1112
from sqlmesh.core.engine_adapter.shared import (
@@ -386,10 +387,16 @@ def _build_table_properties_exp(
386387
table_kind=table_kind,
387388
)
388389
if clustered_by:
389-
# Databricks expects wrapped CLUSTER BY expressions
390-
clustered_by_exp = exp.Cluster(
391-
expressions=[exp.Tuple(expressions=[c.copy() for c in clustered_by])]
392-
)
390+
if len(clustered_by) == 1 and isinstance(clustered_by[0], exp.Var):
391+
if clustered_by[0].name.upper() not in LIQUID_CLUSTERING_KEYWORDS:
392+
raise ValueError(f"Unexpected bare Var in clustered_by: {clustered_by[0]!r}")
393+
# exp.Cluster with a bare Var generates: CLUSTER BY AUTO (no parens)
394+
clustered_by_exp = exp.Cluster(expressions=[clustered_by[0].copy()])
395+
else:
396+
# Databricks expects column expressions wrapped in a tuple
397+
clustered_by_exp = exp.Cluster(
398+
expressions=[exp.Tuple(expressions=[c.copy() for c in clustered_by])]
399+
)
393400
expressions = properties.expressions if properties else []
394401
expressions.append(clustered_by_exp)
395402
properties = exp.Properties(expressions=expressions)

sqlmesh/core/model/definition.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,12 @@ def validate_definition(self) -> None:
992992
values = [
993993
col.name
994994
for expr in values
995+
if not (
996+
field == "clustered_by"
997+
and (self.dialect or "").lower() == "databricks"
998+
and isinstance(expr, exp.Var)
999+
and expr.name.upper() in c.LIQUID_CLUSTERING_KEYWORDS
1000+
)
9951001
for col in t.cast(
9961002
exp.Expr, exp.maybe_parse(expr, dialect=self.dialect)
9971003
).find_all(exp.Column)

sqlmesh/core/model/meta.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from sqlmesh.core import dialect as d
1414
from sqlmesh.core.config.common import VirtualEnvironmentMode
15+
from sqlmesh.core.constants import LIQUID_CLUSTERING_KEYWORDS
1516
from sqlmesh.core.config.linter import LinterConfig
1617
from sqlmesh.core.dialect import normalize_model_name
1718
from sqlmesh.utils import classproperty
@@ -190,10 +191,13 @@ def _gateway_validator(cls, v: t.Any) -> t.Optional[str]:
190191

191192
@field_validator("partitioned_by_", "clustered_by", mode="before")
192193
def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.List[exp.Expr]:
194+
field = info.field_name or ""
195+
dialect = (get_dialect(info) or "").lower()
196+
193197
if (
194198
isinstance(v, list)
195199
and all(isinstance(i, str) for i in v)
196-
and (info.field_name or "") == "partitioned_by_"
200+
and field == "partitioned_by_"
197201
):
198202
# this branch gets hit when we are deserializing from json because `partitioned_by` is stored as a List[str]
199203
# however, we should only invoke this if the list contains strings because this validator is also
@@ -206,9 +210,33 @@ def _partition_and_cluster_validator(cls, v: t.Any, info: ValidationInfo) -> t.L
206210
)
207211
v = parsed.this.expressions if isinstance(parsed.this, exp.Schema) else v
208212

213+
if isinstance(v, str) and field == "clustered_by":
214+
v = [v]
215+
216+
if isinstance(v, list) and field == "clustered_by" and dialect == "databricks":
217+
# When deserializing from JSON, clustered_by is stored as List[str].
218+
# Restore keyword sentinels (AUTO/NONE) before list_of_fields_validator normalises
219+
# them into quoted columns.
220+
v = [
221+
exp.Var(this=item.upper())
222+
if isinstance(item, str) and item.upper() in LIQUID_CLUSTERING_KEYWORDS
223+
else item
224+
for item in v
225+
]
226+
209227
expressions = list_of_fields_validator(v, validation_data(info))
210228

211229
for expression in expressions:
230+
# AUTO and NONE are Databricks liquid clustering keywords, not column references.
231+
# Only skip for clustered_by with the Databricks dialect — meaningless elsewhere.
232+
if (
233+
field == "clustered_by"
234+
and dialect == "databricks"
235+
and isinstance(expression, exp.Var)
236+
and expression.name.upper() in LIQUID_CLUSTERING_KEYWORDS
237+
):
238+
continue
239+
212240
num_cols = len(list(expression.find_all(exp.Column)))
213241

214242
error_msg: t.Optional[str] = None

tests/core/engine_adapter/test_databricks.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,31 @@ def test_create_table_clustered_by(mocker: MockFixture, make_mocked_engine_adapt
458458
]
459459

460460

461+
@pytest.mark.parametrize("keyword", ["AUTO", "NONE"])
462+
def test_create_table_clustered_by_keyword(
463+
keyword: str, mocker: MockFixture, make_mocked_engine_adapter: t.Callable
464+
):
465+
mocker.patch(
466+
"sqlmesh.core.engine_adapter.databricks.DatabricksEngineAdapter.set_current_catalog"
467+
)
468+
adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog")
469+
470+
columns_to_types = {
471+
"cola": exp.DataType.build("INT"),
472+
"colb": exp.DataType.build("TEXT"),
473+
}
474+
adapter.create_table(
475+
"test_table",
476+
columns_to_types,
477+
clustered_by=[exp.Var(this=keyword)],
478+
)
479+
480+
sql_calls = to_sql_calls(adapter)
481+
assert sql_calls == [
482+
f"CREATE TABLE IF NOT EXISTS `test_table` (`cola` INT, `colb` STRING) CLUSTER BY {keyword}",
483+
]
484+
485+
461486
def test_get_data_objects_distinguishes_view_types(mocker):
462487
adapter = DatabricksEngineAdapter(lambda: None, default_catalog="test_catalog")
463488

tests/core/test_dialect.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,149 @@ def test_sqlglot_extended_correctly(dialect: str) -> None:
760760
assert ast.sql(dialect=dialect) == "MODEL (\nname foo\n)"
761761

762762

763+
def test_format_model_expressions_clustered_by():
764+
# Unquoted AUTO / NONE → formatted without backticks or parens
765+
for keyword in ("AUTO", "NONE"):
766+
assert format_model_expressions(
767+
parse(
768+
f"""
769+
MODEL (
770+
name db.test,
771+
kind FULL,
772+
dialect databricks,
773+
clustered_by {keyword}
774+
);
775+
SELECT 1 AS a
776+
"""
777+
)
778+
) == (
779+
f"MODEL (\n"
780+
f" name db.test,\n"
781+
f" kind FULL,\n"
782+
f" dialect databricks,\n"
783+
f" clustered_by {keyword}\n"
784+
f");\n\nSELECT\n 1 AS a"
785+
)
786+
787+
# Backtick-quoted `auto` / `none` → treated as a column, rendered quoted
788+
for name in ("auto", "none"):
789+
assert format_model_expressions(
790+
parse(
791+
f"""
792+
MODEL (
793+
name db.test,
794+
kind FULL,
795+
dialect databricks,
796+
clustered_by `{name}`
797+
);
798+
SELECT 1 AS `{name}`
799+
"""
800+
)
801+
) == (
802+
f"MODEL (\n"
803+
f" name db.test,\n"
804+
f" kind FULL,\n"
805+
f" dialect databricks,\n"
806+
f' clustered_by "{name}"\n'
807+
f');\n\nSELECT\n 1 AS "{name}"'
808+
)
809+
810+
# Parens-wrapped (auto) → treated as a column, parens stripped for single column
811+
# (same normalisation as partitioned_by (a) → a); quoting happens at model-load time
812+
assert format_model_expressions(
813+
parse(
814+
"""
815+
MODEL (
816+
name db.test,
817+
kind FULL,
818+
dialect databricks,
819+
clustered_by (auto)
820+
);
821+
SELECT 1 AS auto
822+
"""
823+
)
824+
) == (
825+
"MODEL (\n"
826+
" name db.test,\n"
827+
" kind FULL,\n"
828+
" dialect databricks,\n"
829+
" clustered_by auto\n"
830+
");\n\nSELECT\n 1 AS auto"
831+
)
832+
833+
# Multi-column → parens preserved, identifiers as-written
834+
# (quoting happens when the model is loaded, not at format time)
835+
assert format_model_expressions(
836+
parse(
837+
"""
838+
MODEL (
839+
name db.test,
840+
kind FULL,
841+
dialect databricks,
842+
clustered_by (a, b)
843+
);
844+
SELECT 1 AS a, 2 AS b
845+
"""
846+
)
847+
) == (
848+
"MODEL (\n"
849+
" name db.test,\n"
850+
" kind FULL,\n"
851+
" dialect databricks,\n"
852+
" clustered_by (a, b)\n"
853+
");\n\nSELECT\n 1 AS a,\n 2 AS b"
854+
)
855+
856+
857+
@pytest.mark.parametrize("keyword", ["AUTO", "NONE"])
858+
def test_format_model_expressions_clustered_by_non_databricks(keyword: str):
859+
"""AUTO/NONE without dialect or with a non-Databricks dialect is parsed as a bare identifier."""
860+
# Without dialect — AUTO/NONE parsed as a plain column name (no special keyword handling)
861+
assert format_model_expressions(
862+
parse(
863+
f"""
864+
MODEL (
865+
name db.test,
866+
kind FULL,
867+
clustered_by {keyword}
868+
);
869+
SELECT 1 AS {keyword.lower()}
870+
"""
871+
)
872+
) == (
873+
f"MODEL (\n"
874+
f" name db.test,\n"
875+
f" kind FULL,\n"
876+
f" clustered_by {keyword}\n"
877+
f");\n\nSELECT\n 1 AS {keyword.lower()}"
878+
)
879+
880+
881+
@pytest.mark.parametrize("keyword", ["AUTO", "NONE"])
882+
def test_format_model_expressions_clustered_by_mixed_list(keyword: str):
883+
"""AUTO/NONE inside a parenthesised list is treated as a regular column name."""
884+
assert format_model_expressions(
885+
parse(
886+
f"""
887+
MODEL (
888+
name db.test,
889+
kind FULL,
890+
dialect databricks,
891+
clustered_by (a, {keyword})
892+
);
893+
SELECT 1 AS a, 2 AS {keyword.lower()}
894+
"""
895+
)
896+
) == (
897+
f"MODEL (\n"
898+
f" name db.test,\n"
899+
f" kind FULL,\n"
900+
f" dialect databricks,\n"
901+
f" clustered_by (a, {keyword})\n"
902+
f");\n\nSELECT\n 1 AS a,\n 2 AS {keyword.lower()}"
903+
)
904+
905+
763906
def test_connected_identifier():
764907
ast = d.parse_one("""SELECT ("x"at time zone 'utc')::timestamp as x""", "redshift")
765908
assert ast.sql("redshift") == """SELECT CAST(("x" AT TIME ZONE 'utc') AS TIMESTAMP) AS x"""

0 commit comments

Comments
 (0)