Skip to content

Commit 3fb41f2

Browse files
committed
fix: cache experiment
WIP. Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
1 parent b9b3e44 commit 3fb41f2

5 files changed

Lines changed: 295 additions & 19 deletions

File tree

pkg/controller/runtime/internal/cache/cache.go

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,22 @@ func (cache *ResourceCache) CachePut(r resource.Resource) {
137137

138138
// CacheRemove handles deleted objects.
139139
func (cache *ResourceCache) CacheRemove(r resource.Resource) {
140-
cache.getHandler(r.Metadata().Namespace(), r.Metadata().Type()).remove(r)
140+
cache.CacheRemoveByPointer(r.Metadata())
141+
}
142+
143+
func (cache *ResourceCache) CacheRemoveByPointer(ptr *resource.Metadata) {
144+
// cache.getHandler(ptr.Namespace(), ptr.Type()).remove(ptr)
145+
tombstone := newCacheTombstone(ptr)
146+
cache.getHandler(ptr.Namespace(), ptr.Type()).put(tombstone)
147+
}
148+
149+
// ClearTombstones removes all tombstones from the cache.
150+
//
151+
// TODO: call this periodically in a goroutine, e.g., in the controller runtime.
152+
//
153+
// TODO: only remove tombstones older than X.
154+
func (cache *ResourceCache) ClearTombstones(namespace resource.Namespace, resourceType resource.Type) {
155+
cache.getHandler(namespace, resourceType).clearTombstones()
141156
}
142157

143158
// WrapState returns a cached wrapped state, which serves some operations from the cache bypassing the underlying state.
@@ -147,3 +162,57 @@ func (cache *ResourceCache) WrapState(st state.CoreState) state.CoreState {
147162
st: st,
148163
}
149164
}
165+
166+
var _ resource.Resource = (*cacheTombstone)(nil)
167+
168+
// cacheTombstone is a resource without a Spec.
169+
//
170+
// Tombstones are used to present state of a deleted resource.
171+
type cacheTombstone struct {
172+
ref resource.Metadata
173+
}
174+
175+
// newCacheTombstone builds a tombstone from resource reference.
176+
func newCacheTombstone(ref resource.Reference) *cacheTombstone {
177+
return &cacheTombstone{
178+
ref: resource.NewMetadata(ref.Namespace(), ref.Type(), ref.ID(), ref.Version()),
179+
}
180+
}
181+
182+
// String method for debugging/logging.
183+
func (t *cacheTombstone) String() string {
184+
return fmt.Sprintf("cacheTombstone(%s)", t.ref.String())
185+
}
186+
187+
// Metadata for the resource.
188+
//
189+
// Metadata.Version should change each time Spec changes.
190+
func (t *cacheTombstone) Metadata() *resource.Metadata {
191+
return &t.ref
192+
}
193+
194+
// Spec is not implemented for tobmstones.
195+
func (t *cacheTombstone) Spec() any {
196+
panic("tombstone doesn't contain spec")
197+
}
198+
199+
// DeepCopy returns self, as tombstone is immutable.
200+
func (t *cacheTombstone) DeepCopy() resource.Resource { //nolint:ireturn
201+
return t
202+
}
203+
204+
// cacheTombstone implements Tombstoned interface.
205+
func (t *cacheTombstone) cacheTombstone() {
206+
}
207+
208+
// Tombstoned is a marker interface for Tombstones.
209+
type cacheTombstoned interface {
210+
cacheTombstone()
211+
}
212+
213+
// IsTombstone checks if resource is represented by the cacheTombstone.
214+
func isCacheTombstone(res resource.Resource) bool {
215+
_, ok := res.(cacheTombstoned)
216+
217+
return ok
218+
}

pkg/controller/runtime/internal/cache/cache_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ func TestCacheOperations(t *testing.T) {
171171
})
172172

173173
assert.Equal(t, "1000", metrics.CachedResources.Get("A").String())
174+
assert.Equal(t, "10000", metrics.CachedResources.Get("B").String()) // drops do not cause the cache to be cleared due to the tombstone
175+
176+
c.ClearTombstones("b", "B")
174177
assert.Equal(t, "5000", metrics.CachedResources.Get("B").String())
175178
}
176179

pkg/controller/runtime/internal/cache/handler.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ func (h *cacheHandler) get(ctx context.Context, id resource.ID, opts ...state.Ge
7979
return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined))
8080
}
8181

82+
res := h.resources[idx]
83+
if isCacheTombstone(res) {
84+
return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined))
85+
}
86+
8287
// return a copy of the resource to satisfy State semantics
83-
return h.resources[idx].DeepCopy(), nil
88+
return res.DeepCopy(), nil
8489
}
8590

