diff --git a/cmd/thv/app/server.go b/cmd/thv/app/server.go index 75bb56da81..f01cbd10fd 100644 --- a/cmd/thv/app/server.go +++ b/cmd/thv/app/server.go @@ -192,8 +192,9 @@ func init() { serveCmd.Flags().IntVar(&port, "port", 8080, "Port to bind the server to") serveCmd.Flags().BoolVar(&enableDocs, "openapi", false, "Enable OpenAPI documentation endpoints (/api/openapi.json and /api/doc)") - serveCmd.Flags().StringVar(&socketPath, "socket", "", "UNIX socket path to bind the "+ - "server to (overrides host and port if provided)") + serveCmd.Flags().StringVar(&socketPath, "socket", "", + `UNIX socket path or, on Windows, a named pipe (\\.\pipe\) to bind the `+ + "server to (overrides host and port if provided)") // Add experimental MCP server flags serveCmd.Flags().BoolVar(&enableMCPServer, "experimental-mcp", false, diff --git a/docs/cli/thv_serve.md b/docs/cli/thv_serve.md index c455142639..e7a0c95d87 100644 --- a/docs/cli/thv_serve.md +++ b/docs/cli/thv_serve.md @@ -41,7 +41,7 @@ thv serve [flags] --sentry-dsn string Sentry DSN for error tracking and distributed tracing (falls back to SENTRY_DSN env var) --sentry-environment string Sentry environment name, e.g. production or development (falls back to SENTRY_ENVIRONMENT env var) --sentry-traces-sample-rate float Sentry traces sample rate (0.0-1.0) for performance monitoring (default 1) - --socket string UNIX socket path to bind the server to (overrides host and port if provided) + --socket string UNIX socket path or, on Windows, a named pipe (\\.\pipe\) to bind the server to (overrides host and port if provided) ``` ### Options inherited from parent commands diff --git a/pkg/api/server.go b/pkg/api/server.go index bc23dcd490..31d873ac07 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -366,41 +366,25 @@ func (b *ServerBuilder) setupDefaultRoutes(r *chi.Mux) { } } -func setupTCPListener(address string) (net.Listener, error) { - return net.Listen("tcp", address) +// namedPipePrefix is the Windows named-pipe namespace prefix. The canonical +// definition lives in pkg/server/discovery so the listener and dialer cannot +// drift; pkg/api re-aliases it here so per-platform socket files do not need +// to import discovery directly. +const namedPipePrefix = discovery.NamedPipePrefix + +// isNamedPipeAddress reports whether address is a Windows named-pipe path. +// The check is platform-agnostic so callers on non-Windows can fail fast with +// a clear error before reaching the listener code. The comparison is +// case-insensitive because the Windows pipe namespace is case-insensitive at +// the kernel layer; without EqualFold an address like \\.\Pipe\foo would +// silently fall through to AF_UNIX and then fail to bind. +func isNamedPipeAddress(address string) bool { + return len(address) >= len(namedPipePrefix) && + strings.EqualFold(address[:len(namedPipePrefix)], namedPipePrefix) } -func setupUnixSocket(address string) (net.Listener, error) { - // Remove the socket file if it already exists - if _, err := os.Stat(address); err == nil { - if err := os.Remove(address); err != nil { - return nil, fmt.Errorf("failed to remove existing socket: %w", err) - } - } - - // Create the directory for the socket file if it doesn't exist - if err := os.MkdirAll(filepath.Dir(address), 0750); err != nil { - return nil, fmt.Errorf("failed to create socket directory: %w", err) - } - - // Create UNIX socket listener - listener, err := net.Listen("unix", address) - if err != nil { - return nil, fmt.Errorf("failed to create UNIX socket listener: %w", err) - } - - // Set file permissions on the socket to allow other local processes to connect - if err := os.Chmod(address, socketPermissions); err != nil { - return nil, fmt.Errorf("failed to set socket permissions: %w", err) - } - - return listener, nil -} - -func cleanupUnixSocket(address string) { - if err := os.Remove(address); err != nil && !os.IsNotExist(err) { - slog.Warn("failed to remove socket file", "error", err) - } +func setupTCPListener(address string) (net.Listener, error) { + return net.Listen("tcp", address) } func headersMiddleware(next http.Handler) http.Handler { @@ -592,7 +576,7 @@ func NewServer(ctx context.Context, builder *ServerBuilder) (*Server, error) { // bound address from the listener (important when binding to port 0). func (s *Server) ListenURL() string { if s.isUnixSocket { - return fmt.Sprintf("unix://%s", s.address) + return socketURL(s.address) } return fmt.Sprintf("http://%s", s.listener.Addr().String()) } @@ -715,24 +699,30 @@ func (s *Server) cleanup() { } } -// createListener creates the appropriate listener based on the configuration +// createListener creates the appropriate listener based on the configuration. +// Named-pipe addresses are only supported on Windows; other platforms reject +// them up front rather than creating a literal-backslash file via AF_UNIX. func createListener(address string, isUnixSocket bool) (net.Listener, string, error) { - var listener net.Listener - var addrType string - var err error + if !isUnixSocket { + listener, err := setupTCPListener(address) + if err != nil { + return nil, "", err + } + return listener, "HTTP", nil + } - if isUnixSocket { - listener, err = setupUnixSocket(address) - addrType = "UNIX socket" - } else { - listener, err = setupTCPListener(address) - addrType = "HTTP" + addrType := "UNIX socket" + if isNamedPipeAddress(address) { + if !supportsNamedPipe() { + return nil, "", fmt.Errorf("named pipe addresses are only supported on Windows: %s", address) + } + addrType = "Windows named pipe" } + listener, err := setupUnixSocket(address) if err != nil { return nil, "", err } - return listener, addrType, nil } diff --git a/pkg/api/socket_unix.go b/pkg/api/socket_unix.go new file mode 100644 index 0000000000..3bd6403be4 --- /dev/null +++ b/pkg/api/socket_unix.go @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows + +package api + +import ( + "errors" + "fmt" + "io/fs" + "log/slog" + "net" + "os" + "path/filepath" +) + +// supportsNamedPipe reports whether the current build target can host a +// Windows named-pipe listener. Used by createListener to reject pipe addresses +// before reaching the per-platform setupUnixSocket implementation. +func supportsNamedPipe() bool { return false } + +// setupUnixSocket creates a UNIX domain socket listener at the given path. +// On non-Windows platforms named-pipe addresses are not supported; callers +// guard against that in createListener. +func setupUnixSocket(address string) (net.Listener, error) { + if err := os.Remove(address); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, fmt.Errorf("failed to remove existing socket: %w", err) + } + + if err := os.MkdirAll(filepath.Dir(address), 0750); err != nil { + return nil, fmt.Errorf("failed to create socket directory: %w", err) + } + + listener, err := net.Listen("unix", address) + if err != nil { + return nil, fmt.Errorf("failed to create UNIX socket listener: %w", err) + } + + if err := os.Chmod(address, socketPermissions); err != nil { + // Roll back the bound listener and the socket file rather than leaking + // either: the listener owns the AF_UNIX file and net.Listen will not + // rebind the same path while the file exists. + _ = listener.Close() + _ = os.Remove(address) + return nil, fmt.Errorf("failed to set socket permissions: %w", err) + } + + return listener, nil +} + +// cleanupUnixSocket removes the socket file at address. Missing files are not +// an error since cleanup may run after a partial startup. +func cleanupUnixSocket(address string) { + if err := os.Remove(address); err != nil && !errors.Is(err, fs.ErrNotExist) { + slog.Warn("failed to remove socket file", "error", err) + } +} + +// socketURL returns the URL form of a Unix-socket address for the discovery +// file. Non-Windows platforms only ever produce unix:// URLs. +func socketURL(address string) string { + return "unix://" + address +} diff --git a/pkg/api/socket_unix_test.go b/pkg/api/socket_unix_test.go new file mode 100644 index 0000000000..27ca7dc691 --- /dev/null +++ b/pkg/api/socket_unix_test.go @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows + +package api + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSocketURL_Unix(t *testing.T) { + t.Parallel() + assert.Equal(t, "unix:///tmp/test.sock", socketURL("/tmp/test.sock")) +} + +func TestIsNamedPipeAddress(t *testing.T) { + t.Parallel() + tests := []struct { + name string + address string + want bool + }{ + {"plain socket", "/tmp/thv.sock", false}, + {"named pipe", `\\.\pipe\thv-api`, true}, + {"named pipe mixed case", `\\.\Pipe\thv-api`, true}, + {"empty", "", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, isNamedPipeAddress(tt.address)) + }) + } +} + +// TestCreateListener_NamedPipe_Unsupported asserts that createListener rejects +// pipe addresses on non-Windows up front, mirroring the dialer-side guard +// covered by TestCheckHealth_NamedPipe_Unsupported_OnNonWindows. +func TestCreateListener_NamedPipe_Unsupported(t *testing.T) { + t.Parallel() + _, _, err := createListener(`\\.\pipe\thv-api`, true) + require.Error(t, err) + assert.Contains(t, err.Error(), "only supported on Windows") +} diff --git a/pkg/api/socket_windows.go b/pkg/api/socket_windows.go new file mode 100644 index 0000000000..494a82a550 --- /dev/null +++ b/pkg/api/socket_windows.go @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows + +package api + +import ( + "errors" + "fmt" + "io/fs" + "log/slog" + "net" + "os" + "path/filepath" + + "github.com/Microsoft/go-winio" +) + +// namedPipeBufferSize is the size of the input/output buffers winio allocates +// per pipe instance. 64 KiB matches what go-winio uses in similar consumers +// (Docker, containerd, Podman) and is well above any single HTTP header chunk. +const namedPipeBufferSize = 64 * 1024 + +// supportsNamedPipe reports whether the current build target can host a +// Windows named-pipe listener. Used by createListener to choose between the +// pipe and AF_UNIX paths without dragging the runtime package into server.go. +func supportsNamedPipe() bool { return true } + +// setupUnixSocket creates either a Windows named-pipe listener (when address +// has the \\.\pipe\ prefix) or an AF_UNIX listener at a filesystem path. +// +// Named pipes are kernel objects rather than files, so the os.Stat / os.Remove +// precheck, os.MkdirAll, and os.Chmod steps are skipped: the pipe namespace +// has no parent directory, and access control is governed by the security +// descriptor on the listener (winio's default restricts access to the +// creating user, which matches the toolhive-studio same-user use case). +// +// AF_UNIX is supported on Windows 10 1803+. The chmod step is dropped on this +// path because POSIX file modes do not apply on Windows. +func setupUnixSocket(address string) (net.Listener, error) { + if isNamedPipeAddress(address) { + // MessageMode is left at false (byte stream) explicitly because HTTP + // requires byte-oriented framing. + listener, err := winio.ListenPipe(address, &winio.PipeConfig{ + MessageMode: false, + InputBufferSize: namedPipeBufferSize, + OutputBufferSize: namedPipeBufferSize, + }) + if err != nil { + return nil, fmt.Errorf("failed to create named pipe listener: %w", err) + } + return listener, nil + } + + if err := os.Remove(address); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, fmt.Errorf("failed to remove existing socket: %w", err) + } + + if err := os.MkdirAll(filepath.Dir(address), 0750); err != nil { + return nil, fmt.Errorf("failed to create socket directory: %w", err) + } + + listener, err := net.Listen("unix", address) + if err != nil { + return nil, fmt.Errorf("failed to create UNIX socket listener: %w", err) + } + + return listener, nil +} + +// cleanupUnixSocket removes the AF_UNIX socket file at address, or no-ops for +// named pipes (the pipe is destroyed when the listener closes). +func cleanupUnixSocket(address string) { + if isNamedPipeAddress(address) { + return + } + if err := os.Remove(address); err != nil && !errors.Is(err, fs.ErrNotExist) { + slog.Warn("failed to remove socket file", "error", err) + } +} + +// socketURL returns the URL form of a Unix-socket or named-pipe address for +// the discovery file. Named pipes are emitted as npipe:// where +// is everything after the \\.\pipe\ prefix. +func socketURL(address string) string { + if isNamedPipeAddress(address) { + return "npipe://" + address[len(namedPipePrefix):] + } + return "unix://" + address +} diff --git a/pkg/api/socket_windows_test.go b/pkg/api/socket_windows_test.go new file mode 100644 index 0000000000..653ac7160d --- /dev/null +++ b/pkg/api/socket_windows_test.go @@ -0,0 +1,111 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows + +package api + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/Microsoft/go-winio" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// pipeNameSeq disambiguates concurrent test pipes so parallel runs don't +// collide on the global Windows pipe namespace. +var pipeNameSeq atomic.Uint64 + +func uniqueTestPipe() string { + return fmt.Sprintf(`\\.\pipe\thv-api-test-%d`, pipeNameSeq.Add(1)) +} + +func TestSocketURL_Windows(t *testing.T) { + t.Parallel() + tests := []struct { + name string + address string + want string + }{ + {"named pipe", `\\.\pipe\thv-api`, "npipe://thv-api"}, + {"af_unix windows path", `C:\path\thv.sock`, `unix://C:\path\thv.sock`}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, socketURL(tt.address)) + }) + } +} + +func TestSetupUnixSocket_NamedPipe(t *testing.T) { + t.Parallel() + pipePath := uniqueTestPipe() + + listener, err := setupUnixSocket(pipePath) + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + // The listener should accept a winio dial within a short timeout, proving + // it is wired to the named-pipe namespace and not to AF_UNIX. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + connCh := make(chan error, 1) + go func() { + conn, dialErr := winio.DialPipeContext(ctx, pipePath) + if conn != nil { + _ = conn.Close() + } + connCh <- dialErr + }() + + go func() { + conn, _ := listener.Accept() + if conn != nil { + _ = conn.Close() + } + }() + + select { + case err := <-connCh: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal("dial against named-pipe listener timed out") + } +} + +func TestCleanupUnixSocket_NamedPipe_NoOp(t *testing.T) { + t.Parallel() + // Passing a pipe address to cleanup must not error or panic. There is no + // file to remove; the assertion here is simply that the call returns + // cleanly. + cleanupUnixSocket(`\\.\pipe\thv-api-cleanup-noop`) +} + +// TestSetupUnixSocket_NamedPipe_FirstInstanceWins pins winio's first-wins +// semantics: ListenPipe sets FILE_FLAG_FIRST_PIPE_INSTANCE, so a second +// listener targeting the same name must fail. This is the safety net the +// discovery layer relies on to detect a stale-PID conflict; if a future winio +// bump silently relaxed it, two thv processes could bind the same pipe and +// quietly race for traffic. +func TestSetupUnixSocket_NamedPipe_FirstInstanceWins(t *testing.T) { + t.Parallel() + pipePath := uniqueTestPipe() + + first, err := setupUnixSocket(pipePath) + require.NoError(t, err) + t.Cleanup(func() { _ = first.Close() }) + + second, err := setupUnixSocket(pipePath) + require.Error(t, err, "second ListenPipe on the same name must fail") + if second != nil { + _ = second.Close() + } + assert.Contains(t, err.Error(), "failed to create named pipe listener") +} diff --git a/pkg/server/discovery/discovery_test.go b/pkg/server/discovery/discovery_test.go index 1ca39a0ada..fadf0ec46a 100644 --- a/pkg/server/discovery/discovery_test.go +++ b/pkg/server/discovery/discovery_test.go @@ -54,6 +54,31 @@ func TestWriteReadServerInfo_UnixSocket(t *testing.T) { assert.Equal(t, info.Nonce, got.Nonce) } +// TestWriteReadServerInfo_NamedPipe pins the producer/consumer contract for +// npipe:// discovery URLs end to end. The individual pieces (socketURL emit, +// HTTPClientForURL dispatch, ParseNamedPipeURL parse) are covered in their +// own tests; this test asserts that an npipe URL survives the discovery +// file's JSON serialization round-trip without being mangled. +func TestWriteReadServerInfo_NamedPipe(t *testing.T) { + t.Parallel() + dir := t.TempDir() + + info := &ServerInfo{ + URL: "npipe://thv-api", + PID: 99999, + Nonce: "test-nonce-pipe", + StartedAt: time.Date(2026, 5, 7, 14, 0, 0, 0, time.UTC), + } + + require.NoError(t, writeServerInfoTo(dir, info)) + + got, err := readServerInfoFrom(dir) + require.NoError(t, err) + assert.Equal(t, info.URL, got.URL) + assert.Equal(t, info.PID, got.PID) + assert.Equal(t, info.Nonce, got.Nonce) +} + func TestReadServerInfo_NotFound(t *testing.T) { t.Parallel() dir := t.TempDir() diff --git a/pkg/server/discovery/health.go b/pkg/server/discovery/health.go index cf80cb718d..6f7d24c185 100644 --- a/pkg/server/discovery/health.go +++ b/pkg/server/discovery/health.go @@ -23,8 +23,15 @@ const ( NonceHeader = "X-Toolhive-Nonce" ) +// NamedPipePrefix is the Windows named-pipe namespace prefix. The discovery +// dialer uses it to reconstruct addresses for winio.DialPipeContext, and the +// API listener (pkg/api) imports it as the canonical definition so the two +// sides cannot drift. +const NamedPipePrefix = `\\.\pipe\` + // CheckHealth verifies that a server at the given URL is healthy and optionally -// matches the expected nonce. It supports http:// and unix:// URL schemes. +// matches the expected nonce. It supports http://, unix://, and npipe:// URL +// schemes (npipe:// only resolves on Windows). func CheckHealth(ctx context.Context, serverURL string, expectedNonce string) error { client, requestURL, err := buildHealthClient(serverURL) if err != nil { @@ -76,9 +83,10 @@ func buildHealthClient(serverURL string) (*http.Client, string, error) { // HTTPClientForURL returns an HTTP client configured for the given server URL // and the base URL to use for requests. For unix:// URLs it creates a client // with a Unix socket transport and returns "http://localhost" as the base URL. -// For http:// URLs it validates the host is a loopback address and returns a -// default client. The returned client has no timeout set; callers should apply -// their own timeout via context or client.Timeout. +// For npipe:// URLs (Windows only) it dials the named pipe via the platform +// dialNamedPipe helper. For http:// URLs it validates the host is a loopback +// address and returns a default client. The returned client has no timeout +// set; callers should apply their own timeout via context or client.Timeout. func HTTPClientForURL(serverURL string) (*http.Client, string, error) { switch { case strings.HasPrefix(serverURL, "unix://"): @@ -95,6 +103,20 @@ func HTTPClientForURL(serverURL string) (*http.Client, string, error) { } return client, "http://localhost", nil + case strings.HasPrefix(serverURL, "npipe://"): + pipePath, err := ParseNamedPipeURL(serverURL) + if err != nil { + return nil, "", err + } + client := &http.Client{ + Transport: &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return dialNamedPipe(ctx, pipePath) + }, + }, + } + return client, "http://localhost", nil + case strings.HasPrefix(serverURL, "http://"): if err := ValidateLoopbackURL(serverURL); err != nil { return nil, "", err @@ -144,3 +166,25 @@ func ParseUnixSocketPath(rawURL string) (string, error) { return path, nil } + +// ParseNamedPipeURL extracts and validates the pipe name from an npipe:// URL +// and returns the full Windows pipe path (e.g. \\.\pipe\thv-api). The name +// portion must be a single segment with no path separators or traversal +// components, since the toolhive listener only ever publishes local pipes +// under the \\.\pipe\ namespace. +func ParseNamedPipeURL(rawURL string) (string, error) { + if !strings.HasPrefix(rawURL, "npipe://") { + return "", fmt.Errorf("named pipe URL must start with npipe://: %s", rawURL) + } + name := strings.TrimPrefix(rawURL, "npipe://") + if name == "" { + return "", fmt.Errorf("empty named pipe name") + } + if strings.ContainsAny(name, `/\`) { + return "", fmt.Errorf("named pipe name must not contain path separators: %s", name) + } + if strings.Contains(name, "..") { + return "", fmt.Errorf("named pipe name must not contain '..': %s", name) + } + return NamedPipePrefix + name, nil +} diff --git a/pkg/server/discovery/health_test.go b/pkg/server/discovery/health_test.go index 4a18169103..d45c64cb3a 100644 --- a/pkg/server/discovery/health_test.go +++ b/pkg/server/discovery/health_test.go @@ -10,6 +10,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "runtime" "testing" "github.com/stretchr/testify/assert" @@ -44,6 +45,49 @@ func TestParseUnixSocketPath_Empty(t *testing.T) { assert.Contains(t, err.Error(), "empty") } +func TestParseNamedPipeURL(t *testing.T) { + t.Parallel() + tests := []struct { + name string + raw string + expect string + wantErr bool + errSubstr string + }{ + {name: "valid", raw: "npipe://thv-api", expect: `\\.\pipe\thv-api`}, + {name: "valid with hyphen and digits", raw: "npipe://thv-api-1", expect: `\\.\pipe\thv-api-1`}, + {name: "missing scheme", raw: "thv-api", wantErr: true, errSubstr: "must start with npipe://"}, + {name: "wrong scheme", raw: "unix://thv-api", wantErr: true, errSubstr: "must start with npipe://"}, + {name: "empty name", raw: "npipe://", wantErr: true, errSubstr: "empty"}, + {name: "forward slash", raw: "npipe://thv/api", wantErr: true, errSubstr: "path separators"}, + {name: "backslash", raw: `npipe://thv\api`, wantErr: true, errSubstr: "path separators"}, + {name: "dot dot", raw: "npipe://..thv", wantErr: true, errSubstr: "'..'"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := ParseNamedPipeURL(tt.raw) + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errSubstr) + return + } + require.NoError(t, err) + assert.Equal(t, tt.expect, got) + }) + } +} + +func TestCheckHealth_NamedPipe_Unsupported_OnNonWindows(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("non-Windows guard test") + } + t.Parallel() + err := CheckHealth(context.Background(), "npipe://thv-api", "") + require.Error(t, err) + assert.Contains(t, err.Error(), "Windows") +} + func TestCheckHealth_TCP_Success(t *testing.T) { t.Parallel() expectedNonce := "test-nonce-123" diff --git a/pkg/server/discovery/health_windows_test.go b/pkg/server/discovery/health_windows_test.go new file mode 100644 index 0000000000..9a356efaf5 --- /dev/null +++ b/pkg/server/discovery/health_windows_test.go @@ -0,0 +1,97 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows + +package discovery + +import ( + "context" + "fmt" + "net/http" + "sync/atomic" + "testing" + "time" + + "github.com/Microsoft/go-winio" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// pipeNameSeq disambiguates concurrent test pipes so parallel runs don't +// collide on the global pipe namespace. +var pipeNameSeq atomic.Uint64 + +func TestCheckHealth_NamedPipe_Success(t *testing.T) { + t.Parallel() + + pipeName := fmt.Sprintf("thv-test-%d", pipeNameSeq.Add(1)) + pipePath := `\\.\pipe\` + pipeName + + listener, err := winio.ListenPipe(pipePath, &winio.PipeConfig{}) + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + expectedNonce := "pipe-nonce" + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set(NonceHeader, expectedNonce) + w.WriteHeader(http.StatusNoContent) + }) + srv := &http.Server{Handler: mux} //nolint:gosec // test server, ReadHeaderTimeout not relevant + go func() { _ = srv.Serve(listener) }() + t.Cleanup(func() { _ = srv.Close() }) + + err = CheckHealth(context.Background(), "npipe://"+pipeName, expectedNonce) + require.NoError(t, err) +} + +func TestCheckHealth_NamedPipe_NotFound(t *testing.T) { + t.Parallel() + err := CheckHealth(context.Background(), "npipe://nonexistent-pipe-thv-test", "") + require.Error(t, err) + assert.Contains(t, err.Error(), "health check failed") +} + +// TestCheckHealth_NamedPipe_HungServerCancelsOnContext pins that a peer which +// accepts the connection but never responds does not wedge CheckHealth: when +// the caller's context expires the dial / read returns and CheckHealth surfaces +// a wrapped error. This is the discovery StateUnhealthy path; without this +// guarantee a hung peer would block the previous-instance probe forever. +func TestCheckHealth_NamedPipe_HungServerCancelsOnContext(t *testing.T) { + t.Parallel() + + pipeName := fmt.Sprintf("thv-test-hung-%d", pipeNameSeq.Add(1)) + pipePath := `\\.\pipe\` + pipeName + + listener, err := winio.ListenPipe(pipePath, &winio.PipeConfig{}) + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + // Drain accepts so the dial succeeds, but never write anything back. The + // goroutine exits when the listener is closed via t.Cleanup. + go func() { + for { + conn, acceptErr := listener.Accept() + if acceptErr != nil { + return + } + // Hold the connection open without responding so CheckHealth's + // HTTP read blocks until the context deadline fires. + t.Cleanup(func() { _ = conn.Close() }) + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + + start := time.Now() + err = CheckHealth(ctx, "npipe://"+pipeName, "") + elapsed := time.Since(start) + + require.Error(t, err) + // The CheckHealth call must return promptly after the context expires + // rather than blocking on the hung peer indefinitely. healthTimeout is + // 5 s, so anything within ~2 s of the 250 ms ctx is the context path. + assert.Less(t, elapsed, 2*time.Second, "CheckHealth wedged on a hung peer") +} diff --git a/pkg/server/discovery/pipe_other.go b/pkg/server/discovery/pipe_other.go new file mode 100644 index 0000000000..b16b046cac --- /dev/null +++ b/pkg/server/discovery/pipe_other.go @@ -0,0 +1,20 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows + +package discovery + +import ( + "context" + "fmt" + "net" +) + +// dialNamedPipe is a no-op stub on non-Windows platforms. The npipe:// scheme +// is reachable here only via a misconfigured discovery file or a hand-crafted +// URL; surface a clear error rather than fail with a confusing dial syscall +// result. +func dialNamedPipe(_ context.Context, name string) (net.Conn, error) { + return nil, fmt.Errorf("named pipes are only supported on Windows: %s", name) +} diff --git a/pkg/server/discovery/pipe_windows.go b/pkg/server/discovery/pipe_windows.go new file mode 100644 index 0000000000..bf08e8f6c6 --- /dev/null +++ b/pkg/server/discovery/pipe_windows.go @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows + +package discovery + +import ( + "context" + "net" + + "github.com/Microsoft/go-winio" +) + +// dialNamedPipe opens a connection to the Windows named pipe at name. The +// caller is expected to have validated name via ParseNamedPipeURL. +func dialNamedPipe(ctx context.Context, name string) (net.Conn, error) { + return winio.DialPipeContext(ctx, name) +}