-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpool.go
More file actions
123 lines (107 loc) · 2.79 KB
/
pool.go
File metadata and controls
123 lines (107 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package goroutine_pool
import "sync/atomic"
// Capacity settings.
const (
	// defaultCap is the capacity WithCapacity falls back to for invalid values.
	defaultCap = 1024
	// noLimitCap (-1) means a goWorker is created per request with no limit.
	noLimitCap = -1
)

// Pool states.
//
// The numeric values are 2 and 3 (not 0 and 1), so the zero value of a
// state field is neither PoolOpen nor PoolClose.
const (
	PoolOpen = iota + 2
	PoolClose
)
// Pool is the public interface of a goroutine pool.
type Pool interface {
	// RunTask submits a user task to a goroutine of the pool.
	// It returns an error when a goWorker could not be obtained.
	RunTask(func()) error
	// RunTaskWithRet submits a user task that has a return value.
	// It returns a receive-only channel onto which the function's result
	// is pushed, or a nil channel and an error if a goWorker could not be
	// obtained.
	// NOTE(review): this comment used to describe the channel as a
	// "closed-chan"; whether the channel is closed after the result is
	// delivered depends on the implementation - confirm before relying on it.
	RunTaskWithRet(func() interface{}) (<-chan interface{}, error)
	// Close stops the pool and closes all goroutines.
	Close()
	// IsClosed reports whether the pool has been closed.
	IsClosed() bool
	// RunningCount returns the number of workers that are running.
	RunningCount() int32
}
// goPool is an implementation of the Pool interface.
type goPool struct {
	// workers is the store from which a goWorker is obtained to run a task.
	workers workerStore
	// state records whether the pool is open (PoolOpen) or closed
	// (PoolClose); it is read and written through sync/atomic.
	state int32
	// cap is the capacity handed to newWorkerStore; set by WithCapacity.
	cap int32
	// isBlock is handed to newWorkerStore; set by WithBlock.
	isBlock bool
}
// NewPool creates a goroutine pool configured by opts.
//
// The pool starts in the PoolOpen state with a capacity of defaultCap, so a
// pool built without WithCapacity gets the same default that WithCapacity
// applies for invalid values (instead of a zero capacity). Options are
// applied in the order given; nil options are skipped rather than causing a
// nil-function panic.
func NewPool(opts ...Option) Pool {
	p := &goPool{state: PoolOpen, cap: defaultCap}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(p)
	}
	// The worker store is created last so it sees the final configuration.
	p.workers = newWorkerStore(p.cap, p.isBlock, p)
	return p
}
// RunTask hands f to a worker obtained from the pool.
//
// It returns ErrorPoolClosed if the pool has been closed, and
// ErrorPoolArriveCapacity if the worker store yields no worker.
func (p *goPool) RunTask(f func()) error {
	if p.IsClosed() {
		return ErrorPoolClosed
	}
	if w := p.workers.getWorker(); w != nil {
		w.submitTask(f)
		return nil
	}
	// The store had no worker to offer.
	return ErrorPoolArriveCapacity
}
// RunTaskWithRet hands f to a worker obtained from the pool and returns a
// channel that delivers f's result.
//
// The channel is buffered with capacity 1, so the worker never blocks on
// the send, and it is closed once f has finished. The buffered result is
// still readable after the close; a second receive or a range loop
// terminates instead of blocking forever. If f panics, the channel is
// closed without a value and the panic keeps propagating in the worker.
//
// It returns a nil channel together with ErrorPoolClosed if the pool has
// been closed, or ErrorPoolArriveCapacity if the worker store yields no
// worker.
func (p *goPool) RunTaskWithRet(f func() interface{}) (<-chan interface{}, error) {
	if p.IsClosed() {
		return nil, ErrorPoolClosed
	}
	worker := p.workers.getWorker()
	if worker == nil {
		return nil, ErrorPoolArriveCapacity
	}

	retCh := make(chan interface{}, 1)
	worker.submitTask(func() {
		// Close on every exit path so receivers are always released.
		defer close(retCh)
		retCh <- f()
	})
	return retCh, nil
}
// Close moves the pool from PoolOpen to PoolClose and closes the worker
// store. The compare-and-swap ensures the store is closed only by the one
// call that wins the state transition; later calls do nothing.
func (p *goPool) Close() {
	if !atomic.CompareAndSwapInt32(&p.state, PoolOpen, PoolClose) {
		return
	}
	p.workers.close()
}
// IsClosed reports whether the pool state, read atomically, is PoolClose.
func (p *goPool) IsClosed() bool {
	current := atomic.LoadInt32(&p.state)
	return current == PoolClose
}
// RunningCount returns the running-worker count reported by the worker store.
func (p *goPool) RunningCount() int32 {
	running := p.workers.getRunningCount()
	return running
}
// Option configures a goPool. NewPool applies the options it is given, in
// order, before creating the worker store.
type Option func(*goPool)
// WithCapacity sets the maximum capacity of the pool.
//
// A positive value or noLimitCap is stored as given; any other value
// (zero, or negative other than noLimitCap) is replaced by defaultCap.
func WithCapacity(cap int32) Option {
	return func(p *goPool) {
		switch {
		case cap > 0, cap == noLimitCap:
			p.cap = cap
		default:
			p.cap = defaultCap
		}
	}
}
// WithBlock sets whether obtaining a worker should block; the flag is
// stored on the pool and passed to the worker store by NewPool.
func WithBlock(isBlock bool) Option {
	apply := func(p *goPool) {
		p.isBlock = isBlock
	}
	return apply
}