Skip to content

Commit 5933f85

Browse files
committed
client: add pre-throttling demand RU/s metric
Add a new client-side Prometheus Gauge `resource_manager_client_resource_group_demand_ru_per_sec` that tracks the EMA of demanded RU/s before Resource Control throttling takes effect. The existing `avgRUPerSec` is based on post-throttling consumption: when a request is rejected by the token bucket, its RU cost is subtracted from the consumption counter. This means the consumption-based EMA underreports the true workload demand when the resource group is actively throttled. The new demand metric samples RU cost at every `onRequestWaitImpl`, `onResponseImpl`, `onResponseWaitImpl`, and `addRUConsumption` entry point, accumulating into a monotonically increasing `demandRUTotal` counter that is never subtracted on throttle failure. A demand EMA is then computed using the same `movingAvgFactor` as the consumption EMA and flushed to the Gauge on each `updateAvgRequestResourcePerSec` tick. This enables operators to: - See per-instance RU demand in Grafana (natural `instance` label). - Aggregate cluster-wide demand via `sum by (resource_group)`. - Identify the true workload peak via `max_over_time(...)`. Close #10581 Signed-off-by: JmPotato <ghzpotato@gmail.com> Signed-off-by: JmPotato <github@ipotato.me>
1 parent dca466b commit 5933f85

3 files changed

Lines changed: 115 additions & 5 deletions

File tree

client/resource_group/controller/group_controller.go

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ type groupCostController struct {
5151
consumption *rmpb.Consumption
5252
storeCounter map[uint64]*rmpb.Consumption
5353
globalCounter *rmpb.Consumption
54+
// demandRUTotal accumulates total demanded RU (pre-throttling).
55+
// Unlike consumption, this is never subtracted on throttle failure.
56+
demandRUTotal float64
5457
}
5558

5659
// fast path to make once token limit with un-limit burst.
@@ -75,6 +78,9 @@ type groupCostController struct {
7578
// last update.
7679
targetPeriod time.Duration
7780

81+
// demandRUTotal is a snapshot of mu.demandRUTotal, copied in updateRunState.
82+
demandRUTotal float64
83+
7884
// consumptions stores the last value of mu.consumption.
7985
// requestUnitConsumptions []*rmpb.RequestUnitItem
8086
// resourceConsumptions []*rmpb.ResourceItem
@@ -106,6 +112,7 @@ type groupMetricsCollection struct {
106112
tokenRequestCounter prometheus.Counter
107113
runningKVRequestCounter prometheus.Gauge
108114
consumeTokenHistogram prometheus.Observer
115+
demandRUPerSecGauge prometheus.Gauge
109116
}
110117

111118
func initMetrics(oldName, name string) *groupMetricsCollection {
@@ -122,6 +129,7 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
122129
tokenRequestCounter: metrics.ResourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
123130
runningKVRequestCounter: metrics.GroupRunningKVRequestCounter.WithLabelValues(name),
124131
consumeTokenHistogram: metrics.TokenConsumedHistogram.WithLabelValues(name),
132+
demandRUPerSecGauge: metrics.DemandRUPerSecGauge.WithLabelValues(name),
125133
}
126134
}
127135

@@ -136,6 +144,12 @@ type tokenCounter struct {
136144
avgRUPerSecLastRU float64
137145
avgLastTime time.Time
138146

147+
// avgDemandRUPerSec is an EMA of the demanded RU/s before throttling,
148+
// reflecting the true workload demand regardless of token bucket limits.
149+
avgDemandRUPerSec float64
150+
avgDemandRUPerSecLastRU float64
151+
avgDemandLastTime time.Time
152+
139153
notify struct {
140154
mu sync.Mutex
141155
setupNotificationCh <-chan time.Time
@@ -220,10 +234,11 @@ func (gc *groupCostController) initRunState() {
220234
defer gc.metaLock.RUnlock()
221235
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(gc.meta.RUSettings.RU), gc.lowRUNotifyChan)
222236
counter := &tokenCounter{
223-
limiter: limiter,
224-
avgRUPerSec: 0,
225-
avgLastTime: now,
226-
fillRate: gc.meta.RUSettings.RU.Settings.FillRate,
237+
limiter: limiter,
238+
avgRUPerSec: 0,
239+
avgLastTime: now,
240+
avgDemandLastTime: now,
241+
fillRate: gc.meta.RUSettings.RU.Settings.FillRate,
227242
}
228243
gc.run.requestUnitTokens = counter
229244
gc.burstable.Store(isBurstable)
@@ -257,6 +272,7 @@ func (gc *groupCostController) updateRunState() {
257272
calc.Trickle(gc.mu.consumption)
258273
}
259274
*gc.run.consumption = *gc.mu.consumption
275+
gc.run.demandRUTotal = gc.mu.demandRUTotal
260276
gc.mu.Unlock()
261277
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption), zap.Bool("is-throttled", gc.isThrottled.Load()))
262278
gc.run.now = newTime
@@ -271,7 +287,9 @@ func (gc *groupCostController) updateAvgRequestResourcePerSec() {
271287
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption)) {
272288
return
273289
}
274-
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
290+
gc.calcDemandAvg(counter, gc.run.demandRUTotal)
291+
gc.metrics.demandRUPerSecGauge.Set(counter.avgDemandRUPerSec)
292+
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Float64("avg-demand-ru-per-sec", counter.avgDemandRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
275293
gc.burstable.Store(isBurstable)
276294
}
277295

