Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/operator/encryption/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ func NewControllers(
encryptionSecretSelector,
eventRecorder,
),
controllers.NewEncryptionRotationController(
component,
provider,
deployer,
encryptionEnabledChecker.PreconditionFulfilled,
migrator,
operatorClient,
apiServerInformer,
kubeInformersForNamespaces,
secretsClient,
encryptionSecretSelector,
eventRecorder,
),
}, nil
}

Expand Down
293 changes: 293 additions & 0 deletions pkg/operator/encryption/controllers/encryption_rotation_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package controllers

import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/encryption/controllers/migrators"
"github.com/openshift/library-go/pkg/operator/encryption/encryptionstatus"
"github.com/openshift/library-go/pkg/operator/encryption/secrets"
"github.com/openshift/library-go/pkg/operator/encryption/statemachine"
"github.com/openshift/library-go/pkg/operator/events"
operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
)

const (
encryptionRotationConvergenceDelay = 5 * time.Minute
)

// encryptionRotationController orchestrates KMS KEK rotation by resetting migration state on the
// encryption key secret and recording rotation progress on operator status.
type encryptionRotationController struct {
instanceName string
controllerInstanceName string
operatorClient operatorv1helpers.OperatorClient
encryptionSecretSelector metav1.ListOptions
secretClient corev1client.SecretsGetter
deployer statemachine.Deployer
migrator migrators.Migrator
provider Provider
preconditionsFulfilledFn preconditionsFulfilled
}

func NewEncryptionRotationController(
instanceName string,
provider Provider,
deployer statemachine.Deployer,
preconditionsFulfilledFn preconditionsFulfilled,
migrator migrators.Migrator,
operatorClient operatorv1helpers.OperatorClient,
apiServerConfigInformer configv1informers.APIServerInformer,
kubeInformersForNamespaces operatorv1helpers.KubeInformersForNamespaces,
secretClient corev1client.SecretsGetter,
encryptionSecretSelector metav1.ListOptions,
eventRecorder events.Recorder,
) factory.Controller {
c := &encryptionRotationController{
instanceName: instanceName,
controllerInstanceName: factory.ControllerInstanceName(instanceName, "EncryptionRotation"),
operatorClient: operatorClient,
encryptionSecretSelector: encryptionSecretSelector,
secretClient: secretClient,
deployer: deployer,
migrator: migrator,
provider: provider,
preconditionsFulfilledFn: preconditionsFulfilledFn,
}

return factory.New().ResyncEvery(time.Minute).WithSync(c.sync).WithControllerInstanceName(c.controllerInstanceName).WithInformers(
operatorClient.Informer(),
kubeInformersForNamespaces.InformersFor("openshift-config-managed").Core().V1().Secrets().Informer(),
apiServerConfigInformer.Informer(),
deployer,
).ToController(
c.controllerInstanceName,
eventRecorder.WithComponentSuffix("encryption-rotation-controller"),
)
}

func (c *encryptionRotationController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if ready, err := shouldRunEncryptionController(c.operatorClient, c.preconditionsFulfilledFn, c.provider.ShouldRunEncryptionControllers); err != nil || !ready {
return err
}
return c.reconcile(ctx)
}

