Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2026 Flant JSC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodeaffinity

import corev1 "k8s.io/api/core/v1"

func IntersectTerms(perPVTerms [][]corev1.NodeSelectorTerm) []corev1.NodeSelectorTerm {
if len(perPVTerms) == 0 {
return nil
}
result := perPVTerms[0]
for i := 1; i < len(perPVTerms); i++ {
result = CrossProductTerms(result, perPVTerms[i])
}
return result
}

func CrossProductTerms(a, b []corev1.NodeSelectorTerm) []corev1.NodeSelectorTerm {
var result []corev1.NodeSelectorTerm
for _, termA := range a {
for _, termB := range b {
merged := corev1.NodeSelectorTerm{
MatchExpressions: append(
append([]corev1.NodeSelectorRequirement{}, termA.MatchExpressions...),
termB.MatchExpressions...,
),
}
if len(termA.MatchFields) > 0 || len(termB.MatchFields) > 0 {
merged.MatchFields = append(
append([]corev1.NodeSelectorRequirement{}, termA.MatchFields...),
termB.MatchFields...,
)
}
result = append(result, merged)
}
}
return result
}
27 changes: 27 additions & 0 deletions images/virtualization-artifact/pkg/controller/kvbuilder/kvvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/deckhouse/virtualization-controller/pkg/common"
"github.com/deckhouse/virtualization-controller/pkg/common/array"
"github.com/deckhouse/virtualization-controller/pkg/common/nodeaffinity"
"github.com/deckhouse/virtualization-controller/pkg/common/resource_builder"
"github.com/deckhouse/virtualization-controller/pkg/common/vm"
"github.com/deckhouse/virtualization/api/core/v1alpha2"
Expand Down Expand Up @@ -651,6 +652,32 @@ func (b *KVVM) SetMetadata(metadata metav1.ObjectMeta) {
b.Resource.Spec.Template.ObjectMeta.Annotations = vm.RemoveNonPropagatableAnnotations(b.Resource.Spec.Template.ObjectMeta.Annotations)
}

func (b *KVVM) ApplyPVNodeAffinity(pvTerms []corev1.NodeSelectorTerm) {
if len(pvTerms) == 0 {
return
}

affinity := b.Resource.Spec.Template.Spec.Affinity
if affinity == nil {
affinity = &corev1.Affinity{}
}
if affinity.NodeAffinity == nil {
affinity.NodeAffinity = &corev1.NodeAffinity{}
}
if affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{}
}

existing := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
if len(existing) == 0 {
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = pvTerms
} else {
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeaffinity.CrossProductTerms(existing, pvTerms)
}

b.Resource.Spec.Template.Spec.Affinity = affinity
}

func (b *KVVM) SetUpdateVolumesStrategy(strategy *virtv1.UpdateVolumesStrategy) {
b.Resource.Spec.UpdateVolumesStrategy = strategy
}
105 changes: 105 additions & 0 deletions images/virtualization-artifact/pkg/controller/kvbuilder/kvvm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,111 @@ func TestSetAffinity(t *testing.T) {
}
}

func TestApplyPVNodeAffinity(t *testing.T) {
nn := types.NamespacedName{Name: "test", Namespace: "test-ns"}

pvTerm := func(key string, nodes ...string) corev1.NodeSelectorTerm {
return corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: nodes,
}},
}
}

t.Run("No PV terms should not modify affinity", func(t *testing.T) {
b := NewEmptyKVVM(nn, KVVMOptions{})
b.ApplyPVNodeAffinity(nil)
if b.Resource.Spec.Template.Spec.Affinity != nil {
t.Error("affinity should remain nil when no PV terms provided")
}
})

t.Run("No PV terms should preserve existing affinity", func(t *testing.T) {
b := NewEmptyKVVM(nn, KVVMOptions{})
existing := &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{pvTerm("k", "v")},
},
},
}
b.Resource.Spec.Template.Spec.Affinity = existing
b.ApplyPVNodeAffinity(nil)
if !reflect.DeepEqual(b.Resource.Spec.Template.Spec.Affinity, existing) {
t.Error("affinity should not change when no PV terms provided")
}
})

