diff --git a/pkg/syncer/syncertest/fake/cache.go b/pkg/syncer/syncertest/fake/cache.go index aa46cc1a04..92c37c5c91 100644 --- a/pkg/syncer/syncertest/fake/cache.go +++ b/pkg/syncer/syncertest/fake/cache.go @@ -119,7 +119,7 @@ func (c *Cache) Start(ctx context.Context) error { <-ctx.Done() c.mux.RLock() - defer c.mux.RLock() + defer c.mux.RUnlock() // Stop the informers c.informerCtxCancel() @@ -129,6 +129,14 @@ func (c *Cache) Start(ctx context.Context) error { // WaitForCacheSync returns true when the cached informers are all synced. // Returns false if the context is done first. func (c *Cache) WaitForCacheSync(ctx context.Context) bool { + if !c.waitForStart(ctx) { + return false + } + + return k8scache.WaitForCacheSync(ctx.Done(), c.HasSyncedFuncs()...) +} + +func (c *Cache) waitForStart(ctx context.Context) bool { select { case <-c.startWait: return true diff --git a/pkg/syncer/syncertest/fake/storage.go b/pkg/syncer/syncertest/fake/storage.go index e2c1fbee38..27ce44bfc8 100644 --- a/pkg/syncer/syncertest/fake/storage.go +++ b/pkg/syncer/syncertest/fake/storage.go @@ -43,6 +43,7 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" + "k8s.io/utils/ptr" "sigs.k8s.io/cli-utils/pkg/testutil" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -1041,10 +1042,66 @@ func (ms *MemoryStorage) Watch(_ context.Context, exampleList client.ObjectList, klog.V(6).Infof("Watching %s (Options: %+v)", kinds.ObjectSummary(exampleList), opts) watcher := NewWatcher(ms.watchSupervisor, gvk.GroupKind(), exampleList, opts) - // TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request? - // If yes, StartWatcher needs to take a context. - // client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too. - watcher.Start(context.Background()) + + if opts.Raw != nil && ptr.Deref(opts.Raw.SendInitialEvents, false) { + // Start the watcher before sending initial events, so it's ready to receive them. + watcher.Start(context.Background()) + + // Send initial events asynchronously to avoid blocking the Watch call. + go func() { + ms.lock.RLock() + defer ms.lock.RUnlock() + + // Get the current set of objects + for _, obj := range ms.listObjects(gvk.GroupKind()) { + // Convert to a unstructured object with optional version conversion + uObj, err := kinds.ToUnstructuredWithVersion(obj, gvk, ms.scheme) + if err != nil { + klog.Errorf("Failed to convert object for initial watch events: %v", err) + continue + } + // Skip objects that don't match the ListOptions filters + ok, err := matchesListFilters(uObj, opts, ms.scheme) + if err != nil { + klog.Errorf("Failed to filter object for initial watch events: %v", err) + continue + } + if !ok { + continue + } + // Send synthetic Added event + watcher.inCh <- watch.Event{ + Type: watch.Added, + Object: uObj, + } + } + + // Send bookmark marking end of initial events. + // Reflector requires this to consider the watch synced when using SendInitialEvents. + bookmarkObj, err := kinds.NewObjectForGVK(gvk, ms.scheme) + if err != nil { + klog.Errorf("Failed to create bookmark object for initial watch events: %v", err) + return + } + cObj, err := kinds.ObjectAsClientObject(bookmarkObj) + if err != nil { + klog.Errorf("Failed to convert bookmark object to client object for initial watch events: %v", err) + return + } + // Use an empty ResourceVersion if none was specified + cObj.SetResourceVersion(opts.Raw.ResourceVersion) + core.SetAnnotation(cObj, metav1.InitialEventsAnnotationKey, "true") + watcher.inCh <- watch.Event{ + Type: watch.Bookmark, + Object: cObj, + } + }() + } else { + // TODO: Should Client.Watch's context.Done cancel the background stream or just the initial request? + // If yes, StartWatcher needs to take a context. + // client-go's FakeDynamicClient.Watch seems to just ignore the context, so that's what we're doing here too. + watcher.Start(context.Background()) + } return watcher, nil } diff --git a/pkg/syncer/syncertest/fake/watcher.go b/pkg/syncer/syncertest/fake/watcher.go index d6b41667b7..5393e847fe 100644 --- a/pkg/syncer/syncertest/fake/watcher.go +++ b/pkg/syncer/syncertest/fake/watcher.go @@ -254,30 +254,33 @@ func (fw *Watcher) handleEvents(ctx context.Context) { // since those events are being sent on parallel goroutines. if event.Type != watch.Error { obj := event.Object.(client.Object) - // parse ResourceVersion as int - newRV, err := strconv.Atoi(obj.GetResourceVersion()) - if err != nil { - err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", obj.GetResourceVersion(), kinds.ObjectSummary(obj), err) - fw.sendEvent(ctx, watch.Event{ - Type: watch.Error, - Object: &apierrors.NewInternalError(err).ErrStatus, - }) - continue - } - uid := obj.GetUID() - if event.Type == watch.Modified { - oldRV := lastSeenVersions[uid] - if newRV <= oldRV { - // drop event - newer ResourceVersion already sent - klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s", - log.AsJSON(event)) + rvStr := obj.GetResourceVersion() + if rvStr != "" { + // parse ResourceVersion as int + newRV, err := strconv.Atoi(rvStr) + if err != nil { + err = fmt.Errorf("invalid ResourceVersion %q for object %s: %w", rvStr, kinds.ObjectSummary(obj), err) + fw.sendEvent(ctx, watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(err).ErrStatus, + }) continue } + uid := obj.GetUID() + if event.Type == watch.Modified { + oldRV := lastSeenVersions[uid] + if newRV <= oldRV { + // drop event - newer ResourceVersion already sent + klog.Warningf("Watcher.handleEvents: dropping event (old ResourceVersion): %s", + log.AsJSON(event)) + continue + } + } + // Store the last known ResourceVersion for future comparison. + // Store even if deleted, in case a modified event is received afterwards. + // TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests) + lastSeenVersions[uid] = newRV } - // Store the last known ResourceVersion for future comparison. - // Store even if deleted, in case a modified event is received afterwards. - // TODO: Garbage collect uid entries after deletion (probably not necessary for unit tests) - lastSeenVersions[uid] = newRV } // Input event received. @@ -306,18 +309,21 @@ func (fw *Watcher) sendEvent(ctx context.Context, event watch.Event) { } event.Object = obj - // Check if input object matches list option filters - matches, err = matchesListFilters(event.Object, fw.options, fw.scheme) - if err != nil { - fw.sendEvent(ctx, watch.Event{ - Type: watch.Error, - Object: &apierrors.NewInternalError(err).ErrStatus, - }) - return - } - if !matches { - // No match - return + // Check if input object matches list option filters. + // Skip for Bookmark events, as they may not match filters. + if event.Type != watch.Bookmark { + matches, err = matchesListFilters(event.Object, fw.options, fw.scheme) + if err != nil { + fw.sendEvent(ctx, watch.Event{ + Type: watch.Error, + Object: &apierrors.NewInternalError(err).ErrStatus, + }) + return + } + if !matches { + // No match + return + } } }