Skip to content

Commit 8a5b5f4

Browse files
authored
fix: databricks with_log_level (#3823)
1 parent 285c999 commit 8a5b5f4

File tree

2 files changed

+17
-8
lines changed

2 files changed

+17
-8
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from sqlmesh.core.model.kind import TimeColumn
4242
from sqlmesh.core.schema_diff import SchemaDiffer
4343
from sqlmesh.utils import columns_to_types_all_known, random_id
44-
from sqlmesh.utils.connection_pool import create_connection_pool
44+
from sqlmesh.utils.connection_pool import create_connection_pool, ConnectionPool
4545
from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
4646
from sqlmesh.utils.errors import (
4747
SQLMeshError,
@@ -79,7 +79,7 @@ class EngineAdapter:
7979
with the underlying engine and data store.
8080
8181
Args:
82-
connection_factory: a callable which produces a new Database API-compliant
82+
connection_factory_or_pool: a callable which produces a new Database API-compliant
8383
connection on every call.
8484
dialect: The dialect with which this adapter is associated.
8585
multithreaded: Indicates whether this adapter will be used by more than one thread.
@@ -109,7 +109,7 @@ class EngineAdapter:
109109

110110
def __init__(
111111
self,
112-
connection_factory: t.Callable[[], t.Any],
112+
connection_factory_or_pool: t.Union[t.Callable[[], t.Any], ConnectionPool],
113113
dialect: str = "",
114114
sql_gen_kwargs: t.Optional[t.Dict[str, Dialect | bool | str]] = None,
115115
multithreaded: bool = False,
@@ -122,8 +122,12 @@ def __init__(
122122
**kwargs: t.Any,
123123
):
124124
self.dialect = dialect.lower() or self.DIALECT
125-
self._connection_pool = create_connection_pool(
126-
connection_factory, multithreaded, cursor_init=cursor_init
125+
self._connection_pool = (
126+
connection_factory_or_pool
127+
if isinstance(connection_factory_or_pool, ConnectionPool)
128+
else create_connection_pool(
129+
connection_factory_or_pool, multithreaded, cursor_init=cursor_init
130+
)
127131
)
128132
self._sql_gen_kwargs = sql_gen_kwargs or {}
129133
self._default_catalog = default_catalog
@@ -135,17 +139,16 @@ def __init__(
135139

136140
def with_log_level(self, level: int) -> EngineAdapter:
137141
adapter = self.__class__(
138-
lambda: None,
142+
self._connection_pool,
139143
dialect=self.dialect,
140144
sql_gen_kwargs=self._sql_gen_kwargs,
141145
default_catalog=self._default_catalog,
142146
execute_log_level=level,
143147
register_comments=self._register_comments,
148+
null_connection=True,
144149
**self._extra_config,
145150
)
146151

147-
adapter._connection_pool = self._connection_pool
148-
149152
return adapter
150153

151154
@property

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ def clone_table(
273273
def wap_supported(self, table_name: TableName) -> bool:
274274
return False
275275

276+
def close(self) -> t.Any:
277+
"""Closes all open connections and releases all allocated resources."""
278+
super().close()
279+
if self._spark_engine_adapter:
280+
self._spark_engine_adapter.close()
281+
276282
@property
277283
def default_catalog(self) -> t.Optional[str]:
278284
try:

0 commit comments

Comments
 (0)