Skip to content

Commit 150f41f

Browse files
authored
Merge branch 'main' into feature/DRM/revert-to-shared-connection-for-databricks-access-token-connections
2 parents 27a52ad + 192fbe9 commit 150f41f

9 files changed

Lines changed: 452 additions & 4 deletions

File tree

docs/concepts/models/overview.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ This table lists each engine's support for `TABLE` and `VIEW` object comments:
184184
| DuckDB <=0.9 | N | N |
185185
| DuckDB >=0.10 | Y | Y |
186186
| MySQL | Y | Y |
187-
| MSSQL | N | N |
187+
| MSSQL | Y | Y |
188188
| Postgres | Y | Y |
189189
| GCP Postgres | Y | Y |
190190
| Redshift | Y | N |

docs/integrations/engines/databricks.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,28 @@ The only relevant SQLMesh configuration parameter is the optional `catalog` para
271271
| `disable_databricks_connect` | When running locally, disable the use of Databricks Connect for all model operations (so use SQL Connector for all models) | bool | N |
272272
| `disable_spark_session` | Do not use SparkSession if it is available (like when running in a notebook). | bool | N |
273273

274+
### Query tags
275+
276+
Databricks SQL Connector supports per-query tags through the `query_tags` model session property. Specify tags as a `MAP(...)` of string keys to string or `NULL` values:
277+
278+
```sql
279+
MODEL (
280+
name sqlmesh_example.tagged_model,
281+
dialect databricks,
282+
session_properties (
283+
query_tags = MAP(
284+
'team', 'data-eng',
285+
'app', 'sqlmesh',
286+
'feature', NULL
287+
)
288+
)
289+
);
290+
291+
SELECT 1 AS id;
292+
```
293+
294+
Query tags are only applied when SQLMesh executes SQL through the Databricks SQL Connector. They are not applied when SQLMesh routes execution through Databricks Connect, a Databricks notebook SparkSession, or the Spark engine adapter.
295+
274296
## Model table properties to support altering tables
275297