8691
func (h *cacheHandler) contextWithTeardown(ctx context.Context, id resource.ID) (context.Context, error) {
@@ -160,6 +165,10 @@ func (h *cacheHandler) list(ctx context.Context, opts ...state.ListOption) (reso
160165
resources := slices.Clone(h.resources)
161166
h.mu.Unlock()
162167

168+
resources = xslices.Filter(resources, func(r resource.Resource) bool {
169+
return !isCacheTombstone(r)
170+
})
171+
163172
// micro optimization: apply filter only if some filters are specified
164173
if !value.IsZero(options.IDQuery) || options.LabelQueries != nil {
165174
resources = xslices.Filter(resources, func(r resource.Resource) bool {
@@ -190,6 +199,15 @@ func (h *cacheHandler) put(r resource.Resource) {
190199
})
191200

192201
if found {
202+
existing := h.resources[idx]
203+
existingVersion := existing.Metadata().Version()
204+
newVersion := r.Metadata().Version()
205+
206+
stale := !isCacheTombstone(existing) && newVersion.Value() < existingVersion.Value()
207+
if stale {
208+
return
209+
}
210+
193211
h.resources[idx] = r
194212
} else {
195213
h.resources = slices.Insert(h.resources, idx, r)
@@ -205,24 +223,16 @@ func (h *cacheHandler) put(r resource.Resource) {
205223
}
206224
}
207225

208-
func (h *cacheHandler) remove(r resource.Resource) {
226+
func (h *cacheHandler) clearTombstones() {
209227
h.mu.Lock()
210228
defer h.mu.Unlock()
211229

212-
idx, found := slices.BinarySearchFunc(h.resources, r.Metadata().ID(), func(r resource.Resource, id resource.ID) int {
213-
return cmp.Compare(r.Metadata().ID(), id)
214-
})
215-
216-
if found {
217-
h.resources = slices.Delete(h.resources, idx, idx+1)
218-
219-
metrics.CachedResources.Add(r.Metadata().Type(), -1)
220-
}
230+
before := len(h.resources)
231+
h.resources = slices.DeleteFunc(h.resources, isCacheTombstone)
232+
after := len(h.resources)
233+
delta := after - before
221234

222-
if ch, ok := h.teardownWaiters[r.Metadata().ID()]; ok {
223-
close(ch)
224-
delete(h.teardownWaiters, r.Metadata().ID())
225-
}
235+
metrics.CachedResources.Add(h.key.Type, int64(delta))
226236
}
227237

228238
func (h *cacheHandler) len() int {

pkg/controller/runtime/internal/cache/state.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package cache
66

77
import (
88
"context"
9+
"sync"
910

1011
"github.com/cosi-project/runtime/pkg/resource"
1112
"github.com/cosi-project/runtime/pkg/state"
@@ -20,6 +21,8 @@ import (
2021
type stateWrapper struct {
2122
cache *ResourceCache
2223
st state.CoreState
24+
25+
lock sync.Mutex
2326
}
2427

2528
// Check interfaces.
@@ -49,7 +52,19 @@ func (wrapper *stateWrapper) List(ctx context.Context, r resource.Kind, opts ...
4952
//
5053
// If a resource already exists, Create returns an error.
5154
func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, opts ...state.CreateOption) error {
52-
return wrapper.st.Create(ctx, r, opts...)
55+
wrapper.lock.Lock()
56+
defer wrapper.lock.Unlock()
57+
58+
err := wrapper.st.Create(ctx, r, opts...)
59+
if err != nil {
60+
return err
61+
}
62+
63+
if wrapper.cache.IsHandled(r.Metadata().Namespace(), r.Metadata().Type()) {
64+
wrapper.cache.CachePut(r)
65+
}
66+
67+
return nil
5368
}
5469

5570
// Update a resource.
@@ -58,15 +73,50 @@ func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, op
5873
// On update current version of resource `new` in the state should match
5974
// the version on the backend, otherwise conflict error is returned.
6075
func (wrapper *stateWrapper) Update(ctx context.Context, newResource resource.Resource, opts ...state.UpdateOption) error {
61-
return wrapper.st.Update(ctx, newResource, opts...)
76+
wrapper.lock.Lock()
77+
defer wrapper.lock.Unlock()
78+
79+
if err := wrapper.st.Update(ctx, newResource, opts...); err != nil {
80+
return err
81+
}
82+
83+
if wrapper.cache.IsHandled(newResource.Metadata().Namespace(), newResource.Metadata().Type()) {
84+
wrapper.cache.CachePut(newResource)
85+
}
86+
87+
return nil
6288
}
6389

6490
// Destroy a resource.
6591
//
6692
// If a resource doesn't exist, error is returned.
6793
// If a resource has pending finalizers, error is returned.
6894
func (wrapper *stateWrapper) Destroy(ctx context.Context, ptr resource.Pointer, opts ...state.DestroyOption) error {
69-
return wrapper.st.Destroy(ctx, ptr, opts...)
95+
wrapper.lock.Lock()
96+
defer wrapper.lock.Unlock()
97+
98+
var cached resource.Resource
99+
100+
if wrapper.cache.IsHandled(ptr.Namespace(), ptr.Type()) {
101+
var err error
102+
if cached, err = wrapper.cache.Get(ctx, ptr); err != nil {
103+
return err
104+
}
105+
}
106+
107+
if err := wrapper.st.Destroy(ctx, ptr, opts...); err != nil {
108+
if cached != nil && state.IsNotFoundError(err) {
109+
wrapper.cache.CacheRemoveByPointer(cached.Metadata())
110+
}
111+
112+
return err
113+
}
114+
115+
if cached != nil {
116+
wrapper.cache.CacheRemoveByPointer(cached.Metadata())
117+
}
118+
119+
return nil
70120
}
71121

72122
// Watch state of a resource by type.

0 commit comments

Comments
 (0)