Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory)
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
plugins.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory)
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
// Latency predictor plugins
plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory)
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
if p.MetricMapping.TotalRunningRequests != nil {
running, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalRunningRequests)
if err == nil {
updated.RunningQueueSize = int(running.GetGauge().GetValue())
updated.RunningRequestsSize = int(running.GetGauge().GetValue())
} else {
errs = multierr.Append(errs, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/datalayer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Metrics struct {
WaitingModels map[string]int
// MaxActiveModels is the maximum number of models that can be loaded to GPU.
MaxActiveModels int
RunningQueueSize int
RunningRequestsSize int
WaitingQueueSize int
KVCacheUsagePercent float64
KvCacheMaxTokenCapacity int
Expand Down Expand Up @@ -74,7 +74,7 @@ func (m *Metrics) Clone() *Metrics {
ActiveModels: activeModels,
WaitingModels: waitingModels,
MaxActiveModels: m.MaxActiveModels,
RunningQueueSize: m.RunningQueueSize,
RunningRequestsSize: m.RunningRequestsSize,
WaitingQueueSize: m.WaitingQueueSize,
KVCacheUsagePercent: m.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity,
Expand Down
3 changes: 2 additions & 1 deletion pkg/epp/datalayer/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Extractor struct {
func Produces() map[string]any {
return map[string]any{
metrics.WaitingQueueSizeKey: int(0),
metrics.RunningRequestsSizeKey: int(0),
metrics.KVCacheUsagePercentKey: float64(0),
metrics.ActiveModelsKey: map[string]int{},
metrics.WaitingModelsKey: map[string]int{},
Expand Down Expand Up @@ -119,7 +120,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
if metric, err := spec.getLatestMetric(families); err != nil {
errs = append(errs, err)
} else {
clone.RunningQueueSize = int(extractValue(metric))
clone.RunningRequestsSize = int(extractValue(metric))
updated = true
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/datalayer/metrics/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestLogger(t *testing.T) {
assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}")
assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678")
assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5")
assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048")
assert.Contains(t, logOutput, "RunningRequestsSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048")
assert.Contains(t, logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679")
assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"")
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (f *fakeDataStore) PodList(predicate func(datalayer.Endpoint) bool) []datal
ActiveModels: map[string]int{"modelA": 1},
WaitingModels: map[string]int{"modelB": 2},
MaxActiveModels: 5,
RunningQueueSize: 3,
RunningRequestsSize: 3,
WaitingQueueSize: 7,
KVCacheUsagePercent: 42.5,
KvCacheMaxTokenCapacity: 2048,
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/datalayer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMetricsClone(t *testing.T) {
ActiveModels: map[string]int{"modelA": 1},
WaitingModels: map[string]int{"modelB": 2},
MaxActiveModels: 5,
RunningQueueSize: 3,
RunningRequestsSize: 3,
WaitingQueueSize: 7,
KVCacheUsagePercent: 42.5,
KvCacheMaxTokenCapacity: 2048,
Expand Down
1 change: 1 addition & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (

KVCacheUsagePercentKey = "KVCacheUsagePercent"
WaitingQueueSizeKey = "WaitingQueueSize"
RunningRequestsSizeKey = "RunningRequestsSize"
MaxActiveModelsKey = "MaxActiveModels"
ActiveModelsKey = "ActiveModels"
WaitingModelsKey = "WaitingModels"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func processHeaderForLatencyPrediction(
KVCachePercentage: m.KVCacheUsagePercent,
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
NumRequestWaiting: m.WaitingQueueSize,
NumRequestRunning: m.RunningQueueSize,
NumRequestRunning: m.RunningRequestsSize,
NumTokensGenerated: 0,
PrefixCacheScore: prefix_cache_score,
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func recordTTFTTrainingData(
ActualTPOT: 0,
Timestamp: now,
NumRequestWaiting: m.WaitingQueueSize,
NumRequestRunning: m.RunningQueueSize,
NumRequestRunning: m.RunningRequestsSize,
NumTokensGenerated: 0,
PrefixCacheScore: prefixCacheScore,
}
Expand All @@ -201,7 +201,7 @@ func predictFirstTPOT(
KVCachePercentage: m.KVCacheUsagePercent,
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
NumRequestWaiting: m.WaitingQueueSize,
NumRequestRunning: m.RunningQueueSize,
NumRequestRunning: m.RunningRequestsSize,
NumTokensGenerated: sloCtx.generatedTokenCount,
PrefixCacheScore: 0,
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func processTokenForLatencyPrediction(
ActualTPOT: latencyMs,
Timestamp: now,
NumRequestWaiting: m.WaitingQueueSize,
NumRequestRunning: m.RunningQueueSize,
NumRequestRunning: m.RunningRequestsSize,
NumTokensGenerated: sloCtx.generatedTokenCount - 1,
PrefixCacheScore: 0, // TPOT does not use prefix cache score
}
Expand All @@ -274,7 +274,7 @@ func processTokenForLatencyPrediction(
KVCachePercentage: m.KVCacheUsagePercent,
InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)),
NumRequestWaiting: m.WaitingQueueSize,
NumRequestRunning: m.RunningQueueSize,
NumRequestRunning: m.RunningRequestsSize,
NumTokensGenerated: sloCtx.generatedTokenCount,
PrefixCacheScore: 0, // TPOT does not use prefix cache score
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func bulkPredictWithMetrics(
KVCachePercentage: metricsStates[i].KVCacheUsagePercent,
InputTokenLength: len(strings.Fields(prompts[i])),
NumRequestWaiting: metricsStates[i].WaitingQueueSize,
NumRequestRunning: metricsStates[i].RunningQueueSize,
NumRequestRunning: metricsStates[i].RunningRequestsSize,
NumTokensGenerated: generatedTokenCounts[i],
PrefixCacheScore: prefixCacheScores[i],
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func bulkPredictWithMetrics(
"generated_tokens", bulkRequests[i].NumTokensGenerated,
"kv_cache_percent", bulkRequests[i].KVCachePercentage,
"waiting_queue", bulkRequests[i].NumRequestWaiting,
"running_queue", bulkRequests[i].NumRequestRunning,
"running_requests", bulkRequests[i].NumRequestRunning,
"prefix_cache_score", bulkRequests[i].PrefixCacheScore)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ import (
)

const (
testModelName = "test-model"
kvUsage = 1
runningQueue = 1
waitingQueue = 1
testModelName = "test-model"
kvUsage = 1
runningRequests = 1
waitingQueue = 1
)

// Helper functions

func createTestSchedulingResult(pod *backend.Pod) *schedulingtypes.SchedulingResult {

mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningQueue, waitingQueue)
mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningRequests, waitingQueue)

return &schedulingtypes.SchedulingResult{
PrimaryProfileName: "default",
Expand Down Expand Up @@ -343,12 +343,12 @@ func TestSLOAwareRouter_ResponseStreaming_FirstToken(t *testing.T) {
sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{
KVCacheUsagePercent: 0.5,
WaitingQueueSize: 1,
RunningQueueSize: 1,
RunningRequestsSize: 1,
}
sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{
KVCacheUsagePercent: 0.5,
WaitingQueueSize: 1,
RunningQueueSize: 1,
RunningRequestsSize: 1,
}
router.setSLOContextForRequest(request, sloCtx)

Expand Down Expand Up @@ -394,12 +394,12 @@ func TestSLOAwareRouter_ResponseStreaming_SubsequentTokens(t *testing.T) {
sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{
KVCacheUsagePercent: 0.5,
WaitingQueueSize: 1,
RunningQueueSize: 1,
RunningRequestsSize: 1,
}
sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{
KVCacheUsagePercent: 0.5,
WaitingQueueSize: 1,
RunningQueueSize: 1,
RunningRequestsSize: 1,
}
firstTokenTime := time.Now().Add(-100 * time.Millisecond)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (m *mockPredictor) GetServerStatus(ctx context.Context) (*latencypredictor.
return &latencypredictor.ServerStatusResponse{}, nil
}

func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQueueSize int) schedulingtypes.Pod {
func createTestPod(name string, kvCacheUsage float64, runningRequestsSize, waitingQueueSize int) schedulingtypes.Pod {
return &schedulingtypes.PodMetrics{
Pod: &backend.Pod{
NamespacedName: types.NamespacedName{
Expand All @@ -113,7 +113,7 @@ func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQ
},
MetricsState: &backendmetrics.MetricsState{
KVCacheUsagePercent: kvCacheUsage,
RunningQueueSize: runningQueueSize,
RunningRequestsSize: runningRequestsSize,
WaitingQueueSize: waitingQueueSize,
},
}
Expand Down
104 changes: 104 additions & 0 deletions pkg/epp/scheduling/framework/plugins/scorer/running.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scorer

import (
"context"
"encoding/json"
"math"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
	// RunningRequestsSizeScorerType is the plugin type name under which this
	// scorer is registered in the plugin registry.
	RunningRequestsSizeScorerType = "running-requests-size-scorer"
)

// Compile-time assertion that RunningRequestsSizeScorer satisfies framework.Scorer.
var _ framework.Scorer = &RunningRequestsSizeScorer{}

// RunningRequestsSizeScorerFactory is the factory function for
// RunningRequestsSizeScorer. The raw configuration and plugin handle are
// ignored since this scorer takes no parameters.
func RunningRequestsSizeScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
	scorer := NewRunningRequestsSizeScorer()
	return scorer.WithName(name), nil
}

// NewRunningRequestsSizeScorer returns a pointer to a freshly initialized
// RunningRequestsSizeScorer whose name defaults to its plugin type.
func NewRunningRequestsSizeScorer() *RunningRequestsSizeScorer {
	tn := plugins.TypedName{
		Type: RunningRequestsSizeScorerType,
		Name: RunningRequestsSizeScorerType,
	}
	return &RunningRequestsSizeScorer{typedName: tn}
}

// RunningRequestsSizeScorer scores a list of candidate pods based on each
// pod's running-requests count: the fewer requests a pod is currently
// running, the higher its score (it has more capacity to serve a new request).
type RunningRequestsSizeScorer struct {
	typedName plugins.TypedName // plugin type and instance name
}

// TypedName returns the type and name tuple identifying this plugin instance.
func (s *RunningRequestsSizeScorer) TypedName() plugins.TypedName {
	tn := s.typedName
	return tn
}

// Consumes reports the data this plugin reads: the running-requests-size
// metric, mapped to the zero value of its expected type.
func (s *RunningRequestsSizeScorer) Consumes() map[string]any {
	consumed := make(map[string]any, 1)
	consumed[metrics.RunningRequestsSizeKey] = int(0)
	return consumed
}

// WithName overrides the scorer's instance name and returns the receiver so
// calls can be chained.
func (s *RunningRequestsSizeScorer) WithName(name string) *RunningRequestsSizeScorer {
	s.typedName.Name = name

	return s
}

// Score returns a score in [0, 1] for each candidate pod based on its number
// of running requests, normalized over the candidate set: the pod with the
// fewest running requests scores 1 and the pod with the most scores 0, since
// a less loaded pod is more available to serve a new request. When all pods
// report the same value (including the single-pod case), every pod receives a
// neutral score of 1.0 so this scorer does not influence selection.
func (s *RunningRequestsSizeScorer) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {
	// NOTE: locals are named after the running-requests metric; the previous
	// "queue size" naming was misleading — this scorer reads RunningRequestsSize,
	// not the waiting-queue length.
	minRunning := math.MaxInt
	maxRunning := math.MinInt

	// Find the minimum and maximum running-requests counts across all pods.
	for _, pod := range pods {
		running := pod.GetMetrics().RunningRequestsSize
		if running < minRunning {
			minRunning = running
		}
		if running > maxRunning {
			maxRunning = running
		}
	}

	// podScoreFunc linearly normalizes a pod's running-requests count:
	// more running requests yields a lower score.
	podScoreFunc := func(pod types.Pod) float64 {
		if maxRunning == minRunning {
			// All pods carry the same load; return a neutral score.
			return 1.0
		}
		return float64(maxRunning-pod.GetMetrics().RunningRequestsSize) / float64(maxRunning-minRunning)
	}

	scores := make(map[types.Pod]float64, len(pods))
	for _, pod := range pods {
		scores[pod] = podScoreFunc(pod)
	}
	return scores
}
Loading