-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpriority_queue.go
More file actions
82 lines (63 loc) · 1.72 KB
/
priority_queue.go
File metadata and controls
82 lines (63 loc) · 1.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package workqueue
import (
"math"
"sync"
hp "github.com/shengyanli1982/workqueue/v2/internal/container/heap"
lst "github.com/shengyanli1982/workqueue/v2/internal/container/list"
)
// 预定义优先级,数值越小优先级越高。
const (
PRIORITY_SLOWEST = math.MaxInt64
PRIORITY_LOW = math.MaxInt32
PRIORITY_NORMAL = 0
PRIORITY_HIGH = math.MinInt32
PRIORITY_FASTEST = math.MinInt64
)
// priorityQueueImpl 使用红黑树按优先级排序。
type priorityQueueImpl struct {
Queue
config *PriorityQueueConfig
sorting *hp.RBTree
elementpool *lst.NodePool
lock sync.Mutex
}
// NewPriorityQueue 创建优先级队列。
func NewPriorityQueue(config *PriorityQueueConfig) PriorityQueue {
config = isPriorityQueueConfigEffective(config)
q := &priorityQueueImpl{
config: config,
sorting: hp.New(),
elementpool: lst.NewNodePool(),
}
q.Queue = newQueue(&wrapInternalHeap{RBTree: q.sorting}, q.elementpool, &config.QueueConfig)
return q
}
func (q *priorityQueueImpl) Shutdown() {
q.Queue.Shutdown()
}
func (q *priorityQueueImpl) Put(value interface{}) error {
return q.PutWithPriority(value, PRIORITY_NORMAL)
}
func (q *priorityQueueImpl) PutWithPriority(value interface{}, priority int64) error {
if q.IsClosed() {
return ErrQueueIsClosed
}
if value == nil {
return ErrElementIsNil
}
last := q.elementpool.Get()
last.Value = value
last.Priority = priority
q.lock.Lock()
q.sorting.Push(last)
q.lock.Unlock()
q.config.callback.OnPriority(value, priority)
return nil
}
func (q *priorityQueueImpl) HeapRange(fn func(value interface{}, delay int64) bool) {
q.lock.Lock()
q.sorting.Range(func(node *lst.Node) bool {
return fn(node.Value, node.Priority)
})
q.lock.Unlock()
}