diff --git a/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go b/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go
index 32e83ebde0..6b72e08b7e 100644
--- a/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go
+++ b/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go
@@ -712,7 +712,7 @@ type RedisStorageConfig struct {
// +optional
WriteTimeout string `json:"writeTimeout,omitempty"`
- // TLS configures TLS for connections to the Redis/Valkey master.
+ // TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
// Presence of this field enables TLS. Omit to use plaintext.
// +optional
TLS *RedisTLSConfig `json:"tls,omitempty"`
diff --git a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
index faa443412e..3142691a4a 100644
--- a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
+++ b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
@@ -414,7 +414,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
@@ -1584,7 +1584,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
diff --git a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml
index d752a5a546..c1c1bfb768 100644
--- a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml
+++ b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml
@@ -287,7 +287,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
@@ -2905,7 +2905,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
diff --git a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
index a8197b955f..7c3ec3c172 100644
--- a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
+++ b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml
@@ -417,7 +417,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
@@ -1587,7 +1587,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
diff --git a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml
index 1aeaa8fdfa..303eab298b 100644
--- a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml
+++ b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml
@@ -290,7 +290,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
@@ -2908,7 +2908,7 @@ spec:
type: object
tls:
description: |-
- TLS configures TLS for connections to the Redis/Valkey master.
+ TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md
index 010b399bde..81feec7554 100644
--- a/docs/operator/crd-api.md
+++ b/docs/operator/crd-api.md
@@ -2877,7 +2877,7 @@ _Appears in:_
| `dialTimeout` _string_ | DialTimeout is the timeout for establishing connections.
Format: Go duration string (e.g., "5s", "1m"). | 5s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
|
| `readTimeout` _string_ | ReadTimeout is the timeout for socket reads.
Format: Go duration string (e.g., "3s", "1m"). | 3s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
|
| `writeTimeout` _string_ | WriteTimeout is the timeout for socket writes.
Format: Go duration string (e.g., "3s", "1m"). | 3s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
|
-| `tls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | TLS configures TLS for connections to the Redis/Valkey master.
Presence of this field enables TLS. Omit to use plaintext. | | Optional: \{\}
|
+| `tls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext. | | Optional: \{\}
|
| `sentinelTls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | SentinelTLS configures TLS for connections to Sentinel instances.
Only applies when sentinelConfig is set. Presence of this field enables TLS. | | Optional: \{\}
|
diff --git a/pkg/authserver/storage/redis.go b/pkg/authserver/storage/redis.go
index 6b3fe4f19a..fa682cb392 100644
--- a/pkg/authserver/storage/redis.go
+++ b/pkg/authserver/storage/redis.go
@@ -14,6 +14,7 @@ import (
"net"
"net/url"
"slices"
+ "strings"
"time"
"github.com/ory/fosite"
@@ -50,6 +51,35 @@ func warnOnCleanupErr(err error, operation, key string) {
}
}
+// filterIndexMembersByPrefix splits SMEMBERS results into members that share
+// this storage instance's key prefix and those that do not. Multi-key
+// operations like MGet and Del fail with CROSSSLOT under Redis Cluster when
+// any member hashes to a different slot, and a stray un-prefixed entry
+// (legacy data, an external admin op, a test fixture) is the most likely
+// source of such drift. Filtering at read time turns a hard cluster failure
+// into a logged warning, while remaining a no-op on standalone Redis.
+func filterIndexMembersByPrefix(prefix string, members []string) (kept, dropped []string) {
+ for _, m := range members {
+ if strings.HasPrefix(m, prefix) {
+ kept = append(kept, m)
+ } else {
+ dropped = append(dropped, m)
+ }
+ }
+ return kept, dropped
+}
+
+// warnDroppedIndexMembers emits a warning for each foreign member returned by
+// filterIndexMembersByPrefix so operators can identify the source of cluster-
+// incompatible entries.
+func warnDroppedIndexMembers(operation, indexKey, expectedPrefix string, dropped []string) {
+ for _, m := range dropped {
+ slog.Warn("dropping foreign index member to prevent CROSSSLOT",
+ "operation", operation, "indexKey", indexKey,
+ "expectedPrefix", expectedPrefix, "member", m)
+ }
+}
+
// RedisConfig holds Redis connection configuration for runtime use.
type RedisConfig struct {
// Addr is the Redis server address (host:port). Required for standalone and cluster modes.
@@ -902,6 +932,17 @@ func (s *storedUpstreamTokens) toUpstreamTokens() *UpstreamTokens {
// ARGV[2] = TTL in milliseconds
// ARGV[3] = new UserID ("" if no user)
// ARGV[4] = user upstream set key prefix (e.g. "thv:auth:{ns:name}:user:upstream:")
+//
+// Cluster slot invariant: this script reads oldUserID from KEYS[1] inside the
+// script body (atomic with the rest of the work), so the user-set keys are
+// constructed dynamically as `ARGV[4] .. userID` rather than being passed as
+// declared KEYS. Every dynamically-built key MUST therefore inherit the
+// `{ns:name}` hash tag that is baked into ARGV[4] (s.keyPrefix). All callers
+// derive ARGV[4] from s.keyPrefix, which DeriveKeyPrefix builds with the
+// `{ns:name}` hash tag, so user-set keys land on the same Redis Cluster slot
+// as KEYS[1] and KEYS[2]. A future refactor that strips the hash tag — or
+// rebuilds setPrefix from raw inputs without the tag — will silently pass on
+// standalone Redis and fail with CROSSSLOT under Cluster.
var storeUpstreamTokensScript = redis.NewScript(`
local oldUserID = ""
local existing = redis.call('GET', KEYS[1])
@@ -1109,6 +1150,9 @@ func (s *RedisStorage) GetAllUpstreamTokens(ctx context.Context, sessionID strin
return nil, fmt.Errorf("failed to get upstream token index: %w", err)
}
+ providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys)
+ warnDroppedIndexMembers("GetAllUpstreamTokens", idxKey, s.keyPrefix, dropped)
+
if len(providerKeys) == 0 {
return result, nil
}
@@ -1171,6 +1215,9 @@ func (s *RedisStorage) DeleteUpstreamTokens(ctx context.Context, sessionID strin
return fmt.Errorf("failed to get upstream token index: %w", err)
}
+ providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys)
+ warnDroppedIndexMembers("DeleteUpstreamTokens", idxKey, s.keyPrefix, dropped)
+
if len(providerKeys) == 0 {
return fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found"))
}
@@ -1227,6 +1274,10 @@ func (s *RedisStorage) GetLatestUpstreamTokensForUser(ctx context.Context, userI
if err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("failed to get user upstream index: %w", err)
}
+
+ members, dropped := filterIndexMembersByPrefix(s.keyPrefix, members)
+ warnDroppedIndexMembers("GetLatestUpstreamTokensForUser", setKey, s.keyPrefix, dropped)
+
if len(members) == 0 {
return nil, fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found"))
}
diff --git a/pkg/authserver/storage/redis_test.go b/pkg/authserver/storage/redis_test.go
index bb7a75a6bf..921ccf9818 100644
--- a/pkg/authserver/storage/redis_test.go
+++ b/pkg/authserver/storage/redis_test.go
@@ -8,9 +8,12 @@
package storage
import (
+ "bytes"
"context"
"fmt"
+ "log/slog"
"net/url"
+ "strings"
"sync"
"testing"
"time"
@@ -2181,3 +2184,144 @@ func TestRedisStorage_GetLatestUpstreamTokensForUser(t *testing.T) {
})
})
}
+
+// captureWarnLogs swaps in a buffered slog handler at warn level for the
+// duration of the test and returns the captured output. Process-global, not
+// safe for t.Parallel().
+func captureWarnLogs(t *testing.T) *bytes.Buffer {
+ t.Helper()
+ var buf bytes.Buffer
+ handler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn})
+ orig := slog.Default()
+ slog.SetDefault(slog.New(handler))
+ t.Cleanup(func() { slog.SetDefault(orig) })
+ return &buf
+}
+
+// assertForeignDropWarn checks that the warn log emitted for a filtered
+// CROSSSLOT-defending op names the operation, the dropped member, and the
+// expected key prefix.
+func assertForeignDropWarn(t *testing.T, out, operation, member, prefix string) {
+ t.Helper()
+ assert.Contains(t, out, "dropping foreign index member to prevent CROSSSLOT",
+ "expected warn log emitted by %s; got: %s", operation, out)
+ assert.Contains(t, out, member, "warn log should name the dropped member")
+ assert.Contains(t, out, operation, "warn log should name the operation")
+ assert.Contains(t, out, strings.TrimSuffix(prefix, ":"),
+ "warn log should reference the expected prefix")
+}
+
+// TestForeignMembersFilteredFromIndexOps verifies that the three multi-key
+// SMEMBERS-fed operations (GetAllUpstreamTokens, DeleteUpstreamTokens,
+// GetLatestUpstreamTokensForUser) drop foreign members before MGet/Del so a
+// stray un-prefixed entry in an index set cannot escalate into a CROSSSLOT
+// failure on Redis Cluster, and emit a warn log naming the dropped member.
+//
+// This test uses slog.SetDefault (process-global) and therefore does not run
+// in parallel.
+func TestForeignMembersFilteredFromIndexOps(t *testing.T) { //nolint:paralleltest // captures slog default
+ storage, mr := newTestRedisStorage(t)
+ t.Cleanup(func() {
+ _ = storage.Close()
+ mr.Close()
+ })
+ ctx := context.Background()
+
+ tokens := &UpstreamTokens{
+ ProviderID: "github",
+ AccessToken: "real-access",
+ RefreshToken: "real-refresh",
+ UserID: "user-A",
+ ExpiresAt: time.Now().Add(time.Hour),
+ }
+ require.NoError(t, storage.StoreUpstreamTokens(ctx, "session-X", "github", tokens))
+
+ sessionIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUpstreamIdx, "session-X")
+ userIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUserUpstream, "user-A")
+ const foreignMember = "other-tenant:auth:{ns:other}:upstream:s:p"
+ mr.SAdd(sessionIdxKey, foreignMember)
+ mr.SAdd(userIdxKey, foreignMember)
+
+ t.Run("GetAllUpstreamTokens", func(t *testing.T) {
+ buf := captureWarnLogs(t)
+ got, err := storage.GetAllUpstreamTokens(ctx, "session-X")
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.Contains(t, got, "github")
+ assert.Equal(t, "real-access", got["github"].AccessToken)
+ assertForeignDropWarn(t, buf.String(), "GetAllUpstreamTokens", foreignMember, storage.keyPrefix)
+ })
+
+ t.Run("GetLatestUpstreamTokensForUser", func(t *testing.T) {
+ buf := captureWarnLogs(t)
+ got, err := storage.GetLatestUpstreamTokensForUser(ctx, "user-A", "github")
+ require.NoError(t, err)
+ require.NotNil(t, got)
+ assert.Equal(t, "real-refresh", got.RefreshToken)
+ assertForeignDropWarn(t, buf.String(), "GetLatestUpstreamTokensForUser", foreignMember, storage.keyPrefix)
+ })
+
+ // DeleteUpstreamTokens runs last because it removes the index set.
+ t.Run("DeleteUpstreamTokens", func(t *testing.T) {
+ buf := captureWarnLogs(t)
+ require.NoError(t, storage.DeleteUpstreamTokens(ctx, "session-X"))
+ assertForeignDropWarn(t, buf.String(), "DeleteUpstreamTokens", foreignMember, storage.keyPrefix)
+ })
+}
+
+func TestFilterIndexMembersByPrefix(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ prefix string
+ members []string
+ wantKept []string
+ wantDropped []string
+ }{
+ {
+ name: "all members share prefix",
+ prefix: "thv:auth:{ns:name}:",
+ members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
+ wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
+ wantDropped: nil,
+ },
+ {
+ name: "stray un-prefixed member is dropped",
+ prefix: "thv:auth:{ns:name}:",
+ members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "legacy-key", "thv:auth:{ns:name}:upstream:s1:p2"},
+ wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
+ wantDropped: []string{"legacy-key"},
+ },
+ {
+ name: "different-tenant member is dropped",
+ prefix: "thv:auth:{ns-a:srv}:",
+ members: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1", "thv:auth:{ns-b:srv}:upstream:s1:p1"},
+ wantKept: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1"},
+ wantDropped: []string{"thv:auth:{ns-b:srv}:upstream:s1:p1"},
+ },
+ {
+ name: "empty input",
+ prefix: "thv:auth:{ns:name}:",
+ members: nil,
+ wantKept: nil,
+ wantDropped: nil,
+ },
+ {
+ name: "all members dropped",
+ prefix: "thv:auth:{ns:name}:",
+ members: []string{"foo", "bar"},
+ wantKept: nil,
+ wantDropped: []string{"foo", "bar"},
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+ kept, dropped := filterIndexMembersByPrefix(tc.prefix, tc.members)
+ assert.Equal(t, tc.wantKept, kept, "kept slice mismatch")
+ assert.Equal(t, tc.wantDropped, dropped, "dropped slice mismatch")
+ })
+ }
+}