
Commit 1d16857

requestcontrol: enable scale-from-zero
Refactors the request processing flow to support queuing when no backends are available.

- Inverts Director flow: Admission is now called before Pod Resolution.
- Updates AdmissionController interface to remove the eager pod list.
- LegacyAdmissionController now resolves pods internally via PodLocator.
- ShardProcessor (Flow Control) now resolves pods lazily via PodLocator during the dispatch cycle.
- Updates Runner wiring to inject PodLocator where needed.
1 parent 517490c commit 1d16857
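
The commit threads a new contracts.PodLocator dependency through the stack but does not show its definition, only its call sites (locator.Locate(ctx, reqCtx.Request.Metadata) feeding IsSaturated, plus the mocks in the tests). Inferred from those call sites, the contract is likely close to the following sketch; the real definition lives in pkg/epp/flowcontrol/contracts and may differ.

// Sketch of the assumed PodLocator contract, inferred from this commit's
// call sites; the real definition lives in pkg/epp/flowcontrol/contracts
// and may differ.
package contracts

import (
	"context"

	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
)

type PodLocator interface {
	// Locate resolves the candidate backend pods for a request from its
	// metadata at call time. An empty result is legal: it is what lets a
	// queued request wait out a scale-from-zero event instead of failing fast.
	Locate(ctx context.Context, reqMetadata map[string]any) []backendmetrics.PodMetrics
}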

File tree

10 files changed (+118, -102 lines)

cmd/epp/runner/runner.go

Lines changed: 11 additions & 5 deletions

@@ -329,6 +329,8 @@ func (r *Runner) Run(ctx context.Context) error {
 	saturationDetector := saturationdetector.NewDetector(eppConfig.SaturationDetectorConfig, setupLog)
 
 	// --- Admission Control Initialization ---
+	locator := requestcontrol.NewDatastorePodLocator(ds)
+	cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
 	var admissionController requestcontrol.AdmissionController
 	if r.featureGates[flowcontrol.FeatureGate] {
 		setupLog.Info("Initializing experimental Flow Control layer")
@@ -342,19 +344,23 @@
 		if err != nil {
 			return fmt.Errorf("failed to initialize Flow Registry: %w", err)
 		}
-		fc, err := fccontroller.NewFlowController(ctx, fcCfg.Controller, registry, saturationDetector, setupLog)
+		fc, err := fccontroller.NewFlowController(
+			ctx,
+			fcCfg.Controller,
+			registry, saturationDetector,
+			cachedLocator,
+			setupLog,
+		)
 		if err != nil {
 			return fmt.Errorf("failed to initialize Flow Controller: %w", err)
 		}
 		go registry.Run(ctx)
-		admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
+		admissionController = requestcontrol.NewFlowControlAdmissionController(fc)
 	} else {
 		setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
-		admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
+		admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, cachedLocator)
 	}
 
-	locator := requestcontrol.NewDatastorePodLocator(ds)
-	cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
 	director := requestcontrol.NewDirectorWithConfig(
 		ds,
 		scheduler,
pkg/epp/flowcontrol/controller/controller.go

Lines changed: 7 additions & 0 deletions

@@ -60,6 +60,7 @@ type shardProcessorFactory func(
 	ctx context.Context,
 	shard contracts.RegistryShard,
 	saturationDetector contracts.SaturationDetector,
+	podLocator contracts.PodLocator,
 	clock clock.WithTicker,
 	cleanupSweepInterval time.Duration,
 	enqueueChannelBufferSize int,
@@ -95,6 +96,7 @@ type FlowController struct {
 	config                Config
 	registry              registryClient
 	saturationDetector    contracts.SaturationDetector
+	podLocator            contracts.PodLocator
 	clock                 clock.WithTicker
 	logger                logr.Logger
 	shardProcessorFactory shardProcessorFactory
@@ -126,13 +128,15 @@ func NewFlowController(
 	config Config,
 	registry contracts.FlowRegistry,
 	sd contracts.SaturationDetector,
+	podLocator contracts.PodLocator,
 	logger logr.Logger,
 	opts ...flowControllerOption,
 ) (*FlowController, error) {
 	fc := &FlowController{
 		config:             config,
 		registry:           registry,
 		saturationDetector: sd,
+		podLocator:         podLocator,
 		clock:              clock.RealClock{},
 		logger:             logger.WithName("flow-controller"),
 		parentCtx:          ctx,
@@ -142,6 +146,7 @@ func NewFlowController(
 		ctx context.Context,
 		shard contracts.RegistryShard,
 		saturationDetector contracts.SaturationDetector,
+		podLocator contracts.PodLocator,
 		clock clock.WithTicker,
 		cleanupSweepInterval time.Duration,
 		enqueueChannelBufferSize int,
@@ -151,6 +156,7 @@ func NewFlowController(
 		ctx,
 		shard,
 		saturationDetector,
+		podLocator,
 		clock,
 		cleanupSweepInterval,
 		enqueueChannelBufferSize,
@@ -448,6 +454,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
 		processorCtx,
 		shard,
 		fc.saturationDetector,
+		fc.podLocator,
 		fc.clock,
 		fc.config.ExpiryCleanupInterval,
 		fc.config.EnqueueChannelBufferSize,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 7 additions & 6 deletions

@@ -79,7 +79,6 @@ type testHarness struct {
 	// clock is the clock interface used by the controller.
 	clock        clock.WithTicker
 	mockRegistry *mockRegistryClient
-	mockDetector *mocks.MockSaturationDetector
 	// mockClock provides access to FakeClock methods (Step, HasWaiters) if and only if the underlying clock is a
 	// FakeClock.
 	mockClock *testclock.FakeClock
@@ -91,6 +90,7 @@ type testHarness struct {
 func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
 	t.Helper()
 	mockDetector := &mocks.MockSaturationDetector{}
+	mockPodLocator := &mocks.MockPodLocator{}
 
 	// Initialize the FakeClock with the current system time.
 	// The controller implementation uses the injected clock to calculate the deadline timestamp, but uses the standard
@@ -113,15 +113,14 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
 		withClock(mockClock),
 		withShardProcessorFactory(mockProcessorFactory.new),
 	}
-	fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
+	fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
 	require.NoError(t, err, "failed to create FlowController for unit test harness")
 
 	h := &testHarness{
 		fc:                   fc,
 		cfg:                  cfg,
 		clock:                mockClock,
 		mockRegistry:         registry,
-		mockDetector:         mockDetector,
 		mockClock:            mockClock,
 		mockProcessorFactory: mockProcessorFactory,
 	}
@@ -133,8 +132,9 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
 func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
 	t.Helper()
 	mockDetector := &mocks.MockSaturationDetector{}
-	// Align FakeClock with system time. See explanation in newUnitHarness.
+	mockPodLocator := &mocks.MockPodLocator{}
 
+	// Align FakeClock with system time. See explanation in newUnitHarness.
 	mockClock := testclock.NewFakeClock(time.Now())
 	if registry == nil {
 		registry = &mockRegistryClient{}
@@ -144,15 +144,14 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, regist
 		withRegistryClient(registry),
 		withClock(mockClock),
 	}
-	fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
+	fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
 	require.NoError(t, err, "failed to create FlowController for integration test harness")
 
 	h := &testHarness{
 		fc:           fc,
 		cfg:          cfg,
 		clock:        mockClock,
 		mockRegistry: registry,
-		mockDetector: mockDetector,
 		mockClock:    mockClock,
 	}
 	return h
@@ -247,6 +246,7 @@ func (f *mockShardProcessorFactory) new(
 	_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
 	shard contracts.RegistryShard,
 	_ contracts.SaturationDetector,
+	_ contracts.PodLocator,
 	_ clock.WithTicker,
 	_ time.Duration,
 	_ int,
@@ -1001,6 +1001,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
 	ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
 	shard contracts.RegistryShard,
 	_ contracts.SaturationDetector,
+	_ contracts.PodLocator,
 	_ clock.WithTicker,
 	_ time.Duration,
 	_ int,

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 5 additions & 2 deletions

@@ -65,6 +65,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy")
 type ShardProcessor struct {
 	shard                contracts.RegistryShard
 	saturationDetector   contracts.SaturationDetector
+	podLocator           contracts.PodLocator
 	clock                clock.WithTicker
 	cleanupSweepInterval time.Duration
 	logger               logr.Logger
@@ -86,6 +87,7 @@ func NewShardProcessor(
 	ctx context.Context,
 	shard contracts.RegistryShard,
 	saturationDetector contracts.SaturationDetector,
+	podLocator contracts.PodLocator,
 	clock clock.WithTicker,
 	cleanupSweepInterval time.Duration,
 	enqueueChannelBufferSize int,
@@ -94,6 +96,7 @@ func NewShardProcessor(
 	return &ShardProcessor{
 		shard:                shard,
 		saturationDetector:   saturationDetector,
+		podLocator:           podLocator,
 		clock:                clock,
 		cleanupSweepInterval: cleanupSweepInterval,
 		logger:               logger,
@@ -307,8 +310,8 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
 
 	// --- Viability Check (Saturation/HoL Blocking) ---
 	req := item.OriginalRequest()
-	candidatePods := req.CandidatePodsForScheduling()
-	if sp.saturationDetector.IsSaturated(ctx, candidatePods) {
+	candidates := sp.podLocator.Locate(ctx, req.GetMetadata())
+	if sp.saturationDetector.IsSaturated(ctx, candidates) {
 		sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.",
 			"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
 		// Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where
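
This hunk is the heart of the scale-from-zero change: the processor no longer trusts a pod snapshot captured at enqueue time (CandidatePodsForScheduling) but re-resolves pods on every dispatch cycle. A toy, self-contained illustration of why that matters when backends start at zero (the locator type here is a stand-in, not the repo's code):

package main

import "fmt"

// locator is a toy stand-in for a PodLocator; the repo's types differ.
type locator struct{ pods []string }

func (l *locator) Locate() []string { return l.pods }

func main() {
	loc := &locator{} // zero backends when the request is enqueued

	eager := loc.Locate() // old behavior: snapshot frozen at enqueue time

	loc.pods = []string{"pod-a"} // backends scale up while the request waits

	lazy := loc.Locate() // new behavior: resolved again at dispatch time

	fmt.Println(len(eager), len(lazy)) // 0 1 — only lazy resolution sees the new pod
}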

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 3 additions & 0 deletions

@@ -75,6 +75,7 @@ type testHarness struct {
 	clock              *testclock.FakeClock
 	logger             logr.Logger
 	saturationDetector *mocks.MockSaturationDetector
+	podLocator         *mocks.MockPodLocator
 
 	// --- Centralized Mock State ---
 	// The harness's mutex protects the single source of truth for all mock state.
@@ -96,6 +97,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
 		clock:              testclock.NewFakeClock(time.Now()),
 		logger:             logr.Discard(),
 		saturationDetector: &mocks.MockSaturationDetector{},
+		podLocator:         &mocks.MockPodLocator{Pods: []metrics.PodMetrics{&metrics.FakePodMetrics{}}},
 		startSignal:        make(chan struct{}),
 		queues:             make(map[types.FlowKey]*mocks.MockManagedQueue),
 		priorityFlows:      make(map[int][]types.FlowKey),
@@ -123,6 +125,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
 		h.ctx,
 		h,
 		h.saturationDetector,
+		h.podLocator,
 		h.clock,
 		expiryCleanupInterval,
 		100,

pkg/epp/requestcontrol/admission.go

Lines changed: 31 additions & 30 deletions

@@ -20,9 +20,11 @@ import (
 	"context"
 	"time"
 
+	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
 	errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -40,7 +42,6 @@ type AdmissionController interface {
 	// Args:
 	//   ctx: The request context, carrying deadlines, cancellation signals, and logger.
 	//   reqCtx: The handlers.RequestContext containing details about the incoming request.
-	//   candidatePods: A list of potential backend pods that can serve the request.
 	//   priority: The priority level of the request, as determined by the InferenceObjective.
 	//
 	// Returns:
@@ -49,7 +50,6 @@
 	Admit(
 		ctx context.Context,
 		reqCtx *handlers.RequestContext,
-		candidatePods []backendmetrics.PodMetrics,
 		priority int,
 	) error
 }
@@ -65,18 +65,17 @@ type flowController interface {
 	EnqueueAndWait(ctx context.Context, req types.FlowControlRequest) (types.QueueOutcome, error)
 }
 
-// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable
-// (priority < 0) and the system is saturated.
+// rejectIfSheddableAndSaturated checks if a request should be immediately rejected.
 func rejectIfSheddableAndSaturated(
 	ctx context.Context,
 	sd saturationDetector,
+	locator contracts.PodLocator,
 	reqCtx *handlers.RequestContext,
-	candidatePods []backendmetrics.PodMetrics,
 	priority int,
+	logger logr.Logger,
 ) error {
 	if requtil.IsSheddable(priority) {
-		logger := log.FromContext(ctx)
-		if sd.IsSaturated(ctx, candidatePods) {
+		if sd.IsSaturated(ctx, locator.Locate(ctx, reqCtx.Request.Metadata)) {
 			logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
 				"requestID", reqCtx.SchedulingRequest.RequestId)
 			return errutil.Error{
@@ -95,25 +94,37 @@
 // saturated. Non-sheddable requests always bypass the saturation check.
 type LegacyAdmissionController struct {
 	saturationDetector saturationDetector
+	podLocator         contracts.PodLocator
 }
 
 // NewLegacyAdmissionController creates a new LegacyAdmissionController.
-func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController {
-	return &LegacyAdmissionController{saturationDetector: sd}
+func NewLegacyAdmissionController(
+	sd saturationDetector,
+	pl contracts.PodLocator,
+) *LegacyAdmissionController {
+	return &LegacyAdmissionController{
+		saturationDetector: sd,
+		podLocator:         pl,
+	}
 }
 
 // Admit implements the AdmissionController interface for the legacy strategy.
 // It checks for saturation only for requests with priority < 0.
 func (lac *LegacyAdmissionController) Admit(
 	ctx context.Context,
 	reqCtx *handlers.RequestContext,
-	candidatePods []backendmetrics.PodMetrics,
 	priority int,
 ) error {
 	logger := log.FromContext(ctx)
 	logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController",
 		"priority", priority, "fairnessID", reqCtx.FairnessID)
-	if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
+	if err := rejectIfSheddableAndSaturated(
+		ctx,
+		lac.saturationDetector,
+		lac.podLocator,
+		reqCtx, priority,
+		logger,
+	); err != nil {
 		return err
 	}
 	logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId)
@@ -123,19 +134,15 @@ func (lac *LegacyAdmissionController) Admit(
 // --- FlowControlAdmissionController ---
 
 // FlowControlAdmissionController delegates admission decisions to the Flow Control layer.
-// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are
-// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome.
+// It uses the provided Flow Controller to enqueue the request and await an outcome.
 type FlowControlAdmissionController struct {
-	saturationDetector saturationDetector
-	flowController     flowController
+	flowController flowController
}
 
 // NewFlowControlAdmissionController creates a new FlowControlAdmissionController.
-// It requires a SaturationDetector and a flowController instance.
-func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController {
+func NewFlowControlAdmissionController(fc flowController) *FlowControlAdmissionController {
 	return &FlowControlAdmissionController{
-		saturationDetector: sd,
-		flowController:     fc,
+		flowController: fc,
 	}
 }
 
@@ -144,24 +151,18 @@ func NewFlowControlAdmissionController(sd saturationDetector, fc flowController)
 func (fcac *FlowControlAdmissionController) Admit(
 	ctx context.Context,
 	reqCtx *handlers.RequestContext,
-	candidatePods []backendmetrics.PodMetrics,
 	priority int,
 ) error {
 	logger := log.FromContext(ctx)
 	logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController",
 		"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID)
-	if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
-		return err
-	}
-
-	logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId)
 
 	fcReq := &flowControlRequest{
 		requestID:       reqCtx.SchedulingRequest.RequestId,
 		fairnessID:      reqCtx.FairnessID,
 		priority:        priority,
 		requestByteSize: uint64(reqCtx.RequestSize),
-		candidatePods:   candidatePods,
+		reqMetadata:     reqCtx.Request.Metadata,
 	}
 
 	outcome, err := fcac.flowController.EnqueueAndWait(ctx, fcReq)
@@ -176,20 +177,20 @@ type flowControlRequest struct {
 	fairnessID      string
 	priority        int
 	requestByteSize uint64
-	candidatePods   []backendmetrics.PodMetrics
+	reqMetadata     map[string]any
 }
 
 var _ types.FlowControlRequest = &flowControlRequest{}
 
 func (r *flowControlRequest) ID() string                         { return r.requestID }
 func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
 func (r *flowControlRequest) ByteSize() uint64                   { return r.requestByteSize }
-func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
-	return r.candidatePods
-}
 func (r *flowControlRequest) FlowKey() types.FlowKey {
 	return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
 }
+func (r *flowControlRequest) GetMetadata() map[string]any {
+	return r.reqMetadata
+}
 
 // translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
 // contract used by the Director.
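
Together with the Director changes elsewhere in this commit (not shown on this page), the slimmed-down Admit signature is what allows admission to run before pod resolution. A hypothetical sketch of the inverted flow; the director type and its fields are illustrative, not the repo's code:

// Illustrative sketch of the inverted Director flow; names are assumptions.
package requestcontrol

import (
	"context"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
)

type director struct {
	admission  AdmissionController
	podLocator contracts.PodLocator
}

func (d *director) handleRequest(ctx context.Context, reqCtx *handlers.RequestContext, priority int) error {
	// Admission first: with Flow Control enabled, Admit may park the request
	// in a queue and wait out a scale-from-zero event.
	if err := d.admission.Admit(ctx, reqCtx, priority); err != nil {
		return err
	}
	// Pod resolution second: backends that appeared while the request was
	// queued are now visible to scheduling.
	candidatePods := d.podLocator.Locate(ctx, reqCtx.Request.Metadata)
	_ = candidatePods // scheduling proceeds with the freshly resolved pods
	return nil
}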
