diff --git a/api/v4/postgrescluster_types.go b/api/v4/postgrescluster_types.go index 3b1e81b32..f6ae81ea7 100644 --- a/api/v4/postgrescluster_types.go +++ b/api/v4/postgrescluster_types.go @@ -45,6 +45,7 @@ type ManagedRole struct { // Validation rules ensure immutability of Class, and that Storage and PostgresVersion can only be set once and cannot be removed or downgraded. // +kubebuilder:validation:XValidation:rule="!has(oldSelf.postgresVersion) || (has(self.postgresVersion) && int(self.postgresVersion.split('.')[0]) >= int(oldSelf.postgresVersion.split('.')[0]))",messageExpression="!has(self.postgresVersion) ? 'postgresVersion cannot be removed once set (was: ' + oldSelf.postgresVersion + ')' : 'postgresVersion major version cannot be downgraded (from: ' + oldSelf.postgresVersion + ', to: ' + self.postgresVersion + ')'" // +kubebuilder:validation:XValidation:rule="!has(oldSelf.storage) || (has(self.storage) && quantity(self.storage).compareTo(quantity(oldSelf.storage)) >= 0)",messageExpression="!has(self.storage) ? 'storage cannot be removed once set (was: ' + string(oldSelf.storage) + ')' : 'storage size cannot be decreased (from: ' + string(oldSelf.storage) + ', to: ' + string(self.storage) + ')'" +// +kubebuilder:validation:XValidation:rule="!has(self.connectionPoolerEnabled) || !self.connectionPoolerEnabled || has(self.connectionPoolerConfig) || (has(self.cnpg) && has(self.cnpg.connectionPooler))",message="connectionPoolerConfig must be set in cluster spec or class when connectionPoolerEnabled is true" type PostgresClusterSpec struct { // This field is IMMUTABLE after creation. // +kubebuilder:validation:Required @@ -96,6 +97,11 @@ type PostgresClusterSpec struct { // +optional ConnectionPoolerEnabled *bool `json:"connectionPoolerEnabled,omitempty"` + // ConnectionPoolerConfig overrides the connection pooler configuration from the class. + // Only takes effect when connection pooling is enabled. 
+ // +optional + ConnectionPoolerConfig *ConnectionPoolerConfig `json:"connectionPoolerConfig,omitempty"` + + // ManagedRoles contains PostgreSQL roles that should be created in the cluster. // This field supports Server-Side Apply with per-role granularity, allowing // multiple PostgresDatabase controllers to manage different roles independently. diff --git a/api/v4/postgresclusterclass_types.go b/api/v4/postgresclusterclass_types.go index 430e3409e..9c4fcef44 100644 --- a/api/v4/postgresclusterclass_types.go +++ b/api/v4/postgresclusterclass_types.go @@ -114,6 +114,7 @@ const ( // ConnectionPoolerConfig defines PgBouncer connection pooler configuration. // When enabled, creates RW and RO pooler deployments for clusters using this class. +// NOTE(review): an XValidation rule referencing connectionPoolerEnabled/connectionPoolerConfig/cnpg must NOT be placed on this type — ConnectionPoolerConfig carries only pooler settings (instances/mode/config), so such a rule fails CRD validation here; put the "enabled implies config" rule on the spec type that actually declares those fields. type ConnectionPoolerConfig struct { // Instances is the number of PgBouncer pod replicas. // Higher values provide better availability and load distribution. 
diff --git a/config/samples/enterprise_v4_postgresclusterclass_dev.yaml b/config/samples/enterprise_v4_postgresclusterclass_dev.yaml index cf9c16487..a9846e36c 100644 --- a/config/samples/enterprise_v4_postgresclusterclass_dev.yaml +++ b/config/samples/enterprise_v4_postgresclusterclass_dev.yaml @@ -26,7 +26,14 @@ spec: limits: cpu: "1" memory: "2Gi" + connectionPoolerEnabled: true cnpg: # Restart method - tolerate downtime in dev primaryUpdateMethod: restart + connectionPooler: + instances: 2 + mode: transaction + config: + max_client_conn: "100" + diff --git a/internal/controller/postgrescluster_controller.go b/internal/controller/postgrescluster_controller.go index aa4d47294..871d1c84a 100644 --- a/internal/controller/postgrescluster_controller.go +++ b/internal/controller/postgrescluster_controller.go @@ -18,7 +18,6 @@ package controller import ( "context" - "errors" "fmt" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" enterprisev4 "github.com/splunk/splunk-operator/api/v4" @@ -40,126 +39,218 @@ type PostgresClusterReconciler struct { Scheme *runtime.Scheme } -// This struct is used to compare the merged configuration from PostgresClusterClass and PostgresClusterSpec -// in a normalized way, and not to use CNPG-default values which are causing false positive diff state while reconciliation loop. -// It contains only the fields that are relevant for our reconciliation and that we want to compare when deciding whether to update the CNPG Cluster spec or not. 
-type normalizedCNPGClusterSpec struct { - ImageName string - Instances int - // Parameters we set, instead of complete spec from CNPG - CustomDefinedParameters map[string]string - PgHBA []string - DefaultDatabase string - Owner string - StorageSize string - Resources corev1.ResourceRequirements -} - // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusters/finalizers,verbs=update // +kubebuilder:rbac:groups=enterprise.splunk.com,resources=postgresclusterclasses,verbs=get;list;watch // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=clusters/status,verbs=get +// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=postgresql.cnpg.io,resources=poolers/status,verbs=get // Main reconciliation loop for PostgresCluster. -func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { +func (r *PostgresClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logs.FromContext(ctx) logger.Info("Reconciling PostgresCluster", "name", req.Name, "namespace", req.Namespace) - // Initialize as nil so syncStatus knows if the object was actually found/created. var cnpgCluster *cnpgv1.Cluster + var poolerEnabled bool // 1. Fetch the PostgresCluster instance, stop, if not found. 
postgresCluster := &enterprisev4.PostgresCluster{} if getPGClusterErr := r.Get(ctx, req.NamespacedName, postgresCluster); getPGClusterErr != nil { if apierrors.IsNotFound(getPGClusterErr) { logger.Info("PostgresCluster deleted, skipping reconciliation") - return res, nil + return ctrl.Result{}, nil } logger.Error(getPGClusterErr, "Unable to fetch PostgresCluster") - return res, getPGClusterErr + return ctrl.Result{}, getPGClusterErr } - // This deferred function will run at the end of the reconciliation process, regardless of whether it exits early due to an error or completes successfully. - // It ensures that we always attempt to sync the status of the PostgresCluster based on the final state of the CNPG Cluster and any errors that may have occurred. - defer func() { - if syncErr := r.syncStatus(ctx, postgresCluster, cnpgCluster, err); syncErr != nil { - err = errors.Join(err, fmt.Errorf("failed to sync status in deferred function: %w", syncErr)) - } - }() + // helper function to update status with less boilerplate. + updateStatus := func(conditionType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcilePhases) error { + return (r.updateStatus(ctx, postgresCluster, conditionType, status, reason, message, phase)) + } // 2. Load the referenced PostgresClusterClass. 
postgresClusterClass := &enterprisev4.PostgresClusterClass{} if getClusterClassErr := r.Get(ctx, client.ObjectKey{Name: postgresCluster.Spec.Class}, postgresClusterClass); getClusterClassErr != nil { logger.Error(getClusterClassErr, "Unable to fetch referenced PostgresClusterClass", "className", postgresCluster.Spec.Class) - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterClassNotFound", getClusterClassErr.Error()) - return res, getClusterClassErr + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterClassNotFound, fmt.Sprintf("ClusterClass %s not found: %v", postgresCluster.Spec.Class, getClusterClassErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, getClusterClassErr } // 3. Create the merged configuration by overlaying PostgresClusterSpec on top of PostgresClusterClass defaults. - mergedConfig := r.getMergedConfig(postgresClusterClass, postgresCluster) + mergedConfig, mergeErr := r.getMergedConfig(postgresClusterClass, postgresCluster) + if mergeErr != nil { + logger.Error(mergeErr, "Failed to merge PostgresCluster configuration") + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonInvalidConfiguration, fmt.Sprintf("Failed to merge configuration: %v", mergeErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, mergeErr + } // 4. Build the desired CNPG Cluster spec based on the merged configuration. desiredSpec := r.buildCNPGClusterSpec(mergedConfig) // 5. Fetch existing CNPG Cluster or create it if it doesn't exist yet. 
existingCNPG := &cnpgv1.Cluster{} - err = r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG) + err := r.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG) switch { case apierrors.IsNotFound(err): + // CNPG Cluster doesn't exist, create it and requeue for status update. logger.Info("CNPG Cluster not found, creating", "name", postgresCluster.Name) newCluster := r.buildCNPGCluster(postgresCluster, mergedConfig) if err = r.Create(ctx, newCluster); err != nil { logger.Error(err, "Failed to create CNPG Cluster") - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterBuildFailed", err.Error()) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildFailed, fmt.Sprintf("Failed to create CNPG Cluster: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, "CNPG Cluster created", pending); statusErr != nil { + logger.Error(statusErr, "Failed to update status") } logger.Info("CNPG Cluster created successfully, requeueing for status update", "name", postgresCluster.Name) return ctrl.Result{RequeueAfter: retryDelay}, nil case err != nil: logger.Error(err, "Failed to get CNPG Cluster") - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterGetFailed", err.Error()) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterGetFailed, fmt.Sprintf("Failed to get CNPG Cluster: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err } cnpgCluster = existingCNPG // 6. If CNPG Cluster exists, compare the current spec with the desired spec and update if necessary. 
currentNormalizedSpec := normalizeCNPGClusterSpec(cnpgCluster.Spec, mergedConfig.PostgreSQLConfig) desiredNormalizedSpec := normalizeCNPGClusterSpec(desiredSpec, mergedConfig.PostgreSQLConfig) + if !equality.Semantic.DeepEqual(currentNormalizedSpec, desiredNormalizedSpec) { + logger.Info("Detected drift in CNPG Cluster spec, patching", "name", cnpgCluster.Name) originalCluster := cnpgCluster.DeepCopy() cnpgCluster.Spec = desiredSpec - if patchCNPGClusterErr := r.Patch(ctx, cnpgCluster, client.MergeFrom(originalCluster)); patchCNPGClusterErr != nil { - if apierrors.IsConflict(patchCNPGClusterErr) { - logger.Info("Conflict occurred while updating CNPG Cluster, requeueing", "name", cnpgCluster.Name) - return ctrl.Result{Requeue: true}, nil + + switch patchErr := r.Patch(ctx, cnpgCluster, client.MergeFrom(originalCluster)); { + case apierrors.IsConflict(patchErr): + logger.Info("Conflict occurred while updating CNPG Cluster, requeueing", "name", cnpgCluster.Name) + return ctrl.Result{Requeue: true}, nil + + case patchErr != nil: + logger.Error(patchErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, fmt.Sprintf("Failed to patch CNPG Cluster: %v", patchErr), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") } - logger.Error(patchCNPGClusterErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterUpdateFailed", patchCNPGClusterErr.Error()) - return res, patchCNPGClusterErr + return ctrl.Result{}, patchErr + + default: + logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) + return ctrl.Result{RequeueAfter: retryDelay}, nil } - logger.Info("CNPG Cluster patched successfully, requeueing for status update", "name", cnpgCluster.Name) - return ctrl.Result{RequeueAfter: retryDelay}, nil } - // 7. 
Reconcile ManagedRoles from PostgresCluster to CNPG Cluster + // 7a. Reconcile ManagedRoles from PostgresCluster to CNPG Cluster if err := r.reconcileManagedRoles(ctx, postgresCluster, cnpgCluster); err != nil { logger.Error(err, "Failed to reconcile managed roles") - r.setCondition(postgresCluster, metav1.ConditionFalse, "ManagedRolesFailed", err.Error()) - return res, err + if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + + // 7b. Reconcile Connection Pooler + poolerEnabled = mergedConfig.ConnectionPoolerEnabled != nil && *mergedConfig.ConnectionPoolerEnabled + switch { + case !poolerEnabled: + // Pooler disabled — delete if they exist + if err := r.deleteConnectionPoolers(ctx, postgresCluster); err != nil { + logger.Error(err, "Failed to delete connection poolers") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to delete connection poolers: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + postgresCluster.Status.ConnectionPoolerStatus = nil + meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, string(poolerReady)) + + case !r.poolerExists(ctx, postgresCluster, readWriteEndpoint) || !r.poolerExists(ctx, postgresCluster, readOnlyEndpoint): + if mergedConfig.ConnectionPoolerConfig == nil { + logger.Info("Connection pooler enabled but no config found in class or cluster spec, skipping", + "class", postgresCluster.Spec.Class, + "cluster", postgresCluster.Name, + ) + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerConfigMissing, + fmt.Sprintf("Connection pooler is enabled but no config found in class %q or cluster %q", + postgresCluster.Spec.Class, 
postgresCluster.Name), + failed, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, nil + } + + if cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + logger.Info("CNPG Cluster not healthy yet, skipping pooler creation", "phase", cnpgCluster.Status.Phase) + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonCNPGClusterNotHealthy, + "Waiting for CNPG cluster to become healthy before creating poolers", pending, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{RequeueAfter: retryDelay}, nil + } + + if err := r.createOrUpdateConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster); err != nil { + logger.Error(err, "Failed to reconcile connection pooler") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, + fmt.Sprintf("Failed to reconcile connection pooler: %v", err), failed, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } + + logger.Info("Connection Poolers created, requeueing to check readiness") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, + "Connection poolers are being provisioned", provisioning, + ); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{RequeueAfter: retryDelay}, nil + + case !r.arePoolersReady(ctx, postgresCluster): + // Poolers exist but not ready yet + logger.Info("Connection Poolers are not ready yet, requeueing") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, "Connection poolers are being provisioned", pending); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{RequeueAfter: retryDelay}, nil + + default: + // Poolers exist and are ready — sync status + if err := r.syncPoolerStatus(ctx, postgresCluster); err != nil { + 
logger.Error(err, "Failed to sync pooler status") + if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, fmt.Sprintf("Failed to sync pooler status: %v", err), failed); statusErr != nil { + logger.Error(statusErr, "Failed to update status") + } + return ctrl.Result{}, err + } } // 8. Report progress back to the user and manage the reconciliation lifecycle. - logger.Info("Reconciliation completed successfully", "name", postgresCluster.Name) - r.setCondition(postgresCluster, metav1.ConditionTrue, "ClusterUpdateSucceeded", fmt.Sprintf("Reconciliation completed successfully: %s", postgresCluster.Name)) - return res, nil + if err := r.syncStatus(ctx, postgresCluster, cnpgCluster); err != nil { + logger.Error(err, "Failed to sync final status") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } // getMergedConfig merges the configuration from the PostgresClusterClass into the PostgresClusterSpec, giving precedence to the PostgresClusterSpec values. 
-func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) *enterprisev4.PostgresClusterSpec { +func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.PostgresClusterClass, cluster *enterprisev4.PostgresCluster) (*enterprisev4.PostgresClusterSpec, error) { resultConfig := cluster.Spec.DeepCopy() classDefaults := clusterClass.Spec.Config + CNPGDefaults := clusterClass.Spec.CNPG if resultConfig.Instances == nil { resultConfig.Instances = classDefaults.Instances @@ -180,6 +271,10 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P resultConfig.PgHBA = classDefaults.PgHBA } + if resultConfig.Instances == nil || resultConfig.PostgresVersion == nil || resultConfig.Storage == nil { + return nil, fmt.Errorf("invalid configuration for class %s: instances, postgresVersion and storage are required", clusterClass.Name) + } + // Ensure that maps and slices are initialized to empty if they are still nil after merging, to prevent potential nil pointer dereferences later on. if resultConfig.PostgreSQLConfig == nil { resultConfig.PostgreSQLConfig = make(map[string]string) @@ -191,8 +286,21 @@ func (r *PostgresClusterReconciler) getMergedConfig(clusterClass *enterprisev4.P if resultConfig.Resources == nil { resultConfig.Resources = &corev1.ResourceRequirements{} } + // Check if connection pooler is enabled and set the field accordingly, giving precedence to the cluster spec over class defaults. 
+ if cluster.Spec.ConnectionPoolerEnabled != nil { + resultConfig.ConnectionPoolerEnabled = cluster.Spec.ConnectionPoolerEnabled + } else if classDefaults.ConnectionPoolerEnabled != nil { + resultConfig.ConnectionPoolerEnabled = classDefaults.ConnectionPoolerEnabled + } - return resultConfig + // Merge ConnectionPooler config: cluster spec takes precedence over class + if cluster.Spec.ConnectionPoolerConfig != nil { + resultConfig.ConnectionPoolerConfig = cluster.Spec.ConnectionPoolerConfig + } else if CNPGDefaults != nil && CNPGDefaults.ConnectionPooler != nil { + resultConfig.ConnectionPoolerConfig = CNPGDefaults.ConnectionPooler + } + + return resultConfig, nil } // buildCNPGClusterSpec builds the desired CNPG ClusterSpec. @@ -210,8 +318,8 @@ func (r *PostgresClusterReconciler) buildCNPGClusterSpec(mergedConfig *enterpris }, Bootstrap: &cnpgv1.BootstrapConfiguration{ InitDB: &cnpgv1.BootstrapInitDB{ - Database: "postgres", - Owner: "postgres", + Database: defaultDatabaseName, + Owner: defaultUsername, }, }, StorageConfiguration: cnpgv1.StorageConfiguration{ @@ -224,8 +332,10 @@ func (r *PostgresClusterReconciler) buildCNPGClusterSpec(mergedConfig *enterpris } // build CNPGCluster builds the CNPG Cluster object based on the PostgresCluster resource and merged configuration. 
-func (r *PostgresClusterReconciler) buildCNPGCluster(postgresCluster *enterprisev4.PostgresCluster, mergedConfig *enterprisev4.PostgresClusterSpec) *cnpgv1.Cluster { - +func (r *PostgresClusterReconciler) buildCNPGCluster( + postgresCluster *enterprisev4.PostgresCluster, + mergedConfig *enterprisev4.PostgresClusterSpec, +) *cnpgv1.Cluster { cnpgCluster := &cnpgv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ Name: postgresCluster.Name, @@ -233,65 +343,364 @@ func (r *PostgresClusterReconciler) buildCNPGCluster(postgresCluster *enterprise }, Spec: r.buildCNPGClusterSpec(mergedConfig), } - ctrl.SetControllerReference(postgresCluster, cnpgCluster, r.Scheme) return cnpgCluster } -// syncStatus maps CNPG Cluster state to PostgresCluster object. -func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, err error) error { - // will use Patch as we did for main reconciliation loop. - latestPGCluster := client.MergeFrom(postgresCluster.DeepCopy()) +// poolerResourceName returns the CNPG Pooler resource name for a given cluster and type (rw/ro). +func poolerResourceName(clusterName, poolerType string) string { + return fmt.Sprintf("%s-pooler-%s", clusterName, poolerType) +} + +// createOrUpdateConnectionPooler creates or updates CNPG Pooler resources. 
+func (r *PostgresClusterReconciler) createOrUpdateConnectionPooler( + ctx context.Context, + postgresCluster *enterprisev4.PostgresCluster, + mergedConfig *enterprisev4.PostgresClusterSpec, + cnpgCluster *cnpgv1.Cluster, +) error { + // Create/Update RW Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster, readWriteEndpoint); err != nil { + return fmt.Errorf("failed to reconcile RW pooler: %w", err) + } + + // Create/Update RO Pooler + if err := r.ensureConnectionPooler(ctx, postgresCluster, mergedConfig, cnpgCluster, readOnlyEndpoint); err != nil { + return fmt.Errorf("failed to reconcile RO pooler: %w", err) + } + + return nil +} + +// poolerExists checks if a pooler resource exists and returns it +func (r *PostgresClusterReconciler) poolerExists(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, poolerType string) bool { + pooler := &cnpgv1.Pooler{} + err := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, poolerType), + Namespace: postgresCluster.Namespace, + }, pooler) - // If there's an error, we set the status to Error and include the error message in the condition. + if apierrors.IsNotFound(err) { + return false + } if err != nil { - postgresCluster.Status.Phase = "Error" - r.setCondition(postgresCluster, metav1.ConditionFalse, "Error", fmt.Sprintf("Error during reconciliation: %v", err)) - // CNPG not existing, set status to Pending. Direct running `switch` without CNPG cluster in place will cause a panic, so we need to check for that first. - } else if cnpgCluster == nil { - postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterNotFound", "Underlying CNPG cluster object has not been created yet") - // Cluster exists, map the CNPG Cluster status to our PostgresCluster status. 
- } else { - switch cnpgCluster.Status.Phase { - case cnpgv1.PhaseHealthy: - postgresCluster.Status.Phase = "Ready" - r.setCondition(postgresCluster, metav1.ConditionTrue, "ClusterHealthy", "CNPG cluster is in healthy state") - case cnpgv1.PhaseUnrecoverable: - postgresCluster.Status.Phase = "Failed" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterCreationFailed", "CNPG cluster is in unrecoverable state") - case "": - postgresCluster.Status.Phase = "Pending" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterPending", "CNPG cluster is pending creation") - default: - postgresCluster.Status.Phase = "Provisioning" - r.setCondition(postgresCluster, metav1.ConditionFalse, "ClusterProvisioning", "CNPG cluster is being provisioned") + logs.FromContext(ctx).Error(err, "Failed to check pooler existence", "type", poolerType) + return false + } + return true +} + +// deleteConnectionPoolers removes RW and RO pooler resources if they exist. +func (r *PostgresClusterReconciler) deleteConnectionPoolers(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) error { + logger := logs.FromContext(ctx) + + for _, poolerType := range []string{readWriteEndpoint, readOnlyEndpoint} { + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + exists := r.poolerExists(ctx, postgresCluster, poolerType) + if !exists { + continue + } + + pooler := &cnpgv1.Pooler{} + if err := r.Get(ctx, types.NamespacedName{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, pooler); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return fmt.Errorf("failed to get pooler %s: %w", poolerName, err) } - // Set the reference to the CNPG Cluster in the status. 
- postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ - APIVersion: "postgresql.cnpg.io/v1", - Kind: "Cluster", - Namespace: cnpgCluster.Namespace, - Name: cnpgCluster.Name, - UID: cnpgCluster.UID, + + logger.Info("Deleting CNPG Pooler", "name", poolerName) + if err := r.Delete(ctx, pooler); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete pooler %s: %w", poolerName, err) } } + return nil +} - if patchErr := r.Status().Patch(ctx, postgresCluster, latestPGCluster); patchErr != nil { - return fmt.Errorf("failed to patch PostgresCluster status: %w", patchErr) +// ensureConnectionPooler creates a CNPG Pooler resource if it doesn't exist. +func (r *PostgresClusterReconciler) ensureConnectionPooler( + ctx context.Context, + postgresCluster *enterprisev4.PostgresCluster, + mergedConfig *enterprisev4.PostgresClusterSpec, + cnpgCluster *cnpgv1.Cluster, + poolerType string, +) error { + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + + existingPooler := &cnpgv1.Pooler{} + err := r.Get(ctx, types.NamespacedName{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, existingPooler) + + if apierrors.IsNotFound(err) { + logs.FromContext(ctx).Info("Creating CNPG Pooler", "name", poolerName, "type", poolerType) + if statusErr := r.updateStatus(ctx, postgresCluster, poolerReady, metav1.ConditionFalse, reasonPoolerCreating, fmt.Sprintf("Creating %s pooler", poolerType), pending); statusErr != nil { + logs.FromContext(ctx).Error(statusErr, "Failed to update status") + } + pooler := r.buildCNPGPooler(postgresCluster, mergedConfig, cnpgCluster, poolerType) + return r.Create(ctx, pooler) } - return nil + + return err +} + +// buildCNPGPooler constructs a CNPG Pooler object. 
+func (r *PostgresClusterReconciler) buildCNPGPooler( + postgresCluster *enterprisev4.PostgresCluster, + mergedConfig *enterprisev4.PostgresClusterSpec, + cnpgCluster *cnpgv1.Cluster, + poolerType string, +) *cnpgv1.Pooler { + cfg := mergedConfig.ConnectionPoolerConfig + poolerName := poolerResourceName(postgresCluster.Name, poolerType) + + // Instances and Mode are optional pointer fields; fall back to safe defaults instead of dereferencing nil. + instances := int32(1) + if cfg.Instances != nil { + instances = *cfg.Instances + } + mode := cnpgv1.PgBouncerPoolModeSession + if cfg.Mode != nil { + mode = cnpgv1.PgBouncerPoolMode(*cfg.Mode) + } + + pooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerName, + Namespace: postgresCluster.Namespace, + }, + Spec: cnpgv1.PoolerSpec{ + Cluster: cnpgv1.LocalObjectReference{ + Name: cnpgCluster.Name, + }, + Instances: &instances, + Type: cnpgv1.PoolerType(poolerType), + PgBouncer: &cnpgv1.PgBouncerSpec{ + PoolMode: mode, + Parameters: cfg.Config, + }, + }, + } + + ctrl.SetControllerReference(postgresCluster, pooler, r.Scheme) + return pooler +} + +// syncStatus maps CNPG Cluster state to PostgresCluster object and handles pooler status. +func (r *PostgresClusterReconciler) syncStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error { + + // 1. 
Set ProvisionerRef + postgresCluster.Status.ProvisionerRef = &corev1.ObjectReference{ + APIVersion: "postgresql.cnpg.io/v1", + Kind: "Cluster", + Namespace: cnpgCluster.Namespace, + Name: cnpgCluster.Name, + UID: cnpgCluster.UID, + } + + // Map CNPG Phase to PostgresCluster Phase/Conditions + var phase reconcilePhases + var conditionStatus metav1.ConditionStatus + var reason conditionReasons + var message string + + switch cnpgCluster.Status.Phase { + case cnpgv1.PhaseHealthy: + phase = ready + conditionStatus = metav1.ConditionTrue + reason = reasonCNPGClusterHealthy + message = "Cluster is up and running" + + case cnpgv1.PhaseFirstPrimary, + cnpgv1.PhaseCreatingReplica, + cnpgv1.PhaseWaitingForInstancesToBeActive: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = fmt.Sprintf("CNPG cluster provisioning: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseSwitchover: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGSwitchover + message = "Cluster changing primary node" + + case cnpgv1.PhaseFailOver: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGFailingOver + message = "Pod missing, need to change primary" + + case cnpgv1.PhaseInplacePrimaryRestart, + cnpgv1.PhaseInplaceDeletePrimaryRestart: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGRestarting + message = fmt.Sprintf("CNPG cluster restarting: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseUpgrade, + cnpgv1.PhaseMajorUpgrade, + cnpgv1.PhaseUpgradeDelayed, + cnpgv1.PhaseOnlineUpgrading: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGUpgrading + message = fmt.Sprintf("CNPG cluster upgrading: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseApplyingConfiguration: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGApplyingConfig + message = "Configuration change is 
being applied" + + case cnpgv1.PhaseReplicaClusterPromotion: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGPromoting + message = "Replica is being promoted to primary" + + case cnpgv1.PhaseWaitingForUser: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGWaitingForUser + message = "Action from the user is required" + + case cnpgv1.PhaseUnrecoverable: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGUnrecoverable + message = "Cluster failed, needs manual intervention" + + case cnpgv1.PhaseCannotCreateClusterObjects: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGProvisioningFailed + message = "Cluster resources cannot be created" + + case cnpgv1.PhaseUnknownPlugin, + cnpgv1.PhaseFailurePlugin: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGPluginError + message = fmt.Sprintf("CNPG plugin error: %s", cnpgCluster.Status.Phase) + + case cnpgv1.PhaseImageCatalogError, + cnpgv1.PhaseArchitectureBinaryMissing: + phase = failed + conditionStatus = metav1.ConditionFalse + reason = reasonCNPGImageError + message = fmt.Sprintf("CNPG image error: %s", cnpgCluster.Status.Phase) + + case "": + phase = pending + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = "CNPG cluster is pending creation" + + default: + phase = provisioning + conditionStatus = metav1.ConditionFalse + reason = reasonProvisioning + message = fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase) + } + + return r.updateStatus(ctx, postgresCluster, clusterReady, conditionStatus, reason, message, phase) } -// setCondition sets the condition of the PostgresCluster status. -func (r *PostgresClusterReconciler) setCondition(postgresCluster *enterprisev4.PostgresCluster, status metav1.ConditionStatus, reason, message string) { +// updateStatus sets the phase, condition and persists the status to Kubernetes. 
+func (r *PostgresClusterReconciler) updateStatus( + ctx context.Context, + postgresCluster *enterprisev4.PostgresCluster, + conditionType conditionTypes, + status metav1.ConditionStatus, + reason conditionReasons, + message string, + phase reconcilePhases, +) error { + postgresCluster.Status.Phase = string(phase) meta.SetStatusCondition(&postgresCluster.Status.Conditions, metav1.Condition{ - Type: "Ready", + Type: string(conditionType), Status: status, - Reason: reason, + Reason: string(reason), Message: message, ObservedGeneration: postgresCluster.Generation, }) + + if err := r.Status().Update(ctx, postgresCluster); err != nil { + return fmt.Errorf("failed to update PostgresCluster status: %w", err) + } + return nil +} + +// syncPoolerStatus populates ConnectionPoolerStatus and the PoolerReady condition. +// Called only when poolers are confirmed ready by the reconciler. +func (r *PostgresClusterReconciler) syncPoolerStatus(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) error { + rwPooler := &cnpgv1.Pooler{} + if rwErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), + Namespace: postgresCluster.Namespace, + }, rwPooler); rwErr != nil { + return rwErr + } + + roPooler := &cnpgv1.Pooler{} + if roErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), + Namespace: postgresCluster.Namespace, + }, roPooler); roErr != nil { + return roErr + } + + postgresCluster.Status.ConnectionPoolerStatus = &enterprisev4.ConnectionPoolerStatus{ + Enabled: true, + } + + rwDesired, rwScheduled := r.getPoolerInstanceCount(rwPooler) + roDesired, roScheduled := r.getPoolerInstanceCount(roPooler) + + r.updateStatus( + ctx, + postgresCluster, + poolerReady, + metav1.ConditionTrue, + reasonAllInstancesReady, + fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired), + ready) + return nil +} + +// 
isPoolerReady checks if a pooler has all instances scheduled. +// Note: CNPG PoolerStatus only tracks scheduled instances, not ready pods. +func (r *PostgresClusterReconciler) isPoolerReady(pooler *cnpgv1.Pooler, err error) bool { + if err != nil { + return false + } + desiredInstances := int32(1) + if pooler.Spec.Instances != nil { + desiredInstances = *pooler.Spec.Instances + } + return pooler.Status.Instances >= desiredInstances +} + +// getPoolerInstanceCount returns the number of scheduled instances for a pooler. +func (r *PostgresClusterReconciler) getPoolerInstanceCount(pooler *cnpgv1.Pooler) (desired int32, scheduled int32) { + desired = int32(1) + if pooler.Spec.Instances != nil { + desired = *pooler.Spec.Instances + } + return desired, pooler.Status.Instances +} + +// arePoolersReady checks if both RW and RO poolers have all instances scheduled. +func (r *PostgresClusterReconciler) arePoolersReady(ctx context.Context, postgresCluster *enterprisev4.PostgresCluster) bool { + rwPooler := &cnpgv1.Pooler{} + rwErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), + Namespace: postgresCluster.Namespace, + }, rwPooler) + + roPooler := &cnpgv1.Pooler{} + roErr := r.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), + Namespace: postgresCluster.Namespace, + }, roPooler) + + return r.isPoolerReady(rwPooler, rwErr) && r.isPoolerReady(roPooler, roErr) } // reconcileManagedRoles synchronizes ManagedRoles from PostgresCluster spec to CNPG Cluster managed.roles using diff-based patching @@ -387,6 +796,7 @@ func (r *PostgresClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&enterprisev4.PostgresCluster{}). Owns(&cnpgv1.Cluster{}). + Owns(&cnpgv1.Pooler{}). Named("postgresCluster"). 
Complete(r) } diff --git a/internal/controller/postgresdatabase_controller.go b/internal/controller/postgresdatabase_controller.go index 6a4290a03..98461cfc8 100644 --- a/internal/controller/postgresdatabase_controller.go +++ b/internal/controller/postgresdatabase_controller.go @@ -39,43 +39,6 @@ import ( enterprisev4 "github.com/splunk/splunk-operator/api/v4" ) -type reconcilePhases string -type conditionTypes string -type conditionReasons string -type clusterReadyStatus string - -const ( - retryDelay = time.Second * 15 - // phases - ready reconcilePhases = "Ready" - pending reconcilePhases = "Pending" - provisioning reconcilePhases = "Provisioning" - failed reconcilePhases = "Failed" - - // Conditiontypes - clusterReady conditionTypes = "ClusterReady" - secretsReady conditionTypes = "SecretsReady" - usersReady conditionTypes = "UsersReady" - databasesReady conditionTypes = "DatabasesReady" - privilegesReady conditionTypes = "PrivilegesReady" - - // Condition reasons - reasonNotFound conditionReasons = "NotFound" - reasonProvisioning conditionReasons = "Provisioning" - reasonClusterInfoFetchFailed conditionReasons = "ClusterInfoFetchNotPossible" - reasonAvailable conditionReasons = "Available" - reasonConfiguring conditionReasons = "Configuring" - reasonWaitingForCNPG conditionReasons = "WaitingForCNPG" - reasonFailed conditionReasons = "Failed" - reasonCreating conditionReasons = "Creating" - - // Cluster status - ClusterNotFound clusterReadyStatus = "NotFound" - ClusterNotReady clusterReadyStatus = "NotReady" - ClusterNoProvisionerRef clusterReadyStatus = "NoProvisionerRef" - ClusterReady clusterReadyStatus = "Ready" -) - // PostgresDatabaseReconciler reconciles a PostgresDatabase object type PostgresDatabaseReconciler struct { client.Client diff --git a/internal/controller/postgresoperator_common_types.go b/internal/controller/postgresoperator_common_types.go new file mode 100644 index 000000000..b7ebd7b22 --- /dev/null +++ 
b/internal/controller/postgresoperator_common_types.go @@ -0,0 +1,96 @@ +package controller + +import ( + corev1 "k8s.io/api/core/v1" + "time" +) + +// This struct is used to compare the merged configuration from PostgresClusterClass and PostgresClusterSpec +// in a normalized way, and not to use CNPG-default values which are causing false positive diff state while reconciliation loop. +// It contains only the fields that are relevant for our reconciliation and that we want to compare when deciding whether to update the CNPG Cluster spec or not. +type normalizedCNPGClusterSpec struct { + ImageName string + Instances int + // Parameters we set, instead of complete spec from CNPG + CustomDefinedParameters map[string]string + PgHBA []string + DefaultDatabase string + Owner string + StorageSize string + Resources corev1.ResourceRequirements +} + +type reconcilePhases string +type conditionTypes string +type conditionReasons string +type clusterReadyStatus string + +const ( + // default requeue delay + retryDelay = time.Second * 15 + // cluster endpoint suffixes + readOnlyEndpoint string = "ro" + readWriteEndpoint string = "rw" + // default database name + defaultDatabaseName string = "postgres" + defaultUsername string = "postgres" + + // phases + ready reconcilePhases = "Ready" + pending reconcilePhases = "Pending" + provisioning reconcilePhases = "Provisioning" + failed reconcilePhases = "Failed" + + // Condition types + clusterReady conditionTypes = "ClusterReady" + poolerReady conditionTypes = "PoolerReady" + usersReady conditionTypes = "UsersReady" + databasesReady conditionTypes = "DatabasesReady" + // TODO - to use in the future implementation + // secretsReady conditionTypes = "SecretsReady" + // privilegesReady conditionTypes = "PrivilegesReady" + + // Condition reasons + reasonNotFound conditionReasons = "NotFound" + reasonProvisioning conditionReasons = "Provisioning" + reasonClusterInfoFetchFailed conditionReasons = "ClusterInfoFetchNotPossible" + 
reasonAvailable conditionReasons = "Available" + reasonWaitingForCNPG conditionReasons = "WaitingForCNPG" + reasonFailed conditionReasons = "Failed" + + // Additional condition reasons for clusterReady conditionType + reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" + reasonManagedRolesFailed conditionReasons = "ManagedRolesReconciliationFailed" + reasonClusterBuildFailed conditionReasons = "ClusterBuildFailed" + reasonClusterBuildSucceeded conditionReasons = "ClusterBuildSucceeded" + reasonClusterGetFailed conditionReasons = "ClusterGetFailed" + reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" + reasonInvalidConfiguration conditionReasons = "InvalidConfiguration" + + // Additional condition reasons for poolerReady conditionType + reasonPoolerReconciliationFailed conditionReasons = "PoolerReconciliationFailed" + reasonPoolerConfigMissing conditionReasons = "PoolerConfigMissing" + reasonPoolerCreating conditionReasons = "PoolerCreating" + reasonAllInstancesReady conditionReasons = "AllInstancesReady" + + // Additional condition reasons for mapping CNPG cluster statuses + reasonCNPGClusterNotHealthy conditionReasons = "CNPGClusterNotHealthy" + reasonCNPGClusterHealthy conditionReasons = "CNPGClusterHealthy" + reasonCNPGSwitchover conditionReasons = "CNPGSwitchover" + reasonCNPGFailingOver conditionReasons = "CNPGFailingOver" + reasonCNPGRestarting conditionReasons = "CNPGRestarting" + reasonCNPGUpgrading conditionReasons = "CNPGUpgrading" + reasonCNPGApplyingConfig conditionReasons = "CNPGApplyingConfiguration" + reasonCNPGPromoting conditionReasons = "CNPGPromoting" + reasonCNPGWaitingForUser conditionReasons = "CNPGWaitingForUser" + reasonCNPGUnrecoverable conditionReasons = "CNPGUnrecoverable" + reasonCNPGProvisioningFailed conditionReasons = "CNPGProvisioningFailed" + reasonCNPGPluginError conditionReasons = "CNPGPluginError" + reasonCNPGImageError conditionReasons = "CNPGImageError" + + // Cluster status + ClusterNotFound 
clusterReadyStatus = "NotFound" + ClusterNotReady clusterReadyStatus = "NotReady" + ClusterNoProvisionerRef clusterReadyStatus = "NoProvisionerRef" + ClusterReady clusterReadyStatus = "Ready" +)