From 104e9458b1bdb5b0d4f95ab539d6f04139c8297e Mon Sep 17 00:00:00 2001 From: Evgenii Leko Date: Thu, 3 Oct 2024 08:17:38 +0000 Subject: [PATCH] config: add ignored fields Specify fields to skip sending object update. Will be applied to all objects. If after removal of these fields from k8s object all remaining fields will be equal, handler won't trigger sending update. Removing array elements is not supported. For example, ```yaml ignorefields: status: metadata: resourceVersion: managedFields: ``` will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields" from k8s object before comparing old & new k8s objects. --- config/config.go | 13 ++++++ go.mod | 1 + pkg/controller/controller.go | 87 +++++++++++++++++++++++++++--------- 3 files changed, 79 insertions(+), 22 deletions(-) diff --git a/config/config.go b/config/config.go index 47d3ad20..a492c70c 100755 --- a/config/config.go +++ b/config/config.go @@ -94,6 +94,19 @@ type Config struct { // For watching specific namespace, leave it empty for watching all. // this config is ignored when watching namespaces Namespace string `json:"namespace,omitempty"` + + // Specify fields to skip sending object update. Will be applied to all objects. + // If after removal of these fields from k8s object all remaining fields will be equal, + // handler won't trigger sending update. Removing array elements is not supported. + // For example, + // ignorefields: + // status: + // metadata: + // resourceVersion: + // managedFields: + // will remove ".status", ".metadata.resourceVersion" and ".metadata.managedFields" + // from k8s object before comparing old & new k8s objects. + IgnoredFields map[string]interface{} `json:"ignoredfields,omitempty"` } // Slack contains slack configuration diff --git a/go.mod b/go.mod index f0d0739e..cddcbd58 100755 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/fatih/structtag v1.2.0 + github.com/google/go-cmp v0.6.0 github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect github.com/hashicorp/hcl v0.0.0-20171017181929-23c074d0eceb // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5bb907a3..60ef1c00 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -54,6 +54,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/google/go-cmp/cmp" ) const maxRetries = 5 @@ -131,7 +133,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics) + allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopAllCoreEventsCh := make(chan struct{}) defer close(stopAllCoreEventsCh) @@ -155,7 +157,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics) + allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopAllEventsCh := make(chan struct{}) defer close(stopAllEventsCh) @@ -177,7 +179,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -199,7 +201,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -222,7 +224,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -244,7 +246,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -266,7 +268,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -288,7 +290,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -310,7 +312,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -332,7 +334,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -354,7 +356,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -376,7 +378,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -398,7 +400,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -420,7 +422,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -442,7 +444,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -464,7 +466,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -486,7 +488,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -508,7 +510,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -530,7 +532,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -552,7 +554,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -583,7 +585,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { cache.Indexers{}, ) - c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics) + c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics, conf.IgnoredFields) stopCh := make(chan struct{}) defer close(stopCh) @@ -596,7 +598,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { <-sigterm } -func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller { +func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec, ignoredFields map[string]interface{}) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) var newEvent Event var err error @@ -634,6 +636,15 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha if !ok { logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot convert old to runtime.Object for update on %v", old) } + if len(ignoredFields) > 0 { + diff, errDiff := diffObjects(old, new, ignoredFields) + if errDiff != nil { + logrus.WithField("pkg", "kubewatch-"+resourceType).Errorf("cannot diff old & new objects %v and %v: %v", old, new, errDiff) + } else if len(diff) == 0 { + logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Ignoring update to %v: %s", resourceType, newEvent.key) + return + } + } logrus.WithField("pkg", "kubewatch-"+resourceType).Infof("Processing update to %v: %s", resourceType, newEvent.key) if err == nil { queue.Add(newEvent) @@ -670,6 +681,38 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha } } +func diffObjects(old, new interface{}, ignoredFields map[string]interface{}) (string, error) { + oldContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(old) + if err != nil { + return "", err + } + newContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(new) + if err != nil { + return "", err + } + recursiveDelete(oldContent, ignoredFields) + recursiveDelete(newContent, ignoredFields) + return cmp.Diff(oldContent, newContent), nil +} + +// recursiveDelete recursively removes key from object +// value of key should be either nil or nested map[string]interface{} +// value of object to delete from should be nested map[string]interface{} +func recursiveDelete(object map[string]interface{}, key map[string]interface{}) { + for k, v := range key { + if v == nil { + delete(object, k) + continue + } + if recursiveKey, ok := v.(map[string]interface{}); ok { + if recursiveObj, ok := object[k].(map[string]interface{}); ok { + recursiveDelete(recursiveObj, recursiveKey) + } + } + } + return +} + // Run starts the kubewatch controller func (c *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash()