From 8759abf961dda4317956b749254b89561afd971b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?p=C3=BDrus?= Date: Mon, 8 Dec 2025 12:29:25 +0100 Subject: [PATCH] lbaas: add a new batch pools members update endpoint support --- go.mod | 1 + go.sum | 4 +-- pkg/openstack/loadbalancer.go | 50 +++++++++++++++++++++++++++++- pkg/openstack/openstack.go | 1 + pkg/util/openstack/loadbalancer.go | 41 ++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index cf994b8c71..7e22c81b56 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( // the below fixes the "go list -m all" execution replace ( + github.com/gophercloud/gophercloud/v2 => github.com/kayrus/gophercloud/v2 v2.10.1-0.20260112125525-25600003eb41 k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.35.0 k8s.io/cri-client => k8s.io/cri-client v0.35.0 k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.35.0 diff --git a/go.sum b/go.sum index 6b0c0421d6..a1318fc559 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,6 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gophercloud/gophercloud/v2 v2.8.0 h1:of2+8tT6+FbEYHfYC8GBu8TXJNsXYSNm9KuvpX7Neqo= -github.com/gophercloud/gophercloud/v2 v2.8.0/go.mod h1:Ki/ILhYZr/5EPebrPL9Ej+tUg4lqx71/YH2JWVeU+Qk= github.com/gophercloud/utils/v2 v2.0.0-20250930154317-576cdf6142a7 h1:y0/17y47lq1dD97qEqbbvjsMF8PzLmqmD839aOePRuc= github.com/gophercloud/utils/v2 v2.0.0-20250930154317-576cdf6142a7/go.mod h1:dVCIqYUB0Q8JDbMZaReU6BkAQAS9j3l3Kyc7GuSIztU= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= @@ -237,6 +235,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI= github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= +github.com/kayrus/gophercloud/v2 v2.10.1-0.20260112125525-25600003eb41 h1:xOyZk+OmT61jjR2srqEY3QVy8Fp931Wl0RMYlLID9C0= +github.com/kayrus/gophercloud/v2 v2.10.1-0.20260112125525-25600003eb41/go.mod h1:Ki/ILhYZr/5EPebrPL9Ej+tUg4lqx71/YH2JWVeU+Qk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= diff --git a/pkg/openstack/loadbalancer.go b/pkg/openstack/loadbalancer.go index c67ba1f1b4..fb0a92e73b 100644 --- a/pkg/openstack/loadbalancer.go +++ b/pkg/openstack/loadbalancer.go @@ -89,6 +89,7 @@ const ( ServiceAnnotationLoadBalancerHealthMonitorMaxRetriesDown = "loadbalancer.openstack.org/health-monitor-max-retries-down" ServiceAnnotationLoadBalancerLoadbalancerHostname = "loadbalancer.openstack.org/hostname" ServiceAnnotationLoadBalancerAddress = "loadbalancer.openstack.org/load-balancer-address" + ServiceAnnotationLoadBalancerBatchPoolsMembersUpdate = "loadbalancer.openstack.org/batch-pools-members-update" // revive:disable:var-naming ServiceAnnotationTlsContainerRef = "loadbalancer.openstack.org/default-tls-container-ref" // revive:enable:var-naming @@ -959,7 +960,8 @@ func (lbaas *LbaasV2) ensureOctaviaPool(ctx context.Context, lbID string, name s return nil, err } - if !curMembers.Equal(newMembers) { + batchPoolsMembersUpdate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerBatchPoolsMembersUpdate, lbaas.opts.BatchPoolsMembersUpdate) + if !curMembers.Equal(newMembers) && !batchPoolsMembersUpdate { klog.V(2).Infof("Updating %d members for pool %s", len(members), pool.ID) if err := openstackutil.BatchUpdatePoolMembers(ctx, lbaas.lb, lbID, pool.ID, members); err != nil { return nil, err @@ -967,6 +969,9 @@ func (lbaas *LbaasV2) ensureOctaviaPool(ctx context.Context, lbID string, name s klog.V(2).Infof("Successfully updated %d members for pool %s", len(members), pool.ID) } + // set the updated members to the pool for further batch update + pool.Members = batchUpdateMemberOptsToMembers(members) + return pool, nil } @@ -1050,6 +1055,20 @@ func (lbaas *LbaasV2) buildBatchUpdateMemberOpts(ctx context.Context, port corev return members, newMembers, nil } +func batchUpdateMemberOptsToMembers(opts []v2pools.BatchUpdateMemberOpts) []v2pools.Member { + members := make([]v2pools.Member, len(opts)) + for i := range opts { + members[i] = v2pools.Member{ + Address: opts[i].Address, + ProtocolPort: opts[i].ProtocolPort, + Name: ptr.Deref(opts[i].Name, ""), + SubnetID: ptr.Deref(opts[i].SubnetID, ""), + MonitorPort: ptr.Deref(opts[i].MonitorPort, 0), + } + } + return members +} + func (lbaas *LbaasV2) buildCreateMemberOpts(ctx context.Context, port corev1.ServicePort, nodes []*corev1.Node, svcConf *serviceConfig) ([]v2pools.CreateMemberOpts, sets.Set[string], error) { batchUpdateMemberOpts, newMembers, err := lbaas.buildBatchUpdateMemberOpts(ctx, port, nodes, svcConf) if err != nil { @@ -1783,6 +1802,8 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName return nil, err } + // a list of pools to update + pools := make([]v2pools.Pool, 0, len(service.Spec.Ports)) for portIndex, port := range service.Spec.Ports { listener, err := lbaas.ensureOctaviaListener(ctx, loadbalancer.ID, cpoutil.Sprintf255(listenerFormat, portIndex, lbName), curListenerMapping, port, svcConf) if err != nil { @@ -1793,6 +1814,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName if err != nil { return nil, err } + pools = append(pools, *pool) if err := lbaas.ensureOctaviaHealthMonitor(ctx, loadbalancer.ID, cpoutil.Sprintf255(monitorFormat, portIndex, lbName), pool, port, svcConf); err != nil { return nil, err @@ -1804,6 +1826,17 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName curListeners = popListener(curListeners, listener.ID) } + batchPoolsMembersUpdate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerBatchPoolsMembersUpdate, lbaas.opts.BatchPoolsMembersUpdate) + if batchPoolsMembersUpdate { + err := openstackutil.BatchUpdatePoolsMembers(ctx, lbaas.lb, loadbalancer.ID, pools) + if err != nil { + err = PreserveGopherError(err) + msg := fmt.Sprintf("Error updating batch pools members for LoadBalancer: %v", err) + klog.Errorf(msg, "lbID", loadbalancer.ID) + return nil, err + } + } + // Deal with the remaining listeners, delete the listener if it was created by this Service previously. if err := lbaas.deleteOctaviaListeners(ctx, loadbalancer.ID, curListeners, isLBOwner, lbName); err != nil { return nil, err @@ -1935,6 +1968,9 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName lbListeners[key] = l } + // a list of pools to update + pools := make([]v2pools.Pool, 0, len(service.Spec.Ports)) + // Update pool members for each listener. for portIndex, port := range service.Spec.Ports { proto := getListenerProtocol(port.Protocol, svcConf) @@ -1950,6 +1986,7 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName if err != nil { return err } + pools = append(pools, *pool) err = lbaas.ensureOctaviaHealthMonitor(ctx, loadbalancer.ID, cpoutil.Sprintf255(monitorFormat, portIndex, loadbalancer.Name), pool, port, svcConf) if err != nil { @@ -1957,6 +1994,17 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName } } + batchPoolsMembersUpdate := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerBatchPoolsMembersUpdate, lbaas.opts.BatchPoolsMembersUpdate) + if batchPoolsMembersUpdate { + err := openstackutil.BatchUpdatePoolsMembers(ctx, lbaas.lb, loadbalancer.ID, pools) + if err != nil { + err = PreserveGopherError(err) + msg := fmt.Sprintf("Error updating batch pools members for LoadBalancer: %v", err) + klog.Errorf(msg, "lbID", loadbalancer.ID) + return err + } + } + if lbaas.opts.ManageSecurityGroups { err := lbaas.ensureAndUpdateOctaviaSecurityGroup(ctx, clusterName, service, filteredNodes, svcConf) if err != nil { diff --git a/pkg/openstack/openstack.go b/pkg/openstack/openstack.go index ac07d50345..38b80fa43a 100644 --- a/pkg/openstack/openstack.go +++ b/pkg/openstack/openstack.go @@ -120,6 +120,7 @@ type LoadBalancerOpts struct { MaxSharedLB int `gcfg:"max-shared-lb"` // Number of Services in maximum can share a single load balancer. Default 2 ContainerStore string `gcfg:"container-store"` // Used to specify the store of the tls-container-ref ProviderRequiresSerialAPICalls bool `gcfg:"provider-requires-serial-api-calls"` // default false, the provider supports the "bulk update" API call + BatchPoolsMembersUpdate bool `gcfg:"batch-pools-members-update"` // default false, the controller will update all pools members in batch when possible // revive:disable:var-naming TlsContainerRef string `gcfg:"default-tls-container-ref"` // reference to a tls container // revive:enable:var-naming diff --git a/pkg/util/openstack/loadbalancer.go b/pkg/util/openstack/loadbalancer.go index a9225fff44..5f51f21876 100644 --- a/pkg/util/openstack/loadbalancer.go +++ b/pkg/util/openstack/loadbalancer.go @@ -603,6 +603,47 @@ func BatchUpdatePoolMembers(ctx context.Context, client *gophercloud.ServiceClie return nil } +// BatchUpdatePoolsMembers +func BatchUpdatePoolsMembers(ctx context.Context, client *gophercloud.ServiceClient, lbID string, newPools []pools.Pool) error { + mc := metrics.NewMetricContext("loadbalancer_pools_members", "update") + + opts := make([]pools.BatchUpdateMemberOpts, 0, len(newPools)*len(newPools[0].Members)) + for _, p := range newPools { + for _, m := range p.Members { + var name, subnetID *string + if m.Name != "" { + name = &m.Name + } + if m.SubnetID != "" { + subnetID = &m.SubnetID + } + var monitorPort *int + if m.MonitorPort > 0 { + monitorPort = &m.MonitorPort + } + opts = append(opts, pools.BatchUpdateMemberOpts{ + PoolID: p.ID, + Address: m.Address, + ProtocolPort: m.ProtocolPort, + Name: name, + SubnetID: subnetID, + MonitorPort: monitorPort, + }) + } + } + + err := pools.BatchUpdatePoolsMembers(ctx, client, opts).ExtractErr() + if mc.ObserveRequest(err) != nil { + return err + } + + if _, err := WaitActiveAndGetLoadBalancer(ctx, client, lbID); err != nil { + return fmt.Errorf("failed to wait for load balancer %s ACTIVE after updating pools members: %v", lbID, err) + } + + return nil +} + // GetL7policies retrieves all l7 policies for the given listener. func GetL7policies(ctx context.Context, client *gophercloud.ServiceClient, listenerID string) ([]l7policies.L7Policy, error) { var policies []l7policies.L7Policy