Skip to content

Commit 87ee659

Browse files
Wei WengWei Weng
authored andcommitted
decouple informer cache population and event handling
Signed-off-by: Wei Weng <Wei.Weng@microsoft.com>
1 parent ca6b83a commit 87ee659

File tree

10 files changed

+1122
-88
lines changed

10 files changed

+1122
-88
lines changed

cmd/hubagent/workload/setup.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,23 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
496496
}
497497
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)
498498

499+
// Set up the InformerPopulator that runs on ALL pods (leader and followers)
500+
// This ensures all pods have synced informer caches for webhook validation
501+
klog.Info("Setting up informer populator")
502+
informerPopulator := &resourcewatcher.InformerPopulator{
503+
DiscoveryClient: discoverClient,
504+
RESTMapper: mgr.GetRESTMapper(),
505+
InformerManager: dynamicInformerManager,
506+
ResourceConfig: resourceConfig,
507+
}
508+
509+
if err := mgr.Add(informerPopulator); err != nil {
510+
klog.ErrorS(err, "Failed to setup informer populator")
511+
return err
512+
}
513+
499514
// Set up a runner that starts all the custom controllers we created above
515+
// This runs ONLY on the leader and adds event handlers to the informers created by InformerPopulator
500516
resourceChangeDetector := &resourcewatcher.ChangeDetector{
501517
DiscoveryClient: discoverClient,
502518
RESTMapper: mgr.GetRESTMapper(),

pkg/resourcewatcher/change_dector.go

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"golang.org/x/sync/errgroup"
2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26-
"k8s.io/apimachinery/pkg/runtime/schema"
2726
"k8s.io/apimachinery/pkg/util/wait"
2827
"k8s.io/client-go/discovery"
2928
"k8s.io/client-go/tools/cache"
@@ -137,43 +136,28 @@ func (d *ChangeDetector) discoverAPIResourcesLoop(ctx context.Context, period ti
137136
}, period)
138137
}
139138

