Skip to content

Commit c7f5771

Browse files
committed
address comments
1 parent ea82df0 commit c7f5771

File tree

7 files changed

+69
-12
lines changed

7 files changed

+69
-12
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,8 +1470,14 @@ def columns(
14701470
}
14711471

14721472
def table_exists(self, table_name: TableName) -> bool:
1473+
table = exp.to_table(table_name)
1474+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
1475+
if data_object_cache_key in self._data_object_cache:
1476+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
1477+
return self._data_object_cache[data_object_cache_key] is not None
1478+
14731479
try:
1474-
self.execute(exp.Describe(this=exp.to_table(table_name), kind="TABLE"))
1480+
self.execute(exp.Describe(this=table, kind="TABLE"))
14751481
return True
14761482
except Exception:
14771483
return False
@@ -2301,11 +2307,13 @@ def get_data_objects(
23012307
target_schema.catalog, target_schema.db, name
23022308
)
23032309
if cache_key in self._data_object_cache:
2310+
logger.debug("Data object cache hit: %s", cache_key)
23042311
data_object = self._data_object_cache[cache_key]
23052312
# If the object is none, then the table was previously looked for but not found
23062313
if data_object:
23072314
cached_objects.append(data_object)
23082315
else:
2316+
logger.debug("Data object cache miss: %s", cache_key)
23092317
missing_names.add(name)
23102318

23112319
# Fetch missing objects from database
@@ -2321,7 +2329,6 @@ def get_data_objects(
23212329
for obj in self._get_data_objects(schema_name, set(batch))
23222330
]
23232331

