Skip to content

Commit 2866fa5

Browse files
committed
Add conditional download caching to LoadRuleGroups
Signed-off-by: Paurush Garg <paurushg@amazon.com>
1 parent ad0a071 commit 2866fa5

3 files changed

Lines changed: 187 additions & 11 deletions

File tree

pkg/ruler/rulestore/bucketclient/bucket_client.go

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"io"
99
"strings"
1010
"sync"
11+
"time"
1112

1213
"github.com/go-kit/log"
1314
"github.com/go-kit/log/level"
1415
"github.com/gogo/protobuf/proto"
1516
"github.com/pkg/errors"
1617
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promauto"
1719
"github.com/thanos-io/objstore"
1820
"github.com/thanos-io/thanos/pkg/extprom"
1921
"golang.org/x/sync/errgroup"
@@ -25,6 +27,11 @@ import (
2527
"github.com/cortexproject/cortex/pkg/util/users"
2628
)
2729

30+
type cachedRuleGroup struct {
31+
downloadedAt time.Time
32+
ruleGroup *rulespb.RuleGroupDesc
33+
}
34+
2835
const (
2936
// The bucket prefix under which all tenants rule groups are stored.
3037
rulesPrefix = "rules"
@@ -48,6 +55,10 @@ type BucketRuleStore struct {
4855

4956
usersScanner users.Scanner
5057
userIndexUpdater *users.UserIndexUpdater
58+
59+
ruleGroupCache map[string]*cachedRuleGroup
60+
ruleGroupCacheMu sync.RWMutex
61+
cacheOps *prometheus.CounterVec
5162
}
5263

5364
func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketRuleStore, error) {
@@ -74,6 +85,11 @@ func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerCo
7485
logger: logger,
7586
usersScanner: usersScanner,
7687
userIndexUpdater: userIndexUpdater,
88+
ruleGroupCache: make(map[string]*cachedRuleGroup),
89+
cacheOps: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
90+
Name: "cortex_ruler_rule_group_load_cache_operations_total",
91+
Help: "Total number of rule group load operations by cache result (hit=skipped GET, miss=full GET).",
92+
}, []string{"result"}),
7793
}, nil
7894
}
7995

@@ -82,21 +98,40 @@ func (b *BucketRuleStore) GetUserIndexUpdater() *users.UserIndexUpdater {
8298
}
8399

84100
// getRuleGroup loads and return a rules group. If existing rule group is supplied, it is Reset and reused. If nil, new RuleGroupDesc is allocated.
85-
func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) {
101+
// Uses conditional download: checks LastModified via HEAD before doing a full GET to avoid
102+
// redundant downloads for unchanged rule groups.
103+
func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, _ *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) {
86104
userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
87105
objectKey := getRuleGroupObjectKey(namespace, groupName)
106+
cacheKey := userID + "/" + objectKey
107+
108+
// Only check S3 HEAD if we have a cached version to compare against.
109+
b.ruleGroupCacheMu.RLock()
110+
cached, hasCached := b.ruleGroupCache[cacheKey]
111+
b.ruleGroupCacheMu.RUnlock()
112+
113+
if hasCached {
114+
attrs, err := userBucket.Attributes(ctx, objectKey)
115+
if err == nil && cached.downloadedAt.After(attrs.LastModified) {
116+
b.cacheOps.WithLabelValues("hit").Inc()
117+
return cached.ruleGroup, nil
118+
}
119+
// HEAD failed or file changed — fall through to full GET.
120+
}
121+
122+
// Full GET: cold cache or file has changed.
123+
b.cacheOps.WithLabelValues("miss").Inc()
88124

89125
reader, err := userBucket.Get(ctx, objectKey)
90126
if userBucket.IsObjNotFoundErr(err) {
127+
b.evictFromCache(cacheKey)
91128
level.Debug(b.logger).Log("msg", "rule group does not exist", "user", userID, "key", objectKey)
92129
return nil, rulestore.ErrGroupNotFound
93130
}
94-
95131
if userBucket.IsAccessDeniedErr(err) {
96132
level.Debug(b.logger).Log("msg", "permission denied when loading group", "user", userID, "key", objectKey)
97133
return nil, rulestore.ErrAccessDenied
98134
}
99-
100135
if err != nil {
101136
return nil, errors.Wrapf(err, "failed to get rule group %s", objectKey)
102137
}
@@ -107,20 +142,53 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g
107142
return nil, errors.Wrapf(err, "failed to read rule group %s", objectKey)
108143
}
109144

