Skip to content

Commit 4738bee

Browse files
authored
feat: Enable Scale-from-Zero with Flow Control enabled (#1952)
* contracts: add PodLocator for candidate resolution This defines the contract for resolving candidate pods based on request metadata, decoupling the resolution logic from the storage layer. * director: delegate candidate resolution Refactors the Director to use the injected PodLocator interface instead of the private getCandidatePodsForScheduling method. This prepares the Director for lazy resolution without changing current behavior. * flowcontrol: add metadata support to request type Updates the FlowControlRequest interface to carry request metadata instead of a pre-resolved list of candidate pods. This prepares the system for lazy pod resolution. - Adds GetMetadata() to FlowControlRequest. - Removes CandidatePodsForScheduling() from FlowControlRequest. - Updates mocks in flowcontrol/types and contracts. * requestcontrol: enable scale-from-zero Refactors the request processing flow to support queuing when no backends are available. - Inverts Director flow: Admission is now called before Pod Resolution. - Updates AdmissionController interface to remove eager pod list. - LegacyAdmissionController now resolves pods internally via PodLocator. - ShardProcessor (Flow Control) now resolves pods lazily via PodLocator during the dispatch cycle. - Updates Runner wiring to inject PodLocator where needed. * resolve merge conflicts after rebase * Address reviewer feedback
1 parent fcf0bda commit 4738bee

File tree

13 files changed

+165
-128
lines changed

13 files changed

+165
-128
lines changed

cmd/epp/runner/runner.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
6161
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
6262
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
63+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
6364
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
6465
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
6566
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -330,7 +331,10 @@ func (r *Runner) Run(ctx context.Context) error {
330331

331332
// --- Admission Control Initialization ---
332333
var admissionController requestcontrol.AdmissionController
334+
var locator contracts.PodLocator
335+
locator = requestcontrol.NewDatastorePodLocator(ds)
333336
if r.featureGates[flowcontrol.FeatureGate] {
337+
locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
334338
setupLog.Info("Initializing experimental Flow Control layer")
335339
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
336340
if err != nil {
@@ -342,24 +346,28 @@ func (r *Runner) Run(ctx context.Context) error {
342346
if err != nil {
343347
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
344348
}
345-
fc, err := fccontroller.NewFlowController(ctx, fcCfg.Controller, registry, saturationDetector, setupLog)
349+
fc, err := fccontroller.NewFlowController(
350+
ctx,
351+
fcCfg.Controller,
352+
registry, saturationDetector,
353+
locator,
354+
setupLog,
355+
)
346356
if err != nil {
347357
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
348358
}
349359
go registry.Run(ctx)
350-
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
360+
admissionController = requestcontrol.NewFlowControlAdmissionController(fc)
351361
} else {
352362
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
353-
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
363+
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector, locator)
354364
}
355365

356-
locator := requestcontrol.NewDatastorePodLocator(ds)
357-
cachedLocator := requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50)
358366
director := requestcontrol.NewDirectorWithConfig(
359367
ds,
360368
scheduler,
361369
admissionController,
362-
cachedLocator,
370+
locator,
363371
r.requestControlConfig)
364372

365373
// --- Setup ExtProc Server Runner ---

pkg/epp/flowcontrol/contracts/mocks/mocks.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import (
4141
typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
4242
)
4343

44+
// --- RegistryShard Mocks ---
45+
4446
// MockRegistryShard is a simple "stub-style" mock for testing.
4547
// Its methods are implemented as function fields (e.g., `IDFunc`). A test can inject behavior by setting the desired
4648
// function field in the test setup. If a func is nil, the method will return a zero value.
@@ -111,6 +113,8 @@ func (m *MockRegistryShard) Stats() contracts.ShardStats {
111113
return contracts.ShardStats{}
112114
}
113115

116+
// --- Dependency Mocks ---
117+
114118
// MockSaturationDetector is a simple "stub-style" mock for testing.
115119
type MockSaturationDetector struct {
116120
IsSaturatedFunc func(ctx context.Context, candidatePods []metrics.PodMetrics) bool
@@ -123,6 +127,30 @@ func (m *MockSaturationDetector) IsSaturated(ctx context.Context, candidatePods
123127
return false
124128
}
125129

130+
// MockPodLocator provides a mock implementation of the contracts.PodLocator interface.
131+
// It allows tests to control the exact set of pods returned for a given request.
132+
type MockPodLocator struct {
133+
// LocateFunc allows injecting custom logic.
134+
LocateFunc func(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics
135+
// Pods is a static return value used if LocateFunc is nil.
136+
Pods []metrics.PodMetrics
137+
}
138+
139+
func (m *MockPodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []metrics.PodMetrics {
140+
if m.LocateFunc != nil {
141+
return m.LocateFunc(ctx, requestMetadata)
142+
}
143+
// Return copy to be safe
144+
if m.Pods == nil {
145+
return nil
146+
}
147+
result := make([]metrics.PodMetrics, len(m.Pods))
148+
copy(result, m.Pods)
149+
return result
150+
}
151+
152+
// --- ManagedQueue Mock ---
153+
126154
// MockManagedQueue is a high-fidelity, thread-safe mock of the `contracts.ManagedQueue` interface, designed
127155
// specifically for testing the concurrent `controller/internal.ShardProcessor`.
128156
//

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type shardProcessorFactory func(
6060
ctx context.Context,
6161
shard contracts.RegistryShard,
6262
saturationDetector contracts.SaturationDetector,
63+
podLocator contracts.PodLocator,
6364
clock clock.WithTicker,
6465
cleanupSweepInterval time.Duration,
6566
enqueueChannelBufferSize int,
@@ -95,6 +96,7 @@ type FlowController struct {
9596
config Config
9697
registry registryClient
9798
saturationDetector contracts.SaturationDetector
99+
podLocator contracts.PodLocator
98100
clock clock.WithTicker
99101
logger logr.Logger
100102
shardProcessorFactory shardProcessorFactory
@@ -126,13 +128,15 @@ func NewFlowController(
126128
config Config,
127129
registry contracts.FlowRegistry,
128130
sd contracts.SaturationDetector,
131+
podLocator contracts.PodLocator,
129132
logger logr.Logger,
130133
opts ...flowControllerOption,
131134
) (*FlowController, error) {
132135
fc := &FlowController{
133136
config: config,
134137
registry: registry,
135138
saturationDetector: sd,
139+
podLocator: podLocator,
136140
clock: clock.RealClock{},
137141
logger: logger.WithName("flow-controller"),
138142
parentCtx: ctx,
@@ -142,6 +146,7 @@ func NewFlowController(
142146
ctx context.Context,
143147
shard contracts.RegistryShard,
144148
saturationDetector contracts.SaturationDetector,
149+
podLocator contracts.PodLocator,
145150
clock clock.WithTicker,
146151
cleanupSweepInterval time.Duration,
147152
enqueueChannelBufferSize int,
@@ -151,6 +156,7 @@ func NewFlowController(
151156
ctx,
152157
shard,
153158
saturationDetector,
159+
podLocator,
154160
clock,
155161
cleanupSweepInterval,
156162
enqueueChannelBufferSize,
@@ -448,6 +454,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
448454
processorCtx,
449455
shard,
450456
fc.saturationDetector,
457+
fc.podLocator,
451458
fc.clock,
452459
fc.config.ExpiryCleanupInterval,
453460
fc.config.EnqueueChannelBufferSize,

pkg/epp/flowcontrol/controller/controller_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ type testHarness struct {
7979
// clock is the clock interface used by the controller.
8080
clock clock.WithTicker
8181
mockRegistry *mockRegistryClient
82-
mockDetector *mocks.MockSaturationDetector
8382
// mockClock provides access to FakeClock methods (Step, HasWaiters) if and only if the underlying clock is a
8483
// FakeClock.
8584
mockClock *testclock.FakeClock
@@ -91,6 +90,7 @@ type testHarness struct {
9190
func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
9291
t.Helper()
9392
mockDetector := &mocks.MockSaturationDetector{}
93+
mockPodLocator := &mocks.MockPodLocator{}
9494

9595
// Initialize the FakeClock with the current system time.
9696
// The controller implementation uses the injected clock to calculate the deadline timestamp,vbut uses the standard
@@ -113,15 +113,14 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
113113
withClock(mockClock),
114114
withShardProcessorFactory(mockProcessorFactory.new),
115115
}
116-
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
116+
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
117117
require.NoError(t, err, "failed to create FlowController for unit test harness")
118118

119119
h := &testHarness{
120120
fc: fc,
121121
cfg: cfg,
122122
clock: mockClock,
123123
mockRegistry: registry,
124-
mockDetector: mockDetector,
125124
mockClock: mockClock,
126125
mockProcessorFactory: mockProcessorFactory,
127126
}
@@ -133,8 +132,9 @@ func newUnitHarness(t *testing.T, ctx context.Context, cfg Config, registry *moc
133132
func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, registry *mockRegistryClient) *testHarness {
134133
t.Helper()
135134
mockDetector := &mocks.MockSaturationDetector{}
136-
// Align FakeClock with system time. See explanation in newUnitHarness.
135+
mockPodLocator := &mocks.MockPodLocator{}
137136

137+
// Align FakeClock with system time. See explanation in newUnitHarness.
138138
mockClock := testclock.NewFakeClock(time.Now())
139139
if registry == nil {
140140
registry = &mockRegistryClient{}
@@ -144,15 +144,14 @@ func newIntegrationHarness(t *testing.T, ctx context.Context, cfg Config, regist
144144
withRegistryClient(registry),
145145
withClock(mockClock),
146146
}
147-
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, logr.Discard(), opts...)
147+
fc, err := NewFlowController(ctx, cfg, registry, mockDetector, mockPodLocator, logr.Discard(), opts...)
148148
require.NoError(t, err, "failed to create FlowController for integration test harness")
149149

150150
h := &testHarness{
151151
fc: fc,
152152
cfg: cfg,
153153
clock: mockClock,
154154
mockRegistry: registry,
155-
mockDetector: mockDetector,
156155
mockClock: mockClock,
157156
}
158157
return h
@@ -247,6 +246,7 @@ func (f *mockShardProcessorFactory) new(
247246
_ context.Context, // The factory does not use the lifecycle context; it's passed to the processor's Run method later.
248247
shard contracts.RegistryShard,
249248
_ contracts.SaturationDetector,
249+
_ contracts.PodLocator,
250250
_ clock.WithTicker,
251251
_ time.Duration,
252252
_ int,
@@ -1001,6 +1001,7 @@ func TestFlowController_WorkerManagement(t *testing.T) {
10011001
ctx context.Context, // The context created by getOrStartWorker for the potential new processor.
10021002
shard contracts.RegistryShard,
10031003
_ contracts.SaturationDetector,
1004+
_ contracts.PodLocator,
10041005
_ clock.WithTicker,
10051006
_ time.Duration,
10061007
_ int,

pkg/epp/flowcontrol/controller/internal/processor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ var ErrProcessorBusy = errors.New("shard processor is busy")
6565
type ShardProcessor struct {
6666
shard contracts.RegistryShard
6767
saturationDetector contracts.SaturationDetector
68+
podLocator contracts.PodLocator
6869
clock clock.WithTicker
6970
cleanupSweepInterval time.Duration
7071
logger logr.Logger
@@ -86,6 +87,7 @@ func NewShardProcessor(
8687
ctx context.Context,
8788
shard contracts.RegistryShard,
8889
saturationDetector contracts.SaturationDetector,
90+
podLocator contracts.PodLocator,
8991
clock clock.WithTicker,
9092
cleanupSweepInterval time.Duration,
9193
enqueueChannelBufferSize int,
@@ -94,6 +96,7 @@ func NewShardProcessor(
9496
return &ShardProcessor{
9597
shard: shard,
9698
saturationDetector: saturationDetector,
99+
podLocator: podLocator,
97100
clock: clock,
98101
cleanupSweepInterval: cleanupSweepInterval,
99102
logger: logger,
@@ -307,8 +310,8 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
307310

308311
// --- Viability Check (Saturation/HoL Blocking) ---
309312
req := item.OriginalRequest()
310-
candidatePods := req.CandidatePodsForScheduling()
311-
if sp.saturationDetector.IsSaturated(ctx, candidatePods) {
313+
candidates := sp.podLocator.Locate(ctx, req.GetMetadata())
314+
if sp.saturationDetector.IsSaturated(ctx, candidates) {
312315
sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.",
313316
"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
314317
// Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where

pkg/epp/flowcontrol/controller/internal/processor_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type testHarness struct {
7575
clock *testclock.FakeClock
7676
logger logr.Logger
7777
saturationDetector *mocks.MockSaturationDetector
78+
podLocator *mocks.MockPodLocator
7879

7980
// --- Centralized Mock State ---
8081
// The harness's mutex protects the single source of truth for all mock state.
@@ -96,6 +97,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
9697
clock: testclock.NewFakeClock(time.Now()),
9798
logger: logr.Discard(),
9899
saturationDetector: &mocks.MockSaturationDetector{},
100+
podLocator: &mocks.MockPodLocator{Pods: []metrics.PodMetrics{&metrics.FakePodMetrics{}}},
99101
startSignal: make(chan struct{}),
100102
queues: make(map[types.FlowKey]*mocks.MockManagedQueue),
101103
priorityFlows: make(map[int][]types.FlowKey),
@@ -123,6 +125,7 @@ func newTestHarness(t *testing.T, expiryCleanupInterval time.Duration) *testHarn
123125
h.ctx,
124126
h,
125127
h.saturationDetector,
128+
h.podLocator,
126129
h.clock,
127130
expiryCleanupInterval,
128131
100,

pkg/epp/flowcontrol/types/mocks/mocks.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,19 @@ package mocks
2121
import (
2222
"time"
2323

24-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2524
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
2625
)
2726

28-
// MockFlowControlRequest provides a mock implementation of the `types.FlowControlRequest` interface.
27+
// MockFlowControlRequest provides a mock implementation of the types.FlowControlRequest interface.
2928
type MockFlowControlRequest struct {
30-
FlowKeyV types.FlowKey
31-
ByteSizeV uint64
32-
InitialEffectiveTTLV time.Duration
33-
IDV string
34-
CandidatePodsForSchedulingV []*metrics.FakePodMetrics
29+
FlowKeyV types.FlowKey
30+
ByteSizeV uint64
31+
InitialEffectiveTTLV time.Duration
32+
IDV string
33+
MetadataV map[string]any
3534
}
3635

37-
// NewMockFlowControlRequest creates a new `MockFlowControlRequest` instance.
36+
// NewMockFlowControlRequest creates a new MockFlowControlRequest instance.
3837
func NewMockFlowControlRequest(
3938
byteSize uint64,
4039
id string,
@@ -44,21 +43,15 @@ func NewMockFlowControlRequest(
4443
ByteSizeV: byteSize,
4544
IDV: id,
4645
FlowKeyV: key,
46+
MetadataV: make(map[string]any),
4747
}
4848
}
4949

5050
func (m *MockFlowControlRequest) FlowKey() types.FlowKey { return m.FlowKeyV }
5151
func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV }
5252
func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV }
5353
func (m *MockFlowControlRequest) ID() string { return m.IDV }
54-
55-
func (m *MockFlowControlRequest) CandidatePodsForScheduling() []metrics.PodMetrics {
56-
pods := make([]metrics.PodMetrics, 0, len(m.CandidatePodsForSchedulingV))
57-
for i, pod := range m.CandidatePodsForSchedulingV {
58-
pods[i] = pod
59-
}
60-
return pods
61-
}
54+
func (m *MockFlowControlRequest) GetMetadata() map[string]any { return m.MetadataV }
6255

6356
var _ types.FlowControlRequest = &MockFlowControlRequest{}
6457

pkg/epp/flowcontrol/types/request.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ package types
1818

1919
import (
2020
"time"
21-
22-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2321
)
2422

2523
// FlowControlRequest is the contract for an incoming request submitted to the `controller.FlowController`. It
@@ -45,15 +43,15 @@ type FlowControlRequest interface {
4543
// applied.
4644
InitialEffectiveTTL() time.Duration
4745

48-
// CandidatePodsForScheduling passes through a set of candidate pods a request may be admitted to.
49-
// This is necessary for invoking `contracts.SaturationDetector.IsSaturated`, but it is otherwise unused in the Flow
50-
// Control system.
51-
CandidatePodsForScheduling() []metrics.PodMetrics
52-
5346
// ID returns an optional, user-facing unique identifier for this specific request. It is intended for logging,
5447
// tracing, and observability. The `controller.FlowController` does not use this ID for dispatching decisions; it uses
5548
// the internal, opaque `QueueItemHandle`.
5649
ID() string
50+
51+
// GetMetadata returns the opaque metadata associated with the request (e.g., header-derived context, subset filters).
52+
// This data is passed transparently to components like the contracts.PodLocator to resolve resources (candidate pods)
53+
// lazily during the dispatch cycle.
54+
GetMetadata() map[string]any
5755
}
5856

5957
// QueueItemHandle is an opaque handle to an item that has been successfully added to a `framework.SafeQueue`. It acts

0 commit comments

Comments
 (0)