276298
If you are making a change to the structure of a table that is [forward only](../../guides/incremental_time.md#forward-only-models), then you may need to add the following to your model's `physical_properties`:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ bigquery = [
5151
# pinned an older SQLGlot which is incompatible with SQLMesh
5252
bigframes = ["bigframes>=1.32.0"]
5353
clickhouse = ["clickhouse-connect"]
54-
databricks = ["databricks-sql-connector[pyarrow]"]
54+
databricks = ["databricks-sql-connector[pyarrow]>=4.2.6"]
5555
dev = [
5656
"agate",
5757
"beautifulsoup4",

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,43 @@
3030
logger = logging.getLogger(__name__)
3131

3232

33+
def _query_tags(
34+
query_tags: t.Optional[t.Union[exp.Expr, str, int, float, bool]],
35+
) -> t.Optional[t.Dict[str, t.Optional[str]]]:
36+
if not query_tags:
37+
return None
38+
39+
if not isinstance(query_tags, (exp.Map, exp.VarMap)):
40+
raise SQLMeshError("Invalid value for `session_properties.query_tags`. Must be a map.")
41+
42+
keys = query_tags.args.get("keys")
43+
values = query_tags.args.get("values")
44+
if not isinstance(keys, exp.Array) or not isinstance(values, exp.Array):
45+
raise SQLMeshError(
46+
"Invalid value for `session_properties.query_tags`. Must be a map with array "
47+
"keys and array values."
48+
)
49+
50+
tags: t.Dict[str, t.Optional[str]] = {}
51+
for key, value in zip(keys.expressions, values.expressions):
52+
if not isinstance(key, exp.Literal) or not key.is_string:
53+
raise SQLMeshError(
54+
"Invalid key in `session_properties.query_tags`. Keys must be string literals."
55+
)
56+
57+
if isinstance(value, exp.Null):
58+
tags[key.this] = None
59+
elif isinstance(value, exp.Literal) and value.is_string:
60+
tags[key.this] = value.this
61+
else:
62+
raise SQLMeshError(
63+
"Invalid value in `session_properties.query_tags`. Values must be string "
64+
"literals or NULL."
65+
)
66+
67+
return tags
68+
69+
3370
class DatabricksEngineAdapter(SparkEngineAdapter, GrantsFromInfoSchemaMixin):
3471
DIALECT = "databricks"
3572
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.REPLACE_WHERE
@@ -98,6 +135,12 @@ def _use_spark_session(self) -> bool:
98135
def is_spark_session_connection(self) -> bool:
99136
return isinstance(self.connection, SparkSessionConnection)
100137

138+
@property
139+
def _is_databricks_sql_connector_connection(self) -> bool:
140+
return not self.is_spark_session_connection and not self._connection_pool.get_attribute(
141+
"use_spark_engine_adapter"
142+
)
143+
101144
def _set_spark_engine_adapter_if_needed(self) -> None:
102145
self._spark_engine_adapter = None
103146

@@ -181,10 +224,23 @@ def _begin_session(self, properties: SessionProperties) -> t.Any:
181224
"""Begin a new session."""
182225
# Align the different possible connectors to a single catalog
183226
self.set_current_catalog(self.default_catalog) # type: ignore
227+
self._connection_pool.set_attribute("query_tags", _query_tags(properties.get("query_tags")))
184228

185229
def _end_session(self) -> None:
    """Reset the per-session attributes stored on the connection pool.

    Clears the `query_tags` attribute and sets `use_spark_engine_adapter` back to False.
    """
    pool = self._connection_pool
    pool.set_attribute("query_tags", None)
    pool.set_attribute("use_spark_engine_adapter", False)
187232

233+
def _execute(self, sql: str, track_rows_processed: bool = False, **kwargs: t.Any) -> None:
    """Execute `sql`, forwarding the session's query tags when they apply.

    The tags stored on the connection pool under `query_tags` are added to `kwargs` only
    when they are non-empty, the caller did not pass `query_tags` explicitly, and the
    statement runs through the Databricks SQL Connector. Everything is then delegated to
    the parent implementation.

    Args:
        sql: The SQL text to execute.
        track_rows_processed: Passed through unchanged to the parent implementation.
        **kwargs: Extra keyword arguments passed through to the parent implementation.
    """
    session_tags = self._connection_pool.get_attribute("query_tags")
    caller_supplied_tags = "query_tags" in kwargs

    if (
        session_tags
        and not caller_supplied_tags
        and self._is_databricks_sql_connector_connection
    ):
        kwargs["query_tags"] = session_tags

    return super()._execute(sql, track_rows_processed, **kwargs)
243+
188244
def _df_to_source_queries(
189245
self,
190246
df: DF,

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from textwrap import dedent
56
import typing as t
67
import logging
78

@@ -53,8 +54,8 @@ class MSSQLEngineAdapter(
5354
SUPPORTS_TUPLE_IN = False
5455
SUPPORTS_MATERIALIZED_VIEWS = False
5556
CURRENT_CATALOG_EXPRESSION = exp.func("db_name")
56-
COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED
57-
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
57+
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
58+
COMMENT_CREATION_VIEW = CommentCreationView.COMMENT_COMMAND_ONLY
5859
SUPPORTS_REPLACE_TABLE = False
5960
MAX_IDENTIFIER_LENGTH = 128
6061
SUPPORTS_QUERY_EXECUTION_TRACKING = True
@@ -457,3 +458,83 @@ def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expr]) -> N
457458
)
458459

459460
return super().delete_from(table_name, where)
461+
462+
def _build_create_comment_table_exp(
    self, table: exp.Table, table_comment: str, table_kind: str = "TABLE"
) -> exp.Comment | str:
    """Build a T-SQL batch that stores a table/view comment as `MS_Description`.

    The batch reads the object's current `MS_Description` extended property and then
    adds, updates or drops it so that it matches `table_comment`.

    Args:
        table: The target object. Its `db` is used as the schema name, falling back to
            "dbo" when it is empty.
        table_comment: The comment text. An empty/None comment removes any existing
            description.
        table_kind: The level-1 object type passed to the extended property routines.

    Returns:
        The T-SQL text of the batch.
    """
    # NOTE(review): only the schema and object name are used, so the batch presumably
    # targets the connection's current database - confirm for cross-catalog tables.
    template = dedent("""
        DECLARE @comment sql_variant = {comment};
        DECLARE @property_name VARCHAR(128) = 'MS_Description';
        DECLARE @schema_name VARCHAR(128) = {schema_name};
        DECLARE @object_name VARCHAR(128) = {object_name};
        DECLARE @object_kind VARCHAR(128) = {object_kind};
        DECLARE @existing sql_variant;

        SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR(4000)) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, DEFAULT, DEFAULT);

        IF @comment IS NULL
        BEGIN
            IF @existing IS NOT NULL
                EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name;
        END
        ELSE
        BEGIN
            IF @existing IS NULL
                EXEC sp_addextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name;
            ELSE IF @existing != @comment
                EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name;
        END
        """)

    def as_literal(text: str) -> str:
        # Render through SQLGlot so embedded quotes are escaped for the dialect.
        return exp.Literal.string(text).sql(dialect=self.dialect, identify=False)

    # A missing comment must be the bare NULL keyword: a quoted 'NULL' string would never
    # satisfy `@comment IS NULL` and would be stored as the description text instead.
    comment_sql = as_literal(table_comment) if table_comment else "NULL"

    # The existing value is cast with an explicit length because CAST(... AS NVARCHAR)
    # defaults to 30 characters, which made every longer comment look changed.
    return template.format(
        comment=comment_sql,
        schema_name=as_literal(table.db or "dbo"),
        object_name=as_literal(table.name),
        object_kind=as_literal(table_kind),
    )
499+
500+
def _build_create_comment_column_exp(
    self, table: exp.Table, column_name: str, column_comment: str, table_kind: str = "TABLE"
) -> exp.Comment | str:
    """Build a T-SQL batch that stores a column comment as `MS_Description`.

    The batch reads the column's current `MS_Description` extended property and then
    adds, updates or drops it so that it matches `column_comment`.

    Args:
        table: The object that owns the column. Its `db` is used as the schema name,
            falling back to "dbo" when it is empty.
        column_name: The name of the column to comment.
        column_comment: The comment text. An empty/None comment removes any existing
            description.
        table_kind: The level-1 object type passed to the extended property routines.

    Returns:
        The T-SQL text of the batch.
    """
    # NOTE(review): only the schema and object name are used, so the batch presumably
    # targets the connection's current database - confirm for cross-catalog tables.
    template = dedent("""
        DECLARE @comment sql_variant = {comment};
        DECLARE @property_name VARCHAR(128) = 'MS_Description';
        DECLARE @schema_name VARCHAR(128) = {schema_name};
        DECLARE @object_name VARCHAR(128) = {object_name};
        DECLARE @object_kind VARCHAR(128) = {object_kind};
        DECLARE @column_name VARCHAR(128) = {column_name};
        DECLARE @existing sql_variant;

        SELECT TOP 1 @existing = CAST(VALUE AS NVARCHAR(4000)) FROM fn_listextendedproperty(@property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name);

        IF @comment IS NULL
        BEGIN
            IF @existing IS NOT NULL
                EXEC sp_dropextendedproperty @property_name, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name;
        END
        ELSE
        BEGIN
            IF @existing IS NULL
                EXEC sp_addextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name;
            ELSE IF @existing != @comment
                EXEC sp_updateextendedproperty @property_name, @comment, 'schema', @schema_name, @object_kind, @object_name, 'column', @column_name;
        END
        """)

    def as_literal(text: str) -> str:
        # Render through SQLGlot so embedded quotes are escaped for the dialect.
        return exp.Literal.string(text).sql(dialect=self.dialect, identify=False)

    # A missing comment must be the bare NULL keyword: a quoted 'NULL' string would never
    # satisfy `@comment IS NULL` and would be stored as the description text instead.
    comment_sql = as_literal(column_comment) if column_comment else "NULL"

    # The existing value is cast with an explicit length because CAST(... AS NVARCHAR)
    # defaults to 30 characters, which made every longer comment look changed.
    return template.format(
        comment=comment_sql,
        schema_name=as_literal(table.db or "dbo"),
        object_name=as_literal(table.name),
        object_kind=as_literal(table_kind),
        column_name=as_literal(column_name),
    )

sqlmesh/core/model/meta.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,38 @@ def session_properties_validator(cls, v: t.Any, info: ValidationInfo) -> t.Any:
396396
raise ConfigError(
397397
"Invalid value for `session_properties.authorization`. Must be a string literal."
398398
)
399+
elif prop_name == "query_tags":
400+
query_tags = eq.right
401+
if isinstance(query_tags, (d.MacroFunc, d.MacroVar)):
402+
continue
403+
404+
if not isinstance(query_tags, (exp.Map, exp.VarMap)):
405+
raise ConfigError(
406+
"Invalid value for `session_properties.query_tags`. Must be a map."
407+
)
408+
409+
keys = query_tags.args.get("keys")
410+
values = query_tags.args.get("values")
411+
if not isinstance(keys, exp.Array) or not isinstance(values, exp.Array):
412+
raise ConfigError(
413+
"Invalid value for `session_properties.query_tags`. Must be a map with array "
414+
"keys and array values."
415+
)
416+
417+
for key, value in zip(keys.expressions, values.expressions):
418+
if not isinstance(key, exp.Literal) or not key.is_string:
419+
raise ConfigError(
420+
"Invalid key in `session_properties.query_tags`. Keys must be string literals."
421+
)
422+
423+
if not (
424+
isinstance(value, exp.Null)
425+
or (isinstance(value, exp.Literal) and value.is_string)
426+
):
427+
raise ConfigError(
428+
"Invalid value in `session_properties.query_tags`. Values must be string "
429+
"literals or NULL."
430+
)
399431

400432
return parsed_session_properties
401433

tests/core/engine_adapter/integration/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,14 @@ def get_table_comment(
526526
AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}'
527527
;
528528
"""
529+
elif self.dialect == "tsql":
530+
kind = "table" if table_kind == "BASE TABLE" else "view"
531+
query = f"""
532+
SELECT
533+
ep.name,
534+
CAST(ep.value AS NVARCHAR(MAX)) comment
535+
FROM fn_listextendedproperty('MS_Description', 'schema', '{schema_name}', '{kind}', '{table_name}', DEFAULT, DEFAULT) ep
536+
"""
529537

530538
result = self.engine_adapter.fetchall(query)
531539

@@ -636,6 +644,16 @@ def get_column_comments(
636644
AND c.relkind = '{"v" if table_kind == "VIEW" else "r"}'
637645
;
638646
"""
647+
elif self.dialect == "tsql":
648+
kind = "table" if table_kind == "BASE TABLE" else "view"
649+
query = f"""
650+
SELECT
651+
col.COLUMN_NAME column_name,
652+
CAST(ep.value AS NVARCHAR(MAX)) comment
653+
FROM INFORMATION_SCHEMA.COLUMNS col
654+
CROSS APPLY fn_listextendedproperty('MS_Description', 'schema', col.TABLE_SCHEMA, '{kind}', col.TABLE_NAME, 'column', col.COLUMN_NAME) ep
655+
WHERE col.TABLE_SCHEMA = '{schema_name}' AND col.TABLE_NAME = '{table_name}'
656+
"""
639657

640658
result = self.engine_adapter.fetchall(query)
641659

0 commit comments

Comments
 (0)