Commit 196c83c

Add measured pods feature with authoritative mode improvements
1 parent 2eb166d commit 196c83c

11 files changed

Lines changed: 942 additions & 38 deletions

.gitignore

Lines changed: 1 addition & 1 deletion
@@ -10,4 +10,4 @@ index.js
 # default working dir
 /job-aggregator-working-dir
 # go built binary
-/job-run-aggregator
+/pod-scaler

cmd/pod-scaler/admission.go

Lines changed: 128 additions & 23 deletions
@@ -33,34 +33,51 @@ import (
 	"github.com/openshift/ci-tools/pkg/steps"
 )
 
-func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
+func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) {
 	logger := logrus.WithField("component", "pod-scaler admission")
 	logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
 	health := pjutil.NewHealthOnPort(healthPort)
 	resources := newResourceServer(loaders, health)
 	decoder := admission.NewDecoder(scheme.Scheme)
 
+	var bqClient *BigQueryClient
+	if enableMeasuredPods {
+		if bigQueryProjectID == "" || bigQueryDatasetID == "" {
+			logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
+		}
+		cache := NewMeasuredPodCache(logger)
+		var err error
+		bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger)
+		if err != nil {
+			logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
+		}
+		logger.Info("Measured pods feature enabled with BigQuery integration")
+	}
+
 	server := webhook.NewServer(webhook.Options{
 		Port:    port,
 		CertDir: certDir,
 	})
-	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
+	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}})
 	logger.Info("Serving admission webhooks.")
 	if err := server.Start(interrupts.Context()); err != nil {
 		logrus.WithError(err).Fatal("Failed to serve webhooks.")
 	}
 }
 
 type podMutator struct {
-	logger                *logrus.Entry
-	client                buildclientv1.BuildV1Interface
-	resources             *resourceServer
-	mutateResourceLimits  bool
-	decoder               admission.Decoder
-	cpuCap                int64
-	memoryCap             string
-	cpuPriorityScheduling int64
-	reporter              results.PodScalerReporter
+	logger                      *logrus.Entry
+	client                      buildclientv1.BuildV1Interface
+	resources                   *resourceServer
+	mutateResourceLimits        bool
+	decoder                     admission.Decoder
+	cpuCap                      int64
+	memoryCap                   string
+	cpuPriorityScheduling       int64
+	authoritativeCPURequests    bool
+	authoritativeMemoryRequests bool
+	bqClient                    *BigQueryClient
+	reporter                    results.PodScalerReporter
 }
 
 func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
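
Note: `NewMeasuredPodCache` and `NewBigQueryClient` are defined in other files of this commit that are not part of this excerpt. For orientation only, a constructor of roughly this shape could wrap the standard `cloud.google.com/go/bigquery` client; the struct fields and wiring below are assumptions for illustration, not the commit's actual code:

```go
package podscaler

import (
	"context"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/option"
)

// Hypothetical reduction of the commit's BigQueryClient; the real type also
// carries the MeasuredPodCache and logger that admit() passes in above.
type BigQueryClient struct {
	client    *bigquery.Client
	datasetID string
}

func newBigQueryClient(ctx context.Context, projectID, datasetID, credentialsFile string) (*BigQueryClient, error) {
	var opts []option.ClientOption
	// An empty credentialsFile falls through to Application Default
	// Credentials, matching the --bigquery-credentials-file help text below.
	if credentialsFile != "" {
		opts = append(opts, option.WithCredentialsFile(credentialsFile))
	}
	client, err := bigquery.NewClient(ctx, projectID, opts...)
	if err != nil {
		return nil, err
	}
	return &BigQueryClient{client: client, datasetID: datasetID}, nil
}
```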
@@ -97,7 +114,16 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
 		logger.WithError(err).Error("Failed to handle rehearsal Pod.")
 		return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
 	}
-	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
+
+	// Classify pod as normal or measured (if enabled)
+	if m.bqClient != nil {
+		ClassifyPod(pod, m.bqClient, logger)
+		AddPodAntiAffinity(pod, logger)
+		// Apply measured pod resources before regular resource mutation
+		ApplyMeasuredPodResources(pod, m.bqClient, logger)
+	}
+
+	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger)
 	m.addPriorityClass(pod)
 
 	marshaledPod, err := json.Marshal(pod)
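
Note: `ClassifyPod`, `AddPodAntiAffinity`, and `ApplyMeasuredPodResources` also live outside this excerpt. A minimal sketch of what the classification step plausibly does (attach a label that the later steps and the scheduler key off), where the label key and the lookup callback are illustrative stand-ins, not the commit's real definitions:

```go
package podscaler

import (
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
)

// Illustrative stand-ins; the commit defines the real constants elsewhere.
const (
	podScalerLabelKey           = "pod-scaler.openshift.io/classification" // hypothetical key
	podScalerLabelValueNormal   = "normal"
	podScalerLabelValueMeasured = "measured"
)

// classifyPodSketch mimics ClassifyPod: pods without recent measured data get
// the "measured" label so AddPodAntiAffinity can keep them on isolated nodes.
func classifyPodSketch(pod *corev1.Pod, hasMeasuredData func(*corev1.Pod) bool, logger *logrus.Entry) {
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	classification := podScalerLabelValueNormal
	if !hasMeasuredData(pod) {
		classification = podScalerLabelValueMeasured
	}
	pod.Labels[podScalerLabelKey] = classification
	logger.WithField("classification", classification).Debug("classified pod")
}
```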
@@ -196,8 +222,14 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
 	}
 }
 
-// useOursIfLarger updates fields in theirs when ours are larger
-func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data
+// (see resourceRecommendationWindow). If they used more, we increase resources. If they used less
+// and authoritative mode is enabled for that resource, we decrease them.
+//
+// Note: The reduction functionality (authoritative mode) is tested in admission_test.go as part
+// of TestUseOursIfLarger. The test cases there properly handle ResourceQuantity comparison
+// and verify the gradual reduction logic with safety limits.
+func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry, pod *corev1.Pod) {
 	for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
 		if item.Requests == nil {
 			item.Requests = corev1.ResourceList{}
@@ -215,12 +247,30 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
 	} {
 		for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
 			our := (*pair.ours)[field]
-			//TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has
-			// been determined has on the rate of OOMKilled and similar termination of workloads
-			increased := our.AsApproximateFloat64() * 1.2
-			our.Set(int64(increased))
-
+			// If we have no recommendation for this resource, skip it
+			if our.IsZero() {
+				continue
+			}
 			their := (*pair.theirs)[field]
+
+			// Check if this is a measured pod. Measured pods have resources set by ApplyMeasuredPodResources
+			// which already applies a 1.2x buffer, so we skip applying the buffer again to avoid double
+			// buffering (1.2 * 1.2 = 1.44x instead of 1.2x). We use the existing pod label to determine
+			// this, which is more reliable than inferring from value comparisons.
+			isMeasuredPod := pod != nil && pod.Labels != nil && pod.Labels[PodScalerLabelKey] == PodScalerLabelValueMeasured
+
+			if !isMeasuredPod {
+				// Apply a 1.2x safety buffer to resource recommendations to reduce the rate of OOMKilled
+				// and similar workload terminations. This buffer accounts for:
+				// - Natural variance in resource usage patterns
+				// - Transient spikes in CPU/memory consumption
+				// - Measurement inaccuracies in historical data
+				// The 20% overhead provides a safety margin while still allowing for efficient resource utilization.
+				increased := our.AsApproximateFloat64() * 1.2
+				our.Set(int64(increased))
+			} else {
+				logger.Debugf("Skipping 1.2x buffer for %s %s - pod is marked as measured and resources were set by measured pods logic", pair.resource, field)
+			}
 			fieldLogger := logger.WithFields(logrus.Fields{
 				"workloadName": workloadName,
 				"workloadType": workloadType,
@@ -231,13 +281,40 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
 			})
 			cmp := our.Cmp(their)
 			if cmp == 1 {
-				fieldLogger.Debug("determined amount larger than configured")
+				fieldLogger.Debug("determined amount larger than configured, increasing resources")
 				(*pair.theirs)[field] = our
 				if their.Value() > 0 && our.Value() > (their.Value()*10) {
 					reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String())
 				}
 			} else if cmp < 0 {
-				fieldLogger.Debug("determined amount smaller than configured")
+				authoritative := (field == corev1.ResourceCPU && authoritativeCPU) || (field == corev1.ResourceMemory && authoritativeMemory)
+				if authoritative {
+					// Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference
+					ourValue := our.AsApproximateFloat64()
+					theirValue := their.AsApproximateFloat64()
+					if theirValue > 0 {
+						reductionPercent := 1.0 - (ourValue / theirValue)
+						maxReductionPercent := 0.25
+
+						if reductionPercent >= 0.05 {
+							if reductionPercent > maxReductionPercent {
+								maxAllowed := theirValue * (1.0 - maxReductionPercent)
+								our.Set(int64(maxAllowed))
+								fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)")
+							} else {
+								fieldLogger.Debug("reducing resources based on recent usage")
+							}
+							(*pair.theirs)[field] = our
+						} else {
+							fieldLogger.Debug("difference less than 5%, skipping micro-adjustment")
+						}
+					} else {
+						fieldLogger.Debug("theirs is zero, applying recommendation")
+						(*pair.theirs)[field] = our
+					}
+				} else {
+					fieldLogger.Debug("authoritative mode disabled, keeping existing value")
+				}
 			} else {
 				fieldLogger.Debug("determined amount equal to configured")
 			}
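
A worked example of those safety limits: with 4Gi configured and a (buffered) recommendation of roughly 1.3Gi, the raw reduction would be about 68%, so the cap holds the new request at 3Gi; a recommendation within 5% of the configured value is left untouched. The same clamp in isolation:

```go
package main

import "fmt"

// clampReduction mirrors the authoritative-mode branch above: cuts below 5%
// are skipped, cuts above 25% are limited to 25% per admission cycle.
func clampReduction(configured, recommended float64) float64 {
	reduction := 1.0 - recommended/configured
	switch {
	case reduction < 0.05:
		return configured // micro-adjustment, skipped
	case reduction > 0.25:
		return configured * 0.75 // gradual reduction, capped
	default:
		return recommended
	}
}

func main() {
	fmt.Println(clampReduction(4096, 1300)) // 3072: ~68% cut, capped at 25%
	fmt.Println(clampReduction(4096, 4000)) // 4096: ~2% difference, skipped
	fmt.Println(clampReduction(4096, 3500)) // 3500: ~15% cut, applied as-is
}
```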
@@ -292,16 +369,44 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
 	}
 }
 
-func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
+	// Check if this is a measured pod - measured pods have resources set by ApplyMeasuredPodResources
+	// and we should preserve those instead of overwriting with Prometheus recommendations
+	isMeasuredPod := pod.Labels != nil && pod.Labels[PodScalerLabelKey] == PodScalerLabelValueMeasured
+
 	mutateResources := func(containers []corev1.Container) {
 		for i := range containers {
+			// For measured pods, skip Prometheus-based recommendations if resources were already set
+			// by ApplyMeasuredPodResources (which uses BigQuery measured data)
+			if isMeasuredPod {
+				hasCPURequest := false
+				hasMemoryRequest := false
+				if containers[i].Resources.Requests != nil {
+					if cpuReq, ok := containers[i].Resources.Requests[corev1.ResourceCPU]; ok && cpuReq.Sign() > 0 {
+						hasCPURequest = true
+					}
+					if memReq, ok := containers[i].Resources.Requests[corev1.ResourceMemory]; ok && memReq.Sign() > 0 {
+						hasMemoryRequest = true
+					}
+				}
+				if hasCPURequest || hasMemoryRequest {
+					logger.Debugf("Skipping Prometheus recommendations for measured pod container %s - resources already set from BigQuery data", containers[i].Name)
+					// Still apply caps and limits even for measured pods
+					preventUnschedulable(&containers[i].Resources, cpuCap, memoryCap, logger)
+					if mutateResourceLimits {
+						reconcileLimits(&containers[i].Resources)
+					}
+					continue
+				}
+			}
+
 			meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)
 			resources, recommendationExists := server.recommendedRequestFor(meta)
 			if recommendationExists {
 				logger.Debugf("recommendation exists for: %s", containers[i].Name)
 				workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
 				workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
-				useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger)
+				applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger, pod)
 				if mutateResourceLimits {
 					reconcileLimits(&containers[i].Resources)
 				}
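
The presence check above hinges on `Quantity.Sign()`, which treats only a strictly positive request as "set"; a request that exists in the map but is zero does not count. A compact illustration:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	requests := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("0"),
	}
	// Present and positive: treated as already set from BigQuery data.
	if q, ok := requests[corev1.ResourceCPU]; ok && q.Sign() > 0 {
		fmt.Println("cpu request present:", q.String())
	}
	// Present but zero: Sign() == 0, so it does not count as set.
	if q, ok := requests[corev1.ResourceMemory]; ok && q.Sign() > 0 {
		fmt.Println("unreachable")
	}
}
```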

cmd/pod-scaler/admission_test.go

Lines changed: 3 additions & 3 deletions
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
 			original := testCase.pod.DeepCopy()
-			mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
+			mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, false, &defaultReporter, logrus.WithField("test", testCase.name))
 			diff := cmp.Diff(original, testCase.pod)
 			// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
 			// particularly deterministic about this. We don't care.
@@ -729,7 +729,7 @@ func TestUseOursIfLarger(t *testing.T) {
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
-			useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", &defaultReporter, logrus.WithField("test", testCase.name))
+			applyRecommendationsBasedOnRecentData(&testCase.ours, &testCase.theirs, "test", "build", false, false, &defaultReporter, logrus.WithField("test", testCase.name), nil)
 			if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" {
 				t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff)
 			}
@@ -814,7 +814,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", &tc.reporter, logrus.WithField("test", tc.name))
+			applyRecommendationsBasedOnRecentData(&tc.ours, &tc.theirs, "test", "build", false, false, &tc.reporter, logrus.WithField("test", tc.name), nil)
 
 			if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" {
 				t.Errorf("actual and expected reporter states don't match, : %v", diff)

cmd/pod-scaler/main.go

Lines changed: 31 additions & 7 deletions
@@ -61,12 +61,21 @@ type consumerOptions struct {
 	port   int
 	uiPort int
 
-	dataDir               string
-	certDir               string
-	mutateResourceLimits  bool
-	cpuCap                int64
-	memoryCap             string
-	cpuPriorityScheduling int64
+	dataDir                     string
+	certDir                     string
+	mutateResourceLimits        bool
+	cpuCap                      int64
+	memoryCap                   string
+	cpuPriorityScheduling       int64
+	authoritativeCPURequests    bool
+	authoritativeMemoryRequests bool
+
+	// Measured pods options - when enabled, pods are classified as "normal" or "measured"
+	// Measured pods run on isolated nodes to get accurate CPU/memory utilization data
+	enableMeasuredPods      bool
+	bigQueryProjectID       string
+	bigQueryDatasetID       string
+	bigQueryCredentialsFile string
 }
 
 func bindOptions(fs *flag.FlagSet) *options {
@@ -89,6 +98,12 @@ func bindOptions(fs *flag.FlagSet) *options {
 	fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
 	fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
 	fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
+	fs.BoolVar(&o.authoritativeCPURequests, "authoritative-cpu-requests", false, "Enable authoritative CPU request recommendations. When enabled, pod-scaler can reduce CPU requests based on recent usage data (past 3 weeks).")
+	fs.BoolVar(&o.authoritativeMemoryRequests, "authoritative-memory-requests", false, "Enable authoritative memory request recommendations. When enabled, pod-scaler can reduce memory requests based on recent usage data (past 3 weeks).")
+	fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
+	fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
+	fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
+	fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
 	o.resultsOptions.Bind(fs)
 	return &o
 }
@@ -122,6 +137,15 @@ func (o *options) validate() error {
 	if memoryCap := resource.MustParse(o.memoryCap); memoryCap.Sign() <= 0 {
 		return errors.New("--memory-cap must be greater than 0")
 	}
+	if o.enableMeasuredPods {
+		if o.bigQueryProjectID == "" {
+			return errors.New("--bigquery-project-id is required when --enable-measured-pods is true")
+		}
+		if o.bigQueryDatasetID == "" {
+			return errors.New("--bigquery-dataset-id is required when --enable-measured-pods is true")
+		}
+		// Note: bigQueryCredentialsFile may use default application credentials if not specified
+	}
 	if err := o.resultsOptions.Validate(); err != nil {
 		return err
 	}
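
Taken together, the feature is opt-in and fails fast at startup when the BigQuery coordinates are missing. A minimal standalone mirror of that gating; the flag names are taken from this commit, while the surrounding wiring is illustrative only:

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("pod-scaler", flag.ExitOnError)
	enableMeasuredPods := fs.Bool("enable-measured-pods", false, "")
	projectID := fs.String("bigquery-project-id", "", "")
	datasetID := fs.String("bigquery-dataset-id", "", "")

	// Missing dataset ID should be rejected, as in validate() above.
	_ = fs.Parse([]string{"--enable-measured-pods", "--bigquery-project-id=my-project"})

	if *enableMeasuredPods {
		if *projectID == "" {
			fmt.Println(errors.New("--bigquery-project-id is required when --enable-measured-pods is true"))
		}
		if *datasetID == "" {
			fmt.Println(errors.New("--bigquery-dataset-id is required when --enable-measured-pods is true"))
		}
	}
}
```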
@@ -268,7 +292,7 @@ func mainAdmission(opts *options, cache Cache) {
 		logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
 	}
 
-	go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
+	go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.authoritativeCPURequests, opts.authoritativeMemoryRequests, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
 }
 
 func loaders(cache Cache) map[string][]*cacheReloader {
