Skip to content

Commit acbac4e

Browse files
committed
feat: add generic metric, extractor, weigher/filter
1 parent 4194472 commit acbac4e

9 files changed

Lines changed: 648 additions & 0 deletions

File tree

internal/knowledge/datasources/plugins/prometheus/supported_syncers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ var supportedMetricSyncers = map[string]func(
2626
"netapp_node_metric": newTypedSyncer[NetAppNodeMetric],
2727
"netapp_volume_aggregate_labels_metric": newTypedSyncer[NetAppVolumeAggrLabelsMetric],
2828
"kvm_libvirt_domain_metric": newTypedSyncer[KVMDomainMetric],
29+
"generic": newTypedSyncer[GenericMetric],
2930
}

internal/knowledge/datasources/plugins/prometheus/types.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,37 @@ func (m NetAppVolumeAggrLabelsMetric) With(n string, t time.Time, v float64) Pro
292292
m.Value = v
293293
return m
294294
}
295+
296+
type GenericMetric struct {
297+
Name string `json:"name" db:"name"`
298+
Host string `json:"host" db:"host"`
299+
Value float64 `json:"value" db:"value"`
300+
Timestamp time.Time `json:"timestamp" db:"timestamp"`
301+
}
302+
303+
func (m GenericMetric) GetName() string {
304+
return m.Name
305+
}
306+
307+
func (m GenericMetric) GetValue() float64 {
308+
return m.Value
309+
}
310+
311+
func (m GenericMetric) GetTimestamp() time.Time {
312+
return m.Timestamp
313+
}
314+
315+
func (m GenericMetric) TableName() string {
316+
return "generic"
317+
}
318+
319+
func (m GenericMetric) Indexes() map[string][]string {
320+
return nil
321+
}
322+
323+
func (m GenericMetric) With(alias string, t time.Time, v float64) PrometheusMetric {
324+
m.Name = alias
325+
m.Timestamp = t
326+
m.Value = v
327+
return m
328+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright SAP SE
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package plugins
5+
6+
import (
7+
_ "embed"
8+
"errors"
9+
10+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
11+
)
12+
13+
type Generic struct {
14+
Host string `db:"host"`
15+
Value float64 `db:"value"`
16+
}
17+
18+
type GenericExtractor struct {
19+
BaseExtractor[
20+
struct{},
21+
Generic,
22+
]
23+
}
24+
25+
func (e *GenericExtractor) Extract(d []*v1alpha1.Datasource, _ []*v1alpha1.Knowledge) ([]Feature, error) {
26+
if len(d) != 1 {
27+
return nil, errors.New("TODO")
28+
}
29+
dsSpec := &d[0].Spec
30+
if dsSpec.Type != v1alpha1.DatasourceTypePrometheus {
31+
return nil, errors.New("TODO")
32+
}
33+
name := dsSpec.Prometheus.Alias
34+
if name == "" {
35+
return nil, errors.New("TODO")
36+
}
37+
38+
query := "SELECT host, value FROM generic WHERE name = '" + name + "'"
39+
return e.ExtractSQL(query)
40+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package plugins
2+
3+
import (
4+
"slices"
5+
"testing"
6+
7+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
8+
"github.com/cobaltcore-dev/cortex/internal/knowledge/datasources/plugins/prometheus"
9+
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
10+
testlibDB "github.com/cobaltcore-dev/cortex/internal/knowledge/db/testing"
11+
)
12+
13+
func TestGenericExtractor_Extract(t *testing.T) {
14+
dbEnv := testlibDB.SetupDBEnv(t)
15+
testDB := db.DB{DbMap: dbEnv.DbMap}
16+
defer dbEnv.Close()
17+
// Create dependency table
18+
if err := testDB.CreateTable(
19+
testDB.AddTable(prometheus.GenericMetric{}),
20+
); err != nil {
21+
t.Fatalf("expected no error, got %v", err)
22+
}
23+
24+
metrics := []any{
25+
&prometheus.GenericMetric{Name: "node_cpu_seconds_total", Host: "node-01", Value: 0.81},
26+
&prometheus.GenericMetric{Name: "node_cpu_seconds_total", Host: "node-02", Value: 0.37},
27+
}
28+
if err := testDB.Insert(metrics...); err != nil {
29+
t.Fatalf("failed to insert manila storage pools: %v", err)
30+
}
31+
32+
extractor := &GenericExtractor{}
33+
config := v1alpha1.KnowledgeSpec{}
34+
if err := extractor.Init(&testDB, nil, config); err != nil {
35+
t.Fatalf("expected no error, got %v", err)
36+
}
37+
datasources := []*v1alpha1.Datasource{
38+
{
39+
Spec: v1alpha1.DatasourceSpec{
40+
Type: v1alpha1.DatasourceTypePrometheus,
41+
Prometheus: v1alpha1.PrometheusDatasource{
42+
Alias: "node_cpu_seconds_total",
43+
},
44+
},
45+
},
46+
}
47+
48+
features, err := extractor.Extract(datasources, []*v1alpha1.Knowledge{})
49+
if err != nil {
50+
t.Fatalf("expected no error, got %v", err)
51+
}
52+
53+
var actual []Generic
54+
for _, f := range features {
55+
actual = append(actual, f.(Generic))
56+
}
57+
58+
expected := []Generic{
59+
{Host: "node-01", Value: 0.81},
60+
{Host: "node-02", Value: 0.37},
61+
}
62+
63+
if len(actual) != len(expected) {
64+
t.Errorf("expected %d rows, got %d", len(expected), len(actual))
65+
}
66+
67+
for _, exp := range expected {
68+
if !slices.ContainsFunc(actual, func(m Generic) bool {
69+
return m.Host == exp.Host && m.Value == exp.Value
70+
}) {
71+
t.Errorf("expected to find %+v in actual results %+v", exp, actual)
72+
}
73+
}
74+
}

internal/knowledge/extractor/supported_extractors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ var supportedExtractors = map[string]plugins.FeatureExtractor{
2525
"sap_host_details_extractor": &compute.HostDetailsExtractor{},
2626

2727
"netapp_storage_pool_cpu_usage_extractor": &storage.StoragePoolCPUUsageExtractor{},
28+
29+
"generic": &plugins.GenericExtractor{},
2830
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package filters
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
8+
"github.com/cobaltcore-dev/cortex/api/external/pods"
9+
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
10+
"github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins"
11+
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
12+
"k8s.io/apimachinery/pkg/types"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
)
15+
16+
type GenericFilterStepOpts struct {
17+
Knowledge string `json:"knowledge"`
18+
}
19+
20+
func (o GenericFilterStepOpts) Validate() error {
21+
if o.Knowledge == "" {
22+
return fmt.Errorf("knowledge name must be provided")
23+
}
24+
return nil
25+
}
26+
27+
type GenericFilterStep struct {
28+
lib.BaseFilter[pods.PodPipelineRequest, GenericFilterStepOpts]
29+
knowledgeRef types.NamespacedName
30+
}
31+
32+
func (f *GenericFilterStep) Init(ctx context.Context, client client.Client, spec v1alpha1.FilterSpec) error {
33+
if err := f.BaseFilter.Init(ctx, client, spec); err != nil {
34+
return err
35+
}
36+
knowledgeRef := types.NamespacedName{Name: f.Options.Knowledge}
37+
if err := f.CheckKnowledges(ctx, knowledgeRef); err != nil {
38+
return err
39+
}
40+
f.knowledgeRef = knowledgeRef
41+
42+
return nil
43+
}
44+
45+
func (f *GenericFilterStep) Run(ctx context.Context, _ *slog.Logger, req pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) {
46+
knowledge := v1alpha1.Knowledge{}
47+
if err := f.Client.Get(ctx, f.knowledgeRef, &knowledge); err != nil {
48+
return nil, err
49+
}
50+
51+
nodeFeatures, err := v1alpha1.
52+
UnboxFeatureList[plugins.Generic](knowledge.Status.Raw)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
result := f.IncludeAllHostsFromRequest(req)
58+
for _, nodeFeature := range nodeFeatures {
59+
host := nodeFeature.Host
60+
if _, exists := result.Activations[host]; !exists {
61+
continue
62+
}
63+
if nodeFeature.Value == 1.0 {
64+
delete(result.Activations, host)
65+
}
66+
}
67+
68+
return result, nil
69+
}
70+
71+
func init() {
72+
Index["generic"] = func() PodFilter { return &GenericFilterStep{} }
73+
}

0 commit comments

Comments
 (0)