diff --git a/go.mod b/go.mod index a0cb99be1..ae5d064c3 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( k8s.io/apiserver v0.35.0 k8s.io/client-go v0.35.0 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 ) require ( @@ -110,7 +111,6 @@ require ( k8s.io/component-base v0.35.0 // indirect k8s.io/kms v0.35.0 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect - k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect diff --git a/images/router/haproxy/conf/haproxy-config.template b/images/router/haproxy/conf/haproxy-config.template index 4d418fc37..9388d435c 100644 --- a/images/router/haproxy/conf/haproxy-config.template +++ b/images/router/haproxy/conf/haproxy-config.template @@ -744,11 +744,16 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }} {{- end }} + {{- if $dynamicConfigManager }} + dynamic-cookie-key {{ $cfg.RoutingKeyName }} + {{- end }} {{- range $serviceUnitName, $weight := $cfg.ServiceUnitNames }} {{- if ge $weight 0 }}{{/* weight=0 is reasonable to keep existing connections to backends with cookies as we can see the HTTP headers */}} {{- with $serviceUnit := index $.ServiceUnits $serviceUnitName }} {{- range $idx, $endpoint := processEndpointsForAlias $cfg $serviceUnit (env "ROUTER_BACKEND_PROCESS_ENDPOINTS" "") }} - server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }} cookie {{ $endpoint.IdHash }} weight {{ $weight }} + {{- /* This should always follow backend.go/innerAddServer() method, changes here should be reflected there. */}} + {{- /* TODO: either move this configuration to the Go counterpart, or read it from here instead */}} + server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }}{{ if not $dynamicConfigManager }} cookie {{ $endpoint.IdHash }}{{ end }} weight {{ $weight }} {{- if (eq $cfg.TLSTermination "reencrypt") }} ssl {{- if not (isTrue $router_disable_http2) }} alpn h2,http/1.1 {{- end }} @@ -776,43 +781,6 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }}{{/* end get serviceUnit from its name */}} {{- end }}{{/* end range over serviceUnitNames */}} - {{- with $dynamicConfigManager }} - {{- if (eq $cfg.TLSTermination "reencrypt") }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - {{- range $idx, $serverName := $dynamicConfigManager.GenerateDynamicServerNames $cfgIdx }} - server {{ $serverName }} 172.4.0.4:8765 weight 0 ssl disabled check inter {{ $health_check_interval }} - {{- if gt (len (index $cfg.Certificates (printf "%s_pod" $cfg.Host)).Contents) 0 }} verify required ca-file {{ $workingDir }}/router/cacerts/{{$cfgIdx }}.pem - {{- else }} - {{- if not (isTrue $router_disable_http2) }} alpn h2,http/1.1 - {{- end }} - {{- /* - Add "verifyhost" for the primary service only. This server is not intended - to be used for alternate backend endpoints. A reload will be triggered if - an alternate backend endpoint is added dynamically. - */ -}} - {{- with $serviceUnit := index $.ServiceUnits $cfg.PrimaryServiceUnitKey -}} - {{- if $cfg.VerifyServiceHostname }} verifyhost {{ $serviceUnit.Hostname }} - {{- end }} - {{- end }} - {{- if gt (len $defaultDestinationCA) 0 }} verify required ca-file {{ $defaultDestinationCA }} - {{- else }} verify none - {{- end }} - {{- end }} - {{- with $podMaxConn := index $cfg.Annotations "haproxy.router.openshift.io/pod-concurrent-connections" }} - {{- if (isInteger (index $cfg.Annotations "haproxy.router.openshift.io/pod-concurrent-connections")) }} maxconn {{$podMaxConn }} {{- end }} - {{- end }}{{/* end pod-concurrent-connections annotation */}} - {{- end }}{{/* end range over dynamic server names */}} - - {{- else }} - {{- with $name := $dynamicConfigManager.ServerTemplateName $cfgIdx }} - {{- with $size := $dynamicConfigManager.ServerTemplateSize $cfgIdx }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - server-template {{ $name }}- 1-{{ $size }} 172.4.0.4:8765 check inter {{ $health_check_interval }} disabled - {{- end }} - {{- end }} - {{- end }} - {{- end }} - {{- end }}{{/* end if tls==edge/none/reencrypt */}} {{- if eq $cfg.TLSTermination "passthrough" }} @@ -856,10 +824,16 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} hash-type consistent timeout check 5000ms + + {{- if $dynamicConfigManager }} + dynamic-cookie-key {{ $cfg.RoutingKeyName }} + {{- end }} {{- range $serviceUnitName, $weight := $cfg.ServiceUnitNames }} {{- if ne $weight 0 }}{{/* drop connections where weight=0 as we can't use cookies, leaving only r-r and src-ip as dispatch methods and weight make no sense there */}} {{- with $serviceUnit := index $.ServiceUnits $serviceUnitName }} {{- range $idx, $endpoint := processEndpointsForAlias $cfg $serviceUnit (env "ROUTER_BACKEND_PROCESS_ENDPOINTS" "") }} + {{- /* This should always follow backend.go/innerAddServer() method, changes here should be reflected there. */}} + {{- /* TODO: either move this configuration to the Go counterpart, or read it from here instead */}} server {{ $endpoint.ID }} {{ $endpoint.IP }}:{{ $endpoint.Port }} weight {{ $weight }} {{- if and (not $endpoint.NoHealthCheck) (gt $cfg.ActiveEndpoints 1) }} check inter {{ $health_check_interval }} {{- end }}{{/* end else no health check */}} @@ -872,15 +846,6 @@ backend {{ genBackendNamePrefix $cfg.TLSTermination }}:{{ $cfgIdx }} {{- end }}{{/* end if weight != 0 */}} {{- end }}{{/* end iterate over services*/}} - {{- with $dynamicConfigManager }} - {{- with $name := $dynamicConfigManager.ServerTemplateName $cfgIdx }} - {{- with $size := $dynamicConfigManager.ServerTemplateSize $cfgIdx }} - dynamic-cookie-key {{ $cfg.RoutingKeyName }} - server-template {{ $name }}- 1-{{ $size }} 172.4.0.4:8765 check inter {{ $health_check_interval }} disabled - {{- end }} - {{- end }} - {{- end }} - {{- end }}{{/*end tls==passthrough*/}} {{- end }}{{/* end loop over routes */}} diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index 2505e43ca..25d7aa9ca 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -141,7 +141,6 @@ type TemplateRouterConfigManager struct { BlueprintRouteNamespace string BlueprintRouteLabelSelector string BlueprintRoutePoolSize int - MaxDynamicServers int } // isTrue here has the same logic as the function within package pkg/router/template @@ -182,13 +181,15 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) { flag.StringVar(&o.BlueprintRouteNamespace, "blueprint-route-namespace", env("ROUTER_BLUEPRINT_ROUTE_NAMESPACE", ""), "Specifies the namespace which contains the routes that serve as blueprints for the dynamic configuration manager.") flag.StringVar(&o.BlueprintRouteLabelSelector, "blueprint-route-labels", env("ROUTER_BLUEPRINT_ROUTE_LABELS", ""), "A label selector to apply to the routes in the blueprint route namespace. These selected routes will serve as blueprints for the dynamic dynamic configuration manager.") flag.IntVar(&o.BlueprintRoutePoolSize, "blueprint-route-pool-size", int(envInt("ROUTER_BLUEPRINT_ROUTE_POOL_SIZE", 10, 0)), "Specifies the size of the pre-allocated pool for each route blueprint managed by the router specific dynamic configuration manager. This can be overriden by an annotation router.openshift.io/pool-size on an individual route.") - flag.IntVar(&o.MaxDynamicServers, "max-dynamic-servers", int(envInt("ROUTER_MAX_DYNAMIC_SERVERS", 5, 1)), "Specifies the maximum number of dynamic servers added to a route for use by the router specific dynamic configuration manager.") flag.StringVar(&o.CaptureHTTPRequestHeadersString, "capture-http-request-headers", env("ROUTER_CAPTURE_HTTP_REQUEST_HEADERS", ""), "A comma-delimited list of HTTP request header names and maximum header value lengths that should be captured for logging. Each item must have the following form: name:maxLength") flag.StringVar(&o.CaptureHTTPResponseHeadersString, "capture-http-response-headers", env("ROUTER_CAPTURE_HTTP_RESPONSE_HEADERS", ""), "A comma-delimited list of HTTP response header names and maximum header value lengths that should be captured for logging. Each item must have the following form: name:maxLength") flag.StringVar(&o.CaptureHTTPCookieString, "capture-http-cookie", env("ROUTER_CAPTURE_HTTP_COOKIE", ""), "Name and maximum length of HTTP cookie that should be captured for logging. The argument must have the following form: name:maxLength. Append '=' to the name to indicate that an exact match should be performed; otherwise a prefix match will be performed. The value of first cookie that matches the name is captured.") flag.StringVar(&o.HTTPHeaderNameCaseAdjustmentsString, "http-header-name-case-adjustments", env("ROUTER_H1_CASE_ADJUST", ""), "A comma-delimited list of HTTP header names that should have their case adjusted. Each item must be a valid HTTP header name and should have the desired capitalization.") flag.StringVar(&o.HTTPResponseHeadersString, "set-delete-http-response-header", env("ROUTER_HTTP_RESPONSE_HEADERS", ""), "A comma-delimited list of HTTP response header names and values that should be set/deleted.") flag.StringVar(&o.HTTPRequestHeadersString, "set-delete-http-request-header", env("ROUTER_HTTP_REQUEST_HEADERS", ""), "A comma-delimited list of HTTP request header names and values that should be set/deleted.") + + // deprecated flags + _ = flag.Int("max-dynamic-servers", int(envInt("ROUTER_MAX_DYNAMIC_SERVERS", 5, 1)), "Specifies the maximum number of dynamic servers added to a route for use by the router specific dynamic configuration manager. DEPRECATED: router now created backend servers dynamically.") } type RouterStats struct { @@ -740,9 +741,10 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { CommitInterval: o.CommitInterval, BlueprintRoutes: blueprintRoutes, BlueprintRoutePoolSize: o.BlueprintRoutePoolSize, - MaxDynamicServers: o.MaxDynamicServers, WildcardRoutesAllowed: o.AllowWildcardRoutes, ExtendedValidation: o.ExtendedValidation, + WorkingDir: o.WorkingDir, + DefaultDestinationCA: o.DefaultDestinationCAPath, } cfgManager = haproxyconfigmanager.NewHAProxyConfigManager(cmopts) if len(o.BlueprintRouteNamespace) > 0 { diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index d24727e6a..23f20cb6a 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -175,8 +175,7 @@ u3YLAbyW/lHhOCiZu2iAI8AbmXem9lW6Tr7p/97s0w== Action: "Set", }}, DynamicConfigManager: haproxyconfigmanager.NewHAProxyConfigManager(templateplugin.ConfigManagerOptions{ - ConnectionInfo: "unix:///var/lib/dymmy", - MaxDynamicServers: 1, + ConnectionInfo: "unix:///var/lib/dummy", }), } plugin, err = templateplugin.NewTemplatePlugin(pluginCfg, svcFetcher) @@ -749,62 +748,6 @@ func TestConfigTemplate(t *testing.T) { }, }, }, - "Verifyhost for dynamic slot": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "serviceq", - serviceName: "serviceq", - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "q", - host: "qexample.com", - targetServiceName: "serviceq", - weight: int32(100), - time: start, - tlsTermination: routev1.TLSTerminationReencrypt, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: reencryptBackendName(h.namespace, "q"), - attribute: "server", - value: "_dynamic-pod-1 172.4.0.4:8765 weight 0 ssl disabled check inter 5000ms alpn h2,http/1.1 verifyhost serviceq.default.svc verify required ca-file dummy", - fullMatch: true, - }, - }, - }, - "Verifyhost for dynamic slot with alternate backend": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "serviceq1", - serviceName: "serviceq1", - }, - { - name: "serviceq2", - serviceName: "serviceq2", - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "q1", - host: "q1example.com", - targetServiceName: "serviceq1", - weight: int32(50), - alternateBackend: "serviceq2", - alternateBackendWeight: int32(50), - time: start, - tlsTermination: routev1.TLSTerminationReencrypt, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: reencryptBackendName(h.namespace, "q1"), - attribute: "server", - value: "_dynamic-pod-1 172.4.0.4:8765 weight 0 ssl disabled check inter 5000ms alpn h2,http/1.1 verifyhost serviceq1.default.svc verify required ca-file dummy", - fullMatch: true, - }, - }, - }, "valid route health check interval annotation": { mustCreateWithConfig{ mustCreateEndpointSlices: []mustCreateEndpointSlice{ @@ -931,57 +874,6 @@ func TestConfigTemplate(t *testing.T) { }, }, }, - "server-template with default health check interval": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "servicest2", - serviceName: "servicest2", - addresses: []string{"1.1.1.1"}, - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "st2", - host: "st2example.com", - targetServiceName: "servicest2", - weight: 1, - time: start, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: insecureBackendName(h.namespace, "st2"), - attribute: "server-template", - value: "inter 5000ms", // Default value - }, - }, - }, - "server-template with custom health check interval": { - mustCreateWithConfig{ - mustCreateEndpointSlices: []mustCreateEndpointSlice{ - { - name: "servicest1", - serviceName: "servicest1", - addresses: []string{"1.1.1.1"}, // Single endpoint initially - }, - }, - mustCreateRoute: mustCreateRoute{ - name: "st1", - host: "st1example.com", - targetServiceName: "servicest1", - weight: 1, - time: start, - annotations: map[string]string{ - "router.openshift.io/haproxy.health.check.interval": "15s", - }, - }, - mustMatchConfig: mustMatchConfig{ - section: "backend", - sectionName: insecureBackendName(h.namespace, "st1"), - attribute: "server-template", - value: "inter 15s", - }, - }, - }, } defer cleanUpRoutes(t) diff --git a/pkg/router/template/configmanager/haproxy/backend.go b/pkg/router/template/configmanager/haproxy/backend.go index e23857afc..48ff79a04 100644 --- a/pkg/router/template/configmanager/haproxy/backend.go +++ b/pkg/router/template/configmanager/haproxy/backend.go @@ -4,9 +4,12 @@ import ( "bytes" "errors" "fmt" + "os" + "path" "strconv" "strings" + v1 "github.com/openshift/api/route/v1" templaterouter "github.com/openshift/router/pkg/router/template" ) @@ -75,11 +78,9 @@ type serverStateInfo struct { // BackendServerInfo represents a server [endpoint] for a haproxy backend. type BackendServerInfo struct { Name string - FQDN string IPAddress string Port int CurrentWeight int32 - InitialWeight int32 } // Backend represents a specific haproxy backend. @@ -87,7 +88,7 @@ type Backend struct { name templaterouter.ServiceAliasConfigKey servers map[string]*backendServer - client *Client + client HAProxyClient } // backendServer is internally used for managing a haproxy backend server. @@ -118,7 +119,7 @@ func buildHAProxyBackends(c *Client) ([]*Backend, error) { } // newBackend returns a new Backend representing a haproxy backend. -func newBackend(name templaterouter.ServiceAliasConfigKey, c *Client) *Backend { +func newBackend(name templaterouter.ServiceAliasConfigKey, c HAProxyClient) *Backend { return &Backend{ name: name, servers: make(map[string]*backendServer), @@ -152,9 +153,7 @@ func (b *Backend) Refresh() error { Name: v.Name, IPAddress: v.IPAddress, Port: v.Port, - FQDN: v.FQDN, CurrentWeight: v.UserVisibleWeight, - InitialWeight: v.InitialWeight, } b.servers[v.Name] = newBackendServer(info) @@ -304,6 +303,162 @@ func (b *Backend) FindServer(id string) (*backendServer, error) { return nil, fmt.Errorf("no server found for id: %s", id) } +// AddServer dynamically adds a new backend server. It detects if the server already exists, and if so tries to remove it. +// It returns a failure in case HAProxy refuses to dynamically add the server for any reason, or if the existing server +// cannot be removed, e.g., it still have active or steady and established connection(s) to its backend server endpoint. +func (b *Backend) AddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { + if err := b.innerAddServer(cfg, svc, ep, weight, workingDir, defaultDestinationCA); err != nil { + if !strings.Contains(err.Error(), "Already exists a server ") { + return err + } + // Failed due to already existing server left behind, in maintenance mode, due to in-flight connections. + // Let's just give it another chance to be deleted. + if err := b.innerDeleteServer(ep); err != nil { + // No way, need to fail which will ask for a fork-and-reload. This will leave the existing connections in the old process. + return err + } + if err := b.innerAddServer(cfg, svc, ep, weight, workingDir, defaultDestinationCA); err != nil { + return err + } + } + if err := b.innerSetServerState(ep, true, weight); err != nil { + return err + } + + // health check is disabled by default on new backend servers, its enablement is handled via cm.ReplaceRouteEndpoints(), + // since that method has a better view of former and current active backend servers. + return nil +} + +// UpdateServer dynamically updates the backend server with new address and weight. +func (b *Backend) UpdateServer(ep templaterouter.Endpoint, weight int32, isPassthrough bool) error { + // missing to properly populate the current servers when created, should be done in the next phase. + // After that we can update only changed attributes. + // https://redhat.atlassian.net/browse/NE-2646 + if err := b.innerUpdateServerAddr(ep); err != nil { + return err + } + return b.innerUpdateServerWeight(ep, weight, isPassthrough) +} + +// EnableHealthCheck dynamically enables health check on a backend server that already declares the health check interval. +func (b *Backend) EnableHealthCheck(ep templaterouter.Endpoint) error { + return b.innerSetHealthCheck(ep, true) +} + +// DisableHealthCheck dynamically disables health check on a backend server. +func (b *Backend) DisableHealthCheck(ep templaterouter.Endpoint) error { + return b.innerSetHealthCheck(ep, false) +} + +// DeleteServer dynamically removes the backend server from the load balance. The backend server is put in maintenance mode +// and returns `removed` as false in case it has active or steady and established connections, so these connections continue +// to be handled and new ones are directed to other servers. An error only happens if the server cannot be put in maintenance +// mode, any failure trying to remove the server is logged and just return removed as false. +func (b *Backend) DeleteServer(ep templaterouter.Endpoint) (removed bool, err error) { + // put in maintenance mode first, this is a pre-requisite to remove a backend server. + if err := b.innerSetServerState(ep, false, 0); err != nil { + return false, err + } + if err := b.innerDeleteServer(ep); err != nil { + log.Info("disabling backend server instead of deleting due to a delete failure", "server", ep.ID, "error", err.Error()) + return false, nil + } + return true, nil +} + +func (b *Backend) innerAddServer(cfg *templaterouter.ServiceAliasConfig, svc *templaterouter.ServiceUnit, ep templaterouter.Endpoint, weight int32, workingDir, defaultDestinationCA string) error { + // This should always follow the template, changes here should be reflected there, both regular and passthrough backends + // + // TODO: either read this configuration from the template, or instead make the template read from here. + // For the former, note that creating a new template definition should conflict with the for-loop in templateRouter.writeConfig() + // that assumes that all the definitions should be written to disk. + // + // https://redhat.atlassian.net/browse/NE-2646 + + cmd := fmt.Sprintf("add server %s/%s %s:%s weight %d", b.name, ep.ID, ep.IP, ep.Port, weight) + + switch cfg.TLSTermination { + case v1.TLSTerminationReencrypt: + cmd += " ssl" + if disableHTTP2, _ := strconv.ParseBool(os.Getenv("ROUTER_DISABLE_HTTP2")); !disableHTTP2 { + cmd += " alpn h2,http/1.1" + } + if cfg.VerifyServiceHostname { + cmd += " verifyhost " + svc.Hostname + } + if cert := cfg.Certificates[cfg.Host+"_pod"]; len(cert.Contents) > 0 { + cmd += " verify required ca-file " + path.Join(workingDir, "router/cacerts", cert.ID+".pem") + } else if len(defaultDestinationCA) > 0 { + cmd += " verify required ca-file " + defaultDestinationCA + } else { + cmd += " verify none" + } + case "", v1.TLSTerminationEdge: + if ep.AppProtocol == "h2c" || ep.AppProtocol == "kubernetes.io/h2c" { + cmd += " proto h2" + } + case v1.TLSTerminationPassthrough: + // passthrough is a TCP listener and does not use ssl or proto related config + } + + // health check is always configured and defaults as disabled, being enabled later + // on DCM's manager depending on `cfg.ActiveEndpoints > 1` and `!ep.NoHealthCheck`. + inter := templaterouter.FirstMatch(`[1-9][0-9]*(us|ms|s|m|h|d)?`, + cfg.Annotations["router.openshift.io/haproxy.health.check.interval"], + os.Getenv("ROUTER_BACKEND_CHECK_INTERVAL"), + "5000ms") + cmd += " check inter " + inter + + podMaxConn := cfg.Annotations["haproxy.router.openshift.io/pod-concurrent-connections"] + if _, err := strconv.Atoi(podMaxConn); err == nil { + cmd += " maxconn " + podMaxConn + } + + return execCommand(b.client, apiAddServer, cmd) +} + +func (b *Backend) innerUpdateServerAddr(ep templaterouter.Endpoint) error { + cmd := fmt.Sprintf("set server %s/%s addr %s port %s", b.name, ep.ID, ep.IP, ep.Port) + return execCommand(b.client, apiSetServerAddr, cmd) +} + +func (b *Backend) innerUpdateServerWeight(ep templaterouter.Endpoint, weight int32, isPassthrough bool) error { + cmd := fmt.Sprintf("set server %s/%s", b.name, ep.ID) + if isPassthrough { + // https://github.com/openshift/router/blob/896390778ebe15f57f87e6ca78f11c96e64c2652/pkg/router/template/configmanager/haproxy/manager.go#L446-L454 + cmd += " weight 100%" + } else { + cmd = fmt.Sprintf("%s weight %d", cmd, weight) + } + return execCommand(b.client, apiSetServerWeight, cmd) +} + +func (b *Backend) innerSetHealthCheck(ep templaterouter.Endpoint, enable bool) error { + enableStr := "enable" + if !enable { + enableStr = "disable" + } + cmd := fmt.Sprintf("%s health %s/%s", enableStr, b.name, ep.ID) + return execCommand(b.client, apiSetHealth, cmd) +} + +func (b *Backend) innerSetServerState(ep templaterouter.Endpoint, ready bool, weight int32) error { + state := "ready" + if !ready { + state = "maint" + } else if weight <= 0 { + state = "drain" + } + cmd := fmt.Sprintf("set server %s/%s state %s", b.name, ep.ID, state) + return execCommand(b.client, apiSetServerState, cmd) +} + +func (b *Backend) innerDeleteServer(ep templaterouter.Endpoint) error { + cmd := fmt.Sprintf("del server %s/%s", b.name, ep.ID) + return execCommand(b.client, apiDelServer, cmd) +} + // newBackendServer returns a BackendServer representing a haproxy backend server. func newBackendServer(info BackendServerInfo) *backendServer { return &backendServer{ @@ -317,7 +472,7 @@ func newBackendServer(info BackendServerInfo) *backendServer { } // ApplyChanges applies all the local backend server changes. -func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConfigKey, client *Client) error { +func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConfigKey, client HAProxyClient) error { // Build the haproxy dynamic config API commands. commands := []string{} @@ -353,7 +508,7 @@ func (s *backendServer) ApplyChanges(backendName templaterouter.ServiceAliasConf } // executeCommand runs a server change command and handles the response. -func (s *backendServer) executeCommand(cmd string, client *Client) error { +func (s *backendServer) executeCommand(cmd string, client HAProxyClient) error { responseBytes, err := client.Execute(cmd) if err != nil { return err @@ -396,3 +551,45 @@ func stripVersionNumber(data []byte) ([]byte, error) { return data, nil } + +type apiType int + +const ( + apiAddServer apiType = iota + apiDelServer + apiSetHealth + apiSetServerAddr + apiSetServerWeight + apiSetServerState +) + +func execCommand(client HAProxyClient, api apiType, cmd string) error { + responseRaw, err := client.Execute(cmd) + if err != nil { + return err + } + response := strings.TrimSpace(string(responseRaw)) + if len(response) == 0 { + return nil + } + + var valid bool + switch api { + case apiAddServer: + valid = response == "New server registered." + case apiDelServer: + valid = response == "Server deleted." + case apiSetServerAddr: + valid = response == "nothing changed" || strings.HasPrefix(response, "IP changed from ") || strings.HasPrefix(response, "port changed from ") || strings.HasPrefix(response, "no need to change ") + case apiSetHealth, apiSetServerWeight, apiSetServerState: + valid = false // any response from these api calls mean there is a failure + default: + // fail fast in case of a dev error + panic(fmt.Errorf("invalid cmd ID: %d", api)) + } + + if !valid { + return fmt.Errorf("unexpected response from haproxy: %s", response) + } + return nil +} diff --git a/pkg/router/template/configmanager/haproxy/backend_test.go b/pkg/router/template/configmanager/haproxy/backend_test.go new file mode 100644 index 000000000..32c77b221 --- /dev/null +++ b/pkg/router/template/configmanager/haproxy/backend_test.go @@ -0,0 +1,351 @@ +package haproxy + +import ( + "testing" + + routev1 "github.com/openshift/api/route/v1" + templaterouter "github.com/openshift/router/pkg/router/template" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" +) + +func TestBackendDynamicUpdate(t *testing.T) { + type cmd string + const ( + cmdAdd cmd = "add" + cmdDel cmd = "del" + cmdUpdate cmd = "update" + cmdEnableHealth cmd = "enable-health" + cmdDisableHealth cmd = "disable-health" + ) + + testCases := map[string]struct { + cmd cmd + backendName *templaterouter.ServiceAliasConfigKey // default: "route1" + endpointID *string // default: "server1" + ip *string // default: "10.0.1.11" + port *string // default: "9000" + weight *int32 // default: 1 + workingDir *string // default: "tmp" + publicHostname string + serviceHostname string + tlsTermination routev1.TLSTerminationType + verifyHostname bool + appProtocol string + certificates map[string]templaterouter.Certificate + annotations map[string]string + envvars []string + defaultCA string + cmdCustomResp []string // 1:1 to `cmdExpected`, trailing empty items can be omited. + errExpected string + removedExpected bool + cmdExpected []string + }{ + // + // adding + "should add insecure server": { + cmd: cmdAdd, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with zero weight": { + cmd: cmdAdd, + weight: ptr.To[int32](0), + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 0 check inter 5000ms", + "set server route1/server1 state drain", + }, + }, + "should add insecure server with custom health check": { + cmd: cmdAdd, + annotations: map[string]string{ + "router.openshift.io/haproxy.health.check.interval": "10s", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 10s", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with custom invalid health check": { + cmd: cmdAdd, + annotations: map[string]string{ + "router.openshift.io/haproxy.health.check.interval": "1z", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add insecure server with custom maxconn": { + cmd: cmdAdd, + annotations: map[string]string{ + "haproxy.router.openshift.io/pod-concurrent-connections": "100", + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms maxconn 100", + "set server route1/server1 state ready", + }, + }, + "should add passthrough server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationPassthrough, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add edge termination server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationEdge, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add edge termination h2 server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationEdge, + appProtocol: "h2c", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 proto h2 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify none check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with verify host": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + verifyHostname: true, + serviceHostname: "route1.default.svc", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verifyhost route1.default.svc verify none check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with default ca": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + defaultCA: "/tmp/default-ca.pem", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify required ca-file /tmp/default-ca.pem check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should add reencrypt termination server with custom certificate": { + cmd: cmdAdd, + tlsTermination: routev1.TLSTerminationReencrypt, + publicHostname: "route1.local", + certificates: map[string]templaterouter.Certificate{ + "route1.local_pod": { + ID: "default:route1", + Contents: "-----BEGIN CERTIFICATE-----\nzzz\n-----END CERTIFICATE-----", + PrivateKey: "-----BEGIN PRIVATE KEY-----\nzzz\n-----END PRIVATE KEY-----", + }, + }, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 ssl alpn h2,http/1.1 verify required ca-file /tmp/router/cacerts/default:route1.pem check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should retry adding insecure server": { + cmd: cmdAdd, + cmdCustomResp: []string{"Already exists a server ..."}, + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "del server route1/server1", + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + "set server route1/server1 state ready", + }, + }, + "should fail if failing to add insecure server": { + cmd: cmdAdd, + cmdCustomResp: []string{"Some unknown adding error."}, + errExpected: "unexpected response from haproxy: Some unknown adding error.", + cmdExpected: []string{ + "add server route1/server1 10.0.1.11:9000 weight 1 check inter 5000ms", + }, + }, + + // + // updating + "should update server": { + cmd: cmdUpdate, + weight: ptr.To[int32](10), + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + "set server route1/server1 weight 10", + }, + }, + "should update passthrough server with weight 100%": { + cmd: cmdUpdate, + tlsTermination: routev1.TLSTerminationPassthrough, + weight: ptr.To[int32](10), + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + "set server route1/server1 weight 100%", + }, + }, + "should fail if failing to update server": { + cmd: cmdUpdate, + cmdCustomResp: []string{"Some unknown updating error."}, + errExpected: "unexpected response from haproxy: Some unknown updating error.", + cmdExpected: []string{ + "set server route1/server1 addr 10.0.1.11 port 9000", + }, + }, + + // + // health check + "should enable health check": { + cmd: cmdEnableHealth, + cmdExpected: []string{ + "enable health route1/server1", + }, + }, + "should fail if failing to enable health check": { + cmd: cmdEnableHealth, + cmdCustomResp: []string{"Some unknown set health error."}, + errExpected: "unexpected response from haproxy: Some unknown set health error.", + cmdExpected: []string{ + "enable health route1/server1", + }, + }, + "should disable health check": { + cmd: cmdDisableHealth, + cmdExpected: []string{ + "disable health route1/server1", + }, + }, + "should fail if failing to disable health check": { + cmd: cmdDisableHealth, + cmdCustomResp: []string{"Some unknown set health error."}, + errExpected: "unexpected response from haproxy: Some unknown set health error.", + cmdExpected: []string{ + "disable health route1/server1", + }, + }, + + // + // Deleting + "should delete server": { + cmd: cmdDel, + cmdExpected: []string{ + "set server route1/server1 state maint", + "del server route1/server1", + }, + removedExpected: true, + }, + "should fail to delete if failing to disable server": { + cmd: cmdDel, + cmdCustomResp: []string{"Some unknown set server error."}, + errExpected: "unexpected response from haproxy: Some unknown set server error.", + removedExpected: false, + cmdExpected: []string{ + "set server route1/server1 state maint", + }, + }, + "should succeed delete if failing to remove and succeeding to disable": { + cmd: cmdDel, + cmdCustomResp: []string{ + "", // first cmd + "Some unknown del server error.", // second cmd + }, + removedExpected: false, + cmdExpected: []string{ + "set server route1/server1 state maint", + "del server route1/server1", + }, + }, + } + + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + backendName := ptr.Deref(test.backendName, "route1") + endpointID := ptr.Deref(test.endpointID, "server1") + ip := ptr.Deref(test.ip, "10.0.1.11") + port := ptr.Deref(test.port, "9000") + weight := ptr.Deref(test.weight, 1) + workingDir := ptr.Deref(test.workingDir, "/tmp") + + cfg := &templaterouter.ServiceAliasConfig{ + TLSTermination: test.tlsTermination, + VerifyServiceHostname: test.verifyHostname, + Host: test.publicHostname, + Certificates: test.certificates, + Annotations: test.annotations, + } + svc := &templaterouter.ServiceUnit{ + Hostname: test.serviceHostname, + } + ep := templaterouter.Endpoint{ + ID: endpointID, + IP: ip, + Port: port, + AppProtocol: test.appProtocol, + } + isPassthrough := test.tlsTermination == routev1.TLSTerminationPassthrough + client := &fakeClient{cmdCustomResp: test.cmdCustomResp} + + b := newBackend(backendName, client) + + var removed bool + var err error + switch test.cmd { + case cmdAdd: + err = b.AddServer(cfg, svc, ep, weight, workingDir, test.defaultCA) + case cmdDel: + removed, err = b.DeleteServer(ep) + case cmdUpdate: + err = b.UpdateServer(ep, weight, isPassthrough) + case cmdEnableHealth: + err = b.EnableHealthCheck(ep) + case cmdDisableHealth: + err = b.DisableHealthCheck(ep) + default: + t.Errorf("invalid cmd: %s", test.cmd) + } + + if test.errExpected != "" { + require.EqualError(t, err, test.errExpected) + } else { + require.NoError(t, err) + } + assert.Equal(t, test.removedExpected, removed) + assert.Equal(t, test.cmdExpected, client.executedCmds) + }) + } + +} + +type fakeClient struct { + cmdCustomResp []string + executedCmds []string + respCount int +} + +func (c *fakeClient) RunCommand(cmd string, _ Converter) ([]byte, error) { + return c.Execute(cmd) +} + +func (c *fakeClient) Execute(cmd string) ([]byte, error) { + c.executedCmds = append(c.executedCmds, cmd) + + var response string + if len(c.cmdCustomResp) > c.respCount { + response = c.cmdCustomResp[c.respCount] + "\n" + c.respCount++ + } + response += "\n" + + return []byte(response), nil +} diff --git a/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go b/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go index 4cd411a8b..537819012 100644 --- a/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go +++ b/pkg/router/template/configmanager/haproxy/blueprint_plugin_test.go @@ -41,7 +41,7 @@ func (cm *fakeConfigManager) FindBlueprint(id templaterouter.ServiceAliasConfigK return route, ok } -func (cm *fakeConfigManager) Register(id templaterouter.ServiceAliasConfigKey, route *routev1.Route) { +func (cm *fakeConfigManager) Register(id templaterouter.ServiceAliasConfigKey, backend *templaterouter.ServiceAliasConfig, route *routev1.Route) { } func (cm *fakeConfigManager) AddRoute(id templaterouter.ServiceAliasConfigKey, routingKey string, route *routev1.Route) error { @@ -52,7 +52,7 @@ func (cm *fakeConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfigKey return nil } -func (cm *fakeConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { +func (cm *fakeConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, svc *templaterouter.ServiceUnit, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { return nil } @@ -63,18 +63,6 @@ func (cm *fakeConfigManager) RemoveRouteEndpoints(id templaterouter.ServiceAlias func (cm *fakeConfigManager) Notify(event templaterouter.RouterEventType) { } -func (cm *fakeConfigManager) ServerTemplateName(id templaterouter.ServiceAliasConfigKey) string { - return "fakeConfigManager" -} - -func (cm *fakeConfigManager) ServerTemplateSize(id templaterouter.ServiceAliasConfigKey) string { - return "1" -} - -func (cm *fakeConfigManager) GenerateDynamicServerNames(id templaterouter.ServiceAliasConfigKey) []string { - return []string{} -} - func routeKey(route *routev1.Route) templaterouter.ServiceAliasConfigKey { return templaterouter.ServiceAliasConfigKey(fmt.Sprintf("%s:%s", route.Name, route.Namespace)) } diff --git a/pkg/router/template/configmanager/haproxy/client.go b/pkg/router/template/configmanager/haproxy/client.go index 472737639..7bd5f72a1 100644 --- a/pkg/router/template/configmanager/haproxy/client.go +++ b/pkg/router/template/configmanager/haproxy/client.go @@ -24,6 +24,11 @@ const ( maxRetries = 3 ) +type HAProxyClient interface { + RunCommand(cmd string, converter Converter) ([]byte, error) + Execute(cmd string) ([]byte, error) +} + // Client is a client used to dynamically configure haproxy. type Client struct { socketAddress string diff --git a/pkg/router/template/configmanager/haproxy/manager.go b/pkg/router/template/configmanager/haproxy/manager.go index 123e97233..d812c5bba 100644 --- a/pkg/router/template/configmanager/haproxy/manager.go +++ b/pkg/router/template/configmanager/haproxy/manager.go @@ -1,12 +1,13 @@ package haproxy import ( + "errors" "fmt" "os" "path" "reflect" + "slices" "strconv" - "strings" "sync" "time" @@ -27,13 +28,6 @@ const ( // haproxyManagerName is the name of this config manager. haproxyManagerName = "haproxy-manager" - // haproxyRunDir is the name of run directory within the haproxy - // router working directory heirarchy. - haproxyRunDir = "run" - - // haproxySocketFile is the name of haproxy socket file. - haproxySocketFile = "haproxy.sock" - // haproxyConnectionTimeout is the timeout (in seconds) used for // preventing slow connections to the haproxy socket from blocking // the config manager from doing any work. @@ -43,10 +37,6 @@ const ( // pool of blueprint routes. blueprintRoutePoolNamePrefix = "_hapcm_blueprint_pool" - // dynamicServerPrefix is the prefix used in the haproxy template - // for adding dynamic servers (pods) to a backend. - dynamicServerPrefix = "_dynamic" - // routePoolSizeAnnotation is the annotation on the blueprint route // overriding the default pool size. routePoolSizeAnnotation = "router.openshift.io/pool-size" @@ -61,9 +51,6 @@ const ( blueprintRoutePoolServiceName = blueprintRoutePoolNamePrefix + ".svc" ) -// endpointToDynamicServerMap is a map of endpoint to dynamic server names. -type endpointToDynamicServerMap map[string]string - // configEntryMap is a map containing name-value pairs representing the // config entries to add to an haproxy map. type configEntryMap map[string]templaterouter.ServiceAliasConfigKey @@ -76,6 +63,9 @@ type routeBackendEntry struct { // id is the route id. id string + // + backend *templaterouter.ServiceAliasConfig + // termination is the route termination. termination routev1.TLSTerminationType @@ -92,9 +82,6 @@ type routeBackendEntry struct { // poolRouteBackendName is backend name for any associated route // from the pre-configured blueprint route pool. poolRouteBackendName templaterouter.ServiceAliasConfigKey - - // DynamicServerMap is a map of all the allocated dynamic servers. - dynamicServerMap endpointToDynamicServerMap } // haproxyConfigManager is a template router config manager implementation @@ -116,10 +103,6 @@ type haproxyConfigManager struct { // backends for each route blueprint. blueprintRoutePoolSize int - // maxDynamicServers is the maximum number of dynamic servers - // allocated per backend in the haproxy template configuration. - maxDynamicServers int - // wildcardRoutesAllowed indicates if wildcard routes are allowed. wildcardRoutesAllowed bool @@ -129,9 +112,18 @@ type haproxyConfigManager struct { // router is the associated template router. router templaterouter.RouterInterface + // workingDir is the router's working directory containing configuration + // files, certificates, and other router-managed resources. + workingDir string + // defaultCertificate is the default certificate bytes. defaultCertificate string + // defaultDestinationCA is the path to the default CA certificate file used + // to verify backend server certificates for re-encrypt routes when no + // route-specific destination CA is configured. + defaultDestinationCA string + // client is the client used to dynamically manage haproxy. client *Client @@ -163,10 +155,11 @@ func NewHAProxyConfigManager(options templaterouter.ConfigManagerOptions) *hapro commitInterval: options.CommitInterval, blueprintRoutes: buildBlueprintRoutes(options.BlueprintRoutes, options.ExtendedValidation), blueprintRoutePoolSize: options.BlueprintRoutePoolSize, - maxDynamicServers: options.MaxDynamicServers, wildcardRoutesAllowed: options.WildcardRoutesAllowed, extendedValidation: options.ExtendedValidation, + workingDir: options.WorkingDir, defaultCertificate: "", + defaultDestinationCA: options.DefaultDestinationCA, client: client, reloadInProgress: false, @@ -283,14 +276,14 @@ func (cm *haproxyConfigManager) RemoveBlueprint(route *routev1.Route) { } // Register registers an id with an expected haproxy backend for a route. -func (cm *haproxyConfigManager) Register(id templaterouter.ServiceAliasConfigKey, route *routev1.Route) { +func (cm *haproxyConfigManager) Register(id templaterouter.ServiceAliasConfigKey, backend *templaterouter.ServiceAliasConfig, route *routev1.Route) { wildcard := cm.wildcardRoutesAllowed && (route.Spec.WildcardPolicy == routev1.WildcardPolicySubdomain) entry := &routeBackendEntry{ - id: string(id), - termination: routeTerminationType(route), - wildcard: wildcard, - backendName: routeBackendName(id, route), - dynamicServerMap: make(endpointToDynamicServerMap), + id: string(id), + backend: backend, + termination: routeTerminationType(route), + wildcard: wildcard, + backendName: routeBackendName(id, route), } cm.lock.Lock() @@ -317,8 +310,6 @@ func (cm *haproxyConfigManager) AddRoute(id templaterouter.ServiceAliasConfigKey return fmt.Errorf("no blueprint found that would match route %s/%s", route.Namespace, route.Name) } - cm.Register(id, route) - cm.lock.Lock() defer func() { cm.lock.Unlock() @@ -410,10 +401,17 @@ func (cm *haproxyConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfig if err != nil { return err } - log.V(4).Info("disabling all servers for backend", "backend", backendName) - if err := backend.Disable(); err != nil { + + log.V(4).Info("deleting all servers for backend", "backend", backendName) + servers, err := backend.Servers() + if err != nil { return err } + for _, server := range servers { + if _, err := backend.DeleteServer(templaterouter.Endpoint{ID: server.Name}); err != nil { + return err + } + } log.V(4).Info("committing changes made to backend", "backend", backendName) return backend.Commit() @@ -421,7 +419,7 @@ func (cm *haproxyConfigManager) RemoveRoute(id templaterouter.ServiceAliasConfig // ReplaceRouteEndpoints dynamically replaces a subset of the endpoints for // a route - modifies a subset of the servers on an haproxy backend. -func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { +func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceAliasConfigKey, svc *templaterouter.ServiceUnit, oldEndpoints, newEndpoints []templaterouter.Endpoint, weight int32) error { log.V(4).Info("replacing route endpoints", "id", id, "weight", weight) if cm.isReloading() { return fmt.Errorf("Router reload in progress, cannot dynamically add endpoints for %s", id) @@ -442,17 +440,6 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA return fmt.Errorf("route id %s was not registered", id) } - weightIsRelative := false - if entry.termination == routev1.TLSTerminationPassthrough { - // Passthrough is a wee bit odd and is like a boolean on/off - // switch. Setting actual weights, causing the haproxy - // dynamic API to either hang or then haproxy dying off. - // So 100% works for us today because we use a dynamic hash - // balancing algorithm. Needs a follow up on this issue. - weightIsRelative = true - weight = 100 - } - backendName := entry.BackendName() log.V(4).Info("finding backend", "backend", backendName) backend, err := cm.client.FindBackend(backendName) @@ -460,9 +447,17 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA return err } + addedEndpoints := make(map[string]templaterouter.Endpoint) modifiedEndpoints := make(map[string]templaterouter.Endpoint) for _, ep := range newEndpoints { - modifiedEndpoints[ep.ID] = ep + existing := slices.ContainsFunc(oldEndpoints, func(v2ep templaterouter.Endpoint) bool { + return v2ep.ID == ep.ID + }) + if existing { + modifiedEndpoints[ep.ID] = ep + } else { + addedEndpoints[ep.ID] = ep + } } deletedEndpoints := make(map[string]templaterouter.Endpoint) @@ -471,135 +466,74 @@ func (cm *haproxyConfigManager) ReplaceRouteEndpoints(id templaterouter.ServiceA if reflect.DeepEqual(ep, v2ep) { // endpoint was unchanged. delete(modifiedEndpoints, v2ep.ID) + continue } epUsesH2C := ep.AppProtocol == "h2c" || ep.AppProtocol == "kubernetes.io/h2c" v2epUsesH2c := v2ep.AppProtocol == "h2c" || v2ep.AppProtocol == "kubernetes.io/h2c" if (epUsesH2C || v2epUsesH2c) && epUsesH2C != v2epUsesH2c { - return fmt.Errorf("endpoint %s changed appProtocol from %q to %q, and dynamically updating proto is unsupported", ep.ID, ep.AppProtocol, v2ep.AppProtocol) + return fmt.Errorf("endpoint %s changed appProtocol from %q to %q, dynamically updating proto is unsupported - route will be updated on next reload", ep.ID, ep.AppProtocol, v2ep.AppProtocol) } } else { - configChanged = true deletedEndpoints[ep.ID] = ep } } - log.V(4).Info("getting servers for backend", "backend", backendName) - servers, err := backend.Servers() - if err != nil { - return err - } + // there is a configuration change if any of the tracking maps have endpoint(s) + configChanged = len(deletedEndpoints)+len(modifiedEndpoints)+len(addedEndpoints) > 0 - // Backends having only one server have their server's health check turned off; health check - // is only enabled when there are two or more endpoints in the backend. - // - // Currently we don't support enabling or disabling health checks, so we are: - // - // * Checking whether we are adding endpoints; and - // * Checking if the current state of the backend is just one static endpoint. - // - // If both of the above matches, we cannot dynamically update and return from here. - // This behavior should be improved via https://issues.redhat.com/browse/NE-2496 - // using `del server` and `add server` API calls. - // - // This is the expected lay out from the running HAProxy in case there is only one static - // backend server: - // - // # be_id be_name srv_id srv_name srv_addr ... srv_port ... - // 20 be_http:default:route 1 pod:app-848554c7d4-mbhsf:app::10.128.0.78:8000 10.128.0.78 ... 8000 ... - // 20 be_http:default:route 2 _dynamic-pod-1 172.4.0.4 ... 8765 ... - // 20 be_http:default:route 3 _dynamic-pod-2 172.4.0.4 ... 8765 ... - // ... other "_dynamic-pod-NN" - - // checking for more newEndpoints than oldEndpoints, if this happens we are adding new ones - if len(newEndpoints) > len(oldEndpoints) { - var staticCount int - // we cannot infer the first ones are the static, since this list is built from a hashmap, - // so the order is not deterministic. Need to iterate over all of them. - for _, s := range servers { - // if not a dynamic backend server, count as static - if !isDynamicBackendServer(s) { - staticCount++ - } + log.V(4).Info("processing endpoint changes", "added", addedEndpoints, "deleted", deletedEndpoints, "modified", modifiedEndpoints) + + // Aggregating errors instead of failing fast in the first API error. This ensures that the old + // process has a more accurate configuration in case it lives longer due to persistent connections. + var errs []error + + for name, ep := range deletedEndpoints { + if _, err := backend.DeleteServer(ep); err != nil { + errs = append(errs, fmt.Errorf("error deleting backend server %s: %w", name, err)) } - if staticCount == 1 { - return fmt.Errorf("single endpoint on backend %s, need to reload to enable health check", backendName) + } + for name, ep := range modifiedEndpoints { + if err := backend.UpdateServer(ep, weight, entry.termination == routev1.TLSTerminationPassthrough); err != nil { + errs = append(errs, fmt.Errorf("error updating backend server %s: %w", name, err)) } } - - log.V(4).Info("processing endpoint changes", "deleted", deletedEndpoints, "modified", modifiedEndpoints) - - // First process the deleted endpoints and update the servers we - // have already used - these would be the ones where the name - // matches the endpoint name or is a dynamic server already in use. - // Also keep track of the unused dynamic servers. - unusedServerNames := []string{} - for _, s := range servers { - relatedEndpointID := s.Name - if isDynamicBackendServer(s) { - if epid, ok := entry.dynamicServerMap[s.Name]; ok { - relatedEndpointID = epid - } else { - unusedServerNames = append(unusedServerNames, s.Name) - continue - } + for name, ep := range addedEndpoints { + if err := backend.AddServer(entry.backend, svc, ep, weight, cm.workingDir, cm.defaultDestinationCA); err != nil { + errs = append(errs, fmt.Errorf("error adding backend server %s: %w", name, err)) } + } - if _, ok := deletedEndpoints[relatedEndpointID]; ok { - configChanged = true - log.V(4).Info("disabling server for deleted endpoint", "endpoint", relatedEndpointID, "server", s.Name) - backend.DisableServer(s.Name) - if _, ok := entry.dynamicServerMap[s.Name]; ok { - log.V(4).Info("removing server from dynamic server map", "server", s.Name, "backend", backendName) - delete(entry.dynamicServerMap, s.Name) - } - continue + // Checking health check. We need to: + // * enable new endpoints if `cfg.ActiveEndpoints > 1` + // * enable also the only former endpoint if scaling from 1 to 2 or more + // * disable the only current endpoint if scaling to 1 + if len(newEndpoints) > 1 { + var newEPs []templaterouter.Endpoint + for _, ep := range addedEndpoints { + // enabling for all the new added endpoints + newEPs = append(newEPs, ep) } - - if ep, ok := modifiedEndpoints[relatedEndpointID]; ok { - configChanged = true - log.V(4).Info("enabling server for modified endpoint", "endpoint", relatedEndpointID, "server", s.Name, "ip", ep.IP, "port", ep.Port, "appProtocol", ep.AppProtocol, "weight", weight) - backend.UpdateServerInfo(s.Name, ep.IP, ep.Port, ep.AppProtocol, weight, weightIsRelative) - backend.EnableServer(s.Name) - - delete(modifiedEndpoints, relatedEndpointID) + if len(oldEndpoints) == 1 { + // enabling also for the former single endpoint as well + newEPs = append(newEPs, oldEndpoints[0]) } - } - - // Processed all existing endpoints, now check if there's any more - // more modified endpoints (aka newly added ones). For these, we can - // choose any of the unused dynamic servers. - for _, name := range unusedServerNames { - if len(modifiedEndpoints) == 0 { - break + for _, ep := range newEPs { + if !ep.NoHealthCheck { + if err := backend.EnableHealthCheck(ep); err != nil { + errs = append(errs, err) + } + } } - - var ep templaterouter.Endpoint - for _, v := range modifiedEndpoints { - // Just get first modified endpoint. - ep = v - break + } else if len(newEndpoints) == 1 && len(oldEndpoints) != 1 { + // the single backend server scenario, health check should be disabled + if ep := newEndpoints[0]; !ep.NoHealthCheck { + if err := backend.DisableHealthCheck(ep); err != nil { + errs = append(errs, err) + } } - - // Add entry for the dyamic server used. - configChanged = true - entry.dynamicServerMap[name] = ep.ID - - log.V(4).Info("enabling server for added endpoint", "endpoint", ep.ID, "server", name, "ip", ep.IP, "port", ep.Port, "appProtocol", ep.AppProtocol, "weight", weight) - backend.UpdateServerInfo(name, ep.IP, ep.Port, ep.AppProtocol, weight, weightIsRelative) - backend.EnableServer(name) - - delete(modifiedEndpoints, ep.ID) } - // If we got here, then either we are done with all the endpoints or - // there are no free dynamic server slots available that we can use. - if len(modifiedEndpoints) > 0 { - return fmt.Errorf("no free dynamic server slots for backend %s, %d endpoint(s) remaining", - id, len(modifiedEndpoints)) - } - - log.V(4).Info("committing backend", "backend", backendName) - return backend.Commit() + return errors.Join(errs...) } // RemoveRouteEndpoints removes servers matching the endpoints from a haproxy backend. @@ -628,26 +562,15 @@ func (cm *haproxyConfigManager) RemoveRouteEndpoints(id templaterouter.ServiceAl return err } - // Build a reversed map (endpoint id -> server name) to allow us to - // search by endpoint. - endpointToDynServerMap := make(map[string]string) - for serverName, endpointID := range entry.dynamicServerMap { - endpointToDynServerMap[endpointID] = serverName - } - + var errs []error for _, ep := range endpoints { - name := ep.ID - if serverName, ok := endpointToDynServerMap[ep.ID]; ok { - name = serverName - delete(entry.dynamicServerMap, name) + log.V(4).Info("deleting server for endpoint", "endpoint", ep.ID) + if _, err := backend.DeleteServer(ep); err != nil { + errs = append(errs, fmt.Errorf("error deleting server %s: %w", ep.ID, err)) } - - log.V(4).Info("disabling server for endpoint", "endpoint", ep.ID, "server", name) - backend.DisableServer(name) } - log.V(4).Info("committing backend", "backend", backendName) - return backend.Commit() + return errors.Join(errs...) } // Notify informs the config manager of any template router state changes. @@ -675,41 +598,6 @@ func (cm *haproxyConfigManager) Commit() { cm.commitRouterConfig() } -// ServerTemplateName returns the dynamic server template name. -func (cm *haproxyConfigManager) ServerTemplateName(id templaterouter.ServiceAliasConfigKey) string { - if cm.maxDynamicServers > 0 { - // Adding the id makes the name unwieldy - use pod. - return fmt.Sprintf("%s-pod", dynamicServerPrefix) - } - - return "" -} - -// ServerTemplateSize returns the dynamic server template size. -// Note this is returned as a string for easier use in the haproxy template. -func (cm *haproxyConfigManager) ServerTemplateSize(id templaterouter.ServiceAliasConfigKey) string { - if cm.maxDynamicServers < 1 { - return "" - } - - return fmt.Sprintf("%v", cm.maxDynamicServers) -} - -// GenerateDynamicServerNames generates the dynamic server names. -func (cm *haproxyConfigManager) GenerateDynamicServerNames(id templaterouter.ServiceAliasConfigKey) []string { - if cm.maxDynamicServers > 0 { - if prefix := cm.ServerTemplateName(id); len(prefix) > 0 { - names := make([]string, cm.maxDynamicServers) - for i := 0; i < cm.maxDynamicServers; i++ { - names[i] = fmt.Sprintf("%s-%v", prefix, i+1) - } - return names - } - } - - return []string{} -} - // scheduleRouterReload schedules a reload by deferring commit on the // associated template router using a internal flush timer. func (cm *haproxyConfigManager) scheduleRouterReload() { @@ -836,14 +724,10 @@ func (cm *haproxyConfigManager) reset() { cm.commitTimer = nil } - // Reset the blueprint route pool use and dynamic server maps as - // the router was reloaded. + // Reset the blueprint route pool use as the router was reloaded. cm.poolUsage = make(map[templaterouter.ServiceAliasConfigKey]templaterouter.ServiceAliasConfigKey) for _, entry := range cm.backendEntries { entry.poolRouteBackendName = "" - if len(entry.dynamicServerMap) > 0 { - entry.dynamicServerMap = make(endpointToDynamicServerMap) - } } // Reset the client - clear its caches. @@ -1076,15 +960,6 @@ func routeTerminationType(route *routev1.Route) routev1.TLSTerminationType { return termination } -// isDynamicBackendServer indicates if a backend server is a dynamic server. -func isDynamicBackendServer(server BackendServerInfo) bool { - if len(dynamicServerPrefix) == 0 { - return false - } - - return strings.HasPrefix(server.Name, dynamicServerPrefix) -} - // backendModAnnotations return the annotations in a route that will // require custom (or modified) backend configuration in haproxy. func backendModAnnotations(route *routev1.Route) map[string]string { diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go index d45201c28..a77324ca5 100644 --- a/pkg/router/template/router.go +++ b/pkg/router/template/router.go @@ -785,7 +785,15 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r } log.V(4).Info("dynamically adding route backend", "backendKey", backendKey) - r.dynamicConfigManager.Register(backendKey, route) + r.dynamicConfigManager.Register(backendKey, backend, route) + + // Saving the initial state of the endpoints from all the service units of the backend. + // This state is used later to calculate added and removed endpoints without the need to read proxy api. + for key := range backend.ServiceUnits { + if service, found := r.findMatchingServiceUnit(key); found { + backend.EndpointTable[key] = endpointsForAlias(*backend, service) + } + } // Fully skipping DCM for now when adding or changing routes, // should be reincluded along with the fix for https://issues.redhat.com/browse/OCPBUGS-77344 @@ -799,7 +807,7 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r err := r.dynamicConfigManager.AddRoute(backendKey, backend.RoutingKeyName, route) if err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically add route for backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically add route for backend", "backendKey", backendKey, "error", err) return false } @@ -817,8 +825,8 @@ func (r *templateRouter) dynamicallyAddRoute(backendKey ServiceAliasConfigKey, r if !ok { weight = 0 } - if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, oldEndpoints, newEndpoints, weight); err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically replace endpoints for route backend", + if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, &service, oldEndpoints, newEndpoints, weight); err != nil { + log.Info("router will reload as the ConfigManager could not dynamically replace endpoints for route backend", "backendKey", backendKey, "serviceKey", key, "error", err) return false } @@ -844,7 +852,7 @@ func (r *templateRouter) dynamicallyRemoveRoute(backendKey ServiceAliasConfigKey log.V(4).Info("dynamically removing route backend", "backendKey", backendKey) if err := r.dynamicConfigManager.RemoveRoute(backendKey, route); err != nil { - log.V(4).Info("router will reload as the ConfigManager could not dynamically remove route backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically remove route backend", "backendKey", backendKey, "error", err) return false } @@ -855,7 +863,7 @@ func (r *templateRouter) dynamicallyRemoveRoute(backendKey ServiceAliasConfigKey // on all the routes associated with a given service. // Note: The config should have been synced at least once initially and // the caller needs to acquire a lock [and release it]. -func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service ServiceUnit, oldEndpoints []Endpoint) bool { +func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service ServiceUnit) bool { if r.dynamicConfigManager == nil || !r.synced { return false } @@ -877,7 +885,11 @@ func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service return false } + // Synchronizing EndpointTable only after backing up the former state, which is the current state in the proxy. + // These old (current haproxy state) and new (current cluster state) are used to calculate added and removed endpoints. + oldEndpoints := cfg.EndpointTable[id] newEndpoints := endpointsForAlias(cfg, service) + cfg.EndpointTable[id] = newEndpoints // If a service is idled, createRouterEndpoints returns a slice // containing 1 endpoint (namely the idled service's ClusterIP @@ -921,9 +933,9 @@ func (r *templateRouter) dynamicallyReplaceEndpoints(id ServiceUnitKey, service } log.V(4).Info("dynamically replacing endpoints for associated backend", "backendKey", backendKey, "newEndpoints", newEndpoints) - if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, oldEndpoints, newEndpoints, weight); err != nil { + if err := r.dynamicConfigManager.ReplaceRouteEndpoints(backendKey, &service, oldEndpoints, newEndpoints, weight); err != nil { // Error dynamically modifying the config, so return false to cause a reload to happen. - log.V(4).Info("router will reload as the ConfigManager could not dynamically replace endpoints for service", "service", id, "backendKey", backendKey, "weight", weight, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically replace endpoints for service", "service", id, "backendKey", backendKey, "weight", weight, "error", err) return false } } @@ -950,7 +962,7 @@ func (r *templateRouter) dynamicallyRemoveEndpoints(service ServiceUnit, endpoin log.V(4).Info("dynamically removing endpoints for associated backend", "backendKey", backendKey) if err := r.dynamicConfigManager.RemoveRouteEndpoints(backendKey, endpoints); err != nil { // Error dynamically modifying the config, so return false to cause a reload to happen. - log.V(4).Info("router will reload as the ConfigManager could not dynamically remove endpoints for backend", "backendKey", backendKey, "error", err) + log.Info("router will reload as the ConfigManager could not dynamically remove endpoints for backend", "backendKey", backendKey, "error", err) return false } } @@ -1033,6 +1045,7 @@ func (r *templateRouter) createServiceAliasConfig(route *routev1.Route, backendK IsWildcard: wildcard, Annotations: route.Annotations, ServiceUnits: serviceUnits, + EndpointTable: make(map[ServiceUnitKey][]Endpoint), PrimaryServiceUnitKey: primaryServiceUnitKey, ActiveServiceUnits: activeServiceUnits, HTTPResponseHeaders: httpResponseHeadersList, @@ -1232,12 +1245,10 @@ func (r *templateRouter) AddEndpoints(id ServiceUnitKey, endpoints []Endpoint) { return } - oldEndpoints := frontend.EndpointTable - frontend.EndpointTable = endpoints r.serviceUnits[id] = frontend - configChanged := r.dynamicallyReplaceEndpoints(id, frontend, oldEndpoints) + configChanged := r.dynamicallyReplaceEndpoints(id, frontend) if len(frontend.ServiceAliasAssociations) > 0 { r.stateChanged = true } @@ -1253,25 +1264,6 @@ func (r *templateRouter) cleanUpServiceAliasConfig(cfg *ServiceAliasConfig) { } } -func cmpStrSlices(first []string, second []string) bool { - if len(first) != len(second) { - return false - } - for _, fi := range first { - found := false - for _, si := range second { - if fi == si { - found = true - break - } - } - if !found { - return false - } - } - return true -} - // shouldWriteCerts determines if the router should ask the cert manager to write out certificates // it will return true if a route is edge or reencrypt and it has all the required (host/key) certificates // defined. If the route does not have the certificates defined it will log an info message if the diff --git a/pkg/router/template/template_helper.go b/pkg/router/template/template_helper.go index 754f6d94c..035c0493a 100644 --- a/pkg/router/template/template_helper.go +++ b/pkg/router/template/template_helper.go @@ -67,6 +67,10 @@ func matchString(pattern string, s string) (bool, error) { return re.MatchString(s), nil } +func FirstMatch(pattern string, values ...string) string { + return firstMatch(pattern, values...) +} + func firstMatch(pattern string, values ...string) string { log.V(7).Info("firstMatch called", "pattern", pattern, "values", values) if re, err := cachedRegexpCompile(`\A(?:` + pattern + `)\z`); err == nil { diff --git a/pkg/router/template/types.go b/pkg/router/template/types.go index 8d6f9eb16..292d4d29a 100644 --- a/pkg/router/template/types.go +++ b/pkg/router/template/types.go @@ -75,6 +75,10 @@ type ServiceAliasConfig struct { // ActiveEndpoints is a count of the route endpoints that are part of a service unit with a non-zero weight ActiveEndpoints int + // EndpointTable are endpoints that back the service in this ServiceAliasConfig: non matching ports from + // route configuration is filtered out. This translates into a final backend implementation for routers. + EndpointTable map[ServiceUnitKey][]Endpoint + // HTTPResponseHeaders has route-specific custom HTTP response headers. HTTPResponseHeaders []HTTPHeader @@ -174,15 +178,20 @@ type ConfigManagerOptions struct { // router.openshift.io/pool-size BlueprintRoutePoolSize int - // MaxDynamicServers is the maximum number of dynamic servers we - // will allocate on a per-route basis. - MaxDynamicServers int - // WildcardRoutesAllowed indicates if wildcard routes are allowed. WildcardRoutesAllowed bool // ExtendedValidation indicates if extended route validation is enabled. ExtendedValidation bool + + // WorkingDir is the router's working directory containing configuration + // files, certificates, and other router-managed resources. + WorkingDir string + + // DefaultDestinationCA is the path to the default CA certificate file used + // to verify backend server certificates for re-encrypt routes when no + // route-specific destination CA is configured. + DefaultDestinationCA string } // ConfigManager is used by the router to make configuration changes using @@ -204,7 +213,7 @@ type ConfigManager interface { RemoveBlueprint(route *routev1.Route) // Register registers an id to be associated with a route. - Register(id ServiceAliasConfigKey, route *routev1.Route) + Register(id ServiceAliasConfigKey, backend *ServiceAliasConfig, route *routev1.Route) // AddRoute adds a new route or updates an existing route. AddRoute(id ServiceAliasConfigKey, routingKey string, route *routev1.Route) error @@ -214,7 +223,7 @@ type ConfigManager interface { // ReplaceRouteEndpoints replaces a subset (the ones associated with // a single service unit) of a route endpoints. - ReplaceRouteEndpoints(id ServiceAliasConfigKey, oldEndpoints, newEndpoints []Endpoint, weight int32) error + ReplaceRouteEndpoints(id ServiceAliasConfigKey, svc *ServiceUnit, oldEndpoints, newEndpoints []Endpoint, weight int32) error // RemoveRouteEndpoints removes a set of endpoints from a route. RemoveRouteEndpoints(id ServiceAliasConfigKey, endpoints []Endpoint) error @@ -224,15 +233,6 @@ type ConfigManager interface { // which indicates whether or not the configuration manager should // reset all the dynamically applied changes it is keeping track of. Notify(event RouterEventType) - - // ServerTemplateName returns the dynamic server template name. - ServerTemplateName(id ServiceAliasConfigKey) string - - // ServerTemplateSize returns the dynamic server template size. - ServerTemplateSize(id ServiceAliasConfigKey) string - - // GenerateDynamicServerNames generates the dynamic server names. - GenerateDynamicServerNames(id ServiceAliasConfigKey) []string } // CaptureHTTPHeader specifies an HTTP header that should be captured for access