Skip to content

Commit 1fd5ab9

Browse files
committed
Add data collection for measured pods to populate ci_operator_metrics table
1 parent 37e0717 commit 1fd5ab9

5 files changed

Lines changed: 528 additions & 37 deletions

File tree

cmd/pod-scaler/admission.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func preventUnschedulable(resources *corev1.ResourceRequirements, cpuCap int64,
318318
}
319319
}
320320

321-
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, authoritativeCPU, authoritativeMemory bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
321+
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, _ /* authoritativeCPU */, _ /* authoritativeMemory */ bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
322322
mutateResources := func(containers []corev1.Container) {
323323
for i := range containers {
324324
meta := podscaler.MetadataFor(pod.ObjectMeta.Labels, pod.ObjectMeta.Name, containers[i].Name)

cmd/pod-scaler/main.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ type options struct {
5252
}
5353

5454
type producerOptions struct {
55-
kubernetesOptions prowflagutil.KubernetesOptions
56-
once bool
57-
ignoreLatest time.Duration
55+
kubernetesOptions prowflagutil.KubernetesOptions
56+
once bool
57+
ignoreLatest time.Duration
58+
enableMeasuredPods bool
59+
bigQueryProjectID string
60+
bigQueryDatasetID string
61+
bigQueryCredentialsFile string
5862
}
5963

6064
type consumerOptions struct {
@@ -67,22 +71,19 @@ type consumerOptions struct {
6771
cpuCap int64
6872
memoryCap string
6973
cpuPriorityScheduling int64
70-
71-
// Measured pods options - when enabled, pods are classified as "normal" or "measured"
72-
// Measured pods run on isolated nodes to get accurate CPU/memory utilization data
73-
enableMeasuredPods bool
74-
bigQueryProjectID string
75-
bigQueryDatasetID string
76-
bigQueryCredentialsFile string
7774
}
7875

7976
func bindOptions(fs *flag.FlagSet) *options {
8077
o := options{producerOptions: producerOptions{kubernetesOptions: prowflagutil.KubernetesOptions{NOInClusterConfigDefault: true}}}
8178
o.instrumentationOptions.AddFlags(fs)
8279
fs.StringVar(&o.mode, "mode", "", "Which mode to run in.")
8380
o.producerOptions.kubernetesOptions.AddFlags(fs)
84-
fs.DurationVar(&o.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
85-
fs.BoolVar(&o.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
81+
fs.DurationVar(&o.producerOptions.ignoreLatest, "ignore-latest", 0, "Duration of latest time series to ignore when querying Prometheus. For instance, 1h will ignore the latest hour of data.")
82+
fs.BoolVar(&o.producerOptions.once, "produce-once", false, "Query Prometheus and refresh cached data only once before exiting.")
83+
fs.BoolVar(&o.producerOptions.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
84+
fs.StringVar(&o.producerOptions.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery (required if enable-measured-pods is true)")
85+
fs.StringVar(&o.producerOptions.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
86+
fs.StringVar(&o.producerOptions.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
8687
fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.")
8788
fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.")
8889
fs.StringVar(&o.certDir, "serving-cert-dir", "", "Path to directory with serving certificate and key for the admission webhook server.")
@@ -96,10 +97,6 @@ func bindOptions(fs *flag.FlagSet) *options {
9697
fs.Int64Var(&o.cpuCap, "cpu-cap", 10, "The maximum CPU request value, ex: 10")
9798
fs.StringVar(&o.memoryCap, "memory-cap", "20Gi", "The maximum memory request value, ex: '20Gi'")
9899
fs.Int64Var(&o.cpuPriorityScheduling, "cpu-priority-scheduling", 8, "Pods with CPU requests at, or above, this value will be admitted with priority scheduling")
99-
fs.BoolVar(&o.enableMeasuredPods, "enable-measured-pods", false, "Enable measured pods feature. When enabled, pods are classified as 'normal' or 'measured' and measured pods run on isolated nodes to get accurate CPU/memory utilization data.")
100-
fs.StringVar(&o.bigQueryProjectID, "bigquery-project-id", "", "Google Cloud project ID for BigQuery queries (required if enable-measured-pods is true)")
101-
fs.StringVar(&o.bigQueryDatasetID, "bigquery-dataset-id", "", "BigQuery dataset ID for pod metrics (required if enable-measured-pods is true)")
102-
fs.StringVar(&o.bigQueryCredentialsFile, "bigquery-credentials-file", "", "Path to Google Cloud credentials file for BigQuery access")
103100
o.resultsOptions.Bind(fs)
104101
return &o
105102
}
@@ -255,7 +252,7 @@ func mainProduce(opts *options, cache Cache) {
255252
logger.Debugf("Loaded Prometheus client.")
256253
}
257254

258-
produce(clients, cache, opts.ignoreLatest, opts.once)
255+
produce(clients, cache, opts.producerOptions.ignoreLatest, opts.producerOptions.once, opts.producerOptions.enableMeasuredPods, opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile)
259256

260257
}
261258

@@ -279,7 +276,8 @@ func mainAdmission(opts *options, cache Cache) {
279276
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
280277
}
281278

282-
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.enableMeasuredPods, opts.bigQueryProjectID, opts.bigQueryDatasetID, opts.bigQueryCredentialsFile, reporter)
279+
// Use producerOptions for measured pods config (shared between producer and consumer modes)
280+
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, false, false, opts.producerOptions.enableMeasuredPods, opts.producerOptions.bigQueryProjectID, opts.producerOptions.bigQueryDatasetID, opts.producerOptions.bigQueryCredentialsFile, reporter)
283281
}
284282

285283
func loaders(cache Cache) map[string][]*cacheReloader {

cmd/pod-scaler/measured.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"sync"
78
"time"
@@ -140,25 +141,23 @@ func NewBigQueryClient(projectID, datasetID, credentialsFile string, cache *Meas
140141
func (bq *BigQueryClient) Refresh(ctx context.Context) error {
141142
bq.logger.Info("Refreshing measured pod data from BigQuery")
142143

143-
// TODO: Replace with actual BigQuery query based on ci-metrics structure.
144-
// This is a placeholder query - the actual query will depend on the BigQuery schema
145-
// for ci-metrics pod CPU utilization data. We need to query the table that stores
146-
// max CPU/memory utilization for pods that ran with the "measured" label.
144+
// Query ci_operator_metrics table for measured pod data
145+
// This queries the table that stores max CPU/memory utilization for pods that ran with the "measured" label
147146
query := bq.client.Query(fmt.Sprintf(`
148147
SELECT
149148
org,
150149
repo,
151150
branch,
152151
container,
153-
MAX(cpu_utilization) as max_cpu,
154-
MAX(memory_utilization) as max_memory,
155-
MAX(timestamp) as last_measured,
152+
MAX(max_cpu) as max_cpu,
153+
MAX(max_memory) as max_memory,
154+
MAX(created) as last_measured,
156155
ANY_VALUE(container_durations) as container_durations
157156
FROM
158-
`+"`%s.%s.pod_metrics`"+`
157+
`+"`%s.%s.ci_operator_metrics`"+`
159158
WHERE
160159
pod_scaler_label = 'measured'
161-
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY)
160+
AND created >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL %d DAY)
162161
GROUP BY
163162
org, repo, branch, container
164163
`, bq.projectID, bq.datasetID, MeasuredPodDataRetentionDays))
@@ -202,8 +201,28 @@ func (bq *BigQueryClient) Refresh(ctx context.Context) error {
202201
Container: row.Container,
203202
}
204203

205-
// TODO: Parse container_durations JSON string into map[string]time.Duration
204+
// Parse container_durations JSON string into map[string]time.Duration
205+
// BigQuery stores this as a JSON string. time.Duration serializes as int64 (nanoseconds)
206+
// so the format is: {"container1": 3600000000000, "container2": 245000000000}
206207
containerDurations := make(map[string]time.Duration)
208+
if row.ContainerDurations != "" {
209+
var durationsMap map[string]int64
210+
if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMap); err == nil {
211+
for container, nanoseconds := range durationsMap {
212+
containerDurations[container] = time.Duration(nanoseconds)
213+
}
214+
} else {
215+
// Fallback: try parsing as string format (for backwards compatibility)
216+
var durationsMapStr map[string]string
217+
if err := json.Unmarshal([]byte(row.ContainerDurations), &durationsMapStr); err == nil {
218+
for container, durationStr := range durationsMapStr {
219+
if duration, err := time.ParseDuration(durationStr); err == nil {
220+
containerDurations[container] = duration
221+
}
222+
}
223+
}
224+
}
225+
}
207226

208227
data[meta] = &MeasuredPodData{
209228
Metadata: meta,
@@ -261,8 +280,9 @@ func ClassifyPod(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry
261280
}
262281
}
263282
} else {
264-
// If BigQuery isn't configured, default to measuring new pods.
265-
shouldBeMeasured = true
283+
// If BigQuery isn't configured, default to normal to avoid overwhelming isolated nodes.
284+
shouldBeMeasured = false
285+
logger.Warn("BigQuery client not available, defaulting pod to normal classification")
266286
}
267287

268288
if shouldBeMeasured {
@@ -327,21 +347,28 @@ func AddPodAntiAffinity(pod *corev1.Pod, logger *logrus.Entry) {
327347
}
328348

329349
if len(requiredTerms) > 0 {
350+
// Merge with existing anti-affinity terms instead of overwriting
351+
existingTerms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
352+
if existingTerms != nil {
353+
requiredTerms = append(existingTerms, requiredTerms...)
354+
}
330355
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = requiredTerms
331356
}
332357
}
333358

334359
// ApplyMeasuredPodResources uses the real resource data we collected when this pod ran in isolation.
335360
// We only increase resources for the longest-running container (the main workload), not all containers.
336361
// This is based on actual measured usage, not Prometheus data that might be skewed by node contention.
362+
// This function applies resources to pods labeled "normal" (which have measured data), not "measured" (which need measurement).
337363
func ApplyMeasuredPodResources(pod *corev1.Pod, bqClient *BigQueryClient, logger *logrus.Entry) {
338364
if bqClient == nil {
339365
return
340366
}
341367

342368
podScalerLabel, hasLabel := pod.Labels[PodScalerLabelKey]
343-
if !hasLabel || podScalerLabel != PodScalerLabelValueMeasured {
344-
// Only apply measured resources to pods that are actually being measured.
369+
// Apply measured resources to pods labeled "normal" (which have fresh measured data)
370+
// Pods labeled "measured" don't have data yet - they're being measured for the first time.
371+
if !hasLabel || podScalerLabel != PodScalerLabelValueNormal {
345372
return
346373
}
347374

cmd/pod-scaler/measured_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestClassifyPod(t *testing.T) {
130130
t.Errorf("Expected pod to be classified as normal, got %s", pod2.Labels[PodScalerLabelKey])
131131
}
132132

133-
// Test case 3: Pod with nil BigQuery client - should default to measured
133+
// Test case 3: Pod with nil BigQuery client - should default to normal to avoid overwhelming isolated nodes
134134
pod3 := &corev1.Pod{
135135
ObjectMeta: metav1.ObjectMeta{
136136
Name: "test-pod-3",
@@ -143,8 +143,8 @@ func TestClassifyPod(t *testing.T) {
143143
}
144144

145145
ClassifyPod(pod3, nil, logger)
146-
if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueMeasured {
147-
t.Errorf("Expected pod to be classified as measured when BigQuery client is nil, got %s", pod3.Labels[PodScalerLabelKey])
146+
if pod3.Labels[PodScalerLabelKey] != PodScalerLabelValueNormal {
147+
t.Errorf("Expected pod to be classified as normal when BigQuery client is nil (to avoid overwhelming isolated nodes), got %s", pod3.Labels[PodScalerLabelKey])
148148
}
149149
}
150150

0 commit comments

Comments (0)