Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/syncer/syncertest/fake/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Cache) Start(ctx context.Context) error {
<-ctx.Done()

c.mux.RLock()
defer c.mux.RLock()
defer c.mux.RUnlock()
// Stop the informers
c.informerCtxCancel()

Expand All @@ -129,6 +129,14 @@ func (c *Cache) Start(ctx context.Context) error {
// WaitForCacheSync returns true when the cached informers are all synced.
// Returns false if the context is done first.
func (c *Cache) WaitForCacheSync(ctx context.Context) bool {
if !c.waitForStart(ctx) {
return false
}

return k8scache.WaitForCacheSync(ctx.Done(), c.HasSyncedFuncs()...)
}

func (c *Cache) waitForStart(ctx context.Context) bool {
select {
case <-c.startWait:
return true
Expand Down
65 changes: 61 additions & 4 deletions pkg/syncer/syncertest/fake/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/cli-utils/pkg/testutil"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -1041,10 +1042,66 @@ func (ms *MemoryStorage) Watch(_ context.Context, exampleList client.ObjectList,
klog.V(6).Infof("Watching %s (Options: %+v)",
kinds.ObjectSummary(exampleList), opts)
watcher := NewWatcher(ms.watchSupervisor, gvk.GroupKind(), exampleList, opts)
// TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request?
// If yes, StartWatcher needs to take a context.
// client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too.
watcher.Start(context.Background())

if opts.Raw != nil && ptr.Deref(opts.Raw.SendInitialEvents, false) {
// Start the watcher before sending initial events, so it's ready to receive them.
watcher.Start(context.Background())

// Send initial events asynchronously to avoid blocking the Watch call.
go func() {
ms.lock.RLock()
defer ms.lock.RUnlock()

// Get the current set of objects
for _, obj := range ms.listObjects(gvk.GroupKind()) {
// Convert to a unstructured object with optional version conversion
uObj, err := kinds.ToUnstructuredWithVersion(obj, gvk, ms.scheme)
if err != nil {
klog.Errorf("Failed to convert object for initial watch events: %v", err)
continue
}
// Skip objects that don't match the ListOptions filters
ok, err := matchesListFilters(uObj, opts, ms.scheme)
if err != nil {
klog.Errorf("Failed to filter object for initial watch events: %v", err)
continue
}
if !ok {
continue
}
// Send synthetic Added event
watcher.inCh <- watch.Event{
Type: watch.Added,
Object: uObj,
}
}

// Send bookmark marking end of initial events.
// Reflector requires this to consider the watch synced when using SendInitialEvents.
bookmarkObj, err := kinds.NewObjectForGVK(gvk, ms.scheme)
if err != nil {
klog.Errorf("Failed to create bookmark object for initial watch events: %v", err)
return
}
cObj, err := kinds.ObjectAsClientObject(bookmarkObj)
if err != nil {
klog.Errorf("Failed to convert bookmark object to client object for initial watch events: %v", err)
return
}
// Use an empty ResourceVersion if none was specified
cObj.SetResourceVersion(opts.Raw.ResourceVersion)
core.SetAnnotation(cObj, metav1.InitialEventsAnnotationKey, "true")
watcher.inCh <- watch.Event{
Type: watch.Bookmark,
Object: cObj,
}
}()
} else {
// TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request?
// If yes, StartWatcher needs to take a context.
// client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too.
watcher.Start(context.Background())
}
return watcher, nil
}

Expand Down
72 changes: 39 additions & 33 deletions pkg/syncer/syncertest/fake/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,30 +254,33 @@ func (fw *Watcher) handleEvents(ctx context.Context) {
// since those events are being sent on parallel goroutines.
if event.Type != watch.Error {
obj := event.Object.(client.Object)
// parse ResourceVersion as int
newRV, err := strconv.Atoi(obj.GetResourceVersion())
if err != nil {
err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", obj.GetResourceVersion(), kinds.ObjectSummary(obj), err)
fw.sendEvent(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(err).ErrStatus,
})
continue
}
uid := obj.GetUID()
if event.Type == watch.Modified {
oldRV := lastSeenVersions[uid]
if newRV <= oldRV {
// drop event - newer ResourceVersion already sent
klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s",
log.AsJSON(event))
rvStr := obj.GetResourceVersion()
if rvStr != "" {
Comment on lines +257 to +258

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Adding a check for an empty ResourceVersion string before attempting to parse it is a good defensive programming practice. This prevents strconv.Atoi from returning an error for valid empty strings, improving the robustness of the watcher.

Suggested change
rvStr := obj.GetResourceVersion()
if rvStr != "" {
rvStr := obj.GetResourceVersion()
if rvStr != "" {

// parse ResourceVersion as int
newRV, err := strconv.Atoi(rvStr)
if err != nil {
err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", rvStr, kinds.ObjectSummary(obj), err)
fw.sendEvent(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(err).ErrStatus,
})
continue
}
uid := obj.GetUID()
if event.Type == watch.Modified {
oldRV := lastSeenVersions[uid]
if newRV <= oldRV {
// drop event - newer ResourceVersion already sent
klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s",
log.AsJSON(event))
continue
}
}
// Store the last known ResourceVersion for future comparison.
// Store even if deleted, in case a modified event is received afterwards.
// TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests)
lastSeenVersions[uid] = newRV
}
// Store the last known ResourceVersion for future comparison.
// Store even if deleted, in case a modified event is received afterwards.
// TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests)
lastSeenVersions[uid] = newRV
}

// Input event received.
Expand Down Expand Up @@ -306,18 +309,21 @@ func (fw *Watcher) sendEvent(ctx context.Context, event watch.Event) {
}
event.Object = obj

// Check if input object matches list option filters
matches, err = matchesListFilters(event.Object, fw.options, fw.scheme)
if err != nil {
fw.sendEvent(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(err).ErrStatus,
})
return
}
if !matches {
// No match
return
// Check if input object matches list option filters.
// Skip for Bookmark events, as they may not match filters.
if event.Type != watch.Bookmark {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Skipping list filter validation for watch.Bookmark events is a necessary adjustment. Bookmark events often carry metadata rather than full object specifications, and forcing them through object filters could lead to incorrect filtering or errors. This change ensures that Bookmark events are handled appropriately without unintended side effects.

Suggested change
if event.Type != watch.Bookmark {
if event.Type != watch.Bookmark {

matches, err = matchesListFilters(event.Object, fw.options, fw.scheme)
if err != nil {
fw.sendEvent(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(err).ErrStatus,
})
return
}
if !matches {
// No match
return
}
}
}

Expand Down