
Commit d0adc72

only cache data objects maintained by sqlmesh

1 parent c7f5771 commit d0adc72

File tree

7 files changed: +190 -68 lines changed

sqlmesh/core/engine_adapter/base.py

Lines changed: 49 additions & 24 deletions

@@ -1046,7 +1046,8 @@ def create_table_like(
             target_table_name: The name of the table to create. Can be fully qualified or just table name.
             source_table_name: The name of the table to base the new table on.
         """
-        self.create_table(target_table_name, self.columns(source_table_name), exists=exists)
+        self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
+        self._clear_data_object_cache(target_table_name)
 
     def clone_table(
         self,
@@ -2271,24 +2272,34 @@ def rename_table(
                 "Tried to rename table across catalogs which is not supported"
             )
         self._rename_table(old_table_name, new_table_name)
+        self._clear_data_object_cache(old_table_name)
+        self._clear_data_object_cache(new_table_name)
 
-    def get_data_object(self, target_name: TableName) -> t.Optional[DataObject]:
+    def get_data_object(
+        self, target_name: TableName, safe_to_cache: bool = False
+    ) -> t.Optional[DataObject]:
         target_table = exp.to_table(target_name)
         existing_data_objects = self.get_data_objects(
-            schema_(target_table.db, target_table.catalog), {target_table.name}
+            schema_(target_table.db, target_table.catalog),
+            {target_table.name},
+            safe_to_cache=safe_to_cache,
         )
         if existing_data_objects:
             return existing_data_objects[0]
         return None
 
     def get_data_objects(
-        self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
+        self,
+        schema_name: SchemaName,
+        object_names: t.Optional[t.Set[str]] = None,
+        safe_to_cache: bool = False,
     ) -> t.List[DataObject]:
         """Lists all data objects in the target schema.
 
         Args:
             schema_name: The name of the schema to list data objects from.
             object_names: If provided, only return data objects with these names.
+            safe_to_cache: Whether it is safe to cache the results of this call.
 
         Returns:
             A list of data objects in the target schema.
@@ -2323,31 +2334,36 @@ def get_data_objects(
                     object_names_list[i : i + self.DATA_OBJECT_FILTER_BATCH_SIZE]
                     for i in range(0, len(object_names_list), self.DATA_OBJECT_FILTER_BATCH_SIZE)
                 ]
-                fetched_objects = [
-                    obj
-                    for batch in batches
-                    for obj in self._get_data_objects(schema_name, set(batch))
-                ]
-
-                for obj in fetched_objects:
-                    cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
-                    self._data_object_cache[cache_key] = obj
-
-                fetched_object_names = {obj.name for obj in fetched_objects}
-                for missing_name in missing_names - fetched_object_names:
-                    cache_key = _get_data_object_cache_key(
-                        target_schema.catalog, target_schema.db, missing_name
-                    )
-                    self._data_object_cache[cache_key] = None
+                fetched_objects = []
+                fetched_object_names = set()
+                for batch in batches:
+                    objects = self._get_data_objects(schema_name, set(batch))
+                    for obj in objects:
+                        if safe_to_cache:
+                            cache_key = _get_data_object_cache_key(
+                                obj.catalog, obj.schema_name, obj.name
+                            )
+                            self._data_object_cache[cache_key] = obj
+                        fetched_objects.append(obj)
+                        fetched_object_names.add(obj.name)
+
+                if safe_to_cache:
+                    for missing_name in missing_names - fetched_object_names:
+                        cache_key = _get_data_object_cache_key(
+                            target_schema.catalog, target_schema.db, missing_name
+                        )
+                        self._data_object_cache[cache_key] = None
 
                 return cached_objects + fetched_objects
 
             return cached_objects
 
         fetched_objects = self._get_data_objects(schema_name)
-        for obj in fetched_objects:
-            cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
-            self._data_object_cache[cache_key] = obj
+        if safe_to_cache:
+            for obj in fetched_objects:
+                cache_key = _get_data_object_cache_key(obj.catalog, obj.schema_name, obj.name)
+                self._data_object_cache[cache_key] = obj
         return fetched_objects
 
@@ -2951,6 +2967,15 @@ def _create_column_comments(
             exc_info=True,
         )
 
+    def _create_table_like(
+        self,
+        target_table_name: TableName,
+        source_table_name: TableName,
+        exists: bool,
+        **kwargs: t.Any,
+    ) -> None:
+        self.create_table(target_table_name, self.columns(source_table_name), exists=exists)
+
     def _rename_table(
         self,
         old_table_name: TableName,
@@ -3017,5 +3042,5 @@ def _decoded_str(value: t.Union[str, bytes]) -> str:
 
 def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str:
     """Returns a cache key for a data object based on its fully qualified name."""
-    catalog = catalog or ""
-    return f"{catalog}.{schema_name}.{object_name}"
+    catalog = f"{catalog}." if catalog else ""
+    return f"{catalog}{schema_name}.{object_name}"
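A note on the _get_data_object_cache_key fix above: the old version prepended a dot even when there was no catalog, so catalog-less objects got keys like ".db.t". A runnable copy of the fixed helper, with the before/after behavior as assertions:

import typing as t

def _get_data_object_cache_key(catalog: t.Optional[str], schema_name: str, object_name: str) -> str:
    # Copy of the fixed helper from the diff above, for illustration only.
    catalog = f"{catalog}." if catalog else ""
    return f"{catalog}{schema_name}.{object_name}"

assert _get_data_object_cache_key(None, "db", "t") == "db.t"       # was ".db.t" before the fix
assert _get_data_object_cache_key("cat", "db", "t") == "cat.db.t"  # unchanged when a catalog is set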

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 15 additions & 12 deletions

@@ -224,7 +224,7 @@ def _insert_overwrite_by_condition(
         target_columns_to_types = target_columns_to_types or self.columns(target_table)
 
         temp_table = self._get_temp_table(target_table)
-        self._create_table_like(temp_table, target_table)
+        self.create_table_like(temp_table, target_table)
 
         # REPLACE BY KEY: extract kwargs if present
         dynamic_key = kwargs.get("dynamic_key")
@@ -456,7 +456,11 @@ def insert_overwrite_by_partition(
         )
 
     def _create_table_like(
-        self, target_table_name: TableName, source_table_name: TableName
+        self,
+        target_table_name: TableName,
+        source_table_name: TableName,
+        exists: bool,
+        **kwargs: t.Any,
     ) -> None:
         """Create table with identical structure as source table"""
         self.execute(
@@ -632,16 +636,15 @@ def _drop_object(
             kind: What kind of object to drop. Defaults to TABLE
             **drop_args: Any extra arguments to set on the Drop expression
         """
-        self.execute(
-            exp.Drop(
-                this=exp.to_table(name),
-                kind=kind,
-                exists=exists,
-                cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
-                if self.engine_run_mode.is_cluster
-                else None,
-                **drop_args,
-            )
+        super()._drop_object(
+            name=name,
+            exists=exists,
+            kind=kind,
+            cascade=cascade,
+            cluster=exp.OnCluster(this=exp.to_identifier(self.cluster))
+            if self.engine_run_mode.is_cluster
+            else None,
+            **drop_args,
         )
 
     def _build_partitioned_by_exp(
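For context on the _drop_object rewrite above: the ON CLUSTER clause is still built the same way, but it is now handed to the base class's _drop_object as an extra Drop-expression argument instead of assembling the statement inline. A standalone sketch of the sqlglot expression involved (the cluster name is made up):

from sqlglot import exp

drop = exp.Drop(
    this=exp.to_table("db.tbl"),
    kind="TABLE",
    exists=True,
    cluster=exp.OnCluster(this=exp.to_identifier("my_cluster")),  # hypothetical cluster name
)
# Renders roughly as: DROP TABLE IF EXISTS db.tbl ON CLUSTER my_cluster
print(drop.sql(dialect="clickhouse"))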

sqlmesh/core/engine_adapter/mysql.py

Lines changed: 2 additions & 2 deletions

@@ -164,11 +164,11 @@ def _create_column_comments(
             exc_info=True,
         )
 
-    def create_table_like(
+    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
-        exists: bool = True,
+        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        self.execute(
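The body of this override is truncated in the diff view; MySQL can clone a table's structure natively, so the statement it builds is presumably of this shape (a sketch, not the adapter's exact SQL):

from sqlglot import parse_one

# Sketch only: the MySQL-native way to clone a table's structure.
stmt = parse_one("CREATE TABLE IF NOT EXISTS target_tbl LIKE source_tbl", dialect="mysql")
print(stmt.sql(dialect="mysql"))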

sqlmesh/core/engine_adapter/postgres.py

Lines changed: 2 additions & 2 deletions

@@ -79,11 +79,11 @@ def _fetch_native_df(
         self._connection_pool.commit()
         return df
 
-    def create_table_like(
+    def _create_table_like(
        self,
        target_table_name: TableName,
        source_table_name: TableName,
-        exists: bool = True,
+        exists: bool,
        **kwargs: t.Any,
    ) -> None:
        self.execute(
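Both this and the MySQL change also drop the `= True` default on `exists`, since the default now lives only on the public wrapper in base.py. A minimal sketch of that template-method split (class name and bodies simplified, not the actual sqlmesh classes):

import typing as t

class BaseAdapter:
    def create_table_like(
        self, target_table_name: str, source_table_name: str, exists: bool = True, **kwargs: t.Any
    ) -> None:
        # Public entry point: owns the default for `exists` and the cache bookkeeping.
        self._create_table_like(target_table_name, source_table_name, exists=exists, **kwargs)
        self._clear_data_object_cache(target_table_name)

    def _create_table_like(
        self, target_table_name: str, source_table_name: str, exists: bool, **kwargs: t.Any
    ) -> None:
        raise NotImplementedError  # dialect hook: no default, always invoked via the wrapper

    def _clear_data_object_cache(self, table_name: str) -> None:
        ...  # simplified stand-in for the real cache eviction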

sqlmesh/core/engine_adapter/spark.py

Lines changed: 4 additions & 2 deletions

@@ -402,14 +402,16 @@ def get_current_database(self) -> str:
             return self.spark.catalog.currentDatabase()
         return self.fetchone(exp.select(exp.func("current_database")))[0]  # type: ignore
 
-    def get_data_object(self, target_name: TableName) -> t.Optional[DataObject]:
+    def get_data_object(
+        self, target_name: TableName, safe_to_cache: bool = False
+    ) -> t.Optional[DataObject]:
         target_table = exp.to_table(target_name)
         if isinstance(target_table.this, exp.Dot) and target_table.this.expression.name.startswith(
             f"{self.BRANCH_PREFIX}{self.WAP_PREFIX}"
         ):
             # Exclude the branch name
             target_table.set("this", target_table.this.this)
-        return super().get_data_object(target_table)
+        return super().get_data_object(target_table, safe_to_cache=safe_to_cache)
 
     def create_state_table(
         self,
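A standalone illustration of the branch stripping above, assuming sqlglot parses the four-part Iceberg WAP name so that the trailing branch part lands in an exp.Dot (which is what the isinstance check targets); the table name is hypothetical:

from sqlglot import exp

# Hypothetical WAP name; "branch_wap_123" mirrors the f"{BRANCH_PREFIX}{WAP_PREFIX}" pattern.
t = exp.to_table("cat.db.orders.branch_wap_123")
if isinstance(t.this, exp.Dot):   # four-part name: the table part parses as a Dot
    t.set("this", t.this.this)    # keep "orders", drop the WAP branch suffix
print(t.sql())  # cat.db.orders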

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 1 deletion

@@ -1564,7 +1564,9 @@ def _get_data_objects_in_schema(
             gateway: t.Optional[str] = None,
         ) -> t.List[DataObject]:
             logger.info("Listing data objects in schema %s", schema.sql())
-            return self.get_adapter(gateway).get_data_objects(schema, object_names)
+            return self.get_adapter(gateway).get_data_objects(
+                schema, object_names, safe_to_cache=True
+            )
 
         with self.concurrent_context():
             existing_objects: t.List[DataObject] = []
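This evaluator call is the one site in the commit that opts into caching: it lists schemas that sqlmesh itself maintains, so cached results cannot go stale behind sqlmesh's back. The resulting contract, sketched as two helper functions over an injected adapter (names illustrative):

def list_sqlmesh_objects(adapter, schema, object_names):
    # Mirrors the evaluator call above: a sqlmesh-managed schema, safe to memoize.
    return adapter.get_data_objects(schema, object_names, safe_to_cache=True)

def probe_external_table(adapter, table_name):
    # Ad-hoc lookup: safe_to_cache defaults to False, so nothing is written to the cache.
    return adapter.get_data_object(table_name)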
