From 6489f2616df41fc041c67682bcc06d1e7e94af71 Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 00:45:35 +0530 Subject: [PATCH 1/6] feat: add disableCache flag to ApiServerSource adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a disableCache field to ApiServerSourceSpec that, when true, causes the adapter to skip the initial LIST call and only watch for new events. This reduces API server load from O(resources*namespaces) LIST requests to O(resources) Watch connections — critical for clusters with 1000+ namespaces (resolves knative/eventing#8642). The flag is propagated from spec → ReceiveAdapterArgs → K_SOURCE_CONFIG JSON → adapter Config. A new startWatchOnly mode handles the watch-only loop with reconnect on error. --- config/core/resources/apiserversource.yaml | 3 + pkg/adapter/apiserver/adapter.go | 66 ++++++++- pkg/adapter/apiserver/adapter_test.go | 136 ++++++++++++++++++ pkg/adapter/apiserver/config.go | 5 + pkg/apis/sources/v1/apiserver_types.go | 7 + .../apiserversource/apiserversource.go | 1 + .../resources/receive_adapter.go | 2 + .../resources/receive_adapter_test.go | 74 ++++++++++ 8 files changed, 291 insertions(+), 3 deletions(-) diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml index 2e118384d85..a092c9376b8 100644 --- a/config/core/resources/apiserversource.yaml +++ b/config/core/resources/apiserversource.yaml @@ -79,6 +79,9 @@ spec: description: Extensions specify what attribute are added or overridden on the outbound event. Each `Extensions` key-value pair are set on the event as an attribute extension independently. type: object x-kubernetes-preserve-unknown-fields: true + disableCache: + description: If true, the adapter skips the initial LIST call and only watches for new events. Reduces API server load when watching resources across many namespaces. Pre-existing objects will not emit events on adapter startup. + type: boolean mode: description: EventMode controls the format of the event. `Reference` sends a dataref event type for the resource under watch. `Resource` send the full resource lifecycle event. Defaults to `Reference` type: string diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 4606912988f..14922e77574 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -82,13 +82,20 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er return fmt.Errorf("failed to collect resource matches: %v", err) } - // we have two modes of operation for the ApiServerSource adapter: - // 1. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches + // we have three modes of operation for the ApiServerSource adapter: + // 1. No-Cache Mode: The adapter skips the initial LIST call and only watches for new events. + // This reduces API server load significantly when watching across many namespaces. + // Pre-existing objects will not emit events on startup. + // 2. Resilient Mode (Default): The adapter uses `reflector.Run()` to continuously retry establishing watches // on resources, making it resilient to transient errors or delayed permission grants. - // 2. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch + // 3. Fail-Fast Mode: In this mode, the adapter uses `reflector.ListAndWatchWithContext()`. If any resource watch // fails to be established on the first attempt, the entire adapter will fail immediately. This provides faster // feedback and a clearer failure state in environments where permissions are expected to be correct at startup. + if a.config.DisableCache { + a.logger.Info("Starting in no-cache mode. Initial LIST is skipped; only new events will be emitted.") + return a.startWatchOnly(ctx, stopCh, delegate, matches) + } if a.config.FailFast { a.logger.Info("Starting in fail-fast mode. Any single watch failure will stop the adapter.") return a.startFailFast(ctx, stopCh, delegate, matches) @@ -97,6 +104,59 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er return a.startResilient(ctx, stopCh, delegate, matches) } +// startWatchOnly starts watches without an initial LIST call, so pre-existing objects +// are not enumerated. This avoids the N*namespaces LIST requests that cause client-side +// throttling in large clusters. +func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { + for _, match := range matches { + if match.apiResource == nil { + a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String()) + continue + } + for _, res := range match.resourceInterfaces { + go func(ri dynamic.ResourceInterface, labelSelector string) { + for { + select { + case <-ctx.Done(): + return + default: + } + + lo := metav1.ListOptions{LabelSelector: labelSelector} + w, err := ri.Watch(ctx, lo) + if err != nil { + a.logger.Errorw("watch error, retrying", zap.Error(err)) + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + } + continue + } + + for event := range w.ResultChan() { + obj, ok := event.Object.(*unstructured.Unstructured) + if !ok { + continue + } + switch event.Type { + case watch.Added: + _ = delegate.Add(obj) + case watch.Modified: + _ = delegate.Update(obj) + case watch.Deleted: + _ = delegate.Delete(obj) + } + } + } + }(res, match.resourceWatch.LabelSelector) + } + } + + <-stopCh + return nil +} + func (a *apiServerAdapter) startResilient(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { // Local stop channel. stop := make(chan struct{}) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index dc408a8aae6..52b40c4e4b1 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -380,3 +380,139 @@ func TestAdapter_FailFast(t *testing.T) { }) } } + +func TestAdapter_DisableCache(t *testing.T) { + ce := adaptertest.NewTestClient() + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{ + GVR: schema.GroupVersionResource{ + Version: "v1", + Resource: "pods", + }, + }}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + + a := &apiServerAdapter{ + ce: ce, + logger: logging.FromContext(ctx), + config: config, + + discover: makeDiscoveryClient(), + k8s: makeDynamicClient(simplePod("foo", "default")), + source: "unit-test", + name: "unittest", + } + + ctx, cancel := context.WithCancel(ctx) + done := make(chan struct{}) + go func() { + defer close(done) + err := a.Start(ctx) + if err != nil { + t.Logf("Start returned error: %v", err) + } + }() + + // Give the watch goroutines time to start. + time.Sleep(500 * time.Millisecond) + + cancel() + <-done +} + +func TestAdapter_DisableCacheSkipsList(t *testing.T) { + ce := adaptertest.NewTestClient() + + listCalled := false + gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + dynClient := makeDynamicClient(simplePod("existing-pod", "default")) + // Wrap the fake client to detect List calls. + trackedClient := &listTrackingClient{Interface: dynClient, gvr: gvr, listCalled: &listCalled} + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{GVR: gvr}}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + + a := &apiServerAdapter{ + ce: ce, + logger: logging.FromContext(ctx), + config: config, + discover: makeDiscoveryClient(), + k8s: trackedClient, + source: "unit-test", + name: "unittest", + } + + done := make(chan struct{}) + go func() { + defer close(done) + _ = a.Start(ctx) + }() + + time.Sleep(500 * time.Millisecond) + cancel() + <-done + + if listCalled { + t.Error("expected no LIST call when DisableCache=true, but List was called") + } +} + +// listTrackingClient wraps dynamic.Interface to detect List calls on a specific GVR. +type listTrackingClient struct { + dynamic.Interface + gvr schema.GroupVersionResource + listCalled *bool +} + +func (c *listTrackingClient) Resource(gvr schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return &listTrackingResourceClient{ + NamespaceableResourceInterface: c.Interface.Resource(gvr), + gvr: gvr, + targetGVR: c.gvr, + listCalled: c.listCalled, + } +} + +type listTrackingResourceClient struct { + dynamic.NamespaceableResourceInterface + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + listCalled *bool +} + +func (r *listTrackingResourceClient) Namespace(ns string) dynamic.ResourceInterface { + return &listTrackingNamespacedClient{ + ResourceInterface: r.NamespaceableResourceInterface.Namespace(ns), + targetGVR: r.targetGVR, + gvr: r.gvr, + listCalled: r.listCalled, + } +} + +type listTrackingNamespacedClient struct { + dynamic.ResourceInterface + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + listCalled *bool +} + +func (r *listTrackingNamespacedClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { + if r.gvr == r.targetGVR { + *r.listCalled = true + } + return r.ResourceInterface.List(ctx, opts) +} diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go index 58831190a53..3d423d246e2 100644 --- a/pkg/adapter/apiserver/config.go +++ b/pkg/adapter/apiserver/config.go @@ -74,4 +74,9 @@ type Config struct { // (via the features.knative.dev/apiserversource-skip-permissions-check annotation), and the ApiServerSource // adapter should not keep trying to establish watches on resources that it perhaps does not have permissions for. FailFast bool `json:"failFast,omitempty"` + + // DisableCache if true, the adapter skips the initial LIST call and only watches for new events. + // This reduces API server load when watching resources across many namespaces. + // Pre-existing objects will not emit events on adapter startup. + DisableCache bool `json:"disableCache,omitempty"` } diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go index 7510771f040..17cc5ad2c4f 100644 --- a/pkg/apis/sources/v1/apiserver_types.go +++ b/pkg/apis/sources/v1/apiserver_types.go @@ -96,6 +96,13 @@ type ApiServerSourceSpec struct { // // +optional Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` + + // DisableCache if true, the adapter skips the initial LIST call and only + // watches for new events. This significantly reduces API server load when + // watching resources across many namespaces. Note: pre-existing objects + // will not emit events on adapter startup. + // +optional + DisableCache bool `json:"disableCache,omitempty"` } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index 2d87187c4ea..d47202917d7 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -252,6 +252,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer AllNamespaces: allNamespaces, NodeSelector: featureFlags.NodeSelector(), FailFast: skipPermissions == "true", + DisableCache: src.Spec.DisableCache, } expected, err := resources.MakeReceiveAdapter(&adapterArgs) diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index b9af0d94d4a..baf012ba41d 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -51,6 +51,7 @@ type ReceiveAdapterArgs struct { AllNamespaces bool NodeSelector map[string]string FailFast bool + DisableCache bool } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -140,6 +141,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) { AllNamespaces: args.AllNamespaces, Filters: args.Source.Spec.Filters, FailFast: args.FailFast, + DisableCache: args.DisableCache, } for _, r := range args.Source.Spec.Resources { diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 70a7f0058de..148a8814f08 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -332,3 +332,77 @@ Test certificate content here }) } } + +func TestMakeReceiveAdapterWithDisableCache(t *testing.T) { + name := "source-name" + + tests := []struct { + name string + disableCache bool + expectedConfig string + }{ + { + name: "DisableCache true", + disableCache: true, + expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource","disableCache":true}`, + }, + { + name: "DisableCache false", + disableCache: false, + expectedConfig: `{"namespaces":["source-namespace"],"allNamespaces":false,"resources":[{"gvr":{"Group":"","Version":"","Resource":"namespaces"}}],"mode":"Resource"}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + src := &v1.ApiServerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "source-namespace", + UID: "1234", + }, + Spec: v1.ApiServerSourceSpec{ + Resources: []v1.APIVersionKindSelector{{ + APIVersion: "", + Kind: "Namespace", + }}, + EventMode: "Resource", + ServiceAccountName: "source-svc-acct", + DisableCache: tt.disableCache, + }, + } + + got, err := MakeReceiveAdapter(&ReceiveAdapterArgs{ + Image: "test-image", + Source: src, + Labels: map[string]string{"test-key": "test-value"}, + SinkURI: "sink-uri", + Configs: &source.EmptyVarsGenerator{}, + Namespaces: []string{"source-namespace"}, + DisableCache: tt.disableCache, + }) + + if err != nil { + t.Fatalf("MakeReceiveAdapter() error = %v", err) + } + + var sourceConfigValue string + for _, container := range got.Spec.Template.Spec.Containers { + for _, env := range container.Env { + if env.Name == "K_SOURCE_CONFIG" { + sourceConfigValue = env.Value + break + } + } + } + + if sourceConfigValue == "" { + t.Fatal("K_SOURCE_CONFIG environment variable not found") + } + + if sourceConfigValue != tt.expectedConfig { + t.Errorf("K_SOURCE_CONFIG value mismatch:\nexpected: %s\ngot: %s", tt.expectedConfig, sourceConfigValue) + } + }) + } +} From c79388856fcc74aa055172f0f5558df5917fb113 Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 00:59:10 +0530 Subject: [PATCH 2/6] fix: correct disableCache watch semantics and backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback on #9036: - Fix Watch(rv="") replay bug: perform a lightweight LIST(limit=1) before each Watch to obtain the current resourceVersion, so Watch starts from that point and does not replay synthetic ADDED events for pre-existing objects (which is what rv="" triggers per the Kubernetes API contract). - Handle watch.Error 410 Gone: detect StatusReasonGone in drainWatchEvents and return so watchResourceLoop re-lists for a fresh resourceVersion, breaking the infinite tight-retry loop that would otherwise occur. - Replace hardcoded 5s retry with exponential backoff+jitter (wait.Backoff, 1s→60s cap) via watchResourceLoop, preventing thundering herd on reconnect. - Log warning when both DisableCache and FailFast are set, documenting that DisableCache takes precedence. - Replace TestAdapter_DisableCacheSkipsList (wrong: asserted no LIST was called) with TestAdapter_DisableCacheLightweightList (correct: asserts LIST is called with Limit=1). Add TestAdapter_DisableCacheEventDelivery which injects a watch event via a fake watcher and asserts a CloudEvent is delivered to the sink client. --- pkg/adapter/apiserver/adapter.go | 149 +++++++++++++++++------ pkg/adapter/apiserver/adapter_test.go | 166 +++++++++++++++++++++----- 2 files changed, 243 insertions(+), 72 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 14922e77574..9d1dce4998c 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -19,6 +19,7 @@ package apiserver import ( "context" "fmt" + "math" "sync" "time" @@ -28,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -104,57 +106,126 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er return a.startResilient(ctx, stopCh, delegate, matches) } -// startWatchOnly starts watches without an initial LIST call, so pre-existing objects -// are not enumerated. This avoids the N*namespaces LIST requests that cause client-side -// throttling in large clusters. +// startWatchOnly starts watches for all matched resources. It performs a lightweight +// LIST (limit=1) per resource interface to obtain the current resourceVersion, then +// issues a Watch from that point. This means pre-existing objects do not produce +// events on startup, and startup API load is O(resources*namespaces) lightweight +// LISTs rather than full object dumps. +// +// When both DisableCache and FailFast are set, DisableCache takes precedence. func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { + if a.config.FailFast { + a.logger.Warn("disableCache=true takes precedence over failFast=true; running in no-cache mode without fail-fast behavior") + } for _, match := range matches { if match.apiResource == nil { a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String()) continue } for _, res := range match.resourceInterfaces { - go func(ri dynamic.ResourceInterface, labelSelector string) { - for { - select { - case <-ctx.Done(): - return - default: - } + go a.watchResourceLoop(ctx, res, match.resourceWatch.LabelSelector, delegate) + } + } + <-stopCh + return nil +} - lo := metav1.ListOptions{LabelSelector: labelSelector} - w, err := ri.Watch(ctx, lo) - if err != nil { - a.logger.Errorw("watch error, retrying", zap.Error(err)) - select { - case <-ctx.Done(): - return - case <-time.After(5 * time.Second): - } - continue - } +func noCacheBackoff() wait.Backoff { + return wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.1, + Steps: math.MaxInt32, + Cap: 60 * time.Second, + } +} - for event := range w.ResultChan() { - obj, ok := event.Object.(*unstructured.Unstructured) - if !ok { - continue - } - switch event.Type { - case watch.Added: - _ = delegate.Add(obj) - case watch.Modified: - _ = delegate.Update(obj) - case watch.Deleted: - _ = delegate.Delete(obj) - } - } - } - }(res, match.resourceWatch.LabelSelector) +// watchResourceLoop runs a continuous List+Watch loop for a single resource interface. +// A lightweight LIST (limit=1) is issued before each Watch to obtain the current +// resourceVersion so that Watch does not replay synthetic ADDED events for +// pre-existing objects (the behaviour when resourceVersion="" is passed to Watch). +// On watch.Error with StatusReasonGone (HTTP 410 — resourceVersion expired from the +// watch cache), the loop re-lists to get a fresh resourceVersion. +// All failures back off exponentially with jitter before retrying. +func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.ResourceInterface, labelSelector string, delegate cache.Store) { + backoff := noCacheBackoff() + for ctx.Err() == nil { + list, err := ri.List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + Limit: 1, + }) + if err != nil { + if ctx.Err() != nil { + return + } + a.logger.Errorw("failed to list for resourceVersion, retrying", zap.Error(err)) + select { + case <-ctx.Done(): + return + case <-time.After(backoff.Step()): + } + continue } + rv := list.GetResourceVersion() + backoff = noCacheBackoff() + + w, err := ri.Watch(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + ResourceVersion: rv, + }) + if err != nil { + if ctx.Err() != nil { + return + } + a.logger.Errorw("watch error, retrying", zap.Error(err)) + select { + case <-ctx.Done(): + return + case <-time.After(backoff.Step()): + } + continue + } + backoff = noCacheBackoff() + a.drainWatchEvents(ctx, w, delegate) } +} - <-stopCh - return nil +// drainWatchEvents reads from a watch until the context is done, the watch +// channel closes, or a watch.Error event is received. On StatusReasonGone +// (HTTP 410), it returns immediately so watchResourceLoop can re-list. +func (a *apiServerAdapter) drainWatchEvents(ctx context.Context, w watch.Interface, delegate cache.Store) { + defer w.Stop() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-w.ResultChan(): + if !ok { + return + } + switch event.Type { + case watch.Added: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + _ = delegate.Add(obj) + } + case watch.Modified: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + _ = delegate.Update(obj) + } + case watch.Deleted: + if obj, ok := event.Object.(*unstructured.Unstructured); ok { + _ = delegate.Delete(obj) + } + case watch.Error: + if status, ok := event.Object.(*metav1.Status); ok && status.Reason == metav1.StatusReasonGone { + a.logger.Info("watch resourceVersion expired (410 Gone), will re-list") + } else { + a.logger.Errorw("received watch error event", zap.Any("object", event.Object)) + } + return + } + } + } } func (a *apiServerAdapter) startResilient(ctx context.Context, stopCh <-chan struct{}, delegate cache.Store, matches []resourceWatchMatch) error { diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 52b40c4e4b1..bdb2c1b4da6 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -18,6 +18,7 @@ package apiserver import ( "context" + "sync" "testing" "time" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic" @@ -426,20 +428,20 @@ func TestAdapter_DisableCache(t *testing.T) { <-done } -func TestAdapter_DisableCacheSkipsList(t *testing.T) { - ce := adaptertest.NewTestClient() - - listCalled := false +// TestAdapter_DisableCacheLightweightList verifies that DisableCache=true uses a +// lightweight LIST (Limit=1) to obtain the current resourceVersion before watching, +// rather than skipping LIST entirely (which would cause rv="" and replay all existing objects). +func TestAdapter_DisableCacheLightweightList(t *testing.T) { gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + tracker := &listOptionsTracker{} dynClient := makeDynamicClient(simplePod("existing-pod", "default")) - // Wrap the fake client to detect List calls. - trackedClient := &listTrackingClient{Interface: dynClient, gvr: gvr, listCalled: &listCalled} + trackedClient := &listOptsTrackingClient{Interface: dynClient, gvr: gvr, tracker: tracker} config := Config{ - Namespaces: []string{"default"}, - Resources: []ResourceWatch{{GVR: gvr}}, - EventMode: "Resource", + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{GVR: gvr}}, + EventMode: "Resource", DisableCache: true, } @@ -447,7 +449,7 @@ func TestAdapter_DisableCacheSkipsList(t *testing.T) { ctx, cancel := context.WithCancel(ctx) a := &apiServerAdapter{ - ce: ce, + ce: adaptertest.NewTestClient(), logger: logging.FromContext(ctx), config: config, discover: makeDiscoveryClient(), @@ -466,53 +468,151 @@ func TestAdapter_DisableCacheSkipsList(t *testing.T) { cancel() <-done - if listCalled { - t.Error("expected no LIST call when DisableCache=true, but List was called") + opts := tracker.get() + if len(opts) == 0 { + t.Fatal("expected at least one LIST call to obtain resourceVersion, got none") + } + for _, o := range opts { + if o.Limit != 1 { + t.Errorf("expected LIST with Limit=1 (lightweight), got Limit=%d", o.Limit) + } } } -// listTrackingClient wraps dynamic.Interface to detect List calls on a specific GVR. -type listTrackingClient struct { +// TestAdapter_DisableCacheEventDelivery verifies that watch events are converted +// to CloudEvents and delivered to the sink when DisableCache=true. +func TestAdapter_DisableCacheEventDelivery(t *testing.T) { + testCE := adaptertest.NewTestClient() + gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + fakeWatcher := watch.NewRaceFreeFake() + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + dynClient := dynamicfake.NewSimpleDynamicClient(scheme, simplePod("existing-pod", "default")) + dynClient.PrependWatchReactor("pods", func(_ kubetesting.Action) (bool, watch.Interface, error) { + return true, fakeWatcher, nil + }) + + config := Config{ + Namespaces: []string{"default"}, + Resources: []ResourceWatch{{GVR: gvr}}, + EventMode: "Resource", + DisableCache: true, + } + + ctx, _ := pkgtesting.SetupFakeContext(t) + ctx, cancel := context.WithCancel(ctx) + + a := &apiServerAdapter{ + ce: testCE, + logger: logging.FromContext(ctx), + config: config, + discover: makeDiscoveryClient(), + k8s: dynClient, + source: "unit-test", + name: "unittest", + namespace: "default", + } + + done := make(chan struct{}) + go func() { + defer close(done) + _ = a.Start(ctx) + }() + + // Wait for the adapter to reach the watch. + time.Sleep(200 * time.Millisecond) + + // Inject an ADDED event as *unstructured.Unstructured (the type the adapter expects). + newPod := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "new-pod", + "namespace": "default", + "resourceVersion": "12345", + }, + }, + } + fakeWatcher.Add(newPod) + + // Poll until at least one CloudEvent is sent (or timeout). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && len(testCE.Sent()) == 0 { + time.Sleep(50 * time.Millisecond) + } + + cancel() + <-done + + if len(testCE.Sent()) == 0 { + t.Error("expected at least one CloudEvent to be sent, but none were received") + } +} + +// listOptionsTracker records ListOptions from List calls for later assertion. +type listOptionsTracker struct { + mu sync.Mutex + options []metav1.ListOptions +} + +func (t *listOptionsTracker) record(opts metav1.ListOptions) { + t.mu.Lock() + defer t.mu.Unlock() + t.options = append(t.options, opts) +} + +func (t *listOptionsTracker) get() []metav1.ListOptions { + t.mu.Lock() + defer t.mu.Unlock() + result := make([]metav1.ListOptions, len(t.options)) + copy(result, t.options) + return result +} + +// listOptsTrackingClient wraps dynamic.Interface to record List calls for a specific GVR. +type listOptsTrackingClient struct { dynamic.Interface - gvr schema.GroupVersionResource - listCalled *bool + gvr schema.GroupVersionResource + tracker *listOptionsTracker } -func (c *listTrackingClient) Resource(gvr schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { - return &listTrackingResourceClient{ +func (c *listOptsTrackingClient) Resource(gvr schema.GroupVersionResource) dynamic.NamespaceableResourceInterface { + return &listOptsTrackingResourceClient{ NamespaceableResourceInterface: c.Interface.Resource(gvr), gvr: gvr, targetGVR: c.gvr, - listCalled: c.listCalled, + tracker: c.tracker, } } -type listTrackingResourceClient struct { +type listOptsTrackingResourceClient struct { dynamic.NamespaceableResourceInterface - gvr schema.GroupVersionResource - targetGVR schema.GroupVersionResource - listCalled *bool + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + tracker *listOptionsTracker } -func (r *listTrackingResourceClient) Namespace(ns string) dynamic.ResourceInterface { - return &listTrackingNamespacedClient{ +func (r *listOptsTrackingResourceClient) Namespace(ns string) dynamic.ResourceInterface { + return &listOptsTrackingNamespacedClient{ ResourceInterface: r.NamespaceableResourceInterface.Namespace(ns), - targetGVR: r.targetGVR, gvr: r.gvr, - listCalled: r.listCalled, + targetGVR: r.targetGVR, + tracker: r.tracker, } } -type listTrackingNamespacedClient struct { +type listOptsTrackingNamespacedClient struct { dynamic.ResourceInterface - gvr schema.GroupVersionResource - targetGVR schema.GroupVersionResource - listCalled *bool + gvr schema.GroupVersionResource + targetGVR schema.GroupVersionResource + tracker *listOptionsTracker } -func (r *listTrackingNamespacedClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { +func (r *listOptsTrackingNamespacedClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { if r.gvr == r.targetGVR { - *r.listCalled = true + r.tracker.record(opts) } return r.ResourceInterface.List(ctx, opts) } From 3999a9866867f62d4992f9613b9a4021ca7d6ffa Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 02:35:31 +0530 Subject: [PATCH 3/6] fix(apiserversource): address review feedback on disableCache implementation - Remove DisableCache from stable v1 spec; move to annotation-driven pattern matching FailFast (features.knative.dev/apiserversource-disable-cache) - Export DisableCacheAnnotation and SkipPermissionsAnnotation as typed constants in apiserver_types.go; reconciler uses them - Validate at admission: reject when both disable-cache and skip-permissions annotations are set simultaneously - Fix goroutine leak in startWatchOnly: add sync.WaitGroup + unified watchCtx derived from both ctx and stopCh signals (S2/M5) - Log errors from delegate.Add/Update/Delete instead of silently discarding with _ (S3) - Add TimeoutSeconds=5min to all Watch calls to force periodic reconnection on stale streams (M1) - Replace time.After with time.NewTimer+Stop to eliminate timer leak in watchResourceLoop backoff select (M3) --- config/core/resources/apiserversource.yaml | 3 -- pkg/adapter/apiserver/adapter.go | 44 ++++++++++++++--- pkg/apis/sources/v1/apiserver_types.go | 14 +++--- pkg/apis/sources/v1/apiserver_validation.go | 9 +++- .../sources/v1/apiserver_validation_test.go | 47 +++++++++++++++++++ .../apiserversource/apiserversource.go | 9 ++-- .../resources/receive_adapter_test.go | 4 +- 7 files changed, 107 insertions(+), 23 deletions(-) diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml index a092c9376b8..2e118384d85 100644 --- a/config/core/resources/apiserversource.yaml +++ b/config/core/resources/apiserversource.yaml @@ -79,9 +79,6 @@ spec: description: Extensions specify what attribute are added or overridden on the outbound event. Each `Extensions` key-value pair are set on the event as an attribute extension independently. type: object x-kubernetes-preserve-unknown-fields: true - disableCache: - description: If true, the adapter skips the initial LIST call and only watches for new events. Reduces API server load when watching resources across many namespaces. Pre-existing objects will not emit events on adapter startup. - type: boolean mode: description: EventMode controls the format of the event. `Reference` sends a dataref event type for the resource under watch. `Resource` send the full resource lifecycle event. Defaults to `Reference` type: string diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 9d1dce4998c..cd397affe1d 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -117,16 +117,31 @@ func (a *apiServerAdapter) startWatchOnly(ctx context.Context, stopCh <-chan str if a.config.FailFast { a.logger.Warn("disableCache=true takes precedence over failFast=true; running in no-cache mode without fail-fast behavior") } + + watchCtx, cancelWatchers := context.WithCancel(ctx) + defer cancelWatchers() + + var wg sync.WaitGroup for _, match := range matches { if match.apiResource == nil { a.logger.Errorf("could not retrieve information about resource %s: it doesn't exist. skipping...", match.resourceWatch.GVR.String()) continue } for _, res := range match.resourceInterfaces { - go a.watchResourceLoop(ctx, res, match.resourceWatch.LabelSelector, delegate) + wg.Add(1) + go func() { + defer wg.Done() + a.watchResourceLoop(watchCtx, res, match.resourceWatch.LabelSelector, delegate) + }() } } - <-stopCh + + select { + case <-stopCh: + case <-ctx.Done(): + } + cancelWatchers() + wg.Wait() return nil } @@ -140,6 +155,10 @@ func noCacheBackoff() wait.Backoff { } } +// noCacheWatchTimeout is the server-side timeout for each Watch call in no-cache mode. +// Forces periodic reconnection so stale connections don't persist indefinitely. +var noCacheWatchTimeout int64 = 5 * 60 + // watchResourceLoop runs a continuous List+Watch loop for a single resource interface. // A lightweight LIST (limit=1) is issued before each Watch to obtain the current // resourceVersion so that Watch does not replay synthetic ADDED events for @@ -159,10 +178,12 @@ func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.Res return } a.logger.Errorw("failed to list for resourceVersion, retrying", zap.Error(err)) + t := time.NewTimer(backoff.Step()) select { case <-ctx.Done(): + t.Stop() return - case <-time.After(backoff.Step()): + case <-t.C: } continue } @@ -172,16 +193,19 @@ func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.Res w, err := ri.Watch(ctx, metav1.ListOptions{ LabelSelector: labelSelector, ResourceVersion: rv, + TimeoutSeconds: &noCacheWatchTimeout, }) if err != nil { if ctx.Err() != nil { return } a.logger.Errorw("watch error, retrying", zap.Error(err)) + t := time.NewTimer(backoff.Step()) select { case <-ctx.Done(): + t.Stop() return - case <-time.After(backoff.Step()): + case <-t.C: } continue } @@ -206,15 +230,21 @@ func (a *apiServerAdapter) drainWatchEvents(ctx context.Context, w watch.Interfa switch event.Type { case watch.Added: if obj, ok := event.Object.(*unstructured.Unstructured); ok { - _ = delegate.Add(obj) + if err := delegate.Add(obj); err != nil { + a.logger.Errorw("failed to add object to delegate store", zap.Error(err)) + } } case watch.Modified: if obj, ok := event.Object.(*unstructured.Unstructured); ok { - _ = delegate.Update(obj) + if err := delegate.Update(obj); err != nil { + a.logger.Errorw("failed to update object in delegate store", zap.Error(err)) + } } case watch.Deleted: if obj, ok := event.Object.(*unstructured.Unstructured); ok { - _ = delegate.Delete(obj) + if err := delegate.Delete(obj); err != nil { + a.logger.Errorw("failed to delete object from delegate store", zap.Error(err)) + } } case watch.Error: if status, ok := event.Object.(*metav1.Status); ok && status.Reason == metav1.StatusReasonGone { diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go index 17cc5ad2c4f..43b8977d60a 100644 --- a/pkg/apis/sources/v1/apiserver_types.go +++ b/pkg/apis/sources/v1/apiserver_types.go @@ -25,6 +25,14 @@ import ( "knative.dev/pkg/kmeta" ) +const ( + // DisableCacheAnnotation controls whether the adapter skips the initial LIST + // and only watches for new events, reducing API server load in large clusters. + DisableCacheAnnotation = "features.knative.dev/apiserversource-disable-cache" + // SkipPermissionsAnnotation controls whether the adapter skips permission checks on startup. + SkipPermissionsAnnotation = "features.knative.dev/apiserversource-skip-permissions-check" +) + // +genclient // +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -97,12 +105,6 @@ type ApiServerSourceSpec struct { // +optional Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` - // DisableCache if true, the adapter skips the initial LIST call and only - // watches for new events. This significantly reduces API server load when - // watching resources across many namespaces. Note: pre-existing objects - // will not emit events on adapter startup. - // +optional - DisableCache bool `json:"disableCache,omitempty"` } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/apis/sources/v1/apiserver_validation.go b/pkg/apis/sources/v1/apiserver_validation.go index 3d18b12737e..7b854ace056 100644 --- a/pkg/apis/sources/v1/apiserver_validation.go +++ b/pkg/apis/sources/v1/apiserver_validation.go @@ -35,7 +35,14 @@ const ( ) func (c *ApiServerSource) Validate(ctx context.Context) *apis.FieldError { - return c.Spec.Validate(ctx).ViaField("spec") + errs := c.Spec.Validate(ctx).ViaField("spec") + if c.Annotations[DisableCacheAnnotation] == "true" && c.Annotations[SkipPermissionsAnnotation] == "true" { + errs = errs.Also(apis.ErrGeneric( + DisableCacheAnnotation+" and "+SkipPermissionsAnnotation+" are mutually exclusive", + "metadata.annotations", + )) + } + return errs } func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError { diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go index 883a04a6e9a..7667feb4657 100644 --- a/pkg/apis/sources/v1/apiserver_validation_test.go +++ b/pkg/apis/sources/v1/apiserver_validation_test.go @@ -269,6 +269,53 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) { assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!") } +func TestAPIServerDisableCacheAnnotationValidation(t *testing.T) { + validSrc := func() *ApiServerSource { + return &ApiServerSource{ + Spec: ApiServerSourceSpec{ + EventMode: "Resource", + Resources: []APIVersionKindSelector{{APIVersion: "v1", Kind: "Pod"}}, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{APIVersion: "v1", Kind: "Service", Name: "svc"}, + }, + }, + }, + } + } + + t.Run("both annotations set is rejected", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{ + DisableCacheAnnotation: "true", + SkipPermissionsAnnotation: "true", + } + err := src.Validate(context.TODO()) + if err == nil { + t.Fatal("expected validation error for conflicting annotations, got nil") + } + if !errors.Is(err, err) { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("only disable-cache annotation is valid", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{DisableCacheAnnotation: "true"} + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + + t.Run("only skip-permissions annotation is valid", func(t *testing.T) { + src := validSrc() + src.Annotations = map[string]string{SkipPermissionsAnnotation: "true"} + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error: %v", err) + } + }) +} + func TestAPIServerFiltersValidation(t *testing.T) { tests := []struct { name string diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index d47202917d7..f4ce648b999 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -60,8 +60,7 @@ const ( apiserversourceDeploymentCreated = "ApiServerSourceDeploymentCreated" apiserversourceDeploymentUpdated = "ApiServerSourceDeploymentUpdated" - component = "apiserversource" - skipPermissionsAnnotation = "features.knative.dev/apiserversource-skip-permissions-check" + component = "apiserversource" ) func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { @@ -150,7 +149,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour // We don't check if it really exists because in case it does not exist, the value is an empty string // which also serves our purposes as by default we will check permissions annotations := source.GetAnnotations() - skipPermissions := annotations[skipPermissionsAnnotation] + skipPermissions := annotations[v1.SkipPermissionsAnnotation] if skipPermissions == "true" { // If skip permissions, mark enough permissions directly source.Status.MarkSufficientPermissions() @@ -236,7 +235,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer // } annotations := src.GetAnnotations() - skipPermissions := annotations[skipPermissionsAnnotation] + skipPermissions := annotations[v1.SkipPermissionsAnnotation] featureFlags := feature.FromContext(ctx) @@ -252,7 +251,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer AllNamespaces: allNamespaces, NodeSelector: featureFlags.NodeSelector(), FailFast: skipPermissions == "true", - DisableCache: src.Spec.DisableCache, + DisableCache: src.Annotations[v1.DisableCacheAnnotation] == "true", } expected, err := resources.MakeReceiveAdapter(&adapterArgs) diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 148a8814f08..8f7099f13f7 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -175,6 +175,9 @@ O2dgzikq8iSy1BlRsVw= }, { Name: source.EnvLoggingCfg, Value: "", + }, { + Name: source.EnvKlogVerbosity, + Value: "", }, { Name: source.EnvObservabilityCfg, Value: "", @@ -368,7 +371,6 @@ func TestMakeReceiveAdapterWithDisableCache(t *testing.T) { }}, EventMode: "Resource", ServiceAccountName: "source-svc-acct", - DisableCache: tt.disableCache, }, } From a9f767329414f6cfe932810e75a4e9b20bc07b18 Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 02:46:42 +0530 Subject: [PATCH 4/6] fix(style): remove stray EnvKlogVerbosity ref and trailing blank line - Remove source.EnvKlogVerbosity from receive_adapter_test.go; constant belongs to feat/klog-verbosity and was accidentally merged via stash - Remove trailing blank line in ApiServerSourceSpec struct body to satisfy goimports check --- pkg/apis/sources/v1/apiserver_types.go | 1 - .../apiserversource/resources/receive_adapter_test.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go index 43b8977d60a..84c10e5fa0e 100644 --- a/pkg/apis/sources/v1/apiserver_types.go +++ b/pkg/apis/sources/v1/apiserver_types.go @@ -104,7 +104,6 @@ type ApiServerSourceSpec struct { // // +optional Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` - } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 8f7099f13f7..1923995957a 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -175,9 +175,6 @@ O2dgzikq8iSy1BlRsVw= }, { Name: source.EnvLoggingCfg, Value: "", - }, { - Name: source.EnvKlogVerbosity, - Value: "", }, { Name: source.EnvObservabilityCfg, Value: "", From e4ad531997b8480783b023f28ed89913a65e8448 Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 02:57:55 +0530 Subject: [PATCH 5/6] fix(apiserversource): fix backoff reset and dead test assertion Backoff was reset on every successful LIST and Watch creation, so LIST-ok -> Watch-fail could hammer the API server at 1 req/s forever without ever reaching the 60s cap. Now resets only when a watch survives >30s; short-lived drains incur a backoff step so normal watch-close does not spin hot. Test assertion errors.Is(err, err) was always true; replaced with checks that error contains "mutually exclusive" and path "metadata.annotations". --- pkg/adapter/apiserver/adapter.go | 16 ++++++++++++++-- pkg/apis/sources/v1/apiserver_validation_test.go | 9 +++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index cd397affe1d..b88be6a8449 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -188,7 +188,6 @@ func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.Res continue } rv := list.GetResourceVersion() - backoff = noCacheBackoff() w, err := ri.Watch(ctx, metav1.ListOptions{ LabelSelector: labelSelector, @@ -209,8 +208,21 @@ func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.Res } continue } - backoff = noCacheBackoff() + + watchStart := time.Now() a.drainWatchEvents(ctx, w, delegate) + if time.Since(watchStart) > 30*time.Second { + // Watch was long-lived; treat connection as stable and reset backoff. + backoff = noCacheBackoff() + } else { + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + } } } diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go index 7667feb4657..aeed47f3196 100644 --- a/pkg/apis/sources/v1/apiserver_validation_test.go +++ b/pkg/apis/sources/v1/apiserver_validation_test.go @@ -19,6 +19,7 @@ package v1 import ( "context" "errors" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -294,8 +295,12 @@ func TestAPIServerDisableCacheAnnotationValidation(t *testing.T) { if err == nil { t.Fatal("expected validation error for conflicting annotations, got nil") } - if !errors.Is(err, err) { - t.Errorf("unexpected error: %v", err) + errStr := err.Error() + if !strings.Contains(errStr, "mutually exclusive") { + t.Errorf("error missing 'mutually exclusive': %v", err) + } + if !strings.Contains(errStr, "metadata.annotations") { + t.Errorf("error missing field path 'metadata.annotations': %v", err) } }) From d8067fae68ae5af528b22676abeaf68130c2cde0 Mon Sep 17 00:00:00 2001 From: Ankitsinghsisodya Date: Wed, 22 Apr 2026 03:19:55 +0530 Subject: [PATCH 6/6] fix(apiserversource): address strict PR review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove DisableCache/SkipPermissions mutual exclusion — semantically orthogonal; dead branch in startWatchOnly becomes live again - Guard rv=="" after LIST to prevent watch cache replay on empty resourceVersion - Replace package-level noCacheWatchTimeout var with call-site local to eliminate cross-goroutine mutation footgun - Fix flaky tests: remove time.Sleep in favour of polling loops and pre-buffered fake watcher events --- pkg/adapter/apiserver/adapter.go | 18 ++++++++--- pkg/adapter/apiserver/adapter_test.go | 31 ++++++++++--------- pkg/apis/sources/v1/apiserver_validation.go | 9 +----- .../sources/v1/apiserver_validation_test.go | 15 ++------- 4 files changed, 34 insertions(+), 39 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index b88be6a8449..afb068b40d9 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -155,10 +155,6 @@ func noCacheBackoff() wait.Backoff { } } -// noCacheWatchTimeout is the server-side timeout for each Watch call in no-cache mode. -// Forces periodic reconnection so stale connections don't persist indefinitely. -var noCacheWatchTimeout int64 = 5 * 60 - // watchResourceLoop runs a continuous List+Watch loop for a single resource interface. // A lightweight LIST (limit=1) is issued before each Watch to obtain the current // resourceVersion so that Watch does not replay synthetic ADDED events for @@ -188,11 +184,23 @@ func (a *apiServerAdapter) watchResourceLoop(ctx context.Context, ri dynamic.Res continue } rv := list.GetResourceVersion() + if rv == "" { + a.logger.Warn("LIST returned empty resourceVersion, will retry after backoff") + t := time.NewTimer(backoff.Step()) + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + } + continue + } + timeout := int64(5 * 60) w, err := ri.Watch(ctx, metav1.ListOptions{ LabelSelector: labelSelector, ResourceVersion: rv, - TimeoutSeconds: &noCacheWatchTimeout, + TimeoutSeconds: &timeout, }) if err != nil { if ctx.Err() != nil { diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index bdb2c1b4da6..a44f524c0c0 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -421,9 +421,6 @@ func TestAdapter_DisableCache(t *testing.T) { } }() - // Give the watch goroutines time to start. - time.Sleep(500 * time.Millisecond) - cancel() <-done } @@ -464,7 +461,10 @@ func TestAdapter_DisableCacheLightweightList(t *testing.T) { _ = a.Start(ctx) }() - time.Sleep(500 * time.Millisecond) + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && len(tracker.get()) == 0 { + time.Sleep(10 * time.Millisecond) + } cancel() <-done @@ -489,6 +489,11 @@ func TestAdapter_DisableCacheEventDelivery(t *testing.T) { scheme := runtime.NewScheme() _ = corev1.AddToScheme(scheme) dynClient := dynamicfake.NewSimpleDynamicClient(scheme, simplePod("existing-pod", "default")) + dynClient.PrependReactor("list", "pods", func(_ kubetesting.Action) (bool, runtime.Object, error) { + list := &unstructured.UnstructuredList{} + list.SetResourceVersion("100") + return true, list, nil + }) dynClient.PrependWatchReactor("pods", func(_ kubetesting.Action) (bool, watch.Interface, error) { return true, fakeWatcher, nil }) @@ -514,16 +519,8 @@ func TestAdapter_DisableCacheEventDelivery(t *testing.T) { namespace: "default", } - done := make(chan struct{}) - go func() { - defer close(done) - _ = a.Start(ctx) - }() - - // Wait for the adapter to reach the watch. - time.Sleep(200 * time.Millisecond) - - // Inject an ADDED event as *unstructured.Unstructured (the type the adapter expects). + // Pre-buffer the event before starting the adapter. The fake watcher has a + // 100-item channel; the adapter will drain it once the Watch() call is made. newPod := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "v1", @@ -537,6 +534,12 @@ func TestAdapter_DisableCacheEventDelivery(t *testing.T) { } fakeWatcher.Add(newPod) + done := make(chan struct{}) + go func() { + defer close(done) + _ = a.Start(ctx) + }() + // Poll until at least one CloudEvent is sent (or timeout). deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) && len(testCE.Sent()) == 0 { diff --git a/pkg/apis/sources/v1/apiserver_validation.go b/pkg/apis/sources/v1/apiserver_validation.go index 7b854ace056..3d18b12737e 100644 --- a/pkg/apis/sources/v1/apiserver_validation.go +++ b/pkg/apis/sources/v1/apiserver_validation.go @@ -35,14 +35,7 @@ const ( ) func (c *ApiServerSource) Validate(ctx context.Context) *apis.FieldError { - errs := c.Spec.Validate(ctx).ViaField("spec") - if c.Annotations[DisableCacheAnnotation] == "true" && c.Annotations[SkipPermissionsAnnotation] == "true" { - errs = errs.Also(apis.ErrGeneric( - DisableCacheAnnotation+" and "+SkipPermissionsAnnotation+" are mutually exclusive", - "metadata.annotations", - )) - } - return errs + return c.Spec.Validate(ctx).ViaField("spec") } func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError { diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go index aeed47f3196..ae44325e766 100644 --- a/pkg/apis/sources/v1/apiserver_validation_test.go +++ b/pkg/apis/sources/v1/apiserver_validation_test.go @@ -19,7 +19,6 @@ package v1 import ( "context" "errors" - "strings" "testing" "github.com/stretchr/testify/assert" @@ -285,22 +284,14 @@ func TestAPIServerDisableCacheAnnotationValidation(t *testing.T) { } } - t.Run("both annotations set is rejected", func(t *testing.T) { + t.Run("both annotations set is valid", func(t *testing.T) { src := validSrc() src.Annotations = map[string]string{ DisableCacheAnnotation: "true", SkipPermissionsAnnotation: "true", } - err := src.Validate(context.TODO()) - if err == nil { - t.Fatal("expected validation error for conflicting annotations, got nil") - } - errStr := err.Error() - if !strings.Contains(errStr, "mutually exclusive") { - t.Errorf("error missing 'mutually exclusive': %v", err) - } - if !strings.Contains(errStr, "metadata.annotations") { - t.Errorf("error missing field path 'metadata.annotations': %v", err) + if err := src.Validate(context.TODO()); err != nil { + t.Errorf("unexpected error for combined annotations: %v", err) } })