Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindPFlag("server-address", cmd.Flags().Lookup("server-address"))

_ = viper.BindEnv("sidecar-image", "SIDECAR_IMAGE")
_ = viper.BindEnv("custom-cnpg-group", "CUSTOM_CNPG_GROUP")
_ = viper.BindEnv("custom-cnpg-version", "CUSTOM_CNPG_VERSION")

return cmd
}
2 changes: 2 additions & 0 deletions internal/cmd/restore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("custom-cnpg-group", "CUSTOM_CNPG_GROUP")
_ = viper.BindEnv("custom-cnpg-version", "CUSTOM_CNPG_VERSION")

return cmd
}
24 changes: 2 additions & 22 deletions internal/cnpgi/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ import (
"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/scheme"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
extendedclient "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/instance/internal/client"
pluginscheme "github.com/cloudnative-pg/plugin-barman-cloud/internal/scheme"
)

// Start starts the sidecar informers and CNPG-i server
Expand Down Expand Up @@ -127,26 +126,7 @@ func generateScheme(ctx context.Context) *runtime.Scheme {

utilruntime.Must(barmancloudv1.AddToScheme(result))
utilruntime.Must(clientgoscheme.AddToScheme(result))

cnpgGroup := viper.GetString("custom-cnpg-group")
cnpgVersion := viper.GetString("custom-cnpg-version")
if len(cnpgGroup) == 0 {
cnpgGroup = cnpgv1.SchemeGroupVersion.Group
}
if len(cnpgVersion) == 0 {
cnpgVersion = cnpgv1.SchemeGroupVersion.Version
}

// Proceed with custom registration of the CNPG scheme
schemeGroupVersion := schema.GroupVersion{Group: cnpgGroup, Version: cnpgVersion}
schemeBuilder := &scheme.Builder{GroupVersion: schemeGroupVersion}
schemeBuilder.Register(&cnpgv1.Cluster{}, &cnpgv1.ClusterList{})
schemeBuilder.Register(&cnpgv1.Backup{}, &cnpgv1.BackupList{})
schemeBuilder.Register(&cnpgv1.ScheduledBackup{}, &cnpgv1.ScheduledBackupList{})
utilruntime.Must(schemeBuilder.AddToScheme(result))

schemeLog := log.FromContext(ctx)
schemeLog.Info("CNPG types registration", "schemeGroupVersion", schemeGroupVersion)
pluginscheme.AddCNPGToScheme(ctx, result)

return result
}
4 changes: 4 additions & 0 deletions internal/cnpgi/metadata/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import "github.com/cloudnative-pg/cnpg-i/pkg/identity"
const PluginName = "barman-cloud.cloudnative-pg.io"