2324-
# Cache the fetched objects
23252332
for obj in fetched_objects:
23262333
cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
23272334
self._data_object_cache[cache_key] = obj
@@ -2336,7 +2343,12 @@ def get_data_objects(
23362343
return cached_objects + fetched_objects
23372344

23382345
return cached_objects
2339-
return self._get_data_objects(schema_name)
2346+
2347+
fetched_objects = self._get_data_objects(schema_name)
2348+
for obj in fetched_objects:
2349+
cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
2350+
self._data_object_cache[cache_key] = obj
2351+
return fetched_objects
23402352

23412353
def fetchone(
23422354
self,
@@ -2746,10 +2758,12 @@ def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.An
27462758
def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None:
27472759
"""Clears the cache entry for the given table name, or clears the entire cache if table_name is None."""
27482760
if table_name is None:
2761+
logger.debug("Clearing entire data object cache")
27492762
self._data_object_cache.clear()
27502763
else:
27512764
table = exp.to_table(table_name)
27522765
cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
2766+
logger.debug("Clearing data object cache key: %s", cache_key)
27532767
self._data_object_cache.pop(cache_key, None)
27542768

27552769
def _get_data_objects(
@@ -3003,7 +3017,5 @@ def _decoded_str(value: t.Union[str, bytes]) -> str:
30033017

30043018
def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str:
30053019
"""Returns a cache key for a data object based on its fully qualified name."""
3006-
catalog_part = catalog.lower() if catalog else ""
3007-
schema_part = schema_name.lower()
3008-
object_part = object_name.lower()
3009-
return f"{catalog_part}.{schema_part}.{object_part}"
3020+
catalog = catalog or ""
3021+
return f"{catalog}.{schema_name}.{object_name}"

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from __future__ import annotations
22

33
import typing as t
4+
import logging
45

56
from sqlglot import exp
67

78
from sqlmesh.core.dialect import to_schema
8-
from sqlmesh.core.engine_adapter import EngineAdapter
9+
from sqlmesh.core.engine_adapter.base import EngineAdapter, _get_data_object_cache_key
910
from sqlmesh.core.engine_adapter.shared import (
1011
CatalogSupport,
1112
CommentCreationTable,
@@ -20,6 +21,9 @@
2021
from sqlmesh.core.engine_adapter._typing import QueryOrDF
2122

2223

24+
logger = logging.getLogger(__name__)
25+
26+
2327
class BasePostgresEngineAdapter(EngineAdapter):
2428
DEFAULT_BATCH_SIZE = 400
2529
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
@@ -75,6 +79,10 @@ def table_exists(self, table_name: TableName) -> bool:
7579
Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
7680
"""
7781
table = exp.to_table(table_name)
82+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
83+
if data_object_cache_key in self._data_object_cache:
84+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
85+
return self._data_object_cache[data_object_cache_key] is not None
7886

7987
sql = (
8088
exp.select("1")

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlglot.transforms import remove_precision_parameterized_types
99

1010
from sqlmesh.core.dialect import to_schema
11+
from sqlmesh.core.engine_adapter.base import _get_data_object_cache_key
1112
from sqlmesh.core.engine_adapter.mixins import (
1213
ClusteredByMixin,
1314
RowDiffMixin,
@@ -744,6 +745,12 @@ def insert_overwrite_by_partition(
744745
)
745746

746747
def table_exists(self, table_name: TableName) -> bool:
748+
table = exp.to_table(table_name)
749+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
750+
if data_object_cache_key in self._data_object_cache:
751+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
752+
return self._data_object_cache[data_object_cache_key] is not None
753+
747754
try:
748755
from google.cloud.exceptions import NotFound
749756
except ModuleNotFoundError:

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import typing as t
6+
import logging
67

78
from sqlglot import exp
89

@@ -13,6 +14,7 @@
1314
InsertOverwriteStrategy,
1415
MERGE_SOURCE_ALIAS,
1516
MERGE_TARGET_ALIAS,
17+
_get_data_object_cache_key,
1618
)
1719
from sqlmesh.core.engine_adapter.mixins import (
1820
GetCurrentCatalogFromFunctionMixin,
@@ -36,6 +38,9 @@
3638
from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF
3739

3840

41+
logger = logging.getLogger(__name__)
42+
43+
3944
@set_catalog()
4045
class MSSQLEngineAdapter(
4146
EngineAdapterWithIndexSupport,
@@ -144,6 +149,10 @@ def build_var_length_col(
144149
def table_exists(self, table_name: TableName) -> bool:
145150
"""MsSql doesn't support describe so we query information_schema."""
146151
table = exp.to_table(table_name)
152+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
153+
if data_object_cache_key in self._data_object_cache:
154+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
155+
return self._data_object_cache[data_object_cache_key] is not None
147156

148157
sql = (
149158
exp.select("1")

sqlmesh/core/engine_adapter/postgres.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PostgresEngineAdapter(
3434
HAS_VIEW_BINDING = True
3535
CURRENT_CATALOG_EXPRESSION = exp.column("current_catalog")
3636
SUPPORTS_REPLACE_TABLE = False
37-
MAX_IDENTIFIER_LENGTH = 63
37+
MAX_IDENTIFIER_LENGTH: t.Optional[int] = 63
3838
SUPPORTS_QUERY_EXECUTION_TRACKING = True
3939
SCHEMA_DIFFER_KWARGS = {
4040
"parameterized_type_defaults": {

tests/core/engine_adapter/test_base.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3723,6 +3723,27 @@ def test_data_object_cache_get_data_objects(
37233723
assert mock_get_data_objects.call_count == 1 # Should not increase
37243724

37253725

3726+
def test_data_object_cache_get_data_objects_no_object_names(
3727+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
3728+
):
3729+
adapter = make_mocked_engine_adapter(EngineAdapter, patch_get_data_objects=False)
3730+
3731+
table1 = DataObject(catalog=None, schema="test_schema", name="table1", type="table")
3732+
table2 = DataObject(catalog=None, schema="test_schema", name="table2", type="table")
3733+
3734+
mock_get_data_objects = mocker.patch.object(
3735+
adapter, "_get_data_objects", return_value=[table1, table2]
3736+
)
3737+
3738+
result1 = adapter.get_data_objects("test_schema")
3739+
assert len(result1) == 2
3740+
assert mock_get_data_objects.call_count == 1
3741+
3742+
result2 = adapter.get_data_objects("test_schema", {"table1", "table2"})
3743+
assert len(result2) == 2
3744+
assert mock_get_data_objects.call_count == 1 # Should not increase
3745+
3746+
37263747
def test_data_object_cache_get_data_object(
37273748
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
37283749
):

tests/dbt/test_transformation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ def test_dbt_custom_materialization():
122122
selected_model = list(plan.selected_models)[0]
123123
assert selected_model == "model.sushi.custom_incremental_model"
124124

125-
qoery = "SELECT * FROM sushi.custom_incremental_model ORDER BY created_at"
125+
query = "SELECT * FROM sushi.custom_incremental_model ORDER BY created_at"
126126
hook_table = "SELECT * FROM hook_table ORDER BY id"
127127
sushi_context.apply(plan)
128-
result = sushi_context.engine_adapter.fetchdf(qoery)
128+
result = sushi_context.engine_adapter.fetchdf(query)
129129
assert len(result) == 1
130130
assert {"created_at", "id"}.issubset(result.columns)
131131

@@ -140,7 +140,7 @@ def test_dbt_custom_materialization():
140140
tomorrow = datetime.now() + timedelta(days=1)
141141
sushi_context.run(select_models=["sushi.custom_incremental_model"], execution_time=tomorrow)
142142

143-
result_after_run = sushi_context.engine_adapter.fetchdf(qoery)
143+
result_after_run = sushi_context.engine_adapter.fetchdf(query)
144144
assert {"created_at", "id"}.issubset(result_after_run.columns)
145145

146146
# this should have added new unique values for the new row

0 commit comments

Comments
 (0)