Skip to content

Commit 66c610a

Browse files
authored
Fix: Improve Databricks Catalog setting (#1240)
* improve databricks set catalog
* still support non-unity
* update comment
1 parent 545baae commit 66c610a

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ ignore_missing_imports = True
8484
[mypy-slack_sdk.*]
8585
ignore_missing_imports = True
8686

87+
[mypy-py4j.*]
88+
ignore_missing_imports = True
89+
8790
[autoflake]
8891
in-place = True
8992
expand-star-imports = True

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,16 @@ def spark(self) -> PySparkSession:
103103
).getOrCreate()
104104
catalog = self._extra_config.get("catalog")
105105
if catalog:
106+
from py4j.protocol import Py4JError
107+
106108
# Note: Spark 3.4+ Only API
107-
self._spark.catalog.setCurrentCatalog(catalog)
109+
try:
110+
self.spark.catalog.setCurrentCatalog(catalog)
111+
# `setCurrentCatalog` should work for both non-unity and Unity single user
112+
# clusters. If it fails then we try `USE CATALOG` which is Unity only but works
113+
# across all clusters
114+
except Py4JError:
115+
self.spark.sql(f"USE CATALOG {catalog}")
108116
self._spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
109117
return self._spark
110118

sqlmesh/engines/spark/db_api/spark_session.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,14 @@ def cursor(self) -> SparkSessionCursor:
7373
pass
7474
if self.catalog:
7575
# Note: Spark 3.4+ Only API
76-
self.spark.catalog.setCurrentCatalog(self.catalog)
76+
from py4j.protocol import Py4JError
77+
78+
try:
79+
self.spark.catalog.setCurrentCatalog(self.catalog)
80+
# Databricks does not support `setCurrentCatalog` with Unity catalog
81+
# and shared clusters so we use the Databricks Unity only SQL command instead
82+
except Py4JError:
83+
self.spark.sql(f"USE CATALOG {self.catalog}")
7784
self.spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
7885
self.spark.conf.set("hive.exec.dynamic.partition", "true")
7986
self.spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

0 commit comments

Comments (0)