Skip to content

Commit 13781c2

Browse files
committed
Add measured pods feature for accurate resource measurement on isolated nodes
Implements DPTP-4613 to address the issue where pod-scaler recommendations are skewed by node contention. When multiple pods with poor CPU configurations are scheduled on the same node, CPU is maxed out but pods finish eventually. The pod-scaler observes low CPU utilization (due to node contention) and incorrectly concludes requests should not be increased, leading to a cycle of reduced limits and tighter packing.

This change introduces a measured pods system:

- Pods are classified as "normal" or "measured" based on whether they need fresh measurement data (measured if the last measurement was more than 10 days ago, or if they were never measured)
- Measured pods use podAntiAffinity rules to run on isolated nodes with no other CI workloads, ensuring accurate CPU/memory utilization measurement
- BigQuery integration queries and caches max CPU/memory utilization from measured pod runs, refreshing daily to keep data current
- Resource recommendations are applied only to the longest-running container in each pod, using actual measured utilization data instead of Prometheus metrics that may be skewed by node contention

The feature is opt-in via the --enable-measured-pods flag and requires BigQuery configuration (--bigquery-project-id and --bigquery-dataset-id).
1 parent 4c0387d commit 13781c2

5 files changed

Lines changed: 718 additions & 15 deletions

File tree

cmd/pod-scaler/admission.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,34 +33,51 @@ import (
3333
"github.com/openshift/ci-tools/pkg/steps"
3434
)
3535

36-
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
36+
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) {
3737
logger := logrus.WithField("component", "pod-scaler admission")
3838
logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
3939
health := pjutil.NewHealthOnPort(healthPort)
4040
resources := newResourceServer(loaders, health)
4141
decoder := admission.NewDecoder(scheme.Scheme)
4242

43+
var bqClient *BigQueryClient
44+
if enableMeasuredPods {
45+
if bigQueryProjectID == "" || bigQueryDatasetID == "" {
46+
logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
47+
}
48+
cache := NewMeasuredPodCache(logger)
49+
var err error
50+
bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger)
51+
if err != nil {
52+
logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
53+
}
54+
logger.Info("Measured pods feature enabled with BigQuery integration")
55+
}
56+
4357
server := webhook.NewServer(webhook.Options{
4458
Port: port,
4559
CertDir: certDir,
4660
})
47-
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
61+
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}})
4862
logger.Info("Serving admission webhooks.")
4963
if err := server.Start(interrupts.Context()); err != nil {
5064
logrus.WithError(err).Fatal("Failed to serve webhooks.")
5165
}
5266
}
5367

5468
type podMutator struct {
55-
logger *logrus.Entry
56-
client buildclientv1.BuildV1Interface
57-
resources *resourceServer
58-
mutateResourceLimits bool
59-
decoder admission.Decoder
60-
cpuCap int64
61-
memoryCap string
62-
cpuPriorityScheduling int64
63-
reporter results.PodScalerReporter
69+
logger *logrus.Entry
70+
client buildclientv1.BuildV1Interface
71+
resources *resourceServer
72+
mutateResourceLimits bool
73+
decoder admission.Decoder
74+
cpuCap int64
75+
memoryCap string
76+
cpuPriorityScheduling int64
77+
authoritativeCPURequests bool
78+
authoritativeMemoryRequests bool
79+
bqClient *BigQueryClient
80+
reporter results.PodScalerReporter
6481
}
6582

6683
func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
@@ -97,7 +114,16 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
97114
logger.WithError(err).Error("Failed to handle rehearsal Pod.")
98115
return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
99116
}
100-
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
117+
118+
// Classify pod as normal or measured (if enabled)
119+
if m.bqClient != nil {
120+
ClassifyPod(pod, m.bqClient, logger)
121+
AddPodAntiAffinity(pod, logger)
122+
// Apply measured pod resources before regular resource mutation
123+
ApplyMeasuredPodResources(pod, m.bqClient, logger)
124+
}
125+
126+
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger)
101127
m.addPriorityClass(pod)
102128

103129
marshaledPod, err := json.Marshal(pod)
@@ -292,7 +318,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
292318
}
293319
}
294320

295-
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
321+
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
296322
mutateResources := func(containers []corev1.Container) {
297323
for i := range containers {
298324
meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)

cmd/pod-scaler/admission_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
554554
for _, testCase := range testCases {
555555
t.Run(testCase.name, func(t *testing.T) {
556556
original := testCase.pod.DeepCopy()
557-
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
557+
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name))
558558
diff := cmp.Diff(original, testCase.pod)
559559
// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
560560
// particularly deterministic about this. We don't care.

cmd/pod-scaler/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ type consumerOptions struct {
6767
cpuCap int64
6868
memoryCap string
6969
cpuPriorityScheduling int64
70+
71+
// Measured pods options - when enabled, pods are classified as "normal" or "measured"
72+
// Measured pods run on isolated nodes to get accurate CPU/memory utilization data
73+
enableMeasuredPods bool
74+
bigQueryProjectID string
75+
bigQueryDatasetID string
76+
bigQueryCredentialsFile string
7077
}
7178

7279
func bindOptions(fs *flag.FlagSet) *options {
@@ -89,6 +96,10 @@ func bindOptions(fs *flag.FlagSet) *options {
8996
fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
9097
fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
9198
fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
99+
fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
100+
fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
101+
fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
102+
fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
92103
o.resultsOptions.Bind(fs)
93104
return &o
94105
}
@@ -268,7 +279,7 @@ func mainAdmission(opts *options, cache Cache) {
268279
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
269280
}
270281

271-
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
282+
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
272283
}
273284

274285
func loaders(cache Cache) map[string][]*cacheReloader {

0 commit comments

Comments (0)