Skip to content

Commit 8fe3660

Browse files
committed
Add data collection for measured pods to populate ci_operator_metrics table
1 parent 37e0717 commit 8fe3660

5 files changed

Lines changed: 561 additions & 38 deletions

File tree

cmd/pod-scaler/admission.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
318318
}
319319
}
320320

321-
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
321+
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, _ /* authoritativeCPU */, _ /* authoritativeMemory */ bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
322322
mutateResources := func(containers []corev1.Container) {
323323
for i := range containers {
324324
meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)

cmd/pod-scaler/main.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ type options struct {
5252
}
5353

5454
type producerOptions struct {
55-
kubernetesOptions prowflagutil.KubernetesOptions
56-
once bool
57-
ignoreLatest time.Duration
55+
kubernetesOptions prowflagutil.KubernetesOptions
56+
once bool
57+
ignoreLatest time.Duration
58+
enableMeasuredPods bool
59+
bigQueryProjectID string
60+
bigQueryDatasetID string
61+
bigQueryCredentialsFile string
5862
}
5963

6064
type consumerOptions struct {
@@ -67,22 +71,19 @@ type consumerOptions struct {
6771
cpuCap int64
6872
memoryCap string
6973
cpuPriorityScheduling int64
70-
71-
// Measured pods options - when enabled, pods are classified as "normal" or "measured"
72-
// Measured pods run on isolated nodes to get accurate CPU/memory utilization data
73-
enableMeasuredPods bool
74-
bigQueryProjectID string
75-
bigQueryDatasetID string
76-
bigQueryCredentialsFile string
7774
}
7875

7976
func bindOptions(fs *flag.FlagSet) *options {
8077
o := options{producerOptions: producerOptions{kubernetesOptions: prowflagutil.KubernetesOptions{NOInClusterConfigDefault: true}}}
8178
o.instrumentationOptions.AddFlags(fs)
8279
fs.StringVar(&o.mode, "mode", "", "Which mode to run in.")
8380
o.producerOptions.kubernetesOptions.AddFlags(fs)
84-
fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
85-
fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
81+
fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
82+
fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
83+
fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
84+
fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)")
85+
fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
86+
fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
8687
fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.")
8788
fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.")
8889
fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.")
@@ -96,10 +97,6 @@ func bindOptions(fs *flag.FlagSet) *options {
9697
fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
9798
fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
9899
fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
99-
fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
100-
fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
101-
fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
102-
fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
103100
o.resultsOptions.Bind(fs)
104101
return &o
105102
}
@@ -255,7 +252,7 @@ func mainProduce(opts *options, cache Cache) {
255252
logger.Debugf("Loaded Prometheus client.")
256253
}
257254

258-
produce(clients, cache, opts.ignoreLatest, opts.once)
255+
produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, opts.producerOptions.enableMeasuredPods, opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile)
259256

260257
}
261258

@@ -279,7 +276,8 @@ func mainAdmission(opts *options, cache Cache) {
279276
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
280277
}
281278

282-
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
279+
// Use producerOptions for measured pods config (shared between producer and consumer modes)
280+
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.producerOptions.enableMeasuredPods, opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, reporter)
283281
}
284282

