From a0b54b2464978fb4f73326d668acacaff14e5e46 Mon Sep 17 00:00:00 2001
From: Wenxue Zhao
Date: Wed, 29 Oct 2025 18:50:43 -0400
Subject: [PATCH] pkg: add status manager for condition handling

Signed-off-by: Wenxue Zhao
---
 pkg/status/manager.go | 149 ++++++++++++++++++++++++++++++++++++++++++
 pkg/status/patch.go   | 118 +++++++++++++++++++++++++++++++++
 pkg/status/type.go    |  53 +++++++++++++++
 3 files changed, 320 insertions(+)
 create mode 100644 pkg/status/manager.go
 create mode 100644 pkg/status/patch.go
 create mode 100644 pkg/status/type.go

diff --git a/pkg/status/manager.go b/pkg/status/manager.go
new file mode 100644
index 00000000..d939c122
--- /dev/null
+++ b/pkg/status/manager.go
@@ -0,0 +1,149 @@
+package status
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// Manager helps manage conditions in a Kubernetes resource status.
+type Manager struct {
+	conditions *[]metav1.Condition
+	generation int64 // Stamped into ObservedGeneration on every condition written via set.
+}
+
+// NewManager creates a new condition manager for the given slice of conditions and generation.
+// The slice passed must be a pointer, as the manager will modify it directly.
+func NewManager(conditions *[]metav1.Condition, generation int64) *Manager {
+	if conditions == nil {
+		// Initialize if nil to prevent panics. NOTE(review): this detached slice is invisible to the caller, so all updates are silently dropped — confirm callers always pass a non-nil pointer.
+		emptyConditions := make([]metav1.Condition, 0)
+		conditions = &emptyConditions
+	}
+	return &Manager{conditions: conditions, generation: generation}
+}
+
+// set is a private helper to set or update a condition using meta.SetStatusCondition.
+// meta.SetStatusCondition handles updating LastTransitionTime only when the status changes.
+func (m *Manager) set(conditionType string, status metav1.ConditionStatus, reason, message string) {
+	newCondition := metav1.Condition{
+		Type:               conditionType,
+		Status:             status,
+		ObservedGeneration: m.generation, // Stored generation lets consumers detect conditions computed from an older spec.
+		LastTransitionTime: metav1.NewTime(time.Now()),
+		Reason:             reason,
+		Message:            message,
+	}
+	meta.SetStatusCondition(m.conditions, newCondition)
+}
+
+// --- Semantic helpers ---
+// These helpers make setting common conditions more readable in the reconciler.
+
+// SetAvailable sets the Available condition to True when status is true, False otherwise.
+func (m *Manager) SetAvailable(status bool, reason, message string) {
+	conditionStatus := metav1.ConditionFalse
+	if status {
+		conditionStatus = metav1.ConditionTrue
+	}
+	m.set(ConditionAvailable, conditionStatus, reason, message)
+}
+
+// SetProgressing sets the Progressing condition to True when status is true, False otherwise.
+func (m *Manager) SetProgressing(status bool, reason, message string) {
+	conditionStatus := metav1.ConditionFalse
+	if status {
+		conditionStatus = metav1.ConditionTrue
+	}
+	m.set(ConditionProgressing, conditionStatus, reason, message)
+}
+
+// SetDegraded sets the Degraded condition to True when status is true, False otherwise.
+func (m *Manager) SetDegraded(status bool, reason, message string) {
+	conditionStatus := metav1.ConditionFalse
+	if status {
+		conditionStatus = metav1.ConditionTrue
+	}
+	m.set(ConditionDegraded, conditionStatus, reason, message)
+}
+
+// Update returns a ConditionUpdater that allows chaining condition changes before applying them.
+func (m *Manager) Update() ConditionUpdater {
+	return ConditionUpdater{manager: m}
+}
+
+// ConditionUpdater batches updates to conditions so callers can configure multiple
+// condition states without repeating boilerplate. Call Apply() to persist the changes.
+type ConditionUpdater struct {
+	manager *Manager
+	updates []func()
+}
+
+// Available queues an update for the Available condition.
+func (u ConditionUpdater) Available(status bool, reason, message string) ConditionUpdater {
+	if u.manager == nil {
+		return u
+	}
+
+	statusCopy := status // NOTE(review): parameters are already per-call copies in Go; these locals are redundant but harmless.
+	reasonCopy := reason
+	messageCopy := message
+
+	u.updates = append(u.updates, func() {
+		u.manager.SetAvailable(statusCopy, reasonCopy, messageCopy)
+	})
+	return u
+}
+
+// Progressing queues an update for the Progressing condition.
+func (u ConditionUpdater) Progressing(status bool, reason, message string) ConditionUpdater {
+	if u.manager == nil {
+		return u
+	}
+
+	statusCopy := status
+	reasonCopy := reason
+	messageCopy := message
+
+	u.updates = append(u.updates, func() {
+		u.manager.SetProgressing(statusCopy, reasonCopy, messageCopy)
+	})
+	return u
+}
+
+// Degraded queues an update for the Degraded condition.
+func (u ConditionUpdater) Degraded(status bool, reason, message string) ConditionUpdater {
+	if u.manager == nil {
+		return u
+	}
+
+	statusCopy := status
+	reasonCopy := reason
+	messageCopy := message
+
+	u.updates = append(u.updates, func() {
+		u.manager.SetDegraded(statusCopy, reasonCopy, messageCopy)
+	})
+	return u
+}
+
+// Apply executes the queued condition updates in the order they were added; a nil manager makes it a no-op.
+func (u ConditionUpdater) Apply() {
+	if u.manager == nil {
+		return
+	}
+
+	for _, update := range u.updates {
+		update()
+	}
+}
+
+// FormatError returns prefix unchanged when err is nil, otherwise "prefix: err"; handy for building concise condition messages.
+func FormatError(prefix string, err error) string {
+	if err == nil {
+		return prefix
+	}
+	return fmt.Sprintf("%s: %v", prefix, err)
+}
diff --git a/pkg/status/patch.go b/pkg/status/patch.go
new file mode 100644
index 00000000..3bf8c67f
--- /dev/null
+++ b/pkg/status/patch.go
@@ -0,0 +1,118 @@
+// Package status provides utilities for managing Kubernetes resource status,
+// particularly focusing on Conditions and status patching.
+package status
+
+import (
+	"context"
+	"fmt"
+
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	apierrors "k8s.io/apimachinery/pkg/api/errors" // Used below for IsNotFound checks when the object has been deleted.
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+// DefaultRetry is the default backoff for retrying patch operations.
+// Customize this if needed, e.g., by providing a different
+// retry.Backoff instance to retry.RetryOnConflict.
+var DefaultRetry = retry.DefaultRetry
+
+// PatchStatusMutate applies status changes using a mutate function.
+// It fetches the latest version of the object, applies mutations via the mutate func,
+// and then patches the status subresource if changes are detected.
+// It uses optimistic locking and retries on conflict errors (like "the object has been modified").
+//
+// Parameters:
+//   - ctx: The context for the operation.
+//   - c: The controller-runtime client.
+//   - originalObj: The original object instance fetched at the start of the reconcile loop.
+//     This is used to get the ObjectKey and the Generation at the start of the reconcile.
+//     It should not be modified by the reconciliation logic directly; status changes
+//     should be calculated and applied via the mutateFn on a fresh copy.
+//   - mutateFn: A function that takes the latest fetched object (as type T) and applies
+//     the calculated status changes *directly to that object's status field*.
+//     This function should return an error if the mutation itself fails for some reason,
+//     which will abort the patch attempt.
+func PatchStatusMutate[T client.Object](
+	ctx context.Context,
+	c client.Client,
+	originalObj T, // Original object from reconcile start; only its key and generation are read.
+	mutateFn func(latestFetchedObj T) error, // Mutate function receives the freshly fetched object each attempt.
+) error {
+	logger := log.FromContext(ctx).WithValues(
+		"objectName", originalObj.GetName(),
+		"objectNamespace", originalObj.GetNamespace(),
+		"gvk", originalObj.GetObjectKind().GroupVersionKind().String(),
+	)
+	key := client.ObjectKeyFromObject(originalObj)
+	startGeneration := originalObj.GetGeneration() // Generation at the start of reconcile; used to detect concurrent spec changes.
+
+	return retry.RetryOnConflict(DefaultRetry, func() error {
+		// Fetch the latest version of the object in each retry iteration.
+		latestFetchedObj := originalObj.DeepCopyObject().(T) // Deep copy gives a fresh T instance. NOTE(review): Get decodes into this non-empty copy — confirm no stale map/slice fields can survive.
+		getErr := c.Get(ctx, key, latestFetchedObj)
+		if getErr != nil {
+			if apierrors.IsNotFound(getErr) {
+				logger.Info("Object not found while attempting to patch status, skipping patch.")
+				return nil // Object deleted, no status to patch.
+			}
+			logger.Error(getErr, "Failed to get latest object for status patch.")
+			// Returning the error from Get will stop retries by default retry settings
+			// if it's not a conflict error.
+			return fmt.Errorf("failed to get latest object %v for status patch: %w", key, getErr)
+		}
+
+		// Check if the generation changed during our reconcile. This might mean the spec changed
+		// and our calculated status is stale.
+		if startGeneration != latestFetchedObj.GetGeneration() {
+			logger.Info("Object generation changed during reconcile, status calculated based on old spec might be stale. "+
+				"Aborting this patch attempt.",
+				"key", key, "startGeneration", startGeneration, "currentGeneration", latestFetchedObj.GetGeneration())
+			// Returning nil here means we acknowledge the generation change and decide *not* to patch stale status.
+			// The main Reconcile loop will requeue soon with the new generation.
+			// This avoids spamming patch retries for stale data.
+			return nil
+		}
+
+		// Create a copy *before* mutation to compare against. This is the object state
+		// from the API server *before* we apply our calculated changes for this attempt.
+		beforeMutateStatusCopy := latestFetchedObj.DeepCopyObject().(T)
+
+		// Apply the calculated status changes by calling the mutate function.
+		// The mutate function should modify the 'latestFetchedObj's status field directly.
+		if err := mutateFn(latestFetchedObj); err != nil {
+			// If the mutation logic itself fails, we likely want to stop retrying the patch.
+			logger.Error(err, "Mutation function failed during status patch attempt.")
+			// Stop retries by returning non-conflict error
+			return fmt.Errorf("mutate function failed during status patch: %w", err)
+		}
+
+		// Compare the object's status before and after mutation.
+		// We use Semantic.DeepEqual on the whole objects because `Status()` subresource patch
+		// still sends a patch based on the whole object typically. More accurately,
+		// we should compare just the status fields if we could extract them generically.
+		// However, comparing the whole object after mutation (which only touched status)
+		// against its state before mutation (but after GET) is correct.
+		if apiequality.Semantic.DeepEqual(beforeMutateStatusCopy, latestFetchedObj) {
+			logger.V(1).Info("No status change detected after applying mutation, skipping patch.")
+			return nil // No actual changes to status, no need to patch
+		}
+
+		// Patch the status subresource using the mutated 'latestFetchedObj' object.
+		// client.MergeFrom(beforeMutateStatusCopy) generates a JSON merge patch from the diff between the
+		// mutated object and its pre-mutation copy. We intentionally avoid StrategicMerge
+		// because CRDs (like EtcdCluster) don't support it.
+		logger.Info("Status change detected, attempting to patch status subresource.")
+		patchErr := c.Status().Patch(ctx, latestFetchedObj, client.MergeFrom(beforeMutateStatusCopy))
+		if patchErr != nil {
+			// Log the patch error. RetryOnConflict will decide whether to retry based on the error type.
+			// Conflict errors will be retried. Other errors might not.
+			logger.Info("Failed to patch status, will retry if conflict error.", "error", patchErr.Error())
+			return patchErr // Return the error to retry.RetryOnConflict
+		}
+
+		logger.Info("Successfully patched status subresource.")
+		return nil // Patch successful
+	})
+}
diff --git a/pkg/status/type.go b/pkg/status/type.go
new file mode 100644
index 00000000..595411b4
--- /dev/null
+++ b/pkg/status/type.go
@@ -0,0 +1,53 @@
+// Package status provides utilities for managing Kubernetes resource status,
+// particularly focusing on Conditions according to standard practices.
+package status
+
+// Condition types used for EtcdCluster status.
+// Adhering to Kubernetes API conventions as much as possible.
+// See:
+// github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md
+
+const (
+	// ConditionAvailable indicates that the etcd cluster has reached its desired state,
+	// has quorum, and is ready to serve requests. All members are healthy.
+	ConditionAvailable string = "Available"
+
+	// ConditionProgressing indicates that the operator is actively working
+	// to bring the etcd cluster towards the desired state (e.g., creating resources,
+	// scaling, promoting learners). It's True when reconciliation is in progress
+	// and False when the desired state is reached or a terminal state occurs.
+	ConditionProgressing string = "Progressing"
+
+	// ConditionDegraded indicates that the etcd cluster is functional but
+	// operating with potential issues that might impact performance or fault tolerance
+	// (e.g., some members unhealthy but quorum maintained, leader missing temporarily).
+	// It requires attention but is not necessarily completely unavailable.
+	ConditionDegraded string = "Degraded"
+)
+
+// Common reasons for EtcdCluster status conditions. Reasons should be CamelCase and concise.
+const (
+	// General Reasons
+	ReasonReconciling      string = "Reconciling"
+	ReasonReconcileSuccess string = "ReconcileSuccess"
+	ReasonClusterHealthy   string = "ClusterHealthy" // NOTE(review): overlaps with ReasonClusterReady below — consider consolidating.
+
+	// Available Reasons
+	ReasonClusterReady   string = "ClusterReady"
+	ReasonLeaderNotFound string = "LeaderNotFound"
+	ReasonSizeIsZero     string = "SizeIsZero"
+
+	// Progressing Reasons
+	ReasonInitializingCluster string = "InitializingCluster"
+	ReasonCreatingResources   string = "CreatingResources" // STS, Service etc.
+	ReasonScalingUp           string = "ScalingUp"
+	ReasonScalingDown         string = "ScalingDown"
+	ReasonPromotingLearner    string = "PromotingLearner"
+	ReasonMembersMismatch     string = "MembersMismatch"
+
+	// Degraded Reasons
+	ReasonMembersUnhealthy   string = "MembersUnhealthy"
+	ReasonHealthCheckError   string = "HealthCheckError"
+	ReasonResourceCreateFail string = "ResourceCreateFail" // Non-fatal resource creation failure
+	ReasonResourceUpdateFail string = "ResourceUpdateFail" // Non-fatal resource update failure
+)