110-
if rg == nil {
111-
rg = &rulespb.RuleGroupDesc{}
112-
} else {
113-
rg.Reset()
145+
rg := &rulespb.RuleGroupDesc{}
146+
if err = proto.Unmarshal(buf, rg); err != nil {
147+
return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey)
114148
}
115149

116-
err = proto.Unmarshal(buf, rg)
117-
if err != nil {
118-
return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey)
150+
b.ruleGroupCacheMu.Lock()
151+
b.ruleGroupCache[cacheKey] = &cachedRuleGroup{
152+
downloadedAt: time.Now(),
153+
ruleGroup: rg,
119154
}
155+
b.ruleGroupCacheMu.Unlock()
120156

121157
return rg, nil
122158
}
123159

160+
// evictFromCache removes a rule group from the cache (e.g., when deleted).
161+
func (b *BucketRuleStore) evictFromCache(cacheKey string) {
162+
b.ruleGroupCacheMu.Lock()
163+
delete(b.ruleGroupCache, cacheKey)
164+
b.ruleGroupCacheMu.Unlock()
165+
}
166+
167+
// ClearCache removes all cached rule groups. Exposed for testing.
168+
func (b *BucketRuleStore) ClearCache() {
169+
b.ruleGroupCacheMu.Lock()
170+
b.ruleGroupCache = make(map[string]*cachedRuleGroup)
171+
b.ruleGroupCacheMu.Unlock()
172+
}
173+
174+
// pruneCache removes cache entries for rule groups not in the current groupsToLoad set.
175+
func (b *BucketRuleStore) pruneCache(groupsToLoad map[string]rulespb.RuleGroupList) {
176+
validKeys := make(map[string]struct{}, len(groupsToLoad))
177+
for user, groups := range groupsToLoad {
178+
for _, g := range groups {
179+
validKeys[user+"/"+getRuleGroupObjectKey(g.Namespace, g.Name)] = struct{}{}
180+
}
181+
}
182+
183+
b.ruleGroupCacheMu.Lock()
184+
for key := range b.ruleGroupCache {
185+
if _, ok := validKeys[key]; !ok {
186+
delete(b.ruleGroupCache, key)
187+
}
188+
}
189+
b.ruleGroupCacheMu.Unlock()
190+
}
191+
124192
// ListAllUsers implements rules.RuleStore.
125193
func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
126194
active, deleting, _, err := b.usersScanner.ScanUsers(ctx)
@@ -268,6 +336,9 @@ outer:
268336
return loadedGroups, e
269337
}
270338

339+
// Prune cache entries for rule groups no longer owned by this pod (e.g., after ring rebalance).
340+
b.pruneCache(groupsToLoad)
341+
271342
return loadedGroups, errs.Err()
272343
}
273344

pkg/ruler/rulestore/bucketclient/bucket_client_test.go

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/go-kit/log"
1212
"github.com/pkg/errors"
1313
"github.com/prometheus/client_golang/prometheus"
14+
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
1415
"github.com/prometheus/common/model"
1516
"github.com/prometheus/prometheus/model/rulefmt"
1617
"github.com/stretchr/testify/assert"
@@ -201,7 +202,7 @@ func TestLoadRules(t *testing.T) {
201202
// Load with missing rule groups fails.
202203
require.NoError(t, rs.DeleteRuleGroup(context.Background(), "user1", "hello", "first testGroup"))
203204
_, err = rs.LoadRuleGroups(context.Background(), allGroupsMap)
204-
require.EqualError(t, err, "get rule group user=\"user2\", namespace=\"world\", name=\"first testGroup\": group does not exist")
205+
require.EqualError(t, err, "get rule group user=\"user1\", namespace=\"hello\", name=\"first testGroup\": group does not exist")
205206
})
206207
}
207208

