
Commit 059588d

Add authoritative mode and improvements to pod scaler resource recommendations
1 parent: 4c0387d

10 files changed: 402 additions & 31 deletions

.gitignore

Lines changed: 3 additions & 2 deletions
```diff
@@ -9,5 +9,6 @@ index.js
 
 # default working dir
 /job-aggregator-working-dir
-# go built binary
-/job-run-aggregator
+# go built binaries
+/job-run-aggregator
+/pod-scaler
```

cmd/pod-scaler/admission.go

Lines changed: 57 additions & 9 deletions
```diff
@@ -33,7 +33,7 @@ import (
 	"github.com/openshift/ci-tools/pkg/steps"
 )
 
-func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
+func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter) {
 	logger := logrus.WithField("component", "pod-scaler admission")
 	logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
 	health := pjutil.NewHealthOnPort(healthPort)
@@ -44,7 +44,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int
 		Port:    port,
 		CertDir: certDir,
 	})
-	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
+	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, reporter: reporter}})
 	logger.Info("Serving admission webhooks.")
 	if err := server.Start(interrupts.Context()); err != nil {
 		logrus.WithError(err).Fatal("Failed to serve webhooks.")
@@ -60,6 +60,8 @@ type podMutator struct {
 	cpuCap                int64
 	memoryCap             string
 	cpuPriorityScheduling int64
+	authoritativeCPU      bool
+	authoritativeMemory   bool
 	reporter              results.PodScalerReporter
 }
 
@@ -97,7 +99,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
 		logger.WithError(err).Error("Failed to handle rehearsal Pod.")
 		return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
 	}
-	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
+	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPU, m.authoritativeMemory, m.reporter, logger)
 	m.addPriorityClass(pod)
 
 	marshaledPod, err := json.Marshal(pod)
@@ -196,8 +198,14 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
 	}
 }
 
-// useOursIfLarger updates fields in theirs when ours are larger
-func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data
+// (see resourceRecommendationWindow). If the workload used more, we increase resources. If it used
+// less, we decrease them if authoritative mode is enabled for that resource type.
+//
+// The reduction behavior is exercised in admission_test.go as part of
+// TestUseOursIfLarger, whose test cases verify the reductions and handle
+// ResourceQuantity comparison correctly.
+func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
 	for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
 		if item.Requests == nil {
 			item.Requests = corev1.ResourceList{}
@@ -215,6 +223,10 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
 	} {
 		for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
 			our := (*pair.ours)[field]
+			// If we have no recommendation for this resource, skip it
+			if our.IsZero() {
+				continue
+			}
 			//TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has
 			// been determined has on the rate of OOMKilled and similar termination of workloads
 			increased := our.AsApproximateFloat64() * 1.2
@@ -231,13 +243,49 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
 			})
 			cmp := our.Cmp(their)
 			if cmp == 1 {
-				fieldLogger.Debug("determined amount larger than configured")
+				fieldLogger.Debug("determined amount larger than configured, increasing resources")
 				(*pair.theirs)[field] = our
 				if their.Value() > 0 && our.Value() > (their.Value()*10) {
 					reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String())
 				}
 			} else if cmp < 0 {
-				fieldLogger.Debug("determined amount smaller than configured")
+				// Check if authoritative mode is enabled for this resource type
+				isAuthoritative := false
+				if field == corev1.ResourceCPU {
+					isAuthoritative = authoritativeCPU
+				} else if field == corev1.ResourceMemory {
+					isAuthoritative = authoritativeMemory
+				}
+
+				if !isAuthoritative {
+					fieldLogger.Debug("authoritative mode disabled for this resource, skipping reduction")
+					continue
+				}
+
+				// Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference
+				ourValue := our.AsApproximateFloat64()
+				theirValue := their.AsApproximateFloat64()
+				if theirValue == 0 {
+					fieldLogger.Debug("theirs is zero, applying recommendation")
+					(*pair.theirs)[field] = our
+					continue
+				}
+
+				reductionPercent := 1.0 - (ourValue / theirValue)
+				if reductionPercent < 0.05 {
+					fieldLogger.Debug("difference less than 5%, skipping micro-adjustment")
+					continue
+				}
+
+				maxReductionPercent := 0.25
+				if reductionPercent > maxReductionPercent {
+					maxAllowed := theirValue * (1.0 - maxReductionPercent)
+					our.Set(int64(maxAllowed))
+					fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)")
+				} else {
+					fieldLogger.Debug("reducing resources based on recent usage")
+				}
+				(*pair.theirs)[field] = our
 			} else {
 				fieldLogger.Debug("determined amount equal to configured")
 			}
@@ -292,7 +340,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
 	}
 }
 
-func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
 	mutateResources := func(containers []corev1.Container) {
 		for i := range containers {
 			meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)
@@ -301,7 +349,7 @@ func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceL
 				logger.Debugf("recommendation exists for: %s", containers[i].Name)
 				workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
 				workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
-				useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger)
+				applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger)
 				if mutateResourceLimits {
 					reconcileLimits(&containers[i].Resources)
 				}
```
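
The reduction branch above is the core of the change: a recommendation below the configured value is only applied when authoritative mode is on for that resource, differences under 5% are ignored, and a single admission cycle never cuts more than 25%. A minimal standalone sketch of that clamp, using plain float64 values instead of resource.Quantity (the function name and main wrapper here are illustrative, not part of the commit):

```go
package main

import "fmt"

// clampedRecommendation mirrors the webhook's reduction rules in isolation:
// skip changes under 5%, and cap any cut at 25% per cycle.
func clampedRecommendation(recommended, current float64) float64 {
	if current == 0 {
		return recommended // nothing configured yet, take the recommendation as-is
	}
	reduction := 1.0 - recommended/current
	if reduction < 0.05 {
		return current // under 5% difference: skip the micro-adjustment
	}
	if reduction > 0.25 {
		return current * 0.75 // cap the reduction at 25% per cycle
	}
	return recommended
}

func main() {
	// Recommendation 12 (10 * 1.2) against a configured 200 is a 94% cut,
	// so the clamp yields 200 * 0.75 = 150, matching the test expectations below.
	fmt.Println(clampedRecommendation(12, 200)) // 150
	fmt.Println(clampedRecommendation(98, 100)) // 100 (2% difference, skipped)
	fmt.Println(clampedRecommendation(80, 100)) // 80 (20% cut, applied directly)
}
```

Applied repeatedly, the 25% cap means an oversized request converges toward the recommendation over several admission cycles rather than dropping all at once.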

cmd/pod-scaler/admission_test.go

Lines changed: 52 additions & 10 deletions
```diff
@@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
 			original := testCase.pod.DeepCopy()
-			mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", &defaultReporter, logrus.WithField("test", testCase.name))
+			mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", true, true, &defaultReporter, logrus.WithField("test", testCase.name))
 			diff := cmp.Diff(original, testCase.pod)
 			// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
 			// particularly deterministic about this. We don't care.
@@ -661,7 +661,7 @@ func TestUseOursIfLarger(t *testing.T) {
 			},
 		},
 		{
-			name: "nothing in ours is larger",
+			name: "ours are smaller with very small values - should reduce resources based on recent usage",
 			ours: corev1.ResourceRequirements{
 				Limits: corev1.ResourceList{
 					corev1.ResourceCPU: *resource.NewQuantity(10, resource.DecimalSI),
@@ -684,12 +684,16 @@
 			},
 			expected: corev1.ResourceRequirements{
 				Limits: corev1.ResourceList{
-					corev1.ResourceCPU:    *resource.NewQuantity(200, resource.DecimalSI),
-					corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
+					// Ours: 10 * 1.2 = 12, Theirs: 200, Reduction: 94% > 25%, so limit to 25%: 200 * 0.75 = 150
+					corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI),
+					// Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
+					corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
 				},
 				Requests: corev1.ResourceList{
-					corev1.ResourceCPU:    *resource.NewQuantity(100, resource.DecimalSI),
-					corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI),
+					// Ours: 10 * 1.2 = 12, Theirs: 100, Reduction: 88% > 25%, so limit to 25%: 100 * 0.75 = 75
+					corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI),
+					// Ours: 10 * 1.2 = 12, Theirs: 2e10, Reduction: >99% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10
+					corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI),
 				},
 			},
 		},
@@ -717,19 +721,57 @@
 			},
 			expected: corev1.ResourceRequirements{
 				Limits: corev1.ResourceList{
-					corev1.ResourceCPU:    *resource.NewQuantity(480, resource.DecimalSI),
-					corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
+					corev1.ResourceCPU: *resource.NewQuantity(480, resource.DecimalSI),
+					// Ours: 10 * 1.2 = 12, Theirs: 3e10, Reduction: >99% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
+					corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
 				},
 				Requests: corev1.ResourceList{
 					corev1.ResourceCPU:    *resource.NewQuantity(1200, resource.DecimalSI),
 					corev1.ResourceMemory: *resource.NewQuantity(48e9, resource.BinarySI),
 				},
 			},
 		},
+		{
+			name: "ours are smaller with medium values - should reduce resources based on recent usage",
+			ours: corev1.ResourceRequirements{
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU:    *resource.NewQuantity(50, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewQuantity(1e9, resource.BinarySI),
+				},
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU:    *resource.NewQuantity(25, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewQuantity(5e9, resource.BinarySI),
+				},
+			},
+			theirs: corev1.ResourceRequirements{
+				Limits: corev1.ResourceList{
+					corev1.ResourceCPU:    *resource.NewQuantity(200, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewQuantity(3e10, resource.BinarySI),
+				},
+				Requests: corev1.ResourceList{
+					corev1.ResourceCPU:    *resource.NewQuantity(100, resource.DecimalSI),
+					corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI),
+				},
+			},
+			expected: corev1.ResourceRequirements{
+				Limits: corev1.ResourceList{
+					// Ours: 50 * 1.2 = 60, Theirs: 200, Reduction: 70% > 25%, so limit to 25%: 200 * 0.75 = 150
+					corev1.ResourceCPU: *resource.NewQuantity(150, resource.DecimalSI),
+					// Ours: 1e9 * 1.2 = 1.2e9, Theirs: 3e10, Reduction: 96% > 25%, so limit to 25%: 3e10 * 0.75 = 2.25e10
+					corev1.ResourceMemory: *resource.NewQuantity(225e8, resource.BinarySI),
+				},
+				Requests: corev1.ResourceList{
+					// Ours: 25 * 1.2 = 30, Theirs: 100, Reduction: 70% > 25%, so limit to 25%: 100 * 0.75 = 75
+					corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI),
+					// Ours: 5e9 * 1.2 = 6e9, Theirs: 2e10, Reduction: 70% > 25%, so limit to 25%: 2e10 * 0.75 = 1.5e10
+					corev1.ResourceMemory: *resource.NewQuantity(15e9, resource.BinarySI),
+				},
+			},
+		},
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.name, func(t *testing.T) {
-			useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", &defaultReporter, logrus.WithField("test", testCase.name))
+			applyRecommendationsBasedOnRecentData(&testCase.ours, &testCase.theirs, "test", "build", true, true, &defaultReporter, logrus.WithField("test", testCase.name))
 			if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" {
 				t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff)
 			}
@@ -814,7 +856,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", &tc.reporter, logrus.WithField("test", tc.name))
+			applyRecommendationsBasedOnRecentData(&tc.ours, &tc.theirs, "test", "build", true, true, &tc.reporter, logrus.WithField("test", tc.name))
 
 			if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" {
 				t.Errorf("actual and expected reporter states don't match, : %v", diff)
```
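
The test comments above do the Quantity arithmetic by hand; the production code relies on resource.Quantity's own comparison so that values expressed in different units or scales compare correctly. A small illustrative snippet (the values are made up; the API is the same k8s.io/apimachinery/pkg/api/resource package the tests already import):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	our := resource.MustParse("1500m") // recommendation: 1.5 cores
	their := resource.MustParse("2")   // configured: 2 cores

	// Cmp compares across formats: -1 here means ours is smaller,
	// i.e. a reduction candidate in the webhook's terms.
	fmt.Println(our.Cmp(their)) // -1

	// AsApproximateFloat64 is what the reduction math uses.
	reduction := 1.0 - our.AsApproximateFloat64()/their.AsApproximateFloat64()
	fmt.Printf("reduction: %.0f%%\n", reduction*100) // reduction: 25%
}
```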

cmd/pod-scaler/main.go

Lines changed: 5 additions & 1 deletion
```diff
@@ -67,6 +67,8 @@ type consumerOptions struct {
 	cpuCap                int64
 	memoryCap             string
 	cpuPriorityScheduling int64
+	authoritativeCPU      bool
+	authoritativeMemory   bool
 }
 
 func bindOptions(fs *flag.FlagSet) *options {
@@ -89,6 +91,8 @@ func bindOptions(fs *flag.FlagSet) *options {
 	fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
 	fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
 	fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
+	fs.BoolVar(&o.authoritativeCPU, "authoritative-cpu", true, "Enable authoritative mode for CPU requests (allows decreasing resources based on recent usage)")
+	fs.BoolVar(&o.authoritativeMemory, "authoritative-memory", true, "Enable authoritative mode for memory requests (allows decreasing resources based on recent usage)")
 	o.resultsOptions.Bind(fs)
 	return &o
 }
@@ -268,7 +272,7 @@ func mainAdmission(opts *options, cache Cache) {
 		logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
 	}
 
-	go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, reporter)
+	go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.authoritativeCPU, opts.authoritativeMemory, reporter)
 }
 
 func loaders(cache Cache) map[string][]*cacheReloader {
```
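
Both flags default to true, so reductions are enabled unless explicitly turned off. A minimal sketch of how the new options parse (the flag names and defaults come from bindOptions above; the standalone FlagSet wrapper is illustrative):

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	var authoritativeCPU, authoritativeMemory bool
	fs := flag.NewFlagSet("pod-scaler", flag.ExitOnError)
	fs.BoolVar(&authoritativeCPU, "authoritative-cpu", true, "Enable authoritative mode for CPU requests")
	fs.BoolVar(&authoritativeMemory, "authoritative-memory", true, "Enable authoritative mode for memory requests")

	// An operator who wants the webhook to only ever raise memory requests,
	// never lower them, would pass:
	_ = fs.Parse([]string{"--authoritative-memory=false"})
	fmt.Println(authoritativeCPU, authoritativeMemory) // true false
}
```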
