Skip to content

Commit a49f887

Browse files
Wei WengWei Weng
authored andcommitted
wait for informer cache to be ready
Signed-off-by: Wei Weng <Wei.Weng@microsoft.com>
1 parent 0552f2e commit a49f887

File tree

6 files changed

+514
-1
lines changed

6 files changed

+514
-1
lines changed

cmd/hubagent/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/kubefleet-dev/kubefleet/cmd/hubagent/options"
4747
"github.com/kubefleet-dev/kubefleet/cmd/hubagent/workload"
4848
mcv1beta1 "github.com/kubefleet-dev/kubefleet/pkg/controllers/membercluster/v1beta1"
49+
"github.com/kubefleet-dev/kubefleet/pkg/utils/validator"
4950
"github.com/kubefleet-dev/kubefleet/pkg/webhook"
5051
// +kubebuilder:scaffold:imports
5152
)
@@ -165,7 +166,17 @@ func main() {
165166

166167
ctx := ctrl.SetupSignalHandler()
167168
if err := workload.SetupControllers(ctx, &wg, mgr, config, opts); err != nil {
168-
klog.ErrorS(err, "unable to set up ready check")
169+
klog.ErrorS(err, "unable to set up controllers")
170+
exitWithErrorFunc()
171+
}
172+
173+
// Add readiness check for dynamic informer cache AFTER controllers are set up.
174+
// This ensures the discovery cache is populated before the hub agent is marked ready,
175+
// which is critical for all controllers that rely on dynamic resource discovery.
176+
// AddReadyzCheck adds additional readiness check instead of replacing the one registered earlier provided the name is different.
177+
// Both registered checks need to pass for the manager to be considered ready.
178+
if err := mgr.AddReadyzCheck("informer-cache", webhook.ResourceInformerReadinessChecker(validator.ResourceInformer)); err != nil {
179+
klog.ErrorS(err, "unable to set up informer cache readiness check")
169180
exitWithErrorFunc()
170181
}
171182

pkg/utils/informer/informermanager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ type Manager interface {
6161
// GetNameSpaceScopedResources returns the list of namespace scoped resources we are watching.
6262
GetNameSpaceScopedResources() []schema.GroupVersionResource
6363

64+
// GetAllResources returns the list of all resources (both cluster-scoped and namespace-scoped) we are watching.
65+
GetAllResources() []schema.GroupVersionResource
66+
6467
// IsClusterScopedResources returns if a resource is cluster scoped.
6568
IsClusterScopedResources(resource schema.GroupVersionKind) bool
6669

@@ -224,6 +227,19 @@ func (s *informerManagerImpl) GetNameSpaceScopedResources() []schema.GroupVersio
224227
return res
225228
}
226229

230+
func (s *informerManagerImpl) GetAllResources() []schema.GroupVersionResource {
231+
s.resourcesLock.RLock()
232+
defer s.resourcesLock.RUnlock()
233+
234+
res := make([]schema.GroupVersionResource, 0, len(s.apiResources))
235+
for _, resource := range s.apiResources {
236+
if resource.isPresent {
237+
res = append(res, resource.GroupVersionResource)
238+
}
239+
}
240+
return res
241+
}
242+
227243
func (s *informerManagerImpl) IsClusterScopedResources(gvk schema.GroupVersionKind) bool {
228244
s.resourcesLock.RLock()
229245
defer s.resourcesLock.RUnlock()
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package informer
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"k8s.io/apimachinery/pkg/runtime/schema"
24+
"k8s.io/client-go/dynamic/fake"
25+
"k8s.io/client-go/kubernetes/scheme"
26+
)
27+
28+
func TestGetAllResources(t *testing.T) {
29+
tests := []struct {
30+
name string
31+
namespaceScopedResources []APIResourceMeta
32+
clusterScopedResources []APIResourceMeta
33+
staticResources []APIResourceMeta
34+
expectedResourceCount int
35+
expectedNamespacedCount int
36+
}{
37+
{
38+
name: "mixed cluster and namespace scoped resources",
39+
namespaceScopedResources: []APIResourceMeta{
40+
{
41+
GroupVersionKind: schema.GroupVersionKind{
42+
Group: "",
43+
Version: "v1",
44+
Kind: "ConfigMap",
45+
},
46+
GroupVersionResource: schema.GroupVersionResource{
47+
Group: "",
48+
Version: "v1",
49+
Resource: "configmaps",
50+
},
51+
IsClusterScoped: false,
52+
},
53+
{
54+
GroupVersionKind: schema.GroupVersionKind{
55+
Group: "",
56+
Version: "v1",
57+
Kind: "Secret",
58+
},
59+
GroupVersionResource: schema.GroupVersionResource{
60+
Group: "",
61+
Version: "v1",
62+
Resource: "secrets",
63+
},
64+
IsClusterScoped: false,
65+
},
66+
},
67+
clusterScopedResources: []APIResourceMeta{
68+
{
69+
GroupVersionKind: schema.GroupVersionKind{
70+
Group: "",
71+
Version: "v1",
72+
Kind: "Namespace",
73+
},
74+
GroupVersionResource: schema.GroupVersionResource{
75+
Group: "",
76+
Version: "v1",
77+
Resource: "namespaces",
78+
},
79+
IsClusterScoped: true,
80+
},
81+
},
82+
staticResources: []APIResourceMeta{
83+
{
84+
GroupVersionKind: schema.GroupVersionKind{
85+
Group: "",
86+
Version: "v1",
87+
Kind: "Node",
88+
},
89+
GroupVersionResource: schema.GroupVersionResource{
90+
Group: "",
91+
Version: "v1",
92+
Resource: "nodes",
93+
},
94+
IsClusterScoped: true,
95+
isStaticResource: true,
96+
},
97+
},
98+
expectedResourceCount: 4, // All resources including static
99+
expectedNamespacedCount: 2, // Only namespace-scoped, excluding static
100+
},
101+
{
102+
name: "no resources",
103+
expectedResourceCount: 0,
104+
expectedNamespacedCount: 0,
105+
},
106+
{
107+
name: "only namespace scoped resources",
108+
namespaceScopedResources: []APIResourceMeta{
109+
{
110+
GroupVersionKind: schema.GroupVersionKind{
111+
Group: "apps",
112+
Version: "v1",
113+
Kind: "Deployment",
114+
},
115+
GroupVersionResource: schema.GroupVersionResource{
116+
Group: "apps",
117+
Version: "v1",
118+
Resource: "deployments",
119+
},
120+
IsClusterScoped: false,
121+
},
122+
},
123+
expectedResourceCount: 1,
124+
expectedNamespacedCount: 1,
125+
},
126+
{
127+
name: "only cluster scoped resources",
128+
clusterScopedResources: []APIResourceMeta{
129+
{
130+
GroupVersionKind: schema.GroupVersionKind{
131+
Group: "rbac.authorization.k8s.io",
132+
Version: "v1",
133+
Kind: "ClusterRole",
134+
},
135+
GroupVersionResource: schema.GroupVersionResource{
136+
Group: "rbac.authorization.k8s.io",
137+
Version: "v1",
138+
Resource: "clusterroles",
139+
},
140+
IsClusterScoped: true,
141+
},
142+
},
143+
expectedResourceCount: 1,
144+
expectedNamespacedCount: 0,
145+
},
146+
}
147+
148+
for _, tt := range tests {
149+
t.Run(tt.name, func(t *testing.T) {
150+
// Create a fake dynamic client
151+
fakeClient := fake.NewSimpleDynamicClient(scheme.Scheme)
152+
stopCh := make(chan struct{})
153+
defer close(stopCh)
154+
155+
mgr := NewInformerManager(fakeClient, 0, stopCh)
156+
implMgr := mgr.(*informerManagerImpl)
157+
158+
// Add namespace-scoped resources
159+
for _, res := range tt.namespaceScopedResources {
160+
res.isPresent = true
161+
implMgr.apiResources[res.GroupVersionKind] = &res
162+
}
163+
164+
// Add cluster-scoped resources
165+
for _, res := range tt.clusterScopedResources {
166+
res.isPresent = true
167+
implMgr.apiResources[res.GroupVersionKind] = &res
168+
}
169+
170+
// Add static resources
171+
for _, res := range tt.staticResources {
172+
res.isPresent = true
173+
implMgr.apiResources[res.GroupVersionKind] = &res
174+
}
175+
176+
// Test GetAllResources
177+
allResources := mgr.GetAllResources()
178+
assert.Equal(t, tt.expectedResourceCount, len(allResources), "GetAllResources should return correct count")
179+
180+
// Verify all expected resources are present
181+
resourceMap := make(map[schema.GroupVersionResource]bool)
182+
for _, gvr := range allResources {
183+
resourceMap[gvr] = true
184+
}
185+
186+
for _, res := range tt.namespaceScopedResources {
187+
assert.True(t, resourceMap[res.GroupVersionResource], "namespace-scoped resource %v should be in GetAllResources", res.GroupVersionResource)
188+
}
189+
190+
for _, res := range tt.clusterScopedResources {
191+
assert.True(t, resourceMap[res.GroupVersionResource], "cluster-scoped resource %v should be in GetAllResources", res.GroupVersionResource)
192+
}
193+
194+
for _, res := range tt.staticResources {
195+
assert.True(t, resourceMap[res.GroupVersionResource], "static resource %v should be in GetAllResources", res.GroupVersionResource)
196+
}
197+
198+
// Test GetNameSpaceScopedResources
199+
namespacedResources := mgr.GetNameSpaceScopedResources()
200+
assert.Equal(t, tt.expectedNamespacedCount, len(namespacedResources), "GetNameSpaceScopedResources should return correct count")
201+
202+
// Verify only namespace-scoped, non-static resources are present
203+
namespacedMap := make(map[schema.GroupVersionResource]bool)
204+
for _, gvr := range namespacedResources {
205+
namespacedMap[gvr] = true
206+
}
207+
208+
for _, res := range tt.namespaceScopedResources {
209+
assert.True(t, namespacedMap[res.GroupVersionResource], "namespace-scoped resource %v should be in GetNameSpaceScopedResources", res.GroupVersionResource)
210+
}
211+
212+
// Verify cluster-scoped and static resources are NOT in namespace-scoped list
213+
for _, res := range tt.clusterScopedResources {
214+
assert.False(t, namespacedMap[res.GroupVersionResource], "cluster-scoped resource %v should NOT be in GetNameSpaceScopedResources", res.GroupVersionResource)
215+
}
216+
217+
for _, res := range tt.staticResources {
218+
assert.False(t, namespacedMap[res.GroupVersionResource], "static resource %v should NOT be in GetNameSpaceScopedResources", res.GroupVersionResource)
219+
}
220+
})
221+
}
222+
}
223+
224+
func TestGetAllResources_NotPresent(t *testing.T) {
225+
// Test that resources marked as not present are excluded
226+
fakeClient := fake.NewSimpleDynamicClient(scheme.Scheme)
227+
stopCh := make(chan struct{})
228+
defer close(stopCh)
229+
230+
mgr := NewInformerManager(fakeClient, 0, stopCh)
231+
implMgr := mgr.(*informerManagerImpl)
232+
233+
// Add a resource that is present
234+
presentRes := APIResourceMeta{
235+
GroupVersionKind: schema.GroupVersionKind{
236+
Group: "",
237+
Version: "v1",
238+
Kind: "ConfigMap",
239+
},
240+
GroupVersionResource: schema.GroupVersionResource{
241+
Group: "",
242+
Version: "v1",
243+
Resource: "configmaps",
244+
},
245+
IsClusterScoped: false,
246+
isPresent: true,
247+
}
248+
implMgr.apiResources[presentRes.GroupVersionKind] = &presentRes
249+
250+
// Add a resource that is NOT present (deleted)
251+
notPresentRes := APIResourceMeta{
252+
GroupVersionKind: schema.GroupVersionKind{
253+
Group: "",
254+
Version: "v1",
255+
Kind: "Secret",
256+
},
257+
GroupVersionResource: schema.GroupVersionResource{
258+
Group: "",
259+
Version: "v1",
260+
Resource: "secrets",
261+
},
262+
IsClusterScoped: false,
263+
isPresent: false,
264+
}
265+
implMgr.apiResources[notPresentRes.GroupVersionKind] = &notPresentRes
266+
267+
allResources := mgr.GetAllResources()
268+
assert.Equal(t, 1, len(allResources), "should only return present resources")
269+
assert.Equal(t, presentRes.GroupVersionResource, allResources[0], "should return the present resource")
270+
}

pkg/webhook/readiness.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2025 The KubeFleet Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package webhook
18+
19+
import (
20+
"fmt"
21+
"net/http"
22+
23+
"k8s.io/apimachinery/pkg/runtime/schema"
24+
"k8s.io/klog/v2"
25+
26+
"github.com/kubefleet-dev/kubefleet/pkg/utils/informer"
27+
)
28+
29+
// ResourceInformerReadinessChecker creates a readiness check function that verifies
30+
// all resource informer caches are synced before marking the pod as ready.
31+
// This prevents the webhook from accepting requests before the discovery cache is populated.
32+
func ResourceInformerReadinessChecker(resourceInformer informer.Manager) func(*http.Request) error {
33+
return func(_ *http.Request) error {
34+
if resourceInformer == nil {
35+
return fmt.Errorf("resource informer not initialized")
36+
}
37+
38+
// Require ALL informer caches to be synced before marking ready
39+
allResources := resourceInformer.GetAllResources()
40+
if len(allResources) == 0 {
41+
// This can happen during startup when the ResourceInformer is created but the ChangeDetector
42+
// hasn't discovered and registered any resources yet via AddDynamicResources().
43+
return fmt.Errorf("resource informer not ready: no resources registered")
44+
}
45+
46+
// Check that ALL informers have synced
47+
unsyncedResources := []schema.GroupVersionResource{}
48+
for _, gvr := range allResources {
49+
if !resourceInformer.IsInformerSynced(gvr) {
50+
unsyncedResources = append(unsyncedResources, gvr)
51+
}
52+
}
53+
54+
if len(unsyncedResources) > 0 {
55+
return fmt.Errorf("resource informer not ready: %d/%d informers not synced yet", len(unsyncedResources), len(allResources))
56+
}
57+
58+
klog.V(5).InfoS("All resource informers synced", "totalInformers", len(allResources))
59+
return nil
60+
}
61+
}

0 commit comments

Comments
 (0)