func (c *encryptionRotationController) reconcile(ctx context.Context) error {
currentConfig, _, encryptionSecrets, _, err := statemachine.GetEncryptionConfigAndState(
ctx, c.deployer, c.secretClient, c.encryptionSecretSelector, c.provider.EncryptedGRs(),
)
if err != nil {
return err
}

if currentConfig == nil || !currentConfig.UsesKMS() {
return nil
}

_, operatorStatus, _, err := c.operatorClient.GetOperatorState()
if err != nil {
return err
}

healthReports := encryptionstatus.HealthReportsFromOperatorStatus(operatorStatus)
rotations, err := encryptionstatus.KeyRotationStatusFromOperatorStatus(operatorStatus)
if err != nil {
return err
}

encryptedGRs := currentConfig.EncryptedGroupResources()
klog.Infof("%s: reconciling KMS key rotation (%d plugin(s), %d health report(s), %d rotation entr(ies))",
c.instanceName, len(currentConfig.KMSPlugins), len(healthReports), len(rotations))
for keyID := range currentConfig.KMSPlugins {
writeKeySecret := secrets.FindKeySecret(encryptionSecrets, c.instanceName, keyID)
if writeKeySecret == nil {
klog.Infof("%s: no write key secret for KMS plugin keyID %q, skipping", c.instanceName, keyID)
continue
}

rotations, err = c.reconcileKMSPlugin(
ctx, keyID, encryptedGRs, writeKeySecret, healthReports, rotations,
)
if err != nil {
return err
}
}

if err := c.persistRotations(ctx, rotations); err != nil {
return err
}

return nil
}

func (c *encryptionRotationController) reconcileKMSPlugin(
ctx context.Context,
keyID string,
encryptedGRs []schema.GroupResource,
writeKeySecret *corev1.Secret,
healthReports []encryptionstatus.KMSPluginHealthReport,
rotations []encryptionstatus.KMSPluginRotationStatus,
) ([]encryptionstatus.KMSPluginRotationStatus, error) {
// Check KEK convergence across all nodes for this keyID.
convergedKEKID, converged := encryptionstatus.ConvergedKEKForKeyID(healthReports, keyID)
if !converged {
klog.Infof("%s: KEK not yet converged for keyID %q, waiting for all nodes to report the same kekID", c.instanceName, keyID)
return rotations, nil
}
klog.Infof("%s: KEK converged for keyID %q: kekID=%q", c.instanceName, keyID, convergedKEKID)

// If the converged kekID matches the last completed rotation, we are in steady state.
lastCompleted, hasCompleted := encryptionstatus.LatestCompletedRotationForKeyID(rotations, keyID)
if hasCompleted && convergedKEKID == lastCompleted.KEKID {
klog.V(4).Infof("%s: converged kekID %q matches last completed rotation for keyID %q, nothing to do", c.instanceName, convergedKEKID, keyID)
return rotations, nil
}

// Ensure an open rotation entry exists with discoveryTime for the converged kekID.
now := metav1.Now()
rotations, openIdx := encryptionstatus.GetOrCreateOpenRotation(rotations, keyID, convergedKEKID, now)
klog.Infof("%s: tracking open rotation for keyID=%q kekID=%q (discoveryTime=%s)",
c.instanceName, keyID, convergedKEKID, rotations[openIdx].DiscoveryTime.Format(time.RFC3339))

// Mirror migration finish time from the write key secret annotations when all
// encrypted group resources have been migrated by the migration controller.
migrated, err := encryptionstatus.AllEncryptedGRsMigrated(writeKeySecret, encryptedGRs)
if err != nil {
return rotations, err
}
rotations = mirrorMigrationFinish(c.instanceName, rotations, openIdx, migrated, writeKeySecret)

// Bootstrap: if no prior completed rotation exists, this is the initial provider
// migration driven by the migration controller. We only track convergence and mirror
// the finish time — never prune migrations or clear annotations.
if !hasCompleted {
klog.Infof("%s: bootstrap for keyID %q — no prior completed rotation, waiting for initial migration to complete", c.instanceName, keyID)
return rotations, nil
}

// From here on a KEK change was detected (convergedKEKID != lastCompleted.KEKID).
// Check guards before starting a storage re-migration.
entry := rotations[openIdx]

if entry.MigrationStartTime != nil {
klog.Infof("%s: rotation for kekID %q already started at %s, waiting for migration to complete",
c.instanceName, convergedKEKID, entry.MigrationStartTime.Format(time.RFC3339))
return rotations, nil
}

if entry.DiscoveryTime != nil && time.Since(entry.DiscoveryTime.Time) < encryptionRotationConvergenceDelay {
remaining := encryptionRotationConvergenceDelay - time.Since(entry.DiscoveryTime.Time)
klog.Infof("%s: rotation for kekID %q waiting for convergence delay (%s remaining)",
c.instanceName, convergedKEKID, remaining.Round(time.Second))
return rotations, nil
}

// All guards passed — start the rotation by pruning existing storage version
// migrations and clearing migration annotations on the write key secret so the
// migration controller picks up the work again.
klog.Infof("%s: starting storage re-migration for keyID=%q: kekID changed from %q to %q",
c.instanceName, keyID, lastCompleted.KEKID, convergedKEKID)
if err := c.startRotation(ctx, encryptedGRs, writeKeySecret); err != nil {
return rotations, err
}

rotations = encryptionstatus.SetMigrationStartTime(rotations, openIdx, now)
klog.Infof("%s: rotation migration started for kekID %q at %s", c.instanceName, convergedKEKID, now.Format(time.RFC3339))
return rotations, nil
}

