Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ type RedisStorageConfig struct {
// +optional
WriteTimeout string `json:"writeTimeout,omitempty"`

// TLS configures TLS for connections to the Redis/Valkey master.
// TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
// Presence of this field enables TLS. Omit to use plaintext.
// +optional
TLS *RedisTLSConfig `json:"tls,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down Expand Up @@ -1584,7 +1584,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down Expand Up @@ -2905,7 +2905,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down Expand Up @@ -1587,7 +1587,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down Expand Up @@ -2908,7 +2908,7 @@ spec:
type: object
tls:
description: |-
TLS configures TLS for connections to the Redis/Valkey master.
TLS configures TLS for connections to the Redis/Valkey master or cluster nodes.
Presence of this field enables TLS. Omit to use plaintext.
properties:
caCertSecretRef:
Expand Down
2 changes: 1 addition & 1 deletion docs/operator/crd-api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions pkg/authserver/storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"net/url"
"slices"
"strings"
"time"

"github.com/ory/fosite"
Expand Down Expand Up @@ -50,6 +51,35 @@ func warnOnCleanupErr(err error, operation, key string) {
}
}

// filterIndexMembersByPrefix splits SMEMBERS results into members that share
// this storage instance's key prefix and those that do not. Multi-key
// operations like MGet and Del fail with CROSSSLOT under Redis Cluster when
// any member hashes to a different slot, and a stray un-prefixed entry
// (legacy data, an external admin op, a test fixture) is the most likely
// source of such drift. Filtering at read time turns a hard cluster failure
// into a logged warning, while remaining a no-op on standalone Redis.
func filterIndexMembersByPrefix(prefix string, members []string) (kept, dropped []string) {
for _, m := range members {
if strings.HasPrefix(m, prefix) {
kept = append(kept, m)
} else {
dropped = append(dropped, m)
}
}
return kept, dropped
}

// warnDroppedIndexMembers emits a warning for each foreign member returned by
// filterIndexMembersByPrefix so operators can identify the source of cluster-
// incompatible entries.
func warnDroppedIndexMembers(operation, indexKey, expectedPrefix string, dropped []string) {
for _, m := range dropped {
slog.Warn("dropping foreign index member to prevent CROSSSLOT",
"operation", operation, "indexKey", indexKey,
"expectedPrefix", expectedPrefix, "member", m)
}
}

// RedisConfig holds Redis connection configuration for runtime use.
type RedisConfig struct {
// Addr is the Redis server address (host:port). Required for standalone and cluster modes.
Expand Down Expand Up @@ -902,6 +932,17 @@ func (s *storedUpstreamTokens) toUpstreamTokens() *UpstreamTokens {
// ARGV[2] = TTL in milliseconds
// ARGV[3] = new UserID ("" if no user)
// ARGV[4] = user upstream set key prefix (e.g. "thv:auth:{ns:name}:user:upstream:")
//
// Cluster slot invariant: this script reads oldUserID from KEYS[1] inside the
// script body (atomic with the rest of the work), so the user-set keys are
// constructed dynamically as `ARGV[4] .. userID` rather than being passed as
// declared KEYS. Every dynamically-built key MUST therefore inherit the
// `{ns:name}` hash tag that is baked into ARGV[4] (s.keyPrefix). All callers
// derive ARGV[4] from s.keyPrefix, which DeriveKeyPrefix builds with the
// `{ns:name}` hash tag, so user-set keys land on the same Redis Cluster slot
// as KEYS[1] and KEYS[2]. A future refactor that strips the hash tag — or
// rebuilds setPrefix from raw inputs without the tag — will silently pass on
// standalone Redis and fail with CROSSSLOT under Cluster.
var storeUpstreamTokensScript = redis.NewScript(`
local oldUserID = ""
local existing = redis.call('GET', KEYS[1])
Expand Down Expand Up @@ -1109,6 +1150,9 @@ func (s *RedisStorage) GetAllUpstreamTokens(ctx context.Context, sessionID strin
return nil, fmt.Errorf("failed to get upstream token index: %w", err)
}

providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys)
warnDroppedIndexMembers("GetAllUpstreamTokens", idxKey, s.keyPrefix, dropped)

if len(providerKeys) == 0 {
return result, nil
}
Expand Down Expand Up @@ -1171,6 +1215,9 @@ func (s *RedisStorage) DeleteUpstreamTokens(ctx context.Context, sessionID strin
return fmt.Errorf("failed to get upstream token index: %w", err)
}

providerKeys, dropped := filterIndexMembersByPrefix(s.keyPrefix, providerKeys)
warnDroppedIndexMembers("DeleteUpstreamTokens", idxKey, s.keyPrefix, dropped)

if len(providerKeys) == 0 {
return fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found"))
}
Expand Down Expand Up @@ -1227,6 +1274,10 @@ func (s *RedisStorage) GetLatestUpstreamTokensForUser(ctx context.Context, userI
if err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("failed to get user upstream index: %w", err)
}

members, dropped := filterIndexMembersByPrefix(s.keyPrefix, members)
warnDroppedIndexMembers("GetLatestUpstreamTokensForUser", setKey, s.keyPrefix, dropped)

if len(members) == 0 {
return nil, fmt.Errorf("%w: %w", ErrNotFound, fosite.ErrNotFound.WithHint("Upstream tokens not found"))
}
Expand Down
144 changes: 144 additions & 0 deletions pkg/authserver/storage/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
package storage

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/url"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -2181,3 +2184,144 @@ func TestRedisStorage_GetLatestUpstreamTokensForUser(t *testing.T) {
})
})
}

