Skip to content

Commit 2cfa22f

Browse files
committed
Revert "client: reload RM snapshot before watch retry"
This reverts commit daef237.
1 parent 9595152 commit 2cfa22f

2 files changed

Lines changed: 3 additions & 117 deletions

File tree

client/resource_group/controller/global_controller.go

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
357357
}
358358
case <-watchRetryTimer.C:
359359
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
360-
watchMetaChannel, err = c.reloadResourceGroupMetaWatch(ctx)
360+
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
361+
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
362+
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithPrefix(), opt.WithPrevKV())
361363
if err != nil {
362364
log.Warn("watch resource group meta failed", zap.Error(err))
363365
watchRetryTimer.Reset(watchRetryInterval)
@@ -489,57 +491,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
489491
}()
490492
}
491493

492-
func (c *ResourceGroupsController) reloadResourceGroupMetaWatch(
493-
ctx context.Context,
494-
) (chan []*meta_storagepb.Event, error) {
495-
groups, revision, err := c.provider.LoadResourceGroups(ctx)
496-
if err != nil {
497-
return nil, err
498-
}
499-
c.syncResourceGroupSnapshot(groups)
500-
// Start from the next revision after the freshly loaded snapshot so reconnects
501-
// keep the cache in sync without replaying the snapshot itself as watch events.
502-
return c.provider.Watch(
503-
ctx,
504-
pd.GroupSettingsPathPrefixBytes(c.keyspaceID),
505-
opt.WithRev(revision+1),
506-
opt.WithPrefix(),
507-
opt.WithPrevKV(),
508-
)
509-
}
510-
511-
func (c *ResourceGroupsController) syncResourceGroupSnapshot(groups []*rmpb.ResourceGroup) {
512-
groupMap := make(map[string]*rmpb.ResourceGroup, len(groups))
513-
for _, group := range groups {
514-
groupMap[group.GetName()] = group
515-
}
516-
517-
c.groupsController.Range(func(key, value any) bool {
518-
name := key.(string)
519-
group, exists := groupMap[name]
520-
if !exists {
521-
c.tombstoneGroupCostController(name)
522-
return true
523-
}
524-
gc := value.(*groupCostController)
525-
if !gc.tombstone.Load() {
526-
gc.modifyMeta(group)
527-
return true
528-
}
529-
newGC, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
530-
if err != nil {
531-
log.Warn("[resource group controller] re-create resource group cost controller from snapshot failed",
532-
zap.String("name", name), zap.Error(err))
533-
return true
534-
}
535-
if c.groupsController.CompareAndSwap(name, gc, newGC) {
536-
log.Info("[resource group controller] re-create resource group cost controller from snapshot",
537-
zap.String("name", name))
538-
}
539-
return true
540-
})
541-
}
542-
543494
// Stop stops ResourceGroupController service.
544495
func (c *ResourceGroupsController) Stop() error {
545496
if c.loopCancel == nil {

client/resource_group/controller/global_controller_test.go

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -381,71 +381,6 @@ func TestGetResourceGroup(t *testing.T) {
381381
re.Nil(gc02)
382382
}
383383

384-
func TestReloadResourceGroupMetaWatch(t *testing.T) {
385-
re := require.New(t)
386-
ctx, cancel := context.WithCancel(context.Background())
387-
defer cancel()
388-
389-
mockProvider := &MockResourceGroupProvider{}
390-
mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
391-
controller, err := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
392-
re.NoError(err)
393-
394-
defaultGroup := &rmpb.ResourceGroup{
395-
Name: defaultResourceGroupName,
396-
Mode: rmpb.GroupMode_RUMode,
397-
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}},
398-
}
399-
staleGroup := &rmpb.ResourceGroup{
400-
Name: "stale-group",
401-
Mode: rmpb.GroupMode_RUMode,
402-
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}}},
403-
}
404-
updatedDefaultGroup := &rmpb.ResourceGroup{
405-
Name: defaultResourceGroupName,
406-
Mode: rmpb.GroupMode_RUMode,
407-
RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 2000}}},
408-
}
409-
410-
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultGroup, nil)
411-
mockProvider.On("GetResourceGroup", mock.Anything, "stale-group", mock.Anything).Return(staleGroup, nil)
412-
413-
defaultGC, err := controller.tryGetResourceGroupController(ctx, defaultResourceGroupName, false)
414-
re.NoError(err)
415-
re.NotNil(defaultGC)
416-
417-
staleGC, err := controller.tryGetResourceGroupController(ctx, "stale-group", false)
418-
re.NoError(err)
419-
re.NotNil(staleGC)
420-
re.False(staleGC.tombstone.Load())
421-
422-
watchCh := make(chan []*meta_storagepb.Event)
423-
mockProvider.On("LoadResourceGroups", mock.Anything).
424-
Return([]*rmpb.ResourceGroup{updatedDefaultGroup}, int64(88), nil).
425-
Once()
426-
mockProvider.On("Watch", mock.Anything, pd.GroupSettingsPathPrefixBytes(constants.NullKeyspaceID), mock.Anything).
427-
Run(func(args mock.Arguments) {
428-
opts := args.Get(2).([]opt.MetaStorageOption)
429-
metaOp := &opt.MetaStorageOp{}
430-
for _, apply := range opts {
431-
apply(metaOp)
432-
}
433-
re.Equal(int64(89), metaOp.Revision)
434-
re.True(metaOp.IsOptsWithPrefix)
435-
re.True(metaOp.PrevKv)
436-
}).
437-
Return(watchCh, nil).
438-
Once()
439-
440-
reloadedWatchCh, err := controller.reloadResourceGroupMetaWatch(ctx)
441-
re.NoError(err)
442-
re.Equal(watchCh, reloadedWatchCh)
443-
re.Equal(updatedDefaultGroup, defaultGC.getMeta())
444-
staleGC, err = controller.tryGetResourceGroupController(ctx, "stale-group", true)
445-
re.NoError(err)
446-
re.True(staleGC.tombstone.Load())
447-
}
448-
449384
func TestTokenBucketsRequestWithKeyspaceID(t *testing.T) {
450385
re := require.New(t)
451386
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)