Commit d70644b

openshift-merge-bot[bot] and deepsm007 authored and committed

Add measured pods feature with BigQuery integration

2 parents a66adc9 + e37f16d

19 files changed

Lines changed: 363 additions & 118 deletions

.gitignore

Lines changed: 4 additions & 1 deletion
@@ -10,4 +10,7 @@ index.js
 # default working dir
 /job-aggregator-working-dir
 # go built binary
-/job-run-aggregator
+/job-run-aggregator
+
+pod-scaler
+job-run-aggregator

cmd/ci-operator/main.go

Lines changed: 32 additions & 1 deletion
@@ -91,6 +91,7 @@ import (
 	"github.com/openshift/ci-tools/pkg/results"
 	"github.com/openshift/ci-tools/pkg/secrets"
 	"github.com/openshift/ci-tools/pkg/steps"
+	tooldetector "github.com/openshift/ci-tools/pkg/tool-detector"
 	"github.com/openshift/ci-tools/pkg/util"
 	"github.com/openshift/ci-tools/pkg/util/gzip"
 	"github.com/openshift/ci-tools/pkg/validation"
@@ -451,6 +452,8 @@ type options struct {
 	enableSecretsStoreCSIDriver bool

 	metricsAgent *metrics.MetricsAgent
+
+	skippedImages sets.Set[string]
 }

 func bindOptions(flag *flag.FlagSet) *options {
@@ -606,6 +609,7 @@ func (o *options) Complete() error {
 	if err := validation.IsValidResolvedConfiguration(o.configSpec, mergedConfig); err != nil {
 		return results.ForReason("validating_config").ForError(err)
 	}
+	o.skippedImages = determineSkippedImages(o.configSpec, o.jobSpec, o.targets.values)
 	o.graphConfig = defaults.FromConfigStatic(o.configSpec)
 	if err := validation.IsValidGraphConfiguration(o.graphConfig.Steps); err != nil {
 		return results.ForReason("validating_config").ForError(err)
@@ -956,7 +960,7 @@ func (o *options) Run() []error {
 	// load the graph from the configuration
 	buildSteps, promotionSteps, err := defaults.FromConfig(ctx, o.configSpec, &o.graphConfig, o.jobSpec, o.templates, o.writeParams, o.promote, o.clusterConfig,
 		o.podPendingTimeout, leaseClient, o.targets.values, o.cloneAuthConfig, o.pullSecret, o.pushSecret, o.censor, o.hiveKubeconfig,
-		o.nodeName, nodeArchitectures, o.targetAdditionalSuffix, o.manifestToolDockerCfg, o.localRegistryDNS, streams, injectedTest, o.enableSecretsStoreCSIDriver, o.metricsAgent)
+		o.nodeName, nodeArchitectures, o.targetAdditionalSuffix, o.manifestToolDockerCfg, o.localRegistryDNS, streams, injectedTest, o.enableSecretsStoreCSIDriver, o.metricsAgent, o.skippedImages)
 	if err != nil {
 		return []error{results.ForReason("defaulting_config").WithError(err).Errorf("failed to generate steps from config: %v", err)}
 	}
@@ -1078,6 +1082,33 @@ func (o *options) Run() []error {
 	})
 }

+// determineSkippedImages determines which images can be skipped when
+// build_images_if_affected is enabled and the [images] target is requested.
+func determineSkippedImages(config *api.ReleaseBuildConfiguration, jobSpec *api.JobSpec, targets []string) sets.Set[string] {
+	if config == nil || jobSpec == nil || !config.BuildImagesIfAffected {
+		return nil
+	}
+
+	if !slices.Contains(targets, "[images]") {
+		return nil
+	}
+
+	detector := tooldetector.New(jobSpec, config)
+	affectedTools, err := detector.AffectedTools()
+	if err != nil {
+		logrus.WithError(err).Warn("Failed to detect affected tools; building all images")
+		return nil
+	}
+
+	skipped := sets.New[string]()
+	for _, img := range config.Images {
+		if !affectedTools.Has(string(img.To)) {
+			skipped.Insert(string(img.To))
+		}
+	}
+	return skipped
+}
+
 func runPromotionStep(ctx context.Context, step api.Step, detailsChan chan<- api.CIOperatorStepDetails, errChan chan<- error, metricsAgent *metrics.MetricsAgent) {
 	details, err := runStep(ctx, step, metricsAgent)
 	if err != nil {
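The new determineSkippedImages helper is easiest to read through an example. The following sketch is illustration only, not part of the commit: it uses hypothetical image and tool names and a plain map in place of the apimachinery sets helper, and shows that every image target the tool detector did not mark as affected ends up in the skipped set.

package main

import "fmt"

// skippedImages mirrors the filtering in determineSkippedImages above: any
// image target not named in the affected-tools set is marked as skippable.
func skippedImages(imageTargets []string, affectedTools map[string]bool) []string {
	var skipped []string
	for _, to := range imageTargets {
		if !affectedTools[to] {
			skipped = append(skipped, to)
		}
	}
	return skipped
}

func main() {
	// Hypothetical example: only pod-scaler was affected by the change under test.
	targets := []string{"ci-operator", "pod-scaler", "prow-job-dispatcher"}
	affected := map[string]bool{"pod-scaler": true}
	fmt.Println(skippedImages(targets, affected)) // [ci-operator prow-job-dispatcher]
}

In the real flow the resulting set is threaded into defaults.FromConfig via o.skippedImages, as the hunk above shows.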

cmd/pipeline-controller/config_watcher.go

Lines changed: 26 additions & 21 deletions
@@ -2,10 +2,11 @@ package main
 
 import (
 	"os"
+	"reflect"
 	"sync"
+	"time"
 
 	"github.com/sirupsen/logrus"
-	"gopkg.in/fsnotify.v1"
 	"gopkg.in/yaml.v2"
 )
 
@@ -82,35 +83,39 @@ func newWatcher(filePath string, logger *logrus.Entry) *watcher {
 }
 
 func (w *watcher) watch() {
-	fileWatcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		w.logger.Fatal(err)
+	// Load initial config
+	if err := w.reloadConfig(); err != nil {
+		w.logger.WithError(err).Error("Failed to load initial config")
 	}
 
-	defer fileWatcher.Close()
-
-	err = fileWatcher.Add(w.filePath)
-	if err != nil {
-		w.logger.Fatal(err)
-	}
+	// Use polling instead of fsnotify because git-sync doesn't trigger filesystem events
+	ticker := time.NewTicker(3 * time.Minute)
+	defer ticker.Stop()
 
-	err = w.reloadConfig()
-	if err != nil {
-		w.logger.WithError(err)
-	}
+	// Store previous config for comparison
+	prevConfig := w.getConfigCopy()
 
-	for {
-		event := <-fileWatcher.Events
-		if event.Op&fsnotify.Write == fsnotify.Write {
-			err = w.reloadConfig()
-			if err != nil {
-				w.logger.WithError(err)
-			}
+	for range ticker.C {
+		if err := w.reloadConfig(); err != nil {
+			w.logger.WithError(err).Error("Failed to reload config")
+			continue
 		}
 
+		currentConfig := w.getConfigCopy()
+		if !reflect.DeepEqual(currentConfig, prevConfig) {
+			w.logger.Info("Config change detected, config reloaded successfully")
+			prevConfig = currentConfig
+		}
 	}
 }
 
+// getConfigCopy returns a deep copy of the current config for comparison
+func (w *watcher) getConfigCopy() enabledConfig {
+	w.mutex.Lock()
+	defer w.mutex.Unlock()
+	return w.config
+}
+
 func (w *watcher) reloadConfig() error {
 	w.mutex.Lock()
 	defer w.mutex.Unlock()
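The move from fsnotify to polling is motivated by the comment in the hunk: git-sync refreshes the mounted config by swapping symlinks, which does not reliably emit write events on the watched path. A minimal standalone sketch of the poll-and-compare pattern follows; the shortened interval, file path, and config fields are made up purely for illustration.

package main

import (
	"fmt"
	"os"
	"reflect"
	"time"

	"gopkg.in/yaml.v2"
)

// hypothetical config shape for the sketch; the real enabledConfig lives in config_watcher.go
type enabledConfig struct {
	Orgs []string `yaml:"orgs"`
}

func main() {
	path := "/etc/pipeline-controller/config.yaml" // hypothetical mount path
	var prev enabledConfig

	ticker := time.NewTicker(10 * time.Second) // the real watcher polls every 3 minutes
	defer ticker.Stop()
	for range ticker.C {
		raw, err := os.ReadFile(path)
		if err != nil {
			fmt.Println("failed to read config:", err)
			continue
		}
		var current enabledConfig
		if err := yaml.Unmarshal(raw, &current); err != nil {
			fmt.Println("failed to parse config:", err)
			continue
		}
		// reflect.DeepEqual detects a semantic change even when no filesystem
		// event fired for the watched path (git-sync symlink swaps).
		if !reflect.DeepEqual(current, prev) {
			fmt.Println("config change detected")
			prev = current
		}
	}
}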

cmd/pod-scaler/admission.go

Lines changed: 128 additions & 23 deletions
@@ -33,34 +33,51 @@ import (
 	"github.com/openshift/ci-tools/pkg/steps"
 )
 
-func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, reporter results.PodScalerReporter) {
+func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, authoritativeCPURequests, authoritativeMemoryRequests bool, enableMeasuredPods bool, bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile string, reporter results.PodScalerReporter) {
 	logger := logrus.WithField("component", "pod-scaler admission")
 	logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
 	health := pjutil.NewHealthOnPort(healthPort)
 	resources := newResourceServer(loaders, health)
 	decoder := admission.NewDecoder(scheme.Scheme)
 
+	var bqClient *BigQueryClient
+	if enableMeasuredPods {
+		if bigQueryProjectID == "" || bigQueryDatasetID == "" {
+			logrus.Fatal("bigquery-project-id and bigquery-dataset-id are required when enable-measured-pods is true")
+		}
+		cache := NewMeasuredPodCache(logger)
+		var err error
+		bqClient, err = NewBigQueryClient(bigQueryProjectID, bigQueryDatasetID, bigQueryCredentialsFile, cache, logger)
+		if err != nil {
+			logrus.WithError(err).Fatal("Failed to create BigQuery client for measured pods")
+		}
+		logger.Info("Measured pods feature enabled with BigQuery integration")
+	}
+
 	server := webhook.NewServer(webhook.Options{
 		Port:    port,
 		CertDir: certDir,
 	})
-	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, reporter: reporter}})
+	server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, authoritativeCPURequests: authoritativeCPURequests, authoritativeMemoryRequests: authoritativeMemoryRequests, bqClient: bqClient, reporter: reporter}})
 	logger.Info("Serving admission webhooks.")
 	if err := server.Start(interrupts.Context()); err != nil {
 		logrus.WithError(err).Fatal("Failed to serve webhooks.")
 	}
 }
 
 type podMutator struct {
-	logger                *logrus.Entry
-	client                buildclientv1.BuildV1Interface
-	resources             *resourceServer
-	mutateResourceLimits  bool
-	decoder               admission.Decoder
-	cpuCap                int64
-	memoryCap             string
-	cpuPriorityScheduling int64
-	reporter              results.PodScalerReporter
+	logger                      *logrus.Entry
+	client                      buildclientv1.BuildV1Interface
+	resources                   *resourceServer
+	mutateResourceLimits        bool
+	decoder                     admission.Decoder
+	cpuCap                      int64
+	memoryCap                   string
+	cpuPriorityScheduling       int64
+	authoritativeCPURequests    bool
+	authoritativeMemoryRequests bool
+	bqClient                    *BigQueryClient
+	reporter                    results.PodScalerReporter
 }
 
 func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
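The new admit parameters imply matching command-line options, but the flag registration is not part of this excerpt. The following is only a hedged sketch of how that wiring might look: the enable-measured-pods, bigquery-project-id, and bigquery-dataset-id names come from the fatal error message in admit, while the credentials-file flag name is an assumption.

// Illustration only - hypothetical flag wiring for the new admit parameters.
package main

import (
	"flag"
	"fmt"
)

type options struct {
	enableMeasuredPods      bool
	bigQueryProjectID       string
	bigQueryDatasetID       string
	bigQueryCredentialsFile string
}

func bindOptions(fs *flag.FlagSet) *options {
	o := &options{}
	fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Classify and size measured pods using BigQuery data.")
	fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "GCP project that holds the measured-pod dataset (required with enable-measured-pods).")
	fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset with measured-pod resource data (required with enable-measured-pods).")
	fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to GCP credentials file; assumed flag name, likely optional when ambient credentials exist.")
	return o
}

func main() {
	o := bindOptions(flag.CommandLine)
	flag.Parse()
	fmt.Printf("measured pods enabled: %v (project %q, dataset %q)\n", o.enableMeasuredPods, o.bigQueryProjectID, o.bigQueryDatasetID)
}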
@@ -97,7 +114,16 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
 		logger.WithError(err).Error("Failed to handle rehearsal Pod.")
 		return admission.Allowed("Failed to handle rehearsal Pod, ignoring.")
 	}
-	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.reporter, logger)
+
+	// Classify pod as normal or measured (if enabled)
+	if m.bqClient != nil {
+		ClassifyPod(pod, m.bqClient, logger)
+		AddPodAntiAffinity(pod, logger)
+		// Apply measured pod resources before regular resource mutation
+		ApplyMeasuredPodResources(pod, m.bqClient, logger)
+	}
+
+	mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, m.authoritativeCPURequests, m.authoritativeMemoryRequests, m.reporter, logger)
 	m.addPriorityClass(pod)
 
 	marshaledPod, err := json.Marshal(pod)
@@ -196,8 +222,14 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
 	}
 }
 
-// useOursIfLarger updates fields in theirs when ours are larger
-func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+// applyRecommendationsBasedOnRecentData applies resource recommendations based on recent usage data
+// (see resourceRecommendationWindow). If they used more, we increase resources. If they used less
+// and authoritative mode is enabled for that resource, we decrease them.
+//
+// Note: The reduction functionality (authoritative mode) is tested in admission_test.go as part
+// of TestUseOursIfLarger. The test cases there properly handle ResourceQuantity comparison
+// and verify the gradual reduction logic with safety limits.
+func applyRecommendationsBasedOnRecentData(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry, pod *corev1.Pod) {
 	for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
 		if item.Requests == nil {
 			item.Requests = corev1.ResourceList{}
@@ -215,12 +247,30 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
 	} {
 		for _, field := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
 			our := (*pair.ours)[field]
-			//TODO(sgoeddel): this is a temporary experiment to see what effect setting values that are 120% of what has
-			// been determined has on the rate of OOMKilled and similar termination of workloads
-			increased := our.AsApproximateFloat64() * 1.2
-			our.Set(int64(increased))
-
+			// If we have no recommendation for this resource, skip it
+			if our.IsZero() {
+				continue
+			}
 			their := (*pair.theirs)[field]
+
+			// Check if this is a measured pod. Measured pods have resources set by ApplyMeasuredPodResources
+			// which already applies a 1.2x buffer, so we skip applying the buffer again to avoid double
+			// buffering (1.2 * 1.2 = 1.44x instead of 1.2x). We use the existing pod label to determine
+			// this, which is more reliable than inferring from value comparisons.
+			isMeasuredPod := pod != nil && pod.Labels != nil && pod.Labels[PodScalerLabelKey] == PodScalerLabelValueMeasured
+
+			if !isMeasuredPod {
+				// Apply a 1.2x safety buffer to resource recommendations to reduce the rate of OOMKilled
+				// and similar workload terminations. This buffer accounts for:
+				// - Natural variance in resource usage patterns
+				// - Transient spikes in CPU/memory consumption
+				// - Measurement inaccuracies in historical data
+				// The 20% overhead provides a safety margin while still allowing for efficient resource utilization.
+				increased := our.AsApproximateFloat64() * 1.2
+				our.Set(int64(increased))
+			} else {
+				logger.Debugf("Skipping 1.2x buffer for %s %s - pod is marked as measured and resources were set by measured pods logic", pair.resource, field)
+			}
 			fieldLogger := logger.WithFields(logrus.Fields{
 				"workloadName": workloadName,
 				"workloadType": workloadType,
@@ -231,13 +281,40 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
 			})
 			cmp := our.Cmp(their)
 			if cmp == 1 {
-				fieldLogger.Debug("determined amount larger than configured")
+				fieldLogger.Debug("determined amount larger than configured, increasing resources")
 				(*pair.theirs)[field] = our
 				if their.Value() > 0 && our.Value() > (their.Value()*10) {
 					reporter.ReportResourceConfigurationWarning(workloadName, workloadType, their.String(), our.String(), field.String())
 				}
 			} else if cmp < 0 {
-				fieldLogger.Debug("determined amount smaller than configured")
+				authoritative := (field == corev1.ResourceCPU && authoritativeCPU) || (field == corev1.ResourceMemory && authoritativeMemory)
+				if authoritative {
+					// Apply gradual reduction with safety limits: max 25% reduction per cycle, minimum 5% difference
+					ourValue := our.AsApproximateFloat64()
+					theirValue := their.AsApproximateFloat64()
+					if theirValue > 0 {
+						reductionPercent := 1.0 - (ourValue / theirValue)
+						maxReductionPercent := 0.25
+
+						if reductionPercent >= 0.05 {
+							if reductionPercent > maxReductionPercent {
+								maxAllowed := theirValue * (1.0 - maxReductionPercent)
+								our.Set(int64(maxAllowed))
+								fieldLogger.Debugf("applying gradual reduction (limited to 25%% per cycle)")
+							} else {
+								fieldLogger.Debug("reducing resources based on recent usage")
+							}
+							(*pair.theirs)[field] = our
+						} else {
+							fieldLogger.Debug("difference less than 5%, skipping micro-adjustment")
+						}
+					} else {
+						fieldLogger.Debug("theirs is zero, applying recommendation")
+						(*pair.theirs)[field] = our
+					}
+				} else {
+					fieldLogger.Debug("authoritative mode disabled, keeping existing value")
+				}
 			} else {
 				fieldLogger.Debug("determined amount equal to configured")
 			}
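To make the reduction limits concrete, here is a small standalone example (hypothetical values, not part of the commit) of the clamping rules encoded above: differences under 5% are ignored, larger differences are applied, and no single cycle cuts more than 25%.

package main

import "fmt"

// clampReduction mirrors the gradual-reduction rules above:
// ignore differences under 5%, cap any single reduction at 25%.
func clampReduction(recommended, configured float64) float64 {
	if configured <= 0 {
		return recommended
	}
	reduction := 1.0 - (recommended / configured)
	switch {
	case reduction < 0.05:
		return configured // micro-adjustment, keep the existing value
	case reduction > 0.25:
		return configured * 0.75 // at most a 25% cut per cycle
	default:
		return recommended
	}
}

func main() {
	fmt.Println(clampReduction(2.0, 4.0))  // 50% below -> clamped to 3 (25% cut)
	fmt.Println(clampReduction(3.88, 4.0)) // 3% below -> 4 (unchanged)
	fmt.Println(clampReduction(3.6, 4.0))  // 10% below -> 3.6 (applied as-is)
}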
@@ -292,16 +369,44 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
 	}
 }
 
-func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, reporter results.PodScalerReporter, logger *logrus.Entry) {
+func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
+	// Check if this is a measured pod - measured pods have resources set by ApplyMeasuredPodResources
+	// and we should preserve those instead of overwriting with Prometheus recommendations
+	isMeasuredPod := pod.Labels != nil && pod.Labels[PodScalerLabelKey] == PodScalerLabelValueMeasured
+
 	mutateResources := func(containers []corev1.Container) {
 		for i := range containers {
+			// For measured pods, skip Prometheus-based recommendations if resources were already set
+			// by ApplyMeasuredPodResources (which uses BigQuery measured data)
+			if isMeasuredPod {
+				hasCPURequest := false
+				hasMemoryRequest := false
+				if containers[i].Resources.Requests != nil {
+					if cpuReq, ok := containers[i].Resources.Requests[corev1.ResourceCPU]; ok && cpuReq.Sign() > 0 {
+						hasCPURequest = true
+					}
+					if memReq, ok := containers[i].Resources.Requests[corev1.ResourceMemory]; ok && memReq.Sign() > 0 {
+						hasMemoryRequest = true
+					}
+				}
+				if hasCPURequest || hasMemoryRequest {
+					logger.Debugf("Skipping Prometheus recommendations for measured pod container %s - resources already set from BigQuery data", containers[i].Name)
+					// Still apply caps and limits even for measured pods
+					preventUnschedulable(&containers[i].Resources, cpuCap, memoryCap, logger)
+					if mutateResourceLimits {
+						reconcileLimits(&containers[i].Resources)
+					}
+					continue
+				}
+			}
+
 			meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)
 			resources, recommendationExists := server.recommendedRequestFor(meta)
 			if recommendationExists {
 				logger.Debugf("recommendation exists for: %s", containers[i].Name)
 				workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
 				workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
-				useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, reporter, logger)
+				applyRecommendationsBasedOnRecentData(&resources, &containers[i].Resources, workloadName, workloadType, authoritativeCPU, authoritativeMemory, reporter, logger, pod)
 				if mutateResourceLimits {
 					reconcileLimits(&containers[i].Resources)
 				}
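A rough usage sketch of the measured-pod short-circuit above: a pod carrying the measured label whose container already has a positive, BigQuery-derived request keeps it rather than receiving a Prometheus recommendation. The label key and value strings below are placeholders; the real constants (PodScalerLabelKey, PodScalerLabelValueMeasured) are defined elsewhere in the package and are not shown in this diff.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Placeholder label key/value standing in for PodScalerLabelKey and
// PodScalerLabelValueMeasured (assumed names/values for illustration).
const (
	labelKey           = "pod-scaler.openshift.io/classification" // assumption
	labelValueMeasured = "measured"                               // assumption
)

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "e2e-test-pod",
			Labels: map[string]string{labelKey: labelValueMeasured},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name: "test",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceMemory: resource.MustParse("2Gi"), // set from BigQuery data
					},
				},
			}},
		},
	}

	// Mirrors the short-circuit in mutatePodResources: a measured pod whose
	// container already has a positive request skips Prometheus recommendations.
	isMeasured := pod.Labels[labelKey] == labelValueMeasured
	req, ok := pod.Spec.Containers[0].Resources.Requests[corev1.ResourceMemory]
	fmt.Println(isMeasured && ok && req.Sign() > 0) // true -> keep BigQuery-derived requests
}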
