diff --git a/docs/guides/multicluster/readme.md b/docs/guides/multicluster/readme.md
index f65d19486..ab9c9e60a 100644
--- a/docs/guides/multicluster/readme.md
+++ b/docs/guides/multicluster/readme.md
@@ -1,7 +1,7 @@
 # Cortex Multi-Cluster Testing

 > [!NOTE]
-> If you want to skip the reading part, there's `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you.
+> If you want to skip the reading part, there are `run.sh` and `cleanup.sh` scripts in this directory that will set up and tear down the multi-cluster environment for you. If you want to test the multi-cluster setup, you can run the `schedule.sh` script, which will create a scheduling request and show you how it gets processed across the clusters.

 Cortex provides support for multi-cluster deployments, where a "home" cluster hosts the cortex pods and one or more "remote" clusters are used to persist CRDs. A typical use case for this would be to offload the etcd storage for Cortex CRDs to a remote cluster, reducing the resource usage on the home cluster. Similarly, another use case is to have multiple remote clusters that maintain all the compute workloads and expose resources that Cortex needs to access, such as the `Hypervisor` resource.

diff --git a/docs/guides/multicluster/run.sh b/docs/guides/multicluster/run.sh
index 063de8ab6..f68956870 100755
--- a/docs/guides/multicluster/run.sh
+++ b/docs/guides/multicluster/run.sh
@@ -52,16 +52,20 @@ global:
         gvks:
           - kvm.cloud.sap/v1/Hypervisor
           - kvm.cloud.sap/v1/HypervisorList
+          - cortex.cloud/v1alpha1/History
+          - cortex.cloud/v1alpha1/HistoryList
         labels:
-          az: cortex-remote-az-a
+          availabilityZone: cortex-remote-az-a
         caCert: |
 $(cat /tmp/root-ca-remote-az-a.pem | sed 's/^/ /')
       - host: https://host.docker.internal:8445
         gvks:
           - kvm.cloud.sap/v1/Hypervisor
           - kvm.cloud.sap/v1/HypervisorList
+          - cortex.cloud/v1alpha1/History
+          - cortex.cloud/v1alpha1/HistoryList
         labels:
-          az: cortex-remote-az-b
+          availabilityZone: cortex-remote-az-b
         caCert: |
 $(cat /tmp/root-ca-remote-az-b.pem | sed 's/^/ /')
 EOF
diff --git a/docs/guides/multicluster/schedule.sh b/docs/guides/multicluster/schedule.sh
new file mode 100755
index 000000000..a1caaf24d
--- /dev/null
+++ b/docs/guides/multicluster/schedule.sh
@@ -0,0 +1,253 @@
+#!/bin/bash
+
+set -e
+
+API_URL="http://localhost:8001/scheduler/nova/external"
+INSTANCE_UUID="cortex-test-instance-001"
+HISTORY_NAME="nova-$INSTANCE_UUID"
+
+# --- Step 1: Apply the test pipeline -----------------------------------------
+
+echo "=== Step 1: Apply test pipeline ==="
+echo ""
+echo "The test pipeline is a minimal filter-weigher pipeline with:"
+echo "  - createHistory: true (so a History CRD is created for each decision)"
+echo "  - filter_correct_az (filters hosts not matching the requested AZ)"
+echo "  - no weighers (hosts are returned in their original order)"
+echo ""
+
+kubectl --context kind-cortex-home apply -f docs/guides/multicluster/test-pipeline.yaml
+
+echo ""
+echo "Press enter to send a scheduling request..."
+read -r
+
+# --- Step 2: Send scheduling request -----------------------------------------
+
+echo "=== Step 2: Send scheduling request ==="
+echo ""
+echo "Sending a Nova external scheduler request to the cortex API."
+echo "" +echo " Instance UUID: $INSTANCE_UUID" +echo " Availability Zone: cortex-remote-az-b" +echo " Pipeline: multicluster-test" +echo " Candidate hosts: hypervisor-{1,2}-az-{a,b} (4 hosts across 2 AZs)" +echo "" +echo "The pipeline's filter_correct_az step should filter out the az-a hosts," +echo "leaving only hypervisor-1-az-b and hypervisor-2-az-b." +echo "" + +RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$API_URL" \ + -H "Content-Type: application/json" \ + -d @- </dev/null || echo "$BODY" + +if [ "$HTTP_CODE" != "200" ]; then + echo "" + echo "ERROR: Scheduling request failed. Check the controller logs:" + echo " kubectl --context kind-cortex-home logs deploy/cortex-nova-scheduling-controller-manager" + exit 1 +fi + +echo "" +echo "Press enter to check History CRDs and events across all clusters..." +read -r + +# --- Step 3: Check History and Events ---------------------------------------- + +echo "=== Step 3: Check History CRDs and Events ===" +echo "" +echo "The pipeline has createHistory: true, so a History CRD named '$HISTORY_NAME'" +echo "should have been created. An event should also have been recorded on it." +echo "Based on the multicluster config, this should be on the remote cluster cortex-remote-az-b." +echo "" + +sleep 1 + +for CLUSTER in kind-cortex-home kind-cortex-remote-az-a kind-cortex-remote-az-b; do + echo "--- $CLUSTER ---" + echo "Histories:" + kubectl --context "$CLUSTER" get histories 2>/dev/null || echo " (none)" + echo "Events:" + kubectl --context "$CLUSTER" get events --field-selector reason=SchedulingSucceeded 2>/dev/null || echo " (none)" + echo "" +done + +echo "Press enter to describe the History CRD and see the full scheduling result..." +read -r + +# --- Step 4: Describe History ------------------------------------------------ + +echo "=== Step 4: Describe History CRD ===" +echo "" +echo "The History CRD contains the full scheduling decision context:" +echo " - Which pipeline was used" +echo " - The target host that was selected" +echo " - An explanation of each filter/weigher step" +echo " - The Ready condition (True = host selected, False = no host found)" +echo "" + +# Try all clusters to find where the History ended up. +for CLUSTER in kind-cortex-home kind-cortex-remote-az-a kind-cortex-remote-az-b; do + if kubectl --context "$CLUSTER" get history "$HISTORY_NAME" &>/dev/null; then + echo "Found History '$HISTORY_NAME' in $CLUSTER:" + echo "" + kubectl --context "$CLUSTER" describe history "$HISTORY_NAME" + exit 0 + fi +done + +echo "WARNING: History '$HISTORY_NAME' was not found in any cluster." +echo "Check the controller logs for errors:" +echo " kubectl --context kind-cortex-home logs deploy/cortex-nova-scheduling-controller-manager | grep -i history" diff --git a/docs/guides/multicluster/test-pipeline.yaml b/docs/guides/multicluster/test-pipeline.yaml new file mode 100644 index 000000000..667f4a7fe --- /dev/null +++ b/docs/guides/multicluster/test-pipeline.yaml @@ -0,0 +1,12 @@ +apiVersion: cortex.cloud/v1alpha1 +kind: Pipeline +metadata: + name: multicluster-test +spec: + schedulingDomain: nova + description: Minimal test pipeline for the multicluster guide. 
+  type: filter-weigher
+  createHistory: true
+  filters:
+    - name: filter_correct_az
+  weighers: []
diff --git a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
index 7163cc5a6..52ec37306 100644
--- a/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/cinder/filter_weigher_pipeline_controller.go
@@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainCinder
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-cinder-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-cinder-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/machines/filter_weigher_pipeline_controller.go b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
index d76203812..35d51708a 100644
--- a/internal/scheduling/machines/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/machines/filter_weigher_pipeline_controller.go
@@ -222,7 +222,7 @@ func (c *FilterWeigherPipelineController) handleMachine() handler.EventHandler {
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainMachines
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-machines-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-machines-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/manila/filter_weigher_pipeline_controller.go b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
index 6ab938511..128b7d719 100644
--- a/internal/scheduling/manila/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/manila/filter_weigher_pipeline_controller.go
@@ -148,7 +148,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainManila
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-manila-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-manila-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go
index bb9c5bb07..279ac1c3e 100644
--- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go
@@ -199,7 +199,7 @@ func (c *FilterWeigherPipelineController) InitPipeline(
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainNova
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-nova-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-nova-scheduler")}
 	c.gatherer = &candidateGatherer{Client: mcl}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
diff --git a/internal/scheduling/pods/filter_weigher_pipeline_controller.go b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
index ceba977b8..0ceee6485 100644
--- a/internal/scheduling/pods/filter_weigher_pipeline_controller.go
+++ b/internal/scheduling/pods/filter_weigher_pipeline_controller.go
@@ -234,7 +234,7 @@ func (c *FilterWeigherPipelineController) handlePod() handler.EventHandler {
 func (c *FilterWeigherPipelineController) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
 	c.Initializer = c
 	c.SchedulingDomain = v1alpha1.SchedulingDomainPods
-	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mgr.GetEventRecorder("cortex-pods-scheduler")}
+	c.HistoryManager = lib.HistoryClient{Client: mcl, Recorder: mcl.GetEventRecorder("cortex-pods-scheduler")}
 	if err := mgr.Add(manager.RunnableFunc(c.InitAllPipelines)); err != nil {
 		return err
 	}
diff --git a/internal/scheduling/reservations/failover/controller.go b/internal/scheduling/reservations/failover/controller.go
index fbb4f9c01..f2175adc3 100644
--- a/internal/scheduling/reservations/failover/controller.go
+++ b/internal/scheduling/reservations/failover/controller.go
@@ -766,7 +766,7 @@ func (c *FailoverReservationController) patchReservationStatus(ctx context.Conte
 // SetupWithManager sets up the watch-based reconciler with the Manager.
 // This handles per-reservation reconciliation triggered by CRD changes.
 func (c *FailoverReservationController) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
-	c.Recorder = mgr.GetEventRecorder("failover-reservation-controller")
+	c.Recorder = mcl.GetEventRecorder("failover-reservation-controller")

 	bldr := multicluster.BuildController(mcl, mgr)
 	bldr, err := bldr.WatchesMulticluster(
diff --git a/pkg/multicluster/client_test.go b/pkg/multicluster/client_test.go
index 64b0ae94c..4ea3900cb 100644
--- a/pkg/multicluster/client_test.go
+++ b/pkg/multicluster/client_test.go
@@ -13,6 +13,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/tools/events"
+	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -74,8 +76,9 @@ func (f *fakeCache) getIndexFieldCalls() []indexFieldCall {
 // fakeCluster implements cluster.Cluster interface for testing.
 type fakeCluster struct {
 	cluster.Cluster
-	fakeClient client.Client
-	fakeCache  *fakeCache
+	fakeClient   client.Client
+	fakeCache    *fakeCache
+	fakeRecorder events.EventRecorder
 }

 func (f *fakeCluster) GetClient() client.Client {
@@ -86,6 +89,17 @@ func (f *fakeCluster) GetCache() cache.Cache {
 	return f.fakeCache
 }

+func (f *fakeCluster) GetEventRecorder(_ string) events.EventRecorder {
+	if f.fakeRecorder != nil {
+		return f.fakeRecorder
+	}
+	return &fakeEventRecorder{}
+}
+
+func (f *fakeCluster) GetEventRecorderFor(_ string) record.EventRecorder {
+	return record.NewFakeRecorder(100)
+}
+
 func newFakeCluster(scheme *runtime.Scheme, objs ...client.Object) *fakeCluster {
 	return &fakeCluster{
 		fakeClient: fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(),
diff --git a/pkg/multicluster/recorder.go b/pkg/multicluster/recorder.go
new file mode 100644
index 000000000..8c7c78d3d
--- /dev/null
+++ b/pkg/multicluster/recorder.go
@@ -0,0 +1,83 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package multicluster
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/tools/events"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+)
+
+// MultiClusterRecorder implements events.EventRecorder and routes events to the
+// correct cluster based on the GVK of the "regarding" object. It uses the same
+// routing logic as the multicluster Client's write path.
+type MultiClusterRecorder struct {
+	client       *Client
+	homeRecorder events.EventRecorder
+	recorders    map[cluster.Cluster]events.EventRecorder
+}
+
+// GetEventRecorder creates a multi-cluster-aware EventRecorder. It pre-creates
+// a per-cluster recorder for the home cluster and every remote cluster currently
+// registered in the client. The name parameter is passed through to each
+// cluster's GetEventRecorder method (it becomes the reportingController in the
+// Kubernetes Event).
+func (c *Client) GetEventRecorder(name string) events.EventRecorder {
+	homeRecorder := c.HomeCluster.GetEventRecorder(name)
+
+	recorders := make(map[cluster.Cluster]events.EventRecorder)
+	recorders[c.HomeCluster] = homeRecorder
+
+	c.remoteClustersMu.RLock()
+	defer c.remoteClustersMu.RUnlock()
+
+	for _, remotes := range c.remoteClusters {
+		for _, r := range remotes {
+			if _, exists := recorders[r.cluster]; !exists {
+				recorders[r.cluster] = r.cluster.GetEventRecorder(name)
+			}
+		}
+	}
+
+	return &MultiClusterRecorder{
+		client:       c,
+		homeRecorder: homeRecorder,
+		recorders:    recorders,
+	}
+}
+
+// Eventf routes the event to the cluster that owns the "regarding" object.
+// Falls back to the home cluster recorder if routing fails.
+func (r *MultiClusterRecorder) Eventf(regarding, related runtime.Object, eventtype, reason, action, note string, args ...any) {
+	recorder := r.recorderFor(regarding)
+	recorder.Eventf(regarding, related, eventtype, reason, action, note, args...)
+}
+
+// recorderFor resolves which per-cluster recorder to use for the given object.
+func (r *MultiClusterRecorder) recorderFor(obj runtime.Object) events.EventRecorder {
+	if obj == nil {
+		return r.homeRecorder
+	}
+
+	gvk, err := r.client.GVKFromHomeScheme(obj)
+	if err != nil {
+		ctrl.Log.V(1).Info("multi-cluster recorder: failed to resolve GVK, using home recorder", "error", err)
+		return r.homeRecorder
+	}
+
+	cl, err := r.client.clusterForWrite(gvk, obj)
+	if err != nil {
+		ctrl.Log.V(1).Info("multi-cluster recorder: no cluster matched, using home recorder", "gvk", gvk, "error", err)
+		return r.homeRecorder
+	}
+
+	recorder, ok := r.recorders[cl]
+	if !ok {
+		ctrl.Log.V(1).Info("multi-cluster recorder: no pre-created recorder for cluster, using home recorder", "gvk", gvk)
+		return r.homeRecorder
+	}
+
+	return recorder
+}
diff --git a/pkg/multicluster/recorder_test.go b/pkg/multicluster/recorder_test.go
new file mode 100644
index 000000000..90bdd71dc
--- /dev/null
+++ b/pkg/multicluster/recorder_test.go
@@ -0,0 +1,259 @@
+// Copyright SAP SE
+// SPDX-License-Identifier: Apache-2.0
+
+package multicluster
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// fakeEventRecorder captures Eventf calls for assertions.
+type fakeEventRecorder struct {
+	mu    sync.Mutex
+	calls []eventfCall
+}
+
+type eventfCall struct {
+	regarding runtime.Object
+	eventtype string
+	reason    string
+	action    string
+	note      string
+}
+
+func (f *fakeEventRecorder) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...any) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.calls = append(f.calls, eventfCall{
+		regarding: regarding,
+		eventtype: eventtype,
+		reason:    reason,
+		action:    action,
+		note:      fmt.Sprintf(note, args...),
+	})
+}
+
+func (f *fakeEventRecorder) getCalls() []eventfCall {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	out := make([]eventfCall, len(f.calls))
+	copy(out, f.calls)
+	return out
+}
+
+func TestMultiClusterRecorder_HomeGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: homeRecorder,
+	}
+
+	historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{historyGVK: true},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	history := &v1alpha1.History{
+		ObjectMeta: metav1.ObjectMeta{Name: "nova-uuid-1"},
+	}
+	recorder.Eventf(history, nil, corev1.EventTypeNormal, "SchedulingSucceeded", "Scheduled", "selected host: %s", "compute-1")
+
+	calls := homeRecorder.getCalls()
+	if len(calls) != 1 {
+		t.Fatalf("expected 1 call, got %d", len(calls))
+	}
+	if calls[0].eventtype != corev1.EventTypeNormal {
+		t.Errorf("expected event type %q, got %q", corev1.EventTypeNormal, calls[0].eventtype)
+	}
+	if calls[0].reason != "SchedulingSucceeded" {
+		t.Errorf("expected reason %q, got %q", "SchedulingSucceeded", calls[0].reason)
+	}
+	if calls[0].note != "selected host: compute-1" {
+		t.Errorf("expected note %q, got %q", "selected host: compute-1", calls[0].note)
+	}
+}
+
+func TestMultiClusterRecorder_RemoteGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	remoteRecorder := &fakeEventRecorder{}
+
+	homeCluster := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: homeRecorder,
+	}
+	remote := &fakeCluster{
+		fakeClient:   nil,
+		fakeCache:    &fakeCache{},
+		fakeRecorder: remoteRecorder,
+	}
+
+	reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{
+			reservationGVK: ReservationsResourceRouter{},
+		},
+		remoteClusters: map[schema.GroupVersionKind][]remoteCluster{
+			reservationGVK: {{cluster: remote, labels: map[string]string{"availabilityZone": "az-a"}}},
+		},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	res := &v1alpha1.Reservation{
+		ObjectMeta: metav1.ObjectMeta{Name: "res-1"},
+		Spec:       v1alpha1.ReservationSpec{AvailabilityZone: "az-a"},
+	}
+	recorder.Eventf(res, nil, corev1.EventTypeNormal, "ValidationPassed", "Validated", "reservation validated")
+
+	// Event should go to the remote recorder, not home.
+	homeCalls := homeRecorder.getCalls()
+	if len(homeCalls) != 0 {
+		t.Errorf("expected 0 home calls, got %d", len(homeCalls))
+	}
+	remoteCalls := remoteRecorder.getCalls()
+	if len(remoteCalls) != 1 {
+		t.Fatalf("expected 1 remote call, got %d", len(remoteCalls))
+	}
+	if remoteCalls[0].action != "Validated" {
+		t.Errorf("expected action %q, got %q", "Validated", remoteCalls[0].action)
+	}
+}
+
+func TestMultiClusterRecorder_MultipleRemotes(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	remoteARecorder := &fakeEventRecorder{}
+	remoteBRecorder := &fakeEventRecorder{}
+
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+	remoteA := &fakeCluster{fakeRecorder: remoteARecorder, fakeCache: &fakeCache{}}
+	remoteB := &fakeCluster{fakeRecorder: remoteBRecorder, fakeCache: &fakeCache{}}
+
+	reservationGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "Reservation"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		ResourceRouters: map[schema.GroupVersionKind]ResourceRouter{
+			reservationGVK: ReservationsResourceRouter{},
+		},
+		remoteClusters: map[schema.GroupVersionKind][]remoteCluster{
+			reservationGVK: {
+				{cluster: remoteA, labels: map[string]string{"availabilityZone": "az-a"}},
+				{cluster: remoteB, labels: map[string]string{"availabilityZone": "az-b"}},
+			},
+		},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	// Event for az-b should go to remoteB.
+	res := &v1alpha1.Reservation{
+		ObjectMeta: metav1.ObjectMeta{Name: "res-b"},
+		Spec:       v1alpha1.ReservationSpec{AvailabilityZone: "az-b"},
+	}
+	recorder.Eventf(res, nil, corev1.EventTypeWarning, "SchedulingFailed", "FailedScheduling", "no host found")
+
+	remoteCallsA := remoteARecorder.getCalls()
+	if len(remoteCallsA) != 0 {
+		t.Errorf("expected 0 calls to remote-a, got %d", len(remoteCallsA))
+	}
+	remoteCallsB := remoteBRecorder.getCalls()
+	if len(remoteCallsB) != 1 {
+		t.Fatalf("expected 1 call to remote-b, got %d", len(remoteCallsB))
+	}
+	if remoteCallsB[0].reason != "SchedulingFailed" {
+		t.Errorf("expected reason %q, got %q", "SchedulingFailed", remoteCallsB[0].reason)
+	}
+}
+
+func TestMultiClusterRecorder_FallbackOnUnknownGVK(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	// unknownType is not in the scheme — should fall back to home recorder.
+	obj := &unknownType{ObjectMeta: metav1.ObjectMeta{Name: "unknown-1"}}
+	recorder.Eventf(obj, nil, corev1.EventTypeNormal, "Test", "Test", "test message")
+
+	if len(homeRecorder.getCalls()) != 1 {
+		t.Fatalf("expected 1 home call on fallback, got %d", len(homeRecorder.getCalls()))
+	}
+}
+
+func TestMultiClusterRecorder_FallbackOnNilRegarding(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+	recorder.Eventf(nil, nil, corev1.EventTypeNormal, "Test", "Test", "nil regarding")
+
+	if len(homeRecorder.getCalls()) != 1 {
+		t.Fatalf("expected 1 home call for nil regarding, got %d", len(homeRecorder.getCalls()))
+	}
+}
+
+func TestMultiClusterRecorder_ConcurrentEventf(t *testing.T) {
+	scheme := newTestScheme(t)
+	homeRecorder := &fakeEventRecorder{}
+	homeCluster := &fakeCluster{fakeRecorder: homeRecorder, fakeCache: &fakeCache{}}
+
+	historyGVK := schema.GroupVersionKind{Group: "cortex.cloud", Version: "v1alpha1", Kind: "History"}
+	mcl := &Client{
+		HomeCluster: homeCluster,
+		HomeScheme:  scheme,
+		homeGVKs:    map[schema.GroupVersionKind]bool{historyGVK: true},
+	}
+
+	recorder := mcl.GetEventRecorder("test-recorder")
+
+	const goroutines = 20
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for i := range goroutines {
+		go func(n int) {
+			defer wg.Done()
+			history := &v1alpha1.History{
+				ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("history-%d", n)},
+			}
+			recorder.Eventf(history, nil, corev1.EventTypeNormal, "Test", "Test", "event %d", n)
+		}(i)
+	}
+	wg.Wait()
+
+	homeCalls := homeRecorder.getCalls()
+	if len(homeCalls) != goroutines {
+		t.Errorf("expected %d calls, got %d", goroutines, len(homeCalls))
+	}
+}
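Usage sketch for the recorder introduced above, shown in isolation rather than as part of the patch. The ExampleReconciler type, the event details, and the pkg/multicluster import path are assumptions for illustration; Client.GetEventRecorder, the events.EventRecorder Eventf signature, and the History type are taken from the patch itself, and the wiring mirrors the SetupWithManager changes in the controllers.

// Illustrative only: a hypothetical controller wiring the multi-cluster recorder.
package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/events"
	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
	"github.com/cobaltcore-dev/cortex/pkg/multicluster" // assumed import path for the multicluster package
)

type ExampleReconciler struct {
	Recorder events.EventRecorder
}

func (r *ExampleReconciler) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
	// Events emitted through this recorder are routed to the cluster that owns
	// the regarding object's GVK, falling back to the home cluster.
	r.Recorder = mcl.GetEventRecorder("example-controller")
	return nil
}

func (r *ExampleReconciler) recordDecision(history *v1alpha1.History, host string) {
	// With History GVKs routed to remote clusters (see run.sh above), this event
	// is recorded on the remote cluster that stores the History object.
	r.Recorder.Eventf(history, nil, corev1.EventTypeNormal,
		"SchedulingSucceeded", "Scheduled", "selected host: %s", host)
}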