
Commit 3c5851d

Add measured pods feature with authoritative mode improvements
1 parent 4c0387d commit 3c5851d

10 files changed

Lines changed: 940 additions & 37 deletions

cmd/pod-scaler/admission.go

Lines changed: 131 additions & 23 deletions
@@ -33,34 +33,51 @@ import (
     "github.com/openshift/ci-tools/pkg/steps"
 )
 
-func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
+func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) {
     logger := logrus.WithField("component", "pod-scaler admission")
     logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
     health := pjutil.NewHealthOnPort(healthPort)
     resources := newResourceServer(loaders, health)
     decoder := admission.NewDecoder(scheme.Scheme)
 
+    var bqClient *BigQueryClient
+    if enableMeasuredPods {
+        if bigQueryProjectID == "" || bigQueryDatasetID == "" {
+            logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
+        }
+        cache := NewMeasuredPodCache(logger)
+        var err error
+        bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger)
+        if err != nil {
+            logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
+        }
+        logger.Info("Measured pods feature enabled with BigQuery integration")
+    }
+
     server := webhook.NewServer(webhook.Options{
         Port:    port,
         CertDir: certDir,
     })
-    server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
+    server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}})
     logger.Info("Serving admission webhooks.")
     if err := server.Start(interrupts.Context()); err != nil {
         logrus.WithError(err).Fatal("Failed to serve webhooks.")
     }
 }
 
 type podMutator struct {
-    logger                *logrus.Entry
-    client                buildclientv1.BuildV1Interface
-    resources             *resourceServer
-    mutateResourceLimits  bool
-    decoder               admission.Decoder
-    cpuCap                int64
-    memoryCap             string
-    cpuPriorityScheduling int64
-    reporter              results.PodScalerReporter
+    logger                      *logrus.Entry
+    client                      buildclientv1.BuildV1Interface
+    resources                   *resourceServer
+    mutateResourceLimits        bool
+    decoder                     admission.Decoder
+    cpuCap                      int64
+    memoryCap                   string
+    cpuPriorityScheduling       int64
+    authoritativeCPURequests    bool
+    authoritativeMemoryRequests bool
+    bqClient                    *BigQueryClient
+    reporter                    results.PodScalerReporter
 }
 
 func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
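Note: the MeasuredPodCache and BigQueryClient types used by admit are introduced in other files of this commit and do not appear in this diff. For orientation only, a constructor consistent with the parameters threaded through admit might look like the sketch below; the struct fields, error wrapping, and fallback to Application Default Credentials are assumptions, not the committed code.

    // Sketch only: not the committed implementation.
    package main

    import (
        "context"
        "fmt"

        "cloud.google.com/go/bigquery"
        "github.com/sirupsen/logrus"
        "google.golang.org/api/option"
    )

    // MeasuredPodCache stands in for the cache type added elsewhere in this commit.
    type MeasuredPodCache struct{}

    func NewMeasuredPodCache(logger *logrus.Entry) *MeasuredPodCache { return &MeasuredPodCache{} }

    // BigQueryClient stands in for the client type added elsewhere in this commit.
    type BigQueryClient struct {
        client  *bigquery.Client
        dataset *bigquery.Dataset
        cache   *MeasuredPodCache
        logger  *logrus.Entry
    }

    // NewBigQueryClient wires the project, dataset, and optional credentials file into a
    // BigQuery client; with an empty credentials file, Application Default Credentials apply.
    func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *MeasuredPodCache, logger *logrus.Entry) (*BigQueryClient, error) {
        var opts []option.ClientOption
        if credentialsFile != "" {
            opts = append(opts, option.WithCredentialsFile(credentialsFile))
        }
        client, err := bigquery.NewClient(context.Background(), projectID, opts...)
        if err != nil {
            return nil, fmt.Errorf("creating BigQuery client: %w", err)
        }
        return &BigQueryClient{client: client, dataset: client.Dataset(datasetID), cache: cache, logger: logger}, nil
    }
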
@@ -97,7 +114,16 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
         logger.WithError(err).Error("Failed to handle rehearsal Pod.")
         return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
     }
-    mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
+
+    // Classify pod as normal or measured (if enabled)
+    if m.bqClient != nil {
+        ClassifyPod(pod, m.bqClient, logger)
+        AddPodAntiAffinity(pod, logger)
+        // Apply measured pod resources before regular resource mutation
+        ApplyMeasuredPodResources(pod, m.bqClient, logger)
+    }
+
+    mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger)
     m.addPriorityClass(pod)
 
     marshaledPod, err := json.Marshal(pod)
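Note: ClassifyPod, AddPodAntiAffinity, and ApplyMeasuredPodResources are added in other files of this commit. The anti-affinity step is what keeps measured pods on isolated nodes (per the option help text in main.go). A minimal sketch of that idea follows, with an assumed label key, selector, and topology key; the committed helper may differ.

    // Sketch with assumed details; not the committed AddPodAntiAffinity.
    package main

    import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // PodScalerLabelKey is the label ClassifyPod uses to mark pods; its real value is
    // defined elsewhere in this commit, so a placeholder is used here.
    const PodScalerLabelKey = "pod-scaler.example/classification" // placeholder value

    func addMeasuredPodAntiAffinity(pod *corev1.Pod) {
        // Require that no other pod carrying the pod-scaler classification label runs on
        // the same node, so the measured pod's CPU/memory usage is attributable to it alone.
        term := corev1.PodAffinityTerm{
            LabelSelector: &metav1.LabelSelector{
                MatchExpressions: []metav1.LabelSelectorRequirement{{
                    Key:      PodScalerLabelKey,
                    Operator: metav1.LabelSelectorOpExists,
                }},
            },
            TopologyKey: "kubernetes.io/hostname",
        }
        if pod.Spec.Affinity == nil {
            pod.Spec.Affinity = &corev1.Affinity{}
        }
        if pod.Spec.Affinity.PodAntiAffinity == nil {
            pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
        }
        pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
            pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, term)
    }
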
@@ -196,8 +222,10 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
     }
 }
 
-// useOursIfLarger updates fields in theirs when ours are larger
-func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data
+// (see resourceRecommendationWindow). If they used more, we increase resources. If they used less
+// and authoritative mode is enabled for that resource, we decrease them.
+func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
     for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
         if item.Requests == nil {
             item.Requests = corev1.ResourceList{}
@@ -215,12 +243,37 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
     } {
         for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
             our := (*pair.ours)[field]
-            //TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has
-            // been determined has on the rate of OOMKilled and similar termination of workloads
-            increased := our.AsApproximateFloat64() * 1.2
-            our.Set(int64(increased))
-
+            // If we have no recommendation for this resource, skip it
+            if our.IsZero() {
+                continue
+            }
             their := (*pair.theirs)[field]
+
+            // Check if resources were already set by measured pods logic (which already applies 1.2x buffer).
+            // If so, skip applying the buffer again to avoid double buffering (1.2 * 1.2 = 1.44x instead of 1.2x).
+            //
+            // Note: This heuristic (checking if theirs >= our) is imperfect because:
+            // - Users could manually configure higher values
+            // - Previous mutations or other logic could have set higher values
+            // - It doesn't definitively prove the resources came from measured pods
+            // However, in practice, measured pods logic runs before this function and sets resources
+            // with a 1.2x buffer, so if theirs >= our base recommendation, it's likely from measured pods.
+            // A more robust solution would be to pass explicit metadata tracking resource source,
+            // but this heuristic works for the current implementation where measured pods are processed first.
+            alreadySetByMeasuredPods := !their.IsZero() && their.Cmp(our) >= 0
+
+            if !alreadySetByMeasuredPods {
+                // Apply a 1.2x safety buffer to resource recommendations to reduce the rate of OOMKilled
+                // and similar workload terminations. This buffer accounts for:
+                // - Natural variance in resource usage patterns
+                // - Transient spikes in CPU/memory consumption
+                // - Measurement inaccuracies in historical data
+                // The 20% overhead provides a safety margin while still allowing for efficient resource utilization.
+                increased := our.AsApproximateFloat64() * 1.2
+                our.Set(int64(increased))
+            } else {
+                logger.Debugf("Skipping 1.2x buffer for %s %s as resources appear to be already set by measured pods logic", pair.resource, field)
+            }
             fieldLogger := logger.WithFields(logrus.Fields{
                 "workloadName": workloadName,
                 "workloadType": workloadType,
@@ -231,13 +284,40 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
             })
             cmp := our.Cmp(their)
             if cmp == 1 {
-                fieldLogger.Debug("determined amount larger than configured")
+                fieldLogger.Debug("determined amount larger than configured, increasing resources")
                 (*pair.theirs)[field] = our
                 if their.Value() > 0 && our.Value() > (their.Value()*10) {
                     reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String())
                 }
             } else if cmp < 0 {
-                fieldLogger.Debug("determined amount smaller than configured")
+                authoritative := (field == corev1.ResourceCPU && authoritativeCPU) || (field == corev1.ResourceMemory && authoritativeMemory)
+                if authoritative {
+                    // Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference
+                    ourValue := our.AsApproximateFloat64()
+                    theirValue := their.AsApproximateFloat64()
+                    if theirValue > 0 {
+                        reductionPercent := 1.0 - (ourValue / theirValue)
+                        maxReductionPercent := 0.25
+
+                        if reductionPercent >= 0.05 {
+                            if reductionPercent > maxReductionPercent {
+                                maxAllowed := theirValue * (1.0 - maxReductionPercent)
+                                our.Set(int64(maxAllowed))
+                                fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)")
+                            } else {
+                                fieldLogger.Debug("reducing resources based on recent usage")
+                            }
+                            (*pair.theirs)[field] = our
+                        } else {
+                            fieldLogger.Debug("difference less than 5%, skipping micro-adjustment")
+                        }
+                    } else {
+                        fieldLogger.Debug("theirs is zero, applying recommendation")
+                        (*pair.theirs)[field] = our
+                    }
+                } else {
+                    fieldLogger.Debug("authoritative mode disabled, keeping existing value")
+                }
             } else {
                 fieldLogger.Debug("determined amount equal to configured")
             }
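Note: restated outside the Quantity plumbing, the reduction policy above is: ignore differences under 5%, and never cut more than 25% in a single admission cycle. A small standalone restatement with an illustrative value (the clampReduction helper is not part of the commit):

    // Standalone restatement of the reduction rules in the hunk above.
    package main

    import "fmt"

    // clampReduction returns the value to set when the recommendation is lower than the
    // currently configured request: reductions under 5% are skipped, reductions above 25%
    // are capped at a 25% step.
    func clampReduction(recommended, configured float64) (newValue float64, applied bool) {
        if configured <= 0 {
            return recommended, true // nothing configured yet, take the recommendation
        }
        reduction := 1.0 - recommended/configured
        switch {
        case reduction < 0.05:
            return configured, false // micro-adjustment, keep the existing value
        case reduction > 0.25:
            return configured * 0.75, true // at most a 25% reduction per cycle
        default:
            return recommended, true
        }
    }

    func main() {
        // Configured 4000m CPU, recommended 2000m: a 50% drop is clamped to 3000m.
        v, applied := clampReduction(2000, 4000)
        fmt.Println(v, applied) // 3000 true
    }
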
@@ -292,16 +372,44 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64, memoryCap string, logger *logrus.Entry) {
     }
 }
 
-func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
+    // Check if this is a measured pod - measured pods have resources set by ApplyMeasuredPodResources
+    // and we should preserve those instead of overwriting with Prometheus recommendations
+    isMeasuredPod := pod.Labels != nil && pod.Labels[PodScalerLabelKey] == PodScalerLabelValueMeasured
+
     mutateResources := func(containers []corev1.Container) {
         for i := range containers {
+            // For measured pods, skip Prometheus-based recommendations if resources were already set
+            // by ApplyMeasuredPodResources (which uses BigQuery measured data)
+            if isMeasuredPod {
+                hasCPURequest := false
+                hasMemoryRequest := false
+                if containers[i].Resources.Requests != nil {
+                    if cpuReq, ok := containers[i].Resources.Requests[corev1.ResourceCPU]; ok && cpuReq.Sign() > 0 {
+                        hasCPURequest = true
+                    }
+                    if memReq, ok := containers[i].Resources.Requests[corev1.ResourceMemory]; ok && memReq.Sign() > 0 {
+                        hasMemoryRequest = true
+                    }
+                }
+                if hasCPURequest || hasMemoryRequest {
+                    logger.Debugf("Skipping Prometheus recommendations for measured pod container %s - resources already set from BigQuery data", containers[i].Name)
+                    // Still apply caps and limits even for measured pods
+                    preventUnschedulable(&containers[i].Resources, cpuCap, memoryCap, logger)
+                    if mutateResourceLimits {
+                        reconcileLimits(&containers[i].Resources)
+                    }
+                    continue
+                }
+            }
+
             meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)
             resources, recommendationExists := server.recommendedRequestFor(meta)
             if recommendationExists {
                 logger.Debugf("recommendation exists for: %s", containers[i].Name)
                 workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
                 workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
-                useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger)
+                applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger)
                 if mutateResourceLimits {
                     reconcileLimits(&containers[i].Resources)
                 }
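Note: ApplyMeasuredPodResources itself is added in another file of this commit and is not shown here. Based only on the comments in this diff (resources come from BigQuery measured data and already carry the 1.2x buffer), a stand-in for the request-setting step might look like the following; the function name, parameters, and data source are assumptions.

    // Stand-in sketch; not the committed ApplyMeasuredPodResources.
    package main

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
    )

    // setMeasuredRequests sets container requests from measured usage, applying the 1.2x
    // buffer once here so applyRecommendationsBasedOnRecentData can detect it and skip re-buffering.
    func setMeasuredRequests(container *corev1.Container, measuredCPUMilli, measuredMemoryBytes int64) {
        if container.Resources.Requests == nil {
            container.Resources.Requests = corev1.ResourceList{}
        }
        cpu := resource.NewMilliQuantity(int64(float64(measuredCPUMilli)*1.2), resource.DecimalSI)
        mem := resource.NewQuantity(int64(float64(measuredMemoryBytes)*1.2), resource.BinarySI)
        container.Resources.Requests[corev1.ResourceCPU] = *cpu
        container.Resources.Requests[corev1.ResourceMemory] = *mem
    }
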

cmd/pod-scaler/admission_test.go

Lines changed: 3 additions & 3 deletions
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
     for _, testCase := range testCases {
         t.Run(testCase.name, func(t *testing.T) {
             original := testCase.pod.DeepCopy()
-            mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
+            mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name))
             diff := cmp.Diff(original, testCase.pod)
             // In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
             // particularly deterministic about this. We don't care.
@@ -729,7 +729,7 @@ func TestUseOursIfLarger(t *testing.T) {
     }
     for _, testCase := range testCases {
         t.Run(testCase.name, func(t *testing.T) {
-            useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", &defaultReporter, logrus.WithField("test", testCase.name))
+            applyRecommendationsBasedOnRecentData(&testCase.ours, &testCase.theirs, "test", "build", false, false, &defaultReporter, logrus.WithField("test", testCase.name))
             if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" {
                 t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff)
             }
@@ -814,7 +814,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", &tc.reporter, logrus.WithField("test", tc.name))
+            applyRecommendationsBasedOnRecentData(&tc.ours, &tc.theirs, "test", "build", false, false, &tc.reporter, logrus.WithField("test", tc.name))
 
             if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" {
                 t.Errorf("actual and expected reporter states don't match, : %v", diff)

cmd/pod-scaler/main.go

Lines changed: 31 additions & 7 deletions
@@ -61,12 +61,21 @@ type consumerOptions struct {
     port   int
     uiPort int
 
-    dataDir               string
-    certDir               string
-    mutateResourceLimits  bool
-    cpuCap                int64
-    memoryCap             string
-    cpuPriorityScheduling int64
+    dataDir                     string
+    certDir                     string
+    mutateResourceLimits        bool
+    cpuCap                      int64
+    memoryCap                   string
+    cpuPriorityScheduling       int64
+    authoritativeCPURequests    bool
+    authoritativeMemoryRequests bool
+
+    // Measured pods options - when enabled, pods are classified as "normal" or "measured"
+    // Measured pods run on isolated nodes to get accurate CPU/memory utilization data
+    enableMeasuredPods      bool
+    bigQueryProjectID       string
+    bigQueryDatasetID       string
+    bigQueryCredentialsFile string
 }
 
 func bindOptions(fs *flag.FlagSet) *options {
@@ -89,6 +98,12 @@ func bindOptions(fs *flag.FlagSet) *options {
     fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
     fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
     fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
+    fs.BoolVar(&o.authoritativeCPURequests, "authoritative-cpu-requests", false, "Enable authoritative CPU request recommendations. When enabled, pod-scaler can reduce CPU requests based on recent usage data (past 3 weeks).")
+    fs.BoolVar(&o.authoritativeMemoryRequests, "authoritative-memory-requests", false, "Enable authoritative memory request recommendations. When enabled, pod-scaler can reduce memory requests based on recent usage data (past 3 weeks).")
+    fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
+    fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
+    fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
+    fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
     o.resultsOptions.Bind(fs)
     return &o
 }
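Note: taken together, enabling both new behaviors on the admission deployment means passing arguments along these lines; the project, dataset, and credentials path are placeholders, and the deployment wiring itself is not part of this diff.

    --authoritative-cpu-requests=true \
    --authoritative-memory-requests=true \
    --enable-measured-pods=true \
    --bigquery-project-id=<gcp-project> \
    --bigquery-dataset-id=<dataset> \
    --bigquery-credentials-file=<path-to-credentials.json>
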
@@ -122,6 +137,15 @@ func (o *options) validate() error {
     if memoryCap := resource.MustParse(o.memoryCap); memoryCap.Sign() <= 0 {
         return errors.New("--memory-cap must be greater than 0")
     }
+    if o.enableMeasuredPods {
+        if o.bigQueryProjectID == "" {
+            return errors.New("--bigquery-project-id is required when --enable-measured-pods is true")
+        }
+        if o.bigQueryDatasetID == "" {
+            return errors.New("--bigquery-dataset-id is required when --enable-measured-pods is true")
+        }
+        // Note: bigQueryCredentialsFile may use default application credentials if not specified
+    }
     if err := o.resultsOptions.Validate(); err != nil {
         return err
     }
@@ -268,7 +292,7 @@ func mainAdmission(opts *options, cache Cache) {
         logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
     }
 
-    go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
+    go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.authoritativeCPURequests, opts.authoritativeMemoryRequests, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
 }
 
 func loaders(cache Cache) map[string][]*cacheReloader {