func (c *encryptionRotationController) startRotation(ctx context.Context, encryptedGRs []schema.GroupResource, secret *corev1.Secret) error {
for _, gr := range encryptedGRs {
klog.Infof("%s: pruning storage migration for %s", c.instanceName, gr)
if err := c.migrator.PruneMigration(gr); err != nil {
return err
}
}
klog.Infof("%s: clearing migration annotations on secret %s/%s", c.instanceName, secret.Namespace, secret.Name)
return c.clearMigrationAnnotations(ctx, secret)
}

func mirrorMigrationFinish(
instanceName string,
rotations []encryptionstatus.KMSPluginRotationStatus,
idx int,
migrated bool,
secret *corev1.Secret,
) []encryptionstatus.KMSPluginRotationStatus {
if !migrated || idx < 0 || idx >= len(rotations) || rotations[idx].MigrationFinishTime != nil {
return rotations
}
finish, ok := migrationFinishTimeFromSecret(secret)
if !ok {
return rotations
}
entry := rotations[idx]
klog.Infof("%s: mirroring migrationFinishTime for keyID %q kekID %q from secret %s/%s at %s",
instanceName, entry.KeyID, entry.KEKID, secret.Namespace, secret.Name, finish.Format(time.RFC3339))
return encryptionstatus.SetMigrationFinishTime(rotations, idx, finish)
}

func migrationFinishTimeFromSecret(secret *corev1.Secret) (metav1.Time, bool) {
if secret == nil || secret.Annotations == nil {
return metav1.Time{}, false
}
raw, ok := secret.Annotations[secrets.EncryptionSecretMigratedTimestamp]
if !ok || raw == "" {
return metav1.Time{}, false
}
ts, err := time.Parse(time.RFC3339, raw)
if err != nil {
klog.Warningf("ignoring invalid %s annotation on secret %s/%s: %v",
secrets.EncryptionSecretMigratedTimestamp, secret.Namespace, secret.Name, err)
return metav1.Time{}, false
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return metav1.NewTime(ts), true
}

func (c *encryptionRotationController) clearMigrationAnnotations(ctx context.Context, secret *corev1.Secret) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
current, err := c.secretClient.Secrets(secret.Namespace).Get(ctx, secret.Name, metav1.GetOptions{})
if err != nil {
return err
}
if current.Annotations == nil {
return nil
}
changed := false
if _, ok := current.Annotations[secrets.EncryptionSecretMigratedTimestamp]; ok {
delete(current.Annotations, secrets.EncryptionSecretMigratedTimestamp)
changed = true
}
if _, ok := current.Annotations[secrets.EncryptionSecretMigratedResources]; ok {
delete(current.Annotations, secrets.EncryptionSecretMigratedResources)
changed = true
}
if !changed {
return nil
}
_, err = c.secretClient.Secrets(current.Namespace).Update(ctx, current, metav1.UpdateOptions{})
return err
})
}

