diff --git a/images/router/haproxy/start-haproxy b/images/router/haproxy/start-haproxy new file mode 100755 index 000000000..e1dce551d --- /dev/null +++ b/images/router/haproxy/start-haproxy @@ -0,0 +1,57 @@ +#!/bin/bash +set -euo pipefail + +## +## This script expects HAProxy running in foreground, do not initialize it in daemon mode! + +timeoutStr=${ROUTER_GRACEFUL_SHUTDOWN_DELAY:-45s} +if ! [[ "$timeoutStr" =~ ^[0-9]+s$ ]]; then + echo "Invalid timeout: $timeoutStr" + exit 1 +fi + +timeout="${timeoutStr%s}" +signaled=0 + +stopHAProxy() { + signaled=1 + + # HAProxy handles SIGUSR1 by finishing its process as soon as all the current connections, + # active or not, are closed by either the client or the backend server. + echo "Sending SIGUSR1 to HAProxy process $haproxyPID" + kill -s USR1 $haproxyPID + + # Poll the process, retuning as soon as it is not alive anymore. + for i in $(seq 1 $timeout); do + sleep 1 + if ! kill -0 $haproxyPID 2>/dev/null; then + echo "All connections are closed" + return + fi + done + + # HAProxy handles SIGTERM by closing all the TCP connections and waiting for the full TCP handshake + # (FIN / ACK / FIN-ACK) from both sides, and only after that it finishes. `kill` runs asynchronous, + # and `wait` maintains this script alive until haproxy finishes. + echo "SIGUSR1 timed out, sending SIGTERM to HAProxy process $haproxyPID" + kill -s TERM $haproxyPID 2>/dev/null || true +} + +echo "Starting HAProxy. SIGUSR1 timeout is ${timeout}s" +/usr/sbin/haproxy "$@" & +haproxyPID=$! + +trap stopHAProxy SIGTERM SIGUSR1 SIGINT +trap "kill -s HUP $haproxyPID" SIGHUP + +exit_code=0 +while kill -0 $haproxyPID 2>/dev/null; do + # Start `wait` again in case it returned due to a non terminate signal, e.g. SIGHUP + wait $haproxyPID || exit_code=$? +done + +# Received signal (usually SIGTERM) takes precedence during `wait`, overriding with a clean exit +[ "$signaled" = 1 ] && exit 0 + +echo "haproxy exited with status code $exit_code" +exit $exit_code diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index 2505e43ca..5204c39dd 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -756,8 +756,11 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { } secretManager := secretmanager.NewManager(kc, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() pluginCfg := templateplugin.TemplatePluginConfig{ + AppCtx: ctx, WorkingDir: o.WorkingDir, TemplatePath: o.TemplateFile, ReloadScriptPath: o.ReloadScript, @@ -839,6 +842,7 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { select { case <-stopCh: + cancel() // 45s is the default interval that almost all cloud load balancers require to take an unhealthy // endpoint out of rotation. delay := getIntervalFromEnv("ROUTER_GRACEFUL_SHUTDOWN_DELAY", 45) diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 830db0e5b..ced222bf9 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -120,6 +120,7 @@ func TestMain(m *testing.M) { // The template plugin which is wrapped svcFetcher := templateplugin.NewListWatchServiceLookup(client.CoreV1(), 60*time.Second, namespace) pluginCfg := templateplugin.TemplatePluginConfig{ + AppCtx: context.Background(), WorkingDir: workdir, DefaultCertificate: `-----BEGIN CERTIFICATE----- MIIDIjCCAgqgAwIBAgIBBjANBgkqhkiG9w0BAQUFADCBoTELMAkGA1UEBhMCVVMx diff --git a/pkg/router/template/plugin.go b/pkg/router/template/plugin.go index 66fbd6af7..68c32f074 100644 --- a/pkg/router/template/plugin.go +++ b/pkg/router/template/plugin.go @@ -1,6 +1,7 @@ package templaterouter import ( + "context" "crypto/md5" "fmt" "net" @@ -42,6 +43,7 @@ func newDefaultTemplatePlugin(router RouterInterface, includeUDP bool, lookupSvc } type TemplatePluginConfig struct { + AppCtx context.Context WorkingDir string TemplatePath string ReloadScriptPath string @@ -141,7 +143,12 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp templates[template.Name()] = templateWithHelper } + ctx := cfg.AppCtx + if ctx == nil { + ctx = context.Background() + } templateRouterCfg := templateRouterCfg{ + appCtx: ctx, dir: cfg.WorkingDir, templates: templates, reloadScriptPath: cfg.ReloadScriptPath, diff --git a/pkg/router/template/router.go b/pkg/router/template/router.go index 1f1f4ca1d..c705e0ac9 100644 --- a/pkg/router/template/router.go +++ b/pkg/router/template/router.go @@ -2,6 +2,7 @@ package templaterouter import ( "bytes" + "context" "crypto/md5" "encoding/pem" "fmt" @@ -15,10 +16,12 @@ import ( "text/template" "time" + "github.com/bcicen/go-haproxy" "github.com/fsnotify/fsnotify" "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" routev1 "github.com/openshift/api/route/v1" @@ -53,6 +56,7 @@ const ( // that generates configuration files via a set of templates // and manages the backend process with a reload script. type templateRouter struct { + appCtx context.Context // the directory to write router output to dir string templates map[string]*template.Template @@ -131,6 +135,7 @@ type templateRouter struct { // templateRouterCfg holds all configuration items required to initialize the template router type templateRouterCfg struct { + appCtx context.Context dir string templates map[string]*template.Template reloadScriptPath string @@ -243,6 +248,7 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) { prometheus.MustRegister(metricWriteConfig) router := &templateRouter{ + appCtx: cfg.appCtx, dir: dir, templates: cfg.templates, reloadScriptPath: cfg.reloadScriptPath, @@ -666,8 +672,22 @@ func (r *templateRouter) writeCertificates(cfg *ServiceAliasConfig) error { return nil } -// reloadRouter executes the router's reload script. +// reloadRouter reloads haproxy. func (r *templateRouter) reloadRouter(shutdown bool) error { + adminSocket := os.Getenv("ROUTER_HAPROXY_ADMIN_UNIX_SOCKET") + if adminSocket != "" { + if shutdown { + // We are in HAProxy's master/worker mode, currently implemented as a sidecar, + // so there is no local process to handle and the sidecar one already received SIGTERM. + return nil + } + return r.reloadRouterExternal(adminSocket) + } + return r.reloadRouterEmbedded(shutdown) +} + +// reloadRouterEmbedded executes the router's reload script. +func (r *templateRouter) reloadRouterEmbedded(shutdown bool) error { if r.reloadFn != nil { return r.reloadFn(shutdown) } @@ -679,7 +699,37 @@ func (r *templateRouter) reloadRouter(shutdown bool) error { if err != nil { return fmt.Errorf("error reloading router: %v\n%s", err, string(out)) } - log.V(0).Info("router reloaded", "output", string(out)) + log.V(0).Info("router reloaded", "mode", "embedded", "output", string(out)) + return nil +} + +// reloadRouterExternal sends a reload command to the external haproxy. +func (r *templateRouter) reloadRouterExternal(adminSocket string) error { + err := wait.PollUntilContextCancel(r.appCtx, 2*time.Second, true, func(ctx context.Context) (done bool, err error) { + _, errstat := os.Lstat(adminSocket) + if errstat != nil { + log.Info("waiting for haproxy socket", "message", errstat.Error()) + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("router is terminating") + } + + client := haproxy.HAProxyClient{Addr: "unix://" + adminSocket, Timeout: 10 /*seconds*/} + outputBuffer, err := client.RunCommand("reload") + if err != nil { + return fmt.Errorf("error connecting haproxy: %w", err) + } + output := outputBuffer.String() + + // `reload` command is synchronous since haproxy 2.7, so it is safe to continue as soon as it returns. + // It should return Success=1 in the first line in case everything went well, anything else is considered a failure. + if !strings.HasPrefix(output, "Success=1") { + return fmt.Errorf("error reloading router: %s", output) + } + log.Info("router reloaded", "mode", "sidecar") return nil }