Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions test/e2e/thv-operator/virtualmcp/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,22 @@ func checkHTTPHealthReady(nodePort int32, timeout time.Duration) error {
return nil
}

// WaitForHealthy waits for the VirtualMCPServer to become healthy by repeatedly calling
// checkHTTPHealthReady against nodePort until it succeeds or timeout elapses, pausing
// pollingInterval between attempts. Each individual check is given 5 seconds.
// This is more robust than time.Sleep as it ensures the server is actually responding.
//
// It returns nil once a check succeeds. If no check succeeds within timeout, it returns
// an error wrapping the error of the last attempt.
func WaitForHealthy(nodePort int32, timeout time.Duration, pollingInterval time.Duration) error {
	var lastErr error

	// Poll through a private Gomega instance whose fail handler does nothing. With the
	// package-level gomega.Eventually the global fail handler is used; when that is
	// Ginkgo's Fail (the usual setup in a Ginkgo suite) a timeout panics inside Should,
	// so the wrapped error below could never be returned to the caller.
	poller := gomega.NewGomega(func(string, ...int) {})

	healthy := poller.Eventually(func() error {
		lastErr = checkHTTPHealthReady(nodePort, 5*time.Second)
		return lastErr
	}, timeout, pollingInterval).Should(gomega.Succeed())

	if !healthy {
		return fmt.Errorf("health check failed after %v: %w", timeout, lastErr)
	}
	return nil
}

