Skip to content
13 changes: 7 additions & 6 deletions executor-plugins/resource-tagger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ require (
github.com/kube-arbiter/arbiter v0.1.1-0.20221019145918-1199780f119f
github.com/pseudomuto/protoc-gen-doc v1.5.1
google.golang.org/grpc v1.47.0
k8s.io/apimachinery v0.24.2
k8s.io/apimachinery v0.25.3
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.60.1
k8s.io/klog/v2 v2.70.1
)

require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.7 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand All @@ -44,8 +45,8 @@ require (
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
23 changes: 15 additions & 8 deletions executor-plugins/resource-tagger/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE=
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand All @@ -102,8 +103,9 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
Expand Down Expand Up @@ -669,28 +671,33 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.24.2 h1:g518dPU/L7VRLxWfcadQn2OnsiGWVOadTLpdnqgY2OI=
k8s.io/api v0.24.2/go.mod h1:AHqbSkTm6YrQ0ObxjO3Pmp/ubFF/KuM7jU+3khoBsOg=
k8s.io/apimachinery v0.24.2 h1:5QlH9SL2C8KMcrNJPor+LbXVTaZRReml7svPEh4OKDM=
k8s.io/apimachinery v0.24.2/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM=
k8s.io/apimachinery v0.25.3 h1:7o9ium4uyUOM76t6aunP0nZuex7gDf8VGwkR5RcJnQc=
k8s.io/apimachinery v0.25.3/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo=
k8s.io/client-go v0.24.2 h1:CoXFSf8if+bLEbinDqN9ePIDGzcLtqhfd6jpfnwGOFA=
k8s.io/client-go v0.24.2/go.mod h1:zg4Xaoo+umDsfCWr4fCnmLEtQXyCNXCvJuSsglNcV30=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.60.1 h1:VW25q3bZx9uE3vvdL6M8ezOX79vA2Aq1nEWLqNQclHc=
k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 h1:Gii5eqf+GmIEwGNKQYQClCayuJCe2/4fZUvF7VG99sU=
k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ=
k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk=
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 h1:kDi4JBNAsJWfz1aEXhO8Jg87JJaPNLh5tIzYHgStQ9Y=
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
33 changes: 32 additions & 1 deletion observer-plugins/metric-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

//"github.com/smoky8/pkg/lib/go/obi"
"google.golang.org/grpc"
Expand All @@ -37,6 +40,9 @@ var (
endpoint = flag.String("endpoint", "/var/run/observer.sock", "unix socket domain for current server")
kubeconfig = flag.String("kubeconfig", "", "kubernetes auth config file")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -61,8 +67,11 @@ func main() {
if err != nil {
log.Fatalf("%s create metric client error: %s", server.PluginName, err)
}
metricServer := grpc.NewServer()

// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

metricServer := grpc.NewServer()
obi.RegisterServerServer(metricServer, server.NewServer(clientSet))
listen, err := net.Listen("unix", *endpoint)
if err != nil {
Expand All @@ -72,3 +81,25 @@ func main() {

klog.Fatalln(metricServer.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
32 changes: 31 additions & 1 deletion observer-plugins/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,6 +41,9 @@ var (
stepSeconds = flag.Int64("step", 60, "query steps")
rangeMinute = flag.Int64("range", 2, "prometheus, the maximum time between two slices within the boundaries.")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -64,6 +70,8 @@ func main() {
if err != nil {
klog.Fatal(err)
}
// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

server := grpc.NewServer()
obi.RegisterServerServer(server, prometheus.NewPrometheusServer(*address, conf, *stepSeconds, *rangeMinute))
Expand All @@ -72,6 +80,28 @@ func main() {
log.Fatal(err)
}

klog.Infof("%s starting work...", prometheus.PluginName)
klog.Infof("%s plugin started ...", prometheus.PluginName)
klog.Fatalln(server.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
30 changes: 21 additions & 9 deletions observer-plugins/prometheus/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package prometheus

import (
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -65,7 +66,7 @@ type CalculateAux struct {
Value float64
}

func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string) (DataSeries, error) {
func (p *prometheusServer) Query(startTime, endTime time.Time, kind, query, op string) (DataSeries, error) {
method := "prometheusServer.Query"
ans := DataSeries{Timestamp: endTime.UnixMilli()}
prometheusAPI, err := p.NewPrometheusAPI()
Expand All @@ -86,18 +87,29 @@ func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string)
klog.V(4).Infof("%s quer '%s' result with warnings %v\n", method, warnings)
}

data, err := formatRawValues(result, op)
if err != nil {
return ans, err
}

if f, ok := actionFuncs[op]; ok {
f(data, &ans)
// TODO: Use kind as the raw data query, may add a 'rawData: true' property for this?
if kind == "Pod" || kind == "Node" {
data, err := formatRawValues(result)
if err != nil {
return ans, err
}
if f, ok := actionFuncs[op]; ok {
f(data, &ans)
}
} else {
// Handle raw data if no aggregation defined, just return the json data
jsonValue, err := json.Marshal(result)
if err != nil {
klog.Errorf("failed to marshal result to json: %s", err)
ans.Value = fmt.Sprintf("failed to get json value: %s " + result.String())
} else {
ans.Value = string(jsonValue)
}
}
return ans, nil
}

func formatRawValues(rawValue model.Value, op string) ([]CalculateAux, error) {
func formatRawValues(rawValue model.Value) ([]CalculateAux, error) {
ans := make([]CalculateAux, 0)
switch rawValue.Type() {
case model.ValScalar:
Expand Down
16 changes: 10 additions & 6 deletions observer-plugins/prometheus/prometheus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
MaxAction = "max"
MinAction = "min"
AvgAction = "avg"
NoneAction = "none"
)

// impl obi interface
Expand Down Expand Up @@ -77,24 +78,27 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe

var err error
klog.V(4).Infof("prometheus query: %s\n", req.Query)
var resourceName string
if len(req.ResourceNames) > 0 {
resourceName = req.ResourceNames[0]
}
result := &obi.GetMetricsResponse{
ResourceName: req.ResourceNames[0],
ResourceName: resourceName,
Namespace: req.Namespace,
Unit: req.Unit,
Records: []*obi.GetMetricsResponseRecord{},
}

// use avgerage as the default aggregation action
op := AvgAction
if len(req.Aggregation) > 0 {
op = req.Aggregation[0]
}
klog.Infof("exec aggregation is: %s\n", op)
metricData, err := p.Query(startTime, endTime, req.Query, op)
metricData, err := p.Query(startTime, endTime, req.Kind, req.Query, op)
if err != nil {
klog.Errorf("%s query error: %s\n", method, err)
return result, err
}

// only return the latest record
result.Records = append(result.Records, &obi.GetMetricsResponseRecord{Timestamp: metricData.Timestamp, Value: metricData.Value})
/*
Expand All @@ -103,8 +107,8 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe
}
*/

klog.Infof("query by %s successfully", req.MetricName)
klog.V(5).Infof("%s query by %s result: %v\n", method, req.MetricName, metricData)
klog.Infof("query by metric '%s', query '%s' successfully", req.MetricName, req.Query)
klog.V(5).Infof("%s query by %s, %s result: %v\n", method, req.MetricName, req.Query, metricData)

return result, nil
}