diff --git a/api/core/v1alpha2/usbdevicecondition/condition.go b/api/core/v1alpha2/usbdevicecondition/condition.go index c8a9a1c2cf..57686ca0d7 100644 --- a/api/core/v1alpha2/usbdevicecondition/condition.go +++ b/api/core/v1alpha2/usbdevicecondition/condition.go @@ -51,6 +51,8 @@ const ( Available AttachedReason = "Available" // DetachedForMigration signifies that device was detached for migration (e.g. live migration). DetachedForMigration AttachedReason = "DetachedForMigration" + // NoFreeUSBIPPort signifies that device cannot be attached because there are no free USBIP ports on the target node. + NoFreeUSBIPPort AttachedReason = "NoFreeUSBIPPort" ) func (r ReadyReason) String() string { diff --git a/images/virtualization-artifact/pkg/common/usb/availability.go b/images/virtualization-artifact/pkg/common/usb/availability.go new file mode 100644 index 0000000000..27f006ca3b --- /dev/null +++ b/images/virtualization-artifact/pkg/common/usb/availability.go @@ -0,0 +1,129 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package usb + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" + "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func CheckFreePortOnNodeExcludingLocalUSBs(ctx context.Context, cl client.Client, nodeName string, speed int) (bool, error) { + return CheckFreePortForRequestOnNodeExcludingLocalUSBs(ctx, cl, nodeName, speed, 1) +} + +func CheckFreePortForRequestOnNodeExcludingLocalUSBs(ctx context.Context, cl client.Client, nodeName string, speed, requestedCount int) (bool, error) { + node := &corev1.Node{} + if err := cl.Get(ctx, client.ObjectKey{Name: nodeName}, node); err != nil { + return false, err + } + + isHS, isSS := ResolveSpeed(speed) + if !isHS && !isSS { + return false, fmt.Errorf("unsupported USB speed: %d", speed) + } + + totalPortsPerHub, err := GetTotalPortsPerHub(node.Annotations) + if err != nil { + return false, err + } + + usedPorts, err := getUsedPortsForSpeed(node.Annotations, speed) + if err != nil { + return false, err + } + + excludedLocalUSBs, err := countLocalAttachedUSBsOnNodeBySpeed(ctx, cl, nodeName, speed) + if err != nil { + return false, err + } + + effectiveUsedPorts := usedPorts - excludedLocalUSBs + if effectiveUsedPorts < 0 { + effectiveUsedPorts = 0 + } + + return (effectiveUsedPorts + requestedCount) <= totalPortsPerHub, nil +} + +func getUsedPortsForSpeed(nodeAnnotations map[string]string, speed int) (int, error) { + isHS, isSS := ResolveSpeed(speed) + + switch { + case isHS: + return GetUsedPorts(nodeAnnotations, annotations.AnnUSBIPHighSpeedHubUsedPorts) + case isSS: + return GetUsedPorts(nodeAnnotations, annotations.AnnUSBIPSuperSpeedHubUsedPorts) + default: + return 0, fmt.Errorf("unsupported USB speed: %d", speed) + } +} + +func countLocalAttachedUSBsOnNodeBySpeed(ctx context.Context, cl client.Client, nodeName string, speed int) (int, error) { + var vmList v1alpha2.VirtualMachineList + if err := cl.List(ctx, &vmList, client.MatchingFields{indexer.IndexFieldVMByNode: nodeName}); err != nil { + return 0, err + } + + count := 0 + usbCache := make(map[client.ObjectKey]*v1alpha2.USBDevice) + for i := range vmList.Items { + vm := &vmList.Items[i] + for _, usbStatus := range vm.Status.USBDevices { + if !usbStatus.Attached { + continue + } + + key := client.ObjectKey{Name: usbStatus.Name, Namespace: vm.Namespace} + usbDevice, ok := usbCache[key] + if !ok { + usbDevice = &v1alpha2.USBDevice{} + if err := cl.Get(ctx, key, usbDevice); err != nil { + if apierrors.IsNotFound(err) { + continue + } + return 0, err + } + usbCache[key] = usbDevice + } + + if usbDevice.Status.NodeName != nodeName { + continue + } + + if sameSpeedClass(usbDevice.Status.Attributes.Speed, speed) { + count++ + } + } + } + + return count, nil +} + +func sameSpeedClass(deviceSpeed, requestedSpeed int) bool { + deviceHS, deviceSS := ResolveSpeed(deviceSpeed) + requestedHS, requestedSS := ResolveSpeed(requestedSpeed) + + return (deviceHS && requestedHS) || (deviceSS && requestedSS) +} diff --git a/images/virtualization-artifact/pkg/common/usb/availability_test.go b/images/virtualization-artifact/pkg/common/usb/availability_test.go new file mode 100644 index 0000000000..465b643e26 --- /dev/null +++ b/images/virtualization-artifact/pkg/common/usb/availability_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package usb + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" + "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +var _ = Describe("availability helpers", func() { + newNode := func(usedHSPorts string) *corev1.Node { + return &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "2", + annotations.AnnUSBIPHighSpeedHubUsedPorts: usedHSPorts, + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }}} + } + + newVM := func(statuses ...v1alpha2.USBDeviceStatusRef) *v1alpha2.VirtualMachine { + return &v1alpha2.VirtualMachine{ + ObjectMeta: metav1.ObjectMeta{Name: "vm-1", Namespace: "default"}, + Status: v1alpha2.VirtualMachineStatus{ + Node: "node-1", + USBDevices: statuses, + }, + } + } + + newUSBDevice := func(name string, speed int) *v1alpha2.USBDevice { + return &v1alpha2.USBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"}, + Status: v1alpha2.USBDeviceStatus{ + NodeName: "node-1", + Attributes: v1alpha2.NodeUSBDeviceAttributes{ + Speed: speed, + }, + }, + } + } + + newClient := func(objects ...client.Object) client.Client { + scheme := apiruntime.NewScheme() + Expect(v1alpha2.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + return fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue). + Build() + } + + It("excludes local attached USB devices of the same speed class from used port accounting", func() { + cl := newClient( + newNode("1"), + newVM(v1alpha2.USBDeviceStatusRef{Name: "usb-local", Attached: true}), + newUSBDevice("usb-local", 480), + ) + + hasFree, err := CheckFreePortForRequestOnNodeExcludingLocalUSBs(context.Background(), cl, "node-1", 480, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(hasFree).To(BeTrue()) + }) + + It("does not exclude local attached USB devices from another speed class", func() { + cl := newClient( + newNode("1"), + newVM(v1alpha2.USBDeviceStatusRef{Name: "usb-local-ss", Attached: true}), + newUSBDevice("usb-local-ss", 5000), + ) + + hasFree, err := CheckFreePortForRequestOnNodeExcludingLocalUSBs(context.Background(), cl, "node-1", 480, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(hasFree).To(BeFalse()) + }) + + It("ignores stale VM status entries when the referenced USBDevice is missing", func() { + cl := newClient( + newNode("1"), + newVM(v1alpha2.USBDeviceStatusRef{Name: "missing-usb", Attached: true}), + ) + + hasFree, err := CheckFreePortForRequestOnNodeExcludingLocalUSBs(context.Background(), cl, "node-1", 480, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(hasFree).To(BeFalse()) + }) + + It("clamps effective used ports to zero when excluded local devices exceed node annotations", func() { + cl := newClient( + newNode("0"), + newVM( + v1alpha2.USBDeviceStatusRef{Name: "usb-local-1", Attached: true}, + v1alpha2.USBDeviceStatusRef{Name: "usb-local-2", Attached: true}, + ), + newUSBDevice("usb-local-1", 480), + newUSBDevice("usb-local-2", 480), + ) + + hasFree, err := CheckFreePortForRequestOnNodeExcludingLocalUSBs(context.Background(), cl, "node-1", 480, 1) + Expect(err).NotTo(HaveOccurred()) + Expect(hasFree).To(BeTrue()) + }) +}) diff --git a/images/virtualization-artifact/pkg/common/usb/speed.go b/images/virtualization-artifact/pkg/common/usb/speed.go new file mode 100644 index 0000000000..41cb5bb815 --- /dev/null +++ b/images/virtualization-artifact/pkg/common/usb/speed.go @@ -0,0 +1,78 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package usb + +import ( + "fmt" + "strconv" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" +) + +// ResolveSpeed determines USB hub type from speed in Mbps. +// https://mjmwired.net/kernel/Documentation/ABI/testing/sysfs-bus-usb#502 +func ResolveSpeed(speed int) (isHS, isSS bool) { + return speed == 480, speed >= 5000 +} + +// GetTotalPortsPerHub returns the number of ports per hub (total / 2). +func GetTotalPortsPerHub(nodeAnnotations map[string]string) (int, error) { + totalPortsStr, exists := nodeAnnotations[annotations.AnnUSBIPTotalPorts] + if !exists { + return 0, fmt.Errorf("node does not have %s annotation", annotations.AnnUSBIPTotalPorts) + } + totalPorts, err := strconv.Atoi(totalPortsStr) + if err != nil { + return 0, fmt.Errorf("failed to parse %s annotation: %w", annotations.AnnUSBIPTotalPorts, err) + } + return totalPorts / 2, nil +} + +// GetUsedPorts returns the number of used ports for the given hub type. +func GetUsedPorts(nodeAnnotations map[string]string, hubAnnotation string) (int, error) { + usedPortsStr, exists := nodeAnnotations[hubAnnotation] + if !exists { + return 0, fmt.Errorf("node does not have %s annotation", hubAnnotation) + } + usedPorts, err := strconv.Atoi(usedPortsStr) + if err != nil { + return 0, fmt.Errorf("failed to parse %s annotation: %w", hubAnnotation, err) + } + return usedPorts, nil +} + +// CheckFreePort checks if a node has free USBIP ports for the given speed. +// Returns true if there is at least one free port, false otherwise. +func CheckFreePort(nodeAnnotations map[string]string, speed int) (bool, error) { + return CheckFreePortForRequest(nodeAnnotations, speed, 1) +} + +// CheckFreePortForRequest checks if there are enough free ports for a specific request. +// It adds the requested count to the currently used ports and compares with total. +func CheckFreePortForRequest(nodeAnnotations map[string]string, speed, requestedCount int) (bool, error) { + totalPortsPerHub, err := GetTotalPortsPerHub(nodeAnnotations) + if err != nil { + return false, err + } + + usedPorts, err := getUsedPortsForSpeed(nodeAnnotations, speed) + if err != nil { + return false, err + } + + return (usedPorts + requestedCount) <= totalPortsPerHub, nil +} diff --git a/images/virtualization-artifact/pkg/common/usb/speed_test.go b/images/virtualization-artifact/pkg/common/usb/speed_test.go new file mode 100644 index 0000000000..0632b0c64b --- /dev/null +++ b/images/virtualization-artifact/pkg/common/usb/speed_test.go @@ -0,0 +1,237 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package usb + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" +) + +func TestSpeed(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "USB speed package tests") +} + +var _ = Describe("ResolveSpeed", func() { + DescribeTable("returns correct hub type for speed", + func(speed int, expectedHS, expectedSS bool) { + isHS, isSS := ResolveSpeed(speed) + Expect(isHS).To(Equal(expectedHS)) + Expect(isSS).To(Equal(expectedSS)) + }, + Entry("low speed 1.0", 1, false, false), + Entry("full speed 1.1", 12, false, false), + Entry("high speed 2.0", 480, true, false), + Entry("wireless 2.5", 2500, false, false), + Entry("super speed 3.0", 5000, false, true), + Entry("super speed 3.1", 10000, false, true), + Entry("super speed 3.2", 20000, false, true), + Entry("super speed plus 3.1", 10000, false, true), + ) +}) + +var _ = Describe("CheckFreePort", func() { + DescribeTable("returns correct availability", + func(speed int, nodeAnnotations map[string]string, expectedResult, expectError bool) { + result, err := CheckFreePort(nodeAnnotations, speed) + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(expectedResult)) + } + }, + Entry("HS speed, enough ports", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "3", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + true, false, + ), + Entry("HS speed, no ports left", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "4", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + false, false, + ), + Entry("SS speed, enough ports", + 5000, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "4", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "2", + }, + true, false, + ), + Entry("SS speed, no ports left", + 10000, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "4", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "4", + }, + false, false, + ), + Entry("unsupported speed", + 12, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "0", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + false, true, + ), + Entry("missing total ports annotation", + 480, + map[string]string{ + annotations.AnnUSBIPHighSpeedHubUsedPorts: "0", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + false, true, + ), + Entry("missing HS hub annotation", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + false, true, + ), + Entry("missing SS hub annotation", + 5000, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "0", + }, + false, true, + ), + ) +}) + +var _ = Describe("CheckFreePortForRequest", func() { + DescribeTable("returns correct availability for request", + func(speed int, nodeAnnotations map[string]string, requestedCount int, expectedResult, expectError bool) { + result, err := CheckFreePortForRequest(nodeAnnotations, speed, requestedCount) + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(expectedResult)) + } + }, + Entry("HS speed, request 1, enough ports", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "3", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + 1, true, false, + ), + Entry("HS speed, request 2, no ports", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "3", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + 2, false, false, + ), + Entry("HS speed, request 2, exactly at limit", + 480, + map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "2", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + 2, true, false, + ), + ) +}) + +var _ = Describe("GetTotalPortsPerHub", func() { + DescribeTable("returns correct total ports per hub", + func(nodeAnnotations map[string]string, expectedResult int, expectError bool) { + result, err := GetTotalPortsPerHub(nodeAnnotations) + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(expectedResult)) + } + }, + Entry("total 8 ports", + map[string]string{annotations.AnnUSBIPTotalPorts: "8"}, + 4, false, + ), + Entry("total 4 ports", + map[string]string{annotations.AnnUSBIPTotalPorts: "4"}, + 2, false, + ), + Entry("missing annotation", + map[string]string{}, + 0, true, + ), + Entry("invalid value", + map[string]string{annotations.AnnUSBIPTotalPorts: "invalid"}, + 0, true, + ), + ) +}) + +var _ = Describe("GetUsedPorts", func() { + DescribeTable("returns correct used ports", + func(nodeAnnotations map[string]string, hubAnnotation string, expectedResult int, expectError bool) { + result, err := GetUsedPorts(nodeAnnotations, hubAnnotation) + if expectError { + Expect(err).To(HaveOccurred()) + } else { + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(expectedResult)) + } + }, + Entry("HS hub, 3 used", + map[string]string{annotations.AnnUSBIPHighSpeedHubUsedPorts: "3"}, + annotations.AnnUSBIPHighSpeedHubUsedPorts, + 3, false, + ), + Entry("SS hub, 0 used", + map[string]string{annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0"}, + annotations.AnnUSBIPSuperSpeedHubUsedPorts, + 0, false, + ), + Entry("missing annotation", + map[string]string{}, + annotations.AnnUSBIPHighSpeedHubUsedPorts, + 0, true, + ), + Entry("invalid value", + map[string]string{annotations.AnnUSBIPHighSpeedHubUsedPorts: "invalid"}, + annotations.AnnUSBIPHighSpeedHubUsedPorts, + 0, true, + ), + ) +}) diff --git a/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle.go b/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle.go index 53c8f6efbd..e4ef7826ed 100644 --- a/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle.go +++ b/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + "github.com/deckhouse/virtualization-controller/pkg/common/usb" "github.com/deckhouse/virtualization-controller/pkg/controller/service" "github.com/deckhouse/virtualization-controller/pkg/controller/usbdevice/internal/state" "github.com/deckhouse/virtualization-controller/pkg/logger" @@ -192,34 +193,59 @@ func (h *LifecycleHandler) ensureResourceClaimTemplate(ctx context.Context, s st } func (h *LifecycleHandler) syncAttached(ctx context.Context, s state.USBDeviceState) error { + log := logger.FromContext(ctx).With(logger.SlogHandler(nameLifecycleHandler)) + current := s.USBDevice().Current() changed := s.USBDevice().Changed() - vms, err := s.VirtualMachinesUsingDevice(ctx) + attachedVMs, err := s.VirtualMachinesUsingDevice(ctx) if err != nil { return fmt.Errorf("failed to find VirtualMachines using USBDevice: %w", err) } - var reason usbdevicecondition.AttachedReason - var status metav1.ConditionStatus - var message string + referencingVMs, err := s.VirtualMachinesReferencingDevice(ctx) + if err != nil { + return fmt.Errorf("failed to find VirtualMachines referencing USBDevice: %w", err) + } - if len(vms) == 0 { - reason = usbdevicecondition.Available - status = metav1.ConditionFalse - message = "Device is available for attachment to a virtual machine." - setAttachedCondition(current, &changed.Status.Conditions, status, reason, message) + if len(referencingVMs) == 0 { + setAttachedCondition(current, &changed.Status.Conditions, metav1.ConditionFalse, usbdevicecondition.Available, "Device is available for attachment to a virtual machine.") return nil } - reason = usbdevicecondition.AttachedToVirtualMachine - status = metav1.ConditionTrue - message = fmt.Sprintf("Device is attached to %d VirtualMachines.", len(vms)) - if len(vms) == 1 { - message = fmt.Sprintf("Device is attached to VirtualMachine %s/%s.", vms[0].Namespace, vms[0].Name) + if len(attachedVMs) > 0 { + message := fmt.Sprintf("Device is attached to %d VirtualMachines.", len(attachedVMs)) + if len(attachedVMs) == 1 { + message = fmt.Sprintf("Device is attached to VirtualMachine %s/%s.", attachedVMs[0].Namespace, attachedVMs[0].Name) + } + setAttachedCondition(current, &changed.Status.Conditions, metav1.ConditionTrue, usbdevicecondition.AttachedToVirtualMachine, message) + return nil + } + + for _, vm := range referencingVMs { + if changed.Status.NodeName == "" || vm.Status.Node == "" || changed.Status.NodeName == vm.Status.Node { + continue + } + + hasFreePort, err := usb.CheckFreePortOnNodeExcludingLocalUSBs(ctx, h.client, vm.Status.Node, changed.Status.Attributes.Speed) + if err != nil { + if apierrors.IsNotFound(err) { + log.Debug("node not found while checking free USBIP ports", "device", changed.Name, "node", vm.Status.Node) + continue + } + + return fmt.Errorf("failed to check free USBIP ports for USBDevice %s on node %s: %w", changed.Name, vm.Status.Node, err) + } + + if !hasFreePort { + message := fmt.Sprintf("Device is requested by VirtualMachine %s/%s, but no free USBIP ports are available on node %s for speed %d.", + vm.Namespace, vm.Name, vm.Status.Node, changed.Status.Attributes.Speed) + setAttachedCondition(current, &changed.Status.Conditions, metav1.ConditionFalse, usbdevicecondition.NoFreeUSBIPPort, message) + return nil + } } - setAttachedCondition(current, &changed.Status.Conditions, status, reason, message) + setAttachedCondition(current, &changed.Status.Conditions, metav1.ConditionFalse, usbdevicecondition.Available, "Device is requested by a virtual machine but not attached yet.") return nil } diff --git a/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle_test.go b/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle_test.go index 24c20273a4..0bc5ff960c 100644 --- a/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle_test.go +++ b/images/virtualization-artifact/pkg/controller/usbdevice/internal/handler/lifecycle_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" resourcev1 "k8s.io/api/resource/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -31,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" "github.com/deckhouse/virtualization-controller/pkg/controller/reconciler" "github.com/deckhouse/virtualization-controller/pkg/controller/usbdevice/internal/state" @@ -48,6 +50,7 @@ var _ = Describe("LifecycleHandler", func() { ctx = logger.ToContext(context.TODO(), slog.Default()) scheme = apiruntime.NewScheme() Expect(v1alpha2.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) Expect(resourcev1.AddToScheme(scheme)).To(Succeed()) }) @@ -88,7 +91,8 @@ var _ = Describe("LifecycleHandler", func() { } vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() - cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).WithIndex(vmObj, vmField, vmExtractValue).Build() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() res := reconciler.NewResource( types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, @@ -124,6 +128,182 @@ var _ = Describe("LifecycleHandler", func() { Entry("node missing", false, "", false, false, metav1.ConditionFalse, string(usbdevicecondition.NotFound), string(usbdevicecondition.Available)), ) + It("should set NoFreeUSBIPPort when VM references device but attach cannot start", func() { + usbDevice := &v1alpha2.USBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1", Namespace: "default", UID: "usb-uid-1"}, + Status: v1alpha2.USBDeviceStatus{Attributes: v1alpha2.NodeUSBDeviceAttributes{ + Name: "usb-device-1", + VendorID: "1234", + ProductID: "5678", + Speed: 480, + }}, + } + + nodeUSBDevice := &v1alpha2.NodeUSBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1"}, + Status: v1alpha2.NodeUSBDeviceStatus{ + Attributes: v1alpha2.NodeUSBDeviceAttributes{Name: "usb-device-1", VendorID: "1234", ProductID: "5678", Speed: 480}, + NodeName: "node-1", + Conditions: []metav1.Condition{{Type: string(nodeusbdevicecondition.ReadyType), Status: metav1.ConditionTrue, Reason: string(nodeusbdevicecondition.Ready), Message: "Node status"}}, + }, + } + + vm := &v1alpha2.VirtualMachine{ + ObjectMeta: metav1.ObjectMeta{Name: "vm-1", Namespace: "default"}, + Spec: v1alpha2.VirtualMachineSpec{USBDevices: []v1alpha2.USBDeviceSpecRef{{Name: "usb-device-1"}}}, + Status: v1alpha2.VirtualMachineStatus{ + Node: "node-2", + USBDevices: []v1alpha2.USBDeviceStatusRef{{Name: "usb-device-1", Attached: false}}, + }, + } + + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "2", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "1", + }}} + + vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice, vm, node).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() + + res := reconciler.NewResource( + types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, + cl, + func() *v1alpha2.USBDevice { return &v1alpha2.USBDevice{} }, + func(obj *v1alpha2.USBDevice) v1alpha2.USBDeviceStatus { return obj.Status }, + ) + Expect(res.Fetch(ctx)).To(Succeed()) + + st := state.New(cl, res) + h := NewLifecycleHandler(cl) + _, err := h.Handle(ctx, st) + Expect(err).NotTo(HaveOccurred()) + + attached := meta.FindStatusCondition(res.Changed().Status.Conditions, string(usbdevicecondition.AttachedType)) + Expect(attached).NotTo(BeNil()) + Expect(attached.Reason).To(Equal(string(usbdevicecondition.NoFreeUSBIPPort))) + Expect(attached.Status).To(Equal(metav1.ConditionFalse)) + Expect(attached.Message).To(ContainSubstring("requested by VirtualMachine")) + }) + + It("should keep AttachedToVirtualMachine when at least one VM already has device attached", func() { + usbDevice := &v1alpha2.USBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1", Namespace: "default", UID: "usb-uid-1"}, + Status: v1alpha2.USBDeviceStatus{Attributes: v1alpha2.NodeUSBDeviceAttributes{ + Name: "usb-device-1", + VendorID: "1234", + ProductID: "5678", + Speed: 480, + }}, + } + + nodeUSBDevice := &v1alpha2.NodeUSBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1"}, + Status: v1alpha2.NodeUSBDeviceStatus{ + Attributes: v1alpha2.NodeUSBDeviceAttributes{Name: "usb-device-1", VendorID: "1234", ProductID: "5678", Speed: 480}, + NodeName: "node-1", + Conditions: []metav1.Condition{{Type: string(nodeusbdevicecondition.ReadyType), Status: metav1.ConditionTrue, Reason: string(nodeusbdevicecondition.Ready), Message: "Node status"}}, + }, + } + + vmAttached := &v1alpha2.VirtualMachine{ + ObjectMeta: metav1.ObjectMeta{Name: "vm-attached", Namespace: "default"}, + Spec: v1alpha2.VirtualMachineSpec{USBDevices: []v1alpha2.USBDeviceSpecRef{{Name: "usb-device-1"}}}, + Status: v1alpha2.VirtualMachineStatus{ + Node: "node-2", + USBDevices: []v1alpha2.USBDeviceStatusRef{{Name: "usb-device-1", Attached: true}}, + }, + } + + vmPending := &v1alpha2.VirtualMachine{ + ObjectMeta: metav1.ObjectMeta{Name: "vm-pending", Namespace: "default"}, + Spec: v1alpha2.VirtualMachineSpec{USBDevices: []v1alpha2.USBDeviceSpecRef{{Name: "usb-device-1"}}}, + Status: v1alpha2.VirtualMachineStatus{ + Node: "node-2", + USBDevices: []v1alpha2.USBDeviceStatusRef{{Name: "usb-device-1", Attached: false}}, + }, + } + + node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "2", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "1", + }}} + + vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice, vmAttached, vmPending, node).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() + + res := reconciler.NewResource( + types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, + cl, + func() *v1alpha2.USBDevice { return &v1alpha2.USBDevice{} }, + func(obj *v1alpha2.USBDevice) v1alpha2.USBDeviceStatus { return obj.Status }, + ) + Expect(res.Fetch(ctx)).To(Succeed()) + + st := state.New(cl, res) + h := NewLifecycleHandler(cl) + _, err := h.Handle(ctx, st) + Expect(err).NotTo(HaveOccurred()) + + attached := meta.FindStatusCondition(res.Changed().Status.Conditions, string(usbdevicecondition.AttachedType)) + Expect(attached).NotTo(BeNil()) + Expect(attached.Reason).To(Equal(string(usbdevicecondition.AttachedToVirtualMachine))) + Expect(attached.Status).To(Equal(metav1.ConditionTrue)) + Expect(attached.Message).To(ContainSubstring("attached to")) + }) + + It("should return error when USBIP port availability check fails unexpectedly", func() { + usbDevice := &v1alpha2.USBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1", Namespace: "default", UID: "usb-uid-1"}, + Status: v1alpha2.USBDeviceStatus{Attributes: v1alpha2.NodeUSBDeviceAttributes{ + Name: "usb-device-1", + VendorID: "1234", + ProductID: "5678", + Speed: 480, + }}, + } + + nodeUSBDevice := &v1alpha2.NodeUSBDevice{ + ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1"}, + Status: v1alpha2.NodeUSBDeviceStatus{ + Attributes: v1alpha2.NodeUSBDeviceAttributes{Name: "usb-device-1", VendorID: "1234", ProductID: "5678", Speed: 480}, + NodeName: "node-1", + Conditions: []metav1.Condition{{Type: string(nodeusbdevicecondition.ReadyType), Status: metav1.ConditionTrue, Reason: string(nodeusbdevicecondition.Ready), Message: "Node status"}}, + }, + } + + vm := &v1alpha2.VirtualMachine{ + ObjectMeta: metav1.ObjectMeta{Name: "vm-1", Namespace: "default"}, + Spec: v1alpha2.VirtualMachineSpec{USBDevices: []v1alpha2.USBDeviceSpecRef{{Name: "usb-device-1"}}}, + Status: v1alpha2.VirtualMachineStatus{Node: "node-2", USBDevices: []v1alpha2.USBDeviceStatusRef{{Name: "usb-device-1", Attached: false}}}, + } + + badNode := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "invalid", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "0", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }}} + + vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice, vm, badNode).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() + + res := reconciler.NewResource( + types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, + cl, + func() *v1alpha2.USBDevice { return &v1alpha2.USBDevice{} }, + func(obj *v1alpha2.USBDevice) v1alpha2.USBDeviceStatus { return obj.Status }, + ) + Expect(res.Fetch(ctx)).To(Succeed()) + + st := state.New(cl, res) + h := NewLifecycleHandler(cl) + _, err := h.Handle(ctx, st) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to check free USBIP ports for USBDevice")) + }) + It("should skip ResourceClaimTemplate when attribute name is empty", func() { usbDevice := &v1alpha2.USBDevice{ ObjectMeta: metav1.ObjectMeta{Name: "usb-device-1", Namespace: "default", UID: "usb-uid-1"}, @@ -133,7 +313,8 @@ var _ = Describe("LifecycleHandler", func() { objects := []client.Object{usbDevice} vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() - cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).WithIndex(vmObj, vmField, vmExtractValue).Build() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() res := reconciler.NewResource( types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, @@ -170,7 +351,8 @@ var _ = Describe("LifecycleHandler", func() { } vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() - cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice).WithIndex(vmObj, vmField, vmExtractValue).Build() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() res := reconciler.NewResource( types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, @@ -238,7 +420,8 @@ var _ = Describe("LifecycleHandler", func() { } vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() - cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice, template).WithIndex(vmObj, vmField, vmExtractValue).Build() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(usbDevice, nodeUSBDevice, template).WithIndex(vmObj, vmField, vmExtractValue).WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue).Build() res := reconciler.NewResource( types.NamespacedName{Name: usbDevice.Name, Namespace: usbDevice.Namespace}, diff --git a/images/virtualization-artifact/pkg/controller/usbdevice/internal/state/state.go b/images/virtualization-artifact/pkg/controller/usbdevice/internal/state/state.go index bf1da15aa4..2aed1eedcc 100644 --- a/images/virtualization-artifact/pkg/controller/usbdevice/internal/state/state.go +++ b/images/virtualization-artifact/pkg/controller/usbdevice/internal/state/state.go @@ -31,6 +31,7 @@ type USBDeviceState interface { USBDevice() *reconciler.Resource[*v1alpha2.USBDevice, v1alpha2.USBDeviceStatus] NodeUSBDevice(ctx context.Context) (*v1alpha2.NodeUSBDevice, error) VirtualMachinesUsingDevice(ctx context.Context) ([]*v1alpha2.VirtualMachine, error) + VirtualMachinesReferencingDevice(ctx context.Context) ([]*v1alpha2.VirtualMachine, error) } func New(client client.Client, usbDevice *reconciler.Resource[*v1alpha2.USBDevice, v1alpha2.USBDeviceStatus]) USBDeviceState { @@ -67,7 +68,7 @@ func (s *usbDeviceState) NodeUSBDevice(ctx context.Context) (*v1alpha2.NodeUSBDe return nodeUSBDevice, nil } -func (s *usbDeviceState) VirtualMachinesUsingDevice(ctx context.Context) ([]*v1alpha2.VirtualMachine, error) { +func (s *usbDeviceState) VirtualMachinesReferencingDevice(ctx context.Context) ([]*v1alpha2.VirtualMachine, error) { usbDevice := s.usbDevice.Current() if usbDevice == nil { return nil, nil @@ -83,14 +84,31 @@ func (s *usbDeviceState) VirtualMachinesUsingDevice(ctx context.Context) ([]*v1a var result []*v1alpha2.VirtualMachine for i := range vmList.Items { vm := &vmList.Items[i] - // Check if VM is in the same namespace as USBDevice if vm.Namespace == usbDevice.Namespace { - // Verify that device is actually attached in VM status - for _, usbStatus := range vm.Status.USBDevices { - if usbStatus.Name == usbDevice.Name && usbStatus.Attached { - result = append(result, vm) - break - } + result = append(result, vm) + } + } + + return result, nil +} + +func (s *usbDeviceState) VirtualMachinesUsingDevice(ctx context.Context) ([]*v1alpha2.VirtualMachine, error) { + usbDevice := s.usbDevice.Current() + if usbDevice == nil { + return nil, nil + } + + vms, err := s.VirtualMachinesReferencingDevice(ctx) + if err != nil { + return nil, err + } + + var result []*v1alpha2.VirtualMachine + for _, vm := range vms { + for _, usbStatus := range vm.Status.USBDevices { + if usbStatus.Name == usbDevice.Name && usbStatus.Attached { + result = append(result, vm) + break } } } diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler.go b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler.go index 1075afa870..50f7ac2bcd 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/deckhouse/virtualization-controller/pkg/common/usb" "github.com/deckhouse/virtualization-controller/pkg/controller/conditions" "github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/state" "github.com/deckhouse/virtualization-controller/pkg/logger" @@ -91,7 +92,8 @@ func (h *USBDeviceAttachHandler) Handle(ctx context.Context, s state.VirtualMach var kvvmiLoaded bool var kvvmi *virtv1.VirtualMachineInstance - var hostDeviceReadyByName map[string]bool + var hostDeviceReadyByName map[string]struct{} + var hostDeviceExistsByName map[string]struct{} var nextStatusRefs []v1alpha2.USBDeviceStatusRef for _, usbDeviceRef := range vm.Spec.USBDevices { @@ -140,11 +142,12 @@ func (h *USBDeviceAttachHandler) Handle(ctx context.Context, s state.VirtualMach kvvmiLoaded = true } - if hostDeviceReadyByName == nil { - hostDeviceReadyByName = h.hostDeviceReadyByName(kvvmi) + if hostDeviceReadyByName == nil || hostDeviceExistsByName == nil { + hostDeviceReadyByName, hostDeviceExistsByName = h.hostDeviceMapsByName(kvvmi) } - if hostDeviceReadyByName[deviceName] { + // If device is already attached in KVVMI - preserve status, skip port check. + if _, exists := hostDeviceReadyByName[deviceName]; exists { address := h.getUSBAddressFromKVVMI(deviceName, kvvmi) isHotplugged := vm.Status.Phase == v1alpha2.MachineRunning @@ -160,6 +163,27 @@ func (h *USBDeviceAttachHandler) Handle(ctx context.Context, s state.VirtualMach continue } + // 4) Check free USBIP ports for cross-node attachments until KVVMI reflects the device. + // Once the host device is listed in KVVMI, attach is already in progress, so skip the check. + if _, exists := hostDeviceExistsByName[deviceName]; !exists && usbDevice.Status.NodeName != "" && vm.Status.Node != "" && usbDevice.Status.NodeName != vm.Status.Node { + hasFreePort, err := usb.CheckFreePortOnNodeExcludingLocalUSBs(ctx, h.client, vm.Status.Node, usbDevice.Status.Attributes.Speed) + if err != nil { + if apierrors.IsNotFound(err) { + log.Debug("node not found while checking free USBIP ports", "device", deviceName, "node", vm.Status.Node) + nextStatusRefs = append(nextStatusRefs, h.buildDetachedStatus(existingStatus, deviceName, isReady)) + continue + } + + return reconcile.Result{RequeueAfter: 5 * time.Second}, fmt.Errorf("failed to check free USBIP ports for device %s on node %s: %w", deviceName, vm.Status.Node, err) + } + if !hasFreePort { + log.Info("no free USBIP ports available", "device", deviceName, "speed", usbDevice.Status.Attributes.Speed, "node", vm.Status.Node) + nextStatusRefs = append(nextStatusRefs, h.buildDetachedStatus(existingStatus, deviceName, isReady)) + changed.Status.USBDevices = nextStatusRefs + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + } + requestName := h.getResourceClaimRequestName(deviceName) err := h.attachUSBDevice(ctx, vm, deviceName, templateName, requestName) if err != nil && !apierrors.IsAlreadyExists(err) && !strings.Contains(err.Error(), "already exists") { diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler_test.go b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler_test.go index 6bf8831ba7..14de18a5cc 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler_test.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_attach_handler_test.go @@ -20,10 +20,12 @@ import ( "context" "errors" "log/slog" + "strconv" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" resourcev1 "k8s.io/api/resource/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "github.com/deckhouse/virtualization-controller/pkg/common/annotations" "github.com/deckhouse/virtualization-controller/pkg/controller/reconciler" "github.com/deckhouse/virtualization-controller/pkg/controller/vm/internal/state" "github.com/deckhouse/virtualization-controller/pkg/logger" @@ -152,7 +155,7 @@ var _ = Describe("USBDeviceAttachHandler", func() { return &v1alpha2.USBDevice{ ObjectMeta: usbMeta, Status: v1alpha2.USBDeviceStatus{ - Attributes: v1alpha2.NodeUSBDeviceAttributes{Name: attributeName, VendorID: vid, ProductID: productID}, + Attributes: v1alpha2.NodeUSBDeviceAttributes{Name: attributeName, VendorID: vid, ProductID: productID, Speed: 480}, NodeName: "node-1", Conditions: conds, }, @@ -172,6 +175,19 @@ var _ = Describe("USBDeviceAttachHandler", func() { } } + newNode := func(name string, usedHSPorts, usedSSPorts int) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "8", + annotations.AnnUSBIPHighSpeedHubUsedPorts: strconv.Itoa(usedHSPorts), + annotations.AnnUSBIPSuperSpeedHubUsedPorts: strconv.Itoa(usedSSPorts), + }, + }, + } + } + runHandle := func(vm *v1alpha2.VirtualMachine, objs ...client.Object) (reconcile.Result, *reconciler.Resource[*v1alpha2.VirtualMachine, v1alpha2.VirtualMachineStatus], state.VirtualMachineState, error) { fakeClient, vmResource, vmState = setupEnvironment(vm, objs...) handler = NewUSBDeviceAttachHandler(fakeClient, mockVirtCl) @@ -481,6 +497,97 @@ var _ = Describe("USBDeviceAttachHandler", func() { Expect(status.Ready).To(BeTrue()) }) + It("requeues on every reconcile while cross-node USBIP ports remain exhausted", func() { + vm := newVM(v1alpha2.MachineRunning) + vm.Status.Node = "node-2" + + result, _, _, err := runHandle( + vm, + newUSBDevice(true, usbDeviceName, false), + newResourceClaimTemplate(), + newNode("node-2", 4, 0), + ) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + + mockVM := mockVirtCl.vmClients[vmNamespace] + Expect(mockVM.addResourceClaimCalls).To(HaveLen(0)) + status := expectSingleUSBStatus() + Expect(status.Attached).To(BeFalse()) + Expect(status.Ready).To(BeTrue()) + + vm.Status.USBDevices = vmResource.Changed().Status.USBDevices + + result, _, _, err = runHandle( + vm, + newUSBDevice(true, usbDeviceName, false), + newResourceClaimTemplate(), + newNode("node-2", 4, 0), + ) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + Expect(mockVM.addResourceClaimCalls).To(HaveLen(0)) + status = expectSingleUSBStatus() + Expect(status.Attached).To(BeFalse()) + Expect(status.Ready).To(BeTrue()) + }) + + It("returns error when USBIP port availability check fails unexpectedly", func() { + vm := newVM(v1alpha2.MachineRunning) + vm.Status.Node = "node-2" + + badNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-2", + Annotations: map[string]string{ + annotations.AnnUSBIPTotalPorts: "invalid", + annotations.AnnUSBIPHighSpeedHubUsedPorts: "0", + annotations.AnnUSBIPSuperSpeedHubUsedPorts: "0", + }, + }, + } + + result, _, _, err := runHandle( + vm, + newUSBDevice(true, usbDeviceName, false), + newResourceClaimTemplate(), + badNode, + ) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to check free USBIP ports for device")) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) + }) + + It("skips port checks once KVVMI already reflects the host device", func() { + vm := newVM(v1alpha2.MachineRunning, v1alpha2.USBDeviceStatusRef{ + Name: usbDeviceName, + Attached: false, + Ready: true, + }) + vm.Status.Node = "node-2" + + hostDeviceStatus := virtv1.DeviceStatusInfo{ + Name: usbDeviceName, + Phase: virtv1.DevicePending, + } + + result, _, _, err := runHandle( + vm, + newUSBDevice(true, usbDeviceName, false), + newResourceClaimTemplate(), + newKVVMI(hostDeviceStatus), + newNode("node-2", 4, 0), + ) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(reconcile.Result{})) + + mockVM := mockVirtCl.vmClients[vmNamespace] + Expect(mockVM.addResourceClaimCalls).To(HaveLen(1)) + status := expectSingleUSBStatus() + Expect(status.Attached).To(BeFalse()) + Expect(status.Ready).To(BeTrue()) + }) + It("clears USB status on VM deletion", func() { now := metav1.NewTime(time.Now()) vm := &v1alpha2.VirtualMachine{ diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_handler.go b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_handler.go index c4f0000232..19ec956bdb 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_handler.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/usb_device_handler.go @@ -81,10 +81,11 @@ func (h *usbDeviceHandlerBase) isUSBDeviceReady(usbDevice *v1alpha2.USBDevice) b return found && readyCondition.Status == metav1.ConditionTrue } -func (h *usbDeviceHandlerBase) hostDeviceReadyByName(kvvmi *virtv1.VirtualMachineInstance) map[string]bool { - hostDeviceReadyByName := make(map[string]bool) +func (h *usbDeviceHandlerBase) hostDeviceMapsByName(kvvmi *virtv1.VirtualMachineInstance) (map[string]struct{}, map[string]struct{}) { + hostDeviceReadyByName := make(map[string]struct{}) + hostDeviceExistsByName := make(map[string]struct{}) if kvvmi == nil || kvvmi.Status.DeviceStatus == nil { - return hostDeviceReadyByName + return hostDeviceReadyByName, hostDeviceExistsByName } for _, hostDeviceStatus := range kvvmi.Status.DeviceStatus.HostDeviceStatuses { @@ -92,10 +93,13 @@ func (h *usbDeviceHandlerBase) hostDeviceReadyByName(kvvmi *virtv1.VirtualMachin continue } - hostDeviceReadyByName[hostDeviceStatus.Name] = hostDeviceReadyByName[hostDeviceStatus.Name] || hostDeviceStatus.Phase == virtv1.DeviceReady + hostDeviceExistsByName[hostDeviceStatus.Name] = struct{}{} + if hostDeviceStatus.Phase == virtv1.DeviceReady { + hostDeviceReadyByName[hostDeviceStatus.Name] = struct{}{} + } } - return hostDeviceReadyByName + return hostDeviceReadyByName, hostDeviceExistsByName } func (h *usbDeviceHandlerBase) attachUSBDevice( diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator.go b/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator.go index a161f3c7a4..9abc97dd5e 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator.go @@ -19,19 +19,15 @@ package validators import ( "context" "fmt" - "strconv" - "strings" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/component-base/featuregate" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" - "github.com/deckhouse/virtualization-controller/pkg/common/annotations" + "github.com/deckhouse/virtualization-controller/pkg/common/usb" "github.com/deckhouse/virtualization-controller/pkg/controller/indexer" "github.com/deckhouse/virtualization-controller/pkg/featuregates" - "github.com/deckhouse/virtualization-controller/pkg/kubeapi" "github.com/deckhouse/virtualization/api/core/v1alpha2" ) @@ -145,13 +141,6 @@ func getUSBDeviceNames(refs []v1alpha2.USBDeviceSpecRef) map[string]struct{} { } func (v *USBDevicesValidator) validateAvailableUSBIPPorts(ctx context.Context, vm *v1alpha2.VirtualMachine, oldUSBDevices map[string]struct{}) (admission.Warnings, error) { - if kubeapi.HasDRAPartitionableDevices() { - return v.validateAvailableUSBIPPortsWithPartitionableDevices(ctx, vm, oldUSBDevices) - } - return v.validateAvailableUSBIPPortsDefault(ctx, vm, oldUSBDevices) -} - -func (v *USBDevicesValidator) validateAvailableUSBIPPortsWithPartitionableDevices(ctx context.Context, vm *v1alpha2.VirtualMachine, oldUSBDevices map[string]struct{}) (admission.Warnings, error) { if vm.Status.Node == "" { return admission.Warnings{}, nil } @@ -177,9 +166,9 @@ func (v *USBDevicesValidator) validateAvailableUSBIPPortsWithPartitionableDevice continue } - isHs, isSS := resolveSpeed(usbDevice.Status.Attributes.Speed) + isHS, isSS := usb.ResolveSpeed(usbDevice.Status.Attributes.Speed) switch { - case isHs: + case isHS: hsUSBFromOtherNodes = append(hsUSBFromOtherNodes, ref.Name) case isSS: ssUSBFromOtherNodes = append(ssUSBFromOtherNodes, ref.Name) @@ -192,107 +181,25 @@ func (v *USBDevicesValidator) validateAvailableUSBIPPortsWithPartitionableDevice return admission.Warnings{}, nil } - node, totalPorts, err := v.getNodeTotalPorts(ctx, vm.Status.Node) - if err != nil { - return admission.Warnings{}, err - } - - totalPortsPerHub := totalPorts / 2 - if len(hsUSBFromOtherNodes) > 0 { - if err = validateUsedPortsByAnnotation(node, annotations.AnnUSBIPHighSpeedHubUsedPorts, hsUSBFromOtherNodes, totalPortsPerHub); err != nil { + hasFree, err := usb.CheckFreePortForRequestOnNodeExcludingLocalUSBs(ctx, v.client, vm.Status.Node, 480, len(hsUSBFromOtherNodes)) + if err != nil { return admission.Warnings{}, err } - } - - if len(ssUSBFromOtherNodes) > 0 { - if err = validateUsedPortsByAnnotation(node, annotations.AnnUSBIPSuperSpeedHubUsedPorts, ssUSBFromOtherNodes, totalPortsPerHub); err != nil { - return admission.Warnings{}, err + if !hasFree { + return admission.Warnings{}, fmt.Errorf("node %s has no available ports for sharing USB devices %v", vm.Status.Node, hsUSBFromOtherNodes) } } - return admission.Warnings{}, nil -} - -func (v *USBDevicesValidator) getNodeTotalPorts(ctx context.Context, nodeName string) (*corev1.Node, int, error) { - node := &corev1.Node{} - err := v.client.Get(ctx, client.ObjectKey{Name: nodeName}, node) - if err != nil { - return nil, -1, fmt.Errorf("failed to get node %s: %w", nodeName, err) - } - - totalPorts, exists := node.Annotations[annotations.AnnUSBIPTotalPorts] - if !exists { - return nil, -1, fmt.Errorf("node %s does not have %s annotation", nodeName, annotations.AnnUSBIPTotalPorts) - } - totalPortsInt, err := strconv.Atoi(totalPorts) - if err != nil { - return nil, -1, fmt.Errorf("failed to parse %s annotation: %w", annotations.AnnUSBIPTotalPorts, err) - } - - return node, totalPortsInt, nil -} - -func validateUsedPortsByAnnotation(node *corev1.Node, anno string, usbFromOtherNodes []string, totalPortsPerHub int) error { - usedPorts, exists := node.Annotations[anno] - if !exists { - return fmt.Errorf("node %s does not have %s annotation", node.Name, anno) - } - usedPortsInt, err := strconv.Atoi(usedPorts) - if err != nil { - return fmt.Errorf("failed to parse %s annotation: %w", anno, err) - } - - wantedPorts := usedPortsInt + len(usbFromOtherNodes) - if wantedPorts > totalPortsPerHub { - return fmt.Errorf("node %s not available ports for sharing USB devices %s. total: %d, used: %d, wanted: %d", node.Name, strings.Join(usbFromOtherNodes, ", "), totalPortsPerHub, usedPortsInt, wantedPorts) - } - - return nil -} - -// https://mjmwired.net/kernel/Documentation/ABI/testing/sysfs-bus-usb#502 -func resolveSpeed(speed int) (isHs, isSS bool) { - return speed == 480, speed >= 5000 -} - -func (v *USBDevicesValidator) validateAvailableUSBIPPortsDefault(ctx context.Context, vm *v1alpha2.VirtualMachine, oldUSBDevices map[string]struct{}) (admission.Warnings, error) { - if vm.Status.Node == "" { - return admission.Warnings{}, nil - } - if vm.Spec.USBDevices == nil { - return admission.Warnings{}, nil - } - - var usbFromOtherNodes []string - - for _, ref := range vm.Spec.USBDevices { - if _, exists := oldUSBDevices[ref.Name]; exists { - continue - } - - usbDevice := &v1alpha2.USBDevice{} - err := v.client.Get(ctx, client.ObjectKey{Name: ref.Name, Namespace: vm.Namespace}, usbDevice) + if len(ssUSBFromOtherNodes) > 0 { + hasFree, err := usb.CheckFreePortForRequestOnNodeExcludingLocalUSBs(ctx, v.client, vm.Status.Node, 5000, len(ssUSBFromOtherNodes)) if err != nil { - return admission.Warnings{}, fmt.Errorf("failed to get USB device %s: %w", ref.Name, err) + return admission.Warnings{}, err } - - if usbDevice.Status.NodeName != vm.Status.Node { - usbFromOtherNodes = append(usbFromOtherNodes, ref.Name) + if !hasFree { + return admission.Warnings{}, fmt.Errorf("node %s has no available ports for sharing USB devices %v", vm.Status.Node, ssUSBFromOtherNodes) } } - if len(usbFromOtherNodes) == 0 { - return admission.Warnings{}, nil - } - - node, totalPorts, err := v.getNodeTotalPorts(ctx, vm.Status.Node) - if err != nil { - return admission.Warnings{}, err - } - - // total for 2 usb hubs (2.0 and 3.0) - totalPorts /= 2 - - return admission.Warnings{}, validateUsedPortsByAnnotation(node, annotations.AnnUSBIPUsedPorts, usbFromOtherNodes, totalPorts) + return admission.Warnings{}, nil } diff --git a/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator_test.go b/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator_test.go index ad2ad3934e..1b2f4c8a0d 100644 --- a/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator_test.go +++ b/images/virtualization-artifact/pkg/controller/vm/internal/validators/usb_devices_validator_test.go @@ -20,6 +20,7 @@ import ( "strings" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/component-base/featuregate" @@ -111,6 +112,32 @@ func TestUSBDevicesValidatorValidateCreateSucceedsWhenUSBFeatureEnabled(t *testi } } +func TestUSBDevicesValidatorValidateUpdateExcludesLocalUSBsFromPortAccounting(t *testing.T) { + oldVM := newVirtualMachine("vm-current", []v1alpha2.USBDeviceSpecRef{{Name: "usb-local"}}) + oldVM.Status.Node = "node-1" + oldVM.Status.USBDevices = []v1alpha2.USBDeviceStatusRef{{Name: "usb-local", Attached: true}} + + newVM := oldVM.DeepCopy() + newVM.Spec.USBDevices = []v1alpha2.USBDeviceSpecRef{{Name: "usb-local"}, {Name: "usb-remote"}} + + objects := []client.Object{ + oldVM, + &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Annotations: map[string]string{ + "usb.virtualization.deckhouse.io/usbip-total-ports": "2", + "usb.virtualization.deckhouse.io/usbip-high-speed-hub-used-ports": "1", + "usb.virtualization.deckhouse.io/usbip-super-speed-hub-used-ports": "0", + }}}, + &v1alpha2.USBDevice{ObjectMeta: metav1.ObjectMeta{Name: "usb-local", Namespace: "default"}, Status: v1alpha2.USBDeviceStatus{NodeName: "node-1", Attributes: v1alpha2.NodeUSBDeviceAttributes{Speed: 480}}}, + &v1alpha2.USBDevice{ObjectMeta: metav1.ObjectMeta{Name: "usb-remote", Namespace: "default"}, Status: v1alpha2.USBDeviceStatus{NodeName: "node-2", Attributes: v1alpha2.NodeUSBDeviceAttributes{Speed: 480}}}, + } + + validator := NewUSBDevicesValidator(newFakeClientWithUSBVMIndexer(t, objects...), newUSBFeatureGate(t, true)) + _, err := validator.ValidateUpdate(t.Context(), oldVM, newVM) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } +} + func TestUSBDevicesValidatorValidateUpdateReturnsErrorWhenUSBFeatureDisabled(t *testing.T) { oldVM := newVirtualMachine("vm-current", nil) newVM := newVirtualMachine("vm-current", []v1alpha2.USBDeviceSpecRef{{Name: "usb-1"}}) @@ -133,12 +160,17 @@ func newFakeClientWithUSBVMIndexer(t *testing.T, objects ...client.Object) clien if err := v1alpha2.AddToScheme(scheme); err != nil { t.Fatalf("failed to add virtualization API scheme: %v", err) } + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add core API scheme: %v", err) + } vmObj, vmField, vmExtractValue := indexer.IndexVMByUSBDevice() + vmNodeObj, vmNodeField, vmNodeExtractValue := indexer.IndexVMByNode() return fake.NewClientBuilder(). WithScheme(scheme). WithObjects(objects...). WithIndex(vmObj, vmField, vmExtractValue). + WithIndex(vmNodeObj, vmNodeField, vmNodeExtractValue). Build() }