diff --git a/Dockerfile.auth b/Dockerfile.auth index 6093f68..5477064 100644 --- a/Dockerfile.auth +++ b/Dockerfile.auth @@ -21,7 +21,7 @@ RUN go build -o ./auth/auth ./auth/cmd/auth/main.go FROM alpine:3.22.1 -RUN apk add --no-cache ca-certificates +RUN apk add --no-cache ca-certificates curl WORKDIR /app @@ -29,5 +29,6 @@ COPY --from=builder /app/auth/auth . COPY --from=builder /app/auth/migrations ./migrations EXPOSE 50051 +EXPOSE 3000 CMD ["./auth"] diff --git a/Dockerfile.gateway b/Dockerfile.gateway index 454b5da..162fbe7 100644 --- a/Dockerfile.gateway +++ b/Dockerfile.gateway @@ -21,7 +21,7 @@ RUN go build -o ./gateway/gateway ./gateway/cmd/gateway/main.go FROM alpine:3.22.1 -RUN apk add --no-cache ca-certificates +RUN apk add --no-cache ca-certificates curl WORKDIR /app @@ -29,5 +29,6 @@ COPY --from=builder /app/gateway/gateway . COPY protos/gen/swagger ./swagger EXPOSE 8080 +EXPOSE 3000 CMD ["./gateway"] diff --git a/Dockerfile.profiles b/Dockerfile.profiles index 6eb5bdc..3ef28c3 100644 --- a/Dockerfile.profiles +++ b/Dockerfile.profiles @@ -21,7 +21,7 @@ RUN go build -o ./profiles/profiles ./profiles/cmd/profiles/main.go FROM alpine:3.22.1 -RUN apk add --no-cache ca-certificates +RUN apk add --no-cache ca-certificates curl WORKDIR /app @@ -29,5 +29,6 @@ COPY --from=builder /app/profiles/profiles . COPY --from=builder /app/profiles/migrations ./migrations EXPOSE 50051 +EXPOSE 3000 CMD ["./profiles"] diff --git a/auth/cmd/auth/main.go b/auth/cmd/auth/main.go index 7d97991..f323094 100644 --- a/auth/cmd/auth/main.go +++ b/auth/cmd/auth/main.go @@ -10,12 +10,13 @@ import ( "github.com/alexwatcher/gateofthings/auth/internal/app" "github.com/alexwatcher/gateofthings/auth/internal/config" "github.com/alexwatcher/gateofthings/auth/internal/consts" + "github.com/alexwatcher/gateofthings/shared/pkg/healthz" sharedpgsql "github.com/alexwatcher/gateofthings/shared/pkg/migrator/postgresql" "github.com/alexwatcher/gateofthings/shared/pkg/telemetry" ) func main() { - ctx := context.Background() + ctx, stop := context.WithCancel(context.Background()) cfg := config.MustLoad() res := telemetry.MustCreateResource(consts.ServiceName, consts.ServiceVersion, cfg.Env) @@ -23,18 +24,27 @@ func main() { telemetry.MustInitTracer(context.Background(), res, cfg.Telemetry.TraceEndpoint) telemetry.MustInitMeter(context.Background(), res, cfg.Telemetry.MetricsEndpoint) + application := app.New(ctx, cfg.GRPC, cfg.Database, cfg.TokenSecret, cfg.TokenTTL) + + hc := healthz.New( + healthz.WithPort(cfg.HealthPort), + healthz.WithLiveProbe(func(ctx context.Context) error { return nil }), + healthz.WithReadyProbe(func(ctx context.Context) error { return application.Ready() }), + ) + go hc.MustRun(ctx) + slog.Info("start migration") sharedpgsql.Migrate(cfg.Database) slog.Info("end migration") slog.Info("starting application") - application := app.New(ctx, cfg.GRPC, cfg.Database, cfg.TokenSecret, cfg.TokenTTL) go application.MustRun(ctx) - stop := make(chan os.Signal, 1) - signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT) - sig := <-stop + stopChan := make(chan os.Signal, 1) + signal.Notify(stopChan, syscall.SIGTERM, syscall.SIGINT) + sig := <-stopChan + stop() slog.Info("stopping application", "signal", sig) application.Stop(ctx) } diff --git a/auth/internal/app/app.go b/auth/internal/app/app.go index ffdb84c..a685830 100644 --- a/auth/internal/app/app.go +++ b/auth/internal/app/app.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "net" + "sync/atomic" "time" grpcauth "github.com/alexwatcher/gateofthings/auth/internal/grpc/auth" @@ -14,19 +15,21 @@ import ( "github.com/alexwatcher/gateofthings/shared/pkg/grpc/interceptors/tracing" "github.com/alexwatcher/gateofthings/shared/pkg/grpc/interceptors/valid" sharedpgsql "github.com/alexwatcher/gateofthings/shared/pkg/repository/postgresql" + "github.com/jackc/pgx/v5" "google.golang.org/grpc" ) type App struct { gRPCServer *grpc.Server gRPConfig config.GRPCSrvConfig + dbConn *pgx.Conn + isRunning int32 } // New initializes a new instance of the App struct with a gRPC server // listening on the specified port. It registers the authentication // service with the server and returns the configured App instance. func New(ctx context.Context, gRPConfig config.GRPCSrvConfig, dbConfig config.DatabaseConfig, tokenSecret string, tokenTTL time.Duration) *App { - dbConn, err := sharedpgsql.NewConnection(ctx, dbConfig) if err != nil { slog.Error("failed to connect to database", "error", err) @@ -46,6 +49,7 @@ func New(ctx context.Context, gRPConfig config.GRPCSrvConfig, dbConfig config.Da return &App{ gRPCServer: gRPCServer, gRPConfig: gRPConfig, + dbConn: dbConn, } } @@ -63,6 +67,8 @@ func (a *App) Run(ctx context.Context) error { if err != nil { return err } + atomic.StoreInt32(&a.isRunning, 1) + defer atomic.StoreInt32(&a.isRunning, 0) slog.Info("gRPC server started", "port", a.gRPConfig.Port) if err := a.gRPCServer.Serve(lis); err != nil { return fmt.Errorf("app.run: %w", err) @@ -76,3 +82,17 @@ func (a *App) Stop(ctx context.Context) { slog.Info("stopping gRPC server") a.gRPCServer.GracefulStop() } + +// Readiness probe +func (a *App) Ready() error { + err := a.dbConn.Ping(context.Background()) + if err != nil { + slog.Warn("app: live probe failed", "error", err) + return err + } + if atomic.LoadInt32(&a.isRunning) == 0 { + slog.Warn("app: live probe failed: app is not running") + return fmt.Errorf("app: is not running") + } + return nil +} diff --git a/auth/internal/config/config.go b/auth/internal/config/config.go index 4286c3e..71ea236 100644 --- a/auth/internal/config/config.go +++ b/auth/internal/config/config.go @@ -14,6 +14,7 @@ type Config struct { Telemetry scfg.TelemetryConfig `envPrefix:"TELEMETRY_"` GRPC scfg.GRPCSrvConfig `envPrefix:"GRPC_"` Database scfg.DatabaseConfig `envPrefix:"DB_"` + HealthPort uint16 `env:"HEALTH_PORT,required"` } // MustLoad loads configuration from environment variables into a Config instance. diff --git a/auth/tests/suite/suite.go b/auth/tests/suite/suite.go index 20a0901..a41ead0 100644 --- a/auth/tests/suite/suite.go +++ b/auth/tests/suite/suite.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "os/exec" "strconv" "strings" @@ -26,7 +27,9 @@ var cfg = &config.Config{ TokenTTL: time.Minute * 10, TokenSecret: "test", Telemetry: scfg.TelemetryConfig{}, - GRPC: scfg.GRPCSrvConfig{}, + GRPC: scfg.GRPCSrvConfig{ + Port: 50051, + }, Database: scfg.DatabaseConfig{ Host: "postgres", Port: 5432, @@ -36,6 +39,7 @@ var cfg = &config.Config{ Password: "pass", Migrations: "./migrations", }, + HealthPort: 3000, } type Suite struct { @@ -80,15 +84,20 @@ func New(t *testing.T) (context.Context, *Suite, func()) { } }() - dbRes := mustSetupPostgres(testName, pool, cfg, networkName) + dbRes, err := setupPostgres(testName, pool, cfg, networkName) + if err != nil { + fmt.Printf("Failed setup PostgreSQL: %v", err) + panic(err) + } resources = append(resources, dbRes) cfg.Database.Host = strings.TrimPrefix(dbRes.Container.Name, "/") - authRes, port := mustSetupAuth(testName, pool, cfg, networkName) + authRes, port := mustSetupAuth(testName, pool, cfg, networkName, time.Second*20) resources = append(resources, authRes) cc, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { + fmt.Printf("Failed create grpc client: %v", err) panic(err) } @@ -111,7 +120,7 @@ func New(t *testing.T) (context.Context, *Suite, func()) { } } -func mustSetupPostgres(testName string, pool *dockertest.Pool, cfg *config.Config, network string) *dockertest.Resource { +func setupPostgres(testName string, pool *dockertest.Pool, cfg *config.Config, network string) (*dockertest.Resource, error) { res, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: "postgres", Tag: "17.5", @@ -124,7 +133,7 @@ func mustSetupPostgres(testName string, pool *dockertest.Pool, cfg *config.Confi }, }) if err != nil { - panic(err) + return nil, err } err = pool.Retry(func() error { @@ -138,15 +147,15 @@ func mustSetupPostgres(testName string, pool *dockertest.Pool, cfg *config.Confi return nil }) if err != nil { - panic(err) + return nil, err } - return res + return res, nil } -func mustSetupAuth(testName string, pool *dockertest.Pool, cfg *config.Config, network string) (*dockertest.Resource, uint16) { - port := 3000 - exposedPort := fmt.Sprintf("%d/tcp", port) +func mustSetupAuth(testName string, pool *dockertest.Pool, cfg *config.Config, network string, setupTimeout time.Duration) (*dockertest.Resource, uint16) { + exposedPort := fmt.Sprintf("%d/tcp", cfg.GRPC.Port) + exposedHealthPort := fmt.Sprintf("%d/tcp", cfg.HealthPort) res, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: "got-auth", Tag: "latest", @@ -156,34 +165,67 @@ func mustSetupAuth(testName string, pool *dockertest.Pool, cfg *config.Config, n fmt.Sprintf("ENV=%s", cfg.Env), fmt.Sprintf("TOKEN_TTL=%v", cfg.TokenTTL), fmt.Sprintf("TOKEN_SECRET=%s", cfg.TokenSecret), - fmt.Sprintf("GRPC_PORT=%d", port), + fmt.Sprintf("GRPC_PORT=%d", cfg.GRPC.Port), fmt.Sprintf("DB_HOST=%s", cfg.Database.Host), fmt.Sprintf("DB_PORT=%d", cfg.Database.Port), fmt.Sprintf("DB_NAME=%s", cfg.Database.Name), fmt.Sprintf("DB_USER=%s", cfg.Database.User), fmt.Sprintf("DB_PASSWORD=%s", cfg.Database.Password), fmt.Sprintf("DB_MIGRATIONS=%s", cfg.Database.Migrations), + fmt.Sprintf("HEALTH_PORT=%d", cfg.HealthPort), }, - ExposedPorts: []string{exposedPort}, + ExposedPorts: []string{exposedPort, exposedHealthPort}, }, func(config *docker.HostConfig) { config.PortBindings = map[docker.Port][]docker.PortBinding{ - docker.Port(exposedPort): {{HostIP: "0.0.0.0", HostPort: ""}}, + docker.Port(exposedPort): {{HostIP: "0.0.0.0", HostPort: ""}}, + docker.Port(exposedHealthPort): {{HostIP: "0.0.0.0", HostPort: ""}}, } }) if err != nil { panic(err) } - hostPort, err := strconv.Atoi(res.GetPort(exposedPort)) - if err != nil { + cleanup := func() { if err := pool.Purge(res); err != nil { log.Printf("Could not purge container: %s", err) } + } + + hostPort, err := strconv.Atoi(res.GetPort(exposedPort)) + if err != nil { + cleanup() + panic(err) + } + + healthHostPort, err := strconv.Atoi(res.GetPort(exposedHealthPort)) + if err != nil { + cleanup() panic(err) } - // TODO: implement and then use healthcheck to wait availability of auth service - time.Sleep(time.Second * 5) + err = waitHealthCheck(fmt.Sprintf("http://localhost:%d/readyz", healthHostPort), setupTimeout) + if err != nil { + cleanup() + panic(err) + } return res, uint16(hostPort) } + +func waitHealthCheck(address string, timeout time.Duration) error { + startTime := time.Now() + for { + resp, err := http.Get(address) + if err != nil { + continue + } + if time.Since(startTime) > timeout { + return fmt.Errorf("%s healthcheck timeout", address) + } + if resp.StatusCode == http.StatusOK { + break + } + time.Sleep(time.Millisecond * 500) + } + return nil +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 581cfd5..3b0ab23 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -105,8 +105,16 @@ services: DB_USER: auth_user DB_PASSWORD: auth_pass DB_MIGRATIONS: ./migrations + HEALTH_PORT: 3000 ports: - 50051:50051 + - 3001:3000 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3000/readyz"] + interval: 3s + timeout: 3s + retries: 3 + start_period: 1s depends_on: postgres: condition: service_healthy @@ -130,8 +138,16 @@ services: DB_USER: profiles_user DB_PASSWORD: profiles_pass DB_MIGRATIONS: ./migrations + HEALTH_PORT: 3000 ports: - 50052:50051 + - 3002:3000 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3000/readyz"] + interval: 3s + timeout: 3s + retries: 3 + start_period: 1s depends_on: postgres: condition: service_healthy @@ -151,11 +167,23 @@ services: TELEMETRY_TRACE: otel-collector:4317 TELEMETRY_METRICS: otel-collector:4317 TELEMETRY_LOGS: otel-collector:4317 + HEALTH_PORT: 3000 ports: - 8080:8080 + - 3003:3000 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:3000/readyz"] + interval: 3s + timeout: 3s + retries: 3 + start_period: 1s depends_on: postgres: condition: service_healthy + auth: + condition: service_healthy + profiles: + condition: service_healthy migrator: image: got-migrator:latest diff --git a/gateway/cmd/gateway/main.go b/gateway/cmd/gateway/main.go index 2580b37..bc6e3cb 100644 --- a/gateway/cmd/gateway/main.go +++ b/gateway/cmd/gateway/main.go @@ -10,6 +10,7 @@ import ( "github.com/alexwatcher/gateofthings/gateway/internal/app" "github.com/alexwatcher/gateofthings/gateway/internal/config" "github.com/alexwatcher/gateofthings/gateway/internal/consts" + "github.com/alexwatcher/gateofthings/shared/pkg/healthz" "github.com/alexwatcher/gateofthings/shared/pkg/telemetry" ) @@ -22,8 +23,16 @@ func main() { telemetry.MustInitTracer(context.Background(), res, cfg.Telemetry.TraceEndpoint) telemetry.MustInitMeter(context.Background(), res, cfg.Telemetry.MetricsEndpoint) - slog.Info("starting application") application := app.New(ctx, cfg.HTTP, cfg.Auth, cfg.Profiles, cfg.TokenSecret, cfg.OpenAPI) + + hc := healthz.New( + healthz.WithPort(cfg.HealthPort), + healthz.WithLiveProbe(func(ctx context.Context) error { return nil }), + healthz.WithReadyProbe(func(ctx context.Context) error { return application.Ready() }), + ) + go hc.MustRun(ctx) + + slog.Info("starting application") go application.MustRun(ctx) stop := make(chan os.Signal, 1) diff --git a/gateway/internal/app/app.go b/gateway/internal/app/app.go index 25fb30a..60367f2 100644 --- a/gateway/internal/app/app.go +++ b/gateway/internal/app/app.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "net/http" + "sync/atomic" grpcinterceptors "github.com/alexwatcher/gateofthings/gateway/internal/grpc/interceptors" grpcoptions "github.com/alexwatcher/gateofthings/gateway/internal/grpc/options" @@ -25,6 +26,7 @@ type App struct { tokenSecret string openAPI string server *http.Server + isRunning int32 } func New(ctx context.Context, httpConfig config.HTTPSrvConfig, authConfig config.GRPCClnConfig, profilesConfig config.GRPCClnConfig, tokenSecret string, openAPI string) *App { @@ -74,6 +76,8 @@ func (a *App) Run(ctx context.Context) error { return fmt.Errorf("app.run: register openapi endpoint: %w", err) } + atomic.StoreInt32(&a.isRunning, 1) + defer atomic.StoreInt32(&a.isRunning, 0) slog.Info("HTTP server started", "port", a.httpConfig.Port) a.server = &http.Server{Addr: fmt.Sprintf(":%d", a.httpConfig.Port), Handler: mux} if err := a.server.ListenAndServe(); err != nil && err != context.Canceled { @@ -88,3 +92,12 @@ func (a *App) Stop(ctx context.Context) { slog.Info("stopping HTTP server") a.server.Shutdown(ctx) } + +// Readiness probe +func (a *App) Ready() error { + if atomic.LoadInt32(&a.isRunning) == 0 { + slog.Warn("app: live probe failed: app is not running") + return fmt.Errorf("app: is not running") + } + return nil +} diff --git a/gateway/internal/config/config.go b/gateway/internal/config/config.go index bd54646..8cc43c1 100644 --- a/gateway/internal/config/config.go +++ b/gateway/internal/config/config.go @@ -13,6 +13,7 @@ type Config struct { HTTP scfg.HTTPSrvConfig `envPrefix:"HTTP_"` Auth scfg.GRPCClnConfig `envPrefix:"AUTH_"` Profiles scfg.GRPCClnConfig `envPrefix:"PROFILES_"` + HealthPort uint16 `env:"HEALTH_PORT,required"` } // MustLoad loads configuration from environment variables into a Config instance. diff --git a/profiles/cmd/profiles/main.go b/profiles/cmd/profiles/main.go index e00c905..3afe3ee 100644 --- a/profiles/cmd/profiles/main.go +++ b/profiles/cmd/profiles/main.go @@ -10,6 +10,7 @@ import ( "github.com/alexwatcher/gateofthings/profiles/internal/app" "github.com/alexwatcher/gateofthings/profiles/internal/config" "github.com/alexwatcher/gateofthings/profiles/internal/consts" + "github.com/alexwatcher/gateofthings/shared/pkg/healthz" sharedpgsql "github.com/alexwatcher/gateofthings/shared/pkg/migrator/postgresql" "github.com/alexwatcher/gateofthings/shared/pkg/telemetry" ) @@ -23,12 +24,20 @@ func main() { telemetry.MustInitTracer(context.Background(), res, cfg.Telemetry.TraceEndpoint) telemetry.MustInitMeter(context.Background(), res, cfg.Telemetry.MetricsEndpoint) + application := app.New(ctx, cfg.GRPC, cfg.Database) + + hc := healthz.New( + healthz.WithPort(cfg.HealthPort), + healthz.WithLiveProbe(func(ctx context.Context) error { return nil }), + healthz.WithReadyProbe(func(ctx context.Context) error { return application.Ready() }), + ) + go hc.MustRun(ctx) + slog.Info("start migration") sharedpgsql.Migrate(cfg.Database) slog.Info("end migration") slog.Info("starting application") - application := app.New(ctx, cfg.GRPC, cfg.Database) go application.MustRun(ctx) stop := make(chan os.Signal, 1) diff --git a/profiles/internal/app/app.go b/profiles/internal/app/app.go index 1e7bc74..669fece 100644 --- a/profiles/internal/app/app.go +++ b/profiles/internal/app/app.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "net" + "sync/atomic" grpcprofiles "github.com/alexwatcher/gateofthings/profiles/internal/grpc/profiles" "github.com/alexwatcher/gateofthings/profiles/internal/repository/postgresql" @@ -14,12 +15,15 @@ import ( "github.com/alexwatcher/gateofthings/shared/pkg/grpc/interceptors/tracing" "github.com/alexwatcher/gateofthings/shared/pkg/grpc/interceptors/valid" sharedpgsql "github.com/alexwatcher/gateofthings/shared/pkg/repository/postgresql" + "github.com/jackc/pgx/v5" "google.golang.org/grpc" ) type App struct { gRPCServer *grpc.Server gRPConfig config.GRPCSrvConfig + dbConn *pgx.Conn + isRunning int32 } // New initializes a new instance of the App struct with a gRPC server @@ -46,6 +50,7 @@ func New(ctx context.Context, gRPConfig config.GRPCSrvConfig, dbConfig config.Da return &App{ gRPCServer: gRPCServer, gRPConfig: gRPConfig, + dbConn: dbConn, } } @@ -63,6 +68,8 @@ func (a *App) Run(ctx context.Context) error { if err != nil { return err } + atomic.StoreInt32(&a.isRunning, 1) + defer atomic.StoreInt32(&a.isRunning, 0) slog.Info("gRPC server started", "port", a.gRPConfig.Port) if err := a.gRPCServer.Serve(lis); err != nil { return fmt.Errorf("app.run: %w", err) @@ -76,3 +83,17 @@ func (a *App) Stop(ctx context.Context) { slog.Info("stopping gRPC server") a.gRPCServer.GracefulStop() } + +// Readiness probe +func (a *App) Ready() error { + err := a.dbConn.Ping(context.Background()) + if err != nil { + slog.Warn("app: live probe failed", "error", err) + return err + } + if atomic.LoadInt32(&a.isRunning) == 0 { + slog.Warn("app: live probe failed: app is not running") + return fmt.Errorf("app: is not running") + } + return nil +} diff --git a/profiles/internal/config/config.go b/profiles/internal/config/config.go index b9e7b82..04578fd 100644 --- a/profiles/internal/config/config.go +++ b/profiles/internal/config/config.go @@ -6,10 +6,11 @@ import ( ) type Config struct { - Env string `env:"ENV" envDefault:"local"` - Telemetry scfg.TelemetryConfig `envPrefix:"TELEMETRY_"` - GRPC scfg.GRPCSrvConfig `envPrefix:"GRPC_"` - Database scfg.DatabaseConfig `envPrefix:"DB_"` + Env string `env:"ENV" envDefault:"local"` + Telemetry scfg.TelemetryConfig `envPrefix:"TELEMETRY_"` + GRPC scfg.GRPCSrvConfig `envPrefix:"GRPC_"` + Database scfg.DatabaseConfig `envPrefix:"DB_"` + HealthPort uint16 `env:"HEALTH_PORT,required"` } // MustLoad loads configuration from environment variables into a Config instance. diff --git a/shared/pkg/healthz/healthz.go b/shared/pkg/healthz/healthz.go new file mode 100644 index 0000000..235d7be --- /dev/null +++ b/shared/pkg/healthz/healthz.go @@ -0,0 +1,104 @@ +package healthz + +import ( + "context" + "fmt" + "net/http" +) + +// Probe defines a function that performs a health check and returns an error if it fails. +type Probe func(context.Context) error + +// Option configures a HealthCheck instance. +type Option interface { + apply(*HealthCheck) +} + +// HealthCheck represents a set of probes for Kubernetes health checks. +type HealthCheck struct { + port uint16 + ready Probe + live Probe +} + +// New creates a new HealthCheck with the provided options. +func New(options ...Option) *HealthCheck { + hc := &HealthCheck{ + port: 3000, + } + for _, o := range options { + o.apply(hc) + } + return hc +} + +// Run starts an HTTP server exposing health endpoints for Kubernetes probes. +func (c *HealthCheck) Run(ctx context.Context) error { + mux := http.NewServeMux() + c.register(mux) + + server := &http.Server{Addr: fmt.Sprintf(":%d", c.port), Handler: mux} + + // Gracefully shutdown when the context is canceled. + go func() { + <-ctx.Done() + _ = server.Shutdown(context.Background()) + }() + + return server.ListenAndServe() +} + +// MustRun starts an HTTP server exposing health endpoints. +// panic in case of error +func (c *HealthCheck) MustRun(ctx context.Context) { + if err := c.Run(ctx); err != nil { + panic(err) + } +} + +// RegisterHandlers registers all health endpoints on the given ServeMux. +// If nil is passed, handlers are registered on http.DefaultServeMux. +func (c *HealthCheck) register(mux *http.ServeMux) { + if mux == nil { + mux = http.DefaultServeMux + } + c.addHandler(mux, "/readyz", c.ready, "ready", "not ready") + c.addHandler(mux, "/livez", c.live, "live", "not live") +} + +// addHandler registers a single probe endpoint with appropriate HTTP status codes. +func (c *HealthCheck) addHandler(mux *http.ServeMux, path string, probe Probe, okMsg, failMsg string) { + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + if probe != nil && probe(r.Context()) != nil { + http.Error(w, failMsg, http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(okMsg)) + }) +} + +// withProbeOption is a generic option type for applying probes. +type withValueOption struct { + setter func(*HealthCheck, any) + value any +} + +func (o withValueOption) apply(hc *HealthCheck) { + o.setter(hc, o.value) +} + +// WithReadyProbe sets the readiness probe. +func WithReadyProbe(probe Probe) Option { + return withValueOption{setter: func(h *HealthCheck, p any) { h.ready = p.(Probe) }, value: probe} +} + +// WithLiveProbe sets the liveness probe. +func WithLiveProbe(probe Probe) Option { + return withValueOption{setter: func(h *HealthCheck, p any) { h.live = p.(Probe) }, value: probe} +} + +// WithPort sets port to run on http server. +func WithPort(port uint16) Option { + return withValueOption{setter: func(h *HealthCheck, p any) { h.port = p.(uint16) }, value: port} +}