Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions pkg/status/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package status

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Manager helps manage conditions in a Kubernetes resource status.
//
// It holds a pointer to the owning resource's conditions slice, so every
// update made through the helper methods mutates the caller's status object
// in place; there is no separate "commit" step at this layer.
type Manager struct {
	conditions *[]metav1.Condition // pointer to the resource's status.conditions; mutated in place
	generation int64               // Store the generation stamped into ObservedGeneration on every set
}

// NewManager creates a new condition manager for the given slice of conditions and generation.
// The slice passed must be a pointer, as the manager will modify it directly.
func NewManager(conditions *[]metav1.Condition, generation int64) *Manager {
	// Guard against a nil pointer so later meta.SetStatusCondition calls
	// never dereference nil; substitute an empty (non-nil) slice.
	if conditions == nil {
		conditions = &[]metav1.Condition{}
	}
	return &Manager{conditions: conditions, generation: generation}
}

// set is a private helper that records a condition of the given type, status,
// reason and message via meta.SetStatusCondition, which only updates
// LastTransitionTime when the status value actually changes.
func (m *Manager) set(conditionType string, status metav1.ConditionStatus, reason, message string) {
	meta.SetStatusCondition(m.conditions, metav1.Condition{
		Type:               conditionType,
		Status:             status,
		ObservedGeneration: m.generation, // generation captured when the manager was built
		LastTransitionTime: metav1.NewTime(time.Now()),
		Reason:             reason,
		Message:            message,
	})
}

// --- Semantic helpers ---
// These helpers make setting common conditions more readable in the reconciler.

// SetAvailable sets the Available condition status.
func (m *Manager) SetAvailable(status bool, reason, message string) {
	if status {
		m.set(ConditionAvailable, metav1.ConditionTrue, reason, message)
		return
	}
	m.set(ConditionAvailable, metav1.ConditionFalse, reason, message)
}

// SetProgressing sets the Progressing condition status.
func (m *Manager) SetProgressing(status bool, reason, message string) {
	if status {
		m.set(ConditionProgressing, metav1.ConditionTrue, reason, message)
		return
	}
	m.set(ConditionProgressing, metav1.ConditionFalse, reason, message)
}

// SetDegraded sets the Degraded condition status.
func (m *Manager) SetDegraded(status bool, reason, message string) {
	if status {
		m.set(ConditionDegraded, metav1.ConditionTrue, reason, message)
		return
	}
	m.set(ConditionDegraded, metav1.ConditionFalse, reason, message)
}

// Update returns a ConditionUpdater that allows chaining condition changes before applying them.
func (m *Manager) Update() ConditionUpdater {
	updater := ConditionUpdater{manager: m}
	return updater
}

// ConditionUpdater batches updates to conditions so callers can configure multiple
// condition states without repeating boilerplate. Call Apply() to persist the changes.
//
// The zero value (nil manager) is inert: queueing methods return the updater
// unchanged and Apply is a no-op.
type ConditionUpdater struct {
	manager *Manager // target manager; nil disables all operations
	updates []func() // queued closures, executed in order by Apply
}

// Available queues an update for the Available condition. The change is not
// applied until Apply is called.
//
// Note: Go function parameters are fresh per call, so the closure may capture
// status/reason/message directly — no defensive copies are needed.
func (u ConditionUpdater) Available(status bool, reason, message string) ConditionUpdater {
	if u.manager == nil {
		// Inert updater (zero value): silently ignore queued updates.
		return u
	}

	u.updates = append(u.updates, func() {
		u.manager.SetAvailable(status, reason, message)
	})
	return u
}

// Progressing queues an update for the Progressing condition. The change is
// not applied until Apply is called.
//
// Note: Go function parameters are fresh per call, so the closure may capture
// status/reason/message directly — no defensive copies are needed.
func (u ConditionUpdater) Progressing(status bool, reason, message string) ConditionUpdater {
	if u.manager == nil {
		// Inert updater (zero value): silently ignore queued updates.
		return u
	}

	u.updates = append(u.updates, func() {
		u.manager.SetProgressing(status, reason, message)
	})
	return u
}

// Degraded queues an update for the Degraded condition. The change is not
// applied until Apply is called.
//
// Note: Go function parameters are fresh per call, so the closure may capture
// status/reason/message directly — no defensive copies are needed.
func (u ConditionUpdater) Degraded(status bool, reason, message string) ConditionUpdater {
	if u.manager == nil {
		// Inert updater (zero value): silently ignore queued updates.
		return u
	}

	u.updates = append(u.updates, func() {
		u.manager.SetDegraded(status, reason, message)
	})
	return u
}

// Apply executes the queued condition updates in the order they were queued.
// It is a no-op on an inert (zero value) updater.
func (u ConditionUpdater) Apply() {
	if u.manager == nil {
		return
	}

	for i := range u.updates {
		u.updates[i]()
	}
}

// FormatError builds a concise message for a Condition: the prefix alone when
// err is nil, otherwise "prefix: err".
func FormatError(prefix string, err error) string {
	if err != nil {
		return fmt.Sprintf("%s: %v", prefix, err)
	}
	return prefix
}
118 changes: 118 additions & 0 deletions pkg/status/patch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Package status provides utilities for managing Kubernetes resource status,
// particularly focusing on Conditions and status patching.
package status

import (
"context"
"fmt"

apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" // For GroupResource in conflict error
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// DefaultRetry is the default backoff for retrying patch operations.
// Customize this if needed, e.g., by providing a different
// retry.Backoff instance to retry.RetryOnConflict.
//
// Note: this is a package-level copy of client-go's retry.DefaultRetry;
// mutating it changes the backoff for every PatchStatusMutate call in this
// process, but does not affect client-go's own default.
var DefaultRetry = retry.DefaultRetry

// PatchStatusMutate applies status changes using a mutate function.
// It fetches the latest version of the object, applies mutations via the mutate func,
// and then patches the status subresource if changes are detected.
// It uses optimistic locking and retries on conflict errors (like "the object has been modified").
//
// Returns nil in three distinct success-like situations: the patch succeeded,
// the object was not found (deleted), or the object's generation changed
// mid-reconcile (stale status intentionally not written). Callers cannot
// distinguish these from the return value alone; logs record which occurred.
//
// Parameters:
//   - ctx: The context for the operation.
//   - c: The controller-runtime client.
//   - originalObj: The original object instance fetched at the start of the reconcile loop.
//     This is used to get the ObjectKey and the Generation at the start of the reconcile.
//     It should not be modified by the reconciliation logic directly; status changes
//     should be calculated and applied via the mutateFn on a fresh copy.
//   - mutateFn: A function that takes the latest fetched object (as type T) and applies
//     the calculated status changes *directly to that object's status field*.
//     This function should return an error if the mutation itself fails for some reason,
//     which will abort the patch attempt.
func PatchStatusMutate[T client.Object](
	ctx context.Context,
	c client.Client,
	originalObj T, // Original object from reconcile start
	mutateFn func(latestFetchedObj T) error, // Mutate function now takes the latest object
) error {
	logger := log.FromContext(ctx).WithValues(
		"objectName", originalObj.GetName(),
		"objectNamespace", originalObj.GetNamespace(),
		"gvk", originalObj.GetObjectKind().GroupVersionKind().String(),
	)
	key := client.ObjectKeyFromObject(originalObj)
	startGeneration := originalObj.GetGeneration() // Generation at the start of reconcile

	return retry.RetryOnConflict(DefaultRetry, func() error {
		// Fetch the latest version of the object in each retry iteration.
		// DeepCopyObject is used only to obtain a fresh instance of type T to
		// Get into; its contents are overwritten by the Get below.
		latestFetchedObj := originalObj.DeepCopyObject().(T) // Create a new instance of type T
		getErr := c.Get(ctx, key, latestFetchedObj)
		if getErr != nil {
			if apierrors.IsNotFound(getErr) {
				logger.Info("Object not found while attempting to patch status, skipping patch.")
				return nil // Object deleted, no status to patch.
			}
			logger.Error(getErr, "Failed to get latest object for status patch.")
			// Returning the error from Get will stop retries by default retry settings
			// if it's not a conflict error.
			return fmt.Errorf("failed to get latest object %v for status patch: %w", key, getErr)
		}

		// Check if the generation changed during our reconcile. This might mean the spec changed
		// and our calculated status is stale.
		if startGeneration != latestFetchedObj.GetGeneration() {
			logger.Info("Object generation changed during reconcile, status calculated based on old spec might be stale. "+
				"Aborting this patch attempt.",
				"key", key, "startGeneration", startGeneration, "currentGeneration", latestFetchedObj.GetGeneration())
			// Returning nil here means we acknowledge the generation change and decide *not* to patch stale status.
			// The main Reconcile loop will requeue soon with the new generation.
			// This avoids spamming patch retries for stale data.
			return nil
		}

		// Create a copy *before* mutation to compare against. This is the object state
		// from the API server *before* we apply our calculated changes for this attempt.
		beforeMutateStatusCopy := latestFetchedObj.DeepCopyObject().(T)

		// Apply the calculated status changes by calling the mutate function.
		// The mutate function should modify the 'latestFetchedObj's status field directly.
		if err := mutateFn(latestFetchedObj); err != nil {
			// If the mutation logic itself fails, we likely want to stop retrying the patch.
			logger.Error(err, "Mutation function failed during status patch attempt.")
			// Stop retries by returning non-conflict error
			return fmt.Errorf("mutate function failed during status patch: %w", err)
		}

		// Compare the object's status before and after mutation.
		// We compare the whole objects: since mutateFn is expected to touch only
		// status, any difference between the pre-mutation copy and the mutated
		// object reflects a status change. (If mutateFn touches non-status fields,
		// those differences would also trigger a patch attempt here.)
		if apiequality.Semantic.DeepEqual(beforeMutateStatusCopy, latestFetchedObj) {
			logger.V(1).Info("No status change detected after applying mutation, skipping patch.")
			return nil // No actual changes to status, no need to patch
		}

		// Patch the status subresource using the mutated 'latestFetchedObj' object.
		// client.MergeFrom(beforeMutateStatusCopy) generates a JSON merge patch from the diff between the
		// mutated object and its pre-mutation copy. We intentionally avoid StrategicMerge
		// because CRDs (like EtcdCluster) don't support it.
		logger.Info("Status change detected, attempting to patch status subresource.")
		patchErr := c.Status().Patch(ctx, latestFetchedObj, client.MergeFrom(beforeMutateStatusCopy))
		if patchErr != nil {
			// Log the patch error. RetryOnConflict will decide whether to retry based on the error type.
			// Conflict errors will be retried. Other errors might not.
			logger.Info("Failed to patch status, will retry if conflict error.", "error", patchErr.Error())
			return patchErr // Return the error to retry.RetryOnConflict
		}

		logger.Info("Successfully patched status subresource.")
		return nil // Patch successful
	})
}
53 changes: 53 additions & 0 deletions pkg/status/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Package status provides utilities for managing Kubernetes resource status,
// particularly focusing on Conditions according to standard practices.
package status

// Condition types used for EtcdCluster status.
// Adhering to Kubernetes API conventions as much as possible.
// See:
// github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md

const (
	// ConditionAvailable indicates that the etcd cluster has reached its desired state,
	// has quorum, and is ready to serve requests. All members are healthy.
	ConditionAvailable string = "Available"

	// ConditionProgressing indicates that the operator is actively working
	// to bring the etcd cluster towards the desired state (e.g., creating resources,
	// scaling, promoting learners). It's True when reconciliation is in progress
	// and False when the desired state is reached or a terminal state occurs.
	ConditionProgressing string = "Progressing"

	// ConditionDegraded indicates that the etcd cluster is functional but
	// operating with potential issues that might impact performance or fault tolerance
	// (e.g., some members unhealthy but quorum maintained, leader missing temporarily).
	// It requires attention but is not necessarily completely unavailable.
	ConditionDegraded string = "Degraded"
)

// Common reasons for EtcdCluster status conditions. Reasons should be CamelCase
// and concise, per Kubernetes API conventions; they are machine-readable and
// may be relied upon by consumers, so treat changes as API changes.
const (
	// General Reasons
	ReasonReconciling      string = "Reconciling"
	ReasonReconcileSuccess string = "ReconcileSuccess"
	ReasonClusterHealthy   string = "ClusterHealthy"

	// Available Reasons
	ReasonClusterReady   string = "ClusterReady"
	ReasonLeaderNotFound string = "LeaderNotFound"
	ReasonSizeIsZero     string = "SizeIsZero"

	// Progressing Reasons
	ReasonInitializingCluster string = "InitializingCluster"
	ReasonCreatingResources   string = "CreatingResources" // STS, Service etc.
	ReasonScalingUp           string = "ScalingUp"
	ReasonScalingDown         string = "ScalingDown"
	ReasonPromotingLearner    string = "PromotingLearner"
	ReasonMembersMismatch     string = "MembersMismatch"

	// Degraded Reasons
	ReasonMembersUnhealthy   string = "MembersUnhealthy"
	ReasonHealthCheckError   string = "HealthCheckError"
	ReasonResourceCreateFail string = "ResourceCreateFail" // Non-fatal resource creation failure
	ReasonResourceUpdateFail string = "ResourceUpdateFail" // Non-fatal resource update failure
)