-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup_coordinator.go
More file actions
94 lines (81 loc) · 3.57 KB
/
group_coordinator.go
File metadata and controls
94 lines (81 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/*
Copyright 2026 The ARCORIS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bufferpool
// Tick runs a single explicit foreground group coordinator cycle and returns
// the resulting report by value.
//
// One cycle samples group state, derives a bounded window relative to the
// previous group coordinator sample, scores partitions, and, when the group has
// a retained budget, publishes retained-budget targets into the owned
// PoolPartitions. Tick does not execute physical trim, scan shards directly,
// compute class EWMA, call Pool.Get or Pool.Put, propagate pressure, or start
// background work.
//
// Tick is a convenience wrapper: it delegates to TickInto with a freshly
// zeroed report and hands back that report together with TickInto's error.
func (g *PoolGroup) Tick() (PoolGroupCoordinatorReport, error) {
	g.mustBeInitialized()

	report := PoolGroupCoordinatorReport{}
	if err := g.TickInto(&report); err != nil {
		// The report is still returned alongside the error, exactly as
		// TickInto left it.
		return report, err
	}
	return report, nil
}
// TickInto runs one explicit foreground group coordinator cycle and writes the
// outcome into dst.
//
// The cycle is serialized with hard Close through runtimeMu, because it
// advances the group generation and publishes partition budget targets through
// the group ownership boundary. dst.Sample.Partitions and the report slices are
// reused for their capacity. Passing a nil dst is a no-op once receiver and
// lifecycle validation have passed. Direct callers and the opt-in scheduler
// share this same foreground coordinator path. Concurrent callers must not
// share dst without external synchronization.
//
// A group skipped status means there was no group-level budget target work to
// publish. A budget cycle that is not published still reports the attempt
// Generation and commits coordinator observation state, but
// PublishedGeneration stays NoGeneration unless every partition accepts the
// target batch. Callers can therefore tell "observed and reported" apart from
// "runtime budget state changed".
//
// TickInto returns an ErrClosed error when the group lifecycle no longer allows
// work, the publication error when publishing budgets fails, and nil otherwise
// (including when the cycle gate reports that a cycle is already running).
func (g *PoolGroup) TickInto(dst *PoolGroupCoordinatorReport) error {
	g.mustBeInitialized()

	// Held until return so the whole cycle runs under the runtime read lock.
	g.runtimeMu.RLock()
	defer g.runtimeMu.RUnlock()

	// Lifecycle is checked before the nil test, so a closed group reports
	// ErrClosed even when dst is nil.
	switch {
	case !g.lifecycle.AllowsWork():
		g.finishClosedGroupCoordinatorCycle(dst)
		return newError(ErrClosed, errGroupClosed)
	case dst == nil:
		return nil
	}

	// When the cycle gate refuses entry, record the already-running outcome
	// and return without an error.
	if entered := g.coordinator.cycleGate.begin(); !entered {
		g.finishAlreadyRunningGroupCoordinatorCycle(dst)
		return nil
	}
	defer g.coordinator.cycleGate.end()

	g.coordinator.mu.Lock()
	defer g.coordinator.mu.Unlock()

	state := g.evaluateGroupCoordinatorCycle(dst)

	// The zero-length reslice hands dst's existing SkippedPartitions backing
	// array to the publisher for reuse.
	publication, err := g.publishGroupCoordinatorBudgets(&state, dst.SkippedPartitions[:0])
	// The publication result is stored on the cycle before the error check,
	// so the failure path sees it as well.
	state.budgetPublication = publication
	if err != nil {
		g.finishFailedGroupCoordinatorCycle(dst, &state, err)
		return err
	}

	status := selectGroupCoordinatorStatus(&state, state.budgetPublication)
	generation := g.commitGroupCoordinatorCycle(&state)
	g.finishGroupCoordinatorCycle(dst, &state, status, generation)
	return nil
}
// ControllerStatus reports the lightweight status recorded for the most recent
// group coordinator cycle.
//
// It is safe to call after Close and hands back a copy. The accessor does not
// sample partitions, publish targets, run child ticks, or retain the heavy
// report returned by TickInto.
func (g *PoolGroup) ControllerStatus() PoolGroupControllerStatus {
	g.mustBeInitialized()

	snapshot := g.coordinator.status.load()
	return snapshot
}