Skip to content

Commit 89d5740

Browse files
authored
Fix(fabric): Ensure that Fabric connections do not try to use a catalog once it's been dropped (#5314)
1 parent 45a2db6 commit 89d5740

File tree

5 files changed

+118
-1
lines changed

5 files changed

+118
-1
lines changed

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,19 @@ def _create_catalog(self, catalog_name: exp.Identifier) -> None:
121121
def _drop_catalog(self, catalog_name: exp.Identifier) -> None:
122122
"""Drop a catalog (warehouse) in Microsoft Fabric via REST API."""
123123
warehouse_name = catalog_name.sql(dialect=self.dialect, identify=False)
124+
current_catalog = self.get_current_catalog()
124125

125126
logger.info(f"Deleting Fabric warehouse: {warehouse_name}")
126127
self.api_client.delete_warehouse(warehouse_name)
127128

129+
if warehouse_name == current_catalog:
130+
# Somewhere around 2025-09-08, Fabric started validating the "Database=" connection argument and throwing 'Authentication failed' if the database doesnt exist
131+
# In addition, set_current_catalog() is implemented using a threadlocal variable "target_catalog"
132+
# So, when we drop a warehouse, and there are still threads with "target_catalog" set to reference it, any operations on those threads
133+
# that use an either use an existing connection pointing to this warehouse or trigger a new connection
134+
# will fail with an 'Authentication Failed' error unless we close all connections here, which also clears all the threadlocal data
135+
self.close()
136+
128137
def set_current_catalog(self, catalog_name: str) -> None:
129138
"""
130139
Set the current catalog for Microsoft Fabric connections.

sqlmesh/utils/connection_pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ def close_all(self, exclude_calling_thread: bool = False) -> None:
227227
self._thread_connections.pop(thread_id)
228228
self._thread_cursors.pop(thread_id, None)
229229
self._discard_transaction(thread_id)
230-
self._thread_attributes.pop(thread_id, None)
230+
231+
self._thread_attributes.clear()
231232

232233

233234
class ThreadLocalSharedConnectionPool(_ThreadLocalBase):

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3743,6 +3743,14 @@ def _set_config(gateway: str, config: Config) -> None:
37433743
assert not md.tables
37443744
assert not md.managed_tables
37453745

3746+
if ctx.dialect == "fabric":
3747+
# TestContext is using a different EngineAdapter instance / connection pool instance to the SQLMesh context
3748+
# When the SQLMesh context drops :snapshot_schema using its EngineAdapter, connections in TestContext are unaware
3749+
# and still have their threadlocal "target_catalog" attribute pointing to a catalog that no longer exists
3750+
# Trying to establish a connection to a nonexistant catalog produces an error, so we close all connections here
3751+
# to clear the threadlocal attributes
3752+
ctx.engine_adapter.close()
3753+
37463754
md = ctx.get_metadata_results(snapshot_schema)
37473755
assert not md.views
37483756
assert not md.managed_tables

tests/core/engine_adapter/integration/test_integration_fabric.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import typing as t
2+
import threading
3+
import queue
24
import pytest
35
from pytest import FixtureRequest
46
from sqlmesh.core.engine_adapter import FabricEngineAdapter
7+
from sqlmesh.utils.connection_pool import ThreadLocalConnectionPool
58
from tests.core.engine_adapter.integration import TestContext
9+
from concurrent.futures import ThreadPoolExecutor
610

711
from tests.core.engine_adapter.integration import (
812
TestContext,
@@ -39,3 +43,75 @@ def test_create_drop_catalog(ctx: TestContext, engine_adapter: FabricEngineAdapt
3943
finally:
4044
# if doesnt exist, should be no-op, not error
4145
ctx.drop_catalog(catalog_name)
46+
47+
48+
def test_drop_catalog_clears_threadlocals_that_reference_it(
49+
ctx: TestContext, engine_adapter: FabricEngineAdapter
50+
):
51+
catalog_name = ctx.add_test_suffix("test_drop_catalog")
52+
default_catalog = engine_adapter.get_current_catalog()
53+
54+
assert isinstance(engine_adapter._connection_pool, ThreadLocalConnectionPool)
55+
56+
# sets the connection attribute for this thread
57+
engine_adapter.create_catalog(catalog_name)
58+
assert engine_adapter._target_catalog is None
59+
engine_adapter.set_current_catalog(catalog_name)
60+
assert engine_adapter.get_current_catalog() == catalog_name
61+
assert engine_adapter._target_catalog == catalog_name
62+
63+
lock = threading.RLock()
64+
65+
def _set_and_return_catalog_in_another_thread(
66+
q: queue.Queue, engine_adapter: FabricEngineAdapter
67+
) -> t.Optional[str]:
68+
q.put("thread_started")
69+
70+
assert engine_adapter.get_current_catalog() == default_catalog
71+
assert engine_adapter._target_catalog is None
72+
73+
engine_adapter.set_current_catalog(catalog_name)
74+
assert engine_adapter.get_current_catalog() == catalog_name
75+
assert engine_adapter._target_catalog == catalog_name
76+
77+
q.put("catalog_set_in_thread")
78+
79+
# block this thread while we drop the catalog in the main test thread
80+
lock.acquire()
81+
82+
# the current catalog should have been cleared from the threadlocal connection attributes
83+
# when this catalog was dropped by the outer thread, causing it to fall back to the default catalog
84+
try:
85+
assert engine_adapter._target_catalog is None
86+
return engine_adapter.get_current_catalog()
87+
finally:
88+
lock.release()
89+
90+
q: queue.Queue = queue.Queue()
91+
92+
with ThreadPoolExecutor() as executor:
93+
lock.acquire() # we have the lock, thread will be blocked until we release it
94+
95+
future = executor.submit(_set_and_return_catalog_in_another_thread, q, engine_adapter)
96+
97+
assert q.get() == "thread_started"
98+
assert not future.done()
99+
100+
try:
101+
assert q.get(timeout=20) == "catalog_set_in_thread"
102+
except:
103+
if exec := future.exception():
104+
raise exec
105+
raise
106+
107+
ctx.drop_catalog(catalog_name)
108+
assert not future.done()
109+
110+
lock.release() # yield the lock to the thread
111+
112+
# block until thread complete
113+
result = future.result()
114+
115+
# both threads should be automatically using the default catalog now
116+
assert result == default_catalog
117+
assert engine_adapter.get_current_catalog() == default_catalog

tests/utils/test_connection_pool.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,29 @@ def thread():
210210
assert cursor_mock_thread_two.begin.call_count == 1
211211

212212

213+
def test_thread_local_connection_pool_attributes(mocker: MockerFixture):
214+
pool = ThreadLocalConnectionPool(connection_factory=lambda: mocker.Mock())
215+
216+
pool.set_attribute("foo", "bar")
217+
current_threadid = get_ident()
218+
219+
def _in_thread(pool: ThreadLocalConnectionPool):
220+
assert get_ident() != current_threadid
221+
pool.set_attribute("foo", "baz")
222+
223+
with ThreadPoolExecutor() as executor:
224+
future = executor.submit(_in_thread, pool)
225+
assert not future.exception()
226+
227+
assert pool.get_all_attributes("foo") == ["bar", "baz"]
228+
assert pool.get_attribute("foo") == "bar"
229+
230+
pool.close_all()
231+
232+
assert pool.get_all_attributes("foo") == []
233+
assert pool.get_attribute("foo") is None
234+
235+
213236
def test_thread_local_shared_connection_pool(mocker: MockerFixture):
214237
cursor_mock_thread_one = mocker.Mock()
215238
cursor_mock_thread_two = mocker.Mock()

0 commit comments

Comments
 (0)