Skip to content

Commit f5b2a76

Browse files
committed
fix(triple): move autoScaler to be triggered by Get/Put and fix naming
1 parent 5f54ff8 commit f5b2a76

2 files changed

Lines changed: 36 additions & 38 deletions

File tree

protocol/triple/client.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func newClientManager(url *common.URL) (*clientManager, error) {
312312
baseTriURL = httpPrefix + baseTriURL
313313
}
314314

315-
triPools := make(map[string]*TriClientPool)
315+
triClientPools := make(map[string]*TriClientPool)
316316

317317
if len(url.Methods) != 0 {
318318
for _, method := range url.Methods {
@@ -323,7 +323,7 @@ func newClientManager(url *common.URL) (*clientManager, error) {
323323
pool := NewTriClientPool(triClientPoolMaxSize, func() *tri.Client {
324324
return tri.NewClient(httpClient, triURL, cliOpts...)
325325
})
326-
triPools[method] = pool
326+
triClientPools[method] = pool
327327
}
328328
} else {
329329
// This branch is for the non-IDL mode, where we pass in the service solely
@@ -336,21 +336,21 @@ func newClientManager(url *common.URL) (*clientManager, error) {
336336

337337
serviceType := reflect.TypeOf(service)
338338
for i := range serviceType.NumMethod() {
339-
method := serviceType.Method(i).Name
340-
triURL, err := joinPath(baseTriURL, url.Interface(), method)
339+
methodName := serviceType.Method(i).Name
340+
triURL, err := joinPath(baseTriURL, url.Interface(), methodName)
341341
if err != nil {
342-
return nil, fmt.Errorf("failed to join path for method %s: %w", method, err)
342+
return nil, fmt.Errorf("failed to join path for method %s: %w", methodName, err)
343343
}
344344
pool := NewTriClientPool(triClientPoolMaxSize, func() *tri.Client {
345345
return tri.NewClient(httpClient, triURL, cliOpts...)
346346
})
347-
triPools[method] = pool
347+
triClientPools[methodName] = pool
348348
}
349349
}
350350

351351
return &clientManager{
352352
isIDL: isIDL,
353-
triClients: triPools,
353+
triClients: triClientPools,
354354
}, nil
355355
}
356356

protocol/triple/client_pool.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,9 @@ var (
4343
)
4444

4545
const (
46-
autoScalerPeriod = 700 * time.Millisecond // autoScaler interval
47-
maxExpandPerCycle = 16 // maximum number of clients to expand per cycle
48-
lowIdleThresholdDivisor = 5 // consider clients insufficient if idle < curSize/5
49-
highIdleStreakLimit = 10 // consecutive high idle count to trigger shrinking
46+
maxExpandPerCycle = 16 // maximum number of clients to expand per cycle
47+
lowIdleThresholdDivisor = 5 // consider clients insufficient if idle < curSize/5
48+
highIdleStreakLimit = 10 // consecutive high idle count to trigger shrinking
5049
)
5150

5251
type TriClientPool struct {
@@ -58,7 +57,8 @@ type TriClientPool struct {
5857
closed bool
5958
getTimeouts int // recent timeout count, used to trigger expansion
6059

61-
fallback *tri.Client
60+
fallback *tri.Client
61+
consecutiveHighIdle int
6262
}
6363

6464
func NewTriClientPool(maxSize int, factory func() *tri.Client) *TriClientPool {
@@ -67,7 +67,6 @@ func NewTriClientPool(maxSize int, factory func() *tri.Client) *TriClientPool {
6767
factory: factory,
6868
maxSize: maxSize,
6969
}
70-
go pool.autoScaler()
7170
return pool
7271
}
7372

@@ -80,6 +79,8 @@ func NewTriClientPool(maxSize int, factory func() *tri.Client) *TriClientPool {
8079
// If expansion is not allowed, it waits for a client up to timeout.
8180
// After timeout, if still no client, Get() returns ErrTimeout with fallback.
8281
func (p *TriClientPool) Get(timeout time.Duration) (*tri.Client, error) {
82+
p.autoScaler()
83+
8384
select {
8485
case cli := <-p.clients:
8586
return cli, nil
@@ -122,6 +123,8 @@ func (p *TriClientPool) Get(timeout time.Duration) (*tri.Client, error) {
122123
// If it fails, Put will drop the client and notify the pool.
123124
// Dropping a client is part of shrinking.
124125
func (p *TriClientPool) Put(c *tri.Client) {
126+
p.autoScaler()
127+
125128
if c == nil {
126129
return
127130
}
@@ -168,38 +171,33 @@ func (p *TriClientPool) Closed() bool {
168171
return p.closed
169172
}
170173

171-
// autoScaler is the main worker that adjusts pool size.
172-
// It checks regularly whether pool should expand or shrink.
174+
// autoScaler performs scaling check.
175+
// It is triggered by Get() or Put()
173176
// If the timeout count is high, autoScaler tends to expand.
174177
// If the idle client count is often high, autoScaler tends to shrink.
175178
func (p *TriClientPool) autoScaler() {
176-
ticker := time.NewTicker(autoScalerPeriod)
177-
defer ticker.Stop()
178-
consecutiveHighIdle := 0
179+
p.mu.Lock()
180+
defer p.mu.Unlock()
179181

180-
for range ticker.C {
181-
if p.Closed() {
182-
return
183-
}
182+
if p.closed {
183+
return
184+
}
184185

185-
p.mu.Lock()
186-
curSize := p.curSize
187-
idle := len(p.clients)
188-
timeouts := p.getTimeouts
189-
p.getTimeouts = 0
190-
p.mu.Unlock()
186+
curSize := p.curSize
187+
idle := len(p.clients)
188+
timeouts := p.getTimeouts
189+
p.getTimeouts = 0
191190

192-
needExpand := checkExpand(curSize, idle, timeouts)
193-
if needExpand != 0 {
194-
p.expand(needExpand)
195-
consecutiveHighIdle = 0
196-
continue
197-
}
191+
needExpand := checkExpand(curSize, idle, timeouts)
192+
if needExpand != 0 {
193+
p.expand(needExpand)
194+
p.consecutiveHighIdle = 0
195+
return
196+
}
198197

199-
needShrink := checkShrink(curSize, idle, &consecutiveHighIdle)
200-
if needShrink != 0 {
201-
p.shrink(needShrink)
202-
}
198+
needShrink := checkShrink(curSize, idle, &p.consecutiveHighIdle)
199+
if needShrink != 0 {
200+
p.shrink(needShrink)
203201
}
204202
}
205203

0 commit comments

Comments
 (0)