140-
// discoverResources goes through all the api resources in the cluster and create informers on selected types
139+
// discoverResources goes through all the api resources in the cluster and adds event handlers to informers
141140
func (d *ChangeDetector) discoverResources(dynamicResourceEventHandler cache.ResourceEventHandler) {
142-
newResources, err := d.getWatchableResources()
141+
newResources, err := getWatchableResources(d.DiscoveryClient)
143142
var dynamicResources []informer.APIResourceMeta
144143
if err != nil {
145144
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
146145
}
147146
for _, res := range newResources {
148147
// all the static resources are disabled by default
149-
if d.shouldWatchResource(res.GroupVersionResource) {
148+
if shouldWatchResource(res.GroupVersionResource, d.RESTMapper, d.ResourceConfig) {
150149
dynamicResources = append(dynamicResources, res)
151150
}
152151
}
153-
d.InformerManager.AddDynamicResources(dynamicResources, dynamicResourceEventHandler, err == nil)
154-
// this will start the newly added informers if there is any
155-
d.InformerManager.Start()
156-
}
157152

158-
// gvrDisabled returns whether GroupVersionResource is disabled.
159-
func (d *ChangeDetector) shouldWatchResource(gvr schema.GroupVersionResource) bool {
160-
// By default, all of the APIs are allowed.
161-
if d.ResourceConfig == nil {
162-
return true
153+
// On the leader, add event handlers to informers that were already created by InformerPopulator
154+
// The informers exist on all pods, but only the leader adds handlers and processes events
155+
for _, res := range dynamicResources {
156+
d.InformerManager.AddEventHandlerToInformer(res.GroupVersionResource, dynamicResourceEventHandler)
163157
}
164158

165-
gvks, err := d.RESTMapper.KindsFor(gvr)
166-
if err != nil {
167-
klog.ErrorS(err, "gvr transform failed", "gvr", gvr.String())
168-
return false
169-
}
170-
for _, gvk := range gvks {
171-
if d.ResourceConfig.IsResourceDisabled(gvk) {
172-
klog.V(4).InfoS("Skip watch resource", "group version kind", gvk.String())
173-
return false
174-
}
175-
}
176-
return true
159+
// this will start the newly added informers if there is any
160+
d.InformerManager.Start()
177161
}
178162

179163
// dynamicResourceFilter filters out resources that we don't want to watch
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resourcewatcher
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"k8s.io/apimachinery/pkg/api/meta"
24+
"k8s.io/apimachinery/pkg/util/wait"
25+
"k8s.io/client-go/discovery"
26+
"k8s.io/klog/v2"
27+
"sigs.k8s.io/controller-runtime/pkg/manager"
28+
29+
"github.com/kubefleet-dev/kubefleet/pkg/utils"
30+
"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
31+
)
32+
33+
const (
34+
// informerPopulatorDiscoveryPeriod is how often the InformerPopulator rediscovers API resources
35+
informerPopulatorDiscoveryPeriod = 30 * time.Second
36+
)
37+
38+
// make sure that our InformerPopulator implements controller runtime interfaces
39+
var (
40+
_ manager.Runnable = &InformerPopulator{}
41+
_ manager.LeaderElectionRunnable = &InformerPopulator{}
42+
)
43+
44+
// InformerPopulator discovers API resources and creates informers for them WITHOUT adding event handlers.
45+
// This allows follower pods to have synced informer caches for webhook validation while the leader's
46+
// ChangeDetector adds event handlers and runs controllers.
47+
type InformerPopulator struct {
48+
// DiscoveryClient is used to do resource discovery.
49+
DiscoveryClient discovery.DiscoveryInterface
50+
51+
// RESTMapper is used to convert between GVK and GVR
52+
RESTMapper meta.RESTMapper
53+
54+
// InformerManager manages all the dynamic informers created by the discovery client
55+
InformerManager informer.Manager
56+
57+
// ResourceConfig contains all the API resources that we won't select based on the allowed or skipped propagating APIs option.
58+
ResourceConfig *utils.ResourceConfig
59+
}
60+
61+
// Start runs the informer populator, discovering resources and creating informers.
62+
// This runs on ALL pods (leader and followers) to ensure all have synced caches.
63+
func (p *InformerPopulator) Start(ctx context.Context) error {
64+
klog.InfoS("Starting the informer populator")
65+
defer klog.InfoS("The informer populator is stopped")
66+
67+
// Run initial discovery to create informers
68+
p.discoverAndCreateInformers()
69+
70+
// Wait for initial cache sync
71+
p.InformerManager.WaitForCacheSync()
72+
klog.InfoS("Informer populator: initial cache sync complete")
73+
74+
// Continue discovering resources periodically to handle CRD installations
75+
wait.UntilWithContext(ctx, func(ctx context.Context) {
76+
p.discoverAndCreateInformers()
77+
}, informerPopulatorDiscoveryPeriod)
78+
79+
return nil
80+
}
81+
82+
// discoverAndCreateInformers discovers API resources and creates informers WITHOUT adding event handlers
83+
func (p *InformerPopulator) discoverAndCreateInformers() {
84+
newResources, err := getWatchableResources(p.DiscoveryClient)
85+
if err != nil {
86+
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
87+
}
88+
89+
var resourcesToPopulate []informer.APIResourceMeta
90+
for _, res := range newResources {
91+
if shouldWatchResource(res.GroupVersionResource, p.RESTMapper, p.ResourceConfig) {
92+
resourcesToPopulate = append(resourcesToPopulate, res)
93+
}
94+
}
95+
96+
// Create informers directly without adding event handlers.
97+
// This avoids adding any event handlers on follower pods
98+
for _, res := range resourcesToPopulate {
99+
p.InformerManager.CreateInformerForResource(res)
100+
}
101+
102+
// Start any newly created informers
103+
p.InformerManager.Start()
104+
105+
if err == nil {
106+
klog.V(2).InfoS("Informer populator: discovered resources", "count", len(resourcesToPopulate))
107+
}
108+
}
109+
110+
// NeedLeaderElection implements LeaderElectionRunnable interface.
111+
// Returns false so this runs on ALL pods (leader and followers).
112+
func (p *InformerPopulator) NeedLeaderElection() bool {
113+
return false
114+
}

0 commit comments

Comments
 (0)