const (
// ClusterLabelName is the label applied to RBAC resources created
// by this plugin. Its value is the name of the owning Cluster.
ClusterLabelName = "barmancloud.cnpg.io/cluster"

// CheckEmptyWalArchiveFile is the name of the file in the PGDATA that,
// if present, requires the WAL archiver to check that the backup object
// store is empty.
Expand Down
19 changes: 13 additions & 6 deletions internal/cnpgi/operator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"crypto/tls"

// +kubebuilder:scaffold:imports
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/machinery/pkg/log"
"github.com/spf13/viper"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,25 +37,33 @@ import (

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/controller"
pluginscheme "github.com/cloudnative-pg/plugin-barman-cloud/internal/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var scheme = runtime.NewScheme()
// generateScheme creates a runtime.Scheme with all type definitions
// needed by the operator. CNPG types are registered under a
// configurable API group to support custom CNPG-based operators.
func generateScheme(ctx context.Context) *runtime.Scheme {
result := runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(barmancloudv1.AddToScheme(scheme))
utilruntime.Must(cnpgv1.AddToScheme(scheme))
utilruntime.Must(clientgoscheme.AddToScheme(result))
utilruntime.Must(barmancloudv1.AddToScheme(result))
pluginscheme.AddCNPGToScheme(ctx, result)
// +kubebuilder:scaffold:scheme

return result
}

// Start starts the manager
func Start(ctx context.Context) error {
setupLog := log.FromContext(ctx)

scheme := generateScheme(ctx)

var tlsOpts []func(*tls.Config)

// if the enable-http2 flag is false (the default), http/2 should be disabled
Expand Down
58 changes: 0 additions & 58 deletions internal/cnpgi/operator/ownership.go

This file was deleted.

177 changes: 177 additions & 0 deletions internal/cnpgi/operator/rbac/ensure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
Copyright © contributors to CloudNativePG, established as
CloudNativePG a Series of LF Projects, LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0
*/

// Package rbac contains utilities to reconcile RBAC resources
// for the barman-cloud plugin.
package rbac

import (
"context"

cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/machinery/pkg/log"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/operator/specs"
)

// EnsureRole ensures the RBAC Role for the given Cluster matches
// the desired state derived from the given ObjectStores. On creation,
// the Cluster is set as the owner of the Role for garbage collection.
//
// This function is called from the Pre hook (gRPC). It creates the
// Role if it does not exist, then patches rules and labels to match
// the desired state.
func EnsureRole(
ctx context.Context,
c client.Client,
cluster *cnpgv1.Cluster,
barmanObjects []barmancloudv1.ObjectStore,
) error {
newRole := specs.BuildRole(cluster, barmanObjects)
roleKey := client.ObjectKeyFromObject(newRole)

if err := ensureRoleExists(ctx, c, cluster, newRole); err != nil {
return err
}

return patchRole(ctx, c, roleKey, newRole.Rules, map[string]string{
metadata.ClusterLabelName: cluster.Name,
})
}

// EnsureRoleRules updates the rules of an existing Role to match
// the desired state derived from the given ObjectStores. Unlike
// EnsureRole, this function does not create Roles or set owner
// references — it only patches rules on Roles that already exist.
// It is intended for the ObjectStore controller path where no
// Cluster object is available. Returns nil if the Role does not
// exist (the Pre hook has not created it yet).
func EnsureRoleRules(
ctx context.Context,
c client.Client,
roleKey client.ObjectKey,
barmanObjects []barmancloudv1.ObjectStore,
) error {
err := patchRole(ctx, c, roleKey, specs.BuildRoleRules(barmanObjects), nil)
if apierrs.IsNotFound(err) {
log.FromContext(ctx).Debug("Role not found, skipping rule update",
"name", roleKey.Name, "namespace", roleKey.Namespace)
return nil
}

return err
}

// ensureRoleExists creates the Role if it does not exist. Returns
// nil on success and nil on AlreadyExists (another writer created
// it concurrently). The caller always follows up with patchRole.
func ensureRoleExists(
ctx context.Context,
c client.Client,
cluster *cnpgv1.Cluster,
newRole *rbacv1.Role,
) error {
contextLogger := log.FromContext(ctx)

var existing rbacv1.Role
err := c.Get(ctx, client.ObjectKeyFromObject(newRole), &existing)
if err == nil {
return nil
}
if !apierrs.IsNotFound(err) {
return err
}

if err := controllerutil.SetControllerReference(cluster, newRole, c.Scheme()); err != nil {
return err
}

contextLogger.Info("Creating role",
"name", newRole.Name, "namespace", newRole.Namespace)

createErr := c.Create(ctx, newRole)
if createErr == nil || apierrs.IsAlreadyExists(createErr) {
return nil
}

return createErr
}

// patchRole patches the Role's rules and optionally its labels to
// match the desired state. When desiredLabels is nil, labels are
// not modified. Uses retry.RetryOnConflict for concurrent
// modification handling.
func patchRole(
ctx context.Context,
c client.Client,
roleKey client.ObjectKey,
desiredRules []rbacv1.PolicyRule,
desiredLabels map[string]string,
) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var role rbacv1.Role
if err := c.Get(ctx, roleKey, &role); err != nil {
return err
}

rulesMatch := equality.Semantic.DeepEqual(desiredRules, role.Rules)
labelsMatch := desiredLabels == nil || !labelsNeedUpdate(role.Labels, desiredLabels)

if rulesMatch && labelsMatch {
return nil
}

contextLogger := log.FromContext(ctx)
contextLogger.Info("Patching role",
"name", role.Name, "namespace", role.Namespace)

oldRole := role.DeepCopy()
role.Rules = desiredRules

if desiredLabels != nil {
if role.Labels == nil {
role.Labels = make(map[string]string, len(desiredLabels))
}
for k, v := range desiredLabels {
role.Labels[k] = v
}
}

return c.Patch(ctx, &role, client.MergeFrom(oldRole))
})
}

// labelsNeedUpdate returns true if any key in desired is missing
// or has a different value in existing.
func labelsNeedUpdate(existing, desired map[string]string) bool {
for k, v := range desired {
if existing[k] != v {
return true
}
}
return false
}
Loading
Loading