func (c *encryptionRotationController) persistRotations(ctx context.Context, rotations []encryptionstatus.KMSPluginRotationStatus) error {
_, updated, err := operatorv1helpers.UpdateStatus(ctx, c.operatorClient, encryptionstatus.SetKeyRotationStatusCondition(rotations))
if err != nil {
return err
}
if updated {
klog.Infof("%s: updated key rotation status (%d entries)", c.instanceName, len(rotations))
}
return nil
}
19 changes: 19 additions & 0 deletions pkg/operator/encryption/encryptiondata/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ func (c *Config) HasEncryptionConfiguration() bool {
return c != nil && c.Encryption != nil
}

// UsesKMS returns whether the deployed encryption configuration includes KMS plugins.
func (c *Config) UsesKMS() bool {
return c != nil && len(c.KMSPlugins) > 0
}

// EncryptedGroupResources returns group resources from the deployed encryption configuration.
func (c *Config) EncryptedGroupResources() []schema.GroupResource {
if !c.HasEncryptionConfiguration() {
return nil
}
grs := make([]schema.GroupResource, 0, len(c.Encryption.Resources))
for _, resourceConfig := range c.Encryption.Resources {
for _, resource := range resourceConfig.Resources {
grs = append(grs, schema.ParseGroupResource(resource))
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return grs
}

// FromEncryptionState converts encryption state to Config.
func FromEncryptionState(encryptionState map[schema.GroupResource]state.GroupResourceState) (*Config, error) {
resourceConfigs := make([]apiserverconfigv1.ResourceConfiguration, 0, len(encryptionState))
Expand Down
50 changes: 50 additions & 0 deletions pkg/operator/encryption/encryptionstatus/convergence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package encryptionstatus

import (
"strings"
)

// KEKByKeyID groups observed kekIds per plugin keyId across health reports.
// Only healthy reports with non-empty keyId and kekId are included.
func KEKByKeyID(reports []KMSPluginHealthReport) map[string][]string {
result := map[string][]string{}
for _, report := range reports {
if !isHealthyReport(report) {
continue
}
result[report.KeyID] = append(result[report.KeyID], report.KEKID)
}
return result
Comment on lines +9 to +17
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Filter empty KeyID/KEKID to match documented contract.

The comment on line 8 states that only reports with non-empty keyId and kekId are included, but the code does not enforce this. Although ConvergedKEKForKeyID handles empty values downstream, filtering here would match the documented contract and simplify caller logic.

Proposed fix
 func KEKByKeyID(reports []KMSPluginHealthReport) map[string][]string {
 	result := map[string][]string{}
 	for _, report := range reports {
 		if !isHealthyReport(report) {
 			continue
 		}
+		if report.KeyID == "" || report.KEKID == "" {
+			continue
+		}
 		result[report.KeyID] = append(result[report.KeyID], report.KEKID)
 	}
 	return result
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/operator/encryption/encryptionstatus/convergence.go` around lines 9 - 17,
KEKByKeyID currently only checks isHealthyReport but does not skip reports with
empty KeyID or KEKID as the contract requires; update KEKByKeyID to additionally
guard that report.KeyID != "" and report.KEKID != "" before appending to result
(keep the existing isHealthyReport(report) check), so only reports with
non-empty keyId and kekId are included and downstream callers simplify.

}

// ConvergedKEKForKeyID returns the unanimous kekId for keyID when every healthy report for that keyId agrees.
func ConvergedKEKForKeyID(reports []KMSPluginHealthReport, keyID string) (kekID string, ok bool) {
if keyID == "" {
return "", false
}

byKeyID := KEKByKeyID(reports)
kekIDs, found := byKeyID[keyID]
if !found || len(kekIDs) == 0 {
return "", false
}

uniq := map[string]struct{}{}
for _, id := range kekIDs {
if id == "" {
return "", false
}
uniq[id] = struct{}{}
}
if len(uniq) != 1 {
return "", false
}
for id := range uniq {
return id, true
}
return "", false
}

func isHealthyReport(report KMSPluginHealthReport) bool {
return strings.EqualFold(report.Status, "healthy")
}
Loading