-
Notifications
You must be signed in to change notification settings - Fork 377
Expand file tree
/
Copy pathcache.py
More file actions
145 lines (115 loc) · 5.13 KB
/
cache.py
File metadata and controls
145 lines (115 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations
import typing as t
from sqlmesh.core.model import SeedModel
from sqlmesh.core.snapshot import (
Snapshot,
SnapshotId,
SnapshotIdLike,
SnapshotInfoLike,
)
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
from sqlmesh.utils.date import TimeLike, now_timestamp
class CachingStateSync(DelegatingStateSync):
    """In memory cache for snapshots that implements the state sync api.

    Reads are answered from ``snapshot_cache`` when a fresh entry is available and
    are otherwise forwarded to ``self.state_sync``. Every mutating call evicts the
    affected entries (or the whole cache) before it is forwarded.

    Args:
        state_sync: The base state sync.
        ttl: The number of seconds a snapshot should be cached.
    """

    def __init__(self, state_sync: StateSync, ttl: int = 120):
        super().__init__(state_sync)
        # Maps a snapshot id to a ``(value, expires_at)`` pair where ``value`` is:
        #   * a Snapshot - the snapshot was fetched from the state sync;
        #   * False      - the snapshot was requested but the state sync did not return it.
        # An id that is absent from the mapping (or whose entry has expired) has not been
        # requested recently; ``_from_cache`` reports that case as None.
        self.snapshot_cache: t.Dict[
            SnapshotId, t.Tuple[t.Optional[Snapshot | t.Literal[False]], int]
        ] = {}
        self.ttl = ttl

    def _from_cache(
        self, snapshot_id: SnapshotId, now: int
    ) -> t.Optional[Snapshot | t.Literal[False]]:
        """Look up a snapshot id in the cache.

        Args:
            snapshot_id: The id to look up.
            now: The current timestamp, in the same unit as the stored expirations.

        Returns:
            The cached snapshot, False for a cached "does not exist" marker, or None
            when there is no entry or the entry expired before ``now``.
        """
        entry = self.snapshot_cache.get(snapshot_id)
        if entry is not None and entry[1] >= now:
            return entry[0]
        return None

    def _evict_by_name(self, names: t.Set[str]) -> None:
        """Drop every cache entry whose snapshot id carries one of the given names."""
        self.snapshot_cache = {
            snapshot_id: value
            for snapshot_id, value in self.snapshot_cache.items()
            if snapshot_id.name not in names
        }

    def get_snapshots(
        self, snapshot_ids: t.Iterable[SnapshotIdLike]
    ) -> t.Dict[SnapshotId, Snapshot]:
        """Return the requested snapshots, fetching only those that are not cached.

        Ids with a fresh "does not exist" marker are skipped without consulting the
        underlying state sync.

        Args:
            snapshot_ids: The snapshots (or ids) to fetch.

        Returns:
            A mapping of snapshot id to snapshot for every requested snapshot that was
            found in the cache or returned by the underlying state sync.
        """
        existing: t.Dict[SnapshotId, Snapshot] = {}
        missing: t.Set[SnapshotId] = set()
        now = now_timestamp()
        # ttl is documented in seconds; the * 1000 suggests now_timestamp() is in
        # milliseconds - TODO confirm against sqlmesh.utils.date.
        expire_at = now + self.ttl * 1000

        for s in snapshot_ids:
            snapshot_id = s.snapshot_id
            snapshot = self._from_cache(snapshot_id, now)
            if snapshot is None:
                missing.add(snapshot_id)
            elif snapshot:
                existing[snapshot_id] = snapshot
            # A cached False means "known not to exist": neither returned nor refetched.

        if missing:
            fetched = self.state_sync.get_snapshots(missing)
            existing.update(fetched)
            # Record negative entries only after the fetch succeeded. Writing them up
            # front would leave false "does not exist" markers behind for a whole TTL
            # if the underlying state sync raised.
            for snapshot_id in missing:
                if snapshot_id not in fetched:
                    self.snapshot_cache[snapshot_id] = (False, expire_at)

        for snapshot_id, snapshot in existing.items():
            cached = self._from_cache(snapshot_id, now)
            # Keep the cached entry unless it is a seed model that is not hydrated,
            # in which case it is overwritten with the snapshot being returned.
            if cached and (not isinstance(cached.node, SeedModel) or cached.node.is_hydrated):
                continue
            self.snapshot_cache[snapshot_id] = (snapshot, expire_at)

        return existing

    def snapshots_exist(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> t.Set[SnapshotId]:
        """Return the subset of the given ids that exist.

        Ids with a cached snapshot count as existing, ids with a cached False marker
        count as absent, and only ids without a fresh cache entry are checked against
        the underlying state sync. The result of that check is not cached.

        Args:
            snapshot_ids: The snapshots (or ids) to check.

        Returns:
            The ids that exist.
        """
        existing: t.Set[SnapshotId] = set()
        missing: t.Set[SnapshotId] = set()
        now = now_timestamp()

        for s in snapshot_ids:
            snapshot_id = s.snapshot_id
            snapshot = self._from_cache(snapshot_id, now)
            if snapshot:
                existing.add(snapshot_id)
            elif snapshot is None:
                missing.add(snapshot_id)

        if missing:
            existing.update(self.state_sync.snapshots_exist(missing))

        return existing

    def push_snapshots(self, snapshots: t.Iterable[Snapshot]) -> None:
        """Evict the given snapshots from the cache, then push them to the state sync."""
        # Materialize once: the iterable is consumed both here and by the delegate.
        snapshots = tuple(snapshots)
        for snapshot in snapshots:
            self.snapshot_cache.pop(snapshot.snapshot_id, None)
        self.state_sync.push_snapshots(snapshots)

    def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
        """Evict the given snapshots from the cache, then delete them from the state sync."""
        # Materialize once: the iterable is consumed both here and by the delegate.
        snapshot_ids = tuple(snapshot_ids)
        for s in snapshot_ids:
            self.snapshot_cache.pop(s.snapshot_id, None)
        self.state_sync.delete_snapshots(snapshot_ids)

    def delete_expired_snapshots(
        self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
    ) -> None:
        """Clear the whole cache, then delete expired snapshots from the state sync.

        Args:
            ignore_ttl: Forwarded unchanged to the underlying state sync.
            current_ts: Timestamp forwarded to the underlying state sync; defaults to
                ``now_timestamp()`` when omitted.
        """
        # Compare against None so that an explicit timestamp of 0 is not replaced.
        if current_ts is None:
            current_ts = now_timestamp()
        self.snapshot_cache.clear()
        self.state_sync.delete_expired_snapshots(current_ts=current_ts, ignore_ttl=ignore_ttl)

    def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
        """Evict the snapshots whose intervals are changing, then forward the call.

        An entry that carries a snapshot id evicts exactly that snapshot; an entry
        without one evicts every cached snapshot that shares its name.
        """
        names_to_evict: t.Set[str] = set()
        for snapshot_intervals in snapshots_intervals:
            if snapshot_intervals.snapshot_id:
                self.snapshot_cache.pop(snapshot_intervals.snapshot_id, None)
            else:
                names_to_evict.add(snapshot_intervals.name)

        if names_to_evict:
            # Rebuild the cache once for all names instead of once per entry.
            self._evict_by_name(names_to_evict)

        self.state_sync.add_snapshots_intervals(snapshots_intervals)

    def remove_intervals(
        self,
        snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],
        remove_shared_versions: bool = False,
    ) -> None:
        """Evict the snapshots whose intervals are being removed, then forward the call.

        Args:
            snapshot_intervals: Pairs of snapshot and the interval to remove from it.
            remove_shared_versions: Forwarded unchanged to the underlying state sync.
                When set, eviction is widened from the exact ids to every cached
                snapshot with the same name.
        """
        if remove_shared_versions:
            # NOTE(review): presumably the underlying state sync also removes intervals
            # from other snapshots sharing a version with the targets; those would stay
            # stale in the cache if only the exact ids were evicted. Evicting by name is
            # a conservative superset - confirm against the base StateSync contract.
            self._evict_by_name({s.snapshot_id.name for s, _ in snapshot_intervals})
        else:
            for s, _ in snapshot_intervals:
                self.snapshot_cache.pop(s.snapshot_id, None)
        self.state_sync.remove_intervals(snapshot_intervals, remove_shared_versions)

    def unpause_snapshots(
        self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike
    ) -> None:
        """Clear the whole cache, then unpause the snapshots in the state sync."""
        self.snapshot_cache.clear()
        self.state_sync.unpause_snapshots(snapshots, unpaused_dt)

    def clear_cache(self) -> None:
        """Drop every cached entry."""
        self.snapshot_cache.clear()