// TestToolListingAndCall is a shared helper that creates an MCP client, lists tools,
// finds a tool matching the pattern, calls it, and verifies the response.
// This eliminates the duplicate "create client → list → call" pattern found in most tests.
Expand Down Expand Up @@ -984,3 +1000,129 @@ func (gingkoHttpLogger) Infof(format string, v ...any) {
func (gingkoHttpLogger) Errorf(format string, v ...any) {
ginkgo.GinkgoLogr.Error(errors.New("http error"), "ERROR: "+format, v...)
}

// CreateInitializedMCPClientWithRetry builds, starts and initializes an MCP client against
// http://localhost:<nodePort>/mcp, retrying until an attempt succeeds or timeout elapses.
//
// Every attempt constructs a fresh client. This is more robust than reusing a client across
// retries because the MCP client may be in an inconsistent state after a failed Start() or
// Initialize(). A failed attempt cancels its context and closes its client before returning.
//
// Parameters:
//   - nodePort: local port the server is reachable on.
//   - clientName: name sent in the initialize request's ClientInfo.
//   - timeout, pollingInterval: overall retry budget and the pause between attempts.
//   - options: transport options forwarded to mcpclient.NewStreamableHttpClient.
//
// On success it returns the initialized client, which must be closed by the caller. The
// client's Ctx is the context of the successful attempt and carries a fixed 30-second
// timeout measured from the start of that attempt. If no attempt succeeds within timeout,
// it returns a nil client and an error wrapping the last attempt's error.
func CreateInitializedMCPClientWithRetry(
	nodePort int32,
	clientName string,
	timeout time.Duration,
	pollingInterval time.Duration,
	options ...transport.StreamableHTTPCOption,
) (*InitializedMCPClient, error) {
	serverURL := fmt.Sprintf("http://localhost:%d/mcp", nodePort)
	var result *InitializedMCPClient
	var lastErr error

	// Poll through a private Gomega instance whose fail handler does nothing. With the
	// package-level gomega.Eventually the global fail handler is used; when that is
	// Ginkgo's Fail (the usual setup in a Ginkgo suite) a timeout panics inside Should,
	// so the error return below could never be reached.
	poller := gomega.NewGomega(func(string, ...int) {})

	success := poller.Eventually(func() error {
		// Create a fresh client on each attempt - this is crucial because
		// the MCP client cannot be reliably restarted after a failed attempt.
		mcpClient, err := mcpclient.NewStreamableHttpClient(serverURL, options...)
		if err != nil {
			lastErr = fmt.Errorf("failed to create MCP client: %w", err)
			return lastErr
		}

		// cancel is deliberately not deferred: on success the context is handed to the
		// caller inside the returned InitializedMCPClient.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

		// abort releases this attempt's resources and records why it failed.
		abort := func(attemptErr error) error {
			cancel()
			_ = mcpClient.Close() // best-effort cleanup; the attempt's own error is what gets reported
			lastErr = attemptErr
			return lastErr
		}

		if err := mcpClient.Start(ctx); err != nil {
			return abort(fmt.Errorf("failed to start MCP client: %w", err))
		}

		initRequest := mcp.InitializeRequest{}
		initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
		initRequest.Params.Capabilities = mcp.ClientCapabilities{}
		initRequest.Params.ClientInfo = mcp.Implementation{
			Name:    clientName,
			Version: "1.0.0",
		}

		if _, err := mcpClient.Initialize(ctx, initRequest); err != nil {
			return abort(fmt.Errorf("failed to initialize MCP client: %w", err))
		}

		// Success - store the result
		result = &InitializedMCPClient{
			Client: mcpClient,
			Ctx:    ctx,
			Cancel: cancel,
		}
		return nil
	}, timeout, pollingInterval).Should(gomega.Succeed())

	if !success {
		return nil, fmt.Errorf("failed to create initialized MCP client after retries: %w", lastErr)
	}

	return result, nil
}

// CreateAuthenticatedMCPClientWithRetry creates an authenticated MCP client with retry logic.
// It is a thin wrapper: nodePort, clientName, timeout and pollingInterval are forwarded
// unchanged to CreateInitializedMCPClientWithRetry, and transport.WithHTTPBasicClient(httpClient)
// is supplied as the only transport option. The result of that call is returned as-is.
// The httpClient should have the appropriate authentication configured (e.g., Bearer token).
func CreateAuthenticatedMCPClientWithRetry(
nodePort int32,
clientName string,
httpClient *http.Client,
timeout time.Duration,
pollingInterval time.Duration,
) (*InitializedMCPClient, error) {
return CreateInitializedMCPClientWithRetry(
nodePort,
clientName,
timeout,
pollingInterval,
transport.WithHTTPBasicClient(httpClient),
Comment on lines +1070 to +1082
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, feel free to ignore: I would delete this and have the caller create the option transport.WithHTTPBasicClient(httpClient),.

)
}

// WaitForToolsDiscovered waits for the VirtualMCPServer to discover at least minToolCount tools.
// This is useful to ensure tools are available before running tool-related tests.
//
// Each poll creates a fresh client via CreateInitializedMCPClient (with a 15-second
// argument), lists the tools, logs the count to the Ginkgo writer and closes the client.
// Polling repeats every pollingInterval until the count reaches minToolCount or timeout
// elapses.
//
// It returns nil once enough tools are listed. On timeout it returns an error: if the most
// recent poll failed to create a client or list tools, that failure is wrapped; otherwise
// the error states how many tools were last seen versus how many were expected.
func WaitForToolsDiscovered(
	nodePort int32,
	minToolCount int,
	timeout time.Duration,
	pollingInterval time.Duration,
) error {
	var lastErr error
	var lastCount int

	// Poll through a private Gomega instance whose fail handler does nothing. With the
	// package-level gomega.Eventually the global fail handler is used; when that is
	// Ginkgo's Fail (the usual setup in a Ginkgo suite) a timeout panics inside Should,
	// so neither error return below could ever be reached.
	poller := gomega.NewGomega(func(string, ...int) {})

	success := poller.Eventually(func() (int, error) {
		// Create a fresh client for each check
		mcpClient, err := CreateInitializedMCPClient(nodePort, "tool-discovery-check", 15*time.Second)
		if err != nil {
			lastErr = fmt.Errorf("failed to create MCP client: %w", err)
			return 0, lastErr
		}
		// The closure runs once per poll, so this releases the client after every check.
		defer mcpClient.Close()

		listRequest := mcp.ListToolsRequest{}
		tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest)
		if err != nil {
			lastErr = fmt.Errorf("failed to list tools: %w", err)
			return 0, lastErr
		}

		// This poll reached the server successfully: drop any error left over from an
		// earlier attempt so a final "too few tools" outcome is not reported as a stale
		// connection failure.
		lastErr = nil
		lastCount = len(tools.Tools)
		ginkgo.GinkgoWriter.Printf("Tool discovery check: found %d tools (waiting for >= %d)\n",
			lastCount, minToolCount)

		return lastCount, nil
	}, timeout, pollingInterval).Should(gomega.BeNumerically(">=", minToolCount),
		fmt.Sprintf("Should discover at least %d tools", minToolCount))

	if !success {
		if lastErr != nil {
			return fmt.Errorf("failed to discover tools: %w", lastErr)
		}
		return fmt.Errorf("only discovered %d tools, expected at least %d", lastCount, minToolCount)
	}
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = Describe("VirtualMCPServer Aggregation Filtering", Ordered, func() {
backend1Name = "yardstick-filter-a"
backend2Name = "yardstick-filter-b"
timeout = 3 * time.Minute
pollingInterval = 1 * time.Second
pollingInterval = 3 * time.Second // Increased from 1s to reduce K8s API pressure
vmcpNodePort int32
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var _ = Describe("VirtualMCPServer Tool Overrides", Ordered, func() {
vmcpServerName = "test-vmcp-overrides"
backendName = "yardstick-override"
timeout = 3 * time.Minute
pollingInterval = 1 * time.Second
pollingInterval = 3 * time.Second // Increased from 1s to reduce K8s API pressure
vmcpNodePort int32

// The original and renamed tool names
Expand Down
103 changes: 26 additions & 77 deletions test/e2e/thv-operator/virtualmcp/virtualmcp_auth_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"time"

mcpclient "github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/client/transport"
"github.com/mark3labs/mcp-go/mcp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -1147,7 +1145,7 @@ with socketserver.TCPServer(("", PORT), OIDCHandler) as httpd:
}
}

It("should list and call tools from all backends with discovered auth", func() {
It("should list and call tools from all backends with discovered auth", FlakeAttempts(2), func() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would strongly prefer not introducing FlakeAttempts because it will prolong CI and normalize handling flakiness with more retries. If this pattern proliferates, we'll have a lot of low value, high cost tests.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, good spot, I forgot to take these out when the other changes were added 😅

By("Verifying HTTP connectivity to VirtualMCPServer health endpoint")
Eventually(func() error {
url := fmt.Sprintf("http://localhost:%d/health", vmcpNodePort)
Expand All @@ -1165,44 +1163,24 @@ with socketserver.TCPServer(("", PORT), OIDCHandler) as httpd:
By("Getting OIDC token from mock OIDC server")
oidcToken := getOIDCToken()

By("Creating authenticated MCP client for VirtualMCPServer")
serverURL := fmt.Sprintf("http://localhost:%d/mcp", vmcpNodePort)
By("Creating authenticated MCP client with retry (fresh client on each attempt)")
// Use the helper that creates a fresh client on each retry attempt.
// This is crucial because the MCP client cannot be reliably restarted
// after a failed Start() or Initialize() call.
authenticatedHTTPClient := createAuthenticatedHTTPClient(oidcToken)
mcpClient, err := mcpclient.NewStreamableHttpClient(serverURL, transport.WithHTTPBasicClient(authenticatedHTTPClient))
Expect(err).ToNot(HaveOccurred())
mcpClient, err := CreateAuthenticatedMCPClientWithRetry(
vmcpNodePort,
"toolhive-auth-discovery-test",
authenticatedHTTPClient,
2*time.Minute,
5*time.Second,
)
Expect(err).ToNot(HaveOccurred(), "Should create authenticated MCP client")
defer mcpClient.Close()

By("Starting transport and initializing connection with retries")
// Retry MCP initialization to handle timing issues where the VirtualMCPServer's
// auth middleware (OIDC validation and auth discovery) may not be fully ready
testCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

Eventually(func() error {
initCtx, initCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer initCancel()

if err := mcpClient.Start(initCtx); err != nil {
return fmt.Errorf("failed to start transport: %w", err)
}

initRequest := mcp.InitializeRequest{}
initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
initRequest.Params.ClientInfo = mcp.Implementation{
Name: "toolhive-auth-discovery-test",
Version: "1.0.0",
}
_, err := mcpClient.Initialize(initCtx, initRequest)
if err != nil {
return fmt.Errorf("failed to initialize: %w", err)
}

return nil
}, 2*time.Minute, 5*time.Second).Should(Succeed(), "MCP client should initialize successfully")

By("Listing tools from VirtualMCPServer")
listRequest := mcp.ListToolsRequest{}
tools, err := mcpClient.ListTools(testCtx, listRequest)
tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest)
Expect(err).ToNot(HaveOccurred())
Expect(tools.Tools).ToNot(BeEmpty())
Expect(len(tools.Tools)).To(BeNumerically(">=", 2), "Should aggregate tools from multiple backends")
Expand All @@ -1229,63 +1207,34 @@ with socketserver.TCPServer(("", PORT), OIDCHandler) as httpd:
callRequest.Params.Name = targetToolName
callRequest.Params.Arguments = map[string]any{"url": mockServer.URL}

result, err := mcpClient.CallTool(toolCallCtx, callRequest)
result, err := mcpClient.Client.CallTool(toolCallCtx, callRequest)
Expect(err).ToNot(HaveOccurred(), "Tool call should succeed: %s", targetToolName)
Expect(result).ToNot(BeNil())
GinkgoWriter.Printf("✓ Successfully called tool: %s\n", targetToolName)
}
})

It("should send auth tokens to configured auth servers", func() {
It("should send auth tokens to configured auth servers", FlakeAttempts(2), func() {
By("Calling tools to trigger token exchange")

By("Getting OIDC token for test client authentication")
token := getOIDCToken()

// Create authenticated MCP client and call tools to generate traffic
By("Creating authenticated MCP client for VirtualMCPServer")
serverURL := fmt.Sprintf("http://localhost:%d/mcp", vmcpNodePort)
By("Creating authenticated MCP client with retry (fresh client on each attempt)")
httpClient := createAuthenticatedHTTPClient(token)
mcpClient, err := mcpclient.NewStreamableHttpClient(
serverURL,
transport.WithHTTPBasicClient(httpClient),
mcpClient, err := CreateAuthenticatedMCPClientWithRetry(
vmcpNodePort,
"toolhive-auth-test",
httpClient,
2*time.Minute,
5*time.Second,
)
Expect(err).ToNot(HaveOccurred())
Expect(err).ToNot(HaveOccurred(), "Should create authenticated MCP client")
defer mcpClient.Close()

By("Starting transport and initializing connection with retries")
// Retry MCP initialization to handle timing issues where the VirtualMCPServer's
// auth middleware (OIDC validation and auth discovery) may not be fully ready
testCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

Eventually(func() error {
initCtx, initCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer initCancel()

err := mcpClient.Start(initCtx)
if err != nil {
return fmt.Errorf("failed to start transport: %w", err)
}

initRequest := mcp.InitializeRequest{}
initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
initRequest.Params.ClientInfo = mcp.Implementation{
Name: "toolhive-auth-test",
Version: "1.0.0",
}

_, err = mcpClient.Initialize(initCtx, initRequest)
if err != nil {
return fmt.Errorf("failed to initialize: %w", err)
}

return nil
}, 2*time.Minute, 5*time.Second).Should(Succeed(), "MCP client should initialize successfully")

By("Listing and calling tools from backend with token exchange")
listRequest := mcp.ListToolsRequest{}
tools, err := mcpClient.ListTools(testCtx, listRequest)
tools, err := mcpClient.Client.ListTools(mcpClient.Ctx, listRequest)
Expect(err).ToNot(HaveOccurred())
Expect(tools.Tools).ToNot(BeEmpty())

Expand All @@ -1310,7 +1259,7 @@ with socketserver.TCPServer(("", PORT), OIDCHandler) as httpd:
"url": mockServer.URL,
}

_, err := mcpClient.CallTool(toolCallCtx, callRequest)
_, err := mcpClient.Client.CallTool(toolCallCtx, callRequest)
if err == nil {
GinkgoWriter.Printf("✓ Successfully called tool: %s\n", tool.Name)
calledTokenExchangeTool = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var _ = Describe("VirtualMCPServer Composite Parallel Workflow", Ordered, func()
backend1Name = "yardstick-par-a"
backend2Name = "yardstick-par-b"
timeout = 3 * time.Minute
pollingInterval = 1 * time.Second
pollingInterval = 3 * time.Second // Increased from 1s to reduce K8s API pressure
vmcpNodePort int32

// Composite tool name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = Describe("VirtualMCPServer Composite Sequential Workflow", Ordered, func
vmcpServerName = "test-vmcp-composite-seq"
backendName = "yardstick-composite-seq"
timeout = 3 * time.Minute
pollingInterval = 1 * time.Second
pollingInterval = 3 * time.Second // Increased from 1s to reduce K8s API pressure
vmcpNodePort int32

// Composite tool names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = Describe("VirtualMCPServer Conflict Resolution", Ordered, func() {
var (
testNamespace = "default"
timeout = 3 * time.Minute
pollingInterval = 1 * time.Second
pollingInterval = 3 * time.Second // Increased from 1s to reduce K8s API pressure
)

Describe("Prefix Strategy", Ordered, func() {
Expand Down
Loading
Loading