@@ -319,6 +337,20 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool
319337
return true
320338
}
321339

340+
func (gc *groupCostController) calcDemandAvg(counter *tokenCounter, new float64) {
341+
deltaDuration := gc.run.now.Sub(counter.avgDemandLastTime)
342+
if deltaDuration <= 0 {
343+
return
344+
}
345+
delta := (new - counter.avgDemandRUPerSecLastRU) / deltaDuration.Seconds()
346+
counter.avgDemandRUPerSec = movingAvgFactor*counter.avgDemandRUPerSec + (1-movingAvgFactor)*delta
347+
if counter.avgDemandRUPerSec < 0 {
348+
counter.avgDemandRUPerSec = 0
349+
}
350+
counter.avgDemandLastTime = gc.run.now
351+
counter.avgDemandRUPerSecLastRU = new
352+
}
353+
322354
func (gc *groupCostController) shouldReportConsumption() bool {
323355
if !gc.run.initialRequestCompleted {
324356
return true
@@ -554,6 +586,7 @@ func (gc *groupCostController) onRequestWaitImpl(
554586

555587
gc.mu.Lock()
556588
add(gc.mu.consumption, delta)
589+
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
557590
gc.mu.Unlock()
558591

559592
if !gc.burstable.Load() {
@@ -611,6 +644,8 @@ func (gc *groupCostController) onResponseImpl(
611644
gc.mu.Lock()
612645
// Record the consumption of the request
613646
add(gc.mu.consumption, delta)
647+
// Record the response-phase demand as well (actual read bytes, CPU, etc.)
648+
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
614649
// Record the consumption of the request by store
615650
count := &rmpb.Consumption{}
616651
*count = *delta
@@ -652,6 +687,7 @@ func (gc *groupCostController) onResponseWaitImpl(
652687
gc.mu.Lock()
653688
// Record the consumption of the request
654689
add(gc.mu.consumption, delta)
690+
gc.mu.demandRUTotal += getRUValueFromConsumption(delta)
655691
// Record the consumption of the request by store
656692
count := &rmpb.Consumption{}
657693
*count = *delta
@@ -669,6 +705,7 @@ func (gc *groupCostController) onResponseWaitImpl(
669705
// addRUConsumption records an externally reported RU consumption delta,
// folding it into both the consumption counter and the pre-throttling
// demand total under the group's mutex.
func (gc *groupCostController) addRUConsumption(consumption *rmpb.Consumption) {
	gc.mu.Lock()
	defer gc.mu.Unlock()
	add(gc.mu.consumption, consumption)
	gc.mu.demandRUTotal += getRUValueFromConsumption(consumption)
}
674711

client/resource_group/controller/group_controller_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,3 +315,64 @@ func TestAcquireTokensFallbackToTimer(t *testing.T) {
315315
// waitDuration should be roughly retryTimes * retryInterval.
316316
re.GreaterOrEqual(waitDuration, gc.mainCfg.WaitRetryInterval*time.Duration(gc.mainCfg.WaitRetryTimes))
317317
}
318+
319+
// TestDemandRUTracking verifies that demandRUTotal accumulates RU for both
// successful and throttled requests (unlike the consumption counter, which
// is decremented when a request is rejected), and that the demand EMA update
// path runs without producing a negative value.
func TestDemandRUTracking(t *testing.T) {
	re := require.New(t)
	gc := createTestGroupCostController(re)

	// Simulate requests arriving: demand should accumulate regardless of throttling.
	req := &TestRequestInfo{
		isWrite:    true,
		writeBytes: 100,
	}
	resp := &TestResponseInfo{
		readBytes: 100,
		succeed:   true,
	}

	// Issue several successful requests.
	for range 5 {
		consumption, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req)
		re.NoError(err)
		re.NotNil(consumption)
		_, err = gc.onResponseImpl(req, resp)
		re.NoError(err)
	}

	// demandRUTotal should have accumulated all pre-request and post-response RU.
	// Read under the mutex since the controller mutates it under gc.mu.
	gc.mu.Lock()
	demandTotal := gc.mu.demandRUTotal
	gc.mu.Unlock()
	re.Positive(demandTotal, "demand should be accumulated after requests")

	// Now issue a request that gets throttled (rejected). The write size is
	// presumably far above the test controller's token-bucket capacity —
	// confirm against createTestGroupCostController's settings.
	bigReq := &TestRequestInfo{
		isWrite:    true,
		writeBytes: 10000000,
	}
	_, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), bigReq)
	re.Error(err)
	re.True(errs.ErrClientResourceGroupThrottled.Equal(err))

	// demandRUTotal should still include the throttled request's RU: this is
	// the key property distinguishing demand from consumption.
	gc.mu.Lock()
	demandAfterThrottle := gc.mu.demandRUTotal
	gc.mu.Unlock()
	re.Greater(demandAfterThrottle, demandTotal,
		"demand should increase even for throttled requests")

	// Verify that the demand EMA is computed correctly.
	// NOTE(review): updateRunState appears to reset gc.run.now from the wall
	// clock at its end, so these manual gc.run.now assignments may be
	// overwritten before calcDemandAvg sees them — confirm the time source.
	now := time.Now()
	gc.run.now = now
	gc.updateRunState()
	gc.updateAvgRequestResourcePerSec()

	// Advance time and update again so the EMA has two data points.
	gc.run.now = now.Add(time.Second)
	gc.updateRunState()
	gc.updateAvgRequestResourcePerSec()

	counter := gc.run.requestUnitTokens
	// NOTE(review): this assertion is vacuous — calcDemandAvg clamps the EMA
	// at zero, so >= 0 always holds. Consider asserting re.Positive once the
	// time-advance concern above is resolved.
	re.GreaterOrEqual(counter.avgDemandRUPerSec, 0.0,
		"demand EMA should be non-negative")
}

client/resource_group/controller/metrics/metrics.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ const (
3131
var (
3232
// ResourceGroupStatusGauge comments placeholder
3333
ResourceGroupStatusGauge *prometheus.GaugeVec
34+
// DemandRUPerSecGauge is the EMA of demanded RU/s before throttling per resource group.
35+
DemandRUPerSecGauge *prometheus.GaugeVec
3436
// SuccessfulRequestDuration comments placeholder
3537
SuccessfulRequestDuration *prometheus.HistogramVec
3638
// FailedLimitReserveDuration comments placeholder
@@ -69,6 +71,15 @@ func initMetrics(constLabels prometheus.Labels) {
6971
ConstLabels: constLabels,
7072
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
7173

74+
DemandRUPerSecGauge = prometheus.NewGaugeVec(
75+
prometheus.GaugeOpts{
76+
Namespace: namespace,
77+
Subsystem: "resource_group",
78+
Name: "demand_ru_per_sec",
79+
Help: "EMA of demanded RU/s before throttling for each resource group.",
80+
ConstLabels: constLabels,
81+
}, []string{newResourceGroupNameLabel})
82+
7283
SuccessfulRequestDuration = prometheus.NewHistogramVec(
7384
prometheus.HistogramOpts{
7485
Namespace: namespace,
@@ -162,6 +173,7 @@ func initMetrics(constLabels prometheus.Labels) {
162173
func InitAndRegisterMetrics(constLabels prometheus.Labels) {
163174
initMetrics(constLabels)
164175
prometheus.MustRegister(ResourceGroupStatusGauge)
176+
prometheus.MustRegister(DemandRUPerSecGauge)
165177
prometheus.MustRegister(SuccessfulRequestDuration)
166178
prometheus.MustRegister(FailedRequestCounter)
167179
prometheus.MustRegister(FailedLimitReserveDuration)

0 commit comments

Comments
 (0)