t.Run("PV terms applied to empty affinity", func(t *testing.T) {
b := NewEmptyKVVM(nn, KVVMOptions{})
terms := []corev1.NodeSelectorTerm{pvTerm("topology/node", "node-1")}
b.ApplyPVNodeAffinity(terms)

a := b.Resource.Spec.Template.Spec.Affinity
if a == nil || a.NodeAffinity == nil || a.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
t.Fatal("affinity should be set")
}
got := a.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
if !reflect.DeepEqual(got, terms) {
t.Errorf("expected %v, got %v", terms, got)
}
})

t.Run("PV terms merged with existing class affinity via cross-product", func(t *testing.T) {
b := NewEmptyKVVM(nn, KVVMOptions{})
classExpr := []corev1.NodeSelectorRequirement{{
Key: "node-role.kubernetes.io/control-plane",
Operator: corev1.NodeSelectorOpDoesNotExist,
}}
b.SetAffinity(nil, classExpr)

pvTerms := []corev1.NodeSelectorTerm{pvTerm("topology/node", "node-2")}
b.ApplyPVNodeAffinity(pvTerms)

a := b.Resource.Spec.Template.Spec.Affinity
got := a.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
if len(got) != 1 {
t.Fatalf("expected 1 term (cross-product of 1x1), got %d", len(got))
}
if len(got[0].MatchExpressions) != 2 {
t.Errorf("expected 2 match expressions (class + PV), got %d", len(got[0].MatchExpressions))
}
})

t.Run("PV terms cross-product with multiple existing terms", func(t *testing.T) {
b := NewEmptyKVVM(nn, KVVMOptions{})
b.Resource.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
pvTerm("zone", "us-east-1a"),
pvTerm("zone", "us-east-1b"),
},
},
},
}

pvTerms := []corev1.NodeSelectorTerm{
pvTerm("topology/node", "node-1"),
pvTerm("topology/node", "node-2"),
}
b.ApplyPVNodeAffinity(pvTerms)

got := b.Resource.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
// 2 existing x 2 PV = 4 terms
if len(got) != 4 {
t.Fatalf("expected 4 terms (cross-product 2x2), got %d", len(got))
}
for i, term := range got {
if len(term.MatchExpressions) != 2 {
t.Errorf("term %d: expected 2 match expressions, got %d", i, len(term.MatchExpressions))
}
}
})
}

