Skip to content

Commit 42fbc64

Browse files
authored
Chore: Cache results of get_data_objects (#5467)
1 parent 8342c37 commit 42fbc64

File tree

14 files changed

+618
-47
lines changed

14 files changed

+618
-47
lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 118 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def __init__(
161161
self.correlation_id = correlation_id
162162
self._schema_differ_overrides = schema_differ_overrides
163163
self._query_execution_tracker = query_execution_tracker
164+
self._data_object_cache: t.Dict[str, t.Optional[DataObject]] = {}
164165

165166
def with_settings(self, **kwargs: t.Any) -> EngineAdapter:
166167
extra_kwargs = {
@@ -983,6 +984,13 @@ def _create_table(
983984
),
984985
track_rows_processed=track_rows_processed,
985986
)
987+
# Extract table name to clear cache
988+
table_name = (
989+
table_name_or_schema.this
990+
if isinstance(table_name_or_schema, exp.Schema)
991+
else table_name_or_schema
992+
)
993+
self._clear_data_object_cache(table_name)
986994

987995
def _build_create_table_exp(
988996
self,
@@ -1038,7 +1046,8 @@ def create_table_like(
10381046
target_table_name: The name of the table to create. Can be fully qualified or just table name.
10391047
source_table_name: The name of the table to base the new table on.
10401048
"""
1041-
self.create_table(target_table_name, self.columns(source_table_name), exists=exists)
1049+
self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
1050+
self._clear_data_object_cache(target_table_name)
10421051

10431052
def clone_table(
10441053
self,
@@ -1074,6 +1083,7 @@ def clone_table(
10741083
**kwargs,
10751084
)
10761085
)
1086+
self._clear_data_object_cache(target_table_name)
10771087

10781088
def drop_data_object(self, data_object: DataObject, ignore_if_not_exists: bool = True) -> None:
10791089
"""Drops a data object of arbitrary type.
@@ -1139,6 +1149,7 @@ def _drop_object(
11391149
drop_args["cascade"] = cascade
11401150

11411151
self.execute(exp.Drop(this=exp.to_table(name), kind=kind, exists=exists, **drop_args))
1152+
self._clear_data_object_cache(name)
11421153

11431154
def get_alter_operations(
11441155
self,
@@ -1329,6 +1340,8 @@ def create_view(
13291340
quote_identifiers=self.QUOTE_IDENTIFIERS_IN_VIEWS,
13301341
)
13311342

1343+
self._clear_data_object_cache(view_name)
1344+
13321345
# Register table comment with commands if the engine doesn't support doing it in CREATE
13331346
if (
13341347
table_description
@@ -1458,8 +1471,14 @@ def columns(
14581471
}
14591472

14601473
def table_exists(self, table_name: TableName) -> bool:
    """Checks whether the given table exists, consulting the data object cache first.

    When the cache holds an entry for the table, that entry alone decides the
    answer: a cached object means the table exists, a cached ``None`` means it
    does not. Only when there is no entry is a DESCRIBE statement executed.

    Args:
        table_name: The name of the table to check.

    Returns:
        True if the table exists, False otherwise.
    """
    target = exp.to_table(table_name)
    cache_key = _get_data_object_cache_key(target.catalog, target.db, target.name)

    cache = self._data_object_cache
    if cache_key in cache:
        logger.debug("Table existence cache hit: %s", cache_key)
        return cache[cache_key] is not None

    try:
        self.execute(exp.Describe(this=target, kind="TABLE"))
    except Exception:
        # Any failure of the DESCRIBE statement is interpreted as "table is absent".
        return False
    else:
        return True
@@ -2253,40 +2272,99 @@ def rename_table(
22532272
"Tried to rename table across catalogs which is not supported"
22542273
)
22552274
self._rename_table(old_table_name, new_table_name)
2275+
self._clear_data_object_cache(old_table_name)
2276+
self._clear_data_object_cache(new_table_name)
22562277

2257-
def get_data_object(
    self, target_name: TableName, safe_to_cache: bool = False
) -> t.Optional[DataObject]:
    """Looks up a single data object by name.

    Args:
        target_name: The name of the object to look up.
        safe_to_cache: Passed through to ``get_data_objects``; whether it is safe
            to cache the result of this lookup.

    Returns:
        The first data object returned for the name, or None if nothing was returned.
    """
    table = exp.to_table(target_name)
    matches = self.get_data_objects(
        schema_(table.db, table.catalog),
        {table.name},
        safe_to_cache=safe_to_cache,
    )
    return matches[0] if matches else None
22652290

22662291
def get_data_objects(
    self,
    schema_name: SchemaName,
    object_names: t.Optional[t.Set[str]] = None,
    safe_to_cache: bool = False,
) -> t.List[DataObject]:
    """Lists all data objects in the target schema.

    When ``object_names`` is given, names that already have a cache entry are
    answered from the cache and only the remaining names are fetched from the
    engine, in batches of ``DATA_OBJECT_FILTER_BATCH_SIZE``. Without
    ``object_names`` the engine is always queried for the whole schema.

    Args:
        schema_name: The name of the schema to list data objects from.
        object_names: If provided, only return data objects with these names.
        safe_to_cache: Whether it is safe to cache the results of this call.
            This only controls writes to the cache; existing cache entries are
            read regardless of this flag.

    Returns:
        A list of data objects in the target schema. With ``object_names``, cached
        objects come first, followed by the freshly fetched ones.
    """
    if object_names is None:
        fetched_objects = self._get_data_objects(schema_name)
        if safe_to_cache:
            for obj in fetched_objects:
                cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
                self._data_object_cache[cache_key] = obj
        return fetched_objects

    if not object_names:
        return []

    # Split the requested names into cache hits and names that must be fetched.
    target_schema = to_schema(schema_name)
    cached_objects: t.List[DataObject] = []
    missing_names: t.Set[str] = set()

    for name in object_names:
        cache_key = _get_data_object_cache_key(target_schema.catalog, target_schema.db, name)
        if cache_key not in self._data_object_cache:
            logger.debug("Data object cache miss: %s", cache_key)
            missing_names.add(name)
            continue

        logger.debug("Data object cache hit: %s", cache_key)
        data_object = self._data_object_cache[cache_key]
        # A cached None means the object was previously looked for but not found.
        # Compare against None explicitly so the "not found" marker is the only
        # value that is filtered out.
        if data_object is not None:
            cached_objects.append(data_object)

    if not missing_names:
        return cached_objects

    # Fetch the names that had no cache entry, one batch at a time.
    names_to_fetch = list(missing_names)
    batch_size = self.DATA_OBJECT_FILTER_BATCH_SIZE
    fetched_objects = []
    fetched_object_names = set()
    for start in range(0, len(names_to_fetch), batch_size):
        batch = set(names_to_fetch[start : start + batch_size])
        for obj in self._get_data_objects(schema_name, batch):
            if safe_to_cache:
                # NOTE(review): entries are stored under the catalog/schema reported by
                # the returned object, while the lookups above use the requested schema.
                # If an engine reports a catalog the caller omitted, these keys would
                # differ - confirm whether that can happen.
                cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
                self._data_object_cache[cache_key] = obj
            fetched_objects.append(obj)
            fetched_object_names.add(obj.name)

    if safe_to_cache:
        # Remember the names the engine did not return so later lookups can skip the query.
        for missing_name in missing_names - fetched_object_names:
            cache_key = _get_data_object_cache_key(
                target_schema.catalog, target_schema.db, missing_name
            )
            self._data_object_cache[cache_key] = None

    return cached_objects + fetched_objects
22902368

22912369
def fetchone(
22922370
self,
@@ -2693,6 +2771,17 @@ def _to_sql(self, expression: exp.Expression, quote: bool = True, **kwargs: t.An
26932771

26942772
return expression.sql(**sql_gen_kwargs, copy=False) # type: ignore
26952773

2774+
def _clear_data_object_cache(self, table_name: t.Optional[TableName] = None) -> None:
    """Invalidates cached data object lookups.

    Args:
        table_name: The table whose cache entry should be removed. When None,
            every entry in the cache is removed.
    """
    if table_name is not None:
        target = exp.to_table(table_name)
        key = _get_data_object_cache_key(target.catalog, target.db, target.name)
        logger.debug("Clearing data object cache key: %s", key)
        # pop with a default so that clearing a key that was never cached is a no-op.
        self._data_object_cache.pop(key, None)
        return

    logger.debug("Clearing entire data object cache")
    self._data_object_cache.clear()
2784+
26962785
def _get_data_objects(
26972786
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
26982787
) -> t.List[DataObject]:
@@ -2878,6 +2967,15 @@ def _create_column_comments(
28782967
exc_info=True,
28792968
)
28802969

2970+
def _create_table_like(
2971+
self,
2972+
target_table_name: TableName,
2973+
source_table_name: TableName,
2974+
exists: bool,
2975+
**kwargs: t.Any,
2976+
) -> None:
2977+
self.create_table(target_table_name, self.columns(source_table_name), exists=exists)
2978+
28812979
def _rename_table(
28822980
self,
28832981
old_table_name: TableName,
@@ -2940,3 +3038,9 @@ def _decoded_str(value: t.Union[str, bytes]) -> str:
29403038
if isinstance(value, bytes):
29413039
return value.decode("utf-8")
29423040
return value
3041+
3042+
3043+
def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str:
3044+
"""Returns a cache key for a data object based on its fully qualified name."""
3045+
catalog = f"{catalog}." if catalog else ""
3046+
return f"{catalog}{schema_name}.{object_name}"

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from __future__ import annotations
22

33
import typing as t
4+
import logging
45

56
from sqlglot import exp
67

78
from sqlmesh.core.dialect import to_schema
8-
from sqlmesh.core.engine_adapter import EngineAdapter
9+
from sqlmesh.core.engine_adapter.base import EngineAdapter, _get_data_object_cache_key
910
from sqlmesh.core.engine_adapter.shared import (
1011
CatalogSupport,
1112
CommentCreationTable,
@@ -20,6 +21,9 @@
2021
from sqlmesh.core.engine_adapter._typing import QueryOrDF
2122

2223

24+
logger = logging.getLogger(__name__)
25+
26+
2327
class BasePostgresEngineAdapter(EngineAdapter):
2428
DEFAULT_BATCH_SIZE = 400
2529
COMMENT_CREATION_TABLE = CommentCreationTable.COMMENT_COMMAND_ONLY
@@ -75,6 +79,10 @@ def table_exists(self, table_name: TableName) -> bool:
7579
Reference: https://github.com/aws/amazon-redshift-python-driver/blob/master/redshift_connector/cursor.py#L528-L553
7680
"""
7781
table = exp.to_table(table_name)
82+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
83+
if data_object_cache_key in self._data_object_cache:
84+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
85+
return self._data_object_cache[data_object_cache_key] is not None
7886

7987
sql = (
8088
exp.select("1")

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlglot.transforms import remove_precision_parameterized_types
99

1010
from sqlmesh.core.dialect import to_schema
11+
from sqlmesh.core.engine_adapter.base import _get_data_object_cache_key
1112
from sqlmesh.core.engine_adapter.mixins import (
1213
ClusteredByMixin,
1314
RowDiffMixin,
@@ -744,6 +745,12 @@ def insert_overwrite_by_partition(
744745
)
745746

746747
def table_exists(self, table_name: TableName) -> bool:
748+
table = exp.to_table(table_name)
749+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
750+
if data_object_cache_key in self._data_object_cache:
751+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
752+
return self._data_object_cache[data_object_cache_key] is not None
753+
747754
try:
748755
from google.cloud.exceptions import NotFound
749756
except ModuleNotFoundError:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def _insert_overwrite_by_condition(
224224
target_columns_to_types = target_columns_to_types or self.columns(target_table)
225225

226226
temp_table = self._get_temp_table(target_table)
227-
self._create_table_like(temp_table, target_table)
227+
self.create_table_like(temp_table, target_table)
228228

229229
# REPLACE BY KEY: extract kwargs if present
230230
dynamic_key = kwargs.get("dynamic_key")
@@ -456,7 +456,11 @@ def insert_overwrite_by_partition(
456456
)
457457

458458
def _create_table_like(
459-
self, target_table_name: TableName, source_table_name: TableName
459+
self,
460+
target_table_name: TableName,
461+
source_table_name: TableName,
462+
exists: bool,
463+
**kwargs: t.Any,
460464
) -> None:
461465
"""Create table with identical structure as source table"""
462466
self.execute(
@@ -632,16 +636,15 @@ def _drop_object(
632636
kind: What kind of object to drop. Defaults to TABLE
633637
**drop_args: Any extra arguments to set on the Drop expression
634638
"""
635-
self.execute(
636-
exp.Drop(
637-
this=exp.to_table(name),
638-
kind=kind,
639-
exists=exists,
640-
cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
641-
if self.engine_run_mode.is_cluster
642-
else None,
643-
**drop_args,
644-
)
639+
super()._drop_object(
640+
name=name,
641+
exists=exists,
642+
kind=kind,
643+
cascade=cascade,
644+
cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
645+
if self.engine_run_mode.is_cluster
646+
else None,
647+
**drop_args,
645648
)
646649

647650
def _build_partitioned_by_exp(

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import typing as t
6+
import logging
67

78
from sqlglot import exp
89

@@ -13,6 +14,7 @@
1314
InsertOverwriteStrategy,
1415
MERGE_SOURCE_ALIAS,
1516
MERGE_TARGET_ALIAS,
17+
_get_data_object_cache_key,
1618
)
1719
from sqlmesh.core.engine_adapter.mixins import (
1820
GetCurrentCatalogFromFunctionMixin,
@@ -36,6 +38,9 @@
3638
from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF
3739

3840

41+
logger = logging.getLogger(__name__)
42+
43+
3944
@set_catalog()
4045
class MSSQLEngineAdapter(
4146
EngineAdapterWithIndexSupport,
@@ -144,6 +149,10 @@ def build_var_length_col(
144149
def table_exists(self, table_name: TableName) -> bool:
145150
"""MsSql doesn't support describe so we query information_schema."""
146151
table = exp.to_table(table_name)
152+
data_object_cache_key = _get_data_object_cache_key(table.catalog, table.db, table.name)
153+
if data_object_cache_key in self._data_object_cache:
154+
logger.debug("Table existence cache hit: %s", data_object_cache_key)
155+
return self._data_object_cache[data_object_cache_key] is not None
147156

148157
sql = (
149158
exp.select("1")

sqlmesh/core/engine_adapter/mysql.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ def _create_column_comments(
164164
exc_info=True,
165165
)
166166

167-
def create_table_like(
167+
def _create_table_like(
168168
self,
169169
target_table_name: TableName,
170170
source_table_name: TableName,
171-
exists: bool = True,
171+
exists: bool,
172172
**kwargs: t.Any,
173173
) -> None:
174174
self.execute(

0 commit comments

Comments
 (0)