File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -10,8 +10,9 @@ import (
1010
1111type Diskqueue struct {
1212 sync.RWMutex
13- close bool
14- ticker * time.Ticker
13+ close bool
14+ closeChan chan bool
15+ ticker * time.Ticker
1516}
1617
1718var (
@@ -36,20 +37,10 @@ func Start() (*Diskqueue, error) {
3637 return nil , err
3738 }
3839
39- queue := & Diskqueue {close : false }
40+ queue := & Diskqueue {close : false , closeChan : make ( chan bool ) }
4041 queue .ticker = time .NewTicker (Config .BatchTime )
4142 Reader .restore ()
42-
43- go func () {
44- for {
45- <- queue .ticker .C
46- queue .Lock ()
47- Writer .sync ()
48- Reader .sync ()
49- queue .Unlock ()
50- }
51- }()
52-
43+ go queue .sync ()
5344 return queue , nil
5445}
5546
@@ -83,7 +74,27 @@ func (queue *Diskqueue) Read() ([]byte, error) {
8374
8475// Close diskqueue
8576func (queue * Diskqueue ) Close () {
77+ if queue .close {
78+ return
79+ }
80+
8681 queue .close = true
82+ queue .closeChan <- true
8783 Writer .close ()
8884 Reader .close ()
8985}
86+
87+ // sync data
88+ func (queue * Diskqueue ) sync () {
89+ for {
90+ select {
91+ case <- queue .ticker .C :
92+ queue .Lock ()
93+ Writer .sync ()
94+ Reader .sync ()
95+ queue .Unlock ()
96+ case <- queue .closeChan :
97+ return
98+ }
99+ }
100+ }
You can’t perform that action at this time.
0 commit comments