func TestSetOsType(t *testing.T) {
name := "test-name"
namespace := "test-namespace"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (

"github.com/deckhouse/virtualization-controller/pkg/common/annotations"
kvvmutil "github.com/deckhouse/virtualization-controller/pkg/common/kvvm"
"github.com/deckhouse/virtualization-controller/pkg/common/nodeaffinity"
"github.com/deckhouse/virtualization-controller/pkg/common/object"
"github.com/deckhouse/virtualization-controller/pkg/controller/indexer"
"github.com/deckhouse/virtualization-controller/pkg/controller/powerstate"
"github.com/deckhouse/virtualization-controller/pkg/controller/reconciler"
"github.com/deckhouse/virtualization/api/core/v1alpha2"
Expand All @@ -54,6 +56,7 @@ type VirtualMachineState interface {
VMOPs(ctx context.Context) ([]*v1alpha2.VirtualMachineOperation, error)
Shared(fn func(s *Shared))
ReadWriteOnceVirtualDisks(ctx context.Context) ([]*v1alpha2.VirtualDisk, error)
PVNodeAffinityTerms(ctx context.Context) ([]corev1.NodeSelectorTerm, error)
USBDevice(ctx context.Context, name string) (*v1alpha2.USBDevice, error)
USBDevicesByName(ctx context.Context) (map[string]*v1alpha2.USBDevice, error)
}
Expand Down Expand Up @@ -386,6 +389,109 @@ func (s *state) ReadWriteOnceVirtualDisks(ctx context.Context) ([]*v1alpha2.Virt
return nonMigratableVirtualDisks, nil
}

func (s *state) PVNodeAffinityTerms(ctx context.Context) ([]corev1.NodeSelectorTerm, error) {
refs := s.collectBlockDeviceRefs(ctx)

var perPVTerms [][]corev1.NodeSelectorTerm
namespace := s.vm.Current().GetNamespace()

for _, ref := range refs {
pvcName, err := s.resolvePVCName(ctx, ref.Kind, ref.Name)
if err != nil || pvcName == "" {
continue
}

terms, err := s.pvNodeAffinityTermsForPVC(ctx, pvcName, namespace)
if err != nil || terms == nil {
continue
}
perPVTerms = append(perPVTerms, terms)
}

return nodeaffinity.IntersectTerms(perPVTerms), nil
}

func (s *state) collectBlockDeviceRefs(ctx context.Context) []blockDeviceRef {
seen := make(map[blockDeviceRef]struct{})
var refs []blockDeviceRef

for _, bd := range s.vm.Current().Spec.BlockDeviceRefs {
ref := blockDeviceRef{Name: bd.Name, Kind: bd.Kind}
if _, ok := seen[ref]; !ok {
seen[ref] = struct{}{}
refs = append(refs, ref)
}
}

var vmbdaList v1alpha2.VirtualMachineBlockDeviceAttachmentList
err := s.client.List(ctx, &vmbdaList,
client.InNamespace(s.vm.Current().GetNamespace()),
client.MatchingFields{indexer.IndexFieldVMBDAByVM: s.vm.Current().GetName()},
)
if err != nil {
return refs
}

for _, vmbda := range vmbdaList.Items {
if vmbda.Status.Phase != v1alpha2.BlockDeviceAttachmentPhaseAttached {
continue
}
ref := blockDeviceRef{
Name: vmbda.Spec.BlockDeviceRef.Name,
Kind: v1alpha2.BlockDeviceKind(vmbda.Spec.BlockDeviceRef.Kind),
}
if _, ok := seen[ref]; !ok {
seen[ref] = struct{}{}
refs = append(refs, ref)
}
}

return refs
}

func (s *state) resolvePVCName(ctx context.Context, kind v1alpha2.BlockDeviceKind, name string) (string, error) {
switch kind {
case v1alpha2.DiskDevice:
vd, err := s.VirtualDisk(ctx, name)
if err != nil || vd == nil {
return "", err
}
return vd.Status.Target.PersistentVolumeClaim, nil
case v1alpha2.ImageDevice:
vi, err := s.VirtualImage(ctx, name)
if err != nil || vi == nil {
return "", err
}
if vi.Spec.Storage != v1alpha2.StorageKubernetes && vi.Spec.Storage != v1alpha2.StoragePersistentVolumeClaim {
return "", nil
}
return vi.Status.Target.PersistentVolumeClaim, nil
default:
return "", nil
}
}

func (s *state) pvNodeAffinityTermsForPVC(ctx context.Context, pvcName, namespace string) ([]corev1.NodeSelectorTerm, error) {
pvc, err := object.FetchObject(ctx, types.NamespacedName{
Name: pvcName, Namespace: namespace,
}, s.client, &corev1.PersistentVolumeClaim{})
if err != nil || pvc == nil || pvc.Spec.VolumeName == "" {
return nil, err
}

pv, err := object.FetchObject(ctx, types.NamespacedName{
Name: pvc.Spec.VolumeName,
}, s.client, &corev1.PersistentVolume{})
if err != nil || pv == nil {
return nil, err
}

if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required != nil && len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) > 0 {
return pv.Spec.NodeAffinity.Required.NodeSelectorTerms, nil
}
return nil, nil
}

func (s *state) USBDevice(ctx context.Context, name string) (*v1alpha2.USBDevice, error) {
return object.FetchObject(ctx, types.NamespacedName{
Name: name,
Expand Down
Loading
Loading