@@ -461,3 +462,101 @@ func (mb mockBucket) Iter(_ context.Context, dir string, f func(string) error, o
461462
}
462463
return nil
463464
}
465+
466+
func TestLoadRuleGroupsCache(t *testing.T) {
467+
bucketClient := objstore.NewInMemBucket()
468+
reg := prometheus.NewPedanticRegistry()
469+
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
470+
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
471+
require.NoError(t, err)
472+
473+
// Setup: create a rule group.
474+
desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
475+
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc))
476+
477+
allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
478+
require.NoError(t, err)
479+
480+
// First load: cold cache, should do full GET (miss).
481+
loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
482+
require.NoError(t, err)
483+
require.Len(t, loaded["user1"], 1)
484+
require.Equal(t, "group1", loaded["user1"][0].Name)
485+
486+
// Second load: cache is warm, file unchanged → should be a cache hit.
487+
time.Sleep(10 * time.Millisecond) // ensure downloadedAt is after LastModified
488+
loaded2, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
489+
require.NoError(t, err)
490+
require.Len(t, loaded2["user1"], 1)
491+
require.Equal(t, "group1", loaded2["user1"][0].Name)
492+
493+
// Verify cache hit metric.
494+
hitCount := promtestutil.ToFloat64(bucketStore.cacheOps.WithLabelValues("hit"))
495+
require.Equal(t, float64(1), hitCount)
496+
}
497+
498+
func TestLoadRuleGroupsCacheMissOnModification(t *testing.T) {
499+
bucketClient := objstore.NewInMemBucket()
500+
reg := prometheus.NewPedanticRegistry()
501+
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
502+
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
503+
require.NoError(t, err)
504+
505+
// Setup: create a rule group.
506+
desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
507+
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc))
508+
509+
allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
510+
require.NoError(t, err)
511+
512+
// First load: populates cache.
513+
_, err = bucketStore.LoadRuleGroups(context.Background(), allGroups)
514+
require.NoError(t, err)
515+
516+
// Modify the rule group in S3.
517+
time.Sleep(10 * time.Millisecond)
518+
desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(2 * time.Minute)})
519+
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2))
520+
521+
// Second load: file modified → cache miss → should get new content.
522+
loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups)
523+
require.NoError(t, err)
524+
require.Equal(t, 2*time.Minute, loaded["user1"][0].Interval)
525+
}
526+
527+
func TestLoadRuleGroupsCachePrune(t *testing.T) {
528+
bucketClient := objstore.NewInMemBucket()
529+
reg := prometheus.NewPedanticRegistry()
530+
usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList}
531+
bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg)
532+
require.NoError(t, err)
533+
534+
// Setup: create two rule groups.
535+
desc1 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)})
536+
desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group2", Interval: model.Duration(time.Minute)})
537+
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc1))
538+
require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2))
539+
540+
allGroups, err := bucketStore.ListAllRuleGroups(context.Background())
541+
require.NoError(t, err)
542+
543+
// Load both groups → cache has 2 entries.
544+
_, err = bucketStore.LoadRuleGroups(context.Background(), allGroups)
545+
require.NoError(t, err)
546+
547+
bucketStore.ruleGroupCacheMu.RLock()
548+
require.Len(t, bucketStore.ruleGroupCache, 2)
549+
bucketStore.ruleGroupCacheMu.RUnlock()
550+
551+
// Now load only group1 (simulating ring rebalance where group2 is no longer owned).
552+
partialGroups := map[string]rulespb.RuleGroupList{
553+
"user1": {allGroups["user1"][0]}, // only first group
554+
}
555+
_, err = bucketStore.LoadRuleGroups(context.Background(), partialGroups)
556+
require.NoError(t, err)
557+
558+
// Cache should be pruned to 1 entry.
559+
bucketStore.ruleGroupCacheMu.RLock()
560+
require.Len(t, bucketStore.ruleGroupCache, 1)
561+
bucketStore.ruleGroupCacheMu.RUnlock()
562+
}

pkg/util/testutil/objstore.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ func (m *MockBucketFailure) Attributes(ctx context.Context, name string) (objsto
9797
if e, ok := m.AttributesFailures[name]; ok {
9898
return objstore.ObjectAttributes{}, e
9999
}
100+
// In real object storage, HEAD fails with the same errors as GET (e.g., access denied, not found).
101+
for prefix, err := range m.GetFailures {
102+
if strings.HasPrefix(name, prefix) {
103+
return objstore.ObjectAttributes{}, err
104+
}
105+
}
100106
return m.Bucket.Attributes(ctx, name)
101107
}
102108

0 commit comments

Comments
 (0)