From f9c4eba45f50028b50589c741183198a21e76475 Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Tue, 24 Mar 2026 14:09:39 +0100 Subject: [PATCH 1/6] Add multicluster support for the event recorder --- .../filter_weigher_pipeline_controller.go | 2 +- .../filter_weigher_pipeline_controller.go | 2 +- .../filter_weigher_pipeline_controller.go | 2 +- .../filter_weigher_pipeline_controller.go | 2 +- .../filter_weigher_pipeline_controller.go | 2 +- .../reservations/failover/controller.go | 2 +- pkg/multicluster/client_test.go | 18 +- pkg/multicluster/recorder.go | 83 ++++++ pkg/multicluster/recorder_test.go | 256 ++++++++++++++++++ 9 files changed, 361 insertions(+), 8 deletions(-) create mode 100644 pkg/multicluster/recorder.go create mode 100644 pkg/multicluster/recorder_test.go diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go index 7163cc5a6..52ec37306 100644 --- a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go @@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline( func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error { c.Initializer = c c.SchedulingDomain = v1alpha1.SchedulingDomainCinder - c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-cinder-scheduler")} + c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-cinder-scheduler")} if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil { return err } diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller.go b/internal/scheduling/machines/filter_weigher_pipeline_controller.go index d76203812..35d51708a 100644 --- a/internal/scheduling/machines/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/machines/filter_weigher_pipeline_controller.go @@ -222,7 +222,7 @@ func (c *FilterWeigherPipelineController) handleMachine() handler.EventHandler { func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error { c.Initializer = c c.SchedulingDomain = v1alpha1.SchedulingDomainMachines - c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-machines-scheduler")} + c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-machines-scheduler")} if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil { return err } diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller.go b/internal/scheduling/manila/filter_weigher_pipeline_controller.go index 6ab938511..128b7d719 100644 --- a/internal/scheduling/manila/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/manila/filter_weigher_pipeline_controller.go @@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline( func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error { c.Initializer = c c.SchedulingDomain = v1alpha1.SchedulingDomainManila - c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-manila-scheduler")} + c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-manila-scheduler")} if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil { return err } diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go index bb9c5bb07..279ac1c3e 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go @@ -199,7 +199,7 @@ func (c *FilterWeigherPipelineController) InitPipeline( func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error { c.Initializer = c c.SchedulingDomain = v1alpha1.SchedulingDomainNova - c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-nova-scheduler")} + c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-nova-scheduler")} c.gatherer = &candidateGatherer{Client: mcl} if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil { return err diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller.go b/internal/scheduling/pods/filter_weigher_pipeline_controller.go index ceba977b8..0ceee6485 100644 --- a/internal/scheduling/pods/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/pods/filter_weigher_pipeline_controller.go @@ -234,7 +234,7 @@ func (c *FilterWeigherPipelineController) handlePod() handler.EventHandler { func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error { c.Initializer = c c.SchedulingDomain = v1alpha1.SchedulingDomainPods - c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-pods-scheduler")} + c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-pods-scheduler")} if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil { return err } diff --git a/internal/scheduling/reservations/failover/controller.go b/internal/scheduling/reservations/failover/controller.go index 4cc7d3c1e..ef3d422b7 100644 --- a/internal/scheduling/reservations/failover/controller.go +++ b/internal/scheduling/reservations/failover/controller.go @@ -765,7 +765,7 @@ func (c *FailoverReservationController) patchReservationStatus(ctx context.Conte // SetupWithManager sets up the watch-based reconciler with the Manager. // This handles per-reservation reconciliation triggered by CRD changes. func (c *FailoverReservationController) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error { - c.Recorder = mgr.GetEventRecorder("failover-reservation-controller") + c.Recorder = mcl.GetEventRecorder("failover-reservation-controller") return multicluster.BuildController(mcl, mgr). For(&v1alpha1.Reservation{}). diff --git a/pkg/multicluster/client_test.go b/pkg/multicluster/client_test.go index 64b0ae94c..290c61e1b 100644 --- a/pkg/multicluster/client_test.go +++ b/pkg/multicluster/client_test.go @@ -13,6 +13,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -74,8 +76,9 @@ func (f *fakeCache) getIndexFieldCalls() []indexFieldCall { // fakeCluster implements cluster.Cluster interface for testing. type fakeCluster struct { cluster.Cluster - fakeClient client.Client - fakeCache *fakeCache + fakeClient client.Client + fakeCache *fakeCache + fakeRecorder *fakeEventRecorder } func (f *fakeCluster) GetClient() client.Client { @@ -86,6 +89,17 @@ func (f *fakeCluster) GetCache() cache.Cache { return f.fakeCache } +func (f *fakeCluster) GetEventRecorder(_ string) events.EventRecorder { + if f.fakeRecorder != nil { + return f.fakeRecorder + } + return &fakeEventRecorder{} +} + +func (f *fakeCluster) GetEventRecorderFor(_ string) record.EventRecorder { + return record.NewFakeRecorder(100) +} + func newFakeCluster(scheme *runtime.Scheme, objs ...client.Object) *fakeCluster { return &fakeCluster{ fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(), diff --git a/pkg/multicluster/recorder.go b/pkg/multicluster/recorder.go new file mode 100644 index 000000000..d36cc068d --- /dev/null +++ b/pkg/multicluster/recorder.go @@ -0,0 +1,83 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package multicluster + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/events" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cluster" +) + +// MultiClusterRecorder implements events.EventRecorder and routes events to the +// correct cluster based on the GVK of the "regarding" object. It uses the same +// routing logic as the multicluster Client's write path. +type MultiClusterRecorder struct { + client *Client + homeRecorder events.EventRecorder + recorders map[cluster.Cluster]events.EventRecorder +} + +// GetEventRecorder creates a multi-cluster-aware EventRecorder. It pre-creates +// a per-cluster recorder for the home cluster and every remote cluster currently +// registered in the client. The name parameter is passed through to each +// cluster's GetEventRecorder method (it becomes the reportingController in the +// Kubernetes Event). +func (c *Client) GetEventRecorder(name string) events.EventRecorder { + homeRecorder := c.HomeCluster.GetEventRecorder(name) + + recorders := make(map[cluster.Cluster]events.EventRecorder) + recorders[c.HomeCluster] = homeRecorder + + c.remoteClustersMu.RLock() + defer c.remoteClustersMu.RUnlock() + + for _, remotes := range c.remoteClusters { + for _, r := range remotes { + if _, exists := recorders[r.cluster]; !exists { + recorders[r.cluster] = r.cluster.GetEventRecorder(name) + } + } + } + + return &MultiClusterRecorder{ + client: c, + homeRecorder: homeRecorder, + recorders: recorders, + } +} + +// Eventf routes the event to the cluster that owns the "regarding" object. +// Falls back to the home cluster recorder if routing fails. +func (r *MultiClusterRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { + recorder := r.recorderFor(regarding) + recorder.Eventf(regarding, related, eventtype, reason, action, note, args...) +} + +// recorderFor resolves which per-cluster recorder to use for the given object. +func (r *MultiClusterRecorder) recorderFor(obj runtime.Object) events.EventRecorder { + if obj == nil { + return r.homeRecorder + } + + gvk, err := r.client.GVKFromHomeScheme(obj) + if err != nil { + ctrl.Log.V(1).Info("multi-cluster recorder: failed to resolve GVK, using home recorder", "error", err) + return r.homeRecorder + } + + cl, err := r.client.clusterForWrite(gvk, obj) + if err != nil { + ctrl.Log.V(1).Info("multi-cluster recorder: no cluster matched, using home recorder", "gvk", gvk, "error", err) + return r.homeRecorder + } + + recorder, ok := r.recorders[cl] + if !ok { + ctrl.Log.V(1).Info("multi-cluster recorder: no pre-created recorder for cluster, using home recorder", "gvk", gvk) + return r.homeRecorder + } + + return recorder +} diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go new file mode 100644 index 000000000..6a0888921 --- /dev/null +++ b/pkg/multicluster/recorder_test.go @@ -0,0 +1,256 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package multicluster + +import ( + "fmt" + "sync" + "testing" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// fakeEventRecorder captures Eventf calls for assertions. +type fakeEventRecorder struct { + mu sync.Mutex + calls []eventfCall +} + +type eventfCall struct { + regarding runtime.Object + eventtype string + reason string + action string + note string +} + +func (f *fakeEventRecorder) Eventf(regarding runtime.Object, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) { + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, eventfCall{ + regarding: regarding, + eventtype: eventtype, + reason: reason, + action: action, + note: fmt.Sprintf(note, args...), + }) +} + +func (f *fakeEventRecorder) getCalls() []eventfCall { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]eventfCall, len(f.calls)) + copy(out, f.calls) + return out +} + +func TestMultiClusterRecorder_HomeGVK(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + homeCluster := &fakeCluster{ + fakeClient: nil, + fakeCache: &fakeCache{}, + fakeRecorder: homeRecorder, + } + + historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"} + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + homeGVKs: map[schema.GroupVersionKind]bool{historyGVK: true}, + } + + recorder := mcl.GetEventRecorder("test-recorder") + + history := &v1alpha1.History{ + ObjectMeta: metav1.ObjectMeta{Name: "nova-uuid-1"}, + } + recorder.Eventf(history, nil, corev1.EventTypeNormal, "SchedulingSucceeded", "Scheduled", "selected host: %s", "compute-1") + + calls := homeRecorder.getCalls() + if len(calls) != 1 { + t.Fatalf("expected 1 call, got %d", len(calls)) + } + if calls[0].eventtype != corev1.EventTypeNormal { + t.Errorf("expected event type %q, got %q", corev1.EventTypeNormal, calls[0].eventtype) + } + if calls[0].reason != "SchedulingSucceeded" { + t.Errorf("expected reason %q, got %q", "SchedulingSucceeded", calls[0].reason) + } + if calls[0].note != "selected host: compute-1" { + t.Errorf("expected note %q, got %q", "selected host: compute-1", calls[0].note) + } +} + +func TestMultiClusterRecorder_RemoteGVK(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + remoteRecorder := &fakeEventRecorder{} + + homeCluster := &fakeCluster{ + fakeClient: nil, + fakeCache: &fakeCache{}, + fakeRecorder: homeRecorder, + } + remote := &fakeCluster{ + fakeClient: nil, + fakeCache: &fakeCache{}, + fakeRecorder: remoteRecorder, + } + + reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"} + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{ + reservationGVK: ReservationsResourceRouter{}, + }, + remoteClusters: map[schema.GroupVersionKind][]remoteCluster{ + reservationGVK: {{cluster: remote, labels: map[string]string{"availabilityZone": "az-a"}}}, + }, + } + + recorder := mcl.GetEventRecorder("test-recorder") + + res := &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{Name: "res-1"}, + Spec: v1alpha1.ReservationSpec{AvailabilityZone: "az-a"}, + } + recorder.Eventf(res, nil, corev1.EventTypeNormal, "ValidationPassed", "Validated", "reservation validated") + + // Event should go to the remote recorder, not home. + homeCalls := homeRecorder.getCalls() + if len(homeCalls) != 0 { + t.Errorf("expected 0 home calls, got %d", len(homeCalls)) + } + remoteCalls := remoteRecorder.getCalls() + if len(remoteCalls) != 1 { + t.Fatalf("expected 1 remote call, got %d", len(remoteCalls)) + } + if remoteCalls[0].action != "Validated" { + t.Errorf("expected action %q, got %q", "Validated", remoteCalls[0].action) + } +} + +func TestMultiClusterRecorder_MultipleRemotes(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + remoteARecorder := &fakeEventRecorder{} + remoteBRecorder := &fakeEventRecorder{} + + homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}} + remoteA := &fakeCluster{fakeRecorder: remoteARecorder, fakeCache: &fakeCache{}} + remoteB := &fakeCluster{fakeRecorder: remoteBRecorder, fakeCache: &fakeCache{}} + + reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"} + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{ + reservationGVK: ReservationsResourceRouter{}, + }, + remoteClusters: map[schema.GroupVersionKind][]remoteCluster{ + reservationGVK: { + {cluster: remoteA, labels: map[string]string{"availabilityZone": "az-a"}}, + {cluster: remoteB, labels: map[string]string{"availabilityZone": "az-b"}}, + }, + }, + } + + recorder := mcl.GetEventRecorder("test-recorder") + + // Event for az-b should go to remoteB. + res := &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{Name: "res-b"}, + Spec: v1alpha1.ReservationSpec{AvailabilityZone: "az-b"}, + } + recorder.Eventf(res, nil, corev1.EventTypeWarning, "SchedulingFailed", "FailedScheduling", "no host found") + + if len(remoteARecorder.getCalls()) != 0 { + t.Errorf("expected 0 calls to remote-a, got %d", len(remoteARecorder.getCalls())) + } + if len(remoteBRecorder.getCalls()) != 1 { + t.Fatalf("expected 1 call to remote-b, got %d", len(remoteBRecorder.getCalls())) + } + if remoteBRecorder.getCalls()[0].reason != "SchedulingFailed" { + t.Errorf("expected reason %q, got %q", "SchedulingFailed", remoteBRecorder.getCalls()[0].reason) + } +} + +func TestMultiClusterRecorder_FallbackOnUnknownGVK(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}} + + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + homeGVKs: map[schema.GroupVersionKind]bool{}, + } + + recorder := mcl.GetEventRecorder("test-recorder") + + // unknownType is not in the scheme — should fall back to home recorder. + obj := &unknownType{ObjectMeta: metav1.ObjectMeta{Name: "unknown-1"}} + recorder.Eventf(obj, nil, corev1.EventTypeNormal, "Test", "Test", "test message") + + if len(homeRecorder.getCalls()) != 1 { + t.Fatalf("expected 1 home call on fallback, got %d", len(homeRecorder.getCalls())) + } +} + +func TestMultiClusterRecorder_FallbackOnNilRegarding(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}} + + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + } + + recorder := mcl.GetEventRecorder("test-recorder") + recorder.Eventf(nil, nil, corev1.EventTypeNormal, "Test", "Test", "nil regarding") + + if len(homeRecorder.getCalls()) != 1 { + t.Fatalf("expected 1 home call for nil regarding, got %d", len(homeRecorder.getCalls())) + } +} + +func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) { + scheme := newTestScheme(t) + homeRecorder := &fakeEventRecorder{} + homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}} + + historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"} + mcl := &Client{ + HomeCluster: homeCluster, + HomeScheme: scheme, + homeGVKs: map[schema.GroupVersionKind]bool{historyGVK: true}, + } + + recorder := mcl.GetEventRecorder("test-recorder") + + const goroutines = 20 + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func(n int) { + defer wg.Done() + history := &v1alpha1.History{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("history-%d", n)}, + } + recorder.Eventf(history, nil, corev1.EventTypeNormal, "Test", "Test", "event %d", n) + }(i) + } + wg.Wait() + + if len(homeRecorder.getCalls()) != goroutines { + t.Errorf("expected %d calls, got %d", goroutines, len(homeRecorder.getCalls())) + } +} From 9ba897ad0b156af58d29dc8579beaa91f8de780f Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Tue, 24 Mar 2026 15:26:17 +0100 Subject: [PATCH 2/6] Refactor Eventf method signatures to use 'any' type for variadic arguments --- pkg/multicluster/recorder.go | 2 +- pkg/multicluster/recorder_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/multicluster/recorder.go b/pkg/multicluster/recorder.go index d36cc068d..8c7c78d3d 100644 --- a/pkg/multicluster/recorder.go +++ b/pkg/multicluster/recorder.go @@ -50,7 +50,7 @@ func (c *Client) GetEventRecorder(name string) events.EventRecorder { // Eventf routes the event to the cluster that owns the "regarding" object. // Falls back to the home cluster recorder if routing fails. -func (r *MultiClusterRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { +func (r *MultiClusterRecorder) Eventf(regarding, related runtime.Object, eventtype, reason, action, note string, args ...any) { recorder := r.recorderFor(regarding) recorder.Eventf(regarding, related, eventtype, reason, action, note, args...) } diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go index 6a0888921..b23216e2e 100644 --- a/pkg/multicluster/recorder_test.go +++ b/pkg/multicluster/recorder_test.go @@ -29,7 +29,7 @@ type eventfCall struct { note string } -func (f *fakeEventRecorder) Eventf(regarding runtime.Object, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) { +func (f *fakeEventRecorder) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...any) { f.mu.Lock() defer f.mu.Unlock() f.calls = append(f.calls, eventfCall{ @@ -239,7 +239,7 @@ func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) { const goroutines = 20 var wg sync.WaitGroup wg.Add(goroutines) - for i := 0; i < goroutines; i++ { + for i := range goroutines { go func(n int) { defer wg.Done() history := &v1alpha1.History{ From 5ed676f6bd122494deec16be2adee6f87e605cdc Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Wed, 25 Mar 2026 08:02:46 +0100 Subject: [PATCH 3/6] Feedback --- pkg/multicluster/client_test.go | 2 +- pkg/multicluster/recorder_test.go | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/multicluster/client_test.go b/pkg/multicluster/client_test.go index 290c61e1b..4ea3900cb 100644 --- a/pkg/multicluster/client_test.go +++ b/pkg/multicluster/client_test.go @@ -78,7 +78,7 @@ type fakeCluster struct { cluster.Cluster fakeClient client.Client fakeCache *fakeCache - fakeRecorder *fakeEventRecorder + fakeRecorder events.EventRecorder } func (f *fakeCluster) GetClient() client.Client { diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go index b23216e2e..90bdd71dc 100644 --- a/pkg/multicluster/recorder_test.go +++ b/pkg/multicluster/recorder_test.go @@ -171,14 +171,16 @@ func TestMultiClusterRecorder_MultipleRemotes(t *testing.T) { } recorder.Eventf(res, nil, corev1.EventTypeWarning, "SchedulingFailed", "FailedScheduling", "no host found") - if len(remoteARecorder.getCalls()) != 0 { - t.Errorf("expected 0 calls to remote-a, got %d", len(remoteARecorder.getCalls())) + remoteCallsA := remoteARecorder.getCalls() + if len(remoteCallsA) != 0 { + t.Errorf("expected 0 calls to remote-a, got %d", len(remoteCallsA)) } - if len(remoteBRecorder.getCalls()) != 1 { - t.Fatalf("expected 1 call to remote-b, got %d", len(remoteBRecorder.getCalls())) + remoteCallsB := remoteBRecorder.getCalls() + if len(remoteCallsB) != 1 { + t.Fatalf("expected 1 call to remote-b, got %d", len(remoteCallsB)) } - if remoteBRecorder.getCalls()[0].reason != "SchedulingFailed" { - t.Errorf("expected reason %q, got %q", "SchedulingFailed", remoteBRecorder.getCalls()[0].reason) + if remoteCallsB[0].reason != "SchedulingFailed" { + t.Errorf("expected reason %q, got %q", "SchedulingFailed", remoteCallsB[0].reason) } } @@ -250,7 +252,8 @@ func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) { } wg.Wait() - if len(homeRecorder.getCalls()) != goroutines { - t.Errorf("expected %d calls, got %d", goroutines, len(homeRecorder.getCalls())) + homeCalls := homeRecorder.getCalls() + if len(homeCalls) != goroutines { + t.Errorf("expected %d calls, got %d", goroutines, len(homeCalls)) } } From eb7c65f8ccfcb1c6b78ddcc3655edcbd74eee6f2 Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Wed, 25 Mar 2026 09:06:02 +0100 Subject: [PATCH 4/6] Add multicluster example with history crd --- docs/guides/multicluster/readme.md | 2 +- docs/guides/multicluster/run.sh | 8 +- docs/guides/multicluster/schedule.sh | 190 ++++++++++++++++++++ docs/guides/multicluster/test-pipeline.yaml | 12 ++ 4 files changed, 209 insertions(+), 3 deletions(-) create mode 100755 docs/guides/multicluster/schedule.sh create mode 100644 docs/guides/multicluster/test-pipeline.yaml diff --git a/docs/guides/multicluster/readme.md b/docs/guides/multicluster/readme.md index f65d19486..015d28775 100644 --- a/docs/guides/multicluster/readme.md +++ b/docs/guides/multicluster/readme.md @@ -1,7 +1,7 @@ # Cortex Multi-Cluster Testing > [!NOTE] -> If you want to skip the reading part, there's `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you. +> If you want to skip the reading part, there's `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you. If you want to test the Cortex provides support for multi-cluster deployments, where a "home" cluster hosts the cortex pods and one or more "remote" clusters are used to persist CRDs. A typical use case for this would be to offload the etcd storage for Cortex CRDs to a remote cluster, reducing the resource usage on the home cluster. Similarly, another use case is to have multiple remote clusters that maintain all the compute workloads and expose resources that Cortex needs to access, such as the `Hypervisor` resource. diff --git a/docs/guides/multicluster/run.sh b/docs/guides/multicluster/run.sh index 063de8ab6..f68956870 100755 --- a/docs/guides/multicluster/run.sh +++ b/docs/guides/multicluster/run.sh @@ -52,16 +52,20 @@ global: gvks: - kvm.cloud.sap/v1/Hypervisor - kvm.cloud.sap/v1/HypervisorList + - cortex.cloud/v1alpha1/History + - cortex.cloud/v1alpha1/HistoryList labels: - az: cortex-remote-az-a + availabilityZone: cortex-remote-az-a caCert: | $(cat /tmp/root-ca-remote-az-a.pem | sed 's/^/ /') - host: https://host.docker.internal:8445 gvks: - kvm.cloud.sap/v1/Hypervisor - kvm.cloud.sap/v1/HypervisorList + - cortex.cloud/v1alpha1/History + - cortex.cloud/v1alpha1/HistoryList labels: - az: cortex-remote-az-b + availabilityZone: cortex-remote-az-b caCert: | $(cat /tmp/root-ca-remote-az-b.pem | sed 's/^/ /') EOF diff --git a/docs/guides/multicluster/schedule.sh b/docs/guides/multicluster/schedule.sh new file mode 100755 index 000000000..2d8a47f0d --- /dev/null +++ b/docs/guides/multicluster/schedule.sh @@ -0,0 +1,190 @@ +#!/bin/bash + +set -e + +API_URL="http://localhost:8001/scheduler/nova/external" +INSTANCE_UUID=$(uuidgen | tr '[:upper:]' '[:lower:]') + +echo "Applying test pipeline to home cluster" +kubectl --context kind-cortex-home apply -f docs/guides/multicluster/test-pipeline.yaml + +echo "" +echo "Sending scheduling request for instance $INSTANCE_UUID" +echo "The test pipeline will schedule the instance on one of the hosts in cortex-remote-az-b". +echo "Hosts: hypervisor-1-az-a, hypervisor-2-az-a, hypervisor-1-az-b, hypervisor-2-az-b" +echo "" + +RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$API_URL" \ + -H "Content-Type: application/json" \ + -d @- </dev/null || echo "$BODY" + +sleep 1 +echo "" +echo "--- Check History CRDs in cortex-home ---" +kubectl --context kind-cortex-home get histories +kubectl --context kind-cortex-home get events --field-selector reason=SchedulingSucceeded +echo "" +echo "--- Check History CRDs in cortex-remote-az-a ---" +kubectl --context kind-cortex-remote-az-a get histories +kubectl --context kind-cortex-remote-az-a get events --field-selector reason=SchedulingSucceeded + +echo "" +echo "--- Check History CRDs in cortex-remote-az-b ---" +kubectl --context kind-cortex-remote-az-b get histories +kubectl --context kind-cortex-remote-az-b get events --field-selector reason=SchedulingSucceeded + + + diff --git a/docs/guides/multicluster/test-pipeline.yaml b/docs/guides/multicluster/test-pipeline.yaml new file mode 100644 index 000000000..667f4a7fe --- /dev/null +++ b/docs/guides/multicluster/test-pipeline.yaml @@ -0,0 +1,12 @@ +apiVersion: cortex.cloud/v1alpha1 +kind: Pipeline +metadata: + name: multicluster-test +spec: + schedulingDomain: nova + description: Minimal test pipeline for the multicluster guide. + type: filter-weigher + createHistory: true + filters: + - name: filter_correct_az + weighers: [] From 3f2b7ff3501c07b780276543e5ea02b81cf7685c Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Wed, 25 Mar 2026 09:13:19 +0100 Subject: [PATCH 5/6] Make example instance static --- docs/guides/multicluster/readme.md | 2 +- docs/guides/multicluster/schedule.sh | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/guides/multicluster/readme.md b/docs/guides/multicluster/readme.md index 015d28775..ab9c9e60a 100644 --- a/docs/guides/multicluster/readme.md +++ b/docs/guides/multicluster/readme.md @@ -1,7 +1,7 @@ # Cortex Multi-Cluster Testing > [!NOTE] -> If you want to skip the reading part, there's `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you. If you want to test the +> If you want to skip the reading part, there's `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you. If you want to test the multi-cluster setup you can run the `schedule.sh` script, which will create a scheduling request and show you how it gets processed across the clusters. Cortex provides support for multi-cluster deployments, where a "home" cluster hosts the cortex pods and one or more "remote" clusters are used to persist CRDs. A typical use case for this would be to offload the etcd storage for Cortex CRDs to a remote cluster, reducing the resource usage on the home cluster. Similarly, another use case is to have multiple remote clusters that maintain all the compute workloads and expose resources that Cortex needs to access, such as the `Hypervisor` resource. diff --git a/docs/guides/multicluster/schedule.sh b/docs/guides/multicluster/schedule.sh index 2d8a47f0d..674efd3fe 100755 --- a/docs/guides/multicluster/schedule.sh +++ b/docs/guides/multicluster/schedule.sh @@ -3,7 +3,7 @@ set -e API_URL="http://localhost:8001/scheduler/nova/external" -INSTANCE_UUID=$(uuidgen | tr '[:upper:]' '[:lower:]') +INSTANCE_UUID="cortex-test-instance-001" echo "Applying test pipeline to home cluster" kubectl --context kind-cortex-home apply -f docs/guides/multicluster/test-pipeline.yaml @@ -186,5 +186,11 @@ echo "--- Check History CRDs in cortex-remote-az-b ---" kubectl --context kind-cortex-remote-az-b get histories kubectl --context kind-cortex-remote-az-b get events --field-selector reason=SchedulingSucceeded +echo "---" +echo "Press enter to describe the History CRD in cortex-remote-az-b and see the details of the scheduling result" +read -r + +echo "--- Describe History CRD in cortex-remote-az-b ---" +kubectl --context kind-cortex-remote-az-b describe history nova-cortex-test-instance-001 From 52c910bae5aefec50864710cc21157a5ae7a7be3 Mon Sep 17 00:00:00 2001 From: Markus Wieland Date: Wed, 25 Mar 2026 09:23:45 +0100 Subject: [PATCH 6/6] Enhance scheduling script with detailed steps and error handling for History CRDs --- docs/guides/multicluster/schedule.sh | 99 ++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 21 deletions(-) diff --git a/docs/guides/multicluster/schedule.sh b/docs/guides/multicluster/schedule.sh index 674efd3fe..a1caaf24d 100755 --- a/docs/guides/multicluster/schedule.sh +++ b/docs/guides/multicluster/schedule.sh @@ -4,14 +4,37 @@ set -e API_URL="http://localhost:8001/scheduler/nova/external" INSTANCE_UUID="cortex-test-instance-001" +HISTORY_NAME="nova-$INSTANCE_UUID" + +# --- Step 1: Apply the test pipeline ----------------------------------------- + +echo "=== Step 1: Apply test pipeline ===" +echo "" +echo "The test pipeline is a minimal filter-weigher pipeline with:" +echo " - createHistory: true (so a History CRD is created for each decision)" +echo " - filter_correct_az (filters hosts not matching the requested AZ)" +echo " - no weighers (hosts are returned in their original order)" +echo "" -echo "Applying test pipeline to home cluster" kubectl --context kind-cortex-home apply -f docs/guides/multicluster/test-pipeline.yaml echo "" -echo "Sending scheduling request for instance $INSTANCE_UUID" -echo "The test pipeline will schedule the instance on one of the hosts in cortex-remote-az-b". -echo "Hosts: hypervisor-1-az-a, hypervisor-2-az-a, hypervisor-1-az-b, hypervisor-2-az-b" +echo "Press enter to send a scheduling request..." +read -r + +# --- Step 2: Send scheduling request ----------------------------------------- + +echo "=== Step 2: Send scheduling request ===" +echo "" +echo "Sending a Nova external scheduler request to the cortex API." +echo "" +echo " Instance UUID: $INSTANCE_UUID" +echo " Availability Zone: cortex-remote-az-b" +echo " Pipeline: multicluster-test" +echo " Candidate hosts: hypervisor-{1,2}-az-{a,b} (4 hosts across 2 AZs)" +echo "" +echo "The pipeline's filter_correct_az step should filter out the az-a hosts," +echo "leaving only hypervisor-1-az-b and hypervisor-2-az-b." echo "" RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$API_URL" \ @@ -167,30 +190,64 @@ EOF HTTP_CODE=$(echo "$RESPONSE" | tail -1) BODY=$(echo "$RESPONSE" | sed '$d') -echo "Response from scheduler:" -echo "HTTP $HTTP_CODE" +echo "Response (HTTP $HTTP_CODE):" echo "$BODY" | python3 -m json.tool 2>/dev/null || echo "$BODY" -sleep 1 -echo "" -echo "--- Check History CRDs in cortex-home ---" -kubectl --context kind-cortex-home get histories -kubectl --context kind-cortex-home get events --field-selector reason=SchedulingSucceeded +if [ "$HTTP_CODE" != "200" ]; then + echo "" + echo "ERROR: Scheduling request failed. Check the controller logs:" + echo " kubectl --context kind-cortex-home logs deploy/cortex-nova-scheduling-controller-manager" + exit 1 +fi + echo "" -echo "--- Check History CRDs in cortex-remote-az-a ---" -kubectl --context kind-cortex-remote-az-a get histories -kubectl --context kind-cortex-remote-az-a get events --field-selector reason=SchedulingSucceeded +echo "Press enter to check History CRDs and events across all clusters..." +read -r +# --- Step 3: Check History and Events ---------------------------------------- + +echo "=== Step 3: Check History CRDs and Events ===" +echo "" +echo "The pipeline has createHistory: true, so a History CRD named '$HISTORY_NAME'" +echo "should have been created. An event should also have been recorded on it." +echo "Based on the multicluster config, this should be on the remote cluster cortex-remote-az-b." echo "" -echo "--- Check History CRDs in cortex-remote-az-b ---" -kubectl --context kind-cortex-remote-az-b get histories -kubectl --context kind-cortex-remote-az-b get events --field-selector reason=SchedulingSucceeded -echo "---" -echo "Press enter to describe the History CRD in cortex-remote-az-b and see the details of the scheduling result" +sleep 1 + +for CLUSTER in kind-cortex-home kind-cortex-remote-az-a kind-cortex-remote-az-b; do + echo "--- $CLUSTER ---" + echo "Histories:" + kubectl --context "$CLUSTER" get histories 2>/dev/null || echo " (none)" + echo "Events:" + kubectl --context "$CLUSTER" get events --field-selector reason=SchedulingSucceeded 2>/dev/null || echo " (none)" + echo "" +done + +echo "Press enter to describe the History CRD and see the full scheduling result..." read -r -echo "--- Describe History CRD in cortex-remote-az-b ---" -kubectl --context kind-cortex-remote-az-b describe history nova-cortex-test-instance-001 +# --- Step 4: Describe History ------------------------------------------------ + +echo "=== Step 4: Describe History CRD ===" +echo "" +echo "The History CRD contains the full scheduling decision context:" +echo " - Which pipeline was used" +echo " - The target host that was selected" +echo " - An explanation of each filter/weigher step" +echo " - The Ready condition (True = host selected, False = no host found)" +echo "" +# Try all clusters to find where the History ended up. +for CLUSTER in kind-cortex-home kind-cortex-remote-az-a kind-cortex-remote-az-b; do + if kubectl --context "$CLUSTER" get history "$HISTORY_NAME" &>/dev/null; then + echo "Found History '$HISTORY_NAME' in $CLUSTER:" + echo "" + kubectl --context "$CLUSTER" describe history "$HISTORY_NAME" + exit 0 + fi +done +echo "WARNING: History '$HISTORY_NAME' was not found in any cluster." +echo "Check the controller logs for errors:" +echo " kubectl --context kind-cortex-home logs deploy/cortex-nova-scheduling-controller-manager | grep -i history"