Skip to content

Commit 2ba2730

Browse files
sineline (Guillem G.)
authored and committed
feat: Implement special DDL generation for Snowflake Iceberg tables with PARTITION BY to correctly handle property ordering and CTAS limitations.
1 parent 8263170 commit 2ba2730

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed

sqlmesh/core/engine_adapter/snowflake.py

Lines changed: 129 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -207,6 +207,135 @@ def _create_table(
207207
elif table_kind == self.MANAGED_TABLE_KIND:
208208
table_kind = f"DYNAMIC {table_format} TABLE"
209209

210+
partitioned_by = kwargs.get("partitioned_by")
211+
212+
# For Iceberg tables with PARTITION BY, we must handle the DDL specially because:
213+
# 1. sqlglot reorders PartitionedByProperty to appear before other properties,
214+
# but Snowflake requires: CATALOG → EXTERNAL_VOLUME → BASE_LOCATION → PARTITION BY → PATH_LAYOUT
215+
# 2. Snowflake's CTAS variant does not support PARTITION BY at all.
216+
#
217+
# Solution: Build a CREATE expression WITHOUT PartitionedByProperty, render it to SQL,
218+
# then inject "PARTITION BY (...)" at the correct position (before PATH_LAYOUT).
219+
# For CTAS, we also split into CREATE + INSERT.
220+
if (
221+
partitioned_by
222+
and table_kind
223+
and "ICEBERG" in table_kind.upper()
224+
and target_columns_to_types
225+
):
226+
table = (
227+
table_name_or_schema
228+
if isinstance(table_name_or_schema, exp.Schema)
229+
else exp.to_table(table_name_or_schema)
230+
)
231+
232+
# Ensure schema with column definitions
233+
if not isinstance(table, exp.Schema):
234+
columns_to_types_all_known = all(
235+
dt.this != exp.DataType.Type.UNKNOWN for dt in target_columns_to_types.values()
236+
)
237+
if columns_to_types_all_known:
238+
table = exp.Schema(
239+
this=table,
240+
expressions=[
241+
exp.ColumnDef(this=exp.to_identifier(col), kind=dtype)
242+
for col, dtype in target_columns_to_types.items()
243+
],
244+
)
245+
246+
# Build properties WITHOUT PartitionedByProperty (sqlglot would reorder it)
247+
properties = self._build_table_properties_exp(
248+
**kwargs,
249+
target_columns_to_types=target_columns_to_types,
250+
table_description=(
251+
table_description
252+
if self.COMMENT_CREATION_TABLE.supports_schema_def and self.comments_enabled
253+
else None
254+
),
255+
table_kind=table_kind,
256+
)
257+
258+
# Create the DDL expression (no AS SELECT, even if we have a CTAS expression)
259+
create_exp = exp.Create(
260+
this=table,
261+
kind=table_kind or "TABLE",
262+
replace=replace,
263+
exists=False if replace else exists,
264+
properties=properties,
265+
)
266+
# Use identify=True to quote all identifiers, matching how SQLMesh
267+
# quotes identifiers in INSERT/DELETE statements. Without this,
268+
# unquoted identifiers get uppercased by Snowflake, while SQLMesh's
269+
# INSERT uses quoted lowercase — pointing to different objects.
270+
ddl_sql = create_exp.sql(dialect=self.dialect, identify=True)
271+
272+
# Build the PARTITION BY clause string
273+
partition_cols = ", ".join(
274+
col.sql(dialect=self.dialect, identify=True) for col in partitioned_by
275+
)
276+
partition_clause = f" PARTITION BY ({partition_cols})"
277+
278+
# Inject PARTITION BY right after the column definitions closing paren.
279+
# Snowflake requires: CREATE ICEBERG TABLE (...cols...) PARTITION BY (...) COMMENT=... CATALOG=... etc.
280+
# We track parenthesis depth to find the end of the column list, handling
281+
# nested types like DECIMAL(38, 0) correctly.
282+
paren_depth = 0
283+
col_end_pos = -1
284+
for i, c in enumerate(ddl_sql):
285+
if c == '(':
286+
paren_depth += 1
287+
elif c == ')':
288+
paren_depth -= 1
289+
if paren_depth == 0:
290+
col_end_pos = i + 1
291+
break
292+
293+
if col_end_pos > 0:
294+
ddl_sql = ddl_sql[:col_end_pos] + partition_clause + ddl_sql[col_end_pos:]
295+
else:
296+
# Fallback: append at end
297+
ddl_sql += partition_clause
298+
299+
# Ensure the schema exists before creating the Iceberg table.
300+
# SQLMesh uses staging schemas (e.g. sqlmesh__MIQ_ICEBERG) that may not
301+
# exist yet when this custom DDL path runs.
302+
# Use quoted identifiers to preserve case, matching the CREATE TABLE DDL.
303+
target_table = (
304+
table_name_or_schema.this
305+
if isinstance(table_name_or_schema, exp.Schema)
306+
else exp.to_table(table_name_or_schema)
307+
)
308+
schema_parts = []
309+
if target_table.catalog:
310+
schema_parts.append(f'"{target_table.catalog}"')
311+
if target_table.db:
312+
schema_parts.append(f'"{target_table.db}"')
313+
if schema_parts:
314+
schema_fqn = ".".join(schema_parts)
315+
self.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_fqn}")
316+
317+
self.execute(ddl_sql)
318+
self._clear_data_object_cache(
319+
table_name_or_schema.this
320+
if isinstance(table_name_or_schema, exp.Schema)
321+
else table_name_or_schema
322+
)
323+
324+
# If we had a CTAS expression, insert the data separately
325+
if expression is not None:
326+
table_name = (
327+
table_name_or_schema.this
328+
if isinstance(table_name_or_schema, exp.Schema)
329+
else table_name_or_schema
330+
)
331+
self._insert_append_query(
332+
table_name,
333+
expression,
334+
target_columns_to_types,
335+
track_rows_processed=False,
336+
)
337+
return
338+
210339
super()._create_table(
211340
table_name_or_schema=table_name_or_schema,
212341
expression=expression,

0 commit comments

Comments (0)