-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathratelimiting_queue.go
More file actions
47 lines (36 loc) · 1 KB
/
ratelimiting_queue.go
File metadata and controls
47 lines (36 loc) · 1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package workqueue
// ratelimitingQueueImpl 组合 DelayingQueue 实现限流入队。
type ratelimitingQueueImpl struct {
DelayingQueue
config *RateLimitingQueueConfig
}
// NewRateLimitingQueue 创建限流队列。
func NewRateLimitingQueue(config *RateLimitingQueueConfig) RateLimitingQueue {
config = isRateLimitingQueueConfigEffective(config)
q := &ratelimitingQueueImpl{
config: config,
DelayingQueue: NewDelayingQueue(&config.DelayingQueueConfig),
}
return q
}
func (q *ratelimitingQueueImpl) Shutdown() {
q.DelayingQueue.Shutdown()
}
func (q *ratelimitingQueueImpl) PutWithLimited(value interface{}) error {
if q.IsClosed() || value == nil {
if q.IsClosed() {
return ErrQueueIsClosed
}
return ErrElementIsNil
}
delay := q.config.limiter.When(value).Milliseconds()
// 有等待时间时转为延迟入队,否则立即入队。
var err error
if delay > 0 {
err = q.PutWithDelay(value, delay)
} else {
err = q.Put(value)
}
q.config.callback.OnLimited(value)
return err
}