From 3d0630417c4794fccb057bf34c27a6cb32e992bd Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Thu, 4 Dec 2025 17:58:42 +0800 Subject: [PATCH 1/3] Fixed the priority queue implementation Signed-off-by: michaelawyu --- cmd/memberagent/main.go | 13 +- .../v1beta1/member_suite_test.go | 4 +- pkg/controllers/workapplier/controller.go | 227 ++----- pkg/controllers/workapplier/pq.go | 229 +++++++ pkg/controllers/workapplier/pq_test.go | 604 ++++++++++++++++++ pkg/controllers/workapplier/suite_test.go | 16 +- 6 files changed, 917 insertions(+), 176 deletions(-) create mode 100644 pkg/controllers/workapplier/pq.go create mode 100644 pkg/controllers/workapplier/pq_test.go diff --git a/cmd/memberagent/main.go b/cmd/memberagent/main.go index 5ef220d99..94b217408 100644 --- a/cmd/memberagent/main.go +++ b/cmd/memberagent/main.go @@ -373,7 +373,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk) return err } - // create the work controller, so we can pass it to the internal member cluster reconciler + // Set up the work applier. Note that it is referenced by the InternalMemberCluster controller. // Set up the requeue rate limiter for the work applier. // @@ -413,7 +413,8 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb *workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs, ) - workController := workapplier.NewReconciler( + workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes) + workApplier := workapplier.NewReconciler( hubMgr.GetClient(), targetNS, spokeDynamicClient, @@ -426,12 +427,12 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb // Use the default worker count (4) for parallelized manifest processing. parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers), time.Minute*time.Duration(*deletionWaitTime), - *watchWorkWithPriorityQueue, - *watchWorkReconcileAgeMinutes, requeueRateLimiter, + *watchWorkWithPriorityQueue, + workObjAgeForPrioritizedProcessing, ) - if err = workController.SetupWithManager(hubMgr); err != nil { + if err = workApplier.SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work") return err } @@ -459,7 +460,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb ctx, hubMgr.GetClient(), memberMgr.GetConfig(), memberMgr.GetClient(), - workController, + workApplier, pp) if err != nil { klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler") diff --git a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go index 6176e4926..8af999722 100644 --- a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go +++ b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go @@ -379,7 +379,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil) + workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0) propertyProvider1 = &manuallyUpdatedProvider{} member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1) @@ -402,7 +402,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil) + workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0) member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index c4e4df323..0831e66fe 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -19,10 +19,10 @@ package workapplier import ( "context" "fmt" + "sync" "time" "go.uber.org/atomic" - "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,12 +41,10 @@ import ( ctrloption "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" - "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" "github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter" parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer" @@ -54,136 +52,19 @@ import ( const ( patchDetailPerObjLimit = 100 + + minWorkObjAgeForPrioritizedQueueing = time.Minute * 30 ) const ( workFieldManagerName = "work-api-agent" ) -var ( - workAgeToReconcile = 1 * time.Hour -) - -// Custom type to hold a reconcile.Request and a priority value -type priorityQueueItem struct { - reconcile.Request - Priority int -} - -// PriorityQueueEventHandler is a custom event handler for adding objects to the priority queue. -type PriorityQueueEventHandler struct { - Queue priorityqueue.PriorityQueue[priorityQueueItem] // The priority queue to manage events - Client client.Client // store the client to make API calls -} - -// Implement priorityqueue.Item interface for priorityQueueItem -func (i priorityQueueItem) GetPriority() int { - return i.Priority -} - -func (h *PriorityQueueEventHandler) WorkPendingApply(ctx context.Context, obj client.Object) bool { - var work fleetv1beta1.Work - ns := obj.GetNamespace() - name := obj.GetName() - err := h.Client.Get(ctx, client.ObjectKey{ - Namespace: ns, - Name: name, - }, &work) - if err != nil { - // Log and return - klog.ErrorS(err, "Failed to get the work", "name", name, "ns", ns) - return true - } - availCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable) - appliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied) - - if availCond != nil && appliedCond != nil { - // check if the object has been recently modified - availCondLastUpdatedTime := availCond.LastTransitionTime.Time - appliedCondLastUpdatedTime := appliedCond.LastTransitionTime.Time - if time.Since(availCondLastUpdatedTime) < workAgeToReconcile || time.Since(appliedCondLastUpdatedTime) < workAgeToReconcile { - return true - } - } - - if condition.IsConditionStatusTrue(availCond, work.GetGeneration()) && - condition.IsConditionStatusTrue(appliedCond, work.GetGeneration()) { - return false - } - - // Work not yet applied - return true -} - -func (h *PriorityQueueEventHandler) AddToPriorityQueue(ctx context.Context, obj client.Object, alwaysAdd bool) { - req := reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - } - - objAge := time.Since(obj.GetCreationTimestamp().Time) - - var objPriority int - if alwaysAdd || objAge < workAgeToReconcile || h.WorkPendingApply(ctx, obj) { - // Newer or pending objects get higher priority - // Negate the Unix timestamp to give higher priority to newer timestamps - objPriority = -int(time.Now().Unix()) - } else { - // skip adding older objects with no changes - klog.V(2).InfoS("adding old item to priorityQueueItem", "obj", req.Name, "age", objAge) - objPriority = int(obj.GetCreationTimestamp().Unix()) - } - - // Create the custom priorityQueueItem with the request and priority - item := priorityQueueItem{ - Request: req, - Priority: objPriority, - } - - h.Queue.Add(item) - klog.V(2).InfoS("Created PriorityQueueItem", "priority", objPriority, "obj", req.Name, "queue size", h.Queue.Len()) -} - -func (h *PriorityQueueEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, false) -} - -func (h *PriorityQueueEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, true) -} - -func (h *PriorityQueueEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - // Ignore updates where only the status changed - oldObj := evt.ObjectOld.DeepCopyObject() - newObj := evt.ObjectNew.DeepCopyObject() - - // Zero out the status - if oldWork, ok := oldObj.(*fleetv1beta1.Work); ok { - oldWork.Status = fleetv1beta1.WorkStatus{} - } - if newWork, ok := newObj.(*fleetv1beta1.Work); ok { - newWork.Status = fleetv1beta1.WorkStatus{} - } - - if !equality.Semantic.DeepEqual(oldObj, newObj) { - // ignore status changes to prevent noise - h.AddToPriorityQueue(ctx, evt.ObjectNew, true) - return - } - klog.V(4).InfoS("ignoring update event with only status change", "work", evt.ObjectNew.GetName()) -} - -func (h *PriorityQueueEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.AddToPriorityQueue(ctx, evt.Object, false) -} - var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter = NewRequeueMultiStageWithExponentialBackoffRateLimiter( // Allow 1 attempt of fixed delay; this helps give objects a bit of headroom to get available (or have // diffs reported). 1, - // Use a fixed delay of 5 seconds for the first two attempts. + // Use a fixed delay of 5 seconds for the first attempt. // // Important (chenyu1): before the introduction of the requeue rate limiter, the work // applier uses static requeue intervals, specifically 5 seconds (if the work object is unavailable), @@ -216,19 +97,24 @@ var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimite // Reconciler reconciles a Work object. type Reconciler struct { - hubClient client.Client - workNameSpace string - spokeDynamicClient dynamic.Interface - spokeClient client.Client - restMapper meta.RESTMapper - recorder record.EventRecorder - concurrentReconciles int - watchWorkWithPriorityQueue bool - watchWorkReconcileAgeMinutes int - deletionWaitTime time.Duration - joined *atomic.Bool - parallelizer parallelizerutil.Parallelizer - requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter + hubClient client.Client + workNameSpace string + spokeDynamicClient dynamic.Interface + spokeClient client.Client + restMapper meta.RESTMapper + recorder record.EventRecorder + concurrentReconciles int + deletionWaitTime time.Duration + joined *atomic.Bool + parallelizer parallelizerutil.Parallelizer + requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter + usePriorityQueue bool + workObjAgeForPrioritizedProcessing time.Duration + // The custom priority queue in use if the option watchWorkWithPriorityQueue is enabled. + // + // Note that this variable is set only after the controller starts. + pq priorityqueue.PriorityQueue[reconcile.Request] + pqSetupOnce sync.Once } // NewReconciler returns a new Work object reconciler for the work applier. @@ -239,9 +125,9 @@ func NewReconciler( concurrentReconciles int, parallelizer parallelizerutil.Parallelizer, deletionWaitTime time.Duration, - watchWorkWithPriorityQueue bool, - watchWorkReconcileAgeMinutes int, requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter, + usePriorityQueue bool, + workObjAgeForPrioritizedProcessing time.Duration, ) *Reconciler { if requeueRateLimiter == nil { klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter") @@ -252,23 +138,37 @@ func NewReconciler( parallelizer = parallelizerutil.NewParallelizer(1) } + woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing + if usePriorityQueue && woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing { + klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing) + woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing + } + return &Reconciler{ - hubClient: hubClient, - spokeDynamicClient: spokeDynamicClient, - spokeClient: spokeClient, - restMapper: restMapper, - recorder: recorder, - concurrentReconciles: concurrentReconciles, - parallelizer: parallelizer, - watchWorkWithPriorityQueue: watchWorkWithPriorityQueue, - watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes, - workNameSpace: workNameSpace, - joined: atomic.NewBool(false), - deletionWaitTime: deletionWaitTime, - requeueRateLimiter: requeueRateLimiter, + hubClient: hubClient, + spokeDynamicClient: spokeDynamicClient, + spokeClient: spokeClient, + restMapper: restMapper, + recorder: recorder, + concurrentReconciles: concurrentReconciles, + parallelizer: parallelizer, + workNameSpace: workNameSpace, + joined: atomic.NewBool(false), + deletionWaitTime: deletionWaitTime, + requeueRateLimiter: requeueRateLimiter, + usePriorityQueue: usePriorityQueue, + workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing, } } +// PriorityQueue returns the priority queue (if any) in use by the reconciler. +// +// Note that the priority queue is only set after the reconciler starts (i.e., the work applier +// has been set up with the controller manager). +func (r *Reconciler) PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] { + return r.pq +} + type ManifestProcessingApplyOrReportDiffResultType string const ( @@ -728,22 +628,29 @@ func (r *Reconciler) Leave(ctx context.Context) error { // SetupWithManager wires up the controller. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { - // Create the priority queue using the rate limiter and a queue name - queue := priorityqueue.New[priorityQueueItem]("apply-work-queue") + if r.usePriorityQueue { + eventHandler := &priorityBasedWorkObjEventHandler{ + qm: r, + workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing, + } - // Create the event handler that uses the priority queue - eventHandler := &PriorityQueueEventHandler{ - Queue: queue, // Attach the priority queue to the event handler - Client: r.hubClient, - } + newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { + withRateLimiterOpt := func(opts *priorityqueue.Opts[reconcile.Request]) { + opts.RateLimiter = rateLimiter + } + r.pqSetupOnce.Do(func() { + r.pq = priorityqueue.New(controllerName, withRateLimiterOpt) + }) + return r.pq + } - if r.watchWorkWithPriorityQueue { - workAgeToReconcile = time.Duration(r.watchWorkReconcileAgeMinutes) * time.Minute return ctrl.NewControllerManagedBy(mgr).Named("work-applier-controller"). WithOptions(ctrloption.Options{ MaxConcurrentReconciles: r.concurrentReconciles, + NewQueue: newPQ, }). For(&fleetv1beta1.Work{}). + // Use custom event handler to allow access to the priority queue interface. Watches(&fleetv1beta1.Work{}, eventHandler). Complete(r) } diff --git a/pkg/controllers/workapplier/pq.go b/pkg/controllers/workapplier/pq.go new file mode 100644 index 000000000..6f30a28ad --- /dev/null +++ b/pkg/controllers/workapplier/pq.go @@ -0,0 +1,229 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "context" + "fmt" + "time" + + "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" + "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +const ( + // A list of priority levels and their targets for the work applier priority queue. + // + // The work applier, when a priority queue is in use, will prioritize requests in the following + // order: + // * with highest priority (-2): all Create/Delete events, and all Update events + // that concern recently created Work objects or Work objects that are in a failed/undeterminted + // state (apply op/availability check failure, or diff reporting failure). + // * with medium priority (-1): all other Update events. + // * with default priority (0): all requeues (with or with errors), and all Generic events. + // + // Note that requests with the same priority level will be processed in the FIFO order. + // + // TO-DO (chenyu1): evaluate if/how we need to/should prioritize requeues properly. + highPriorityLevel = 2 + mediumPriorityLevel = 1 + defaultPriorityLevel = 0 +) + +type CustomPriorityQueueManager interface { + PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] +} + +var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &priorityBasedWorkObjEventHandler{} + +// priorityBasedWorkObjEventHandler implements the TypedEventHandler interface. +// +// It is used to process work object events in a priority-based manner with a priority queue. +type priorityBasedWorkObjEventHandler struct { + qm CustomPriorityQueueManager + workObjAgeForPrioritizedProcessing time.Duration +} + +// Create implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Create(_ context.Context, createEvent event.TypedCreateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + // + // Normally when this method is called, the priority queue has been initialized. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Create event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Create event") + return + } + + // Enqueue the request with high priority. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(highPriorityLevel), + } + workObjName := createEvent.Object.GetName() + workObjNS := createEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Delete implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Delete(_ context.Context, deleteEvent event.TypedDeleteEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Delete event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Delete event") + return + } + + // Enqueue the request with high priority. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(highPriorityLevel), + } + workObjName := deleteEvent.Object.GetName() + workObjNS := deleteEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Update implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Update(_ context.Context, updateEvent event.TypedUpdateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received an Update event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Update event") + return + } + + // Ignore status only updates. + if updateEvent.ObjectOld.GetGeneration() == updateEvent.ObjectNew.GetGeneration() { + return + } + + oldWorkObj, oldOK := updateEvent.ObjectOld.(*fleetv1beta1.Work) + newWorkObj, newOK := updateEvent.ObjectNew.(*fleetv1beta1.Work) + if !oldOK || !newOK { + wrappedErr := fmt.Errorf("received an Update event, but the objects cannot be cast to Work objects") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Update event") + return + } + + pri := h.determineUpdateEventPriority(oldWorkObj, newWorkObj) + opts := priorityqueue.AddOpts{ + Priority: ptr.To(pri), + } + workObjName := newWorkObj.GetName() + workObjNS := newWorkObj.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +// Generic implements the TypedEventHandler interface. +func (h *priorityBasedWorkObjEventHandler) Generic(_ context.Context, genericEvent event.TypedGenericEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // Do a sanity check. + if h.qm.PriorityQueue() == nil { + wrappedErr := fmt.Errorf("received a Generic event, but the priority queue is not initialized") + _ = controller.NewUnexpectedBehaviorError(wrappedErr) + klog.ErrorS(wrappedErr, "Failed to process Generic event") + return + } + + // Enqueue the request with default priority. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(defaultPriorityLevel), + } + workObjName := genericEvent.Object.GetName() + workObjNS := genericEvent.Object.GetNamespace() + req := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: workObjNS, + Name: workObjName, + }, + } + h.qm.PriorityQueue().AddWithOpts(opts, req) +} + +func (h *priorityBasedWorkObjEventHandler) determineUpdateEventPriority(oldWorkObj, newWorkObj *fleetv1beta1.Work) int { + // If the work object is recently created (its age is within the given threshold), + // process its Update event with high priority. + + // The age is expected to be the same for both old and new work objects, as the field + // is immutable and not user configurable. + workObjAge := time.Since(newWorkObj.CreationTimestamp.Time) + if workObjAge <= h.workObjAgeForPrioritizedProcessing { + return highPriorityLevel + } + + // Check if the work object is in a failed/undetermined state. + oldApplyStrategy := oldWorkObj.Spec.ApplyStrategy + isReportDiffModeEnabled := oldApplyStrategy != nil && oldApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff + + appliedCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied) + availableCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable) + diffReportedCond := meta.FindStatusCondition(oldWorkObj.Status.Conditions, fleetv1beta1.WorkConditionTypeDiffReported) + + // Note (chenyu1): it might be true that the Update event involves an apply strategy change; however, the prioritization + // logic stays the same: if the old work object is in a failed/undetermined state, the apply strategy change + // should receive the highest priority; otherwise, the Update event should be processed with medium priority. + switch { + case isReportDiffModeEnabled && condition.IsConditionStatusTrue(diffReportedCond, oldWorkObj.Generation): + // The ReportDiff mode is enabled and the status suggests that the diff reporting has been completed successfully. + // Use medium priority for the Update event. + return mediumPriorityLevel + case isReportDiffModeEnabled: + // The ReportDiff mode is enabled, but the diff reporting has not been completed yet or has failed. + // Use high priority for the Update event. + return highPriorityLevel + case condition.IsConditionStatusTrue(appliedCond, oldWorkObj.Generation) && condition.IsConditionStatusTrue(availableCond, oldWorkObj.Generation): + // The apply strategy is set to the CSA/SSA mode and the work object is applied and available. + // Use medium priority for the Update event. + return mediumPriorityLevel + default: + // The apply strategy is set to the CSA/SSA mode and the work object is in a failed/undetermined state. + // Use high priority for the Update event. + return highPriorityLevel + } +} diff --git a/pkg/controllers/workapplier/pq_test.go b/pkg/controllers/workapplier/pq_test.go new file mode 100644 index 000000000..4932a647b --- /dev/null +++ b/pkg/controllers/workapplier/pq_test.go @@ -0,0 +1,604 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "context" + "fmt" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/google/go-cmp/cmp" + fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +const ( + workObjAgeForPrioritizedProcessingTestOnly = time.Minute * 5 + + pqName = "test-pq" + workNameForPriorityTestingTmpl = "prioritized-work-%s" +) + +type pqWrapper struct { + pq priorityqueue.PriorityQueue[reconcile.Request] +} + +// PriorityQueue implements the CustomPriorityQueueManager interface. +func (p *pqWrapper) PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] { + return p.pq +} + +// Verify that pqWrapper implements the CustomPriorityQueueManager interface. +var _ CustomPriorityQueueManager = &pqWrapper{} + +// ExpectedDequeuedKeyAndPriority is used in tests to represent an expected dequeued key along with its priority. +type ExpectedDequeuedKeyAndPriority struct { + Key reconcile.Request + Priority int +} + +// TestCreateEventHandler tests the Create event handler of the priority-based Work object event handler. +func TestCreateEventHandler(t *testing.T) { + ctx := context.Background() + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + + // Add two keys with medium and default priority levels respectively. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(mediumPriorityLevel), + } + workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + opts = priorityqueue.AddOpts{ + Priority: ptr.To(defaultPriorityLevel), + } + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + // Handle a CreateEvent, which should add a new key with high priority. + workJustCreatedName := fmt.Sprintf(workNameForPriorityTestingTmpl, "just-created") + workObj := fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workJustCreatedName, + }, + } + pqEventHandler.Create(ctx, event.TypedCreateEvent[client.Object]{Object: &workObj}, nil) + + // Check the queue length. + if !cmp.Equal(pq.Len(), 3) { + t.Fatalf("priority queue length, expected 3, got %d", pq.Len()) + } + + // Check if the first item dequeued is the one added by the CreateEvent handler (high priority). + item, pri, _ := pq.GetWithPriority() + wantItem := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workJustCreatedName, + }, + } + if diff := cmp.Diff(item, wantItem); diff != "" { + t.Errorf("dequeued item mismatch (-got, +want):\n%s", diff) + } + if !cmp.Equal(pri, highPriorityLevel) { + t.Errorf("priority of dequeued item, expected %d, got %d", highPriorityLevel, pri) + } +} + +// TestUpdateEventHandler_NormalOps tests the Update event handler of the priority-based Work object event handler +// under normal operations. +func TestUpdateEventHandler_NormalOps(t *testing.T) { + ctx := context.Background() + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + + // Add a key with default priority levels respectively. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(defaultPriorityLevel), + } + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + // Handle an UpdateEvent that concerns a Work object with ReportDiff strategy and has been + // processed successfully long before (>5 minutes ago). + workInReportDiffModeAndProcessedLongBfrName := fmt.Sprintf(workNameForPriorityTestingTmpl, "report-diff-processed-long-bfr") + longAgo := time.Now().Add(-time.Minute * 10) + oldWorkObj := &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workInReportDiffModeAndProcessedLongBfrName, + CreationTimestamp: metav1.Time{Time: longAgo}, + }, + Spec: fleetv1beta1.WorkSpec{ + ApplyStrategy: &fleetv1beta1.ApplyStrategy{ + Type: fleetv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeDiffReported, + Status: metav1.ConditionTrue, + }, + }, + }, + } + newWorkObj := oldWorkObj.DeepCopy() + // Simulate an update. + newWorkObj.Generation += 1 + pqEventHandler.Update(ctx, event.TypedUpdateEvent[client.Object]{ObjectOld: oldWorkObj, ObjectNew: newWorkObj}, nil) + + // Handle an UpdateEvent that concerns a normal Work object that was created very recently (<5 minutes ago). + workInCSAModeAndJustProcessedName := fmt.Sprintf(workNameForPriorityTestingTmpl, "csa-just-processed") + shortWhileAgo := time.Now().Add(-time.Minute * 2) + oldWorkObj = &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workInCSAModeAndJustProcessedName, + CreationTimestamp: metav1.Time{Time: shortWhileAgo}, + }, + Spec: fleetv1beta1.WorkSpec{}, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + }, + }, + }, + } + newWorkObj = oldWorkObj.DeepCopy() + // Simulate an update. + newWorkObj.Generation += 1 + pqEventHandler.Update(ctx, event.TypedUpdateEvent[client.Object]{ObjectOld: oldWorkObj, ObjectNew: newWorkObj}, nil) + + // Check the queue length. + if !cmp.Equal(pq.Len(), 3) { + t.Fatalf("priority queue length, expected 3, got %d", pq.Len()) + } + + // Dequeue all items and check if the keys and their assigned priorities are as expected. + wantDequeuedItemsWithPriorities := []ExpectedDequeuedKeyAndPriority{ + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workInCSAModeAndJustProcessedName, + }, + }, + Priority: highPriorityLevel, + }, + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workInReportDiffModeAndProcessedLongBfrName, + }, + }, + Priority: mediumPriorityLevel, + }, + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workWithDefaultPriName, + }, + }, + Priority: defaultPriorityLevel, + }, + } + + for i := 0; i < 3; i++ { + item, pri, _ := pq.GetWithPriority() + wantItemWithPri := wantDequeuedItemsWithPriorities[i] + if diff := cmp.Diff(item, wantItemWithPri.Key); diff != "" { + t.Errorf("dequeued item #%d mismatch (-got, +want):\n%s", i, diff) + } + if !cmp.Equal(pri, wantItemWithPri.Priority) { + t.Errorf("priority of dequeued item #%d, expected %d, got %d", i, wantItemWithPri.Priority, pri) + } + } +} + +// TestUpdateEventHandler_Erred tests the Update event handler of the priority-based Work object event handler +// when it encounters errors. +func TestUpdateEventHandler_Erred(t *testing.T) { + ctx := context.Background() + + workObj := &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: fmt.Sprintf(workNameForPriorityTestingTmpl, "erred"), + }, + } + statusOnlyUpdateEvent := event.TypedUpdateEvent[client.Object]{ + ObjectOld: workObj, + ObjectNew: workObj.DeepCopy(), + } + + nsObj := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: nsName, + Generation: 1, + }, + } + invalidUpdateEvent1 := event.TypedUpdateEvent[client.Object]{ + ObjectOld: nsObj, + ObjectNew: workObj, + } + invalidUpdateEvent2 := event.TypedUpdateEvent[client.Object]{ + ObjectOld: workObj, + ObjectNew: nsObj, + } + + testCases := []struct { + name string + updateEvent event.TypedUpdateEvent[client.Object] + }{ + { + name: "status only update", + updateEvent: statusOnlyUpdateEvent, + }, + { + // Normally this should never occur. + name: "invalid update event with the old object not being a Work", + updateEvent: invalidUpdateEvent1, + }, + { + // Normally this should never occur. + name: "invalid update event with the new object not being a Work", + updateEvent: invalidUpdateEvent2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + pqEventHandler.Update(ctx, tc.updateEvent, nil) + + // Check the queue length. + if !cmp.Equal(pq.Len(), 0) { + t.Fatalf("priority queue length, expected 0, got %d", pq.Len()) + } + }) + } +} + +// TestDeleteEventHandler tests the Delete event handler of the priority-based Work object event handler. +func TestDeleteEventHandler(t *testing.T) { + ctx := context.Background() + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + + // Add two keys with medium and default priority levels respectively. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(mediumPriorityLevel), + } + workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + opts = priorityqueue.AddOpts{ + Priority: ptr.To(defaultPriorityLevel), + } + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + // Handle a DeleteEvent, which should add a new key with high priority. + workJustDeletedName := fmt.Sprintf(workNameForPriorityTestingTmpl, "just-deleted") + workObj := fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workJustDeletedName, + }, + } + pqEventHandler.Delete(ctx, event.TypedDeleteEvent[client.Object]{Object: &workObj}, nil) + + // Check the queue length. + if !cmp.Equal(pq.Len(), 3) { + t.Fatalf("priority queue length, expected 3, got %d", pq.Len()) + } + + // Check if the first item dequeued is the one added by the DeleteEvent handler (high priority). + item, pri, _ := pq.GetWithPriority() + wantItem := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workJustDeletedName, + }, + } + if diff := cmp.Diff(item, wantItem); diff != "" { + t.Errorf("dequeued item mismatch (-got, +want):\n%s", diff) + } + if !cmp.Equal(pri, highPriorityLevel) { + t.Errorf("priority of dequeued item, expected %d, got %d", highPriorityLevel, pri) + } +} + +// TestGenericEventHandler tests the Generic event handler of the priority-based Work object event handler. +func TestGenericEventHandler(t *testing.T) { + ctx := context.Background() + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + + // Add two keys with high and medium priority levels respectively. + opts := priorityqueue.AddOpts{ + Priority: ptr.To(highPriorityLevel), + } + workWithHighPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "high") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithHighPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + opts = priorityqueue.AddOpts{ + Priority: ptr.To(mediumPriorityLevel), + } + workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) + + // Handle a GenericEvent, which should add a new key with default priority. + workGenericEventName := fmt.Sprintf(workNameForPriorityTestingTmpl, "generic") + workObj := fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workGenericEventName, + }, + } + pqEventHandler.Generic(ctx, event.TypedGenericEvent[client.Object]{Object: &workObj}, nil) + + // Check the queue length. + if !cmp.Equal(pq.Len(), 3) { + t.Fatalf("priority queue length, expected 3, got %d", pq.Len()) + } + + // Dequeue all items and check if the keys and their assigned priorities are as expected. + wantDequeuedItemsWithPriorities := []ExpectedDequeuedKeyAndPriority{ + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workWithHighPriName, + }, + }, + Priority: highPriorityLevel, + }, + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workWithMediumPriName, + }, + }, + Priority: mediumPriorityLevel, + }, + { + Key: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: memberReservedNSName1, + Name: workGenericEventName, + }, + }, + Priority: defaultPriorityLevel, + }, + } + + for i := 0; i < 3; i++ { + item, pri, _ := pq.GetWithPriority() + wantItemWithPri := wantDequeuedItemsWithPriorities[i] + if diff := cmp.Diff(item, wantItemWithPri.Key); diff != "" { + t.Errorf("dequeued item #%d mismatch (-got, +want):\n%s", i, diff) + } + if !cmp.Equal(pri, wantItemWithPri.Priority) { + t.Errorf("priority of dequeued item #%d, expected %d, got %d", i, wantItemWithPri.Priority, pri) + } + } +} + +func TestDetermineUpdateEventPriority(t *testing.T) { + now := metav1.Now() + longAgo := metav1.NewTime(now.Add(-time.Minute * 10)) + + pq := priorityqueue.New[reconcile.Request](pqName) + pqEventHandler := &priorityBasedWorkObjEventHandler{ + qm: &pqWrapper{pq: pq}, + workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + } + + testCases := []struct { + name string + oldWorkObj *fleetv1beta1.Work + newWorkObj *fleetv1beta1.Work + wantPriority int + }{ + { + name: "fresh work object", + newWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: now, + }, + }, + wantPriority: highPriorityLevel, + }, + { + name: "reportDiff mode, diff reported", + oldWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + Spec: fleetv1beta1.WorkSpec{ + ApplyStrategy: &fleetv1beta1.ApplyStrategy{ + Type: fleetv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeDiffReported, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + newWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + }, + wantPriority: mediumPriorityLevel, + }, + { + name: "reportDiff mode, diff not reported", + oldWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + Spec: fleetv1beta1.WorkSpec{ + ApplyStrategy: &fleetv1beta1.ApplyStrategy{ + Type: fleetv1beta1.ApplyStrategyTypeReportDiff, + }, + }, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeDiffReported, + Status: metav1.ConditionFalse, + }, + }, + }, + }, + newWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + }, + wantPriority: highPriorityLevel, + }, + { + name: "CSA/SSA mode, applied and available", + oldWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + Spec: fleetv1beta1.WorkSpec{}, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + newWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + }, + wantPriority: mediumPriorityLevel, + }, + { + name: "CSA/SSA mode, not applied and available", + oldWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + Spec: fleetv1beta1.WorkSpec{}, + Status: fleetv1beta1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionFalse, + }, + }, + }, + }, + newWorkObj: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: memberReservedNSName1, + Name: workName, + CreationTimestamp: longAgo, + }, + }, + wantPriority: highPriorityLevel, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pri := pqEventHandler.determineUpdateEventPriority(tc.oldWorkObj, tc.newWorkObj) + if !cmp.Equal(pri, tc.wantPriority) { + t.Errorf("determined priority, expected %d, got %d", tc.wantPriority, pri) + } + }) + } +} diff --git a/pkg/controllers/workapplier/suite_test.go b/pkg/controllers/workapplier/suite_test.go index ecab1d12f..ac3c0c650 100644 --- a/pkg/controllers/workapplier/suite_test.go +++ b/pkg/controllers/workapplier/suite_test.go @@ -277,9 +277,9 @@ var _ = BeforeSuite(func() { maxConcurrentReconciles, parallelizer.NewParallelizer(workerCount), 30*time.Second, - true, - 60, - nil, // Use the default backoff rate limiter. + nil, // Use the default backoff rate limiter. + false, // Disable priority queueing. + 0, ) Expect(workApplier1.SetupWithManager(hubMgr1)).To(Succeed()) @@ -326,9 +326,9 @@ var _ = BeforeSuite(func() { maxConcurrentReconciles, parallelizer.NewParallelizer(workerCount), 30*time.Second, - true, - 60, superLongExponentialBackoffRateLimiter, + false, // Disable priority queueing. + 0, ) // Due to name conflicts, the second work applier must be set up manually. err = ctrl.NewControllerManagedBy(hubMgr2).Named("work-applier-controller-duplicate"). @@ -370,9 +370,9 @@ var _ = BeforeSuite(func() { maxConcurrentReconciles, pWithDelay, 30*time.Second, - true, - 60, - nil, // Use the default backoff rate limiter. + nil, // Use the default backoff rate limiter. + false, // Disable priority queueing. + 0, ) // Due to name conflicts, the third work applier must be set up manually. err = ctrl.NewControllerManagedBy(hubMgr3).Named("work-applier-controller-waved-parallel-processing"). From 28fe672debf2ceb6dbacc2739b3e960793ad412f Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Fri, 5 Dec 2025 01:14:50 +0800 Subject: [PATCH 2/3] Minor fixes Signed-off-by: michaelawyu --- pkg/controllers/workapplier/pq.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/workapplier/pq.go b/pkg/controllers/workapplier/pq.go index 6f30a28ad..caceaebcd 100644 --- a/pkg/controllers/workapplier/pq.go +++ b/pkg/controllers/workapplier/pq.go @@ -42,10 +42,10 @@ const ( // // The work applier, when a priority queue is in use, will prioritize requests in the following // order: - // * with highest priority (-2): all Create/Delete events, and all Update events + // * with highest priority (2): all Create/Delete events, and all Update events // that concern recently created Work objects or Work objects that are in a failed/undeterminted // state (apply op/availability check failure, or diff reporting failure). - // * with medium priority (-1): all other Update events. + // * with medium priority (1): all other Update events. // * with default priority (0): all requeues (with or with errors), and all Generic events. // // Note that requests with the same priority level will be processed in the FIFO order. From da45d2149e1b2f351bf6b203c75f79be49024af8 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Fri, 19 Dec 2025 01:01:33 +1100 Subject: [PATCH 3/3] Minor fixes Signed-off-by: michaelawyu --- cmd/memberagent/main.go | 47 +++-- .../v1beta1/member_suite_test.go | 4 +- pkg/controllers/workapplier/controller.go | 78 ++++---- pkg/controllers/workapplier/pq.go | 87 +++++--- pkg/controllers/workapplier/pq_test.go | 185 +++++++++++------- pkg/controllers/workapplier/suite_test.go | 9 +- 6 files changed, 249 insertions(+), 161 deletions(-) diff --git a/cmd/memberagent/main.go b/cmd/memberagent/main.go index 94b217408..169b9394c 100644 --- a/cmd/memberagent/main.go +++ b/cmd/memberagent/main.go @@ -77,21 +77,19 @@ var ( "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.") // TODO(weiweng): only keep enableV1Alpha1APIs for backward compatibility with helm charts. Remove soon. - enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.") - enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.") - propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.") - region = flag.String("region", "", "The region where the member cluster resides.") - cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.") - watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates") - watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile") - deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference") - enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling") - pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling") - hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling") - hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") - memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.") + enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.") + propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.") + region = flag.String("region", "", "The region where the member cluster resides.") + cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.") + deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference") + enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling") + pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling") + hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling") + hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") + memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.") // Work applier requeue rate limiter settings. workApplierRequeueRateLimiterAttemptsWithFixedDelay = flag.Int("work-applier-requeue-rate-limiter-attempts-with-fixed-delay", 1, "If set, the work applier will requeue work objects with a fixed delay for the specified number of attempts before switching to exponential backoff.") @@ -102,6 +100,12 @@ var ( workApplierRequeueRateLimiterExponentialBaseForFastBackoff = flag.Float64("work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff", 1.5, "If set, the work applier will start to back off fast at this factor after it completes the slow backoff stage, until it reaches the fast backoff delay cap. Its value should be larger than the base value for the slow backoff stage.") workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds = flag.Float64("work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds", 900, "If set, the work applier will not back off longer than this value in seconds when it is in the fast backoff stage.") workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs = flag.Bool("work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs", true, "If set, the rate limiter will skip the slow backoff stage and start fast backoff immediately for work objects that are available or have diff reported.") + + // Work applier priority queue settings. + enableWorkApplierPriorityQueue = flag.Bool("enable-work-applier-priority-queue", false, "If set, the work applier will use a priority queue to process work objects.") + workApplierPriorityLinearEquationCoeffA = flag.Int("work-applier-priority-linear-equation-coeff-a", -3, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient A in the equation.") + workApplierPriorityLinearEquationCoeffB = flag.Int("work-applier-priority-linear-equation-coeff-b", 100, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient B in the equation.") + // Azure property provider feature gates. isAzProviderCostPropertiesEnabled = flag.Bool("use-cost-properties-in-azure-provider", true, "If set, the Azure property provider will expose cost properties in the member cluster.") isAzProviderAvailableResPropertiesEnabled = flag.Bool("use-available-res-properties-in-azure-provider", true, "If set, the Azure property provider will expose available resources properties in the member cluster.") @@ -133,6 +137,13 @@ func main() { klog.ErrorS(errors.New("either enable-v1alpha1-apis or enable-v1beta1-apis is required"), "Invalid APIs flags") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } + // TO-DO (chenyu1): refactor the validation logic. + if workApplierPriorityLinearEquationCoeffA == nil || *workApplierPriorityLinearEquationCoeffA >= 0 { + klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffA is set incorrectly; must use a value less than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffA") + } + if workApplierPriorityLinearEquationCoeffB == nil || *workApplierPriorityLinearEquationCoeffB <= 0 { + klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffB is set incorrectly; must use a value greater than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffB") + } hubURL := os.Getenv("HUB_SERVER_URL") @@ -413,7 +424,6 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb *workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs, ) - workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes) workApplier := workapplier.NewReconciler( hubMgr.GetClient(), targetNS, @@ -428,8 +438,9 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers), time.Minute*time.Duration(*deletionWaitTime), requeueRateLimiter, - *watchWorkWithPriorityQueue, - workObjAgeForPrioritizedProcessing, + *enableWorkApplierPriorityQueue, + workApplierPriorityLinearEquationCoeffA, + workApplierPriorityLinearEquationCoeffB, ) if err = workApplier.SetupWithManager(hubMgr); err != nil { diff --git a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go index 8af999722..6869d78ef 100644 --- a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go +++ b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go @@ -379,7 +379,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0) + workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil) propertyProvider1 = &manuallyUpdatedProvider{} member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1) @@ -402,7 +402,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0) + workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil) member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index 0831e66fe..6d03a2fe8 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -52,8 +52,6 @@ import ( const ( patchDetailPerObjLimit = 100 - - minWorkObjAgeForPrioritizedQueueing = time.Minute * 30 ) const ( @@ -97,24 +95,25 @@ var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimite // Reconciler reconciles a Work object. type Reconciler struct { - hubClient client.Client - workNameSpace string - spokeDynamicClient dynamic.Interface - spokeClient client.Client - restMapper meta.RESTMapper - recorder record.EventRecorder - concurrentReconciles int - deletionWaitTime time.Duration - joined *atomic.Bool - parallelizer parallelizerutil.Parallelizer - requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter - usePriorityQueue bool - workObjAgeForPrioritizedProcessing time.Duration + hubClient client.Client + workNameSpace string + spokeDynamicClient dynamic.Interface + spokeClient client.Client + restMapper meta.RESTMapper + recorder record.EventRecorder + concurrentReconciles int + deletionWaitTime time.Duration + joined *atomic.Bool + parallelizer parallelizerutil.Parallelizer + requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter + usePriorityQueue bool // The custom priority queue in use if the option watchWorkWithPriorityQueue is enabled. // // Note that this variable is set only after the controller starts. - pq priorityqueue.PriorityQueue[reconcile.Request] - pqSetupOnce sync.Once + pq priorityqueue.PriorityQueue[reconcile.Request] + priLinearEqCoeffA int + priLinearEqCoeffB int + pqSetupOnce sync.Once } // NewReconciler returns a new Work object reconciler for the work applier. @@ -127,7 +126,8 @@ func NewReconciler( deletionWaitTime time.Duration, requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter, usePriorityQueue bool, - workObjAgeForPrioritizedProcessing time.Duration, + priorityLinearEquationCoeffA *int, + priorityLinearEquationCoeffB *int, ) *Reconciler { if requeueRateLimiter == nil { klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter") @@ -137,27 +137,28 @@ func NewReconciler( klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1") parallelizer = parallelizerutil.NewParallelizer(1) } - - woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing - if usePriorityQueue && woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing { - klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing) - woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing + if priorityLinearEquationCoeffA == nil || priorityLinearEquationCoeffB == nil { + // Use the default settings if either co-efficient is not set for correctness reasons. + klog.V(2).InfoS("priority linear equation coefficients are not set; using the default settings") + priorityLinearEquationCoeffA = ptr.To(-3) + priorityLinearEquationCoeffB = ptr.To(int(highestPriorityLevel)) } return &Reconciler{ - hubClient: hubClient, - spokeDynamicClient: spokeDynamicClient, - spokeClient: spokeClient, - restMapper: restMapper, - recorder: recorder, - concurrentReconciles: concurrentReconciles, - parallelizer: parallelizer, - workNameSpace: workNameSpace, - joined: atomic.NewBool(false), - deletionWaitTime: deletionWaitTime, - requeueRateLimiter: requeueRateLimiter, - usePriorityQueue: usePriorityQueue, - workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing, + hubClient: hubClient, + spokeDynamicClient: spokeDynamicClient, + spokeClient: spokeClient, + restMapper: restMapper, + recorder: recorder, + concurrentReconciles: concurrentReconciles, + parallelizer: parallelizer, + workNameSpace: workNameSpace, + joined: atomic.NewBool(false), + deletionWaitTime: deletionWaitTime, + requeueRateLimiter: requeueRateLimiter, + usePriorityQueue: usePriorityQueue, + priLinearEqCoeffA: *priorityLinearEquationCoeffA, + priLinearEqCoeffB: *priorityLinearEquationCoeffB, } } @@ -630,8 +631,9 @@ func (r *Reconciler) Leave(ctx context.Context) error { func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { if r.usePriorityQueue { eventHandler := &priorityBasedWorkObjEventHandler{ - qm: r, - workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing, + qm: r, + priLinearEqCoeffA: r.priLinearEqCoeffA, + priLinearEqCoeffB: r.priLinearEqCoeffB, } newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] { diff --git a/pkg/controllers/workapplier/pq.go b/pkg/controllers/workapplier/pq.go index caceaebcd..a61c1166a 100644 --- a/pkg/controllers/workapplier/pq.go +++ b/pkg/controllers/workapplier/pq.go @@ -37,23 +37,33 @@ import ( fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" ) +// Note (chenyu1): the work applier is set to periodically requeue Work objects for processing; +// when the KubeFleet agent restarts, the work applier will also have to re-process all the existing +// Work objects for correctness reasons. In environments where a large number of Work objects +// are present, with the default FIFO queue implementation, the work applier might have to spend +// a significant amount of time processing old Work objects that have not been changed for a while +// and are already in a consistent state, before it can get to process recently created/updated Work +// objects, which results in increased latency for new placements to complete. To address this issue, +// we enabled the usage of priority queues in the work applier, so that events +// from newly created/updated Work objects will be processed first. + const ( - // A list of priority levels and their targets for the work applier priority queue. + // A list of priority levels for the work applier priority queue. // // The work applier, when a priority queue is in use, will prioritize requests in the following // order: - // * with highest priority (2): all Create/Delete events, and all Update events + // * with high priority (>0): all Create/Delete events, and all Update events // that concern recently created Work objects or Work objects that are in a failed/undeterminted - // state (apply op/availability check failure, or diff reporting failure). - // * with medium priority (1): all other Update events. - // * with default priority (0): all requeues (with or with errors), and all Generic events. + // state (apply op/availability check failure, or diff reporting failure). For specifics, + // see the implementation details below. + // Note that the top priority level is capped at 100 for consistency/cleanness reasons. + // * with default priority (0): all other Update events. + // * with low priority (-1): all requeues (with or with errors), and all Generic events. // // Note that requests with the same priority level will be processed in the FIFO order. - // - // TO-DO (chenyu1): evaluate if/how we need to/should prioritize requeues properly. - highPriorityLevel = 2 - mediumPriorityLevel = 1 + highestPriorityLevel = 100 defaultPriorityLevel = 0 + lowPriorityLevel = -1 ) type CustomPriorityQueueManager interface { @@ -66,15 +76,34 @@ var _ handler.TypedEventHandler[client.Object, reconcile.Request] = &priorityBas // // It is used to process work object events in a priority-based manner with a priority queue. type priorityBasedWorkObjEventHandler struct { - qm CustomPriorityQueueManager - workObjAgeForPrioritizedProcessing time.Duration + qm CustomPriorityQueueManager + + priLinearEqCoeffA int + priLinearEqCoeffB int +} + +// calcArgBasedPriWithLinearEquation calculates the priority for a work object +// based on its age using a linear equation: Pri(Work) = A * AgeSinceCreationInMinutes(Work) + B, +// where A and B are user-configurable coefficients. +// +// The calculated priority is capped between defaultPriorityLevel and highestPriorityLevel. +func (h *priorityBasedWorkObjEventHandler) calcArgBasedPriWithLinearEquation(workObjAgeMinutes int) int { + priority := h.priLinearEqCoeffA*workObjAgeMinutes + h.priLinearEqCoeffB + if priority < defaultPriorityLevel { + return defaultPriorityLevel + } + if priority > highestPriorityLevel { + return highestPriorityLevel + } + return priority } // Create implements the TypedEventHandler interface. func (h *priorityBasedWorkObjEventHandler) Create(_ context.Context, createEvent event.TypedCreateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) { // Do a sanity check. // - // Normally when this method is called, the priority queue has been initialized. + // Normally when this method is called, the priority queue has been initialized. The check is added + // as the implementation has no control over when this method is called. if h.qm.PriorityQueue() == nil { wrappedErr := fmt.Errorf("received a Create event, but the priority queue is not initialized") _ = controller.NewUnexpectedBehaviorError(wrappedErr) @@ -82,9 +111,9 @@ func (h *priorityBasedWorkObjEventHandler) Create(_ context.Context, createEvent return } - // Enqueue the request with high priority. + // Enqueue the request with the highest priority. opts := priorityqueue.AddOpts{ - Priority: ptr.To(highPriorityLevel), + Priority: ptr.To(highestPriorityLevel), } workObjName := createEvent.Object.GetName() workObjNS := createEvent.Object.GetNamespace() @@ -107,9 +136,9 @@ func (h *priorityBasedWorkObjEventHandler) Delete(_ context.Context, deleteEvent return } - // Enqueue the request with high priority. + // Enqueue the request with the highest priority. opts := priorityqueue.AddOpts{ - Priority: ptr.To(highPriorityLevel), + Priority: ptr.To(highestPriorityLevel), } workObjName := deleteEvent.Object.GetName() workObjNS := deleteEvent.Object.GetNamespace() @@ -171,9 +200,9 @@ func (h *priorityBasedWorkObjEventHandler) Generic(_ context.Context, genericEve return } - // Enqueue the request with default priority. + // Enqueue the request with low priority. opts := priorityqueue.AddOpts{ - Priority: ptr.To(defaultPriorityLevel), + Priority: ptr.To(lowPriorityLevel), } workObjName := genericEvent.Object.GetName() workObjNS := genericEvent.Object.GetNamespace() @@ -192,12 +221,14 @@ func (h *priorityBasedWorkObjEventHandler) determineUpdateEventPriority(oldWorkO // The age is expected to be the same for both old and new work objects, as the field // is immutable and not user configurable. - workObjAge := time.Since(newWorkObj.CreationTimestamp.Time) - if workObjAge <= h.workObjAgeForPrioritizedProcessing { - return highPriorityLevel - } + workObjAgeMinutes := int(time.Since(newWorkObj.CreationTimestamp.Time).Minutes()) // Check if the work object is in a failed/undetermined state. + // + // * If the work object is in such a state, process its Update event with highest priority (even if the work object + // was created long ago); + // * Otherwise, prioritize the processing of the Update event if the work object is recently created; + // * Use the default priority level for all other cases. oldApplyStrategy := oldWorkObj.Spec.ApplyStrategy isReportDiffModeEnabled := oldApplyStrategy != nil && oldApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff @@ -211,19 +242,19 @@ func (h *priorityBasedWorkObjEventHandler) determineUpdateEventPriority(oldWorkO switch { case isReportDiffModeEnabled && condition.IsConditionStatusTrue(diffReportedCond, oldWorkObj.Generation): // The ReportDiff mode is enabled and the status suggests that the diff reporting has been completed successfully. - // Use medium priority for the Update event. - return mediumPriorityLevel + // Determine the priority level based on the age of the Work object. + return h.calcArgBasedPriWithLinearEquation(workObjAgeMinutes) case isReportDiffModeEnabled: // The ReportDiff mode is enabled, but the diff reporting has not been completed yet or has failed. // Use high priority for the Update event. - return highPriorityLevel + return highestPriorityLevel case condition.IsConditionStatusTrue(appliedCond, oldWorkObj.Generation) && condition.IsConditionStatusTrue(availableCond, oldWorkObj.Generation): // The apply strategy is set to the CSA/SSA mode and the work object is applied and available. - // Use medium priority for the Update event. - return mediumPriorityLevel + // Determine the priority level based on the age of the Work object. + return h.calcArgBasedPriWithLinearEquation(workObjAgeMinutes) default: // The apply strategy is set to the CSA/SSA mode and the work object is in a failed/undetermined state. // Use high priority for the Update event. - return highPriorityLevel + return highestPriorityLevel } } diff --git a/pkg/controllers/workapplier/pq_test.go b/pkg/controllers/workapplier/pq_test.go index 4932a647b..4d413cc8c 100644 --- a/pkg/controllers/workapplier/pq_test.go +++ b/pkg/controllers/workapplier/pq_test.go @@ -36,8 +36,6 @@ import ( ) const ( - workObjAgeForPrioritizedProcessingTestOnly = time.Minute * 5 - pqName = "test-pq" workNameForPriorityTestingTmpl = "prioritized-work-%s" ) @@ -65,23 +63,26 @@ func TestCreateEventHandler(t *testing.T) { ctx := context.Background() pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, + // For simplicity reasons, set all Update events for completed Work objects to have the priority level of 80 + // regardless of their ages. + priLinearEqCoeffA: 0, + priLinearEqCoeffB: 80, } - // Add two keys with medium and default priority levels respectively. + // Add two keys with default and low priority levels respectively. opts := priorityqueue.AddOpts{ - Priority: ptr.To(mediumPriorityLevel), + Priority: ptr.To(defaultPriorityLevel), } - workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") - key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) opts = priorityqueue.AddOpts{ - Priority: ptr.To(defaultPriorityLevel), + Priority: ptr.To(lowPriorityLevel), } - workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") - key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + workWithLowPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "low") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithLowPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) // Handle a CreateEvent, which should add a new key with high priority. @@ -110,8 +111,8 @@ func TestCreateEventHandler(t *testing.T) { if diff := cmp.Diff(item, wantItem); diff != "" { t.Errorf("dequeued item mismatch (-got, +want):\n%s", diff) } - if !cmp.Equal(pri, highPriorityLevel) { - t.Errorf("priority of dequeued item, expected %d, got %d", highPriorityLevel, pri) + if !cmp.Equal(pri, highestPriorityLevel) { + t.Errorf("priority of dequeued item, expected %d, got %d", highestPriorityLevel, pri) } } @@ -121,22 +122,23 @@ func TestUpdateEventHandler_NormalOps(t *testing.T) { ctx := context.Background() pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, + priLinearEqCoeffA: -10, + priLinearEqCoeffB: 80, } - // Add a key with default priority levels respectively. + // Add a key with low priority level. opts := priorityqueue.AddOpts{ - Priority: ptr.To(defaultPriorityLevel), + Priority: ptr.To(lowPriorityLevel), } - workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") - key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + workWithLowPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "low") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithLowPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) // Handle an UpdateEvent that concerns a Work object with ReportDiff strategy and has been - // processed successfully long before (>5 minutes ago). + // processed successfully long ago (30 minutes). workInReportDiffModeAndProcessedLongBfrName := fmt.Sprintf(workNameForPriorityTestingTmpl, "report-diff-processed-long-bfr") - longAgo := time.Now().Add(-time.Minute * 10) + longAgo := time.Now().Add(-time.Minute * 30) oldWorkObj := &fleetv1beta1.Work{ ObjectMeta: metav1.ObjectMeta{ Namespace: memberReservedNSName1, @@ -162,7 +164,7 @@ func TestUpdateEventHandler_NormalOps(t *testing.T) { newWorkObj.Generation += 1 pqEventHandler.Update(ctx, event.TypedUpdateEvent[client.Object]{ObjectOld: oldWorkObj, ObjectNew: newWorkObj}, nil) - // Handle an UpdateEvent that concerns a normal Work object that was created very recently (<5 minutes ago). + // Handle an UpdateEvent that concerns a normal Work object that was created very recently (2 minutes ago). workInCSAModeAndJustProcessedName := fmt.Sprintf(workNameForPriorityTestingTmpl, "csa-just-processed") shortWhileAgo := time.Now().Add(-time.Minute * 2) oldWorkObj = &fleetv1beta1.Work{ @@ -204,7 +206,7 @@ func TestUpdateEventHandler_NormalOps(t *testing.T) { Name: workInCSAModeAndJustProcessedName, }, }, - Priority: highPriorityLevel, + Priority: 60, // -10 * 2 + 80 = 60 }, { Key: reconcile.Request{ @@ -213,16 +215,16 @@ func TestUpdateEventHandler_NormalOps(t *testing.T) { Name: workInReportDiffModeAndProcessedLongBfrName, }, }, - Priority: mediumPriorityLevel, + Priority: defaultPriorityLevel, // -10 * 30 + 80 = -220 -> capped to defaultPriorityLevel (0) }, { Key: reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: memberReservedNSName1, - Name: workWithDefaultPriName, + Name: workWithLowPriName, }, }, - Priority: defaultPriorityLevel, + Priority: lowPriorityLevel, }, } @@ -294,8 +296,7 @@ func TestUpdateEventHandler_Erred(t *testing.T) { t.Run(tc.name, func(t *testing.T) { pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, } pqEventHandler.Update(ctx, tc.updateEvent, nil) @@ -312,26 +313,29 @@ func TestDeleteEventHandler(t *testing.T) { ctx := context.Background() pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, + // For simplicity reasons, set all Update events for completed Work objects to have the priority level of 80 + // regardless of their ages. + priLinearEqCoeffA: 0, + priLinearEqCoeffB: 80, } - // Add two keys with medium and default priority levels respectively. + // Add two keys with default and low priority levels respectively. opts := priorityqueue.AddOpts{ - Priority: ptr.To(mediumPriorityLevel), + Priority: ptr.To(defaultPriorityLevel), } - workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") - key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) opts = priorityqueue.AddOpts{ - Priority: ptr.To(defaultPriorityLevel), + Priority: ptr.To(lowPriorityLevel), } - workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") - key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} + workWithLowPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "low") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithLowPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) - // Handle a DeleteEvent, which should add a new key with high priority. + // Handle a DeleteEvent, which should add a new key with the highest priority. workJustDeletedName := fmt.Sprintf(workNameForPriorityTestingTmpl, "just-deleted") workObj := fleetv1beta1.Work{ ObjectMeta: metav1.ObjectMeta{ @@ -357,8 +361,8 @@ func TestDeleteEventHandler(t *testing.T) { if diff := cmp.Diff(item, wantItem); diff != "" { t.Errorf("dequeued item mismatch (-got, +want):\n%s", diff) } - if !cmp.Equal(pri, highPriorityLevel) { - t.Errorf("priority of dequeued item, expected %d, got %d", highPriorityLevel, pri) + if !cmp.Equal(pri, highestPriorityLevel) { + t.Errorf("priority of dequeued item, expected %d, got %d", highestPriorityLevel, pri) } } @@ -367,23 +371,26 @@ func TestGenericEventHandler(t *testing.T) { ctx := context.Background() pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, + // For simplicity reasons, set all Update events for completed Work objects to have the priority level of 80 + // regardless of their ages. + priLinearEqCoeffA: 0, + priLinearEqCoeffB: 80, } - // Add two keys with high and medium priority levels respectively. + // Add two keys with highest and default priority levels respectively. opts := priorityqueue.AddOpts{ - Priority: ptr.To(highPriorityLevel), + Priority: ptr.To(highestPriorityLevel), } - workWithHighPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "high") - key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithHighPriName} + workWithHighestPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "highest") + key := types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithHighestPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) opts = priorityqueue.AddOpts{ - Priority: ptr.To(mediumPriorityLevel), + Priority: ptr.To(defaultPriorityLevel), } - workWithMediumPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "medium") - key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithMediumPriName} + workWithDefaultPriName := fmt.Sprintf(workNameForPriorityTestingTmpl, "default") + key = types.NamespacedName{Namespace: memberReservedNSName1, Name: workWithDefaultPriName} pq.AddWithOpts(opts, reconcile.Request{NamespacedName: key}) // Handle a GenericEvent, which should add a new key with default priority. @@ -407,19 +414,19 @@ func TestGenericEventHandler(t *testing.T) { Key: reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: memberReservedNSName1, - Name: workWithHighPriName, + Name: workWithHighestPriName, }, }, - Priority: highPriorityLevel, + Priority: highestPriorityLevel, }, { Key: reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: memberReservedNSName1, - Name: workWithMediumPriName, + Name: workWithDefaultPriName, }, }, - Priority: mediumPriorityLevel, + Priority: defaultPriorityLevel, }, { Key: reconcile.Request{ @@ -428,7 +435,7 @@ func TestGenericEventHandler(t *testing.T) { Name: workGenericEventName, }, }, - Priority: defaultPriorityLevel, + Priority: lowPriorityLevel, }, } @@ -446,12 +453,13 @@ func TestGenericEventHandler(t *testing.T) { func TestDetermineUpdateEventPriority(t *testing.T) { now := metav1.Now() - longAgo := metav1.NewTime(now.Add(-time.Minute * 10)) + longAgo := metav1.NewTime(now.Add(-time.Minute * 30)) pq := priorityqueue.New[reconcile.Request](pqName) pqEventHandler := &priorityBasedWorkObjEventHandler{ - qm: &pqWrapper{pq: pq}, - workObjAgeForPrioritizedProcessing: workObjAgeForPrioritizedProcessingTestOnly, + qm: &pqWrapper{pq: pq}, + priLinearEqCoeffA: -10, + priLinearEqCoeffB: 80, } testCases := []struct { @@ -460,17 +468,6 @@ func TestDetermineUpdateEventPriority(t *testing.T) { newWorkObj *fleetv1beta1.Work wantPriority int }{ - { - name: "fresh work object", - newWorkObj: &fleetv1beta1.Work{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: memberReservedNSName1, - Name: workName, - CreationTimestamp: now, - }, - }, - wantPriority: highPriorityLevel, - }, { name: "reportDiff mode, diff reported", oldWorkObj: &fleetv1beta1.Work{ @@ -500,7 +497,7 @@ func TestDetermineUpdateEventPriority(t *testing.T) { CreationTimestamp: longAgo, }, }, - wantPriority: mediumPriorityLevel, + wantPriority: defaultPriorityLevel, }, { name: "reportDiff mode, diff not reported", @@ -531,7 +528,7 @@ func TestDetermineUpdateEventPriority(t *testing.T) { CreationTimestamp: longAgo, }, }, - wantPriority: highPriorityLevel, + wantPriority: highestPriorityLevel, }, { name: "CSA/SSA mode, applied and available", @@ -562,7 +559,7 @@ func TestDetermineUpdateEventPriority(t *testing.T) { CreationTimestamp: longAgo, }, }, - wantPriority: mediumPriorityLevel, + wantPriority: defaultPriorityLevel, }, { name: "CSA/SSA mode, not applied and available", @@ -589,7 +586,7 @@ func TestDetermineUpdateEventPriority(t *testing.T) { CreationTimestamp: longAgo, }, }, - wantPriority: highPriorityLevel, + wantPriority: highestPriorityLevel, }, } @@ -602,3 +599,47 @@ func TestDetermineUpdateEventPriority(t *testing.T) { }) } } + +// TestCalcArgBasedPriWithLinearEquation tests the calcArgBasedPriWithLinearEquation method. +func TestCalculateArgBasedPriWithLinearEquation(t *testing.T) { + pqEventHandler := &priorityBasedWorkObjEventHandler{ + priLinearEqCoeffA: -10, + priLinearEqCoeffB: highestPriorityLevel + 20, // 120 + } + + testCases := []struct { + name string + workObjAgeMinutes int + wantPri int + }{ + { + name: "just created (capped)", + workObjAgeMinutes: 0, + wantPri: highestPriorityLevel, + }, + { + name: "5 minutes old", + workObjAgeMinutes: 5, + wantPri: 70, + }, + { + name: "8 minutes old", + workObjAgeMinutes: 8, + wantPri: 40, + }, + { + name: "15 minutes old (capped)", + workObjAgeMinutes: 15, + wantPri: defaultPriorityLevel, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pri := pqEventHandler.calcArgBasedPriWithLinearEquation(tc.workObjAgeMinutes) + if !cmp.Equal(pri, tc.wantPri) { + t.Errorf("calculated priority, expected %d, got %d", tc.wantPri, pri) + } + }) + } +} diff --git a/pkg/controllers/workapplier/suite_test.go b/pkg/controllers/workapplier/suite_test.go index ac3c0c650..8a62e0dbd 100644 --- a/pkg/controllers/workapplier/suite_test.go +++ b/pkg/controllers/workapplier/suite_test.go @@ -279,7 +279,8 @@ var _ = BeforeSuite(func() { 30*time.Second, nil, // Use the default backoff rate limiter. false, // Disable priority queueing. - 0, + nil, // Use the default priority linear equation coefficients. + nil, // Use the default priority linear equation coefficients. ) Expect(workApplier1.SetupWithManager(hubMgr1)).To(Succeed()) @@ -328,7 +329,8 @@ var _ = BeforeSuite(func() { 30*time.Second, superLongExponentialBackoffRateLimiter, false, // Disable priority queueing. - 0, + nil, // Use the default priority linear equation coefficients. + nil, // Use the default priority linear equation coefficients. ) // Due to name conflicts, the second work applier must be set up manually. err = ctrl.NewControllerManagedBy(hubMgr2).Named("work-applier-controller-duplicate"). @@ -372,7 +374,8 @@ var _ = BeforeSuite(func() { 30*time.Second, nil, // Use the default backoff rate limiter. false, // Disable priority queueing. - 0, + nil, // Use the default priority linear equation coefficients. + nil, // Use the default priority linear equation coefficients. ) // Due to name conflicts, the third work applier must be set up manually. err = ctrl.NewControllerManagedBy(hubMgr3).Named("work-applier-controller-waved-parallel-processing").