Skip to content

Commit 708cd65

Browse files
committed
Add label select for k8s metadata processor
Signed-off-by: longhui.li <longhui.li@woqutech.com>
1 parent d07ce54 commit 708cd65

2 files changed

Lines changed: 18 additions & 1 deletion

File tree

collector/pkg/component/consumer/processor/k8sprocessor/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package k8sprocessor
22

33
import (
44
"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"
5+
6+
"k8s.io/apimachinery/pkg/apis/meta/v1"
57
)
68

79
type Config struct {
@@ -18,6 +20,8 @@ type Config struct {
1820
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
1921
// Otherwise, the agent will panic if it can't connect to the API-server.
2022
Enable bool `mapstructure:"enable"`
23+
24+
LabelSelector *v1.LabelSelector `mapstructure:"label_selector"`
2125
}
2226

2327
var DefaultConfig Config = Config{

collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"strconv"
55

66
"go.uber.org/zap"
7+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/apimachinery/pkg/labels"
79

810
"github.com/Kindling-project/kindling/collector/pkg/component"
911
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
@@ -26,6 +28,7 @@ type K8sMetadataProcessor struct {
2628
localNodeIp string
2729
localNodeName string
2830
telemetry *component.TelemetryTools
31+
labelSelecotr labels.Selector
2932
}
3033

3134
func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools, nextConsumer consumer.Consumer) processor.Processor {
@@ -62,20 +65,30 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
6265
if localNodeName, err = getHostNameFromEnv(); err != nil {
6366
telemetry.Logger.Warn("Local NodeName can not found", zap.Error(err))
6467
}
65-
return &K8sMetadataProcessor{
68+
res := &K8sMetadataProcessor{
6669
config: config,
6770
metadata: kubernetes.MetaDataCache,
6871
nextConsumer: nextConsumer,
6972
localNodeIp: localNodeIp,
7073
localNodeName: localNodeName,
7174
telemetry: telemetry,
7275
}
76+
if config.LabelSelector != nil {
77+
res.labelSelecotr, err = v1.LabelSelectorAsSelector(config.LabelSelector)
78+
if err != nil {
79+
telemetry.Logger.Warn("load label selector failed %v, skip label selector. ", zap.Error(err))
80+
}
81+
}
82+
return res
7383
}
7484

7585
func (p *K8sMetadataProcessor) Consume(dataGroup *model.DataGroup) error {
7686
if !p.config.Enable {
7787
return p.nextConsumer.Consume(dataGroup)
7888
}
89+
if p.labelSelecotr != nil && !p.labelSelecotr.Matches(labels.Set(dataGroup.Labels.ToStringMap())) {
90+
return nil
91+
}
7992
name := dataGroup.Name
8093
switch name {
8194
case constnames.NetRequestMetricGroupName:

0 commit comments

Comments
 (0)