From 3a8ccf6b52fd055169c007f20bbae282ad8c723a Mon Sep 17 00:00:00 2001 From: Kamil Ubych Date: Fri, 20 Feb 2026 10:38:08 +0100 Subject: [PATCH 1/6] reconciliation logic for pgbouncer --- .../controller/postgrescluster_controller.go | 273 +++++++++++++++++- 1 file changed, 258 insertions(+), 15 deletions(-) diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index aa4d47294..4ee6f8a10 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + "time" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" enterprisev4 "github.com/splunk/splunk-operator/api/v4" corev1 "k8s.io/api/core/v1" @@ -61,6 +63,8 @@ type normalizedCNPGClusterSpec struct { // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusterclasses,verbs=get;list;watch // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/status,verbs=get +// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers/status,verbs=get // Main reconciliation loop for PostgresCluster. 
func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { @@ -93,7 +97,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ postgresClusterClass := &enterprisev4.PostgresClusterClass{} if getClusterClassErr := r.Get(ctx, client.ObjectKey{Name: postgresCluster.Spec.Class}, postgresClusterClass); getClusterClassErr != nil { logger.Error(getClusterClassErr, "Unable to fetch referenced PostgresClusterClass", "className", postgresCluster.Spec.Class) - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterClassNotFound", getClusterClassErr.Error()) + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterClassNotFound", getClusterClassErr.Error()) return res, getClusterClassErr } @@ -112,9 +116,10 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ newCluster := r.buildCNPGCluster(postgresCluster, mergedConfig) if err = r.Create(ctx, newCluster); err != nil { logger.Error(err, "Failed to create CNPG Cluster") - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterBuildFailed", err.Error()) + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterBuildFailed", err.Error()) return res, err } + r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterBuildSucceeded", fmt.Sprintf("CNPG cluster build Succeeded: %s", postgresCluster.Name)) logger.Info("CNPG Cluster created successfully, requeueing for status update", "name", postgresCluster.Name) return ctrl.Result{RequeueAfter: retryDelay}, nil case err != nil: @@ -136,7 +141,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{Requeue: true}, nil } logger.Error(patchCNPGClusterErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterUpdateFailed", patchCNPGClusterErr.Error()) + r.setCondition(postgresCluster, "Ready", 
metav1.ConditionFalse, "ClusterUpdateFailed", patchCNPGClusterErr.Error()) return res, patchCNPGClusterErr } logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) @@ -149,10 +154,19 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ r.setCondition(postgresCluster, metav1.ConditionFalse, "ManagedRolesFailed", err.Error()) return res, err } - - // 8. Report progress back to the user and manage the reconciliation lifecycle. + // 7. Reconcile Connection Pooler if enabled in class + requeuePooler, poolerErr := r.reconcileConnectionPooler(ctx, postgresCluster, postgresClusterClass, cnpgCluster) + if poolerErr != nil { + logger.Error(poolerErr, "Failed to reconcile connection pooler") + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "PoolerReconciliationFailed", poolerErr.Error()) + return res, poolerErr + } + if requeuePooler { + return ctrl.Result{RequeueAfter: retryDelaytimer}, nil + } + // 7. Report progress back to the user and manage the reconciliation lifecycle. logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) - r.setCondition(postgresCluster, metav1.ConditionTrue, "ClusterUpdateSucceeded", fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) + r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterUpdateSucceeded", fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) return res, nil } @@ -238,6 +252,162 @@ func (r *PostgresClusterReconciler) buildCNPGCluster(postgresCluster *enterprise return cnpgCluster } +// poolerResourceName returns the CNPG Pooler resource name for a given cluster and type (rw/ro). +func poolerResourceName(clusterName, poolerType string) string { + return fmt.Sprintf("%s-pooler-%s", clusterName, poolerType) +} + +// isConnectionPoolerEnabled determines if connection pooler should be active. 
+func (r *PostgresClusterReconciler) isConnectionPoolerEnabled(class *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) bool { + if cluster.Spec.ConnectionPoolerEnabled != nil { + return *cluster.Spec.ConnectionPoolerEnabled + } + + return class.Spec.Config.ConnectionPoolerEnabled != nil && + *class.Spec.Config.ConnectionPoolerEnabled +} + +// reconcileConnectionPooler creates or deletes CNPG Pooler resources based on the effective enabled state. +// Returns (requeue, error) — requeue is true when poolers were just created and may not be ready yet. +func (r *PostgresClusterReconciler) reconcileConnectionPooler( + ctx context.Context, + postgresCluster *enterprisev4.PostgresCluster, + class *enterprisev4.PostgresClusterClass, + cnpgCluster *cnpgv1.Cluster, +) (bool, error) { + logger := logs.FromContext(ctx) + + if !r.isConnectionPoolerEnabled(class, postgresCluster) { + // Skip deletion if the cluster is not healthy — owner references handle cleanup via GC. + if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + return false, nil + } + if err := r.deleteConnectionPoolers(ctx, postgresCluster); err != nil { + return false, err + } + postgresCluster.Status.ConnectionPoolerStatus = nil + meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, "PoolerReady") + return false, nil + } + + if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + logger.Info("CNPG Cluster not healthy, waiting before creating poolers") + r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "ClusterNotHealthy", "Waiting for CNPG cluster to become healthy before creating poolers") + return false, nil + } + + if class.Spec.CNPG == nil || class.Spec.CNPG.ConnectionPooler == nil { + logger.Info("Connection pooler enabled but config missing in class", "class", class.Name) + r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolerConfigMissing", fmt.Sprintf("Connection pooler is enabled but cnpg.connectionPooler config is missing 
in class %s", class.Name)) + return false, nil + } + + // Create/Update RW Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, "rw"); err != nil { + return false, fmt.Errorf("failed to reconcile RW pooler: %w", err) + } + + // Create/Update RO Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, "ro"); err != nil { + return false, fmt.Errorf("failed to reconcile RO pooler: %w", err) + } + + // Check if poolers are ready — requeue if they're still provisioning. + poolersReady := r.syncPoolerStatus(ctx, postgresCluster) + return !poolersReady, nil +} + +// deleteConnectionPoolers removes RW and RO pooler resources if they exist. +func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) error { + logger := logs.FromContext(ctx) + + for _, poolerType := range []string{"rw", "ro"} { + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + pooler := &cnpgv1.Pooler{} + + err := r.Get(ctx, types.NamespacedName{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, pooler) + + if apierrors.IsNotFound(err) { + continue + } + if err != nil { + return fmt.Errorf("failed to get pooler %s: %w", poolerName, err) + } + + logger.Info("Deleting CNPG Pooler", "name", poolerName) + if err := r.Delete(ctx, pooler); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete pooler %s: %w", poolerName, err) + } + } + + return nil +} + +// ensureConnectionPooler creates a CNPG Pooler resource if it doesn't exist. +// ensureConnectionPooler creates a CNPG Pooler resource if it doesn't exist. 
+func (r *PostgresClusterReconciler) ensureConnectionPooler( + ctx context.Context, + postgresCluster *enterprisev4.PostgresCluster, + class *enterprisev4.PostgresClusterClass, + cnpgCluster *cnpgv1.Cluster, + poolerType string, +) error { + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + + existingPooler := &cnpgv1.Pooler{} + err := r.Get(ctx, types.NamespacedName{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, existingPooler) + + if apierrors.IsNotFound(err) { + logs.FromContext(ctx).Info("Creating CNPG Pooler", "name", poolerName, "type", poolerType) + r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolerCreating", fmt.Sprintf("Creating %s pooler", poolerType)) + pooler := r.buildCNPGPooler(postgresCluster, class, cnpgCluster, poolerType) + return r.Create(ctx, pooler) + } + + return err +} + +// buildCNPGPooler constructs a CNPG Pooler object. +func (r *PostgresClusterReconciler) buildCNPGPooler( + postgresCluster *enterprisev4.PostgresCluster, + class *enterprisev4.PostgresClusterClass, + cnpgCluster *cnpgv1.Cluster, + poolerType string, +) *cnpgv1.Pooler { + cfg := class.Spec.CNPG.ConnectionPooler + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + + instances := *cfg.Instances + mode := cnpgv1.PgBouncerPoolMode(*cfg.Mode) + + pooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, + Spec: cnpgv1.PoolerSpec{ + Cluster: cnpgv1.LocalObjectReference{ + Name: cnpgCluster.Name, + }, + Instances: &instances, + Type: cnpgv1.PoolerType(poolerType), + PgBouncer: &cnpgv1.PgBouncerSpec{ + PoolMode: mode, + Parameters: cfg.Config, + }, + }, + } + + ctrl.SetControllerReference(postgresCluster, pooler, r.Scheme) + return pooler +} + // syncStatus maps CNPG Cluster state to PostgresCluster object. 
func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, err error) error { // will use Patch as we did for main reconciliation loop. @@ -246,26 +416,26 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus // If there's an error, we set the status to Error and include the error message in the condition. if err != nil { postgresCluster.Status.Phase = "Error" - r.setCondition(postgresCluster, metav1.ConditionFalse, "Error", fmt.Sprintf("Error during reconciliation: %v", err)) + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "Error", fmt.Sprintf("Error during reconciliation: %v", err)) // CNPG not existing, set status to Pending. Direct running `switch` without CNPG cluster in place will cause a panic, so we need to check for that first. } else if cnpgCluster == nil { postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterNotFound", "Underlying CNPG cluster object has not been created yet") + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterNotFound", "Underlying CNPG cluster object has not been created yet") // Cluster exists, map the CNPG Cluster status to our PostgresCluster status. 
} else { switch cnpgCluster.Status.Phase { case cnpgv1.PhaseHealthy: postgresCluster.Status.Phase = "Ready" - r.setCondition(postgresCluster, metav1.ConditionTrue, "ClusterHealthy", "CNPG cluster is in healthy state") + r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterHealthy", "CNPG cluster is in healthy state") case cnpgv1.PhaseUnrecoverable: postgresCluster.Status.Phase = "Failed" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterCreationFailed", "CNPG cluster is in unrecoverable state") + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterCreationFailed", "CNPG cluster is in unrecoverable state") case "": postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterPending", "CNPG cluster is pending creation") + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterPending", "CNPG cluster is pending creation") default: postgresCluster.Status.Phase = "Provisioning" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterProvisioning", "CNPG cluster is being provisioned") + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterProvisioning", "CNPG cluster is being provisioned") } // Set the reference to the CNPG Cluster in the status. postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ @@ -275,6 +445,8 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus Name: cnpgCluster.Name, UID: cnpgCluster.UID, } + + // ConnectionPoolerStatus and PoolerReady condition are set by reconcileConnectionPooler. } if patchErr := r.Status().Patch(ctx, postgresCluster, latestPGCluster); patchErr != nil { @@ -283,10 +455,10 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus return nil } -// setCondition sets the condition of the PostgresCluster status. 
-func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.PostgresCluster, status metav1.ConditionStatus, reason, message string) { +// setCondition sets a condition on the PostgresCluster status. +func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.PostgresCluster, conditionType string, status metav1.ConditionStatus, reason, message string) { meta.SetStatusCondition(&postgresCluster.Status.Conditions, metav1.Condition{ - Type: "Ready", + Type: conditionType, Status: status, Reason: reason, Message: message, @@ -294,6 +466,76 @@ func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.P }) } +// syncPoolerStatus populates ConnectionPoolerStatus and the PoolerReady condition. +// It returns true when all poolers are ready, false otherwise. +// The caller decides how pooler readiness affects the overall phase. +func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) bool { + rwPooler := &cnpgv1.Pooler{} + rwErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, "rw"), + Namespace: postgresCluster.Namespace, + }, rwPooler) + + roPooler := &cnpgv1.Pooler{} + roErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, "ro"), + Namespace: postgresCluster.Namespace, + }, roPooler) + + postgresCluster.Status.ConnectionPoolerStatus = &enterprisev4.ConnectionPoolerStatus{ + Enabled: true, + } + + rwReady := r.isPoolerReady(rwPooler, rwErr) + roReady := r.isPoolerReady(roPooler, roErr) + + if rwReady && roReady { + rwDesired, rwScheduled := r.getPoolerInstanceCount(rwPooler) + roDesired, roScheduled := r.getPoolerInstanceCount(roPooler) + r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionTrue, "AllInstancesReady", fmt.Sprintf("RW: %d/%d, RO: %d/%d", rwScheduled, rwDesired, roScheduled, roDesired)) + return true + } + + rwStatus := r.getPoolerStatusString(rwPooler, 
rwErr) + roStatus := r.getPoolerStatusString(roPooler, roErr) + r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolersNotReady", fmt.Sprintf("RW: %s, RO: %s", rwStatus, roStatus)) + return false +} + +// isPoolerReady checks if a pooler has all instances scheduled. +// Note: CNPG PoolerStatus only tracks scheduled instances, not ready pods. +func (r *PostgresClusterReconciler) isPoolerReady(pooler *cnpgv1.Pooler, err error) bool { + if err != nil { + return false + } + desiredInstances := int32(1) + if pooler.Spec.Instances != nil { + desiredInstances = *pooler.Spec.Instances + } + return pooler.Status.Instances >= desiredInstances +} + +// getPoolerInstanceCount returns the number of scheduled instances for a pooler. +func (r *PostgresClusterReconciler) getPoolerInstanceCount(pooler *cnpgv1.Pooler) (desired int32, scheduled int32) { + desired = int32(1) + if pooler.Spec.Instances != nil { + desired = *pooler.Spec.Instances + } + return desired, pooler.Status.Instances +} + +// getPoolerStatusString returns a human-readable status string for a pooler. +func (r *PostgresClusterReconciler) getPoolerStatusString(pooler *cnpgv1.Pooler, err error) string { + if apierrors.IsNotFound(err) { + return "not found" + } + if err != nil { + return "error" + } + desired, scheduled := r.getPoolerInstanceCount(pooler) + return fmt.Sprintf("%d/%d", scheduled, desired) +} + // reconcileManagedRoles synchronizes ManagedRoles from PostgresCluster spec to CNPG Cluster managed.roles using diff-based patching func (r *PostgresClusterReconciler) reconcileManagedRoles(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error { logger := logs.FromContext(ctx) @@ -387,6 +629,7 @@ func (r *PostgresClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&enterprisev4.PostgresCluster{}). Owns(&cnpgv1.Cluster{}). + Owns(&cnpgv1.Pooler{}). Named("postgresCluster"). 
Complete(r) } From 56ad3402c080eaa331f957462313e8e5281705f1 Mon Sep 17 00:00:00 2001 From: Kamil Ubych Date: Fri, 20 Feb 2026 12:07:00 +0100 Subject: [PATCH 2/6] fix status handling when enabling/disabling the connection pooler --- .../controller/postgrescluster_controller.go | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index 4ee6f8a10..daabd3749 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -73,6 +73,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ // Initialize as nil so syncStatus knows if the object was actually found/created. var cnpgCluster *cnpgv1.Cluster + var poolerEnabled bool // 1. Fetch the PostgresCluster instance, stop, if not found. postgresCluster := &enterprisev4.PostgresCluster{} @@ -88,7 +89,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ // This deferred function will run at the end of the reconciliation process, regardless of whether it exits early due to an error or completes successfully. // It ensures that we always attempt to sync the status of the PostgresCluster based on the final state of the CNPG Cluster and any errors that may have occurred. defer func() { - if syncErr := r.syncStatus(ctx, postgresCluster, cnpgCluster, err); syncErr != nil { + if syncErr := r.syncStatus(ctx, postgresCluster, cnpgCluster, poolerEnabled, err); syncErr != nil { err = errors.Join(err, fmt.Errorf("failed to sync status in deferred function: %w", syncErr)) } }() @@ -155,6 +156,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return res, err } // 7.
Reconcile Connection Pooler if enabled in class + poolerEnabled = r.isConnectionPoolerEnabled(postgresClusterClass, postgresCluster) requeuePooler, poolerErr := r.reconcileConnectionPooler(ctx, postgresCluster, postgresClusterClass, cnpgCluster) if poolerErr != nil { logger.Error(poolerErr, "Failed to reconcile connection pooler") @@ -285,8 +287,7 @@ func (r *PostgresClusterReconciler) reconcileConnectionPooler( if err := r.deleteConnectionPoolers(ctx, postgresCluster); err != nil { return false, err } - postgresCluster.Status.ConnectionPoolerStatus = nil - meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, "PoolerReady") + // ConnectionPoolerStatus and PoolerReady condition are cleared by syncStatus in the defer. return false, nil } @@ -313,8 +314,11 @@ func (r *PostgresClusterReconciler) reconcileConnectionPooler( } // Check if poolers are ready — requeue if they're still provisioning. - poolersReady := r.syncPoolerStatus(ctx, postgresCluster) - return !poolersReady, nil + rwPooler := &cnpgv1.Pooler{} + rwErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, "rw"), Namespace: postgresCluster.Namespace}, rwPooler) + roPooler := &cnpgv1.Pooler{} + roErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, "ro"), Namespace: postgresCluster.Namespace}, roPooler) + return !(r.isPoolerReady(rwPooler, rwErr) && r.isPoolerReady(roPooler, roErr)), nil } // deleteConnectionPoolers removes RW and RO pooler resources if they exist. @@ -409,7 +413,7 @@ func (r *PostgresClusterReconciler) buildCNPGPooler( } // syncStatus maps CNPG Cluster state to PostgresCluster object. 
-func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, err error) error { +func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, poolerEnabled bool, err error) error { // will use Patch as we did for main reconciliation loop. latestPGCluster := client.MergeFrom(postgresCluster.DeepCopy()) @@ -446,7 +450,12 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus UID: cnpgCluster.UID, } - // ConnectionPoolerStatus and PoolerReady condition are set by reconcileConnectionPooler. + if poolerEnabled { + r.syncPoolerStatus(ctx, postgresCluster) + } else { + postgresCluster.Status.ConnectionPoolerStatus = nil + meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, "PoolerReady") + } } if patchErr := r.Status().Patch(ctx, postgresCluster, latestPGCluster); patchErr != nil { From d661e84d618d62116ac0b7a0eb795938cf992d85 Mon Sep 17 00:00:00 2001 From: dpishchenkov Date: Thu, 26 Feb 2026 13:13:31 +0100 Subject: [PATCH 3/6] rebased to features/database-controllers --- internal/controller/postgrescluster_controller.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index daabd3749..b383c324e 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -20,8 +20,6 @@ import ( "context" "errors" "fmt" - "time" - cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" enterprisev4 "github.com/splunk/splunk-operator/api/v4" corev1 "k8s.io/api/core/v1" @@ -125,7 +123,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{RequeueAfter: retryDelay}, nil case err != nil: logger.Error(err, "Failed to get CNPG Cluster") - 
r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterGetFailed", err.Error()) + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterGetFailed", err.Error()) return res, err } cnpgCluster = existingCNPG @@ -152,7 +150,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ // 7. Reconcile ManagedRoles from PostgresCluster to CNPG Cluster if err := r.reconcileManagedRoles(ctx, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") - r.setCondition(postgresCluster, metav1.ConditionFalse, "ManagedRolesFailed", err.Error()) + r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ManagedRolesFailed", err.Error()) return res, err } // 7. Reconcile Connection Pooler if enabled in class @@ -164,7 +162,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return res, poolerErr } if requeuePooler { - return ctrl.Result{RequeueAfter: retryDelaytimer}, nil + return ctrl.Result{RequeueAfter: retryDelay}, nil } // 7. Report progress back to the user and manage the reconciliation lifecycle. logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) @@ -350,7 +348,6 @@ func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, return nil } -// ensureConnectionPooler creates a CNPG Pooler resource if it doesn't exist. // ensureConnectionPooler creates a CNPG Pooler resource if it doesn't exist. 
func (r *PostgresClusterReconciler) ensureConnectionPooler( ctx context.Context, From fa18efa8e89785998d84ee23e9a6e34e1412199c Mon Sep 17 00:00:00 2001 From: dpishchenkov Date: Thu, 26 Feb 2026 16:45:40 +0100 Subject: [PATCH 4/6] use enums instead of hard-coded string values --- .../controller/postgrescluster_controller.go | 81 ++++++++++--------- .../controller/postgresdatabase_controller.go | 37 --------- .../postgresoperator_common_types.go | 73 +++++++++++++++++ 3 files changed, 116 insertions(+), 75 deletions(-) create mode 100644 internal/controller/postgresoperator_common_types.go diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index b383c324e..e1e821b98 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -96,7 +96,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ postgresClusterClass := &enterprisev4.PostgresClusterClass{} if getClusterClassErr := r.Get(ctx, client.ObjectKey{Name: postgresCluster.Spec.Class}, postgresClusterClass); getClusterClassErr != nil { logger.Error(getClusterClassErr, "Unable to fetch referenced PostgresClusterClass", "className", postgresCluster.Spec.Class) - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterClassNotFound", getClusterClassErr.Error()) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterClassNotFound, getClusterClassErr.Error()) return res, getClusterClassErr }
reasonClusterBuildFailed, err.Error()) return res, err } - r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterBuildSucceeded", fmt.Sprintf("CNPG cluster build Succeeded: %s", postgresCluster.Name)) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonClusterBuildSucceeded, fmt.Sprintf("CNPG cluster build Succeeded: %s", postgresCluster.Name)) logger.Info("CNPG Cluster created successfully, requeueing for status update", "name", postgresCluster.Name) return ctrl.Result{RequeueAfter: retryDelay}, nil case err != nil: logger.Error(err, "Failed to get CNPG Cluster") - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterGetFailed", err.Error()) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterGetFailed, err.Error()) return res, err } cnpgCluster = existingCNPG @@ -140,7 +140,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{Requeue: true}, nil } logger.Error(patchCNPGClusterErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterUpdateFailed", patchCNPGClusterErr.Error()) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, patchCNPGClusterErr.Error()) return res, patchCNPGClusterErr } logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) @@ -150,7 +150,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ // 7. 
Reconcile ManagedRoles from PostgresCluster to CNPG Cluster if err := r.reconcileManagedRoles(ctx, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ManagedRolesFailed", err.Error()) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, err.Error()) return res, err } // 7. Reconcile Connection Pooler if enabled in class @@ -158,7 +158,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ requeuePooler, poolerErr := r.reconcileConnectionPooler(ctx, postgresCluster, postgresClusterClass, cnpgCluster) if poolerErr != nil { logger.Error(poolerErr, "Failed to reconcile connection pooler") - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "PoolerReconciliationFailed", poolerErr.Error()) + r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, poolerErr.Error()) return res, poolerErr } if requeuePooler { @@ -166,7 +166,7 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // 7. Report progress back to the user and manage the reconciliation lifecycle. 
logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) - r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterUpdateSucceeded", fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonClusterUpdateSucceeded, fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) return res, nil } @@ -224,8 +224,8 @@ func (r *PostgresClusterReconciler) buildCNPGClusterSpec(mergedConfig *enterpris }, Bootstrap: &cnpgv1.BootstrapConfiguration{ InitDB: &cnpgv1.BootstrapInitDB{ - Database: "postgres", - Owner: "postgres", + Database: defaultDatabaseName, + Owner: defaultUsername, }, }, StorageConfiguration: cnpgv1.StorageConfiguration{ @@ -291,31 +291,31 @@ func (r *PostgresClusterReconciler) reconcileConnectionPooler( if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { logger.Info("CNPG Cluster not healthy, waiting before creating poolers") - r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "ClusterNotHealthy", "Waiting for CNPG cluster to become healthy before creating poolers") + r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonCNPGClusterNotHealthy, "Waiting for CNPG cluster to become healthy before creating poolers") return false, nil } if class.Spec.CNPG == nil || class.Spec.CNPG.ConnectionPooler == nil { logger.Info("Connection pooler enabled but config missing in class", "class", class.Name) - r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolerConfigMissing", fmt.Sprintf("Connection pooler is enabled but cnpg.connectionPooler config is missing in class %s", class.Name)) + r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerConfigMissing, fmt.Sprintf("Connection pooler is enabled but cnpg.connectionPooler config is missing in class %s", class.Name)) return false, nil } // Create/Update RW Pooler - if err := 
r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, "rw"); err != nil { + if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, readWriteEndpoint); err != nil { return false, fmt.Errorf("failed to reconcile RW pooler: %w", err) } // Create/Update RO Pooler - if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, "ro"); err != nil { + if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, readOnlyEndpoint); err != nil { return false, fmt.Errorf("failed to reconcile RO pooler: %w", err) } // Check if poolers are ready — requeue if they're still provisioning. rwPooler := &cnpgv1.Pooler{} - rwErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, "rw"), Namespace: postgresCluster.Namespace}, rwPooler) + rwErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), Namespace: postgresCluster.Namespace}, rwPooler) roPooler := &cnpgv1.Pooler{} - roErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, "ro"), Namespace: postgresCluster.Namespace}, roPooler) + roErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), Namespace: postgresCluster.Namespace}, roPooler) return !(r.isPoolerReady(rwPooler, rwErr) && r.isPoolerReady(roPooler, roErr)), nil } @@ -323,7 +323,7 @@ func (r *PostgresClusterReconciler) reconcileConnectionPooler( func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) error { logger := logs.FromContext(ctx) - for _, poolerType := range []string{"rw", "ro"} { + for _, poolerType := range []string{readWriteEndpoint, readOnlyEndpoint} { poolerName := poolerResourceName(postgresCluster.Name, poolerType) pooler := &cnpgv1.Pooler{} @@ -366,7 +366,7 @@ func (r *PostgresClusterReconciler) ensureConnectionPooler( if apierrors.IsNotFound(err) { 
logs.FromContext(ctx).Info("Creating CNPG Pooler", "name", poolerName, "type", poolerType) - r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolerCreating", fmt.Sprintf("Creating %s pooler", poolerType)) + r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerCreating, fmt.Sprintf("Creating %s pooler", poolerType)) pooler := r.buildCNPGPooler(postgresCluster, class, cnpgCluster, poolerType) return r.Create(ctx, pooler) } @@ -416,27 +416,27 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus // If there's an error, we set the status to Error and include the error message in the condition. if err != nil { - postgresCluster.Status.Phase = "Error" - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "Error", fmt.Sprintf("Error during reconciliation: %v", err)) + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonFailed, fmt.Sprintf("Error during reconciliation: %v", err)) // CNPG not existing, set status to Pending. Direct running `switch` without CNPG cluster in place will cause a panic, so we need to check for that first. } else if cnpgCluster == nil { - postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterNotFound", "Underlying CNPG cluster object has not been created yet") + postgresCluster.Status.Phase = string(pending) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonNotFound, "Underlying CNPG cluster object has not been created yet") // Cluster exists, map the CNPG Cluster status to our PostgresCluster status. 
} else { switch cnpgCluster.Status.Phase { case cnpgv1.PhaseHealthy: - postgresCluster.Status.Phase = "Ready" - r.setCondition(postgresCluster, "Ready", metav1.ConditionTrue, "ClusterHealthy", "CNPG cluster is in healthy state") + postgresCluster.Status.Phase = string(ready) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonAvailable, "CNPG cluster is in healthy state") case cnpgv1.PhaseUnrecoverable: - postgresCluster.Status.Phase = "Failed" - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterCreationFailed", "CNPG cluster is in unrecoverable state") + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterCreationFailed, "CNPG cluster is in unrecoverable state") case "": - postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterPending", "CNPG cluster is pending creation") + postgresCluster.Status.Phase = string(pending) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterPending, "CNPG cluster is pending creation") default: - postgresCluster.Status.Phase = "Provisioning" - r.setCondition(postgresCluster, "Ready", metav1.ConditionFalse, "ClusterProvisioning", "CNPG cluster is being provisioned") + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterProvisioning, "CNPG cluster is being provisioned") } // Set the reference to the CNPG Cluster in the status. 
postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ @@ -451,7 +451,7 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus r.syncPoolerStatus(ctx, postgresCluster) } else { postgresCluster.Status.ConnectionPoolerStatus = nil - meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, "PoolerReady") + meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, string(poolerReady)) } } @@ -462,11 +462,16 @@ func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresClus } // setCondition sets a condition on the PostgresCluster status. -func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.PostgresCluster, conditionType string, status metav1.ConditionStatus, reason, message string) { +func (r *PostgresClusterReconciler) setCondition( + postgresCluster *enterprisev4.PostgresCluster, + conditionType conditionTypes, + status metav1.ConditionStatus, + reason conditionReasons, + message string) { meta.SetStatusCondition(&postgresCluster.Status.Conditions, metav1.Condition{ - Type: conditionType, + Type: string(conditionType), Status: status, - Reason: reason, + Reason: string(reason), Message: message, ObservedGeneration: postgresCluster.Generation, }) @@ -478,13 +483,13 @@ func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.P func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) bool { rwPooler := &cnpgv1.Pooler{} rwErr := r.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, "rw"), + Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), Namespace: postgresCluster.Namespace, }, rwPooler) roPooler := &cnpgv1.Pooler{} roErr := r.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, "ro"), + Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), Namespace: postgresCluster.Namespace, }, roPooler) @@ -498,13 +503,13 
@@ func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgr if rwReady && roReady { rwDesired, rwScheduled := r.getPoolerInstanceCount(rwPooler) roDesired, roScheduled := r.getPoolerInstanceCount(roPooler) - r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionTrue, "AllInstancesReady", fmt.Sprintf("RW: %d/%d, RO: %d/%d", rwScheduled, rwDesired, roScheduled, roDesired)) + r.setCondition(postgresCluster, poolerReady, metav1.ConditionTrue, reasonAllInstancesReady, fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired)) return true } rwStatus := r.getPoolerStatusString(rwPooler, rwErr) roStatus := r.getPoolerStatusString(roPooler, roErr) - r.setCondition(postgresCluster, "PoolerReady", metav1.ConditionFalse, "PoolersNotReady", fmt.Sprintf("RW: %s, RO: %s", rwStatus, roStatus)) + r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolersNotReady, fmt.Sprintf("%s: %s, %s: %s", readWriteEndpoint, rwStatus, readOnlyEndpoint, roStatus)) return false } diff --git a/internal/controller/postgresdatabase_controller.go b/internal/controller/postgresdatabase_controller.go index 6a4290a03..98461cfc8 100644 --- a/internal/controller/postgresdatabase_controller.go +++ b/internal/controller/postgresdatabase_controller.go @@ -39,43 +39,6 @@ import ( enterprisev4 "github.com/splunk/splunk-operator/api/v4" ) -type reconcilePhases string -type conditionTypes string -type conditionReasons string -type clusterReadyStatus string - -const ( - retryDelay = time.Second * 15 - // phases - ready reconcilePhases = "Ready" - pending reconcilePhases = "Pending" - provisioning reconcilePhases = "Provisioning" - failed reconcilePhases = "Failed" - - // Conditiontypes - clusterReady conditionTypes = "ClusterReady" - secretsReady conditionTypes = "SecretsReady" - usersReady conditionTypes = "UsersReady" - databasesReady conditionTypes = "DatabasesReady" - privilegesReady 
conditionTypes = "PrivilegesReady" - - // Condition reasons - reasonNotFound conditionReasons = "NotFound" - reasonProvisioning conditionReasons = "Provisioning" - reasonClusterInfoFetchFailed conditionReasons = "ClusterInfoFetchNotPossible" - reasonAvailable conditionReasons = "Available" - reasonConfiguring conditionReasons = "Configuring" - reasonWaitingForCNPG conditionReasons = "WaitingForCNPG" - reasonFailed conditionReasons = "Failed" - reasonCreating conditionReasons = "Creating" - - // Cluster status - ClusterNotFound clusterReadyStatus = "NotFound" - ClusterNotReady clusterReadyStatus = "NotReady" - ClusterNoProvisionerRef clusterReadyStatus = "NoProvisionerRef" - ClusterReady clusterReadyStatus = "Ready" -) - // PostgresDatabaseReconciler reconciles a PostgresDatabase object type PostgresDatabaseReconciler struct { client.Client diff --git a/internal/controller/postgresoperator_common_types.go b/internal/controller/postgresoperator_common_types.go new file mode 100644 index 000000000..a568bdf93 --- /dev/null +++ b/internal/controller/postgresoperator_common_types.go @@ -0,0 +1,73 @@ +package controller + +import ( + "time" +) + +type reconcilePhases string +type conditionTypes string +type conditionReasons string +type clusterReadyStatus string + +const ( + // default requeue delay + retryDelay = time.Second * 15 + //cluster endpoint suffixes + readOnlyEndpoint string = "ro" + readWriteEndpoint string = "rw" + // default database name + defaultDatabaseName string = "postgres" + defaultUsername string = "postgres" + + // phases + ready reconcilePhases = "Ready" + pending reconcilePhases = "Pending" + provisioning reconcilePhases = "Provisioning" + failed reconcilePhases = "Failed" + + // Conditiontypes + clusterReady conditionTypes = "ClusterReady" + poolerReady conditionTypes = "PoolerReady" + secretsReady conditionTypes = "SecretsReady" + usersReady conditionTypes = "UsersReady" + databasesReady conditionTypes = "DatabasesReady" + privilegesReady 
conditionTypes = "PrivilegesReady" + + // Condition reasons + reasonNotFound conditionReasons = "NotFound" + reasonProvisioning conditionReasons = "Provisioning" + reasonClusterInfoFetchFailed conditionReasons = "ClusterInfoFetchNotPossible" + reasonAvailable conditionReasons = "Available" + reasonConfiguring conditionReasons = "Configuring" + reasonWaitingForCNPG conditionReasons = "WaitingForCNPG" + reasonFailed conditionReasons = "Failed" + reasonCreating conditionReasons = "Creating" + + // Additional condition reasons for clusterReady conditionType + reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" + reasonManagedRolesFailed conditionReasons = "ManagedRolesReconciliationFailed" + reasonClusterBuildFailed conditionReasons = "ClusterBuildFailed" + reasonClusterBuildSucceeded conditionReasons = "ClusterBuildSucceeded" + reasonClusterGetFailed conditionReasons = "ClusterGetFailed" + reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" + reasonClusterUpdateSucceeded conditionReasons = "ClusterUpdateSucceeded" + reasonClusterCreationFailed conditionReasons = "ClusterCreationFailed" + reasonClusterPending conditionReasons = "ClusterPending" + reasonClusterProvisioning conditionReasons = "ClusterProvisioning" + + // Additional condition reasons for poolerReady conditionType + reasonPoolerReconciliationFailed conditionReasons = "PoolerReconciliationFailed" + reasonPoolerConfigMissing conditionReasons = "PoolerConfigMissing" + reasonPoolerCreating conditionReasons = "PoolerCreating" + reasonAllInstancesReady conditionReasons = "AllInstancesReady" + reasonPoolersNotReady conditionReasons = "PoolersNotReady" + + // Additional condition reasons for mapping CNPG cluster statuses + reasonCNPGClusterNotHealthy conditionReasons = "CNPGClusterNotHealthy" + + // Cluster status + ClusterNotFound clusterReadyStatus = "NotFound" + ClusterNotReady clusterReadyStatus = "NotReady" + ClusterNoProvisionerRef clusterReadyStatus = "NoProvisionerRef" + 
ClusterReady clusterReadyStatus = "Ready" +) From a08300de996843b4f48c42a6d404f5bb6e92e416 Mon Sep 17 00:00:00 2001 From: dpishchenkov Date: Thu, 26 Feb 2026 18:52:37 +0100 Subject: [PATCH 5/6] update status mapping --- .../controller/postgrescluster_controller.go | 105 +++++++++++++----- .../postgresoperator_common_types.go | 40 +++++-- 2 files changed, 108 insertions(+), 37 deletions(-) diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index e1e821b98..138e55049 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -40,20 +40,6 @@ type PostgresClusterReconciler struct { Scheme *runtime.Scheme } -// This struct is used to compare the merged configuration from PostgresClusterClass and PostgresClusterSpec -// in a normalized way, and not to use CNPG-default values which are causing false positive diff state while reconciliation loop. -// It contains only the fields that are relevant for our reconciliation and that we want to compare when deciding whether to update the CNPG Cluster spec or not. -type normalizedCNPGClusterSpec struct { - ImageName string - Instances int - // Parameters we set, instead of complete spec from CNPG - CustomDefinedParameters map[string]string - PgHBA []string - DefaultDatabase string - Owner string - StorageSize string - Resources corev1.ResourceRequirements -} // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/status,verbs=get;update;patch @@ -101,7 +87,13 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ } // 3. Create the merged configuration by overlaying PostgresClusterSpec on top of PostgresClusterClass defaults. 
- mergedConfig := r.getMergedConfig(postgresClusterClass, postgresCluster) + mergedConfig, mergeErr := r.getMergedConfig(postgresClusterClass, postgresCluster) + if mergeErr != nil { + logger.Error(mergeErr, "Failed to merge PostgresCluster configuration") + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonInvalidConfiguration, mergeErr.Error()) + err := errors.Join(err, fmt.Errorf("failed to merge configuration: %w", mergeErr)) + return res, err + } // 4. Build the desired CNPG Cluster spec based on the merged configuration. desiredSpec := r.buildCNPGClusterSpec(mergedConfig) @@ -147,13 +139,13 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{RequeueAfter: retryDelay}, nil } - // 7. Reconcile ManagedRoles from PostgresCluster to CNPG Cluster + // 7a. Reconcile ManagedRoles from PostgresCluster to CNPG Cluster if err := r.reconcileManagedRoles(ctx, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, err.Error()) return res, err } - // 7. Reconcile Connection Pooler if enabled in class + // 7b. Reconcile Connection Pooler if enabled in class poolerEnabled = r.isConnectionPoolerEnabled(postgresClusterClass, postgresCluster) requeuePooler, poolerErr := r.reconcileConnectionPooler(ctx, postgresCluster, postgresClusterClass, cnpgCluster) if poolerErr != nil { @@ -164,14 +156,14 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ if requeuePooler { return ctrl.Result{RequeueAfter: retryDelay}, nil } - // 7. Report progress back to the user and manage the reconciliation lifecycle. + // 8. Report progress back to the user and manage the reconciliation lifecycle. 
logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonClusterUpdateSucceeded, fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) return res, nil } // getMergedConfig merges the configuration from the PostgresClusterClass into the PostgresClusterSpec, giving precedence to the PostgresClusterSpec values. -func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) *enterprisev4.PostgresClusterSpec { +func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) (*enterprisev4.PostgresClusterSpec, error) { resultConfig := cluster.Spec.DeepCopy() classDefaults := clusterClass.Spec.Config @@ -194,6 +186,10 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P resultConfig.PgHBA = classDefaults.PgHBA } + if resultConfig.Instances == nil || resultConfig.PostgresVersion == nil || resultConfig.Storage == nil { + return nil, fmt.Errorf("invalid configuration for class %s: instances, postgresVersion and storage are required", clusterClass.Name) + } + // Ensure that maps and slices are initialized to empty if they are still nil after merging, to prevent potential nil pointer dereferences later on. if resultConfig.PostgreSQLConfig == nil { resultConfig.PostgreSQLConfig = make(map[string]string) @@ -206,7 +202,7 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P resultConfig.Resources = &corev1.ResourceRequirements{} } - return resultConfig + return resultConfig, nil } // buildCNPGClusterSpec builds the desired CNPG ClusterSpec. @@ -411,34 +407,85 @@ func (r *PostgresClusterReconciler) buildCNPGPooler( // syncStatus maps CNPG Cluster state to PostgresCluster object. 
func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, poolerEnabled bool, err error) error { - // will use Patch as we did for main reconciliation loop. latestPGCluster := client.MergeFrom(postgresCluster.DeepCopy()) - // If there's an error, we set the status to Error and include the error message in the condition. if err != nil { postgresCluster.Status.Phase = string(failed) r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonFailed, fmt.Sprintf("Error during reconciliation: %v", err)) - // CNPG not existing, set status to Pending. Direct running `switch` without CNPG cluster in place will cause a panic, so we need to check for that first. } else if cnpgCluster == nil { postgresCluster.Status.Phase = string(pending) r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonNotFound, "Underlying CNPG cluster object has not been created yet") - // Cluster exists, map the CNPG Cluster status to our PostgresCluster status. 
} else { switch cnpgCluster.Status.Phase { case cnpgv1.PhaseHealthy: postgresCluster.Status.Phase = string(ready) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonAvailable, "CNPG cluster is in healthy state") + r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonCNPGClusterHealthy, "Cluster is up and running") + + case cnpgv1.PhaseFirstPrimary, + cnpgv1.PhaseCreatingReplica, + cnpgv1.PhaseWaitingForInstancesToBeActive: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, fmt.Sprintf("CNPG cluster provisioning: %s", cnpgCluster.Status.Phase)) + + case cnpgv1.PhaseSwitchover: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGSwitchover, "Cluster changing primary node") + + case cnpgv1.PhaseFailOver: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGFailingOver, "Pod missing, need to change primary") + + case cnpgv1.PhaseInplacePrimaryRestart, + cnpgv1.PhaseInplaceDeletePrimaryRestart: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGRestarting, fmt.Sprintf("CNPG cluster restarting: %s", cnpgCluster.Status.Phase)) + + case cnpgv1.PhaseUpgrade, + cnpgv1.PhaseMajorUpgrade, + cnpgv1.PhaseUpgradeDelayed, + cnpgv1.PhaseOnlineUpgrading: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGUpgrading, fmt.Sprintf("CNPG cluster upgrading: %s", cnpgCluster.Status.Phase)) + + case cnpgv1.PhaseApplyingConfiguration: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGApplyingConfig, "Configuration change is being applied") + + 
case cnpgv1.PhaseReplicaClusterPromotion: + postgresCluster.Status.Phase = string(provisioning) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGPromoting, "Replica is being promoted to primary") + + case cnpgv1.PhaseWaitingForUser: + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGWaitingForUser, "Action from the user is required") + case cnpgv1.PhaseUnrecoverable: postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterCreationFailed, "CNPG cluster is in unrecoverable state") + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGUnrecoverable, "Cluster failed, needs manual intervention") + + case cnpgv1.PhaseCannotCreateClusterObjects: + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGProvisioningFailed, "Cluster resources cannot be created") + + case cnpgv1.PhaseUnknownPlugin, + cnpgv1.PhaseFailurePlugin: + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGPluginError, fmt.Sprintf("CNPG plugin error: %s", cnpgCluster.Status.Phase)) + + case cnpgv1.PhaseImageCatalogError, + cnpgv1.PhaseArchitectureBinaryMissing: + postgresCluster.Status.Phase = string(failed) + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGImageError, fmt.Sprintf("CNPG image error: %s", cnpgCluster.Status.Phase)) + case "": postgresCluster.Status.Phase = string(pending) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterPending, "CNPG cluster is pending creation") + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, "CNPG cluster is pending creation") + default: postgresCluster.Status.Phase = string(provisioning) - 
r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterProvisioning, "CNPG cluster is being provisioned") + r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase)) } - // Set the reference to the CNPG Cluster in the status. + postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ APIVersion: "postgresql.cnpg.io/v1", Kind: "Cluster", diff --git a/internal/controller/postgresoperator_common_types.go b/internal/controller/postgresoperator_common_types.go index a568bdf93..e922515b4 100644 --- a/internal/controller/postgresoperator_common_types.go +++ b/internal/controller/postgresoperator_common_types.go @@ -1,9 +1,25 @@ package controller import ( + corev1 "k8s.io/api/core/v1" "time" ) +// This struct is used to compare the merged configuration from PostgresClusterClass and PostgresClusterSpec +// in a normalized way, and not to use CNPG-default values which are causing false positive diff state while reconciliation loop. +// It contains only the fields that are relevant for our reconciliation and that we want to compare when deciding whether to update the CNPG Cluster spec or not. 
+type normalizedCNPGClusterSpec struct { + ImageName string + Instances int + // Parameters we set, instead of complete spec from CNPG + CustomDefinedParameters map[string]string + PgHBA []string + DefaultDatabase string + Owner string + StorageSize string + Resources corev1.ResourceRequirements +} + type reconcilePhases string type conditionTypes string type conditionReasons string @@ -12,7 +28,7 @@ type clusterReadyStatus string const ( // default requeue delay retryDelay = time.Second * 15 - //cluster endpoint suffixes + // cluster endpoint suffixes readOnlyEndpoint string = "ro" readWriteEndpoint string = "rw" // default database name @@ -25,7 +41,7 @@ const ( provisioning reconcilePhases = "Provisioning" failed reconcilePhases = "Failed" - // Conditiontypes + // Condition types clusterReady conditionTypes = "ClusterReady" poolerReady conditionTypes = "PoolerReady" secretsReady conditionTypes = "SecretsReady" @@ -38,10 +54,8 @@ const ( reasonProvisioning conditionReasons = "Provisioning" reasonClusterInfoFetchFailed conditionReasons = "ClusterInfoFetchNotPossible" reasonAvailable conditionReasons = "Available" - reasonConfiguring conditionReasons = "Configuring" reasonWaitingForCNPG conditionReasons = "WaitingForCNPG" reasonFailed conditionReasons = "Failed" - reasonCreating conditionReasons = "Creating" // Additional condition reasons for clusterReady conditionType reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" @@ -51,9 +65,7 @@ const ( reasonClusterGetFailed conditionReasons = "ClusterGetFailed" reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" reasonClusterUpdateSucceeded conditionReasons = "ClusterUpdateSucceeded" - reasonClusterCreationFailed conditionReasons = "ClusterCreationFailed" - reasonClusterPending conditionReasons = "ClusterPending" - reasonClusterProvisioning conditionReasons = "ClusterProvisioning" + reasonInvalidConfiguration conditionReasons = "InvalidConfiguration" // Additional condition reasons for 
poolerReady conditionType reasonPoolerReconciliationFailed conditionReasons = "PoolerReconciliationFailed" @@ -63,7 +75,19 @@ const ( reasonPoolersNotReady conditionReasons = "PoolersNotReady" // Additional condition reasons for mapping CNPG cluster statuses - reasonCNPGClusterNotHealthy conditionReasons = "CNPGClusterNotHealthy" + reasonCNPGClusterNotHealthy conditionReasons = "CNPGClusterNotHealthy" + reasonCNPGClusterHealthy conditionReasons = "CNPGClusterHealthy" + reasonCNPGSwitchover conditionReasons = "CNPGSwitchover" + reasonCNPGFailingOver conditionReasons = "CNPGFailingOver" + reasonCNPGRestarting conditionReasons = "CNPGRestarting" + reasonCNPGUpgrading conditionReasons = "CNPGUpgrading" + reasonCNPGApplyingConfig conditionReasons = "CNPGApplyingConfiguration" + reasonCNPGPromoting conditionReasons = "CNPGPromoting" + reasonCNPGWaitingForUser conditionReasons = "CNPGWaitingForUser" + reasonCNPGUnrecoverable conditionReasons = "CNPGUnrecoverable" + reasonCNPGProvisioningFailed conditionReasons = "CNPGProvisioningFailed" + reasonCNPGPluginError conditionReasons = "CNPGPluginError" + reasonCNPGImageError conditionReasons = "CNPGImageError" // Cluster status ClusterNotFound clusterReadyStatus = "NotFound" From ac532738338d953d4ea3fc9d569af8950b1c3c58 Mon Sep 17 00:00:00 2001 From: dpishchenkov Date: Fri, 27 Feb 2026 15:58:15 +0100 Subject: [PATCH 6/6] Fix validation logic for poolers --- api/v4/postgrescluster_types.go | 6 + api/v4/postgresclusterclass_types.go | 1 + ...nterprise_v4_postgresclusterclass_dev.yaml | 7 + .../controller/postgrescluster_controller.go | 607 +++++++++++------- .../postgresoperator_common_types.go | 29 +- 5 files changed, 386 insertions(+), 264 deletions(-) diff --git a/api/v4/postgrescluster_types.go b/api/v4/postgrescluster_types.go index 3b1e81b32..f6ae81ea7 100644 --- a/api/v4/postgrescluster_types.go +++ b/api/v4/postgrescluster_types.go @@ -45,6 +45,7 @@ type ManagedRole struct { // Validation rules ensure immutability of 
Class, and that Storage and PostgresVersion can only be set once and cannot be removed or downgraded. // +kubebuilder:validation:XValidation:rule="!has(oldSelf.postgresVersion) || (has(self.postgresVersion) && int(self.postgresVersion.split('.')[0]) >= int(oldSelf.postgresVersion.split('.')[0]))",messageExpression="!has(self.postgresVersion) ? 'postgresVersion cannot be removed once set (was: ' + oldSelf.postgresVersion + ')' : 'postgresVersion major version cannot be downgraded (from: ' + oldSelf.postgresVersion + ', to: ' + self.postgresVersion + ')'" // +kubebuilder:validation:XValidation:rule="!has(oldSelf.storage) || (has(self.storage) && quantity(self.storage).compareTo(quantity(oldSelf.storage)) >= 0)",messageExpression="!has(self.storage) ? 'storage cannot be removed once set (was: ' + string(oldSelf.storage) + ')' : 'storage size cannot be decreased (from: ' + string(oldSelf.storage) + ', to: ' + string(self.storage) + ')'" +// +kubebuilder:validation:XValidation:rule="!self.connectionPoolerEnabled || self.connectionPoolerConfig != null || (self.cnpg != null && self.cnpg.connectionPooler != null)",message="connectionPoolerConfig must be set in cluster spec or class when connectionPoolerEnabled is true" type PostgresClusterSpec struct { // This field is IMMUTABLE after creation. // +kubebuilder:validation:Required @@ -96,6 +97,11 @@ type PostgresClusterSpec struct { // +optional ConnectionPoolerEnabled *bool `json:"connectionPoolerEnabled,omitempty"` + // ConnectionPoolerConfig overrides the connection pooler configuration from the class. + // Only takes effect when connection pooling is enabled. + // +optional + ConnectionPoolerConfig *ConnectionPoolerConfig `json:"connectionPoolerConfig,omitempty"` + // ManagedRoles contains PostgreSQL roles that should be created in the cluster. // This field supports Server-Side Apply with per-role granularity, allowing // multiple PostgresDatabase controllers to manage different roles independently. 
diff --git a/api/v4/postgresclusterclass_types.go b/api/v4/postgresclusterclass_types.go index 430e3409e..9c4fcef44 100644 --- a/api/v4/postgresclusterclass_types.go +++ b/api/v4/postgresclusterclass_types.go @@ -114,6 +114,7 @@ const ( // ConnectionPoolerConfig defines PgBouncer connection pooler configuration. // When enabled, creates RW and RO pooler deployments for clusters using this class. +// +kubebuilder:validation:XValidation:rule="!self.connectionPoolerEnabled || self.connectionPoolerConfig != null || (self.cnpg != null && self.cnpg.connectionPooler != null)",message="connectionPoolerConfig must be set in cluster spec or class when connectionPoolerEnabled is true" type ConnectionPoolerConfig struct { // Instances is the number of PgBouncer pod replicas. // Higher values provide better availability and load distribution. diff --git a/config/samples/enterprise_v4_postgresclusterclass_dev.yaml b/config/samples/enterprise_v4_postgresclusterclass_dev.yaml index cf9c16487..a9846e36c 100644 --- a/config/samples/enterprise_v4_postgresclusterclass_dev.yaml +++ b/config/samples/enterprise_v4_postgresclusterclass_dev.yaml @@ -26,7 +26,14 @@ spec: limits: cpu: "1" memory: "2Gi" + connectionPoolerEnabled: true cnpg: # Restart method - tolerate downtime in dev primaryUpdateMethod: restart + connectionPooler: + instances: 2 + mode: transaction + config: + max_client_conn: "100" + diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index 138e55049..871d1c84a 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -18,7 +18,6 @@ package controller import ( "context" - "errors" "fmt" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" enterprisev4 "github.com/splunk/splunk-operator/api/v4" @@ -40,7 +39,6 @@ type PostgresClusterReconciler struct { Scheme *runtime.Scheme } - // 
+kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/finalizers,verbs=update @@ -51,11 +49,10 @@ type PostgresClusterReconciler struct { // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers/status,verbs=get // Main reconciliation loop for PostgresCluster. -func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { +func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logs.FromContext(ctx) logger.Info("Reconciling PostgresCluster", "name", req.Name, "namespace", req.Namespace) - // Initialize as nil so syncStatus knows if the object was actually found/created. var cnpgCluster *cnpgv1.Cluster var poolerEnabled bool @@ -64,35 +61,35 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ if getPGClusterErr := r.Get(ctx, req.NamespacedName, postgresCluster); getPGClusterErr != nil { if apierrors.IsNotFound(getPGClusterErr) { logger.Info("PostgresCluster deleted, skipping reconciliation") - return res, nil + return ctrl.Result{}, nil } logger.Error(getPGClusterErr, "Unable to fetch PostgresCluster") - return res, getPGClusterErr + return ctrl.Result{}, getPGClusterErr } - // This deferred function will run at the end of the reconciliation process, regardless of whether it exits early due to an error or completes successfully. - // It ensures that we always attempt to sync the status of the PostgresCluster based on the final state of the CNPG Cluster and any errors that may have occurred. 
- defer func() { - if syncErr := r.syncStatus(ctx, postgresCluster, cnpgCluster, poolerEnabled, err); syncErr != nil { - err = errors.Join(err, fmt.Errorf("failed to sync status in deferred function: %w", syncErr)) - } - }() + // helper function to update status with less boilerplate. + updateStatus := func(conditionType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcilePhases) error { + return (r.updateStatus(ctx, postgresCluster, conditionType, status, reason, message, phase)) + } // 2. Load the referenced PostgresClusterClass. postgresClusterClass := &enterprisev4.PostgresClusterClass{} if getClusterClassErr := r.Get(ctx, client.ObjectKey{Name: postgresCluster.Spec.Class}, postgresClusterClass); getClusterClassErr != nil { logger.Error(getClusterClassErr, "Unable to fetch referenced PostgresClusterClass", "className", postgresCluster.Spec.Class) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterClassNotFound, getClusterClassErr.Error()) - return res, getClusterClassErr + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterClassNotFound, fmt.Sprintf("ClusterClass %s not found: %v", postgresCluster.Spec.Class, getClusterClassErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, getClusterClassErr } // 3. Create the merged configuration by overlaying PostgresClusterSpec on top of PostgresClusterClass defaults. 
mergedConfig, mergeErr := r.getMergedConfig(postgresClusterClass, postgresCluster) if mergeErr != nil { logger.Error(mergeErr, "Failed to merge PostgresCluster configuration") - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonInvalidConfiguration, mergeErr.Error()) - err := errors.Join(err, fmt.Errorf("failed to merge configuration: %w", mergeErr)) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonInvalidConfiguration, fmt.Sprintf("Failed to merge configuration: %v", mergeErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, mergeErr } // 4. Build the desired CNPG Cluster spec based on the merged configuration. @@ -100,72 +97,160 @@ func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ // 5. Fetch existing CNPG Cluster or create it if it doesn't exist yet. existingCNPG := &cnpgv1.Cluster{} - err = r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG) + err := r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG) switch { case apierrors.IsNotFound(err): + // CNPG Cluster doesn't exist, create it and requeue for status update. 
logger.Info("CNPG Cluster not found, creating", "name", postgresCluster.Name) newCluster := r.buildCNPGCluster(postgresCluster, mergedConfig) if err = r.Create(ctx, newCluster); err != nil { logger.Error(err, "Failed to create CNPG Cluster") - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterBuildFailed, err.Error()) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildFailed, fmt.Sprintf("Failed to create CNPG Cluster: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, "CNPG Cluster created", pending); statusErr != nil { + logger.Error(statusErr, "Failed to update status") } - r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonClusterBuildSucceeded, fmt.Sprintf("CNPG cluster build Succeeded: %s", postgresCluster.Name)) logger.Info("CNPG Cluster created successfully, requeueing for status update", "name", postgresCluster.Name) return ctrl.Result{RequeueAfter: retryDelay}, nil case err != nil: logger.Error(err, "Failed to get CNPG Cluster") - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterGetFailed, err.Error()) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterGetFailed, fmt.Sprintf("Failed to get CNPG Cluster: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err } cnpgCluster = existingCNPG // 6. If CNPG Cluster exists, compare the current spec with the desired spec and update if necessary. 
currentNormalizedSpec := normalizeCNPGClusterSpec(cnpgCluster.Spec, mergedConfig.PostgreSQLConfig) desiredNormalizedSpec := normalizeCNPGClusterSpec(desiredSpec, mergedConfig.PostgreSQLConfig) + if !equality.Semantic.DeepEqual(currentNormalizedSpec, desiredNormalizedSpec) { + logger.Info("Detected drift in CNPG Cluster spec, patching", "name", cnpgCluster.Name) originalCluster := cnpgCluster.DeepCopy() cnpgCluster.Spec = desiredSpec - if patchCNPGClusterErr := r.Patch(ctx, cnpgCluster, client.MergeFrom(originalCluster)); patchCNPGClusterErr != nil { - if apierrors.IsConflict(patchCNPGClusterErr) { - logger.Info("Conflict occurred while updating CNPG Cluster, requeueing", "name", cnpgCluster.Name) - return ctrl.Result{Requeue: true}, nil + + switch patchErr := r.Patch(ctx, cnpgCluster, client.MergeFrom(originalCluster)); { + case apierrors.IsConflict(patchErr): + logger.Info("Conflict occurred while updating CNPG Cluster, requeueing", "name", cnpgCluster.Name) + return ctrl.Result{Requeue: true}, nil + + case patchErr != nil: + logger.Error(patchErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, fmt.Sprintf("Failed to patch CNPG Cluster: %v", patchErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") } - logger.Error(patchCNPGClusterErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, patchCNPGClusterErr.Error()) - return res, patchCNPGClusterErr + return ctrl.Result{}, patchErr + + default: + logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) + return ctrl.Result{RequeueAfter: retryDelay}, nil } - logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) - return ctrl.Result{RequeueAfter: retryDelay}, nil } // 7a. 
Reconcile ManagedRoles from PostgresCluster to CNPG Cluster if err := r.reconcileManagedRoles(ctx, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, err.Error()) - return res, err - } - // 7b. Reconcile Connection Pooler if enabled in class - poolerEnabled = r.isConnectionPoolerEnabled(postgresClusterClass, postgresCluster) - requeuePooler, poolerErr := r.reconcileConnectionPooler(ctx, postgresCluster, postgresClusterClass, cnpgCluster) - if poolerErr != nil { - logger.Error(poolerErr, "Failed to reconcile connection pooler") - r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, poolerErr.Error()) - return res, poolerErr - } - if requeuePooler { + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + + // 7b. 
Reconcile Connection Pooler + poolerEnabled = mergedConfig.ConnectionPoolerEnabled != nil && *mergedConfig.ConnectionPoolerEnabled + switch { + case !poolerEnabled: + // Pooler disabled — delete if they exist + if err := r.deleteConnectionPoolers(ctx, postgresCluster); err != nil { + logger.Error(err, "Failed to delete connection poolers") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to delete connection poolers: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + postgresCluster.Status.ConnectionPoolerStatus = nil + meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, string(poolerReady)) + + case !r.poolerExists(ctx, postgresCluster, readWriteEndpoint) || !r.poolerExists(ctx, postgresCluster, readOnlyEndpoint): + if mergedConfig.ConnectionPoolerConfig == nil { + logger.Info("Connection pooler enabled but no config found in class or cluster spec, skipping", + "class", postgresCluster.Spec.Class, + "cluster", postgresCluster.Name, + ) + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerConfigMissing, + fmt.Sprintf("Connection pooler is enabled but no config found in class %q or cluster %q", + postgresCluster.Spec.Class, postgresCluster.Name), + failed, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, nil + } + + if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + logger.Info("CNPG Cluster not healthy yet, skipping pooler creation", "phase", cnpgCluster.Status.Phase) + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonCNPGClusterNotHealthy, + "Waiting for CNPG cluster to become healthy before creating poolers", pending, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{RequeueAfter: retryDelay}, nil + } + + if err := 
r.createOrUpdateConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster); err != nil { + logger.Error(err, "Failed to reconcile connection pooler") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, + fmt.Sprintf("Failed to reconcile connection pooler: %v", err), failed, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + + logger.Info("Connection Poolers created, requeueing to check readiness") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, + "Connection poolers are being provisioned", provisioning, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } return ctrl.Result{RequeueAfter: retryDelay}, nil + + case !r.arePoolersReady(ctx, postgresCluster): + // Poolers exist but not ready yet + logger.Info("Connection Poolers are not ready yet, requeueing") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, "Connection poolers are being provisioned", pending); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{RequeueAfter: retryDelay}, nil + + default: + // Poolers exist and are ready — sync status + if err := r.syncPoolerStatus(ctx, postgresCluster); err != nil { + logger.Error(err, "Failed to sync pooler status") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to sync pooler status: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } } + // 8. Report progress back to the user and manage the reconciliation lifecycle. 
- logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonClusterUpdateSucceeded, fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) - return res, nil + if err := r.syncStatus(ctx, postgresCluster, cnpgCluster); err != nil { + logger.Error(err, "Failed to sync final status") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } // getMergedConfig merges the configuration from the PostgresClusterClass into the PostgresClusterSpec, giving precedence to the PostgresClusterSpec values. func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) (*enterprisev4.PostgresClusterSpec, error) { resultConfig := cluster.Spec.DeepCopy() classDefaults := clusterClass.Spec.Config + CNPGDefaults := clusterClass.Spec.CNPG if resultConfig.Instances == nil { resultConfig.Instances = classDefaults.Instances @@ -201,6 +286,19 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P if resultConfig.Resources == nil { resultConfig.Resources = &corev1.ResourceRequirements{} } + // Check if connection pooler is enabled and set the field accordingly, giving precedence to the cluster spec over class defaults. 
+ if cluster.Spec.ConnectionPoolerEnabled != nil { + resultConfig.ConnectionPoolerEnabled = cluster.Spec.ConnectionPoolerEnabled + } else if classDefaults.ConnectionPoolerEnabled != nil { + resultConfig.ConnectionPoolerEnabled = classDefaults.ConnectionPoolerEnabled + } + + // Merge ConnectionPooler config: cluster spec takes precedence over class + if cluster.Spec.ConnectionPoolerConfig != nil { + resultConfig.ConnectionPoolerConfig = cluster.Spec.ConnectionPoolerConfig + } else if CNPGDefaults != nil && CNPGDefaults.ConnectionPooler != nil { + resultConfig.ConnectionPoolerConfig = CNPGDefaults.ConnectionPooler + } return resultConfig, nil } @@ -234,8 +332,10 @@ func (r *PostgresClusterReconciler) buildCNPGClusterSpec(mergedConfig *enterpris } // build CNPGCluster builds the CNPG Cluster object based on the PostgresCluster resource and merged configuration. -func (r *PostgresClusterReconciler) buildCNPGCluster(postgresCluster *enterprisev4.PostgresCluster, mergedConfig *enterprisev4.PostgresClusterSpec) *cnpgv1.Cluster { - +func (r *PostgresClusterReconciler) buildCNPGCluster( + postgresCluster *enterprisev4.PostgresCluster, + mergedConfig *enterprisev4.PostgresClusterSpec, +) *cnpgv1.Cluster { cnpgCluster := &cnpgv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ Name: postgresCluster.Name, @@ -243,7 +343,6 @@ func (r *PostgresClusterReconciler) buildCNPGCluster(postgresCluster *enterprise }, Spec: r.buildCNPGClusterSpec(mergedConfig), } - ctrl.SetControllerReference(postgresCluster, cnpgCluster, r.Scheme) return cnpgCluster } @@ -253,66 +352,42 @@ func poolerResourceName(clusterName, poolerType string) string { return fmt.Sprintf("%s-pooler-%s", clusterName, poolerType) } -// isConnectionPoolerEnabled determines if connection pooler should be active. 
-func (r *PostgresClusterReconciler) isConnectionPoolerEnabled(class *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) bool { - if cluster.Spec.ConnectionPoolerEnabled != nil { - return *cluster.Spec.ConnectionPoolerEnabled - } - - return class.Spec.Config.ConnectionPoolerEnabled != nil && - *class.Spec.Config.ConnectionPoolerEnabled -} - -// reconcileConnectionPooler creates or deletes CNPG Pooler resources based on the effective enabled state. -// Returns (requeue, error) — requeue is true when poolers were just created and may not be ready yet. -func (r *PostgresClusterReconciler) reconcileConnectionPooler( +// createOrUpdateConnectionPooler creates or updates CNPG Pooler resources. +func (r *PostgresClusterReconciler) createOrUpdateConnectionPooler( ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, - class *enterprisev4.PostgresClusterClass, + mergedConfig *enterprisev4.PostgresClusterSpec, cnpgCluster *cnpgv1.Cluster, -) (bool, error) { - logger := logs.FromContext(ctx) - - if !r.isConnectionPoolerEnabled(class, postgresCluster) { - // Skip deletion if the cluster is not healthy — owner references handle cleanup via GC. - if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { - return false, nil - } - if err := r.deleteConnectionPoolers(ctx, postgresCluster); err != nil { - return false, err - } - // ConnectionPoolerStatus and PoolerReady condition are cleared by syncStatus in the defer. 
- return false, nil +) error { + // Create/Update RW Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster, readWriteEndpoint); err != nil { + return fmt.Errorf("failed to reconcile RW pooler: %w", err) } - if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { - logger.Info("CNPG Cluster not healthy, waiting before creating poolers") - r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonCNPGClusterNotHealthy, "Waiting for CNPG cluster to become healthy before creating poolers") - return false, nil + // Create/Update RO Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster, readOnlyEndpoint); err != nil { + return fmt.Errorf("failed to reconcile RO pooler: %w", err) } - if class.Spec.CNPG == nil || class.Spec.CNPG.ConnectionPooler == nil { - logger.Info("Connection pooler enabled but config missing in class", "class", class.Name) - r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerConfigMissing, fmt.Sprintf("Connection pooler is enabled but cnpg.connectionPooler config is missing in class %s", class.Name)) - return false, nil - } + return nil +} - // Create/Update RW Pooler - if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, readWriteEndpoint); err != nil { - return false, fmt.Errorf("failed to reconcile RW pooler: %w", err) - } +// poolerExists checks if a pooler resource exists and returns it +func (r *PostgresClusterReconciler) poolerExists(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, poolerType string) bool { + pooler := &cnpgv1.Pooler{} + err := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, poolerType), + Namespace: postgresCluster.Namespace, + }, pooler) - // Create/Update RO Pooler - if err := r.ensureConnectionPooler(ctx, postgresCluster, class, cnpgCluster, readOnlyEndpoint); err != nil { - return false, fmt.Errorf("failed to reconcile RO pooler: 
%w", err) + if apierrors.IsNotFound(err) { + return false } - - // Check if poolers are ready — requeue if they're still provisioning. - rwPooler := &cnpgv1.Pooler{} - rwErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), Namespace: postgresCluster.Namespace}, rwPooler) - roPooler := &cnpgv1.Pooler{} - roErr := r.Get(ctx, types.NamespacedName{Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), Namespace: postgresCluster.Namespace}, roPooler) - return !(r.isPoolerReady(rwPooler, rwErr) && r.isPoolerReady(roPooler, roErr)), nil + if err != nil { + logs.FromContext(ctx).Error(err, "Failed to check pooler existence", "type", poolerType) + return false + } + return true } // deleteConnectionPoolers removes RW and RO pooler resources if they exist. @@ -321,17 +396,19 @@ func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, for _, poolerType := range []string{readWriteEndpoint, readOnlyEndpoint} { poolerName := poolerResourceName(postgresCluster.Name, poolerType) - pooler := &cnpgv1.Pooler{} + exists := r.poolerExists(ctx, postgresCluster, poolerType) + if !exists { + continue + } - err := r.Get(ctx, types.NamespacedName{ + pooler := &cnpgv1.Pooler{} + if err := r.Get(ctx, types.NamespacedName{ Name: poolerName, Namespace: postgresCluster.Namespace, - }, pooler) - - if apierrors.IsNotFound(err) { - continue - } - if err != nil { + }, pooler); err != nil { + if apierrors.IsNotFound(err) { + continue + } return fmt.Errorf("failed to get pooler %s: %w", poolerName, err) } @@ -340,7 +417,6 @@ func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, return fmt.Errorf("failed to delete pooler %s: %w", poolerName, err) } } - return nil } @@ -348,7 +424,7 @@ func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, func (r *PostgresClusterReconciler) ensureConnectionPooler( ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, 
- class *enterprisev4.PostgresClusterClass, + mergedConfig *enterprisev4.PostgresClusterSpec, cnpgCluster *cnpgv1.Cluster, poolerType string, ) error { @@ -362,8 +438,8 @@ func (r *PostgresClusterReconciler) ensureConnectionPooler( if apierrors.IsNotFound(err) { logs.FromContext(ctx).Info("Creating CNPG Pooler", "name", poolerName, "type", poolerType) - r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerCreating, fmt.Sprintf("Creating %s pooler", poolerType)) - pooler := r.buildCNPGPooler(postgresCluster, class, cnpgCluster, poolerType) + r.updateStatus(ctx, postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerCreating, fmt.Sprintf("Creating %s pooler", poolerType), pending) + pooler := r.buildCNPGPooler(postgresCluster, mergedConfig, cnpgCluster, poolerType) return r.Create(ctx, pooler) } @@ -373,11 +449,11 @@ func (r *PostgresClusterReconciler) ensureConnectionPooler( // buildCNPGPooler constructs a CNPG Pooler object. func (r *PostgresClusterReconciler) buildCNPGPooler( postgresCluster *enterprisev4.PostgresCluster, - class *enterprisev4.PostgresClusterClass, + mergedConfig *enterprisev4.PostgresClusterSpec, cnpgCluster *cnpgv1.Cluster, poolerType string, ) *cnpgv1.Pooler { - cfg := class.Spec.CNPG.ConnectionPooler + cfg := mergedConfig.ConnectionPoolerConfig poolerName := poolerResourceName(postgresCluster.Name, poolerType) instances := *cfg.Instances @@ -405,116 +481,138 @@ func (r *PostgresClusterReconciler) buildCNPGPooler( return pooler } -// syncStatus maps CNPG Cluster state to PostgresCluster object. 
-func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, poolerEnabled bool, err error) error { - latestPGCluster := client.MergeFrom(postgresCluster.DeepCopy()) - - if err != nil { - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonFailed, fmt.Sprintf("Error during reconciliation: %v", err)) - } else if cnpgCluster == nil { - postgresCluster.Status.Phase = string(pending) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonNotFound, "Underlying CNPG cluster object has not been created yet") - } else { - switch cnpgCluster.Status.Phase { - case cnpgv1.PhaseHealthy: - postgresCluster.Status.Phase = string(ready) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionTrue, reasonCNPGClusterHealthy, "Cluster is up and running") - - case cnpgv1.PhaseFirstPrimary, - cnpgv1.PhaseCreatingReplica, - cnpgv1.PhaseWaitingForInstancesToBeActive: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, fmt.Sprintf("CNPG cluster provisioning: %s", cnpgCluster.Status.Phase)) - - case cnpgv1.PhaseSwitchover: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGSwitchover, "Cluster changing primary node") - - case cnpgv1.PhaseFailOver: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGFailingOver, "Pod missing, need to change primary") - - case cnpgv1.PhaseInplacePrimaryRestart, - cnpgv1.PhaseInplaceDeletePrimaryRestart: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGRestarting, fmt.Sprintf("CNPG cluster restarting: %s", cnpgCluster.Status.Phase)) - - 
case cnpgv1.PhaseUpgrade, - cnpgv1.PhaseMajorUpgrade, - cnpgv1.PhaseUpgradeDelayed, - cnpgv1.PhaseOnlineUpgrading: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGUpgrading, fmt.Sprintf("CNPG cluster upgrading: %s", cnpgCluster.Status.Phase)) - - case cnpgv1.PhaseApplyingConfiguration: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGApplyingConfig, "Configuration change is being applied") - - case cnpgv1.PhaseReplicaClusterPromotion: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGPromoting, "Replica is being promoted to primary") - - case cnpgv1.PhaseWaitingForUser: - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGWaitingForUser, "Action from the user is required") - - case cnpgv1.PhaseUnrecoverable: - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGUnrecoverable, "Cluster failed, needs manual intervention") - - case cnpgv1.PhaseCannotCreateClusterObjects: - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGProvisioningFailed, "Cluster resources cannot be created") - - case cnpgv1.PhaseUnknownPlugin, - cnpgv1.PhaseFailurePlugin: - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGPluginError, fmt.Sprintf("CNPG plugin error: %s", cnpgCluster.Status.Phase)) - - case cnpgv1.PhaseImageCatalogError, - cnpgv1.PhaseArchitectureBinaryMissing: - postgresCluster.Status.Phase = string(failed) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonCNPGImageError, fmt.Sprintf("CNPG image 
error: %s", cnpgCluster.Status.Phase)) - - case "": - postgresCluster.Status.Phase = string(pending) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, "CNPG cluster is pending creation") - - default: - postgresCluster.Status.Phase = string(provisioning) - r.setCondition(postgresCluster, clusterReady, metav1.ConditionFalse, reasonProvisioning, fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase)) - } - - postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ - APIVersion: "postgresql.cnpg.io/v1", - Kind: "Cluster", - Namespace: cnpgCluster.Namespace, - Name: cnpgCluster.Name, - UID: cnpgCluster.UID, - } - - if poolerEnabled { - r.syncPoolerStatus(ctx, postgresCluster) - } else { - postgresCluster.Status.ConnectionPoolerStatus = nil - meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, string(poolerReady)) - } - } - - if patchErr := r.Status().Patch(ctx, postgresCluster, latestPGCluster); patchErr != nil { - return fmt.Errorf("failed to patch PostgresCluster status: %w", patchErr) - } - return nil +// syncStatus maps CNPG Cluster state to PostgresCluster object and handles pooler status. +func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error { + + // 1. 
Set ProvisionerRef + postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ + APIVersion: "postgresql.cnpg.io/v1", + Kind: "Cluster", + Namespace: cnpgCluster.Namespace, + Name: cnpgCluster.Name, + UID: cnpgCluster.UID, + } + + // Map CNPG Phase to PostgresCluster Phase/Conditions + var phase reconcilePhases + var conditionStatus metav1.ConditionStatus + var reason conditionReasons + var message string + + switch cnpgCluster.Status.Phase { + case cnpgv1.PhaseHealthy: + phase = ready + conditionStatus = metav1.ConditionTrue + reason = reasonCNPGClusterHealthy + message = "Cluster is up and running" + + case cnpgv1.PhaseFirstPrimary, + cnpgv1.PhaseCreatingReplica, + cnpgv1.PhaseWaitingForInstancesToBeActive: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = fmt.Sprintf("CNPG cluster provisioning: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseSwitchover: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGSwitchover + message = "Cluster changing primary node" + + case cnpgv1.PhaseFailOver: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGFailingOver + message = "Pod missing, need to change primary" + + case cnpgv1.PhaseInplacePrimaryRestart, + cnpgv1.PhaseInplaceDeletePrimaryRestart: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGRestarting + message = fmt.Sprintf("CNPG cluster restarting: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseUpgrade, + cnpgv1.PhaseMajorUpgrade, + cnpgv1.PhaseUpgradeDelayed, + cnpgv1.PhaseOnlineUpgrading: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGUpgrading + message = fmt.Sprintf("CNPG cluster upgrading: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseApplyingConfiguration: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGApplyingConfig + message = "Configuration change is 
being applied" + + case cnpgv1.PhaseReplicaClusterPromotion: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGPromoting + message = "Replica is being promoted to primary" + + case cnpgv1.PhaseWaitingForUser: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGWaitingForUser + message = "Action from the user is required" + + case cnpgv1.PhaseUnrecoverable: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGUnrecoverable + message = "Cluster failed, needs manual intervention" + + case cnpgv1.PhaseCannotCreateClusterObjects: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGProvisioningFailed + message = "Cluster resources cannot be created" + + case cnpgv1.PhaseUnknownPlugin, + cnpgv1.PhaseFailurePlugin: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGPluginError + message = fmt.Sprintf("CNPG plugin error: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseImageCatalogError, + cnpgv1.PhaseArchitectureBinaryMissing: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGImageError + message = fmt.Sprintf("CNPG image error: %s", cnpgCluster.Status.Phase) + + case "": + phase = pending + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = "CNPG cluster is pending creation" + + default: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase) + } + + return r.updateStatus(ctx, postgresCluster, clusterReady, conditionStatus, reason, message, phase) } -// setCondition sets a condition on the PostgresCluster status. -func (r *PostgresClusterReconciler) setCondition( +// updateStatus sets the phase, condition and persists the status to Kubernetes. 
+func (r *PostgresClusterReconciler) updateStatus( + ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, conditionType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, - message string) { + message string, + phase reconcilePhases, +) error { + postgresCluster.Status.Phase = string(phase) meta.SetStatusCondition(&postgresCluster.Status.Conditions, metav1.Condition{ Type: string(conditionType), Status: status, @@ -522,42 +620,48 @@ func (r *PostgresClusterReconciler) setCondition( Message: message, ObservedGeneration: postgresCluster.Generation, }) + + if err := r.Status().Update(ctx, postgresCluster); err != nil { + return fmt.Errorf("failed to update PostgresCluster status: %w", err) + } + return nil } // syncPoolerStatus populates ConnectionPoolerStatus and the PoolerReady condition. -// It returns true when all poolers are ready, false otherwise. -// The caller decides how pooler readiness affects the overall phase. -func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) bool { +// Called only when poolers are confirmed ready by the reconciler. 
+func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) error { rwPooler := &cnpgv1.Pooler{} - rwErr := r.Get(ctx, types.NamespacedName{ + if rwErr := r.Get(ctx, types.NamespacedName{ Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), Namespace: postgresCluster.Namespace, - }, rwPooler) + }, rwPooler); rwErr != nil { + return rwErr + } roPooler := &cnpgv1.Pooler{} - roErr := r.Get(ctx, types.NamespacedName{ + if roErr := r.Get(ctx, types.NamespacedName{ Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), Namespace: postgresCluster.Namespace, - }, roPooler) + }, roPooler); roErr != nil { + return roErr + } postgresCluster.Status.ConnectionPoolerStatus = &enterprisev4.ConnectionPoolerStatus{ Enabled: true, } - rwReady := r.isPoolerReady(rwPooler, rwErr) - roReady := r.isPoolerReady(roPooler, roErr) - - if rwReady && roReady { - rwDesired, rwScheduled := r.getPoolerInstanceCount(rwPooler) - roDesired, roScheduled := r.getPoolerInstanceCount(roPooler) - r.setCondition(postgresCluster, poolerReady, metav1.ConditionTrue, reasonAllInstancesReady, fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired)) - return true - } + rwDesired, rwScheduled := r.getPoolerInstanceCount(rwPooler) + roDesired, roScheduled := r.getPoolerInstanceCount(roPooler) - rwStatus := r.getPoolerStatusString(rwPooler, rwErr) - roStatus := r.getPoolerStatusString(roPooler, roErr) - r.setCondition(postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolersNotReady, fmt.Sprintf("%s: %s, %s: %s", readWriteEndpoint, rwStatus, readOnlyEndpoint, roStatus)) - return false + r.updateStatus( + ctx, + postgresCluster, + poolerReady, + metav1.ConditionTrue, + reasonAllInstancesReady, + fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired), + ready) + return nil } // isPoolerReady 
checks if a pooler has all instances scheduled. @@ -582,16 +686,21 @@ func (r *PostgresClusterReconciler) getPoolerInstanceCount(pooler *cnpgv1.Pooler return desired, pooler.Status.Instances } -// getPoolerStatusString returns a human-readable status string for a pooler. -func (r *PostgresClusterReconciler) getPoolerStatusString(pooler *cnpgv1.Pooler, err error) string { - if apierrors.IsNotFound(err) { - return "not found" - } - if err != nil { - return "error" - } - desired, scheduled := r.getPoolerInstanceCount(pooler) - return fmt.Sprintf("%d/%d", scheduled, desired) +// arePoolersReady checks if both RW and RO poolers have all instances scheduled. +func (r *PostgresClusterReconciler) arePoolersReady(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) bool { + rwPooler := &cnpgv1.Pooler{} + rwErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), + Namespace: postgresCluster.Namespace, + }, rwPooler) + + roPooler := &cnpgv1.Pooler{} + roErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), + Namespace: postgresCluster.Namespace, + }, roPooler) + + return r.isPoolerReady(rwPooler, rwErr) && r.isPoolerReady(roPooler, roErr) } // reconcileManagedRoles synchronizes ManagedRoles from PostgresCluster spec to CNPG Cluster managed.roles using diff-based patching diff --git a/internal/controller/postgresoperator_common_types.go b/internal/controller/postgresoperator_common_types.go index e922515b4..b7ebd7b22 100644 --- a/internal/controller/postgresoperator_common_types.go +++ b/internal/controller/postgresoperator_common_types.go @@ -42,12 +42,13 @@ const ( failed reconcilePhases = "Failed" // Condition types - clusterReady conditionTypes = "ClusterReady" - poolerReady conditionTypes = "PoolerReady" - secretsReady conditionTypes = "SecretsReady" - usersReady conditionTypes = "UsersReady" - databasesReady conditionTypes = "DatabasesReady" - 
privilegesReady conditionTypes = "PrivilegesReady" + clusterReady conditionTypes = "ClusterReady" + poolerReady conditionTypes = "PoolerReady" + usersReady conditionTypes = "UsersReady" + databasesReady conditionTypes = "DatabasesReady" + // TODO - to use in the future implementation + // secretsReady conditionTypes = "SecretsReady" + // privilegesReady conditionTypes = "PrivilegesReady" // Condition reasons reasonNotFound conditionReasons = "NotFound" @@ -58,21 +59,19 @@ const ( reasonFailed conditionReasons = "Failed" // Additional condition reasons for clusterReady conditionType - reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" - reasonManagedRolesFailed conditionReasons = "ManagedRolesReconciliationFailed" - reasonClusterBuildFailed conditionReasons = "ClusterBuildFailed" - reasonClusterBuildSucceeded conditionReasons = "ClusterBuildSucceeded" - reasonClusterGetFailed conditionReasons = "ClusterGetFailed" - reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" - reasonClusterUpdateSucceeded conditionReasons = "ClusterUpdateSucceeded" - reasonInvalidConfiguration conditionReasons = "InvalidConfiguration" + reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" + reasonManagedRolesFailed conditionReasons = "ManagedRolesReconciliationFailed" + reasonClusterBuildFailed conditionReasons = "ClusterBuildFailed" + reasonClusterBuildSucceeded conditionReasons = "ClusterBuildSucceeded" + reasonClusterGetFailed conditionReasons = "ClusterGetFailed" + reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" + reasonInvalidConfiguration conditionReasons = "InvalidConfiguration" // Additional condition reasons for poolerReady conditionType reasonPoolerReconciliationFailed conditionReasons = "PoolerReconciliationFailed" reasonPoolerConfigMissing conditionReasons = "PoolerConfigMissing" reasonPoolerCreating conditionReasons = "PoolerCreating" reasonAllInstancesReady conditionReasons = "AllInstancesReady" - 
reasonPoolersNotReady conditionReasons = "PoolersNotReady" // Additional condition reasons for mapping CNPG cluster statuses reasonCNPGClusterNotHealthy conditionReasons = "CNPGClusterNotHealthy"