Skip to content

Commit 5ef76b7

Browse files
committed
test: Update fake.cache
- Fixed `fake.Cache`: Corrected the Start method's locking logic and updated WaitForCacheSync to properly wait for all informers to synchronize using k8scache.WaitForCacheSync. - Implemented `WatchList` Support: Modified MemoryStorage.Watch to support SendInitialEvents by asynchronously sending synthetic ADDED events for existing objects, followed by a Bookmark event with the k8s.io/initial-events-end: true annotation. - Improved Watcher Robustness: Updated Watcher.handleEvents to gracefully handle empty ResourceVersion strings and modified Watcher.sendEvent to allow Bookmark events to bypass list filter validation.
1 parent dbf2d77 commit 5ef76b7

3 files changed

Lines changed: 107 additions & 38 deletions

File tree

pkg/syncer/syncertest/fake/cache.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (c *Cache) Start(ctx context.Context) error {
119119
<-ctx.Done()
120120

121121
c.mux.RLock()
122-
defer c.mux.RLock()
122+
defer c.mux.RUnlock()
123123
// Stop the informers
124124
c.informerCtxCancel()
125125

@@ -129,6 +129,14 @@ func (c *Cache) Start(ctx context.Context) error {
129129
// WaitForCacheSync returns true when the cached informers are all synced.
130130
// Returns false if the context is done first.
131131
func (c *Cache) WaitForCacheSync(ctx context.Context) bool {
132+
if !c.waitForStart(ctx) {
133+
return false
134+
}
135+
136+
return k8scache.WaitForCacheSync(ctx.Done(), c.HasSyncedFuncs()...)
137+
}
138+
139+
func (c *Cache) waitForStart(ctx context.Context) bool {
132140
select {
133141
case <-c.startWait:
134142
return true

pkg/syncer/syncertest/fake/storage.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"k8s.io/apimachinery/pkg/util/strategicpatch"
4444
"k8s.io/apimachinery/pkg/watch"
4545
"k8s.io/klog/v2"
46+
"k8s.io/utils/ptr"
4647
"sigs.k8s.io/cli-utils/pkg/testutil"
4748
"sigs.k8s.io/controller-runtime/pkg/client"
4849
)
@@ -1041,10 +1042,64 @@ func (ms *MemoryStorage) Watch(_ context.Context, exampleList client.ObjectList,
10411042
klog.V(6).Infof("Watching %s (Options: %+v)",
10421043
kinds.ObjectSummary(exampleList), opts)
10431044
watcher := NewWatcher(ms.watchSupervisor, gvk.GroupKind(), exampleList, opts)
1044-
// TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request?
1045-
// If yes, StartWatcher needs to take a context.
1046-
// client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too.
1047-
watcher.Start(context.Background())
1045+
1046+
if opts.Raw != nil && ptr.Deref(opts.Raw.SendInitialEvents, false) {
1047+
// Start the watcher before sending initial events, so it's ready to receive them.
1048+
watcher.Start(context.Background())
1049+
1050+
// Send initial events asynchronously to avoid blocking the Watch call.
1051+
go func() {
1052+
ms.lock.RLock()
1053+
defer ms.lock.RUnlock()
1054+
1055+
// Get the current set of objects
1056+
for _, obj := range ms.listObjects(gvk.GroupKind()) {
1057+
// Convert to a unstructured object with optional version conversion
1058+
uObj, err := kinds.ToUnstructuredWithVersion(obj, gvk, ms.scheme)
1059+
if err != nil {
1060+
klog.Errorf("Failed to convert object for initial watch events: %v", err)
1061+
continue
1062+
}
1063+
// Skip objects that don't match the ListOptions filters
1064+
ok, err := matchesListFilters(uObj, opts, ms.scheme)
1065+
if err != nil {
1066+
klog.Errorf("Failed to filter object for initial watch events: %v", err)
1067+
continue
1068+
}
1069+
if !ok {
1070+
continue
1071+
}
1072+
// Send synthetic Added event
1073+
watcher.inCh <- watch.Event{
1074+
Type: watch.Added,
1075+
Object: uObj,
1076+
}
1077+
}
1078+
1079+
// Send bookmark marking end of initial events.
1080+
// Reflector requires this to consider the watch synced when using SendInitialEvents.
1081+
bookmarkObj, err := kinds.NewObjectForGVK(gvk, ms.scheme)
1082+
if err != nil {
1083+
return
1084+
}
1085+
cObj, err := kinds.ObjectAsClientObject(bookmarkObj)
1086+
if err != nil {
1087+
return
1088+
}
1089+
// Use an empty ResourceVersion if none was specified
1090+
cObj.SetResourceVersion(opts.Raw.ResourceVersion)
1091+
core.SetAnnotation(cObj, metav1.InitialEventsAnnotationKey, "true")
1092+
watcher.inCh <- watch.Event{
1093+
Type: watch.Bookmark,
1094+
Object: cObj,
1095+
}
1096+
}()
1097+
} else {
1098+
// TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request?
1099+
// If yes, StartWatcher needs to take a context.
1100+
// client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too.
1101+
watcher.Start(context.Background())
1102+
}
10481103
return watcher, nil
10491104
}
10501105

pkg/syncer/syncertest/fake/watcher.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -254,30 +254,33 @@ func (fw *Watcher) handleEvents(ctx context.Context) {
254254
// since those events are being sent on parallel goroutines.
255255
if event.Type != watch.Error {
256256
obj := event.Object.(client.Object)
257-
// parse ResourceVersion as int
258-
newRV, err := strconv.Atoi(obj.GetResourceVersion())
259-
if err != nil {
260-
err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", obj.GetResourceVersion(), kinds.ObjectSummary(obj), err)
261-
fw.sendEvent(ctx, watch.Event{
262-
Type: watch.Error,
263-
Object: &apierrors.NewInternalError(err).ErrStatus,
264-
})
265-
continue
266-
}
267-
uid := obj.GetUID()
268-
if event.Type == watch.Modified {
269-
oldRV := lastSeenVersions[uid]
270-
if newRV <= oldRV {
271-
// drop event - newer ResourceVersion already sent
272-
klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s",
273-
log.AsJSON(event))
257+
rvStr := obj.GetResourceVersion()
258+
if rvStr != "" {
259+
// parse ResourceVersion as int
260+
newRV, err := strconv.Atoi(rvStr)
261+
if err != nil {
262+
err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", rvStr, kinds.ObjectSummary(obj), err)
263+
fw.sendEvent(ctx, watch.Event{
264+
Type: watch.Error,
265+
Object: &apierrors.NewInternalError(err).ErrStatus,
266+
})
274267
continue
275268
}
269+
uid := obj.GetUID()
270+
if event.Type == watch.Modified {
271+
oldRV := lastSeenVersions[uid]
272+
if newRV <= oldRV {
273+
// drop event - newer ResourceVersion already sent
274+
klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s",
275+
log.AsJSON(event))
276+
continue
277+
}
278+
}
279+
// Store the last known ResourceVersion for future comparison.
280+
// Store even if deleted, in case a modified event is received afterwards.
281+
// TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests)
282+
lastSeenVersions[uid] = newRV
276283
}
277-
// Store the last known ResourceVersion for future comparison.
278-
// Store even if deleted, in case a modified event is received afterwards.
279-
// TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests)
280-
lastSeenVersions[uid] = newRV
281284
}
282285

283286
// Input event received.
@@ -306,18 +309,21 @@ func (fw *Watcher) sendEvent(ctx context.Context, event watch.Event) {
306309
}
307310
event.Object = obj
308311

309-
// Check if input object matches list option filters
310-
matches, err = matchesListFilters(event.Object, fw.options, fw.scheme)
311-
if err != nil {
312-
fw.sendEvent(ctx, watch.Event{
313-
Type: watch.Error,
314-
Object: &apierrors.NewInternalError(err).ErrStatus,
315-
})
316-
return
317-
}
318-
if !matches {
319-
// No match
320-
return
312+
// Check if input object matches list option filters.
313+
// Skip for Bookmark events, as they may not match filters.
314+
if event.Type != watch.Bookmark {
315+
matches, err = matchesListFilters(event.Object, fw.options, fw.scheme)
316+
if err != nil {
317+
fw.sendEvent(ctx, watch.Event{
318+
Type: watch.Error,
319+
Object: &apierrors.NewInternalError(err).ErrStatus,
320+
})
321+
return
322+
}
323+
if !matches {
324+
// No match
325+
return
326+
}
321327
}
322328
}
323329

0 commit comments

Comments
 (0)