diff --git a/manifests/03-clusterrole.yaml b/manifests/03-clusterrole.yaml index 25276ee87..dcae29418 100644 --- a/manifests/03-clusterrole.yaml +++ b/manifests/03-clusterrole.yaml @@ -126,6 +126,17 @@ rules: verbs: - get - list + - apiGroups: + - "apps" + resources: + - daemonsets + verbs: + - get + - list + - watch + - create + - delete + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding @@ -526,6 +537,8 @@ rules: - events verbs: - create + - patch + - update --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/manifests/06-deployment.yaml b/manifests/06-deployment.yaml index b1858afd7..310ae3e98 100644 --- a/manifests/06-deployment.yaml +++ b/manifests/06-deployment.yaml @@ -33,75 +33,81 @@ spec: kubernetes.io/os: linux node-role.kubernetes.io/master: "" tolerations: - - effect: NoSchedule - key: node-role.kubernetes.io/master - operator: Exists - - effect: NoExecute - key: node.kubernetes.io/unreachable - operator: Exists - tolerationSeconds: 900 - - effect: NoExecute - key: node.kubernetes.io/not-ready - operator: Exists - tolerationSeconds: 900 + - effect: NoSchedule + key: node-role.kubernetes.io/master + operator: Exists + - effect: NoExecute + key: node.kubernetes.io/unreachable + operator: Exists + tolerationSeconds: 900 + - effect: NoExecute + key: node.kubernetes.io/not-ready + operator: Exists + tolerationSeconds: 900 volumes: - - emptyDir: {} - name: tmp - - name: snapshots - emptyDir: {} - #sizeLimit: 1Gi # bug https://bugzilla.redhat.com/show_bug.cgi?id=1713207 - - name: trusted-ca-bundle - configMap: - name: trusted-ca-bundle - optional: true - - name: service-ca-bundle - configMap: - name: service-ca-bundle - optional: true - - name: serving-cert - secret: - secretName: openshift-insights-serving-cert - optional: true - containers: - - name: insights-operator - securityContext: - readOnlyRootFilesystem: true - allowPrivilegeEscalation: false - capabilities: - drop: ["ALL"] - image: 
quay.io/openshift/origin-insights-operator:latest - terminationMessagePolicy: FallbackToLogsOnError - volumeMounts: - - mountPath: /tmp + - emptyDir: {} name: tmp - name: snapshots - mountPath: /var/lib/insights-operator - - mountPath: /var/run/configmaps/trusted-ca-bundle - name: trusted-ca-bundle - readOnly: true - - mountPath: /var/run/configmaps/service-ca-bundle - name: service-ca-bundle - readOnly: true - - mountPath: /var/run/secrets/serving-cert - name: serving-cert - ports: - - containerPort: 8443 - name: metrics - resources: - requests: - cpu: 10m - memory: 54Mi - env: - - name: POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name - - name: POD_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: RELEASE_VERSION - value: "0.0.1-snapshot" - args: - - start - - --config=/etc/insights-operator/server.yaml + emptyDir: {} + #sizeLimit: 1Gi # bug https://bugzilla.redhat.com/show_bug.cgi?id=1713207 + - name: trusted-ca-bundle + configMap: + name: trusted-ca-bundle + optional: true + - name: service-ca-bundle + configMap: + name: service-ca-bundle + optional: true + - name: serving-cert + secret: + secretName: openshift-insights-serving-cert + optional: true + containers: + - name: insights-operator + securityContext: + readOnlyRootFilesystem: true + allowPrivilegeEscalation: false + capabilities: + drop: ["ALL"] + image: quay.io/openshift/origin-insights-operator:latest + terminationMessagePolicy: FallbackToLogsOnError + volumeMounts: + - mountPath: /tmp + name: tmp + - name: snapshots + mountPath: /var/lib/insights-operator + - mountPath: /var/run/configmaps/trusted-ca-bundle + name: trusted-ca-bundle + readOnly: true + - mountPath: /var/run/configmaps/service-ca-bundle + name: service-ca-bundle + readOnly: true + - mountPath: /var/run/secrets/serving-cert + name: serving-cert + ports: + - containerPort: 8443 + name: metrics + resources: + requests: + cpu: 10m + memory: 54Mi + env: + - name: POD_NAME + valueFrom: + fieldRef: + 
fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: RELEASE_VERSION + value: "0.0.1-snapshot" + - name: RELATED_IMAGE_INSIGHTS_RUNTIME_EXTRACTOR + value: quay.io/openshift/origin-insights-runtime-extractor:latest + - name: RELATED_IMAGE_INSIGHTS_RUNTIME_EXPORTER + value: quay.io/openshift/origin-insights-runtime-exporter:latest + - name: RELATED_IMAGE_KUBE_RBAC_PROXY + value: quay.io/openshift/origin-kube-rbac-proxy:latest + args: + - start + - --config=/etc/insights-operator/server.yaml diff --git a/pkg/controller/operator.go b/pkg/controller/operator.go index b090d57b2..6b9c54e98 100644 --- a/pkg/controller/operator.go +++ b/pkg/controller/operator.go @@ -35,6 +35,7 @@ import ( "github.com/openshift/insights-operator/pkg/config" "github.com/openshift/insights-operator/pkg/config/configobserver" "github.com/openshift/insights-operator/pkg/controller/periodic" + "github.com/openshift/insights-operator/pkg/controller/runtimeextractor" "github.com/openshift/insights-operator/pkg/controller/status" "github.com/openshift/insights-operator/pkg/gather" "github.com/openshift/insights-operator/pkg/insights" @@ -126,6 +127,54 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller desiredVersion = envVersion } + kubeInf := v1helpers.NewKubeInformersForNamespaces(kubeClient, "openshift-insights") + configMapObserver, err := configobserver.NewConfigMapObserver(ctx, gatherKubeConfig, controller.EventRecorder, kubeInf) + if err != nil { + return err + } + go kubeInf.Start(ctx.Done()) + go configMapObserver.Run(ctx, 1) + + // secretConfigObserver synthesizes all config into the status reporter controller + secretConfigObserver := configobserver.New(s.Controller, kubeClient) + go secretConfigObserver.Start(ctx) + + configAggregator := configobserver.NewConfigAggregator(secretConfigObserver, configMapObserver) + go configAggregator.Listen(ctx) + + // updateCh is used to signal a version 
update to the runtimeextractor controller + updateCh := make(chan struct{}, 1) + + // Create informer factory for runtime-extractor resources in openshift-insights namespace + runtimeExtractorInformerFactory := clientInformers.NewSharedInformerFactoryWithOptions( + kubeClient, + informerTimeout, + clientInformers.WithNamespace(insightsNamespace), + ) + + // Create resource informer to watch for external modifications to runtime-extractor resources + runtimeExtractorResourceInformer, err := runtimeextractor.NewResourceInformer( + controller.EventRecorder, + runtimeExtractorInformerFactory, + ) + if err != nil { + return fmt.Errorf("failed to create runtime extractor resource informer: %w", err) + } + + // Start the informer factory and resource informer controller + go runtimeExtractorInformerFactory.Start(ctx.Done()) + go runtimeExtractorResourceInformer.Run(ctx, 1) + + // Start runtimeExtractor controller + runtimeExtractorCtrl := runtimeextractor.NewRuntimeExtractorController( + configAggregator, + updateCh, + kubeClient, + controller.EventRecorder, + runtimeExtractorResourceInformer, + ) + go runtimeExtractorCtrl.Run(ctx) + // By default, this will exit(0) the process if the featuregates ever change to a different set of values. 
featureGateAccessor := featuregates.NewFeatureGateAccess( desiredVersion, missingVersion, @@ -178,21 +227,6 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller } } - kubeInf := v1helpers.NewKubeInformersForNamespaces(kubeClient, "openshift-insights") - configMapObserver, err := configobserver.NewConfigMapObserver(ctx, gatherKubeConfig, controller.EventRecorder, kubeInf) - if err != nil { - return err - } - go kubeInf.Start(ctx.Done()) - go configMapObserver.Run(ctx, 1) - - // secretConfigObserver synthesizes all config into the status reporter controller - secretConfigObserver := configobserver.New(s.Controller, kubeClient) - go secretConfigObserver.Start(ctx) - - configAggregator := configobserver.NewConfigAggregator(secretConfigObserver, configMapObserver) - go configAggregator.Listen(ctx) - // additional configurations may exist besides the default one if customPath := getCustomStoragePath(configAggregator, nil); customPath != "" { isValid, err := pathIsAvailable(customPath) @@ -212,8 +246,15 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller // the status controller initializes the cluster operator object and retrieves // the last sync time, if any was set - statusReporter := status.NewController(configClient.ConfigV1(), configAggregator, - insightsDataGatherObserver, os.Getenv("POD_NAMESPACE"), insightsConfigEnabled, controller.EventRecorder) + statusReporter := status.NewController( + configClient.ConfigV1(), + configAggregator, + insightsDataGatherObserver, + os.Getenv("POD_NAMESPACE"), + insightsConfigEnabled, + controller.EventRecorder, + updateCh, + ) var anonymizer *anonymization.Anonymizer var recdriver *diskrecorder.DiskRecorder diff --git a/pkg/controller/runtimeextractor/README.md b/pkg/controller/runtimeextractor/README.md new file mode 100644 index 000000000..060c8698b --- /dev/null +++ b/pkg/controller/runtimeextractor/README.md @@ -0,0 +1,236 @@ +# Runtime Extractor Controller + +## 
Overview + +The Runtime Extractor Controller manages the lifecycle of the `insights-runtime-extractor` DaemonSet in the `openshift-insights` namespace. It ensures that the DaemonSet is deployed, configured correctly, and protected from external modifications. + +## Architecture + +### Components + +1. **runtimeExtractorController** (`controller.go`) + - Main controller that orchestrates resource lifecycle + - Responds to configuration changes, version updates, and resource drift + +2. **ResourceManager** (`resources/manager.go`) + - Handles actual Kubernetes resource operations (create, update, delete) + - Uses `resourceapply` from `library-go` for declarative create-or-update semantics (regular create/update calls, not Kubernetes Server-Side Apply) + +3. **ResourceInformer** (`informer.go`) + - Watches for external modifications to runtime-extractor resources + - Provides event-driven drift detection + +### Managed Resources + +- **DaemonSet**: `insights-runtime-extractor` - Runs on all Linux worker nodes + - Container images: extractor, exporter, kube-rbac-proxy + - Updated automatically when cluster version changes + +## Reconciliation Strategy + +The controller uses an **event-driven reconciliation** approach rather than periodic polling: + +### 1. Configuration Changes +- **Trigger**: `insights-config` ConfigMap changes +- **Action**: Enable or disable runtime-extractor based on `DisableRuntimeExtractor` flag +- **Implementation**: Watches config changes via `ConfigNotifier` interface + +### 2. Version Updates +- **Trigger**: Cluster version upgrade notification +- **Action**: Update container images to match new cluster version +- **Implementation**: Receives notifications on `updateCh` channel + +### 3. 
Resource Drift Detection (Informer-based) +- **Trigger**: External modification or deletion of runtime-extractor DaemonSet +- **Action**: Reapply desired state to correct drift +- **Implementation**: Kubernetes informers watch DaemonSet + +## Drift Detection Details + +### How It Works + +The `ResourceInformer` uses Kubernetes informers to watch for changes: + +```text +// Watch patterns +DaemonSet: openshift-insights/insights-runtime-extractor +``` + +### Detection Criteria + +The informer uses **generation-based filtering** to minimize unnecessary reconciliations: + +**DaemonSet**: +- Checks if `Generation` changed (only increments when spec changes, not status) +- Filters out ~90% of update events (status updates, controller metadata changes) +- Deletion events always trigger reconciliation + +**Why generation-based?** +- **Simple**: Single field check instead of deep comparison +- **Efficient**: Filters out 90% of noise (status updates, reconciliation loops) +- **Reliable**: Kubernetes guarantees generation increments on spec changes +- **Performant**: `resourceapply` handles detailed comparison and actual updates + +### Reconciliation Flow + +``` +External Change → Informer Detects → Notification Sent → Controller Reconciles → State Restored +``` + +## Resource Protection + +### Declarative Apply +The controller uses `resourceapply` functions from `library-go`, which apply the desired object with regular create/update calls (the operator's ClusterRole grants `update` but not `patch` on DaemonSets, so this is not Kubernetes Server-Side Apply). They provide: +- **Three-way merge**: Preserves fields managed by other controllers +- **Generation tracking**: Detects meaningful changes +- **Conflict resolution**: Handles concurrent modifications gracefully + +### Retry Logic +All resource apply operations use `retry.RetryOnConflict` with exponential backoff: +- **Automatic retry**: Conflicts are automatically retried (up to 5 attempts by default) +- **Exponential backoff**: Delays increase between retries (10ms, 20ms, 40ms, etc.) 
+- **Jitter**: Random delay variation to prevent thundering herd +- **Success guarantee**: Eventually consistent even under concurrent modifications + +### Ownership +Resources are labeled with: +```yaml +labels: + app.kubernetes.io/managed-by: insights-operator + app.kubernetes.io/name: insights-runtime-extractor +``` + +## Event Flow + +### Initial Deployment +``` +Operator Starts + ↓ +Read Configuration + ↓ +DisableRuntimeExtractor == false? + ↓ (yes) +Create/Update Resources + ↓ +Monitor for Changes +``` + +### Configuration Change +``` +Config Updated + ↓ +ConfigNotifier Sends Event + ↓ +Controller Handles Config Change + ↓ +Enable: Apply Resources +Disable: Delete Resources +``` + +### Resource Drift +``` +User/Process Modifies Resource + ↓ +Informer Detects Change + ↓ +Notification Sent to modifiedCh + ↓ +Controller Reapplies Desired State + ↓ +Resource Restored +``` + +## Usage + +### Creating the Controller + +```go +// Create informer factory for watching resources +informerFactory := clientInformers.NewSharedInformerFactoryWithOptions( + kubeClient, + informerTimeout, + clientInformers.WithNamespace("openshift-insights"), +) + +// Create resource informer +resourceInformer, err := runtimeextractor.NewResourceInformer( + eventRecorder, + informerFactory, +) + +// Create controller +controller := runtimeextractor.NewRuntimeExtractorController( + configNotifier, + updateCh, + kubeClient, + eventRecorder, + resourceInformer, +) + +// Start informers and controller +go informerFactory.Start(ctx.Done()) +go resourceInformer.Run(ctx, 1) +go controller.Run(ctx) +``` + +### Configuration + +The runtime extractor can be disabled via the `insights-config` ConfigMap: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: insights-config + namespace: openshift-insights +data: + config.yaml: | + dataReporting: + disableRuntimeExtractor: true # Set to false to enable +``` + +## Testing + +### Unit Tests +- `informer_test.go`: Tests for resource watching and 
drift detection +- `resources/*_test.go`: Tests for individual resource management + +### Running Tests +```bash +make test +# or +go test ./pkg/controller/runtimeextractor/... +``` + +## Advantages of Informer-Based Approach + +### vs. Periodic Polling + +| Aspect | Informer-Based (Our Approach) | Periodic Polling | +|--------|------------------------------|------------------| +| **Performance** | Event-driven, instant response | Fixed interval delay | +| **API Server Load** | Minimal (watch connections) | Regular GET requests | +| **Scalability** | Excellent | Poor with many controllers | +| **Kubernetes-Native** | Yes (standard pattern) | No | +| **Resource Usage** | Low (cached data) | Higher (repeated fetches) | + +### Benefits +1. **Immediate drift correction**: Changes detected and corrected within seconds +2. **Low overhead**: Watch connections use less resources than polling +3. **Cached data**: Informers maintain local cache, reducing API server load +4. **Standard pattern**: Follows Kubernetes controller best practices +5. 
**Integration**: Works seamlessly with OpenShift library-go framework + +## Future Enhancements + +Potential improvements: +- [ ] Add metrics for drift detection events +- [ ] Implement exponential backoff for reconciliation failures +- [ ] Add status reporting to ClusterOperator CR +- [ ] Support for custom resource priorities/tolerations via configuration + +## Related Documentation + +- [CLAUDE.md](../../../CLAUDE.md) - Project-wide development guidelines +- [Cluster Transfer Controller](../../../pkg/ocm/clustertransfer/) - Similar controller pattern +- [SCA Controller](../../../pkg/ocm/sca/) - Another periodic controller example diff --git a/pkg/controller/runtimeextractor/controller.go b/pkg/controller/runtimeextractor/controller.go new file mode 100644 index 000000000..31a21a262 --- /dev/null +++ b/pkg/controller/runtimeextractor/controller.go @@ -0,0 +1,205 @@ +package runtimeextractor + +import ( + "context" + + "github.com/openshift/insights-operator/pkg/config" + "github.com/openshift/insights-operator/pkg/controller/runtimeextractor/resources" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +// ConfigNotifier provides access to Insights configuration and notifications about configuration changes +type ConfigNotifier interface { + // ConfigChanged returns a channel that receives notifications when configuration changes + // and a cleanup function to close the notification channel + ConfigChanged() (<-chan struct{}, func()) + // Config returns the current Insights configuration + Config() *config.InsightsConfiguration +} + +// ResourceManager manages the lifecycle of runtime extractor Kubernetes resources +type ResourceManager interface { + // ApplyRuntimeExtractorResources creates or updates all runtime extractor resources + ApplyRuntimeExtractorResources(ctx context.Context) error + // DeleteRuntimeExtractorResources removes all 
runtime extractor resources + DeleteRuntimeExtractorResources(ctx context.Context) error + // ResourcesExists checks if runtime extractor resources are deployed + ResourcesExists(ctx context.Context) bool +} + +// ResourceInformer provides notifications when runtime-extractor resources are modified +// externally (not by insights-operator). This enables drift detection and reconciliation. +type ResourceInformer interface { + factory.Controller + // ResourceModified returns a channel that receives notifications when resources are modified + ResourceModified() <-chan struct{} +} + +// runtimeExtractorController manages the lifecycle of runtime extractor resources in the cluster. +// It watches for configuration changes and cluster version updates, creating, updating, or deleting +// the runtime extractor DaemonSet and associated resources as needed. +// +// The controller responds to three primary events: +// - Configuration changes: Creates or deletes resources based on DisableRuntimeExtractor flag +// - Version updates: Updates DaemonSet container images to match the current cluster version +// - Resource modifications: Detects and corrects external changes to runtime-extractor resources +type runtimeExtractorController struct { + // config provides access to Insights configuration and notifications about configuration changes + config ConfigNotifier + // updateCh receives notifications when the cluster version changes, triggering DaemonSet image updates + updateCh chan struct{} + // resourceInformer watches for external modifications to runtime-extractor resources + resourceInformer ResourceInformer + // resourceManager handles creation, update, and deletion of runtime extractor Kubernetes resources + resourceManager ResourceManager +} + +// NewRuntimeExtractorController is a constructor for runtimeExtractorController +// that is in charge of runtime-extractor deployment lifecycle +func NewRuntimeExtractorController( + configNotifier ConfigNotifier, + updateCh 
chan struct{}, + kubeClient *kubernetes.Clientset, + recorder events.Recorder, + resourceInformer ResourceInformer, +) *runtimeExtractorController { + rm := resources.NewResourceManager( + kubeClient.AppsV1(), + recorder, + ) + + return &runtimeExtractorController{ + config: configNotifier, + updateCh: updateCh, + resourceManager: rm, + resourceInformer: resourceInformer, + } +} + +// Run starts the runtime extractor controller and handles configuration changes and version updates. +// It performs initial deployment based on configuration, then watches for: +// - Configuration changes (create/delete resources based on DisableRuntimeExtractor flag) +// - Cluster version updates (update DaemonSet images to match new cluster version) +// - Resource modifications (detect and correct external changes to runtime-extractor resources) +// +// The controller runs until the context is canceled. +func (re *runtimeExtractorController) Run(ctx context.Context) { + klog.Info("Starting runtime extractor controller") + + // Initial deploy of DaemonSet + re.handleConfigChange(ctx) + + configChan, configClose := re.config.ConfigChanged() + defer configClose() + + // Get resource modification notifications from informer + resourceModifiedChan := re.resourceInformer.ResourceModified() + + // Watch for configuration changes, version updates, and external resource modifications + for { + select { + case <-configChan: + klog.Info("Runtime extractor configuration changed") + re.handleConfigChange(ctx) + case <-re.updateCh: + klog.Info("Runtime extractor cluster version updated") + re.handleVersionUpdate(ctx) + case <-resourceModifiedChan: + klog.Info("Runtime extractor resources modified externally, reconciling") + re.handleResourceDrift(ctx) + case <-ctx.Done(): + klog.Info("Runtime extractor controller stopped") + return + } + } +} + +// handleConfigChange responds to configuration changes by creating or deleting runtime extractor resources +// based on the DisableRuntimeExtractor 
configuration flag. +func (re *runtimeExtractorController) handleConfigChange(ctx context.Context) { + cfg := re.config.Config() + + if cfg.DataReporting.DisableRuntimeExtractor { + klog.Info("Runtime extractor is disabled, deleting resources") + re.deleteDeployment(ctx) + } else { + klog.Info("Runtime extractor is enabled, creating resources") + re.createDeployment(ctx) + } +} + +// handleVersionUpdate responds to cluster version changes by updating the runtime extractor DaemonSet +// to use container images matching the new cluster version. Skips update if runtime extractor is disabled. +func (re *runtimeExtractorController) handleVersionUpdate(ctx context.Context) { + cfg := re.config.Config() + + if cfg.DataReporting.DisableRuntimeExtractor { + klog.Info("Runtime extractor is disabled, skipping version update") + return + } + + re.updateDeployment(ctx) +} + +func (re *runtimeExtractorController) isCreated(ctx context.Context) bool { + return re.resourceManager.ResourcesExists(ctx) +} + +func (re *runtimeExtractorController) createDeployment(ctx context.Context) { + klog.Info("Creating runtime extractor resources") + + if err := re.resourceManager.ApplyRuntimeExtractorResources(ctx); err != nil { + klog.Errorf("Failed to apply runtime extractor resources: %v", err) + } +} + +func (re *runtimeExtractorController) deleteDeployment(ctx context.Context) { + klog.Info("Deleting runtime extractor resources") + + if !re.isCreated(ctx) { + klog.Info("Runtime extractor resources do not exist, nothing to delete") + return + } + + if err := re.resourceManager.DeleteRuntimeExtractorResources(ctx); err != nil { + klog.Errorf("Failed to delete runtime extractor resources: %v", err) + } +} + +func (re *runtimeExtractorController) updateDeployment(ctx context.Context) { + klog.Info("Updating runtime extractor resources") + + // Avoid creating it when the cluster version is updated + if !re.isCreated(ctx) { + klog.Info("Runtime extractor resources not found, skipping update") + 
return + } + + if err := re.resourceManager.ApplyRuntimeExtractorResources(ctx); err != nil { + klog.Errorf("Failed to apply runtime extractor resources: %v", err) + } +} + +// handleResourceDrift responds to external modifications of runtime-extractor resources +// by reapplying the desired state. This ensures that any manual changes or deletions +// are automatically corrected to maintain the insights-operator's desired configuration. +func (re *runtimeExtractorController) handleResourceDrift(ctx context.Context) { + cfg := re.config.Config() + + // When the runtime extractor is disabled the desired state is absence: delete instead of reapplying + if cfg.DataReporting.DisableRuntimeExtractor { + klog.Info("Runtime extractor is disabled, ensuring resources are absent") + re.deleteDeployment(ctx) + return + } + + // Reapply resources to correct any drift + if err := re.resourceManager.ApplyRuntimeExtractorResources(ctx); err != nil { + klog.Errorf("Failed to correct runtime extractor resource drift: %v", err) + } else { + klog.Info("Successfully reconciled runtime extractor resources") + } +} diff --git a/pkg/controller/runtimeextractor/doc.go b/pkg/controller/runtimeextractor/doc.go new file mode 100644 index 000000000..33baa1059 --- /dev/null +++ b/pkg/controller/runtimeextractor/doc.go @@ -0,0 +1,3 @@ +// Package runtimeextractor manages the lifecycle of the runtime-extractor DaemonSet +// based on the DisableRuntimeExtractor configuration option. 
+package runtimeextractor diff --git a/pkg/controller/runtimeextractor/informer.go b/pkg/controller/runtimeextractor/informer.go new file mode 100644 index 000000000..1c9310ec4 --- /dev/null +++ b/pkg/controller/runtimeextractor/informer.go @@ -0,0 +1,138 @@ +package runtimeextractor + +import ( + "context" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +const ( + runtimeExtractorNamespace = "openshift-insights" + runtimeExtractorName = "insights-runtime-extractor" +) + +// resourceInformer watches DaemonSet resources for external modifications +type resourceInformer struct { + factory.Controller + modifiedCh chan struct{} +} + +// NewResourceInformer creates a new informer that watches runtime-extractor resources +// and notifies when they are modified by external actors (not by insights-operator) +func NewResourceInformer( + eventRecorder events.Recorder, + kubeInformers informers.SharedInformerFactory, +) (*resourceInformer, error) { + ri := &resourceInformer{ + modifiedCh: make(chan struct{}, 10), // Buffered to prevent blocking + } + + // Watch DaemonSet changes + dsInformer := kubeInformers.Apps().V1().DaemonSets().Informer() + _, err := dsInformer.AddEventHandler(ri.daemonSetEventHandler()) + if err != nil { + return nil, err + } + + // Create controller with all informers + ctrl := factory.New(). + WithInformers(dsInformer). + WithSync(ri.sync). 
+ ToController("RuntimeExtractorResourceInformer", eventRecorder) + + ri.Controller = ctrl + return ri, nil +} + +func (ri *resourceInformer) sync(_ context.Context, _ factory.SyncContext) error { + // Sync is called after initial cache population + // We don't need to do anything here as we're just watching for changes + return nil +} + +// daemonSetEventHandler handles DaemonSet modification events +func (ri *resourceInformer) daemonSetEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + ri.handleDaemonSetUpdate(oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + ri.handleResourceDeletion(obj, "DaemonSet") + }, + } +} + +// handleDaemonSetUpdate processes DaemonSet update events +func (ri *resourceInformer) handleDaemonSetUpdate(oldObj, newObj interface{}) { + oldDS, ok := oldObj.(*appsv1.DaemonSet) + if !ok { + return + } + newDS, ok := newObj.(*appsv1.DaemonSet) + if !ok { + return + } + + // Only care about our specific DaemonSet + if newDS.Namespace != runtimeExtractorNamespace || newDS.Name != runtimeExtractorName { + return + } + + // Detect spec changes via generation; status-only updates are ignored. This check cannot tell external edits apart from the operator's own updates. + if ri.isDaemonSetModified(oldDS, newDS) { + klog.Infof("Runtime extractor DaemonSet %s/%s was modified externally, triggering reconciliation", + newDS.Namespace, newDS.Name) + ri.notifyModification() + } +} + +// handleResourceDeletion processes resource deletion events. NOTE(review): obj may be a cache.DeletedFinalStateUnknown tombstone, for which meta.Accessor presumably fails and the deletion is only logged - confirm and consider unwrapping it. +func (ri *resourceInformer) handleResourceDeletion(obj interface{}, resourceType string) { + metadata, err := meta.Accessor(obj) + if err != nil { + klog.Errorf("Failed to get metadata from deleted runtime extractor %s: %v", resourceType, err) + return + } + + // Only care about our specific resources + if metadata.GetNamespace() != runtimeExtractorNamespace { + return + } + + name := metadata.GetName() + if name == runtimeExtractorName { + klog.Infof("Runtime extractor %s 
%s/%s was deleted externally, triggering reconciliation", + resourceType, metadata.GetNamespace(), name) + ri.notifyModification() + } +} + +// isDaemonSetModified checks if the DaemonSet was meaningfully modified +// Uses generation to filter out status-only updates +func (ri *resourceInformer) isDaemonSetModified(oldObj, newObj *appsv1.DaemonSet) bool { + // Generation only increments when spec changes (not status) + // This filters out ~90% of update events (status updates, reconciliation loops) + // resourceapply.ApplyDaemonSet will handle detailed comparison and decide if update is needed + return oldObj.Generation != newObj.Generation +} + +// notifyModification sends a notification to the modification channel (non-blocking) +func (ri *resourceInformer) notifyModification() { + select { + case ri.modifiedCh <- struct{}{}: + // Notification sent + default: + // Channel is full, notification already pending - this is expected and safe to ignore + } +} + +// ResourceModified returns a channel that receives notifications when resources are modified +func (ri *resourceInformer) ResourceModified() <-chan struct{} { + return ri.modifiedCh +} diff --git a/pkg/controller/runtimeextractor/informer_test.go b/pkg/controller/runtimeextractor/informer_test.go new file mode 100644 index 000000000..223b65922 --- /dev/null +++ b/pkg/controller/runtimeextractor/informer_test.go @@ -0,0 +1,177 @@ +package runtimeextractor + +import ( + "context" + "testing" + "time" + + "github.com/openshift/library-go/pkg/operator/events" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" +) + +func Test_NewResourceInformer(t *testing.T) { + fakeClient := fake.NewClientset() + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + informer, err := NewResourceInformer(recorder, informerFactory) 
+ if err != nil { + t.Fatalf("Failed to create resource informer: %v", err) + } + + if informer == nil { + t.Fatal("Expected non-nil informer") + } + + modifiedCh := informer.ResourceModified() + if modifiedCh == nil { + t.Fatal("Expected non-nil modification channel") + } +} + +func Test_ResourceInformer_DaemonSetUpdate(t *testing.T) { + // Create fake client with initial DaemonSet + initialDS := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: runtimeExtractorName, + Namespace: runtimeExtractorNamespace, + Generation: 1, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + }, + } + + fakeClient := fake.NewClientset(initialDS) + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + informer, err := NewResourceInformer(recorder, informerFactory) + if err != nil { + t.Fatalf("Failed to create resource informer: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start informer + informerFactory.Start(ctx.Done()) + go informer.Run(ctx, 1) + + // Wait for cache sync + informerFactory.WaitForCacheSync(ctx.Done()) + + // Update DaemonSet (simulate external modification) + updatedDS := initialDS.DeepCopy() + updatedDS.Generation = 2 // Generation change indicates spec change + updatedDS.Spec.Template.Spec.HostPID = true + + _, err = fakeClient.AppsV1().DaemonSets(runtimeExtractorNamespace).Update(ctx, updatedDS, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update DaemonSet: %v", err) + } + + // Verify notification was sent + select { + case <-informer.ResourceModified(): + // Success - received modification notification + case <-time.After(2 * time.Second): + t.Fatal("Expected modification notification but didn't receive one") + } +} + +func Test_ResourceInformer_DaemonSetDeletion(t *testing.T) { + // Create fake client 
with initial DaemonSet + initialDS := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: runtimeExtractorName, + Namespace: runtimeExtractorNamespace, + }, + } + + fakeClient := fake.NewClientset(initialDS) + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + informer, err := NewResourceInformer(recorder, informerFactory) + if err != nil { + t.Fatalf("Failed to create resource informer: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start informer + informerFactory.Start(ctx.Done()) + go informer.Run(ctx, 1) + + // Wait for cache sync + informerFactory.WaitForCacheSync(ctx.Done()) + + // Delete DaemonSet (simulate external deletion) + err = fakeClient.AppsV1().DaemonSets(runtimeExtractorNamespace).Delete(ctx, runtimeExtractorName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete DaemonSet: %v", err) + } + + // Verify notification was sent + select { + case <-informer.ResourceModified(): + // Success - received modification notification + case <-time.After(2 * time.Second): + t.Fatal("Expected deletion notification but didn't receive one") + } +} + +func Test_ResourceInformer_IgnoresOtherResources(t *testing.T) { + // Create fake client with a DaemonSet in different namespace + otherDS := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "other-daemonset", + Namespace: "other-namespace", + Generation: 1, + }, + } + + fakeClient := fake.NewClientset(otherDS) + informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + informer, err := NewResourceInformer(recorder, informerFactory) + if err != nil { + t.Fatalf("Failed to create resource informer: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Start informer + 
informerFactory.Start(ctx.Done()) + go informer.Run(ctx, 1) + + // Wait for cache sync + informerFactory.WaitForCacheSync(ctx.Done()) + + // Update the other DaemonSet + updatedDS := otherDS.DeepCopy() + updatedDS.Generation = 2 + + _, err = fakeClient.AppsV1().DaemonSets("other-namespace").Update(ctx, updatedDS, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update DaemonSet: %v", err) + } + + // Verify NO notification was sent (we don't care about other resources) + select { + case <-informer.ResourceModified(): + t.Fatal("Should not receive notification for other resources") + case <-time.After(500 * time.Millisecond): + // Success - no notification received + } +} diff --git a/pkg/controller/runtimeextractor/resources/daemonset.go b/pkg/controller/runtimeextractor/resources/daemonset.go new file mode 100644 index 000000000..0a6eafa3e --- /dev/null +++ b/pkg/controller/runtimeextractor/resources/daemonset.go @@ -0,0 +1,162 @@ +package resources + +import ( + "context" + _ "embed" + "fmt" + "os" + + "github.com/openshift/library-go/pkg/operator/resource/resourceapply" + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + "sigs.k8s.io/yaml" +) + +const ( + daemonSetName = "insights-runtime-extractor" + daemonSetNamespace = "openshift-insights" + + // Environment variables containing container image references for runtime-extractor + // related services. These ENVs are populated by the CVO operator.
+ extractorImageEnv = "RELATED_IMAGE_INSIGHTS_RUNTIME_EXTRACTOR" + extractorDefaultImage = "quay.io/openshift/origin-insights-runtime-extractor:latest" + + exporterImageEnv = "RELATED_IMAGE_INSIGHTS_RUNTIME_EXPORTER" + exporterDefaultImage = "quay.io/openshift/origin-insights-runtime-exporter:latest" + + proxyImageEnv = "RELATED_IMAGE_KUBE_RBAC_PROXY" + proxyDefaultImage = "quay.io/openshift/origin-kube-rbac-proxy:latest" + + envImageErrMsg = "Failed to get image from environment variable %s, using default image %s" +) + +//go:embed manifests/runtime-extractor-daemonset.yaml +var runtimeExtractorDaemonSetYAML []byte + +// loadRuntimeExtractorDaemonSet loads the embedded DaemonSet YAML and unmarshals it +func loadRuntimeExtractorDaemonSet() (*appsv1.DaemonSet, error) { + ds := &appsv1.DaemonSet{} + if err := yaml.Unmarshal(runtimeExtractorDaemonSetYAML, ds); err != nil { + return nil, fmt.Errorf("failed to unmarshal runtime extractor daemonset: %w", err) + } + return ds, nil +} + +// applyDaemonSet creates or updates the runtime extractor DaemonSet +// Retries on conflict errors using exponential backoff +func (rm *ResourceManager) applyDaemonSet(ctx context.Context) (*appsv1.DaemonSet, error) { + daemonSet, err := loadRuntimeExtractorDaemonSet() + if err != nil { + return nil, err + } + + rm.updateContainerImages(daemonSet) + + // Retry with exponential backoff on conflict errors + var appliedDaemonSet *appsv1.DaemonSet + var modified bool + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + var retryErr error + appliedDaemonSet, modified, retryErr = resourceapply.ApplyDaemonSet(ctx, rm.daemonSetGetterClient, rm.recorder, daemonSet, -1) + return retryErr + }) + if err != nil { + return nil, fmt.Errorf("failed to apply runtime extractor daemonset: %w", err) + } + + if modified { + rm.recorder.Event( + "DaemonSet Updated", + fmt.Sprintf( + "Runtime extractor DaemonSet %s/%s was created or updated", + appliedDaemonSet.Namespace, appliedDaemonSet.Name, 
+ )) + klog.Infof("Runtime extractor DaemonSet %s/%s was created or updated", appliedDaemonSet.Namespace, appliedDaemonSet.Name) + } + + return appliedDaemonSet, nil +} + +// updateContainerImages updates the container images to the version specified by the CVO operator +func (rm *ResourceManager) updateContainerImages(ds *appsv1.DaemonSet) { + extractorReleaseVersion, exporterReleaseVersion, proxyReleaseVersion := loadImagesFromEnvs() + + for i := range ds.Spec.Template.Spec.Containers { + container := &ds.Spec.Template.Spec.Containers[i] + switch container.Name { + case "extractor": + container.Image = extractorReleaseVersion + klog.Infof("Updated runtime extractor container image to %s", container.Image) + case "exporter": + container.Image = exporterReleaseVersion + klog.Infof("Updated runtime exporter container image to %s", container.Image) + case "kube-rbac-proxy": + // kube-rbac-proxy image comes from its own environment variable as well, + // so it is overridden here in the same way as the other containers + container.Image = proxyReleaseVersion + klog.Infof("Updated kube-rbac-proxy container image to %s", container.Image) + } + } +} + +// loadImagesFromEnvs loads container image references from environment variables. +// Default values are returned for any missing environment variables, with errors logged.
+func loadImagesFromEnvs() (extractorReleaseVersion, exporterReleaseVersion, proxyReleaseVersion string) { + extractorReleaseVersion = os.Getenv(extractorImageEnv) + if len(extractorReleaseVersion) == 0 { + klog.Errorf(envImageErrMsg, extractorImageEnv, extractorDefaultImage) + extractorReleaseVersion = extractorDefaultImage + } + + exporterReleaseVersion = os.Getenv(exporterImageEnv) + if len(exporterReleaseVersion) == 0 { + klog.Errorf(envImageErrMsg, exporterImageEnv, exporterDefaultImage) + exporterReleaseVersion = exporterDefaultImage + } + + proxyReleaseVersion = os.Getenv(proxyImageEnv) + if len(proxyReleaseVersion) == 0 { + klog.Errorf(envImageErrMsg, proxyImageEnv, proxyDefaultImage) + proxyReleaseVersion = proxyDefaultImage + } + + return extractorReleaseVersion, exporterReleaseVersion, proxyReleaseVersion +} + +// deleteDaemonSet removes the runtime extractor DaemonSet +func (rm *ResourceManager) deleteDaemonSet(ctx context.Context) error { + err := rm.daemonSetGetterClient.DaemonSets(daemonSetNamespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + klog.Infof("Runtime extractor DaemonSet %s/%s already deleted", daemonSetNamespace, daemonSetName) + return nil + } + return fmt.Errorf("failed to delete runtime extractor daemonset: %w", err) + } + + rm.recorder.Event("DaemonSet Deleted", fmt.Sprintf("Runtime extractor DaemonSet %s/%s deleted", daemonSetNamespace, daemonSetName)) + klog.Infof("Runtime extractor DaemonSet %s/%s deleted", daemonSetNamespace, daemonSetName) + return nil +} + +// getDaemonSet retrieves the runtime extractor DaemonSet +func (rm *ResourceManager) getDaemonSet(ctx context.Context) (*appsv1.DaemonSet, error) { + return rm.daemonSetGetterClient.DaemonSets(daemonSetNamespace).Get(ctx, daemonSetName, metav1.GetOptions{}) +} + +// daemonSetExists checks if the runtime extractor DaemonSet exists +func (rm *ResourceManager) daemonSetExists(ctx context.Context) bool { + _, err := 
rm.getDaemonSet(ctx) + if err != nil { + if apierrors.IsNotFound(err) { + return false + } + klog.Errorf("Failed to get runtime extractor DaemonSet %s/%s: %v", daemonSetNamespace, daemonSetName, err) + return false + } + return true +} diff --git a/pkg/controller/runtimeextractor/resources/daemonset_test.go b/pkg/controller/runtimeextractor/resources/daemonset_test.go new file mode 100644 index 000000000..70b5afa23 --- /dev/null +++ b/pkg/controller/runtimeextractor/resources/daemonset_test.go @@ -0,0 +1,374 @@ +package resources + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/utils/clock" + + "github.com/openshift/library-go/pkg/operator/events" +) + +func Test_loadRuntimeExtractorDaemonSet(t *testing.T) { + ds, err := loadRuntimeExtractorDaemonSet() + + assert.NoError(t, err) + assert.NotNil(t, ds) + assert.Equal(t, daemonSetName, ds.Name) + assert.Equal(t, daemonSetNamespace, ds.Namespace) + assert.Len(t, ds.Spec.Template.Spec.Containers, 3) +} + +func Test_applyDaemonSet(t *testing.T) { + tests := []struct { + name string + mockError error + wantErr bool + }{ + { + name: "successfully applies daemonset", + wantErr: false, + }, + { + name: "apply error", + mockError: apierrors.NewInternalError(assert.AnError), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + coreClient := fake.NewClientset() + + if tt.mockError != nil { + coreClient.PrependReactor("create", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, tt.mockError + }) + } + + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + rm := NewResourceManager(coreClient.AppsV1(), recorder) + + ds, 
err := rm.applyDaemonSet(context.Background()) + + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, ds) + } else { + assert.NoError(t, err) + assert.NotNil(t, ds) + assert.Equal(t, daemonSetName, ds.Name) + } + }) + } +} + +func Test_updateContainerImages(t *testing.T) { + // Set environment variables for this test + os.Setenv(extractorImageEnv, "quay.io/openshift/runtime-extractor:v1.0.0") + os.Setenv(exporterImageEnv, "quay.io/openshift/runtime-exporter:v1.0.0") + os.Setenv(proxyImageEnv, "quay.io/openshift/proxy:v1.0.0") + defer func() { + os.Unsetenv(extractorImageEnv) + os.Unsetenv(exporterImageEnv) + os.Unsetenv(proxyImageEnv) + }() + + ds := &appsv1.DaemonSet{ + Spec: appsv1.DaemonSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "extractor", Image: "old-image"}, + {Name: "exporter", Image: "old-image"}, + {Name: "kube-rbac-proxy", Image: "old-image"}, + }, + }, + }, + }, + } + + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + rm := NewResourceManager(nil, recorder) + + rm.updateContainerImages(ds) + + // Verify images were updated from environment variables + assert.NotEqual(t, "old-image", ds.Spec.Template.Spec.Containers[0].Image) + assert.NotEqual(t, "old-image", ds.Spec.Template.Spec.Containers[1].Image) + assert.NotEqual(t, "old-image", ds.Spec.Template.Spec.Containers[2].Image) +} + +func Test_deleteDaemonSet(t *testing.T) { + tests := []struct { + name string + existingDS *appsv1.DaemonSet + mockError error + wantErr bool + }{ + { + name: "successfully deletes existing daemonset", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: daemonSetNamespace, + }, + }, + wantErr: false, + }, + { + name: "daemonset not found - no error", + existingDS: nil, + wantErr: false, + }, + { + name: "delete error", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: 
daemonSetNamespace, + }, + }, + mockError: apierrors.NewInternalError(assert.AnError), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var coreClient *fake.Clientset + if tt.existingDS != nil { + coreClient = fake.NewClientset(tt.existingDS) + } else { + coreClient = fake.NewClientset() + } + + if tt.mockError != nil { + coreClient.PrependReactor("delete", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, tt.mockError + }) + } + + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + rm := NewResourceManager(coreClient.AppsV1(), recorder) + + err := rm.deleteDaemonSet(context.Background()) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func Test_getDaemonSet(t *testing.T) { + tests := []struct { + name string + existingDS *appsv1.DaemonSet + mockError error + wantErr bool + }{ + { + name: "successfully gets daemonset", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: daemonSetNamespace, + }, + }, + wantErr: false, + }, + { + name: "daemonset not found", + existingDS: nil, + wantErr: true, + }, + { + name: "get error", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: daemonSetNamespace, + }, + }, + mockError: apierrors.NewServiceUnavailable("service unavailable"), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var coreClient *fake.Clientset + if tt.existingDS != nil { + coreClient = fake.NewClientset(tt.existingDS) + } else { + coreClient = fake.NewClientset() + } + + if tt.mockError != nil { + coreClient.PrependReactor("get", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, tt.mockError + }) + } + + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + rm := NewResourceManager(coreClient.AppsV1(), 
recorder) + + ds, err := rm.getDaemonSet(context.Background()) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, ds) + assert.Equal(t, daemonSetName, ds.Name) + } + }) + } +} + +func Test_daemonSetExists(t *testing.T) { + tests := []struct { + name string + existingDS *appsv1.DaemonSet + mockError error + want bool + }{ + { + name: "daemonset exists", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: daemonSetNamespace, + }, + }, + want: true, + }, + { + name: "daemonset not found", + existingDS: nil, + want: false, + }, + { + name: "get error returns false", + existingDS: &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: daemonSetName, + Namespace: daemonSetNamespace, + }, + }, + mockError: apierrors.NewServiceUnavailable("service unavailable"), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var coreClient *fake.Clientset + if tt.existingDS != nil { + coreClient = fake.NewClientset(tt.existingDS) + } else { + coreClient = fake.NewClientset() + } + + if tt.mockError != nil { + coreClient.PrependReactor("get", "daemonsets", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, tt.mockError + }) + } + + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + rm := NewResourceManager(coreClient.AppsV1(), recorder) + + got := rm.daemonSetExists(context.Background()) + + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_loadImagesFromEnvs(t *testing.T) { + tests := []struct { + name string + extractorImage string + exporterImage string + proxyImage string + wantExtractor string + wantExporter string + wantProxy string + setExtractorEnv bool + setExporterEnv bool + setProxyEnv bool + }{ + { + name: "all environment variables set", + extractorImage: "quay.io/openshift/runtime-extractor:v1.2.3", + exporterImage: "quay.io/openshift/runtime-exporter:v1.2.3", + proxyImage: 
"quay.io/openshift/kube-rbac-proxy:v1.2.3", + wantExtractor: "quay.io/openshift/runtime-extractor:v1.2.3", + wantExporter: "quay.io/openshift/runtime-exporter:v1.2.3", + wantProxy: "quay.io/openshift/kube-rbac-proxy:v1.2.3", + setExtractorEnv: true, + setExporterEnv: true, + setProxyEnv: true, + }, + { + name: "missing extractor environment variable uses default", + exporterImage: "quay.io/openshift/runtime-exporter:v1.0.0", + proxyImage: "quay.io/openshift/kube-rbac-proxy:v1.0.0", + wantExtractor: extractorDefaultImage, + wantExporter: "quay.io/openshift/runtime-exporter:v1.0.0", + wantProxy: "quay.io/openshift/kube-rbac-proxy:v1.0.0", + setExtractorEnv: false, + setExporterEnv: true, + setProxyEnv: true, + }, + { + name: "all environment variables empty use defaults", + wantExtractor: extractorDefaultImage, + wantExporter: exporterDefaultImage, + wantProxy: proxyDefaultImage, + setExtractorEnv: false, + setExporterEnv: false, + setProxyEnv: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Clean environment + os.Unsetenv(extractorImageEnv) + os.Unsetenv(exporterImageEnv) + os.Unsetenv(proxyImageEnv) + + // Set environment variables as needed + if tt.setExtractorEnv { + os.Setenv(extractorImageEnv, tt.extractorImage) + } + if tt.setExporterEnv { + os.Setenv(exporterImageEnv, tt.exporterImage) + } + if tt.setProxyEnv { + os.Setenv(proxyImageEnv, tt.proxyImage) + } + + defer func() { + os.Unsetenv(extractorImageEnv) + os.Unsetenv(exporterImageEnv) + os.Unsetenv(proxyImageEnv) + }() + + gotExtractor, gotExporter, gotProxy := loadImagesFromEnvs() + + assert.Equal(t, tt.wantExtractor, gotExtractor) + assert.Equal(t, tt.wantExporter, gotExporter) + assert.Equal(t, tt.wantProxy, gotProxy) + }) + } +} diff --git a/pkg/controller/runtimeextractor/resources/manager.go b/pkg/controller/runtimeextractor/resources/manager.go new file mode 100644 index 000000000..b8fe6dd7c --- /dev/null +++ 
b/pkg/controller/runtimeextractor/resources/manager.go @@ -0,0 +1,55 @@ +package resources + +import ( + "context" + "fmt" + + "github.com/openshift/library-go/pkg/operator/events" + appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1" +) + +// ResourceManager manages the lifecycle of runtime extractor resources (DaemonSet) +type ResourceManager struct { + daemonSetGetterClient appsclientv1.DaemonSetsGetter + recorder events.Recorder +} + +// NewResourceManager creates a new ResourceManager for managing runtime extractor resources +func NewResourceManager( + daemonSetGetterClient appsclientv1.DaemonSetsGetter, + recorder events.Recorder, +) *ResourceManager { + return &ResourceManager{ + daemonSetGetterClient: daemonSetGetterClient, + recorder: recorder, + } +} + +// ApplyRuntimeExtractorResources creates or updates the runtime extractor DaemonSet +// This should be called when the runtime extractor should be deployed or updated +func (rm *ResourceManager) ApplyRuntimeExtractorResources( + ctx context.Context, +) error { + if _, err := rm.applyDaemonSet(ctx); err != nil { + return fmt.Errorf("failed to apply daemonset: %v", err) + } + + return nil +} + +// DeleteRuntimeExtractorResources removes the runtime extractor DaemonSet +// This should be called when the runtime extractor should be disabled +func (rm *ResourceManager) DeleteRuntimeExtractorResources( + ctx context.Context, +) error { + if err := rm.deleteDaemonSet(ctx); err != nil { + return fmt.Errorf("failed to delete daemonset: %v", err) + } + + return nil +} + +// ResourcesExists checks if the runtime extractor DaemonSet is already created. 
+func (rm *ResourceManager) ResourcesExists(ctx context.Context) bool { + return rm.daemonSetExists(ctx) +} diff --git a/manifests/10-insights-runtime-extractor.yaml b/pkg/controller/runtimeextractor/resources/manifests/runtime-extractor-daemonset.yaml similarity index 93% rename from manifests/10-insights-runtime-extractor.yaml rename to pkg/controller/runtimeextractor/resources/manifests/runtime-extractor-daemonset.yaml index 05066b6cd..874cc9dcb 100644 --- a/manifests/10-insights-runtime-extractor.yaml +++ b/pkg/controller/runtimeextractor/resources/manifests/runtime-extractor-daemonset.yaml @@ -39,11 +39,11 @@ spec: - name: kube-rbac-proxy image: quay.io/openshift/origin-kube-rbac-proxy:latest args: - - '--secure-listen-address=:8443' - - '--upstream=http://127.0.0.1:8000' - - '--config-file=/etc/kube-rbac-proxy/config.yaml' - - '--tls-cert-file=/etc/tls/private/tls.crt' - - '--tls-private-key-file=/etc/tls/private/tls.key' + - "--secure-listen-address=:8443" + - "--upstream=http://127.0.0.1:8000" + - "--config-file=/etc/kube-rbac-proxy/config.yaml" + - "--tls-cert-file=/etc/tls/private/tls.crt" + - "--tls-private-key-file=/etc/tls/private/tls.key" terminationMessagePolicy: FallbackToLogsOnError volumeMounts: - mountPath: /etc/tls/private @@ -56,7 +56,7 @@ spec: readOnlyRootFilesystem: true capabilities: drop: - - ALL + - ALL runAsNonRoot: true ports: - name: https diff --git a/pkg/controller/runtimeextractor/resources/retry_test.go b/pkg/controller/runtimeextractor/resources/retry_test.go new file mode 100644 index 000000000..b8c1becdd --- /dev/null +++ b/pkg/controller/runtimeextractor/resources/retry_test.go @@ -0,0 +1,84 @@ +package resources + +import ( + "context" + "testing" + + "github.com/openshift/library-go/pkg/operator/events" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + 
"k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/utils/clock" +) + +// Test_applyDaemonSet_RetryOnConflict verifies that DaemonSet apply retries on conflict errors +func Test_applyDaemonSet_RetryOnConflict(t *testing.T) { + ctx := context.Background() + + // Create initial DaemonSet + initialDS := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "insights-runtime-extractor", + Namespace: "openshift-insights", + ResourceVersion: "1", + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "extractor", Image: "old-image:v1"}, + {Name: "exporter", Image: "old-image:v1"}, + {Name: "kube-rbac-proxy", Image: "old-image:v1"}, + }, + }, + }, + }, + } + + fakeClient := fake.NewClientset(initialDS) + recorder := events.NewInMemoryRecorder("test", clock.RealClock{}) + + // Track update attempts + updateAttempts := 0 + + // Add reactor to simulate conflict on first attempt, then succeed + fakeClient.PrependReactor("update", "daemonsets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + updateAttempts++ + if updateAttempts == 1 { + // First attempt: return conflict error + return true, nil, apierrors.NewConflict( + appsv1.Resource("daemonsets"), + "insights-runtime-extractor", + nil, + ) + } + // Second attempt: succeed with default behavior + return false, nil, nil + }) + + rm := NewResourceManager( + fakeClient.AppsV1(), + recorder, + ) + + // This should succeed after retry + _, err := rm.applyDaemonSet(ctx) + if err != nil { + t.Fatalf("Expected applyDaemonSet to succeed after retry, got error: %v", err) + } + + // Verify it retried (should have 2 attempts: 1 conflict + 1 success) + if updateAttempts < 2 { + t.Errorf("Expected at least 2 update 
attempts (conflict + retry), got %d", updateAttempts) + } +} diff --git a/pkg/controller/status/controller.go b/pkg/controller/status/controller.go index 23e8b46d5..ce4a3456e 100644 --- a/pkg/controller/status/controller.go +++ b/pkg/controller/status/controller.go @@ -76,6 +76,10 @@ type Controller struct { eventLogger events.Recorder isTechPreview bool + // This channel is used to notify about update bump + // for the runtimeextractor + updateCh chan struct{} + lock sync.Mutex } @@ -87,6 +91,7 @@ func NewController( namespace string, isTechPreview bool, eventLogger events.Recorder, + updateCh chan struct{}, ) *Controller { return &Controller{ name: "insights", @@ -99,6 +104,7 @@ func NewController( ctrlStatus: newControllerStatus(), isTechPreview: isTechPreview, eventLogger: eventLogger, + updateCh: updateCh, } } @@ -187,7 +193,7 @@ func (c *Controller) merge(clusterOperator *configv1.ClusterOperator) *configv1. c.updateControllerConditionsByStatus(cs, isInitializing) if releaseVersion := os.Getenv("RELEASE_VERSION"); len(releaseVersion) > 0 { - setProgressing, err := c.shouldSetProgressingCondition(releaseVersion, clusterOperator.Status.Versions) + versionChanged, setProgressing, err := c.checkVersionChanges(releaseVersion, clusterOperator.Status.Versions) if err != nil { klog.Errorf("failed checking openshift release version: %s with err: %v", releaseVersion, err) } @@ -196,6 +202,11 @@ func (c *Controller) merge(clusterOperator *configv1.ClusterOperator) *configv1. {Name: "operator", Version: releaseVersion}, } + // Update runtime-extractor images on any version change + if versionChanged { + c.updateCh <- struct{}{} + } + if setProgressing { cs.setCondition( configv1.OperatorProgressing, @@ -217,34 +228,55 @@ func (c *Controller) merge(clusterOperator *configv1.ClusterOperator) *configv1. 
return clusterOperator } -// shouldSetProgressingCondition checks if the openshift version was changed and decides whether we should -// switch the Progressing condition to true or not. We should do that only if the major or minor version -// is changed and ignore the patch version. -func (c *Controller) shouldSetProgressingCondition(newVersion string, clusterOperatorVersions []configv1.OperandVersion) (bool, error) { +// checkVersionChanges checks if the operator version has changed and determines what actions to take. +// It compares the new version against the current cluster operator versions and returns: +// - versionChanged: true if any version component changed (major, minor, or patch) - triggers runtime extractor image update +// - majorMinorVersionChanged: true if major or minor version changed - triggers Progressing condition +// - error: if version parsing fails +// +// Returns (false, false, nil) on initial run when clusterOperatorVersions is empty. +func (c *Controller) checkVersionChanges( + newVersion string, + clusterOperatorVersions []configv1.OperandVersion, +) ( + versionChanged, majorMinorVersionChanged bool, + err error, +) { newVersionParsed, err := semver.Parse(newVersion) if err != nil { - return false, err + return false, false, err } // Skip initial run, the condition is set there if len(clusterOperatorVersions) == 0 { - return false, nil + return false, false, nil } + versionChanged, majorMinorVersionChanged = false, false + for _, cov := range clusterOperatorVersions { covParsed, err := semver.Parse(cov.Version) if err != nil { - return false, err + return false, false, err } // Change Progressing condition only on major or minor version update if newVersionParsed.Major != covParsed.Major || newVersionParsed.Minor != covParsed.Minor { - klog.Infof("Operator version updated from %s to %s", cov.Version, newVersion) - c.eventLogger.Eventf("OperatorVersionUpdated", "Operator version updated from %s to %s", cov.Version, newVersion) - return 
true, nil + majorMinorVersionChanged = true + } + + // If version was updated then we need to load new images for runtime extractor deployment + if !newVersionParsed.Equals(covParsed) { + versionChanged = true } } - return false, nil + + if versionChanged { + klog.Infof("Operator version updated to %s", newVersion) + c.eventLogger.Eventf("OperatorVersionUpdated", "Operator version updated to %s", newVersion) + } + + return versionChanged, majorMinorVersionChanged, nil } // calculate the current controller status based on its given sources diff --git a/pkg/controller/status/controller_test.go b/pkg/controller/status/controller_test.go index f100d24ae..6323356d9 100644 --- a/pkg/controller/status/controller_test.go +++ b/pkg/controller/status/controller_test.go @@ -254,55 +254,70 @@ func Test_updatingConditionsFromDegradedToDisabled(t *testing.T) { assert.Equal(t, disabledCondition, getConditionByType(updatedCO.Status.Conditions, OperatorDisabled)) } -func Test_shouldSetProgressingCondition(t *testing.T) { +func Test_checkVersionChanges(t *testing.T) { tests := []struct { - name string - newVersion string - clusterOperatorVersions []configv1.OperandVersion - expectedShouldUpdate bool - expectError bool + name string + newVersion string + clusterOperatorVersions []configv1.OperandVersion + expectedVersionChanged bool + expectedMajorMinorChanged bool + expectError bool }{ { - name: "Invalid new version returns error", - newVersion: "invalid-version", - clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, - expectedShouldUpdate: false, - expectError: true, + name: "Invalid new version returns error", + newVersion: "invalid-version", + clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, + expectedVersionChanged: false, + expectedMajorMinorChanged: false, + expectError: true, }, { - name: "Empty clusterOperatorVersions returns false", - 
newVersion: "4.21.0-0.nightly-2026-01-07-204315", - clusterOperatorVersions: []configv1.OperandVersion{}, - expectedShouldUpdate: false, - expectError: false, + name: "Empty clusterOperatorVersions returns false", + newVersion: "4.21.0-0.nightly-2026-01-07-204315", + clusterOperatorVersions: []configv1.OperandVersion{}, + expectedVersionChanged: false, + expectedMajorMinorChanged: false, + expectError: false, }, { - name: "Major version change triggers update", - newVersion: "5.21.0-0.nightly-2026-01-07-204315", - clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, - expectedShouldUpdate: true, - expectError: false, + name: "Major version change triggers both flags", + newVersion: "5.21.0-0.nightly-2026-01-07-204315", + clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, + expectedVersionChanged: true, + expectedMajorMinorChanged: true, + expectError: false, }, { - name: "Minor version change triggers update", - newVersion: "4.22.0-0.nightly-2026-01-07-204315", - clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, - expectedShouldUpdate: true, - expectError: false, + name: "Minor version change triggers both flags", + newVersion: "4.22.0-0.nightly-2026-01-07-204315", + clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, + expectedVersionChanged: true, + expectedMajorMinorChanged: true, + expectError: false, }, { - name: "Patch version change does not trigger update", - newVersion: "4.21.1-0.nightly-2026-01-07-204315", - clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, - expectedShouldUpdate: false, - expectError: false, + name: "Patch version change triggers versionChanged only", + newVersion: "4.21.1-0.nightly-2026-01-07-204315", + 
clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, + expectedVersionChanged: true, + expectedMajorMinorChanged: false, + expectError: false, }, { - name: "Invalid existing version returns error", - newVersion: "4.21.0-0.nightly-2026-01-07-204315", - clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "invalid"}}, - expectedShouldUpdate: false, - expectError: true, + name: "No version change returns false for both", + newVersion: "4.21.0-0.nightly-2026-01-07-204315", + clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "4.21.0-0.nightly-2026-01-07-204315"}}, + expectedVersionChanged: false, + expectedMajorMinorChanged: false, + expectError: false, + }, + { + name: "Invalid existing version returns error", + newVersion: "4.21.0-0.nightly-2026-01-07-204315", + clusterOperatorVersions: []configv1.OperandVersion{{Name: "operator", Version: "invalid"}}, + expectedVersionChanged: false, + expectedMajorMinorChanged: false, + expectError: true, }, } @@ -329,7 +344,7 @@ func Test_shouldSetProgressingCondition(t *testing.T) { ctrlStatus: newControllerStatus(), } - shouldUpdate, err := ctrl.shouldSetProgressingCondition(tt.newVersion, tt.clusterOperatorVersions) + versionChanged, majorMinorChanged, err := ctrl.checkVersionChanges(tt.newVersion, tt.clusterOperatorVersions) if tt.expectError { assert.Error(t, err, "Expected an error but got nil") @@ -337,9 +352,12 @@ func Test_shouldSetProgressingCondition(t *testing.T) { assert.NoError(t, err, "Expected no error but got: %v", err) } - assert.Equal(t, tt.expectedShouldUpdate, shouldUpdate, - "shouldUpdateVersion(%q, %v) = %v, want %v", - tt.newVersion, tt.clusterOperatorVersions, shouldUpdate, tt.expectedShouldUpdate) + assert.Equal(t, tt.expectedVersionChanged, versionChanged, + "checkVersionChanges(%q, %v) versionChanged = %v, want %v", + tt.newVersion, tt.clusterOperatorVersions, versionChanged, 
tt.expectedVersionChanged) + assert.Equal(t, tt.expectedMajorMinorChanged, majorMinorChanged, + "checkVersionChanges(%q, %v) majorMinorChanged = %v, want %v", + tt.newVersion, tt.clusterOperatorVersions, majorMinorChanged, tt.expectedMajorMinorChanged) }) } }