Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/hubagent/workload/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,23 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
}
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)

// Set up the InformerPopulator that runs on ALL pods (leader and followers)
// This ensures all pods have synced informer caches for webhook validation
klog.Info("Setting up informer populator")
informerPopulator := &resourcewatcher.InformerPopulator{
DiscoveryClient: discoverClient,
RESTMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
ResourceConfig: resourceConfig,
}

if err := mgr.Add(informerPopulator); err != nil {
klog.ErrorS(err, "Failed to setup informer populator")
return err
}

// Set up a runner that starts all the custom controllers we created above
// This runs ONLY on the leader and adds event handlers to the informers created by InformerPopulator
resourceChangeDetector := &resourcewatcher.ChangeDetector{
DiscoveryClient: discoverClient,
RESTMapper: mgr.GetRESTMapper(),
Expand Down
22 changes: 1 addition & 21 deletions pkg/controllers/placement/resource_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, place

trackedResource := r.InformerManager.GetNameSpaceScopedResources()
for _, gvr := range trackedResource {
if !r.shouldSelectResource(gvr) {
if !utils.ShouldProcessResource(gvr, r.RestMapper, r.ResourceConfig) {
continue
}
if !r.InformerManager.IsInformerSynced(gvr) {
Expand All @@ -406,26 +406,6 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, place
return resources, nil
}

// shouldSelectResource reports whether resources of the given GroupVersionResource may be
// selected for propagation.
//
// A nil ResourceConfig allows every API. Otherwise the GVR is translated to its kinds through
// the RestMapper, and the resource is rejected when that translation fails or when any of the
// resulting kinds is reported as disabled by the ResourceConfig.
func (r *Reconciler) shouldSelectResource(gvr schema.GroupVersionResource) bool {
	// By default, all of the APIs are allowed.
	if r.ResourceConfig == nil {
		return true
	}
	gvks, err := r.RestMapper.KindsFor(gvr)
	if err != nil {
		// klog.ErrorS takes a constant message followed by key/value pairs. It does not
		// expand printf verbs, and the error itself is already carried by the first argument.
		klog.ErrorS(err, "gvr transform failed", "gvr", gvr.String())
		return false
	}
	for _, gvk := range gvks {
		if r.ResourceConfig.IsResourceDisabled(gvk) {
			klog.V(2).InfoS("Skip watch resource", "group version kind", gvk.String())
			return false
		}
	}
	return true
}

// generateRawContent strips all the unnecessary fields to prepare the objects for dispatch.
func generateRawContent(object *unstructured.Unstructured) ([]byte, error) {
// Make a deep copy of the object as we are modifying it.
Expand Down
44 changes: 10 additions & 34 deletions pkg/resourcewatcher/change_dector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/cache"
Expand All @@ -45,7 +44,7 @@ var (
// ChangeDetector is a resource watcher which watches all types of resources in the cluster and reconcile the events.
type ChangeDetector struct {
// DiscoveryClient is used to do resource discovery.
DiscoveryClient *discovery.DiscoveryClient
DiscoveryClient discovery.DiscoveryInterface

// RESTMapper is used to convert between GVK and GVR
RESTMapper meta.RESTMapper
Expand Down Expand Up @@ -137,43 +136,20 @@ func (d *ChangeDetector) discoverAPIResourcesLoop(ctx context.Context, period ti
}, period)
}

// discoverResources goes through all the api resources in the cluster and create informers on selected types
// discoverResources goes through all the api resources in the cluster and adds event handlers to informers
func (d *ChangeDetector) discoverResources(dynamicResourceEventHandler cache.ResourceEventHandler) {
newResources, err := d.getWatchableResources()
var dynamicResources []informer.APIResourceMeta
if err != nil {
klog.ErrorS(err, "Failed to get all the api resources from the cluster")
}
for _, res := range newResources {
// all the static resources are disabled by default
if d.shouldWatchResource(res.GroupVersionResource) {
dynamicResources = append(dynamicResources, res)
}
resourcesToWatch := discoverWatchableResources(d.DiscoveryClient, d.RESTMapper, d.ResourceConfig)

// On the leader, add event handlers to informers that were already created by InformerPopulator
// The informers exist on all pods, but only the leader adds handlers and processes events
for _, res := range resourcesToWatch {
d.InformerManager.AddEventHandlerToInformer(res.GroupVersionResource, dynamicResourceEventHandler)
}
d.InformerManager.AddDynamicResources(dynamicResources, dynamicResourceEventHandler, err == nil)

// this will start the newly added informers if there is any
d.InformerManager.Start()
}

// shouldWatchResource returns whether the given GroupVersionResource should be watched.
func (d *ChangeDetector) shouldWatchResource(gvr schema.GroupVersionResource) bool {
// By default, all of the APIs are allowed.
if d.ResourceConfig == nil {
return true
}

gvks, err := d.RESTMapper.KindsFor(gvr)
if err != nil {
klog.ErrorS(err, "gvr transform failed", "gvr", gvr.String())
return false
}
for _, gvk := range gvks {
if d.ResourceConfig.IsResourceDisabled(gvk) {
klog.V(4).InfoS("Skip watch resource", "group version kind", gvk.String())
return false
}
}
return true
klog.V(2).InfoS("Change detector: discovered resources", "count", len(resourcesToWatch))
}

// dynamicResourceFilter filters out resources that we don't want to watch
Expand Down
166 changes: 166 additions & 0 deletions pkg/resourcewatcher/change_detector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcewatcher

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"

"github.com/kubefleet-dev/kubefleet/pkg/utils"
testinformer "github.com/kubefleet-dev/kubefleet/test/utils/informer"
testresource "github.com/kubefleet-dev/kubefleet/test/utils/resource"
)

// TestChangeDetector_discoverResources is a smoke test for ChangeDetector.discoverResources.
//
// Each case feeds a set of API resource lists to a fake discovery client and a REST mapper
// built from the same lists, then runs discoverResources against a fake informer manager.
// The test makes no assertions on the resulting handler registrations; it only verifies that
// discovery and handler addition complete without panicking.
func TestChangeDetector_discoverResources(t *testing.T) {
	tests := []struct {
		name               string
		discoveryResources []*metav1.APIResourceList
		resourceConfig     *utils.ResourceConfig
	}{
		{
			name: "discovers and adds handlers for watchable resources",
			discoveryResources: []*metav1.APIResourceList{
				{
					GroupVersion: "v1",
					APIResources: []metav1.APIResource{
						testresource.APIResourceConfigMap(),
						testresource.APIResourceSecret(),
					},
				},
			},
			resourceConfig: nil, // Allow all resources
		},
		{
			name: "skips resources without list/watch verbs",
			discoveryResources: []*metav1.APIResourceList{
				{
					GroupVersion: "v1",
					APIResources: []metav1.APIResource{
						testresource.APIResourceWithVerbs("configmaps", "ConfigMap", true, []string{"get", "delete"}), // Missing list/watch
					},
				},
			},
			resourceConfig: nil,
		},
		{
			name: "respects resource config filtering",
			discoveryResources: []*metav1.APIResourceList{
				{
					GroupVersion: "v1",
					APIResources: []metav1.APIResource{
						testresource.APIResourceConfigMap(),
						testresource.APIResourceSecret(),
					},
				},
			},
			resourceConfig: func() *utils.ResourceConfig {
				rc := utils.NewResourceConfig(false) // Skip mode
				// Fail loudly on a bad config string; otherwise this case would silently
				// run without any filtering and still pass.
				if err := rc.Parse("v1/Secret"); err != nil { // Skip secrets
					t.Fatalf("Failed to parse resource config: %v", err)
				}
				return rc
			}(),
		},
		{
			name: "discovers apps group resources",
			discoveryResources: []*metav1.APIResourceList{
				{
					GroupVersion: "apps/v1",
					APIResources: []metav1.APIResource{
						testresource.APIResourceDeployment(),
						testresource.APIResourceStatefulSet(),
					},
				},
			},
			resourceConfig: nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create fake discovery client
			fakeClient := fake.NewSimpleClientset()
			fakeDiscovery, ok := fakeClient.Discovery().(*fakediscovery.FakeDiscovery)
			if !ok {
				t.Fatal("Failed to cast to FakeDiscovery")
			}
			fakeDiscovery.Resources = tt.discoveryResources

			// Create a REST mapper that knows exactly the group/versions served by the
			// fake discovery client, one APIGroupResources entry per resource list.
			groupResources := []*restmapper.APIGroupResources{}
			for _, resourceList := range tt.discoveryResources {
				gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
				if err != nil {
					t.Fatalf("Failed to parse group version: %v", err)
				}

				groupResources = append(groupResources, &restmapper.APIGroupResources{
					Group: metav1.APIGroup{
						Name: gv.Group,
						Versions: []metav1.GroupVersionForDiscovery{
							{GroupVersion: resourceList.GroupVersion, Version: gv.Version},
						},
						PreferredVersion: metav1.GroupVersionForDiscovery{
							GroupVersion: resourceList.GroupVersion,
							Version:      gv.Version,
						},
					},
					VersionedResources: map[string][]metav1.APIResource{
						gv.Version: resourceList.APIResources,
					},
				})
			}
			restMapper := restmapper.NewDiscoveryRESTMapper(groupResources)

			// Create fake informer manager
			fakeInformerManager := &testinformer.FakeManager{
				APIResources: make(map[schema.GroupVersionKind]bool),
			}

			// No-op event handler; it only needs to be a valid handler to register.
			testHandler := cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {},
			}

			// Create ChangeDetector with the interface type
			detector := &ChangeDetector{
				DiscoveryClient: fakeDiscovery,
				RESTMapper:      restMapper,
				InformerManager: fakeInformerManager,
				ResourceConfig:  tt.resourceConfig,
			}

			// Test discoverResources which discovers resources and adds handlers
			detector.discoverResources(testHandler)

			// The main goal is to verify no panics occur during discovery and handler addition
		})
	}
}

// TestChangeDetector_NeedLeaderElection checks that a ChangeDetector opts in to leader
// election, so that only the leader processes events.
func TestChangeDetector_NeedLeaderElection(t *testing.T) {
	detector := new(ChangeDetector)

	if needsLeader := detector.NeedLeaderElection(); !needsLeader {
		t.Error("ChangeDetector should need leader election")
	}
}
102 changes: 102 additions & 0 deletions pkg/resourcewatcher/informer_populator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourcewatcher

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kubefleet-dev/kubefleet/pkg/utils"
"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
)

// informerPopulatorDiscoveryPeriod is the interval between two consecutive API resource
// rediscovery runs of the InformerPopulator.
const informerPopulatorDiscoveryPeriod = 30 * time.Second

// Compile-time checks that *InformerPopulator satisfies the controller-runtime interfaces.
var (
	_ manager.Runnable               = (*InformerPopulator)(nil)
	_ manager.LeaderElectionRunnable = (*InformerPopulator)(nil)
)

// InformerPopulator discovers API resources and creates informers for them WITHOUT adding event handlers.
// This allows follower pods to have synced informer caches for webhook validation while the leader's
// ChangeDetector adds event handlers and runs controllers.
//
// It is a controller-runtime Runnable that opts out of leader election (NeedLeaderElection
// returns false). All four fields are read on every discovery pass.
type InformerPopulator struct {
	// DiscoveryClient is used to do resource discovery.
	// It is handed to discoverWatchableResources on every discovery pass.
	DiscoveryClient discovery.DiscoveryInterface

	// RESTMapper is used to convert between GVK and GVR.
	// It is handed to discoverWatchableResources together with ResourceConfig.
	RESTMapper meta.RESTMapper

	// InformerManager manages all the dynamic informers created by the discovery client.
	// The populator only calls CreateInformerForResource, Start and WaitForCacheSync on it.
	InformerManager informer.Manager

	// ResourceConfig contains all the API resources that we won't select based on the allowed or skipped propagating APIs option.
	// NOTE(review): presumably a nil config means "allow everything", as in the other
	// resource filters of this project; confirm against discoverWatchableResources.
	ResourceConfig *utils.ResourceConfig
}

// Start implements manager.Runnable.
//
// It performs one discovery pass, waits for the informer manager to report its caches as
// synced, and then keeps rediscovering API resources every informerPopulatorDiscoveryPeriod
// (so newly installed CRDs get informers too) until ctx is done. It always returns nil.
// Because NeedLeaderElection returns false, this runs on ALL pods (leader and followers).
func (p *InformerPopulator) Start(ctx context.Context) error {
	klog.InfoS("Starting the informer populator")
	defer klog.InfoS("The informer populator is stopped")

	// Initial pass: create the informers, then block until their caches are synced.
	p.discoverAndCreateInformers()
	p.InformerManager.WaitForCacheSync()
	klog.InfoS("Informer populator: initial cache sync complete")

	// Periodic rediscovery; UntilWithContext only returns once ctx is done.
	rediscover := func(context.Context) {
		p.discoverAndCreateInformers()
	}
	wait.UntilWithContext(ctx, rediscover, informerPopulatorDiscoveryPeriod)

	return nil
}

// discoverAndCreateInformers runs a single discovery pass: it asks discoverWatchableResources
// for the resources to watch, creates an informer for each of them WITHOUT registering any
// event handler (so follower pods never get handlers), starts whatever informers are new, and
// logs how many resources were discovered.
func (p *InformerPopulator) discoverAndCreateInformers() {
	discovered := discoverWatchableResources(p.DiscoveryClient, p.RESTMapper, p.ResourceConfig)

	informers := p.InformerManager
	for _, apiResource := range discovered {
		// Informer only; no event handler is attached here.
		informers.CreateInformerForResource(apiResource)
	}
	// Start any newly created informers.
	informers.Start()

	klog.V(2).InfoS("Informer populator: discovered resources", "count", len(discovered))
}

// NeedLeaderElection implements the LeaderElectionRunnable interface.
// It always reports false, so the populator runs on ALL pods (leader and followers).
func (p *InformerPopulator) NeedLeaderElection() bool {
	const leaderOnly = false
	return leaderOnly
}
Loading
Loading