Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/scheduler/apis/config/v1/default_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func getDefaultPlugins() *v1.Plugins {
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: ptr.To[int32](2)},
{Name: names.InterPodAffinity, Weight: ptr.To[int32](2)},
//{Name: names.FastPodSpread},
//{Name: names.InterPodAffinity, Weight: ptr.To[int32](2)},
{Name: names.FastPodAffinity},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: ptr.To[int32](1)},
{Name: names.ImageLocality, Weight: ptr.To[int32](1)},
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/backend/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type Snapshot struct {
// keyed in the format "namespace/name".
usedPVCSet sets.Set[string]
generation int64
nodeState map[string]SnapshotState
}

type SnapshotState interface {
Update(old fwk.NodeInfo, current fwk.NodeInfo)
}

var _ fwk.SharedLister = &Snapshot{}
Expand Down
137 changes: 137 additions & 0 deletions pkg/scheduler/backend/fort/connectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package fort

// A target to store the results from a view.
// Only relevant externally for creating new operators.
type Target[K comparable, V any] interface {
OnUpdate(key K, value V, source Source[K, V]) error
OnDelete(key K, value V, source Source[K, V]) error
}

// Sources should be opaque externally. They are only used
// to link sources to one another using the operators.
type Source[K comparable, V any] interface {
addTarget(t Target[K, V])
Print()
}

func (s KeyValueSet[K, V]) All() KeyValueIterator[K, V] {
return func(yield func(key K, value V) bool) {
for _, kv := range s {
if !yield(kv.Key, kv.Value) {
return
}
}
}
}

type keyValueConnector[K comparable, V any] struct {
targets []Target[K, V]
}

var _ Source[string, int] = &keyValueConnector[string, int]{}

func newKeyValueConnector[K comparable, V any]() *keyValueConnector[K, V] {
return &keyValueConnector[K, V]{
targets: []Target[K, V]{},
}
}

func (m *keyValueConnector[K, V]) addTarget(target Target[K, V]) {
m.targets = append(m.targets, target)
}

func (m *keyValueConnector[K, V]) Update(key K, value V) {
for _, target := range m.targets {
target.OnUpdate(key, value, m)
}
}

func (m *keyValueConnector[K, V]) Delete(key K, value V) {
for _, target := range m.targets {
target.OnDelete(key, value, m)
}
}

func (c *keyValueConnector[K, V]) Clone() any {
return &keyValueConnector[K, V]{
targets: append([]Target[K, V]{}, c.targets...),
}
}

func (c *keyValueConnector[K, V]) Release() {}

func (c *keyValueConnector[K, V]) Print() {}

type writeMap[K comparable, V any] struct {
targets []Target[K, V]
}

var _ WriteMap[string, int] = &writeMap[string, int]{}

func newWriteMap[K comparable, V any]() *writeMap[K, V] {
return &writeMap[K, V]{targets: []Target[K, V]{}}
}

func (w *writeMap[K, V]) addTarget(t Target[K, V]) {
w.targets = append(w.targets, t)
}

func (w *writeMap[K, V]) Update(key K, value V) error {
for _, t := range w.targets {
if err := t.OnUpdate(key, value, w); err != nil {
return err
}
}
return nil
}

func (w *writeMap[K, V]) Delete(key K, value V) error {
for _, t := range w.targets {
if err := t.OnDelete(key, value, w); err != nil {
return err
}
}
return nil
}

func (w *writeMap[K, V]) Print() {}

type readMap[K comparable, V any] struct {
data *CloneMap[K, V]
}

var _ ReadMap[string, int] = &readMap[string, int]{}

func newReadMap[K comparable, V any]() *readMap[K, V] {
return &readMap[K, V]{
data: newCloneMap[K, V](map[K]any{}, nil, 0),
}
}

func (r *readMap[K, V]) Get(key K) (V, bool) {
return r.data.Get(key)
}

func (r *readMap[K, V]) All() KeyValueIterator[K, V] {
return r.data.All()
}

func (r *readMap[K, V]) OnUpdate(key K, value V, source Source[K, V]) error {
r.data.Update(key, value)
return nil
}

func (r *readMap[K, V]) OnDelete(key K, value V, source Source[K, V]) error {
r.data.Delete(key)
return nil
}

func (r *readMap[K, V]) Clone() *readMap[K, V] {
return &readMap[K, V]{
data: r.data.Clone(),
}
}

func (r *readMap[K, V]) Print() {
r.data.Print()
}
140 changes: 140 additions & 0 deletions pkg/scheduler/backend/fort/examples/simple_topo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package forttest

import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/backend/fort"
)

// Data structures used for our test.
type TestNode struct {
Name string
Domains []string
Pods []TestPod
}

type TestPod struct {
Label string
}

type DomLabel struct {
Domain string
Label string
}

