From a94c31fb8c40ffbf93d8b86f0417365cd85e89ed Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Fri, 10 Apr 2026 13:26:02 -0300 Subject: [PATCH 1/2] use add/del haproxy api calls DCM currently works by creating blueprint backends and empty backend server slots upfront. These empty slots are updated later with new endpoints, created on scale-out operations, without the need to fork-and-reload haproxy. Scale-in operations work in a similar way: backend servers are disabled when endpoints are not available anymore. HAProxy has provided add/del server APIs since 2.5, which remove the need to create empty slots. This means that we don't need to predict the ideal number of empty servers per backend, so backends can be configured with the currently active endpoints only, and they can scale to any number of new backend servers without the need to reload. This update is being made in at least two phases: Phase 1: make the minimum changes to the config manager (manager.go) and the client's backend (backend.go) needed to support the new API calls. The priorities are a fully functional new approach and a code update that is easy to review. Phase 2: clean up dead code, eventually combining types into a single, coherent and simplified struct. Further phases should revisit other parts of the code that can be optimized with the new approach, or continue the code cleanup, such as changing route resources and the handling of the haproxy lookup maps. 
--- go.mod | 2 +- .../haproxy/conf/haproxy-config.template | 59 +-- pkg/cmd/infra/router/template.go | 8 +- pkg/router/router_test.go | 110 +----- .../template/configmanager/haproxy/backend.go | 199 +++++++++- .../configmanager/haproxy/backend_test.go | 356 ++++++++++++++++++ .../haproxy/blueprint_plugin_test.go | 16 +- .../template/configmanager/haproxy/client.go | 5 + .../template/configmanager/haproxy/manager.go | 311 +++++---------- pkg/router/template/router.go | 54 ++- pkg/router/template/template_helper.go | 4 + pkg/router/template/types.go | 30 +- 12 files changed, 708 insertions(+), 446 deletions(-) create mode 100644 pkg/router/template/configmanager/haproxy/backend_test.go diff --git a/go.mod b/go.mod index a0cb99be1..ae5d064c3 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( k8s.io/apiserver v0.35.0 k8s.io/client-go v0.35.0 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 ) require ( @@ -110,7 +111,6 @@ require ( k8s.io/component-base v0.35.0 // indirect k8s.io/kms v0.35.0 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect - k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/images/router/haproxy/conf/haproxy-config.template b/images/router/haproxy/conf/haproxy-config.template index 4d418fc37..9388d435c 100644 --- a/images/router/haproxy/conf/haproxy-config.template +++ b/images/router/haproxy/conf/haproxy-config.template @@ -744,11 +744,16 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }} {{- end }} + {{- if $dynamicConfigManager }} + dynamic-cookie-key {{ $cfg.RoutingKeyName }} + {{- end }} {{- range $serviceUnitName, $weight := $cfg.ServiceUnitNames }} {{- if ge $weight 0 }}{{/* weight=0 is reasonable to keep existing connections to backends with cookies 
as we can see the HTTP headers */}} {{- with $serviceUnit := index $.ServiceUnits $serviceUnitName }} {{- range $idx, $endpoint := processEndpointsForAlias $cfg $serviceUnit (env "ROUTER_BACKEND_PROCESS_ENDPOINTS" "") }} - server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }} cookie {{ $endpoint.IdHash }} weight {{ $weight }} + {{- /* This should always follow backend.go/innerAddServer() method, changes here should be reflected there. */}} + {{- /* TODO: either move this configuration to the Go counterpart, or read it from here instead */}} + server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }}{{ if not $dynamicConfigManager }} cookie {{ $endpoint.IdHash }}{{ end }} weight {{ $weight }} {{- if (eq $cfg.TLSTermination "reencrypt") }} ssl {{- if not (isTrue $router_disable_http2) }} alpn h2,http/1.1 {{- end }} @@ -776,43 +781,6 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }}{{/* end get serviceUnit from its name */}} {{- end }}{{/* end range over serviceUnitNames */}} - {{- with $dynamicConfigManager }} - {{- if (eq $cfg.TLSTermination "reencrypt") }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - {{- range $idx, $serverName := $dynamicConfigManager.GenerateDynamicServerNames $cfgIdx }} - server {{ $serverName }} 172.4.0.4:8765 weight 0 ssl disabled check inter {{ $health_check_interval }} - {{- if gt (len (index $cfg.Certificates (printf "%s_pod" $cfg.Host)).Contents) 0 }} verify required ca-file {{ $workingDir }}/router/cacerts/{{$cfgIdx }}.pem - {{- else }} - {{- if not (isTrue $router_disable_http2) }} alpn h2,http/1.1 - {{- end }} - {{- /* - Add "verifyhost" for the primary service only. This server is not intended - to be used for alternate backend endpoints. A reload will be triggered if - an alternate backend endpoint is added dynamically. 
- */ -}} - {{- with $serviceUnit := index $.ServiceUnits $cfg.PrimaryServiceUnitKey -}} - {{- if $cfg.VerifyServiceHostname }} verifyhost {{ $serviceUnit.Hostname }} - {{- end }} - {{- end }} - {{- if gt (len $defaultDestinationCA) 0 }} verify required ca-file {{ $defaultDestinationCA }} - {{- else }} verify none - {{- end }} - {{- end }} - {{- with $podMaxConn := index $cfg.Annotations "haproxy.router.openshift.io/pod-concurrent-connections" }} - {{- if (isInteger (index $cfg.Annotations "haproxy.router.openshift.io/pod-concurrent-connections")) }} maxconn {{$podMaxConn }} {{- end }} - {{- end }}{{/* end pod-concurrent-connections annotation */}} - {{- end }}{{/* end range over dynamic server names */}} - - {{- else }} - {{- with $name := $dynamicConfigManager.ServerTemplateName $cfgIdx }} - {{- with $size := $dynamicConfigManager.ServerTemplateSize $cfgIdx }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - server-template {{ $name }}- 1-{{ $size }} 172.4.0.4:8765 check inter {{ $health_check_interval }} disabled - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }}{{/* end if tls==edge/none/reencrypt */}} {{- if eq $cfg.TLSTermination "passthrough" }} @@ -856,10 +824,16 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} hash-type consistent timeout check 5000ms + + {{- if $dynamicConfigManager }} + dynamic-cookie-key {{ $cfg.RoutingKeyName }} + {{- end }} {{- range $serviceUnitName, $weight := $cfg.ServiceUnitNames }} {{- if ne $weight 0 }}{{/* drop connections where weight=0 as we can't use cookies, leaving only r-r and src-ip as dispatch methods and weight make no sense there */}} {{- with $serviceUnit := index $.ServiceUnits $serviceUnitName }} {{- range $idx, $endpoint := processEndpointsForAlias $cfg $serviceUnit (env "ROUTER_BACKEND_PROCESS_ENDPOINTS" "") }} + {{- /* This should always follow backend.go/innerAddServer() method, changes here should be reflected there. 
*/}} + {{- /* TODO: either move this configuration to the Go counterpart, or read it from here instead */}} server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }} weight {{ $weight }} {{- if and (not $endpoint.NoHealthCheck) (gt $cfg.ActiveEndpoints 1) }} check inter {{ $health_check_interval }} {{- end }}{{/* end else no health check */}} @@ -872,15 +846,6 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }}{{/* end if weight != 0 */}} {{- end }}{{/* end iterate over services*/}} - {{- with $dynamicConfigManager }} - {{- with $name := $dynamicConfigManager.ServerTemplateName $cfgIdx }} - {{- with $size := $dynamicConfigManager.ServerTemplateSize $cfgIdx }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - server-template {{ $name }}- 1-{{ $size }} 172.4.0.4:8765 check inter {{ $health_check_interval }} disabled - {{- end }} - {{- end }} - {{- end }} - {{- end }}{{/*end tls==passthrough*/}} {{- end }}{{/* end loop over routes */}} diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index 2505e43ca..25d7aa9ca 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -141,7 +141,6 @@ type TemplateRouterConfigManager struct { BlueprintRouteNamespace string BlueprintRouteLabelSelector string BlueprintRoutePoolSize int - MaxDynamicServers int } // isTrue here has the same logic as the function within package pkg/router/template @@ -182,13 +181,15 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.BlueprintRouteNamespace, "blueprint-route-namespace", env("ROUTER_BLUEPRINT_ROUTE_NAMESPACE", ""), "Specifies the namespace which contains the routes that serve as blueprints for the dynamic configuration manager.") flag.StringVar(&o.BlueprintRouteLabelSelector, "blueprint-route-labels", env("ROUTER_BLUEPRINT_ROUTE_LABELS", ""), "A label selector to apply to the routes in the blueprint route namespace. 
These selected routes will serve as blueprints for the dynamic dynamic configuration manager.") flag.IntVar(&o.BlueprintRoutePoolSize, "blueprint-route-pool-size", int(envInt("ROUTER_BLUEPRINT_ROUTE_POOL_SIZE", 10, 0)), "Specifies the size of the pre-allocated pool for each route blueprint managed by the router specific dynamic configuration manager. This can be overriden by an annotation router.openshift.io/pool-size on an individual route.") - flag.IntVar(&o.MaxDynamicServers, "max-dynamic-servers", int(envInt("ROUTER_MAX_DYNAMIC_SERVERS", 5, 1)), "Specifies the maximum number of dynamic servers added to a route for use by the router specific dynamic configuration manager.") flag.StringVar(&o.CaptureHTTPRequestHeadersString, "capture-http-request-headers", env("ROUTER_CAPTURE_HTTP_REQUEST_HEADERS", ""), "A comma-delimited list of HTTP request header names and maximum header value lengths that should be captured for logging. Each item must have the following form: name:maxLength") flag.StringVar(&o.CaptureHTTPResponseHeadersString, "capture-http-response-headers", env("ROUTER_CAPTURE_HTTP_RESPONSE_HEADERS", ""), "A comma-delimited list of HTTP response header names and maximum header value lengths that should be captured for logging. Each item must have the following form: name:maxLength") flag.StringVar(&o.CaptureHTTPCookieString, "capture-http-cookie", env("ROUTER_CAPTURE_HTTP_COOKIE", ""), "Name and maximum length of HTTP cookie that should be captured for logging. The argument must have the following form: name:maxLength. Append '=' to the name to indicate that an exact match should be performed; otherwise a prefix match will be performed. The value of first cookie that matches the name is captured.") flag.StringVar(&o.HTTPHeaderNameCaseAdjustmentsString, "http-header-name-case-adjustments", env("ROUTER_H1_CASE_ADJUST", ""), "A comma-delimited list of HTTP header names that should have their case adjusted. 
Each item must be a valid HTTP header name and should have the desired capitalization.") flag.StringVar(&o.HTTPResponseHeadersString, "set-delete-http-response-header", env("ROUTER_HTTP_RESPONSE_HEADERS", ""), "A comma-delimited list of HTTP response header names and values that should be set/deleted.") flag.StringVar(&o.HTTPRequestHeadersString, "set-delete-http-request-header", env("ROUTER_HTTP_REQUEST_HEADERS", ""), "A comma-delimited list of HTTP request header names and values that should be set/deleted.") + + // deprecated flags + _ = flag.Int("max-dynamic-servers", int(envInt("ROUTER_MAX_DYNAMIC_SERVERS", 5, 1)), "Specifies the maximum number of dynamic servers added to a route for use by the router specific dynamic configuration manager. DEPRECATED: router now created backend servers dynamically.") } type RouterStats struct { @@ -740,9 +741,10 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { CommitInterval: o.CommitInterval, BlueprintRoutes: blueprintRoutes, BlueprintRoutePoolSize: o.BlueprintRoutePoolSize, - MaxDynamicServers: o.MaxDynamicServers, WildcardRoutesAllowed: o.AllowWildcardRoutes, ExtendedValidation: o.ExtendedValidation, + WorkingDir: o.WorkingDir, + DefaultDestinationCA: o.DefaultDestinationCAPath, } cfgManager = haproxyconfigmanager.NewHAProxyConfigManager(cmopts) if len(o.BlueprintRouteNamespace) > 0 { diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index d24727e6a..23f20cb6a 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -175,8 +175,7 @@ u3YLAbyW/lHhOCiZu2iAI8AbmXem9lW6Tr7p/97s0w== Action: "Set", }}, DynamicConfigManager: haproxyconfigmanager.NewHAProxyConfigManager(templateplugin.ConfigManagerOptions{ - ConnectionInfo: "unix:///var/lib/dymmy", - MaxDynamicServers: 1, + ConnectionInfo: "unix:///var/lib/dummy", }), } plugin, err = templateplugin.NewTemplatePlugin(pluginCfg, svcFetcher) @@ -749,62 +748,6 @@ func TestConfigTemplate(t *testing.T) { }, }, }, - "Verifyhost for 
dynamic slot": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "serviceq", - serviceName: "serviceq", - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "q", - host: "qexample.com", - targetServiceName: "serviceq", - weight: int32(100), - time: start, - tlsTermination: routev1.TLSTerminationReencrypt, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: reencryptBackendName(h.namespace, "q"), - attribute: "server", - value: "_dynamic-pod-1 172.4.0.4:8765 weight 0 ssl disabled check inter 5000ms alpn h2,http/1.1 verifyhost serviceq.default.svc verify required ca-file dummy", - fullMatch: true, - }, - }, - }, - "Verifyhost for dynamic slot with alternate backend": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "serviceq1", - serviceName: "serviceq1", - }, - { - name: "serviceq2", - serviceName: "serviceq2", - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "q1", - host: "q1example.com", - targetServiceName: "serviceq1", - weight: int32(50), - alternateBackend: "serviceq2", - alternateBackendWeight: int32(50), - time: start, - tlsTermination: routev1.TLSTerminationReencrypt, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: reencryptBackendName(h.namespace, "q1"), - attribute: "server", - value: "_dynamic-pod-1 172.4.0.4:8765 weight 0 ssl disabled check inter 5000ms alpn h2,http/1.1 verifyhost serviceq1.default.svc verify required ca-file dummy", - fullMatch: true, - }, - }, - }, "valid route health check interval annotation": { mustCreateWithConfig{ mustCreateEndpointSlices: []mustCreateEndpointSlice{ @@ -931,57 +874,6 @@ func TestConfigTemplate(t *testing.T) { }, }, }, - "server-template with default health check interval": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "servicest2", - serviceName: "servicest2", - addresses: []string{"1.1.1.1"}, - }, - }, - 
mustCreateRoute: mustCreateRoute{ - name: "st2", - host: "st2example.com", - targetServiceName: "servicest2", - weight: 1, - time: start, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: insecureBackendName(h.namespace, "st2"), - attribute: "server-template", - value: "inter 5000ms", // Default value - }, - }, - }, - "server-template with custom health check interval": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "servicest1", - serviceName: "servicest1", - addresses: []string{"1.1.1.1"}, // Single endpoint initially - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "st1", - host: "st1example.com", - targetServiceName: "servicest1", - weight: 1, - time: start, - annotations: map[string]string{ - "router.openshift.io/haproxy.health.check.interval": "15s", - }, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: insecureBackendName(h.namespace, "st1"), - attribute: "server-template", - value: "inter 15s", - }, - }, - }, } defer cleanUpRoutes(t) diff --git a/pkg/router/template/configmanager/haproxy/backend.go b/pkg/router/template/configmanager/haproxy/backend.go index e23857afc..50773ce9c 100644 --- a/pkg/router/template/configmanager/haproxy/backend.go +++ b/pkg/router/template/configmanager/haproxy/backend.go @@ -4,9 +4,12 @@ import ( "bytes" "errors" "fmt" + "os" + "path" "strconv" "strings" + v1 "github.com/openshift/api/route/v1" templaterouter "github.com/openshift/router/pkg/router/template" ) @@ -75,11 +78,9 @@ type serverStateInfo struct { // BackendServerInfo represents a server [endpoint] for a haproxy backend. type BackendServerInfo struct { Name string - FQDN string IPAddress string Port int CurrentWeight int32 - InitialWeight int32 } // Backend represents a specific haproxy backend. 
@@ -87,7 +88,7 @@ type Backend struct { name templaterouter.ServiceAliasConfigKey servers map[string]*backendServer - client *Client + client HAProxyClient } // backendServer is internally used for managing a haproxy backend server. @@ -118,7 +119,7 @@ func buildHAProxyBackends(c *Client) ([]*Backend, error) { } // newBackend returns a new Backend representing a haproxy backend. -func newBackend(name templaterouter.ServiceAliasConfigKey, c *Client) *Backend { +func newBackend(name templaterouter.ServiceAliasConfigKey, c HAProxyClient) *Backend { return &Backend{ name: name, servers: make(map[string]*backendServer), @@ -152,9 +153,7 @@ func (b *Backend) Refresh() error { Name: v.Name, IPAddress: v.IPAddress, Port: v.Port, - FQDN: v.FQDN, CurrentWeight: v.UserVisibleWeight, - InitialWeight: v.InitialWeight, } b.servers[v.Name] = newBackendServer(info) @@ -304,6 +303,148 @@ func (b *Backend) FindServer(id string) (*backendServer, error) { return nil, fmt.Errorf("no server found for id: %s", id) } +func (b *Backend) AddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { + if err := b.innerAddServer(cfg, svc, ep, weight, workingDir, defaultDestinationCA); err != nil { + if !strings.Contains(err.Error(), "Already exists a server ") { + return err + } + // Failed due to already existing server left behind, in maintenance mode, due to in-flight connections. + // Let's just give it another chance to be deleted. + if err := b.innerDeleteServer(ep); err != nil { + // No way, need to fail which will ask for a fork-and-reload. This will leave the existing connections in the old process. 
+ return err + } + if err := b.innerAddServer(cfg, svc, ep, weight, workingDir, defaultDestinationCA); err != nil { + return err + } + } + if err := b.innerSetServerState(ep, true, weight); err != nil { + return err + } + + // health check is disabled by default on new backend servers, its enablement is handled via cm.ReplaceRouteEndpoints(), + // since that method has a better view of former and current active backend servers. + return nil +} + +func (b *Backend) UpdateServer(ep templaterouter.Endpoint, weight int32, isPassthrough bool) error { + // missing to properly populate the current servers when created, should be done in the next phase. + // After that we can update only changed attributes. + if err := b.innerUpdateServerAddr(ep); err != nil { + return err + } + return b.innerUpdateServerWeight(ep, weight, isPassthrough) +} + +func (b *Backend) EnableHealthCheck(ep templaterouter.Endpoint) error { + return b.innerSetHealthCheck(ep, true) +} + +func (b *Backend) DisableHealthCheck(ep templaterouter.Endpoint) error { + return b.innerSetHealthCheck(ep, false) +} + +func (b *Backend) DeleteServer(ep templaterouter.Endpoint) error { + if err := b.innerSetServerState(ep, false, 0); err != nil { + return err + } + if err := b.innerDeleteServer(ep); err != nil { + log.Info("disabling backend server instead of deleting due to a delete failure", "server", ep.ID, "error", err.Error()) + } + return nil +} + +func (b *Backend) innerAddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { + // This should always follow the template, changes here should be reflected there, both regular and passthrough backends + // + // TODO: either read this configuration from the template, or instead make the template read from here. 
+ // For the former, note that creating a new template definition should conflict with the for-loop in templateRouter.writeConfig() + // that assumes that all the definitions should be written to disk. + + cmd := fmt.Sprintf("add server %s/%s %s:%s weight %d", b.name, ep.ID, ep.IP, ep.Port, weight) + + switch cfg.TLSTermination { + case v1.TLSTerminationReencrypt: + cmd += " ssl" + if disableHTTP2, _ := strconv.ParseBool(os.Getenv("ROUTER_DISABLE_HTTP2")); !disableHTTP2 { + cmd += " alpn h2,http/1.1" + } + if cfg.VerifyServiceHostname { + cmd += " verifyhost " + svc.Hostname + } + if cert := cfg.Certificates[cfg.Host+"_pod"]; len(cert.Contents) > 0 { + cmd += " verify required ca-file " + path.Join(workingDir, "router/cacerts", cert.ID+".pem") + } else if len(defaultDestinationCA) > 0 { + cmd += " verify required ca-file " + defaultDestinationCA + } else { + cmd += " verify none" + } + case "", v1.TLSTerminationEdge: + if ep.AppProtocol == "h2c" || ep.AppProtocol == "kubernetes.io/h2c" { + cmd += " proto h2" + } + case v1.TLSTerminationPassthrough: + // passthrough is a TCP listener and does not use ssl or proto related config + } + + if !ep.NoHealthCheck { + // health check is always configured, being enabled depending on `cfg.ActiveEndpoints > 1 ` + inter := templaterouter.FirstMatch(`[1-9][0-9]*(us|ms|s|m|h|d)?`, + cfg.Annotations["router.openshift.io/haproxy.health.check.interval"], + os.Getenv("ROUTER_BACKEND_CHECK_INTERVAL"), + "5000ms") + cmd += " check inter " + inter + } + + podMaxConn := cfg.Annotations["haproxy.router.openshift.io/pod-concurrent-connections"] + if _, err := strconv.Atoi(podMaxConn); err == nil { + cmd += " maxconn " + podMaxConn + } + + return execCommand(b.client, apiAddServer, cmd) +} + +func (b *Backend) innerUpdateServerAddr(ep templaterouter.Endpoint) error { + cmd := fmt.Sprintf("set server %s/%s addr %s port %s", b.name, ep.ID, ep.IP, ep.Port) + return execCommand(b.client, apiSetServerAddr, cmd) +} + +func (b *Backend) 
innerUpdateServerWeight(ep templaterouter.Endpoint, weight int32, isPassthrough bool) error { + cmd := fmt.Sprintf("set server %s/%s", b.name, ep.ID) + if isPassthrough { + // https://github.com/openshift/router/blob/896390778ebe15f57f87e6ca78f11c96e64c2652/pkg/router/template/configmanager/haproxy/manager.go#L446-L454 + cmd += " weight 100%" + } else { + cmd = fmt.Sprintf("%s weight %d", cmd, weight) + } + return execCommand(b.client, apiSetServerWeight, cmd) +} + +func (b *Backend) innerSetHealthCheck(ep templaterouter.Endpoint, enable bool) error { + enableStr := "enable" + if !enable { + enableStr = "disable" + } + cmd := fmt.Sprintf("%s health %s/%s", enableStr, b.name, ep.ID) + return execCommand(b.client, apiSetHealth, cmd) +} + +func (b *Backend) innerSetServerState(ep templaterouter.Endpoint, ready bool, weight int32) error { + state := "ready" + if !ready { + state = "maint" + } else if weight <= 0 { + state = "drain" + } + cmd := fmt.Sprintf("set server %s/%s state %s", b.name, ep.ID, state) + return execCommand(b.client, apiSetServerState, cmd) +} + +func (b *Backend) innerDeleteServer(ep templaterouter.Endpoint) error { + cmd := fmt.Sprintf("del server %s/%s", b.name, ep.ID) + return execCommand(b.client, apiDelServer, cmd) +} + // newBackendServer returns a BackendServer representing a haproxy backend server. func newBackendServer(info BackendServerInfo) *backendServer { return &backendServer{ @@ -317,7 +458,7 @@ func newBackendServer(info BackendServerInfo) *backendServer { } // ApplyChanges applies all the local backend server changes. -func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConfigKey, client *Client) error { +func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConfigKey, client HAProxyClient) error { // Build the haproxy dynamic config API commands. 
commands := []string{} @@ -353,7 +494,7 @@ func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConf } // executeCommand runs a server change command and handles the response. -func (s *backendServer) executeCommand(cmd string, client *Client) error { +func (s *backendServer) executeCommand(cmd string, client HAProxyClient) error { responseBytes, err := client.Execute(cmd) if err != nil { return err @@ -396,3 +537,45 @@ func stripVersionNumber(data []byte) ([]byte, error) { return data, nil } + +type apiType int + +const ( + apiAddServer apiType = iota + apiDelServer + apiSetHealth + apiSetServerAddr + apiSetServerWeight + apiSetServerState +) + +func execCommand(client HAProxyClient, api apiType, cmd string) error { + responseRaw, err := client.Execute(cmd) + if err != nil { + return err + } + response := strings.TrimSpace(string(responseRaw)) + if len(response) == 0 { + return nil + } + + var valid bool + switch api { + case apiAddServer: + valid = response == "New server registered." + case apiDelServer: + valid = response == "Server deleted." 
+ case apiSetServerAddr: + valid = response == "nothing changed" || strings.HasPrefix(response, "IP changed from ") || strings.HasPrefix(response, "port changed from ") || strings.HasPrefix(response, "no need to change ") + case apiSetHealth, apiSetServerWeight, apiSetServerState: + valid = response == "" + default: + // fail fast in case of a dev error + panic(fmt.Errorf("invalid cmd ID: %d", api)) + } + + if !valid { + return fmt.Errorf("unexpected response from haproxy: %s", response) + } + return nil +} diff --git a/pkg/router/template/configmanager/haproxy/backend_test.go b/pkg/router/template/configmanager/haproxy/backend_test.go new file mode 100644 index 000000000..606f6b3fa --- /dev/null +++ b/pkg/router/template/configmanager/haproxy/backend_test.go @@ -0,0 +1,356 @@ +package haproxy + +import ( + "testing" + + routev1 "github.com/openshift/api/route/v1" + templaterouter "github.com/openshift/router/pkg/router/template" + "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" +) + +func TestBackendDynamicUpdate(t *testing.T) { + type cmd string + const ( + cmdAdd cmd = "add" + cmdDel cmd = "del" + cmdUpdate cmd = "update" + cmdEnableHealth cmd = "enable-health" + cmdDisableHealth cmd = "disable-health" + ) + + testCases := map[string]struct { + cmd cmd + backendName *templaterouter.ServiceAliasConfigKey // default: "route1" + endpointID *string // default: "server1" + ip *string // default: "10.0.1.11" + port *string // default: "9000" + weight *int32 // default: 1 + workingDir *string // default: "tmp" + publicHostname string + serviceHostname string + tlsTermination routev1.TLSTerminationType + verifyHostname bool + appProtocol string + certificates map[string]templaterouter.Certificate + annotations map[string]string + envvars []string + noHealthCheck bool + defaultCA string + cmdCustomResp []string // 1:1 to `cmdExpected`, trailing empty items can be omited. 
+ errExpected string + cmdExpected []string + }{ + // + // adding + "should add insecure server": { + cmd: cmdAdd, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add insecure server without health check": { + cmd: cmdAdd, + noHealthCheck: true, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with zero weight": { + cmd: cmdAdd, + weight: ptr.To[int32](0), + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 0 check inter 5000ms", + "set server route1/server1 state drain", + }, + }, + "should add insecure server with custom health check": { + cmd: cmdAdd, + annotations: map[string]string{ + "router.openshift.io/haproxy.health.check.interval": "10s", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 10s", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with custom invalid health check": { + cmd: cmdAdd, + annotations: map[string]string{ + "router.openshift.io/haproxy.health.check.interval": "1z", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with custom maxconn": { + cmd: cmdAdd, + annotations: map[string]string{ + "haproxy.router.openshift.io/pod-concurrent-connections": "100", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms maxconn 100", + "set server route1/server1 state ready", + }, + }, + "should add passthrough server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationPassthrough, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + 
"should add edge termination server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationEdge, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add edge termination h2 server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationEdge, + appProtocol: "h2c", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 proto h2 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify none check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with verify host": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + verifyHostname: true, + serviceHostname: "route1.default.svc", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verifyhost route1.default.svc verify none check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with default ca": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + defaultCA: "/tmp/default-ca.pem", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify required ca-file /tmp/default-ca.pem check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with custom certificate": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + publicHostname: "route1.local", + certificates: map[string]templaterouter.Certificate{ + "route1.local_pod": { + ID: "default:route1", + Contents: "-----BEGIN CERTIFICATE-----\nzzz\n-----END CERTIFICATE-----", + PrivateKey: 
"-----BEGIN PRIVATE KEY-----\nzzz\n-----END PRIVATE KEY-----", + }, + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify required ca-file /tmp/router/cacerts/default:route1.pem check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should retry adding insecure server": { + cmd: cmdAdd, + noHealthCheck: true, + cmdCustomResp: []string{"Already exists a server ..."}, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1", + "del server route1/server1", + "add server route1/server1 10.0.1.11:9000 weight 1", + "set server route1/server1 state ready", + }, + }, + "should fail if failing to add insecure server": { + cmd: cmdAdd, + noHealthCheck: true, + cmdCustomResp: []string{"Some unknown adding error."}, + errExpected: "unexpected response from haproxy: Some unknown adding error.", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1", + }, + }, + + // + // updating + "should update server": { + cmd: cmdUpdate, + weight: ptr.To[int32](10), + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + "set server route1/server1 weight 10", + }, + }, + "should update passthrough server with weight 100%": { + cmd: cmdUpdate, + tlsTermination: routev1.TLSTerminationPassthrough, + weight: ptr.To[int32](10), + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + "set server route1/server1 weight 100%", + }, + }, + "should fail if failing to update server": { + cmd: cmdUpdate, + cmdCustomResp: []string{"Some unknown updating error."}, + errExpected: "unexpected response from haproxy: Some unknown updating error.", + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + }, + }, + + // + // health check + "should enable health check": { + cmd: cmdEnableHealth, + cmdExpected: []string{ + "enable health route1/server1", + }, + }, + "should fail if failing to enable health 
check": { + cmd: cmdEnableHealth, + cmdCustomResp: []string{"Some unknown set health error."}, + errExpected: "unexpected response from haproxy: Some unknown set health error.", + cmdExpected: []string{ + "enable health route1/server1", + }, + }, + "should disable health check": { + cmd: cmdDisableHealth, + cmdExpected: []string{ + "disable health route1/server1", + }, + }, + "should fail if failing to disable health check": { + cmd: cmdDisableHealth, + cmdCustomResp: []string{"Some unknown set health error."}, + errExpected: "unexpected response from haproxy: Some unknown set health error.", + cmdExpected: []string{ + "disable health route1/server1", + }, + }, + + // + // Deleting + "should delete server": { + cmd: cmdDel, + cmdExpected: []string{ + "set server route1/server1 state maint", + "del server route1/server1", + }, + }, + "should fail to delete if failing to disable server": { + cmd: cmdDel, + cmdCustomResp: []string{"Some unknown set server error."}, + errExpected: "unexpected response from haproxy: Some unknown set server error.", + cmdExpected: []string{ + "set server route1/server1 state maint", + }, + }, + "should succeed delete if failing to remove and succeeding to disable": { + cmd: cmdDel, + cmdCustomResp: []string{ + "", // first cmd + "Some unknown del server error.", // second cmd + }, + cmdExpected: []string{ + "set server route1/server1 state maint", + "del server route1/server1", + }, + }, + } + + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + backendName := ptr.Deref(test.backendName, "route1") + endpointID := ptr.Deref(test.endpointID, "server1") + ip := ptr.Deref(test.ip, "10.0.1.11") + port := ptr.Deref(test.port, "9000") + weight := ptr.Deref(test.weight, 1) + workingDir := ptr.Deref(test.workingDir, "/tmp") + + cfg := &templaterouter.ServiceAliasConfig{ + TLSTermination: test.tlsTermination, + VerifyServiceHostname: test.verifyHostname, + Host: test.publicHostname, + Certificates: test.certificates, + 
Annotations: test.annotations, + } + svc := &templaterouter.ServiceUnit{ + Hostname: test.serviceHostname, + } + ep := templaterouter.Endpoint{ + ID: endpointID, + IP: ip, + Port: port, + AppProtocol: test.appProtocol, + NoHealthCheck: test.noHealthCheck, + } + isPassthrough := test.tlsTermination == routev1.TLSTerminationPassthrough + client := &fakeClient{cmdCustomResp: test.cmdCustomResp} + + b := newBackend(backendName, client) + + var err error + switch test.cmd { + case cmdAdd: + err = b.AddServer(cfg, svc, ep, weight, workingDir, test.defaultCA) + case cmdDel: + err = b.DeleteServer(ep) + case cmdUpdate: + err = b.UpdateServer(ep, weight, isPassthrough) + case cmdEnableHealth: + err = b.EnableHealthCheck(ep) + case cmdDisableHealth: + err = b.DisableHealthCheck(ep) + default: + t.Errorf("invalid cmd: %s", test.cmd) + } + + if test.errExpected != "" { + require.EqualError(t, err, test.errExpected) + } else { + require.NoError(t, err) + } + require.Equal(t, test.cmdExpected, client.executedCmds) + }) + } + +} + +type fakeClient struct { + cmdCustomResp []string + executedCmds []string + respCount int +} + +func (c *fakeClient) RunCommand(cmd string, _ Converter) ([]byte, error) { + return c.Execute(cmd) +} + +func (c *fakeClient) Execute(cmd string) ([]byte, error) { + c.executedCmds = append(c.executedCmds, cmd) + + var response string + if len(c.cmdCustomResp) > c.respCount { + response = c.cmdCustomResp[c.respCount] + "\n" + c.respCount++ + } + response += "\n" + + return []byte(response), nil +} diff --git a/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go b/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go index 4cd411a8b..537819012 100644 --- a/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go +++ b/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go @@ -41,7 +41,7 @@ func (cm *fakeConfigManager) FindBlueprint(id templaterouter.ServiceAliasConfigK return route, ok } -func (cm 
*fakeConfigManager) Register(id templaterouter.ServiceAliasConfigKey, route *routev1.Route) { +func (cm *fakeConfigManager) Register(id templaterouter.ServiceAliasConfigKey, backend *templaterouter.ServiceAliasConfig, route *routev1.Route) { } func (cm *fakeConfigManager) AddRoute(id templaterouter.ServiceAliasConfigKey, routingKey string, route *routev1.Route) error { @@ -52,7 +52,7 @@ func (cm *fakeConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfigKey return nil } -func (cm *fakeConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { +func (cm *fakeConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, svc *templaterouter.ServiceUnit, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { return nil } @@ -63,18 +63,6 @@ func (cm *fakeConfigManager) RemoveRouteEndpoints(id templaterouter.ServiceAlias func (cm *fakeConfigManager) Notify(event templaterouter.RouterEventType) { } -func (cm *fakeConfigManager) ServerTemplateName(id templaterouter.ServiceAliasConfigKey) string { - return "fakeConfigManager" -} - -func (cm *fakeConfigManager) ServerTemplateSize(id templaterouter.ServiceAliasConfigKey) string { - return "1" -} - -func (cm *fakeConfigManager) GenerateDynamicServerNames(id templaterouter.ServiceAliasConfigKey) []string { - return []string{} -} - func routeKey(route *routev1.Route) templaterouter.ServiceAliasConfigKey { return templaterouter.ServiceAliasConfigKey(fmt.Sprintf("%s:%s", route.Name, route.Namespace)) } diff --git a/pkg/router/template/configmanager/haproxy/client.go b/pkg/router/template/configmanager/haproxy/client.go index 472737639..7bd5f72a1 100644 --- a/pkg/router/template/configmanager/haproxy/client.go +++ b/pkg/router/template/configmanager/haproxy/client.go @@ -24,6 +24,11 @@ const ( maxRetries = 3 ) +type HAProxyClient interface { + RunCommand(cmd string, converter Converter) 
([]byte, error) + Execute(cmd string) ([]byte, error) +} + // Client is a client used to dynamically configure haproxy. type Client struct { socketAddress string diff --git a/pkg/router/template/configmanager/haproxy/manager.go b/pkg/router/template/configmanager/haproxy/manager.go index 123e97233..4c0359c43 100644 --- a/pkg/router/template/configmanager/haproxy/manager.go +++ b/pkg/router/template/configmanager/haproxy/manager.go @@ -1,12 +1,13 @@ package haproxy import ( + "errors" "fmt" "os" "path" "reflect" + "slices" "strconv" - "strings" "sync" "time" @@ -27,13 +28,6 @@ const ( // haproxyManagerName is the name of this config manager. haproxyManagerName = "haproxy-manager" - // haproxyRunDir is the name of run directory within the haproxy - // router working directory heirarchy. - haproxyRunDir = "run" - - // haproxySocketFile is the name of haproxy socket file. - haproxySocketFile = "haproxy.sock" - // haproxyConnectionTimeout is the timeout (in seconds) used for // preventing slow connections to the haproxy socket from blocking // the config manager from doing any work. @@ -43,10 +37,6 @@ const ( // pool of blueprint routes. blueprintRoutePoolNamePrefix = "_hapcm_blueprint_pool" - // dynamicServerPrefix is the prefix used in the haproxy template - // for adding dynamic servers (pods) to a backend. - dynamicServerPrefix = "_dynamic" - // routePoolSizeAnnotation is the annotation on the blueprint route // overriding the default pool size. routePoolSizeAnnotation = "router.openshift.io/pool-size" @@ -61,9 +51,6 @@ const ( blueprintRoutePoolServiceName = blueprintRoutePoolNamePrefix + ".svc" ) -// endpointToDynamicServerMap is a map of endpoint to dynamic server names. -type endpointToDynamicServerMap map[string]string - // configEntryMap is a map containing name-value pairs representing the // config entries to add to an haproxy map. 
type configEntryMap map[string]templaterouter.ServiceAliasConfigKey @@ -76,6 +63,9 @@ type routeBackendEntry struct { // id is the route id. id string + // backend is the route's service alias config, used when adding servers to the haproxy backend. + backend *templaterouter.ServiceAliasConfig + // termination is the route termination. termination routev1.TLSTerminationType @@ -92,9 +82,6 @@ type routeBackendEntry struct { // poolRouteBackendName is backend name for any associated route // from the pre-configured blueprint route pool. poolRouteBackendName templaterouter.ServiceAliasConfigKey - - // DynamicServerMap is a map of all the allocated dynamic servers. - dynamicServerMap endpointToDynamicServerMap } // haproxyConfigManager is a template router config manager implementation @@ -116,10 +103,6 @@ type haproxyConfigManager struct { // backends for each route blueprint. blueprintRoutePoolSize int - // maxDynamicServers is the maximum number of dynamic servers - // allocated per backend in the haproxy template configuration. - maxDynamicServers int - // wildcardRoutesAllowed indicates if wildcard routes are allowed. wildcardRoutesAllowed bool @@ -129,9 +112,18 @@ type haproxyConfigManager struct { // router is the associated template router. router templaterouter.RouterInterface + // workingDir is the router's working directory containing configuration + // files, certificates, and other router-managed resources. + workingDir string + // defaultCertificate is the default certificate bytes. defaultCertificate string + // defaultDestinationCA is the path to the default CA certificate file used + // to verify backend server certificates for re-encrypt routes when no + // route-specific destination CA is configured. + defaultDestinationCA string + // client is the client used to dynamically manage haproxy. 
client *Client @@ -163,10 +155,11 @@ func NewHAProxyConfigManager(options templaterouter.ConfigManagerOptions) *hapro commitInterval: options.CommitInterval, blueprintRoutes: buildBlueprintRoutes(options.BlueprintRoutes, options.ExtendedValidation), blueprintRoutePoolSize: options.BlueprintRoutePoolSize, - maxDynamicServers: options.MaxDynamicServers, wildcardRoutesAllowed: options.WildcardRoutesAllowed, extendedValidation: options.ExtendedValidation, + workingDir: options.WorkingDir, defaultCertificate: "", + defaultDestinationCA: options.DefaultDestinationCA, client: client, reloadInProgress: false, @@ -283,14 +276,14 @@ func (cm *haproxyConfigManager) RemoveBlueprint(route *routev1.Route) { } // Register registers an id with an expected haproxy backend for a route. -func (cm *haproxyConfigManager) Register(id templaterouter.ServiceAliasConfigKey, route *routev1.Route) { +func (cm *haproxyConfigManager) Register(id templaterouter.ServiceAliasConfigKey, backend *templaterouter.ServiceAliasConfig, route *routev1.Route) { wildcard := cm.wildcardRoutesAllowed && (route.Spec.WildcardPolicy == routev1.WildcardPolicySubdomain) entry := &routeBackendEntry{ - id: string(id), - termination: routeTerminationType(route), - wildcard: wildcard, - backendName: routeBackendName(id, route), - dynamicServerMap: make(endpointToDynamicServerMap), + id: string(id), + backend: backend, + termination: routeTerminationType(route), + wildcard: wildcard, + backendName: routeBackendName(id, route), } cm.lock.Lock() @@ -317,8 +310,6 @@ func (cm *haproxyConfigManager) AddRoute(id templaterouter.ServiceAliasConfigKey return fmt.Errorf("no blueprint found that would match route %s/%s", route.Namespace, route.Name) } - cm.Register(id, route) - cm.lock.Lock() defer func() { cm.lock.Unlock() @@ -410,10 +401,17 @@ func (cm *haproxyConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfig if err != nil { return err } - log.V(4).Info("disabling all servers for backend", "backend", backendName) 
- if err := backend.Disable(); err != nil { + + log.V(4).Info("deleting all servers for backend", "backend", backendName) + servers, err := backend.Servers() + if err != nil { return err } + for _, server := range servers { + if err := backend.DeleteServer(templaterouter.Endpoint{ID: server.Name}); err != nil { + return err + } + } log.V(4).Info("committing changes made to backend", "backend", backendName) return backend.Commit() @@ -421,7 +419,7 @@ func (cm *haproxyConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfig // ReplaceRouteEndpoints dynamically replaces a subset of the endpoints for // a route - modifies a subset of the servers on an haproxy backend. -func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { +func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, svc *templaterouter.ServiceUnit, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { log.V(4).Info("replacing route endpoints", "id", id, "weight", weight) if cm.isReloading() { return fmt.Errorf("Router reload in progress, cannot dynamically add endpoints for %s", id) @@ -442,17 +440,6 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA return fmt.Errorf("route id %s was not registered", id) } - weightIsRelative := false - if entry.termination == routev1.TLSTerminationPassthrough { - // Passthrough is a wee bit odd and is like a boolean on/off - // switch. Setting actual weights, causing the haproxy - // dynamic API to either hang or then haproxy dying off. - // So 100% works for us today because we use a dynamic hash - // balancing algorithm. Needs a follow up on this issue. 
- weightIsRelative = true - weight = 100 - } - backendName := entry.BackendName() log.V(4).Info("finding backend", "backend", backendName) backend, err := cm.client.FindBackend(backendName) @@ -460,9 +447,17 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA return err } + addedEndpoints := make(map[string]templaterouter.Endpoint) modifiedEndpoints := make(map[string]templaterouter.Endpoint) for _, ep := range newEndpoints { - modifiedEndpoints[ep.ID] = ep + existing := slices.ContainsFunc(oldEndpoints, func(v2ep templaterouter.Endpoint) bool { + return v2ep.ID == ep.ID + }) + if existing { + modifiedEndpoints[ep.ID] = ep + } else { + addedEndpoints[ep.ID] = ep + } } deletedEndpoints := make(map[string]templaterouter.Endpoint) @@ -471,135 +466,74 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA if reflect.DeepEqual(ep, v2ep) { // endpoint was unchanged. delete(modifiedEndpoints, v2ep.ID) + continue } epUsesH2C := ep.AppProtocol == "h2c" || ep.AppProtocol == "kubernetes.io/h2c" v2epUsesH2c := v2ep.AppProtocol == "h2c" || v2ep.AppProtocol == "kubernetes.io/h2c" if (epUsesH2C || v2epUsesH2c) && epUsesH2C != v2epUsesH2c { - return fmt.Errorf("endpoint %s changed appProtocol from %q to %q, and dynamically updating proto is unsupported", ep.ID, ep.AppProtocol, v2ep.AppProtocol) + return fmt.Errorf("endpoint %s changed appProtocol from %q to %q, dynamically updating proto is unsupported - route will be updated on next reload", ep.ID, ep.AppProtocol, v2ep.AppProtocol) } } else { - configChanged = true deletedEndpoints[ep.ID] = ep } } - log.V(4).Info("getting servers for backend", "backend", backendName) - servers, err := backend.Servers() - if err != nil { - return err - } + // there is a configuration change if any of the tracking maps have endpoint(s) + configChanged = len(deletedEndpoints)+len(modifiedEndpoints)+len(addedEndpoints) > 0 - // Backends having only one server have their server's health 
check turned off; health check - // is only enabled when there are two or more endpoints in the backend. - // - // Currently we don't support enabling or disabling health checks, so we are: - // - // * Checking whether we are adding endpoints; and - // * Checking if the current state of the backend is just one static endpoint. - // - // If both of the above matches, we cannot dynamically update and return from here. - // This behavior should be improved via https://issues.redhat.com/browse/NE-2496 - // using `del server` and `add server` API calls. - // - // This is the expected lay out from the running HAProxy in case there is only one static - // backend server: - // - // # be_id be_name srv_id srv_name srv_addr ... srv_port ... - // 20 be_http:default:route 1 pod:app-848554c7d4-mbhsf:app::10.128.0.78:8000 10.128.0.78 ... 8000 ... - // 20 be_http:default:route 2 _dynamic-pod-1 172.4.0.4 ... 8765 ... - // 20 be_http:default:route 3 _dynamic-pod-2 172.4.0.4 ... 8765 ... - // ... other "_dynamic-pod-NN" - - // checking for more newEndpoints than oldEndpoints, if this happens we are adding new ones - if len(newEndpoints) > len(oldEndpoints) { - var staticCount int - // we cannot infer the first ones are the static, since this list is built from a hashmap, - // so the order is not deterministic. Need to iterate over all of them. - for _, s := range servers { - // if not a dynamic backend server, count as static - if !isDynamicBackendServer(s) { - staticCount++ - } + log.V(4).Info("processing endpoint changes", "added", addedEndpoints, "deleted", deletedEndpoints, "modified", modifiedEndpoints) + + // Aggregating errors instead of failing fast in the first API error. This ensures that the old + // process has a more accurate configuration in case it lives longer due to persistent connections. 
+ var errs []error + + for name, ep := range deletedEndpoints { + if err := backend.DeleteServer(ep); err != nil { + errs = append(errs, fmt.Errorf("error deleting backend server %s: %w", name, err)) } - if staticCount == 1 { - return fmt.Errorf("single endpoint on backend %s, need to reload to enable health check", backendName) + } + for name, ep := range modifiedEndpoints { + if err := backend.UpdateServer(ep, weight, entry.termination == routev1.TLSTerminationPassthrough); err != nil { + errs = append(errs, fmt.Errorf("error updating backend server %s: %w", name, err)) } } - - log.V(4).Info("processing endpoint changes", "deleted", deletedEndpoints, "modified", modifiedEndpoints) - - // First process the deleted endpoints and update the servers we - // have already used - these would be the ones where the name - // matches the endpoint name or is a dynamic server already in use. - // Also keep track of the unused dynamic servers. - unusedServerNames := []string{} - for _, s := range servers { - relatedEndpointID := s.Name - if isDynamicBackendServer(s) { - if epid, ok := entry.dynamicServerMap[s.Name]; ok { - relatedEndpointID = epid - } else { - unusedServerNames = append(unusedServerNames, s.Name) - continue - } + for name, ep := range addedEndpoints { + if err := backend.AddServer(entry.backend, svc, ep, weight, cm.workingDir, cm.defaultDestinationCA); err != nil { + errs = append(errs, fmt.Errorf("error adding backend server %s: %w", name, err)) } + } - if _, ok := deletedEndpoints[relatedEndpointID]; ok { - configChanged = true - log.V(4).Info("disabling server for deleted endpoint", "endpoint", relatedEndpointID, "server", s.Name) - backend.DisableServer(s.Name) - if _, ok := entry.dynamicServerMap[s.Name]; ok { - log.V(4).Info("removing server from dynamic server map", "server", s.Name, "backend", backendName) - delete(entry.dynamicServerMap, s.Name) - } - continue + // Checking health check. 
We need to: + // * enable new endpoints if `cfg.ActiveEndpoints > 1` + // * enable also the only former endpoint if scaling from 1 to 2 or more + // * disable the only current endpoint if scaling to 1 + if len(newEndpoints) > 1 { + var newEPs []templaterouter.Endpoint + for _, ep := range addedEndpoints { + // enabling for all the new added endpoints + newEPs = append(newEPs, ep) } - - if ep, ok := modifiedEndpoints[relatedEndpointID]; ok { - configChanged = true - log.V(4).Info("enabling server for modified endpoint", "endpoint", relatedEndpointID, "server", s.Name, "ip", ep.IP, "port", ep.Port, "appProtocol", ep.AppProtocol, "weight", weight) - backend.UpdateServerInfo(s.Name, ep.IP, ep.Port, ep.AppProtocol, weight, weightIsRelative) - backend.EnableServer(s.Name) - - delete(modifiedEndpoints, relatedEndpointID) + if len(oldEndpoints) == 1 { + // enabling also for the former single endpoint as well + newEPs = append(newEPs, oldEndpoints[0]) } - } - - // Processed all existing endpoints, now check if there's any more - // more modified endpoints (aka newly added ones). For these, we can - // choose any of the unused dynamic servers. - for _, name := range unusedServerNames { - if len(modifiedEndpoints) == 0 { - break + for _, ep := range newEPs { + if !ep.NoHealthCheck { + if err := backend.EnableHealthCheck(ep); err != nil { + errs = append(errs, err) + } + } } - - var ep templaterouter.Endpoint - for _, v := range modifiedEndpoints { - // Just get first modified endpoint. - ep = v - break + } else if len(newEndpoints) == 1 && len(oldEndpoints) != 1 { + // the single backend server scenario, health check should be disabled + if ep := newEndpoints[0]; !ep.NoHealthCheck { + if err := backend.DisableHealthCheck(ep); err != nil { + errs = append(errs, err) + } } - - // Add entry for the dyamic server used. 
- configChanged = true - entry.dynamicServerMap[name] = ep.ID - - log.V(4).Info("enabling server for added endpoint", "endpoint", ep.ID, "server", name, "ip", ep.IP, "port", ep.Port, "appProtocol", ep.AppProtocol, "weight", weight) - backend.UpdateServerInfo(name, ep.IP, ep.Port, ep.AppProtocol, weight, weightIsRelative) - backend.EnableServer(name) - - delete(modifiedEndpoints, ep.ID) } - // If we got here, then either we are done with all the endpoints or - // there are no free dynamic server slots available that we can use. - if len(modifiedEndpoints) > 0 { - return fmt.Errorf("no free dynamic server slots for backend %s, %d endpoint(s) remaining", - id, len(modifiedEndpoints)) - } - - log.V(4).Info("committing backend", "backend", backendName) - return backend.Commit() + return errors.Join(errs...) } // RemoveRouteEndpoints removes servers matching the endpoints from a haproxy backend. @@ -628,26 +562,15 @@ func (cm *haproxyConfigManager) RemoveRouteEndpoints(id templaterouter.ServiceAl return err } - // Build a reversed map (endpoint id -> server name) to allow us to - // search by endpoint. - endpointToDynServerMap := make(map[string]string) - for serverName, endpointID := range entry.dynamicServerMap { - endpointToDynServerMap[endpointID] = serverName - } - + var errs []error for _, ep := range endpoints { - name := ep.ID - if serverName, ok := endpointToDynServerMap[ep.ID]; ok { - name = serverName - delete(entry.dynamicServerMap, name) + log.V(4).Info("deleting server for endpoint", "endpoint", ep.ID) + if err := backend.DeleteServer(ep); err != nil { + errs = append(errs, fmt.Errorf("error deleting server %s: %w", ep.ID, err)) } - - log.V(4).Info("disabling server for endpoint", "endpoint", ep.ID, "server", name) - backend.DisableServer(name) } - log.V(4).Info("committing backend", "backend", backendName) - return backend.Commit() + return errors.Join(errs...) } // Notify informs the config manager of any template router state changes. 
@@ -675,41 +598,6 @@ func (cm *haproxyConfigManager) Commit() { cm.commitRouterConfig() } -// ServerTemplateName returns the dynamic server template name. -func (cm *haproxyConfigManager) ServerTemplateName(id templaterouter.ServiceAliasConfigKey) string { - if cm.maxDynamicServers > 0 { - // Adding the id makes the name unwieldy - use pod. - return fmt.Sprintf("%s-pod", dynamicServerPrefix) - } - - return "" -} - -// ServerTemplateSize returns the dynamic server template size. -// Note this is returned as a string for easier use in the haproxy template. -func (cm *haproxyConfigManager) ServerTemplateSize(id templaterouter.ServiceAliasConfigKey) string { - if cm.maxDynamicServers < 1 { - return "" - } - - return fmt.Sprintf("%v", cm.maxDynamicServers) -} - -// GenerateDynamicServerNames generates the dynamic server names. -func (cm *haproxyConfigManager) GenerateDynamicServerNames(id templaterouter.ServiceAliasConfigKey) []string { - if cm.maxDynamicServers > 0 { - if prefix := cm.ServerTemplateName(id); len(prefix) > 0 { - names := make([]string, cm.maxDynamicServers) - for i := 0; i < cm.maxDynamicServers; i++ { - names[i] = fmt.Sprintf("%s-%v", prefix, i+1) - } - return names - } - } - - return []string{} -} - // scheduleRouterReload schedules a reload by deferring commit on the // associated template router using a internal flush timer. func (cm *haproxyConfigManager) scheduleRouterReload() { @@ -836,14 +724,10 @@ func (cm *haproxyConfigManager) reset() { cm.commitTimer = nil } - // Reset the blueprint route pool use and dynamic server maps as - // the router was reloaded. + // Reset the blueprint route pool use as the router was reloaded. cm.poolUsage = make(map[templaterouter.ServiceAliasConfigKey]templaterouter.ServiceAliasConfigKey) for _, entry := range cm.backendEntries { entry.poolRouteBackendName = "" - if len(entry.dynamicServerMap) > 0 { - entry.dynamicServerMap = make(endpointToDynamicServerMap) - } } // Reset the client - clear its caches. 
@@ -1076,15 +960,6 @@ func routeTerminationType(route *routev1.Route) routev1.TLSTerminationType { return termination } -// isDynamicBackendServer indicates if a backend server is a dynamic server. -func isDynamicBackendServer(server BackendServerInfo) bool { - if len(dynamicServerPrefix) == 0 { - return false - } - - return strings.HasPrefix(server.Name, dynamicServerPrefix) -} - // backendModAnnotations return the annotations in a route that will // require custom (or modified) backend configuration in haproxy. func backendModAnnotations(route *routev1.Route) map[string]string { diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go index d45201c28..a77324ca5 100644 --- a/pkg/router/template/router.go +++ b/pkg/router/template/router.go @@ -785,7 +785,15 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r } log.V(4).Info("dynamically adding route backend", "backendKey", backendKey) - r.dynamicConfigManager.Register(backendKey, route) + r.dynamicConfigManager.Register(backendKey, backend, route) + + // Saving the initial state of the endpoints from all the service units of the backend. + // This state is used later to calculate added and removed endpoints without the need to read proxy api. 
+ for key := range backend.ServiceUnits { + if service, found := r.findMatchingServiceUnit(key); found { + backend.EndpointTable[key] = endpointsForAlias(*backend, service) + } + } // Fully skipping DCM for now when adding or changing routes, // should be reincluded along with the fix for https://issues.redhat.com/browse/OCPBUGS-77344 @@ -799,7 +807,7 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r err := r.dynamicConfigManager.AddRoute(backendKey, backend.RoutingKeyName, route) if err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically add route for backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically add route for backend", "backendKey", backendKey, "error", err) return false } @@ -817,8 +825,8 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r if !ok { weight = 0 } - if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, oldEndpoints, newEndpoints, weight); err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically replace endpoints for route backend", + if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, &service, oldEndpoints, newEndpoints, weight); err != nil { + log.Info("router will reload as the ConfigManager could not dynamically replace endpoints for route backend", "backendKey", backendKey, "serviceKey", key, "error", err) return false } @@ -844,7 +852,7 @@ func (r *templateRouter) dynamicallyRemoveRoute(backendKey ServiceAliasConfigKey log.V(4).Info("dynamically removing route backend", "backendKey", backendKey) if err := r.dynamicConfigManager.RemoveRoute(backendKey, route); err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically remove route backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically remove route 
backend", "backendKey", backendKey, "error", err) return false } @@ -855,7 +863,7 @@ func (r *templateRouter) dynamicallyRemoveRoute(backendKey ServiceAliasConfigKey // on all the routes associated with a given service. // Note: The config should have been synced at least once initially and // the caller needs to acquire a lock [and release it]. -func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service ServiceUnit, oldEndpoints []Endpoint) bool { +func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service ServiceUnit) bool { if r.dynamicConfigManager == nil || !r.synced { return false } @@ -877,7 +885,11 @@ func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service return false } + // Synchronizing EndpointTable only after backing up the former state, which is the current state in the proxy. + // These old (current haproxy state) and new (current cluster state) are used to calculate added and removed endpoints. + oldEndpoints := cfg.EndpointTable[id] newEndpoints := endpointsForAlias(cfg, service) + cfg.EndpointTable[id] = newEndpoints // If a service is idled, createRouterEndpoints returns a slice // containing 1 endpoint (namely the idled service's ClusterIP @@ -921,9 +933,9 @@ func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service } log.V(4).Info("dynamically replacing endpoints for associated backend", "backendKey", backendKey, "newEndpoints", newEndpoints) - if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, oldEndpoints, newEndpoints, weight); err != nil { + if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, &service, oldEndpoints, newEndpoints, weight); err != nil { // Error dynamically modifying the config, so return false to cause a reload to happen. 
- log.V(4).Info("router will reload as the ConfigManager could not dynamically replace endpoints for service", "service", id, "backendKey", backendKey, "weight", weight, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically replace endpoints for service", "service", id, "backendKey", backendKey, "weight", weight, "error", err) return false } } @@ -950,7 +962,7 @@ func (r *templateRouter) dynamicallyRemoveEndpoints(service ServiceUnit, endpoin log.V(4).Info("dynamically removing endpoints for associated backend", "backendKey", backendKey) if err := r.dynamicConfigManager.RemoveRouteEndpoints(backendKey, endpoints); err != nil { // Error dynamically modifying the config, so return false to cause a reload to happen. - log.V(4).Info("router will reload as the ConfigManager could not dynamically remove endpoints for backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically remove endpoints for backend", "backendKey", backendKey, "error", err) return false } } @@ -1033,6 +1045,7 @@ func (r *templateRouter) createServiceAliasConfig(route *routev1.Route, backendK IsWildcard: wildcard, Annotations: route.Annotations, ServiceUnits: serviceUnits, + EndpointTable: make(map[ServiceUnitKey][]Endpoint), PrimaryServiceUnitKey: primaryServiceUnitKey, ActiveServiceUnits: activeServiceUnits, HTTPResponseHeaders: httpResponseHeadersList, @@ -1232,12 +1245,10 @@ func (r *templateRouter) AddEndpoints(id ServiceUnitKey, endpoints []Endpoint) { return } - oldEndpoints := frontend.EndpointTable - frontend.EndpointTable = endpoints r.serviceUnits[id] = frontend - configChanged := r.dynamicallyReplaceEndpoints(id, frontend, oldEndpoints) + configChanged := r.dynamicallyReplaceEndpoints(id, frontend) if len(frontend.ServiceAliasAssociations) > 0 { r.stateChanged = true } @@ -1253,25 +1264,6 @@ func (r *templateRouter) cleanUpServiceAliasConfig(cfg *ServiceAliasConfig) { } } -func 
cmpStrSlices(first []string, second []string) bool { - if len(first) != len(second) { - return false - } - for _, fi := range first { - found := false - for _, si := range second { - if fi == si { - found = true - break - } - } - if !found { - return false - } - } - return true -} - // shouldWriteCerts determines if the router should ask the cert manager to write out certificates // it will return true if a route is edge or reencrypt and it has all the required (host/key) certificates // defined. If the route does not have the certificates defined it will log an info message if the diff --git a/pkg/router/template/template_helper.go b/pkg/router/template/template_helper.go index 754f6d94c..035c0493a 100644 --- a/pkg/router/template/template_helper.go +++ b/pkg/router/template/template_helper.go @@ -67,6 +67,10 @@ func matchString(pattern string, s string) (bool, error) { return re.MatchString(s), nil } +func FirstMatch(pattern string, values ...string) string { + return firstMatch(pattern, values...) +} + func firstMatch(pattern string, values ...string) string { log.V(7).Info("firstMatch called", "pattern", pattern, "values", values) if re, err := cachedRegexpCompile(`\A(?:` + pattern + `)\z`); err == nil { diff --git a/pkg/router/template/types.go b/pkg/router/template/types.go index 8d6f9eb16..292d4d29a 100644 --- a/pkg/router/template/types.go +++ b/pkg/router/template/types.go @@ -75,6 +75,10 @@ type ServiceAliasConfig struct { // ActiveEndpoints is a count of the route endpoints that are part of a service unit with a non-zero weight ActiveEndpoints int + // EndpointTable are endpoints that back the service in this ServiceAliasConfig: non matching ports from + // route configuration is filtered out. This translates into a final backend implementation for routers. + EndpointTable map[ServiceUnitKey][]Endpoint + // HTTPResponseHeaders has route-specific custom HTTP response headers. 
HTTPResponseHeaders []HTTPHeader @@ -174,15 +178,20 @@ type ConfigManagerOptions struct { // router.openshift.io/pool-size BlueprintRoutePoolSize int - // MaxDynamicServers is the maximum number of dynamic servers we - // will allocate on a per-route basis. - MaxDynamicServers int - // WildcardRoutesAllowed indicates if wildcard routes are allowed. WildcardRoutesAllowed bool // ExtendedValidation indicates if extended route validation is enabled. ExtendedValidation bool + + // WorkingDir is the router's working directory containing configuration + // files, certificates, and other router-managed resources. + WorkingDir string + + // DefaultDestinationCA is the path to the default CA certificate file used + // to verify backend server certificates for re-encrypt routes when no + // route-specific destination CA is configured. + DefaultDestinationCA string } // ConfigManager is used by the router to make configuration changes using @@ -204,7 +213,7 @@ type ConfigManager interface { RemoveBlueprint(route *routev1.Route) // Register registers an id to be associated with a route. - Register(id ServiceAliasConfigKey, route *routev1.Route) + Register(id ServiceAliasConfigKey, backend *ServiceAliasConfig, route *routev1.Route) // AddRoute adds a new route or updates an existing route. AddRoute(id ServiceAliasConfigKey, routingKey string, route *routev1.Route) error @@ -214,7 +223,7 @@ type ConfigManager interface { // ReplaceRouteEndpoints replaces a subset (the ones associated with // a single service unit) of a route endpoints. - ReplaceRouteEndpoints(id ServiceAliasConfigKey, oldEndpoints, newEndpoints []Endpoint, weight int32) error + ReplaceRouteEndpoints(id ServiceAliasConfigKey, svc *ServiceUnit, oldEndpoints, newEndpoints []Endpoint, weight int32) error // RemoveRouteEndpoints removes a set of endpoints from a route. 
RemoveRouteEndpoints(id ServiceAliasConfigKey, endpoints []Endpoint) error @@ -224,15 +233,6 @@ type ConfigManager interface { // which indicates whether or not the configuration manager should // reset all the dynamically applied changes it is keeping track of. Notify(event RouterEventType) - - // ServerTemplateName returns the dynamic server template name. - ServerTemplateName(id ServiceAliasConfigKey) string - - // ServerTemplateSize returns the dynamic server template size. - ServerTemplateSize(id ServiceAliasConfigKey) string - - // GenerateDynamicServerNames generates the dynamic server names. - GenerateDynamicServerNames(id ServiceAliasConfigKey) []string } // CaptureHTTPHeader specifies an HTTP header that should be captured for access From 80b8c12c78bb7fff892460bce9b7c6347c6028ed Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Tue, 28 Apr 2026 11:12:42 -0300 Subject: [PATCH 2/2] handle comments --- .../template/configmanager/haproxy/backend.go | 38 ++++++++++------ .../configmanager/haproxy/backend_test.go | 43 ++++++++----------- .../template/configmanager/haproxy/manager.go | 6 +-- 3 files changed, 48 insertions(+), 39 deletions(-) diff --git a/pkg/router/template/configmanager/haproxy/backend.go b/pkg/router/template/configmanager/haproxy/backend.go index 50773ce9c..48ff79a04 100644 --- a/pkg/router/template/configmanager/haproxy/backend.go +++ b/pkg/router/template/configmanager/haproxy/backend.go @@ -303,6 +303,9 @@ func (b *Backend) FindServer(id string) (*backendServer, error) { return nil, fmt.Errorf("no server found for id: %s", id) } +// AddServer dynamically adds a new backend server. It detects if the server already exists, and if so tries to remove it. +// It returns a failure in case HAProxy refuses to dynamically add the server for any reason, or if the existing server +// cannot be removed, e.g., it still has active or steady and established connection(s) to its backend server endpoint.
func (b *Backend) AddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { if err := b.innerAddServer(cfg, svc, ep, weight, workingDir, defaultDestinationCA); err != nil { if !strings.Contains(err.Error(), "Already exists a server ") { @@ -327,31 +330,41 @@ func (b *Backend) AddServer(cfg *templaterouter.ServiceAliasConfig, svc *templat return nil } +// UpdateServer dynamically updates the backend server with new address and weight. func (b *Backend) UpdateServer(ep templaterouter.Endpoint, weight int32, isPassthrough bool) error { // missing to properly populate the current servers when created, should be done in the next phase. // After that we can update only changed attributes. + // https://redhat.atlassian.net/browse/NE-2646 if err := b.innerUpdateServerAddr(ep); err != nil { return err } return b.innerUpdateServerWeight(ep, weight, isPassthrough) } +// EnableHealthCheck dynamically enables health check on a backend server that already declares the health check interval. func (b *Backend) EnableHealthCheck(ep templaterouter.Endpoint) error { return b.innerSetHealthCheck(ep, true) } +// DisableHealthCheck dynamically disables health check on a backend server. func (b *Backend) DisableHealthCheck(ep templaterouter.Endpoint) error { return b.innerSetHealthCheck(ep, false) } -func (b *Backend) DeleteServer(ep templaterouter.Endpoint) error { +// DeleteServer dynamically removes the backend server from load balancing. The backend server is put in maintenance mode +// and `removed` is returned as false in case it has active or steady and established connections, so these connections continue +// to be handled and new ones are directed to other servers. An error is returned only if the server cannot be put in maintenance +// mode; any failure trying to remove the server is logged and `removed` is just returned as false.
+func (b *Backend) DeleteServer(ep templaterouter.Endpoint) (removed bool, err error) { + // put in maintenance mode first, this is a pre-requisite to remove a backend server. if err := b.innerSetServerState(ep, false, 0); err != nil { - return err + return false, err } if err := b.innerDeleteServer(ep); err != nil { log.Info("disabling backend server instead of deleting due to a delete failure", "server", ep.ID, "error", err.Error()) + return false, nil } - return nil + return true, nil } func (b *Backend) innerAddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { @@ -360,6 +373,8 @@ func (b *Backend) innerAddServer(cfg *templaterouter.ServiceAliasConfig, svc *te // TODO: either read this configuration from the template, or instead make the template read from here. // For the former, note that creating a new template definition should conflict with the for-loop in templateRouter.writeConfig() // that assumes that all the definitions should be written to disk. + // + // https://redhat.atlassian.net/browse/NE-2646 cmd := fmt.Sprintf("add server %s/%s %s:%s weight %d", b.name, ep.ID, ep.IP, ep.Port, weight) @@ -387,14 +402,13 @@ func (b *Backend) innerAddServer(cfg *templaterouter.ServiceAliasConfig, svc *te // passthrough is a TCP listener and does not use ssl or proto related config } - if !ep.NoHealthCheck { - // health check is always configured, being enabled depending on `cfg.ActiveEndpoints > 1 ` - inter := templaterouter.FirstMatch(`[1-9][0-9]*(us|ms|s|m|h|d)?`, - cfg.Annotations["router.openshift.io/haproxy.health.check.interval"], - os.Getenv("ROUTER_BACKEND_CHECK_INTERVAL"), - "5000ms") - cmd += " check inter " + inter - } + // health check is always configured and defaults as disabled, being enabled later + // on DCM's manager depending on `cfg.ActiveEndpoints > 1` and `!ep.NoHealthCheck`. 
+ inter := templaterouter.FirstMatch(`[1-9][0-9]*(us|ms|s|m|h|d)?`, + cfg.Annotations["router.openshift.io/haproxy.health.check.interval"], + os.Getenv("ROUTER_BACKEND_CHECK_INTERVAL"), + "5000ms") + cmd += " check inter " + inter podMaxConn := cfg.Annotations["haproxy.router.openshift.io/pod-concurrent-connections"] if _, err := strconv.Atoi(podMaxConn); err == nil { @@ -568,7 +582,7 @@ func execCommand(client HAProxyClient, api apiType, cmd string) error { case apiSetServerAddr: valid = response == "nothing changed" || strings.HasPrefix(response, "IP changed from ") || strings.HasPrefix(response, "port changed from ") || strings.HasPrefix(response, "no need to change ") case apiSetHealth, apiSetServerWeight, apiSetServerState: - valid = response == "" + valid = false // any response from these api calls mean there is a failure default: // fail fast in case of a dev error panic(fmt.Errorf("invalid cmd ID: %d", api)) diff --git a/pkg/router/template/configmanager/haproxy/backend_test.go b/pkg/router/template/configmanager/haproxy/backend_test.go index 606f6b3fa..32c77b221 100644 --- a/pkg/router/template/configmanager/haproxy/backend_test.go +++ b/pkg/router/template/configmanager/haproxy/backend_test.go @@ -5,6 +5,7 @@ import ( routev1 "github.com/openshift/api/route/v1" templaterouter "github.com/openshift/router/pkg/router/template" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/utils/ptr" ) @@ -35,10 +36,10 @@ func TestBackendDynamicUpdate(t *testing.T) { certificates map[string]templaterouter.Certificate annotations map[string]string envvars []string - noHealthCheck bool defaultCA string cmdCustomResp []string // 1:1 to `cmdExpected`, trailing empty items can be omited. 
errExpected string + removedExpected bool cmdExpected []string }{ // @@ -50,14 +51,6 @@ func TestBackendDynamicUpdate(t *testing.T) { "set server route1/server1 state ready", }, }, - "should add insecure server without health check": { - cmd: cmdAdd, - noHealthCheck: true, - cmdExpected: []string{ - "add server route1/server1 10.0.1.11:9000 weight 1", - "set server route1/server1 state ready", - }, - }, "should add insecure server with zero weight": { cmd: cmdAdd, weight: ptr.To[int32](0), @@ -166,22 +159,20 @@ func TestBackendDynamicUpdate(t *testing.T) { }, "should retry adding insecure server": { cmd: cmdAdd, - noHealthCheck: true, cmdCustomResp: []string{"Already exists a server ..."}, cmdExpected: []string{ - "add server route1/server1 10.0.1.11:9000 weight 1", + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", "del server route1/server1", - "add server route1/server1 10.0.1.11:9000 weight 1", + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", "set server route1/server1 state ready", }, }, "should fail if failing to add insecure server": { cmd: cmdAdd, - noHealthCheck: true, cmdCustomResp: []string{"Some unknown adding error."}, errExpected: "unexpected response from haproxy: Some unknown adding error.", cmdExpected: []string{ - "add server route1/server1 10.0.1.11:9000 weight 1", + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", }, }, @@ -252,11 +243,13 @@ func TestBackendDynamicUpdate(t *testing.T) { "set server route1/server1 state maint", "del server route1/server1", }, + removedExpected: true, }, "should fail to delete if failing to disable server": { - cmd: cmdDel, - cmdCustomResp: []string{"Some unknown set server error."}, - errExpected: "unexpected response from haproxy: Some unknown set server error.", + cmd: cmdDel, + cmdCustomResp: []string{"Some unknown set server error."}, + errExpected: "unexpected response from haproxy: Some unknown set server error.", + removedExpected: 
false, cmdExpected: []string{ "set server route1/server1 state maint", }, @@ -267,6 +260,7 @@ func TestBackendDynamicUpdate(t *testing.T) { "", // first cmd "Some unknown del server error.", // second cmd }, + removedExpected: false, cmdExpected: []string{ "set server route1/server1 state maint", "del server route1/server1", @@ -294,23 +288,23 @@ func TestBackendDynamicUpdate(t *testing.T) { Hostname: test.serviceHostname, } ep := templaterouter.Endpoint{ - ID: endpointID, - IP: ip, - Port: port, - AppProtocol: test.appProtocol, - NoHealthCheck: test.noHealthCheck, + ID: endpointID, + IP: ip, + Port: port, + AppProtocol: test.appProtocol, } isPassthrough := test.tlsTermination == routev1.TLSTerminationPassthrough client := &fakeClient{cmdCustomResp: test.cmdCustomResp} b := newBackend(backendName, client) + var removed bool var err error switch test.cmd { case cmdAdd: err = b.AddServer(cfg, svc, ep, weight, workingDir, test.defaultCA) case cmdDel: - err = b.DeleteServer(ep) + removed, err = b.DeleteServer(ep) case cmdUpdate: err = b.UpdateServer(ep, weight, isPassthrough) case cmdEnableHealth: @@ -326,7 +320,8 @@ func TestBackendDynamicUpdate(t *testing.T) { } else { require.NoError(t, err) } - require.Equal(t, test.cmdExpected, client.executedCmds) + assert.Equal(t, test.removedExpected, removed) + assert.Equal(t, test.cmdExpected, client.executedCmds) }) } diff --git a/pkg/router/template/configmanager/haproxy/manager.go b/pkg/router/template/configmanager/haproxy/manager.go index 4c0359c43..d812c5bba 100644 --- a/pkg/router/template/configmanager/haproxy/manager.go +++ b/pkg/router/template/configmanager/haproxy/manager.go @@ -408,7 +408,7 @@ func (cm *haproxyConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfig return err } for _, server := range servers { - if err := backend.DeleteServer(templaterouter.Endpoint{ID: server.Name}); err != nil { + if _, err := backend.DeleteServer(templaterouter.Endpoint{ID: server.Name}); err != nil { return err } } @@ 
-488,7 +488,7 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA var errs []error for name, ep := range deletedEndpoints { - if err := backend.DeleteServer(ep); err != nil { + if _, err := backend.DeleteServer(ep); err != nil { errs = append(errs, fmt.Errorf("error deleting backend server %s: %w", name, err)) } } @@ -565,7 +565,7 @@ func (cm *haproxyConfigManager) RemoveRouteEndpoints(id templaterouter.ServiceAl var errs []error for _, ep := range endpoints { log.V(4).Info("deleting server for endpoint", "endpoint", ep.ID) - if err := backend.DeleteServer(ep); err != nil { + if _, err := backend.DeleteServer(ep); err != nil { errs = append(errs, fmt.Errorf("error deleting server %s: %w", ep.ID, err)) } }