Skip to content

Commit bfc4f68

Browse files
committed
Add data collection for measured pods to populate ci_operator_metrics table
1 parent 37e0717 commit bfc4f68

7 files changed

Lines changed: 719 additions & 122 deletions

File tree

cmd/pod-scaler/admission.go

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,51 +33,35 @@ import (
3333
"github.com/openshift/ci-tools/pkg/steps"
3434
)
3535

36-
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) {
36+
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, bqClient *BigQueryClient, reporter results.PodScalerReporter) {
3737
logger := logrus.WithField("component", "pod-scaler admission")
3838
logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
3939
health := pjutil.NewHealthOnPort(healthPort)
4040
resources := newResourceServer(loaders, health)
4141
decoder := admission.NewDecoder(scheme.Scheme)
4242

43-
var bqClient *BigQueryClient
44-
if enableMeasuredPods {
45-
if bigQueryProjectID == "" || bigQueryDatasetID == "" {
46-
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
47-
}
48-
cache := NewMeasuredPodCache(logger)
49-
var err error
50-
bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger)
51-
if err != nil {
52-
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
53-
}
54-
logger.Info("Measured pods feature enabled with BigQuery integration")
55-
}
56-
5743
server := webhook.NewServer(webhook.Options{
5844
Port: port,
5945
CertDir: certDir,
6046
})
61-
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}})
47+
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, bqClient: bqClient, reporter: reporter}})
6248
logger.Info("Serving admission webhooks.")
6349
if err := server.Start(interrupts.Context()); err != nil {
6450
logrus.WithError(err).Fatal("Failed to serve webhooks.")
6551
}
6652
}
6753

6854
type podMutator struct {
69-
logger *logrus.Entry
70-
client buildclientv1.BuildV1Interface
71-
resources *resourceServer
72-
mutateResourceLimits bool
73-
decoder admission.Decoder
74-
cpuCap int64
75-
memoryCap string
76-
cpuPriorityScheduling int64
77-
authoritativeCPURequests bool
78-
authoritativeMemoryRequests bool
79-
bqClient *BigQueryClient
80-
reporter results.PodScalerReporter
55+
logger *logrus.Entry
56+
client buildclientv1.BuildV1Interface
57+
resources *resourceServer
58+
mutateResourceLimits bool
59+
decoder admission.Decoder
60+
cpuCap int64
61+
memoryCap string
62+
cpuPriorityScheduling int64
63+
bqClient *BigQueryClient
64+
reporter results.PodScalerReporter
8165
}
8266

8367
func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
@@ -123,7 +107,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
123107
ApplyMeasuredPodResources(pod, m.bqClient, logger)
124108
}
125109

126-
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger)
110+
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
127111
m.addPriorityClass(pod)
128112

129113
marshaledPod, err := json.Marshal(pod)
@@ -318,7 +302,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
318302
}
319303
}
320304

321-
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
305+
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
322306
mutateResources := func(containers []corev1.Container) {
323307
for i := range containers {
324308
meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)

cmd/pod-scaler/admission_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
554554
for _, testCase := range testCases {
555555
t.Run(testCase.name, func(t *testing.T) {
556556
original := testCase.pod.DeepCopy()
557-
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name))
557+
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
558558
diff := cmp.Diff(original, testCase.pod)
559559
// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
560560
// particularly deterministic about this. We don't care.

cmd/pod-scaler/main.go

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ type options struct {
5252
}
5353

5454
type producerOptions struct {
55-
kubernetesOptions prowflagutil.KubernetesOptions
56-
once bool
57-
ignoreLatest time.Duration
55+
kubernetesOptions prowflagutil.KubernetesOptions
56+
once bool
57+
ignoreLatest time.Duration
58+
enableMeasuredPods bool
59+
bigQueryProjectID string
60+
bigQueryDatasetID string
61+
bigQueryCredentialsFile string
5862
}
5963

6064
type consumerOptions struct {
@@ -67,22 +71,19 @@ type consumerOptions struct {
6771
cpuCap int64
6872
memoryCap string
6973
cpuPriorityScheduling int64
70-
71-
// Measured pods options - when enabled, pods are classified as "normal" or "measured"
72-
// Measured pods run on isolated nodes to get accurate CPU/memory utilization data
73-
enableMeasuredPods bool
74-
bigQueryProjectID string
75-
bigQueryDatasetID string
76-
bigQueryCredentialsFile string
7774
}
7875

7976
func bindOptions(fs *flag.FlagSet) *options {
8077
o := options{producerOptions: producerOptions{kubernetesOptions: prowflagutil.KubernetesOptions{NOInClusterConfigDefault: true}}}
8178
o.instrumentationOptions.AddFlags(fs)
8279
fs.StringVar(&o.mode, "mode", "", "Which mode to run in.")
8380
o.producerOptions.kubernetesOptions.AddFlags(fs)
84-
fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
85-
fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
81+
fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
82+
fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
83+
fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
84+
fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)")
85+
fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
86+
fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
8687
fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.")
8788
fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.")
8889
fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.")
@@ -96,10 +97,6 @@ func bindOptions(fs *flag.FlagSet) *options {
9697
fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
9798
fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
9899
fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
99-
fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
100-
fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
101-
fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
102-
fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
103100
o.resultsOptions.Bind(fs)
104101
return &o
105102
}
@@ -255,7 +252,21 @@ func mainProduce(opts *options, cache Cache) {
255252
logger.Debugf("Loaded Prometheus client.")
256253
}
257254

258-
produce(clients, cache, opts.ignoreLatest, opts.once)
255+
var bqClient *BigQueryClient
256+
if opts.producerOptions.enableMeasuredPods {
257+
if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" {
258+
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
259+
}
260+
cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-producer"))
261+
var err error
262+
bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-producer"))
263+
if err != nil {
264+
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
265+
}
266+
logrus.Info("Measured pods feature enabled with BigQuery integration")
267+
}
268+
269+
produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, bqClient)
259270

260271
}
261272

@@ -279,7 +290,22 @@ func mainAdmission(opts *options, cache Cache) {
279290
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
280291
}
281292

282-
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
293+
// Use producerOptions for measured pods config (shared between producer and consumer modes)
294+
var bqClient *BigQueryClient
295+
if opts.producerOptions.enableMeasuredPods {
296+
if opts.producerOptions.bigQueryProjectID == "" || opts.producerOptions.bigQueryDatasetID == "" {
297+
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
298+
}
299+
cache := NewMeasuredPodCache(logrus.WithField("component", "measured-pods-admission"))
300+
var err error
301+
bqClient, err = NewBigQueryClient(opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, cache, logrus.WithField("component", "measured-pods-admission"))
302+
if err != nil {
303+
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
304+
}
305+
logrus.Info("Measured pods feature enabled with BigQuery integration")
306+
}
307+
308+
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, bqClient, reporter)
283309
}
284310

285311
func loaders(cache Cache) map[string][]*cacheReloader {

0 commit comments

Comments (0)