type DomNode struct {
Domain string
Node string
}

type DomCount struct {
Domain string
Count int64
}

// The state managed by Fort.

type TestData struct {
Nodes fort.WriteMap[string, *TestNode]
DomainLabelCounts fort.Source[DomLabel, int64]
LabelSetsInt fort.Source[string, sets.Set[DomCount]]
LabelSets fort.ReadMap[string, sets.Set[DomCount]]
}

// If passed an empty TestData as cloneFrom this will intitialize
// a new TestData structure. If passed a non-nil cloneFrom
// this will make the given TestData a clone of cloneFrom.
func (d *TestData) InitOrClone(cloneFrom *TestData) {
d.Nodes = fort.NewWriteMap(cloneFrom.Nodes)

d.DomainLabelCounts = fort.MapReduce(
d.Nodes,
func(kv *fort.KeyValue[string, *TestNode]) fort.KeyValueSet[DomLabel, int64] {
node := kv.Value
ret := fort.KeyValueSet[DomLabel, int64]{}

// For each node, create an entry for each pod associated
// with the pod's label and each domain of the node.
for _, p := range node.Pods {
for _, dom := range node.Domains {
ret = append(ret, fort.KeyValue[DomLabel, int64]{
Key: DomLabel{Domain: dom, Label: p.Label},
Value: 1,
})
}
}
return ret
},
// Sum the number of pods in each domain, label pair.
fort.Sum[int64],
cloneFrom.DomainLabelCounts,
)

d.LabelSetsInt = fort.MapReduce(
d.DomainLabelCounts,
func(kv *fort.KeyValue[DomLabel, int64]) fort.KeyValueSet[string, DomCount] {
// Now that we have the counts per label/domain, group them by label.
return fort.KeyValueSet[string, DomCount]{
{
Key: kv.Key.Label,
Value: DomCount{Domain: kv.Key.Domain, Count: kv.Value},
},
}
},
// Track the domains and their counts per label.
fort.Distinct[DomCount],
cloneFrom.LabelSetsInt,
)

// Make our label indexed map available externally.
d.LabelSets = fort.NewReadMap(
d.LabelSetsInt,
cloneFrom.LabelSets,
)
}

func TestLightPodSpread(t *testing.T) {
d := fort.New[TestData]()

d.Nodes.Update("foo", &TestNode{
Domains: []string{"d"},
Pods: []TestPod{
{
Label: "p",
},
},
})

d.Nodes.Update("foo2", &TestNode{
Domains: []string{"d"},
Pods: []TestPod{
{
Label: "p",
},
{
Label: "q",
},
},
})

d.Nodes.Update("foo3", &TestNode{
Domains: []string{"e"},
Pods: []TestPod{
{
Label: "p",
},
{
Label: "q",
},
},
})

d.DomainLabelCounts.Print()
d.LabelSetsInt.Print()

myClone := fort.Clone(d)

myClone.DomainLabelCounts.Print()
myClone.LabelSetsInt.Print()
}
75 changes: 75 additions & 0 deletions pkg/scheduler/backend/fort/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package fort

import (
"sync"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)

type wrappedInformer struct {
lock sync.Mutex
keyValueConnector[string]
informer cache.SharedInformer
registration cache.ResourceEventHandlerRegistration
}

var _ cache.ResourceEventHandler = &wrappedInformer{}
var _ KeyValueSource = &wrappedInformer{}
var _ Cloneable = &wrappedInformer{}

type hasUID interface {
GetUID() types.UID
}

type hasName interface {
GetName() string
}

func (w *wrappedInformer) OnAdd(obj interface{}, isInInitialList bool) {
w.lock.Lock()
defer w.lock.Unlock()
key := string(obj.(hasName).GetName())
w.keyValueConnector.Update(key, obj)
}

func (w *wrappedInformer) OnUpdate(oldObj, newObj interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
key := string(newObj.(hasName).GetName())
w.keyValueConnector.Update(key, newObj)
}

func (w *wrappedInformer) OnDelete(obj interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
key := string(obj.(hasName).GetName())
w.keyValueConnector.Delete(key, obj)
}

func (w *wrappedInformer) addTarget(target KeyValueTarget) {
w.lock.Lock()
defer w.lock.Unlock()
w.keyValueConnector.addTarget(target)
}

func (w *wrappedInformer) Clone(owner any) Cloneable {
return newKeyValueConnector[string]()
}

func wrapInformer(informer cache.SharedInformer) (*wrappedInformer, error) {
w := &wrappedInformer{
keyValueConnector: keyValueConnector[string]{
targets: []KeyValueTarget{},
},
informer: informer,
}

var err error
w.registration, err = informer.AddEventHandler(w)
if err != nil {
return nil, err
}

return w, nil
}
Loading