diff --git a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml index faf854cf1..d2ffe3c61 100644 --- a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml @@ -520,4 +520,40 @@ spec: requested by the nova flavor extra specs, like `{"arch": "x86_64", "maxphysaddr:bits": 46, ...}`. weighers: [] +--- +apiVersion: cortex.cloud/v1alpha1 +kind: Pipeline +metadata: + name: kvm-report-capacity +spec: + schedulingDomain: nova + description: | + This pipeline is used by the Liquid capacity reporter to determine the + theoretical maximum capacity of each flavor group per availability zone, + as if all hosts were completely empty. It ignores current VM allocations + and all reservation blockings so that only raw hardware capacity is + considered. + type: filter-weigher + createDecisions: false + # Fetch all placement candidates, ignoring nova's preselection. + ignorePreselection: true + filters: + - name: filter_correct_az + description: | + Restricts host candidates to the requested availability zone. + - name: filter_has_enough_capacity + description: | + Filters hosts that cannot fit the flavor based on raw hardware capacity. + VM allocations and all reservation types are ignored to represent an + empty datacenter scenario. + params: + - {key: ignoreAllocations, boolValue: true} + - {key: ignoredReservationTypes, stringListValue: ["CommittedResourceReservation", "FailoverReservation"]} + - name: filter_has_requested_traits + description: | + Ensures hosts have the hardware traits required by the flavor. + - name: filter_status_conditions + description: | + Excludes hosts that are not ready or are disabled. + weighers: [] {{- end }} diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index ea37e8c11..0a70f2063 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -26,6 +26,10 @@ type FilterHasEnoughCapacityOpts struct { // When a reservation type is in this list, its capacity is not blocked. // Default: empty (all reservation types are considered) IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignoredReservationTypes,omitempty"` + + // IgnoreAllocations skips subtracting current VM allocations from host capacity. + // When true, only raw hardware capacity is considered (empty datacenter scenario). + IgnoreAllocations bool `json:"ignoreAllocations,omitempty"` } func (FilterHasEnoughCapacityOpts) Validate() error { return nil } @@ -71,18 +75,20 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa freeResourcesByHost[hv.Name] = hv.Status.EffectiveCapacity } - // Subtract allocated resources. - for resourceName, allocated := range hv.Status.Allocation { - free, ok := freeResourcesByHost[hv.Name][resourceName] - if !ok { - traceLog.Error( - "hypervisor with allocation for unknown resource", - "host", hv.Name, "resource", resourceName, - ) - continue + // Subtract allocated resources (skip when ignoring allocations for empty-datacenter capacity queries). + if !s.Options.IgnoreAllocations { + for resourceName, allocated := range hv.Status.Allocation { + free, ok := freeResourcesByHost[hv.Name][resourceName] + if !ok { + traceLog.Error( + "hypervisor with allocation for unknown resource", + "host", hv.Name, "resource", resourceName, + ) + continue + } + free.Sub(allocated) + freeResourcesByHost[hv.Name][resourceName] = free } - free.Sub(allocated) - freeResourcesByHost[hv.Name][resourceName] = free } } diff --git a/internal/scheduling/reservations/commitments/api.go b/internal/scheduling/reservations/commitments/api.go index 9d8fd5944..e72e5b3f8 100644 --- a/internal/scheduling/reservations/commitments/api.go +++ b/internal/scheduling/reservations/commitments/api.go @@ -32,7 +32,7 @@ func NewAPIWithConfig(client client.Client, config Config) *HTTPAPI { func (api *HTTPAPI) Init(mux *http.ServeMux) { mux.HandleFunc("/v1/commitments/change-commitments", api.HandleChangeCommitments) - // mux.HandleFunc("/v1/report-capacity", api.HandleReportCapacity) + mux.HandleFunc("/v1/report-capacity", api.HandleReportCapacity) mux.HandleFunc("/v1/commitments/info", api.HandleInfo) } diff --git a/internal/scheduling/reservations/commitments/api_report_capacity_test.go b/internal/scheduling/reservations/commitments/api_report_capacity_test.go index 76140e218..de7a35cd3 100644 --- a/internal/scheduling/reservations/commitments/api_report_capacity_test.go +++ b/internal/scheduling/reservations/commitments/api_report_capacity_test.go @@ -17,7 +17,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + novaapi "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" ) func TestHandleReportCapacity(t *testing.T) { @@ -283,3 +285,304 @@ func createTestFlavorGroupKnowledge(t *testing.T, groupName string) *v1alpha1.Kn }, } } + +func TestCapacityCalculatorWithScheduler(t *testing.T) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + + const ( + flavorGroup = "test-group" + az = "az-a" + flavorMemMB = uint64(32768) + flavorVCPUs = uint64(8) + ) + + flavorGroupKnowledge := createTestFlavorGroupKnowledgeWithSmallest(t, flavorGroup, flavorMemMB, flavorVCPUs) + hostDetailsKnowledge := createTestHostDetailsKnowledge(t, map[string]string{ + "host-1": az, + "host-2": az, + }) + + t.Run("computes capacity and usage via two scheduler calls", func(t *testing.T) { + // kvm-report-capacity returns 5 hosts (total capacity). + // kvm-general-purpose-load-balancing-all-filters-enabled returns 3 hosts (currently available). + // usage = 5 - 3 = 2. + server := newPipelineMockSchedulerServer(t, map[string][]string{ + "kvm-report-capacity": {"h1", "h2", "h3", "h4", "h5"}, + "kvm-general-purpose-load-balancing-all-filters-enabled": {"h1", "h2", "h3"}, + }) + defer server.Close() + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupKnowledge, hostDetailsKnowledge). + Build() + + calculator := &CapacityCalculator{ + client: fakeClient, + schedulerClient: reservations.NewSchedulerClient(server.URL), + } + + report, err := calculator.CalculateCapacity(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + res, ok := report.Resources[liquid.ResourceName("ram_"+flavorGroup)] + if !ok { + t.Fatal("expected ram_test-group resource") + } + azReport, ok := res.PerAZ[liquid.AvailabilityZone(az)] + if !ok { + t.Fatalf("expected %s in perAZ", az) + } + + if azReport.Capacity != 5 { + t.Errorf("expected capacity = 5, got %d", azReport.Capacity) + } + usageVal, ok := azReport.Usage.Unpack() + if !ok { + t.Fatal("expected usage to be set") + } + if usageVal != 2 { + t.Errorf("expected usage = 2, got %d", usageVal) + } + }) + + t.Run("usage is zero when total equals currently available", func(t *testing.T) { + server := newPipelineMockSchedulerServer(t, map[string][]string{ + "kvm-report-capacity": {"h1", "h2"}, + "kvm-general-purpose-load-balancing-all-filters-enabled": {"h1", "h2"}, + }) + defer server.Close() + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupKnowledge, hostDetailsKnowledge). + Build() + + calculator := &CapacityCalculator{ + client: fakeClient, + schedulerClient: reservations.NewSchedulerClient(server.URL), + } + + report, err := calculator.CalculateCapacity(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + azReport := report.Resources[liquid.ResourceName("ram_"+flavorGroup)].PerAZ[liquid.AvailabilityZone(az)] + usageVal, ok := azReport.Usage.Unpack() + if !ok { + t.Fatal("expected usage to be set") + } + if usageVal != 0 { + t.Errorf("expected usage = 0, got %d", usageVal) + } + }) + + t.Run("usage is clamped to zero when currently available exceeds total", func(t *testing.T) { + // Pathological: currently-available call returns more hosts than total capacity call. + server := newPipelineMockSchedulerServer(t, map[string][]string{ + "kvm-report-capacity": {"h1"}, + "kvm-general-purpose-load-balancing-all-filters-enabled": {"h1", "h2", "h3"}, + }) + defer server.Close() + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupKnowledge, hostDetailsKnowledge). + Build() + + calculator := &CapacityCalculator{ + client: fakeClient, + schedulerClient: reservations.NewSchedulerClient(server.URL), + } + + report, err := calculator.CalculateCapacity(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + azReport := report.Resources[liquid.ResourceName("ram_"+flavorGroup)].PerAZ[liquid.AvailabilityZone(az)] + usageVal, ok := azReport.Usage.Unpack() + if !ok { + t.Fatal("expected usage to be set") + } + if usageVal != 0 { + t.Errorf("expected usage = 0 (clamped), got %d", usageVal) + } + }) + + t.Run("scheduler failure yields empty AZ report without aborting", func(t *testing.T) { + failServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "internal error", http.StatusInternalServerError) + })) + defer failServer.Close() + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupKnowledge, hostDetailsKnowledge). + Build() + + calculator := &CapacityCalculator{ + client: fakeClient, + schedulerClient: reservations.NewSchedulerClient(failServer.URL), + } + + report, err := calculator.CalculateCapacity(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + res, ok := report.Resources[liquid.ResourceName("ram_"+flavorGroup)] + if !ok { + t.Fatal("expected resource to exist") + } + azReport := res.PerAZ[liquid.AvailabilityZone(az)] + if azReport == nil { + t.Fatal("expected non-nil AZ report on scheduler failure") + } + if azReport.Capacity != 0 { + t.Errorf("expected capacity = 0 on failure, got %d", azReport.Capacity) + } + }) + + t.Run("multiple AZs are reported independently", func(t *testing.T) { + twoAZHostDetails := createTestHostDetailsKnowledge(t, map[string]string{ + "host-1": "az-a", + "host-2": "az-b", + }) + // Both calls always return 3 hosts regardless of AZ (pipeline-routing mock). + server := newPipelineMockSchedulerServer(t, map[string][]string{ + "kvm-report-capacity": {"h1", "h2", "h3"}, + "kvm-general-purpose-load-balancing-all-filters-enabled": {"h1"}, + }) + defer server.Close() + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(flavorGroupKnowledge, twoAZHostDetails). + Build() + + calculator := &CapacityCalculator{ + client: fakeClient, + schedulerClient: reservations.NewSchedulerClient(server.URL), + } + + report, err := calculator.CalculateCapacity(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + res := report.Resources[liquid.ResourceName("ram_"+flavorGroup)] + if len(res.PerAZ) != 2 { + t.Errorf("expected 2 AZs, got %d", len(res.PerAZ)) + } + if _, ok := res.PerAZ[liquid.AvailabilityZone("az-a")]; !ok { + t.Error("expected az-a in report") + } + if _, ok := res.PerAZ[liquid.AvailabilityZone("az-b")]; !ok { + t.Error("expected az-b in report") + } + }) +} + +// newPipelineMockSchedulerServer starts a test HTTP server that returns different +// host lists depending on the pipeline name in the request body. +func newPipelineMockSchedulerServer(t *testing.T, hostsByPipeline map[string][]string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req novaapi.ExternalSchedulerRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "bad request", http.StatusBadRequest) + return + } + hosts := hostsByPipeline[req.Pipeline] + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(novaapi.ExternalSchedulerResponse{Hosts: hosts}); err != nil { + t.Errorf("mock scheduler: encode error: %v", err) + } + })) +} + +// createTestFlavorGroupKnowledgeWithSmallest creates a Knowledge CRD where smallestFlavor +// is explicitly set so the capacity calculator uses the correct memory unit. +func createTestFlavorGroupKnowledgeWithSmallest(t *testing.T, groupName string, memMB, vcpus uint64) *v1alpha1.Knowledge { + t.Helper() + + features := []map[string]interface{}{ + { + "name": groupName, + "flavors": []map[string]interface{}{ + { + "name": "test_flavor", + "vcpus": vcpus, + "memoryMB": memMB, + "diskGB": 50, + }, + }, + "smallestFlavor": map[string]interface{}{ + "name": "test_flavor", + "vcpus": vcpus, + "memoryMB": memMB, + "diskGB": 50, + }, + "largestFlavor": map[string]interface{}{ + "name": "test_flavor", + "vcpus": vcpus, + "memoryMB": memMB, + "diskGB": 50, + }, + }, + } + + raw, err := v1alpha1.BoxFeatureList(features) + if err != nil { + t.Fatal(err) + } + + return &v1alpha1.Knowledge{ + ObjectMeta: v1.ObjectMeta{Name: "flavor-groups"}, + Spec: v1alpha1.KnowledgeSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Extractor: v1alpha1.KnowledgeExtractorSpec{Name: "flavor_groups"}, + }, + Status: v1alpha1.KnowledgeStatus{ + Conditions: []v1.Condition{{Type: v1alpha1.KnowledgeConditionReady, Status: "True"}}, + Raw: raw, + }, + } +} + +// createTestHostDetailsKnowledge creates a Knowledge CRD with host→AZ mappings. +func createTestHostDetailsKnowledge(t *testing.T, hostToAZ map[string]string) *v1alpha1.Knowledge { + t.Helper() + + features := make([]map[string]interface{}, 0, len(hostToAZ)) + for host, az := range hostToAZ { + features = append(features, map[string]interface{}{ + "computeHost": host, + "availabilityZone": az, + }) + } + + raw, err := v1alpha1.BoxFeatureList(features) + if err != nil { + t.Fatal(err) + } + + return &v1alpha1.Knowledge{ + ObjectMeta: v1.ObjectMeta{Name: "host-details"}, + Spec: v1alpha1.KnowledgeSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Extractor: v1alpha1.KnowledgeExtractorSpec{Name: "host_details"}, + }, + Status: v1alpha1.KnowledgeStatus{ + Conditions: []v1.Condition{{Type: v1alpha1.KnowledgeConditionReady, Status: "True"}}, + Raw: raw, + }, + } +} diff --git a/internal/scheduling/reservations/commitments/capacity.go b/internal/scheduling/reservations/commitments/capacity.go index 04ad177e1..0d7b50a36 100644 --- a/internal/scheduling/reservations/commitments/capacity.go +++ b/internal/scheduling/reservations/commitments/capacity.go @@ -7,7 +7,9 @@ import ( "context" "fmt" "sort" + "time" + . "github.com/majewsky/gg/option" "github.com/sapcc/go-api-declarations/liquid" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,11 +20,16 @@ import ( // CapacityCalculator computes capacity reports for Limes LIQUID API. type CapacityCalculator struct { - client client.Client + client client.Client + schedulerClient *reservations.SchedulerClient } func NewCapacityCalculator(client client.Client) *CapacityCalculator { - return &CapacityCalculator{client: client} + schedulerClient := reservations.NewSchedulerClient("http://localhost:8080/scheduler/nova/external") + return &CapacityCalculator{ + client: client, + schedulerClient: schedulerClient, + } } // CalculateCapacity computes per-AZ capacity for all flavor groups. @@ -59,61 +66,70 @@ func (c *CapacityCalculator) CalculateCapacity(ctx context.Context) (liquid.Serv func (c *CapacityCalculator) calculateAZCapacity( ctx context.Context, - _ string, // groupName - reserved for future use - _ compute.FlavorGroupFeature, // groupData - reserved for future use + groupName string, + groupData compute.FlavorGroupFeature, ) (map[liquid.AvailabilityZone]*liquid.AZResourceCapacityReport, error) { - // Get list of availability zones from HostDetails Knowledge + azs, err := c.getAvailabilityZones(ctx) if err != nil { return nil, fmt.Errorf("failed to get availability zones: %w", err) } - // Create report entry for each AZ with empty capacity/usage - // Capacity and Usage are left unset (zero value of option.Option[uint64]) - // This signals to Limes: "These AZs exist, but capacity/usage not yet calculated" result := make(map[liquid.AvailabilityZone]*liquid.AZResourceCapacityReport) for _, az := range azs { + capacity, usage, err := c.calculateInstanceCapacity(ctx, groupName, groupData, az) + if err != nil { + // Log error but continue with empty values for this AZ + result[liquid.AvailabilityZone(az)] = &liquid.AZResourceCapacityReport{} + continue + } + result[liquid.AvailabilityZone(az)] = &liquid.AZResourceCapacityReport{ - // Both Capacity and Usage left unset (empty optional values) - // TODO: Calculate actual capacity from Reservation CRDs or host resources - // TODO: Calculate actual usage from VM allocations + Capacity: capacity, + Usage: Some(usage), } } return result, nil } -func (c *CapacityCalculator) getAvailabilityZones(ctx context.Context) ([]string, error) { - // List all Knowledge CRDs to find host-details knowledge +// getHostAZMap returns a map from compute host name to availability zone. +func (c *CapacityCalculator) getHostAZMap(ctx context.Context) (map[string]string, error) { var knowledgeList v1alpha1.KnowledgeList if err := c.client.List(ctx, &knowledgeList); err != nil { return nil, fmt.Errorf("failed to list Knowledge CRDs: %w", err) } - // Find host-details knowledge and extract AZs - azSet := make(map[string]struct{}) + hostAZMap := make(map[string]string) for _, knowledge := range knowledgeList.Items { - // Look for host-details extractor if knowledge.Spec.Extractor.Name != "host_details" { continue } - - // Parse features from Raw data features, err := v1alpha1.UnboxFeatureList[compute.HostDetails](knowledge.Status.Raw) if err != nil { - // Skip if we can't parse this knowledge continue } - - // Collect unique AZ names for _, feature := range features { - if feature.AvailabilityZone != "" { - azSet[feature.AvailabilityZone] = struct{}{} + if feature.ComputeHost != "" && feature.AvailabilityZone != "" { + hostAZMap[feature.ComputeHost] = feature.AvailabilityZone } } } - // Convert set to sorted slice + return hostAZMap, nil +} + +func (c *CapacityCalculator) getAvailabilityZones(ctx context.Context) ([]string, error) { + hostAZMap, err := c.getHostAZMap(ctx) + if err != nil { + return nil, err + } + + azSet := make(map[string]struct{}) + for _, az := range hostAZMap { + azSet[az] = struct{}{} + } + azs := make([]string, 0, len(azSet)) for az := range azSet { azs = append(azs, az) @@ -122,3 +138,58 @@ func (c *CapacityCalculator) getAvailabilityZones(ctx context.Context) ([]string return azs, nil } + +// calculateInstanceCapacity returns the total capacity and current usage for a flavor group in an AZ. +// Capacity is expressed in multiples of the smallest flavor's memory. +// Total capacity is derived directly from Hypervisor CRDs (as if everything were empty). +// Currently available is derived from the scheduler (respecting current VM and reservation state). +// Usage = totalCapacity - currentlyAvailable. +func (c *CapacityCalculator) calculateInstanceCapacity( + ctx context.Context, + groupName string, + groupData compute.FlavorGroupFeature, + az string, +) (capacity, usage uint64, err error) { + + smallestFlavor := groupData.SmallestFlavor + + // Request 1: currently available — how many instances can be placed right now. + currentResp, err := c.schedulerClient.ScheduleReservation(ctx, reservations.ScheduleReservationRequest{ + InstanceUUID: fmt.Sprintf("capacity-current-%s-%s-%d", groupName, az, time.Now().UnixNano()), + ProjectID: "cortex-capacity-check", + FlavorName: smallestFlavor.Name, + MemoryMB: smallestFlavor.MemoryMB, + VCPUs: smallestFlavor.VCPUs, + FlavorExtraSpecs: map[string]string{"hw_version": groupName}, + AvailabilityZone: az, + Pipeline: "kvm-general-purpose-load-balancing-all-filters-enabled", + }) + if err != nil { + return 0, 0, fmt.Errorf("failed to get current available capacity: %w", err) + } + currentlyAvailable := uint64(len(currentResp.Hosts)) + + // Request 2: total capacity — hosts eligible if everything were empty. + // Uses a dedicated pipeline that ignores VM allocations and all reservations. + totalResp, err := c.schedulerClient.ScheduleReservation(ctx, reservations.ScheduleReservationRequest{ + InstanceUUID: fmt.Sprintf("capacity-total-%s-%s-%d", groupName, az, time.Now().UnixNano()), + ProjectID: "cortex-capacity-check", + FlavorName: smallestFlavor.Name, + MemoryMB: smallestFlavor.MemoryMB, + VCPUs: smallestFlavor.VCPUs, + FlavorExtraSpecs: map[string]string{"hw_version": groupName}, + AvailabilityZone: az, + Pipeline: "kvm-report-capacity", + }) + if err != nil { + return 0, 0, fmt.Errorf("failed to get total capacity: %w", err) + } + totalCapacity := uint64(len(totalResp.Hosts)) + + var usageValue uint64 + if totalCapacity >= currentlyAvailable { + usageValue = totalCapacity - currentlyAvailable + } + + return totalCapacity, usageValue, nil +}