From 53179280e5ae3a54d3aad224db4fe6d82af55377 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Fri, 7 Nov 2025 23:50:46 +0000 Subject: [PATCH] refactor: Flatten plugins/intraflow structure Moves intra-flow policy implementations directly into the `plugins/intraflow` package, removing the unnecessary nested directories. This simplifies the import paths and project structure. This is a no-op refactoring. --- .../intraflow/dispatch => intraflow}/README.md | 0 .../intraflow/dispatch => intraflow}/factory.go | 2 +- .../dispatch/fcfs => intraflow}/fcfs.go | 5 ++--- .../dispatch/fcfs => intraflow}/fcfs_test.go | 2 +- .../dispatch => intraflow}/functional_test.go | 6 ++---- pkg/epp/flowcontrol/registry/config.go | 17 ++++++++--------- pkg/epp/flowcontrol/registry/config_test.go | 15 +++++++-------- pkg/epp/flowcontrol/registry/registry.go | 4 ++-- pkg/epp/flowcontrol/registry/registry_test.go | 6 +++--- pkg/epp/flowcontrol/registry/shard_test.go | 4 ++-- 10 files changed, 28 insertions(+), 33 deletions(-) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/README.md (100%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/factory.go (99%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch/fcfs => intraflow}/fcfs.go (96%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch/fcfs => intraflow}/fcfs_test.go (99%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/functional_test.go (89%) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/intraflow/README.md similarity index 100% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md rename to pkg/epp/flowcontrol/framework/plugins/intraflow/README.md diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go index b55740cbc..8f476d3d0 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go @@ -17,7 +17,7 @@ limitations under the License. // Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy` // implementations. // It allows new policies to be added to the system and instantiated by name. -package dispatch +package intraflow import ( "fmt" diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go similarity index 96% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go index 63ffe31b9..364afdbb6 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go @@ -15,11 +15,10 @@ limitations under the License. */ // Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`. -package fcfs +package intraflow import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) @@ -53,7 +52,7 @@ import ( const FCFSPolicyName = "FCFS" func init() { - dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName), + MustRegisterPolicy(RegisteredPolicyName(FCFSPolicyName), func() (framework.IntraFlowDispatchPolicy, error) { return newFCFS(), nil }) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go index cc6bceecf..6e684ec46 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fcfs +package intraflow import ( "testing" diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go similarity index 89% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go index 4088b4da2..0094c7052 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dispatch_test +package intraflow import ( "testing" @@ -23,8 +23,6 @@ import ( "github.com/stretchr/testify/require" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" ) // TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy` @@ -34,7 +32,7 @@ import ( func TestIntraFlowDispatchPolicyConformance(t *testing.T) { t.Parallel() - for policyName, constructor := range dispatch.RegisteredPolicies { + for policyName, constructor := range RegisteredPolicies { t.Run(string(policyName), func(t *testing.T) { t.Parallel() diff --git a/pkg/epp/flowcontrol/registry/config.go b/pkg/epp/flowcontrol/registry/config.go index ff5d1829d..8c54b4734 100644 --- a/pkg/epp/flowcontrol/registry/config.go +++ b/pkg/epp/flowcontrol/registry/config.go @@ -24,8 +24,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" ) @@ -36,7 +35,7 @@ const ( // It is set to 1 GB. defaultPriorityBandMaxBytes uint64 = 1_000_000_000 // defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue. - defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName + defaultIntraFlowDispatchPolicy intraflow.RegisteredPolicyName = intraflow.FCFSPolicyName // defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next. defaultInterFlowDispatchPolicy interflow.RegisteredPolicyName = interflow.BestHeadPolicyName // defaultQueue is the default queue implementation for flows. @@ -54,15 +53,15 @@ const ( // capabilityChecker abstracts the logic required to validate if a policy is compatible with a queue. type capabilityChecker interface { - CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error + CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error } // runtimeCapabilityChecker is the default implementation used in production. // It instantiates the actual plugins to inspect their required and provided capabilities. type runtimeCapabilityChecker struct{} -func (r *runtimeCapabilityChecker) CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error { - tempPolicy, err := intra.NewPolicyFromName(p) +func (r *runtimeCapabilityChecker) CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error { + tempPolicy, err := intraflow.NewPolicyFromName(p) if err != nil { return fmt.Errorf("failed to validate policy %q: %w", p, err) } @@ -154,7 +153,7 @@ type PriorityBandConfig struct { // IntraFlowDispatchPolicy specifies the default name of the policy used to select a request from within a single // flow's queue in this band. // Optional: Defaults to defaultIntraFlowDispatchPolicy ("FCFS"). - IntraFlowDispatchPolicy intra.RegisteredPolicyName + IntraFlowDispatchPolicy intraflow.RegisteredPolicyName // InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from // this band. @@ -253,8 +252,8 @@ func withCapabilityChecker(checker capabilityChecker) ConfigOption { // PriorityBandConfigOption defines a functional option for configuring a single PriorityBandConfig. type PriorityBandConfigOption func(*PriorityBandConfig) error -// WithIntraFlowPolicy sets the intra-flow dispatch policy (e.g., "FCFS"). -func WithIntraFlowPolicy(name intra.RegisteredPolicyName) PriorityBandConfigOption { +// WithIntraFlowPolicy sets the intraflow-flow dispatch policy (e.g., "FCFS"). +func WithIntraFlowPolicy(name intraflow.RegisteredPolicyName) PriorityBandConfigOption { return func(p *PriorityBandConfig) error { if name == "" { return errors.New("IntraFlowDispatchPolicy cannot be empty") diff --git a/pkg/epp/flowcontrol/registry/config_test.go b/pkg/epp/flowcontrol/registry/config_test.go index dad5cf5dd..9a467605e 100644 --- a/pkg/epp/flowcontrol/registry/config_test.go +++ b/pkg/epp/flowcontrol/registry/config_test.go @@ -24,17 +24,16 @@ import ( "github.com/stretchr/testify/require" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" ) // mockCapabilityChecker is a test double for verifying that NewConfig correctly delegates compatibility checks. type mockCapabilityChecker struct { - checkCompatibilityFunc func(intra intra.RegisteredPolicyName, q queue.RegisteredQueueName) error + checkCompatibilityFunc func(intra intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error } -func (m *mockCapabilityChecker) CheckCompatibility(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error { +func (m *mockCapabilityChecker) CheckCompatibility(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error { if m.checkCompatibilityFunc != nil { return m.checkCompatibilityFunc(p, q) } @@ -128,7 +127,7 @@ func TestNewConfig(t *testing.T) { Queue: "CustomQueue", }), withCapabilityChecker(&mockCapabilityChecker{ - checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil }, + checkCompatibilityFunc: func(_ intraflow.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil }, }), }, assertion: func(t *testing.T, cfg *Config) { @@ -191,7 +190,7 @@ func TestNewConfig(t *testing.T) { opts: []ConfigOption{ WithPriorityBand(mustBand(t, 1, "High")), withCapabilityChecker(&mockCapabilityChecker{ - checkCompatibilityFunc: func(p intra.RegisteredPolicyName, q queue.RegisteredQueueName) error { + checkCompatibilityFunc: func(p intraflow.RegisteredPolicyName, q queue.RegisteredQueueName) error { return contracts.ErrPolicyQueueIncompatible }, }), @@ -210,7 +209,7 @@ func TestNewConfig(t *testing.T) { name: "ShouldError_WhenDefaultRuntimeCheckerDetectsUnknownQueue", opts: []ConfigOption{ WithPriorityBand(mustBand(t, 1, "BadBand", - WithIntraFlowPolicy(fcfs.FCFSPolicyName), + WithIntraFlowPolicy(intraflow.FCFSPolicyName), WithQueue("non-existent-queue"), )), }, @@ -254,7 +253,7 @@ func TestNewPriorityBandConfig(t *testing.T) { require.NoError(t, err) assert.Equal(t, queue.RegisteredQueueName("CustomQueue"), pb.Queue) assert.Equal(t, uint64(999), pb.MaxBytes) - assert.Equal(t, intra.RegisteredPolicyName("CustomPolicy"), pb.IntraFlowDispatchPolicy) + assert.Equal(t, intraflow.RegisteredPolicyName("CustomPolicy"), pb.IntraFlowDispatchPolicy) assert.Equal(t, defaultInterFlowDispatchPolicy, pb.InterFlowDispatchPolicy) // Unchanged default }) diff --git a/pkg/epp/flowcontrol/registry/registry.go b/pkg/epp/flowcontrol/registry/registry.go index 4cbdfa098..3cd9898b9 100644 --- a/pkg/epp/flowcontrol/registry/registry.go +++ b/pkg/epp/flowcontrol/registry/registry.go @@ -30,7 +30,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -591,7 +591,7 @@ func (fr *FlowRegistry) buildFlowComponents(key types.FlowKey, numInstances int) allComponents := make([]flowComponents, numInstances) for i := range numInstances { - policy, err := intra.NewPolicyFromName(bandConfig.IntraFlowDispatchPolicy) + policy, err := intraflow.NewPolicyFromName(bandConfig.IntraFlowDispatchPolicy) if err != nil { return nil, fmt.Errorf("failed to instantiate intra-flow policy %q for flow %s: %w", bandConfig.IntraFlowDispatchPolicy, key, err) diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index 8d337faaa..fa169a62e 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -29,7 +29,7 @@ import ( testclock "k8s.io/utils/clock/testing" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" @@ -198,7 +198,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) { t.Run("ShouldFail_WhenJITFails", func(t *testing.T) { t.Parallel() - badPolicyName := intra.RegisteredPolicyName("non-existent-policy") + badPolicyName := intraflow.RegisteredPolicyName("non-existent-policy") badBand, err := NewPriorityBandConfig(highPriority, "High", WithIntraFlowPolicy(badPolicyName)) require.NoError(t, err) @@ -208,7 +208,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) { cfg, err := NewConfig( WithPriorityBand(badBand), withCapabilityChecker(&mockCapabilityChecker{ - checkCompatibilityFunc: func(_ intra.RegisteredPolicyName, _ queue.RegisteredQueueName) error { + checkCompatibilityFunc: func(_ intraflow.RegisteredPolicyName, _ queue.RegisteredQueueName) error { return nil // Approve everything. }, }), diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index e3692cfc0..d995c1abc 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -27,7 +27,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" @@ -92,7 +92,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness { func (h *shardTestHarness) synchronizeFlow(key types.FlowKey) { h.t.Helper() spec := types.FlowSpecification{Key: key} - policy, err := intra.NewPolicyFromName(defaultIntraFlowDispatchPolicy) + policy, err := intraflow.NewPolicyFromName(defaultIntraFlowDispatchPolicy) assert.NoError(h.t, err, "Helper synchronizeFlow: failed to create real intra-flow policy for synchronization") q, err := queue.NewQueueFromName(defaultQueue, policy.Comparator()) assert.NoError(h.t, err, "Helper synchronizeFlow: failed to create real queue for synchronization")