// captureWarnLogs swaps in a buffered slog handler at warn level for the
// duration of the test and returns the captured output. Process-global, not
// safe for t.Parallel().
func captureWarnLogs(t *testing.T) *bytes.Buffer {
t.Helper()
var buf bytes.Buffer
handler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn})
orig := slog.Default()
slog.SetDefault(slog.New(handler))
t.Cleanup(func() { slog.SetDefault(orig) })
return &buf
}

// assertForeignDropWarn checks that the warn log emitted for a filtered
// CROSSSLOT-defending op names the operation, the dropped member, and the
// expected key prefix.
func assertForeignDropWarn(t *testing.T, out, operation, member, prefix string) {
t.Helper()
assert.Contains(t, out, "dropping foreign index member to prevent CROSSSLOT",
"expected warn log emitted by %s; got: %s", operation, out)
assert.Contains(t, out, member, "warn log should name the dropped member")
assert.Contains(t, out, operation, "warn log should name the operation")
assert.Contains(t, out, strings.TrimSuffix(prefix, ":"),
"warn log should reference the expected prefix")
}

// TestForeignMembersFilteredFromIndexOps verifies that the three multi-key
// SMEMBERS-fed operations (GetAllUpstreamTokens, DeleteUpstreamTokens,
// GetLatestUpstreamTokensForUser) drop foreign members before MGet/Del so a
// stray un-prefixed entry in an index set cannot escalate into a CROSSSLOT
// failure on Redis Cluster, and emit a warn log naming the dropped member.
//
// This test uses slog.SetDefault (process-global) and therefore does not run
// in parallel.
func TestForeignMembersFilteredFromIndexOps(t *testing.T) { //nolint:paralleltest // captures slog default
storage, mr := newTestRedisStorage(t)
t.Cleanup(func() {
_ = storage.Close()
mr.Close()
})
ctx := context.Background()

tokens := &UpstreamTokens{
ProviderID: "github",
AccessToken: "real-access",
RefreshToken: "real-refresh",
UserID: "user-A",
ExpiresAt: time.Now().Add(time.Hour),
}
require.NoError(t, storage.StoreUpstreamTokens(ctx, "session-X", "github", tokens))

sessionIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUpstreamIdx, "session-X")
userIdxKey := redisSetKey(storage.keyPrefix, KeyTypeUserUpstream, "user-A")
const foreignMember = "other-tenant:auth:{ns:other}:upstream:s:p"
mr.SAdd(sessionIdxKey, foreignMember)
mr.SAdd(userIdxKey, foreignMember)

t.Run("GetAllUpstreamTokens", func(t *testing.T) {
buf := captureWarnLogs(t)
got, err := storage.GetAllUpstreamTokens(ctx, "session-X")
require.NoError(t, err)
require.Len(t, got, 1)
require.Contains(t, got, "github")
assert.Equal(t, "real-access", got["github"].AccessToken)
assertForeignDropWarn(t, buf.String(), "GetAllUpstreamTokens", foreignMember, storage.keyPrefix)
})

t.Run("GetLatestUpstreamTokensForUser", func(t *testing.T) {
buf := captureWarnLogs(t)
got, err := storage.GetLatestUpstreamTokensForUser(ctx, "user-A", "github")
require.NoError(t, err)
require.NotNil(t, got)
assert.Equal(t, "real-refresh", got.RefreshToken)
assertForeignDropWarn(t, buf.String(), "GetLatestUpstreamTokensForUser", foreignMember, storage.keyPrefix)
})

// DeleteUpstreamTokens runs last because it removes the index set.
t.Run("DeleteUpstreamTokens", func(t *testing.T) {
buf := captureWarnLogs(t)
require.NoError(t, storage.DeleteUpstreamTokens(ctx, "session-X"))
assertForeignDropWarn(t, buf.String(), "DeleteUpstreamTokens", foreignMember, storage.keyPrefix)
})
}

func TestFilterIndexMembersByPrefix(t *testing.T) {
t.Parallel()

tests := []struct {
name string
prefix string
members []string
wantKept []string
wantDropped []string
}{
{
name: "all members share prefix",
prefix: "thv:auth:{ns:name}:",
members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
wantDropped: nil,
},
{
name: "stray un-prefixed member is dropped",
prefix: "thv:auth:{ns:name}:",
members: []string{"thv:auth:{ns:name}:upstream:s1:p1", "legacy-key", "thv:auth:{ns:name}:upstream:s1:p2"},
wantKept: []string{"thv:auth:{ns:name}:upstream:s1:p1", "thv:auth:{ns:name}:upstream:s1:p2"},
wantDropped: []string{"legacy-key"},
},
{
name: "different-tenant member is dropped",
prefix: "thv:auth:{ns-a:srv}:",
members: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1", "thv:auth:{ns-b:srv}:upstream:s1:p1"},
wantKept: []string{"thv:auth:{ns-a:srv}:upstream:s1:p1"},
wantDropped: []string{"thv:auth:{ns-b:srv}:upstream:s1:p1"},
},
{
name: "empty input",
prefix: "thv:auth:{ns:name}:",
members: nil,
wantKept: nil,
wantDropped: nil,
},
{
name: "all members dropped",
prefix: "thv:auth:{ns:name}:",
members: []string{"foo", "bar"},
wantKept: nil,
wantDropped: []string{"foo", "bar"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
kept, dropped := filterIndexMembersByPrefix(tc.prefix, tc.members)
assert.Equal(t, tc.wantKept, kept, "kept slice mismatch")
assert.Equal(t, tc.wantDropped, dropped, "dropped slice mismatch")
})
}
}
Loading