285283
func loaders(cache Cache) map[string][]*cacheReloader {

cmd/pod-scaler/measured.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"sync"
78
"time"
@@ -140,27 +141,27 @@ func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *Meas
140141
func (bq *BigQueryClient) Refresh(ctx context.Context) error {
141142
bq.logger.Info("Refreshing measured pod data from BigQuery")
142143

143-
// TODO: Replace with actual BigQuery query based on ci-metrics structure.
144-
// This is a placeholder query - the actual query will depend on the BigQuery schema
145-
// for ci-metrics pod CPU utilization data. We need to query the table that stores
146-
// max CPU/memory utilization for pods that ran with the "measured" label.
144+
// Query ci_operator_metrics table for measured pod data
145+
// This queries the table that stores max CPU/memory utilization for pods that ran with the "measured" label
147146
query := bq.client.Query(fmt.Sprintf(`
148147
SELECT
149148
org,
150149
repo,
151150
branch,
151+
target,
152152
container,
153-
MAX(cpu_utilization) as max_cpu,
154-
MAX(memory_utilization) as max_memory,
155-
MAX(timestamp) as last_measured,
153+
pod_name,
154+
MAX(max_cpu) as max_cpu,
155+
MAX(max_memory) as max_memory,
156+
MAX(created) as last_measured,
156157
ANY_VALUE(container_durations) as container_durations
157158
FROM
158-
`+"`%s.%s.pod_metrics`"+`
159+
`+"`%s.%s.ci_operator_metrics`"+`
159160
WHERE
160161
pod_scaler_label = 'measured'
161-
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY)
162+
AND created >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY)
162163
GROUP BY
163-
org, repo, branch, container
164+
org, repo, branch, target, container, pod_name
164165
`, bq.projectID, bq.datasetID, MeasuredPodDataRetentionDays))
165166

166167
query.QueryConfig.Labels = map[string]string{
@@ -179,7 +180,9 @@ func (bq *BigQueryClient) Refresh(ctx context.Context) error {
179180
Org string `bigquery:"org"`
180181
Repo string `bigquery:"repo"`
181182
Branch string `bigquery:"branch"`
183+
Target string `bigquery:"target"`
182184
Container string `bigquery:"container"`
185+
PodName string `bigquery:"pod_name"`
183186
MaxCPU float64 `bigquery:"max_cpu"`
184187
MaxMemory int64 `bigquery:"max_memory"`
185188
LastMeasured time.Time `bigquery:"last_measured"`
@@ -199,11 +202,33 @@ func (bq *BigQueryClient) Refresh(ctx context.Context) error {
199202
Repo: row.Repo,
200203
Branch: row.Branch,
201204
},
205+
Target: row.Target,
206+
Pod: row.PodName,
202207
Container: row.Container,
203208
}
204209

205-
// TODO: Parse container_durations JSON string into map[string]time.Duration
210+
// Parse container_durations JSON string into map[string]time.Duration
211+
// BigQuery stores this as a JSON string. time.Duration serializes as int64 (nanoseconds)
212+
// so the format is: {"container1": 3600000000000, "container2": 245000000000}
206213
containerDurations := make(map[string]time.Duration)
214+
if row.ContainerDurations != "" {
215+
var durationsMap map[string]int64
216+
if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMap); err == nil {
217+
for container, nanoseconds := range durationsMap {
218+
containerDurations[container] = time.Duration(nanoseconds)
219+
}
220+
} else {
221+
// Fallback: try parsing as string format (for backwards compatibility)
222+
var durationsMapStr map[string]string
223+
if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMapStr); err == nil {
224+
for container, durationStr := range durationsMapStr {
225+
if duration, err := time.ParseDuration(durationStr); err == nil {
226+
containerDurations[container] = duration
227+
}
228+
}
229+
}
230+
}
231+
}
207232

208233
data[meta] = &MeasuredPodData{
209234
Metadata: meta,
@@ -261,8 +286,9 @@ func ClassifyPod(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry
261286
}
262287
}
263288
} else {
264-
// If BigQuery isn't configured, default to measuring new pods.
265-
shouldBeMeasured = true
289+
// If BigQuery isn't configured, default to normal to avoid overwhelming isolated nodes.
290+
shouldBeMeasured = false
291+
logger.Warn("BigQuery client not available, defaulting pod to normal classification")
266292
}
267293

268294
if shouldBeMeasured {
@@ -327,21 +353,28 @@ func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) {
327353
}
328354

329355
if len(requiredTerms) > 0 {
356+
// Merge with existing anti-affinity terms instead of overwriting
357+
existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
358+
if existingTerms != nil {
359+
requiredTerms = append(existingTerms, requiredTerms...)
360+
}
330361
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms
331362
}
332363
}
333364

334365
// ApplyMeasuredPodResources uses the real resource data we collected when this pod ran in isolation.
335366
// We only increase resources for the longest-running container (the main workload), not all containers.
336367
// This is based on actual measured usage, not Prometheus data that might be skewed by node contention.
368+
// This function applies resources to pods labeled "normal" (which have measured data), not "measured" (which need measurement).
337369
func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) {
338370
if bqClient == nil {
339371
return
340372
}
341373

342374
podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey]
343-
if !hasLabel || podScalerLabel != PodScalerLabelValueMeasured {
344-
// Only apply measured resources to pods that are actually being measured.
375+
// Apply measured resources to pods labeled "normal" (which have fresh measured data)
376+
// Pods labeled "measured" don't have data yet - they're being measured for the first time.
377+
if !hasLabel || podScalerLabel != PodScalerLabelValueNormal {
345378
return
346379
}
347380

cmd/pod-scaler/measured_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestClassifyPod(t *testing.T) {
130130
t.Errorf("Expected pod to be classified as normal, got %s", pod2.Labels[PodScalerLabelKey])
131131
}
132132

133-
// Test case 3: Pod with nil BigQuery client - should default to measured
133+
// Test case 3: Pod with nil BigQuery client - should default to normal to avoid overwhelming isolated nodes
134134
pod3 := &corev1.Pod{
135135
ObjectMeta: metav1.ObjectMeta{
136136
Name: "test-pod-3",
@@ -143,8 +143,8 @@ func TestClassifyPod(t *testing.T) {
143143
}
144144

145145
ClassifyPod(pod3, nil, logger)
146-
if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured {
147-
t.Errorf("Expected pod to be classified as measured when BigQuery client is nil, got %s", pod3.Labels[PodScalerLabelKey])
146+
if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal {
147+
t.Errorf("Expected pod to be classified as normal when BigQuery client is nil (to avoid overwhelming isolated nodes), got %s", pod3.Labels[PodScalerLabelKey])
148148
}
149149
}
150150

0 commit comments

Comments
 (0)