From b7f4b178484d6e613d507308c8ea760de386c49c Mon Sep 17 00:00:00 2001 From: Reynier Ortiz Vega Date: Wed, 6 May 2026 19:38:21 -0400 Subject: [PATCH] Address Redis Cluster mode review follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three small follow-ups from the PR #5153 review that were deferred so the core cluster-mode support could land: - Document the cluster slot invariant for storeUpstreamTokensScript. The script reads oldUserID from KEYS[1] inside its body to keep the user-set bookkeeping atomic, so the user-set keys are constructed dynamically from ARGV[4] rather than declared as KEYS. The new comment makes the {ns:name} hash-tag requirement explicit, so a future refactor that rebuilds the prefix without the tag will be caught in review rather than silently regressing on standalone Redis and exploding with CROSSSLOT on a real cluster. - Filter SMEMBERS results in GetAllUpstreamTokens, DeleteUpstreamTokens, and GetLatestUpstreamTokensForUser to entries that share the storage instance's keyPrefix, warn-logging anything dropped. A stray un-prefixed member (legacy data, an external admin op, a test fixture) would today surface as CROSSSLOT under cluster while passing on standalone; this turns it into a logged warning. The defensive filter is a pure helper with table-driven tests, plus a behavior test that captures slog output to prove the wiring at all three call sites. - Align the operator CRD `tls` doc with the storage-layer comment so that crd-api.md and `kubectl explain` both describe TLS as applying to "Redis/Valkey master or cluster nodes" — previously cluster-mode users reading the CRD field would not realise this same field configured TLS for cluster-node connections. Regenerated CRD YAML, Helm templates, and crd-api.md to match. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../v1beta1/mcpexternalauthconfig_types.go | 2 +- ...e.stacklok.dev_mcpexternalauthconfigs.yaml | 4 +- ...olhive.stacklok.dev_virtualmcpservers.yaml | 4 +- ...e.stacklok.dev_mcpexternalauthconfigs.yaml | 4 +- ...olhive.stacklok.dev_virtualmcpservers.yaml | 4 +- docs/operator/crd-api.md | 2 +- pkg/authserver/storage/redis.go | 51 +++++++ pkg/authserver/storage/redis_test.go | 144 ++++++++++++++++++ 8 files changed, 205 insertions(+), 10 deletions(-) diff --git a/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go b/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go index 32e83ebde0..6b72e08b7e 100644 --- a/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go +++ b/cmd/thv-operator/api/v1beta1/mcpexternalauthconfig_types.go @@ -712,7 +712,7 @@ type RedisStorageConfig struct { // +optional WriteTimeout string `json:"writeTimeout,omitempty"` - // TLS configures TLS for connections to the Redis/Valkey master. + // TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. // Presence of this field enables TLS. Omit to use plaintext. // +optional TLS *RedisTLSConfig `json:"tls,omitempty"` diff --git a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml index faa443412e..3142691a4a 100644 --- a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml +++ b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml @@ -414,7 +414,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: @@ -1584,7 +1584,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: diff --git a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml index d752a5a546..c1c1bfb768 100644 --- a/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml +++ b/deploy/charts/operator-crds/files/crds/toolhive.stacklok.dev_virtualmcpservers.yaml @@ -287,7 +287,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: @@ -2905,7 +2905,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: diff --git a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml index a8197b955f..7c3ec3c172 100644 --- a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml +++ b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_mcpexternalauthconfigs.yaml @@ -417,7 +417,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: @@ -1587,7 +1587,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: diff --git a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml index 1aeaa8fdfa..303eab298b 100644 --- a/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml +++ b/deploy/charts/operator-crds/templates/toolhive.stacklok.dev_virtualmcpservers.yaml @@ -290,7 +290,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: @@ -2908,7 +2908,7 @@ spec: type: object tls: description: |- - TLS configures TLS for connections to the Redis/Valkey master. + TLS configures TLS for connections to the Redis/Valkey master or cluster nodes. Presence of this field enables TLS. Omit to use plaintext. properties: caCertSecretRef: diff --git a/docs/operator/crd-api.md b/docs/operator/crd-api.md index 010b399bde..81feec7554 100644 --- a/docs/operator/crd-api.md +++ b/docs/operator/crd-api.md @@ -2877,7 +2877,7 @@ _Appears in:_ | `dialTimeout` _string_ | DialTimeout is the timeout for establishing connections.
Format: Go duration string (e.g., "5s", "1m"). | 5s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
| | `readTimeout` _string_ | ReadTimeout is the timeout for socket reads.
Format: Go duration string (e.g., "3s", "1m"). | 3s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
| | `writeTimeout` _string_ | WriteTimeout is the timeout for socket writes.
Format: Go duration string (e.g., "3s", "1m"). | 3s | Pattern: `^([0-9]+(\.[0-9]+)?(ns\|us\|µs\|ms\|s\|m\|h))+$`
Optional: \{\}
| -| `tls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | TLS configures TLS for connections to the Redis/Valkey master.
Presence of this field enables TLS. Omit to use plaintext. | | Optional: \{\}
| +| `tls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext. | | Optional: \{\}
| | `sentinelTls` _[api.v1beta1.RedisTLSConfig](#apiv1beta1redistlsconfig)_ | SentinelTLS configures TLS for connections to Sentinel instances.
Only applies when sentinelConfig is set. Presence of this field enables TLS. | | Optional: \{\}
| diff --git a/pkg/authserver/storage/redis.go b/pkg/authserver/storage/redis.go index 6b3fe4f19a..fa682cb392 100644 --- a/pkg/authserver/storage/redis.go +++ b/pkg/authserver/storage/redis.go @@ -14,6 +14,7 @@ import ( "net" "net/url" "slices" + "strings" "time" "github.com/ory/fosite" @@ -50,6 +51,35 @@ func warnOnCleanupErr(err error, operation, key string) { } } +// filterIndexMembersByPrefix splits SMEMBERS results into members that share +// this storage instance's key prefix and those that do not. Multi-key +// operations like MGet and Del fail with CROSSSLOT under Redis Cluster when +// any member hashes to a different slot, and a stray un-prefixed entry +// (legacy data, an external admin op, a test fixture) is the most likely +// source of such drift. Filtering at read time turns a hard cluster failure +// into a logged warning, while remaining a no-op on standalone Redis. +func filterIndexMembersByPrefix(prefix string, members []string) (kept, dropped []string) { + for _, m := range members { + if strings.HasPrefix(m, prefix) { + kept = append(kept, m) + } else { + dropped = append(dropped, m) + } + } + return kept, dropped +} + +// warnDroppedIndexMembers emits a warning for each foreign member returned by +// filterIndexMembersByPrefix so operators can identify the source of cluster- +// incompatible entries. +func warnDroppedIndexMembers(operation, indexKey, expectedPrefix string, dropped []string) { + for _, m := range dropped { + slog.Warn("dropping foreign index member to prevent CROSSSLOT", + "operation", operation, "indexKey", indexKey, + "expectedPrefix", expectedPrefix, "member", m) + } +} + // RedisConfig holds Redis connection configuration for runtime use. type RedisConfig struct { // Addr is the Redis server address (host:port). Required for standalone and cluster modes. @@ -902,6 +932,17 @@ func (s *storedUpstreamTokens) toUpstreamTokens() *UpstreamTokens { // ARGV[2] = TTL in milliseconds // ARGV[3] = new UserID ("" if no user) // ARGV[4] = user upstream set key prefix (e.g. "thv:auth:{ns:name}:user:upstream:") +// +// Cluster slot invariant: this script reads oldUserID from KEYS[1] inside the +// script body (atomic with the rest of the work), so the user-set keys are +// constructed dynamically as `ARGV[4] .. userID` rather than being passed as +// declared KEYS. Every dynamically-built key MUST therefore inherit the +// `{ns:name}` hash tag that is baked into ARGV[4] (s.keyPrefix). All callers +// derive ARGV[4] from s.keyPrefix, which DeriveKeyPrefix builds with the +// `{ns:name}` hash tag, so user-set keys land on the same Redis Cluster slot +// as KEYS[1] and KEYS[2]. A future refactor that strips the hash tag — or +// rebuilds setPrefix from raw inputs without the tag — will silently pass on +// standalone Redis and fail with CROSSSLOT under Cluster. var storeUpstreamTokensScript = redis.NewScript(` local oldUserID = "" local existing = redis.call('GET', KEYS[1]) @@ -1109,6 +1150,9 @@ func (s *RedisStorage) GetAllUpstreamTokens(ctx context.Context, sessionID strin return nil, fmt.Errorf("failed to get upstream token index: %w", err) } + providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys) + warnDroppedIndexMembers("GetAllUpstreamTokens", idxKey, s.keyPrefix, dropped) + if len(providerKeys) == 0 { return result, nil } @@ -1171,6 +1215,9 @@ func (s *RedisStorage) DeleteUpstreamTokens(ctx context.Context, sessionID strin return fmt.Errorf("failed to get upstream token index: %w", err) } + providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys) + warnDroppedIndexMembers("DeleteUpstreamTokens", idxKey, s.keyPrefix, dropped) + if len(providerKeys) == 0 { return fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found")) } @@ -1227,6 +1274,10 @@ func (s *RedisStorage) GetLatestUpstreamTokensForUser(ctx context.Context, userI if err != nil && !errors.Is(err, redis.Nil) { return nil, fmt.Errorf("failed to get user upstream index: %w", err) } + + members, dropped := filterIndexMembersByPrefix(s.keyPrefix, members) + warnDroppedIndexMembers("GetLatestUpstreamTokensForUser", setKey, s.keyPrefix, dropped) + if len(members) == 0 { return nil, fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found")) } diff --git a/pkg/authserver/storage/redis_test.go b/pkg/authserver/storage/redis_test.go index bb7a75a6bf..921ccf9818 100644 --- a/pkg/authserver/storage/redis_test.go +++ b/pkg/authserver/storage/redis_test.go @@ -8,9 +8,12 @@ package storage import ( + "bytes" "context" "fmt" + "log/slog" "net/url" + "strings" "sync" "testing" "time" @@ -2181,3 +2184,144 @@ func TestRedisStorage_GetLatestUpstreamTokensForUser(t *testing.T) { }) }) } + +// captureWarnLogs swaps in a buffered slog handler at warn level for the +// duration of the test and returns the captured output. Process-global, not +// safe for t.Parallel(). +func captureWarnLogs(t *testing.T) *bytes.Buffer { + t.Helper() + var buf bytes.Buffer + handler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn}) + orig := slog.Default() + slog.SetDefault(slog.New(handler)) + t.Cleanup(func() { slog.SetDefault(orig) }) + return &buf +} + +// assertForeignDropWarn checks that the warn log emitted for a filtered +// CROSSSLOT-defending op names the operation, the dropped member, and the +// expected key prefix. +func assertForeignDropWarn(t *testing.T, out, operation, member, prefix string) { + t.Helper() + assert.Contains(t, out, "dropping foreign index member to prevent CROSSSLOT", + "expected warn log emitted by %s; got: %s", operation, out) + assert.Contains(t, out, member, "warn log should name the dropped member") + assert.Contains(t, out, operation, "warn log should name the operation") + assert.Contains(t, out, strings.TrimSuffix(prefix, ":"), + "warn log should reference the expected prefix") +} + +// TestForeignMembersFilteredFromIndexOps verifies that the three multi-key +// SMEMBERS-fed operations (GetAllUpstreamTokens, DeleteUpstreamTokens, +// GetLatestUpstreamTokensForUser) drop foreign members before MGet/Del so a +// stray un-prefixed entry in an index set cannot escalate into a CROSSSLOT +// failure on Redis Cluster, and emit a warn log naming the dropped member. +// +// This test uses slog.SetDefault (process-global) and therefore does not run +// in parallel. +func TestForeignMembersFilteredFromIndexOps(t *testing.T) { //nolint:paralleltest // captures slog default + storage, mr := newTestRedisStorage(t) + t.Cleanup(func() { + _ = storage.Close() + mr.Close() + }) + ctx := context.Background() + + tokens := &UpstreamTokens{ + ProviderID: "github", + AccessToken: "real-access", + RefreshToken: "real-refresh", + UserID: "user-A", + ExpiresAt: time.Now().Add(time.Hour), + } + require.NoError(t, storage.StoreUpstreamTokens(ctx, "session-X", "github", tokens)) + + sessionIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUpstreamIdx, "session-X") + userIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUserUpstream, "user-A") + const foreignMember = "other-tenant:auth:{ns:other}:upstream:s:p" + mr.SAdd(sessionIdxKey, foreignMember) + mr.SAdd(userIdxKey, foreignMember) + + t.Run("GetAllUpstreamTokens", func(t *testing.T) { + buf := captureWarnLogs(t) + got, err := storage.GetAllUpstreamTokens(ctx, "session-X") + require.NoError(t, err) + require.Len(t, got, 1) + require.Contains(t, got, "github") + assert.Equal(t, "real-access", got["github"].AccessToken) + assertForeignDropWarn(t, buf.String(), "GetAllUpstreamTokens", foreignMember, storage.keyPrefix) + }) + + t.Run("GetLatestUpstreamTokensForUser", func(t *testing.T) { + buf := captureWarnLogs(t) + got, err := storage.GetLatestUpstreamTokensForUser(ctx, "user-A", "github") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "real-refresh", got.RefreshToken) + assertForeignDropWarn(t, buf.String(), "GetLatestUpstreamTokensForUser", foreignMember, storage.keyPrefix) + }) + + // DeleteUpstreamTokens runs last because it removes the index set. + t.Run("DeleteUpstreamTokens", func(t *testing.T) { + buf := captureWarnLogs(t) + require.NoError(t, storage.DeleteUpstreamTokens(ctx, "session-X")) + assertForeignDropWarn(t, buf.String(), "DeleteUpstreamTokens", foreignMember, storage.keyPrefix) + }) +} + +func TestFilterIndexMembersByPrefix(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + prefix string + members []string + wantKept []string + wantDropped []string + }{ + { + name: "all members share prefix", + prefix: "thv:auth:{ns:name}:", + members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"}, + wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"}, + wantDropped: nil, + }, + { + name: "stray un-prefixed member is dropped", + prefix: "thv:auth:{ns:name}:", + members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "legacy-key", "thv:auth:{ns:name}:upstream:s1:p2"}, + wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"}, + wantDropped: []string{"legacy-key"}, + }, + { + name: "different-tenant member is dropped", + prefix: "thv:auth:{ns-a:srv}:", + members: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1", "thv:auth:{ns-b:srv}:upstream:s1:p1"}, + wantKept: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1"}, + wantDropped: []string{"thv:auth:{ns-b:srv}:upstream:s1:p1"}, + }, + { + name: "empty input", + prefix: "thv:auth:{ns:name}:", + members: nil, + wantKept: nil, + wantDropped: nil, + }, + { + name: "all members dropped", + prefix: "thv:auth:{ns:name}:", + members: []string{"foo", "bar"}, + wantKept: nil, + wantDropped: []string{"foo", "bar"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + kept, dropped := filterIndexMembersByPrefix(tc.prefix, tc.members) + assert.Equal(t, tc.wantKept, kept, "kept slice mismatch") + assert.Equal(t, tc.wantDropped, dropped, "dropped slice mismatch") + }) + } +}