-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.go
More file actions
321 lines (276 loc) · 7.04 KB
/
cache.go
File metadata and controls
321 lines (276 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package cache
import (
"container/heap"
"encoding/json"
"log"
"math/rand"
"os"
"sync"
"time"
)
// wg tracks the background expiry goroutine started by CreateNewCache so
// that close() can wait for it to exit.
// NOTE(review): this is package-level state, so the counter is shared by
// every cache instance created from this package — confirm only one cache
// is expected per process.
var (
	wg sync.WaitGroup
)
// init seeds the global math/rand source so RandStringRunes produces a
// different sequence on each process run.
// NOTE(review): rand.Seed is deprecated as of Go 1.20 (the global source
// is auto-seeded); harmless here, and can be dropped on newer toolchains.
func init() {
	rand.Seed(time.Now().UnixNano())
}
/*
queueData is a single cache entry, shared by both the lookup map and the
expiry priority queue.

  - Key: the lookup key for the entry.
  - Data: the cached payload (any type; []byte is the recommended form).
  - ExpireAt: absolute expiry as a Unix timestamp in seconds (compared
    against time.Now().Unix() in expired and processExpiry).
  - QueueIndex: the entry's current position inside the priority queue,
    maintained by Swap/Push; set to -1 once popped. Entries nearer the
    heap root expire sooner.
*/
type queueData struct {
	Key string `json:"key"`
	Data interface{} `json:"data"`
	ExpireAt int64 `json:"expire_at"`
	QueueIndex int `json:"index"`
}
/*
priorityQueue is a min-heap of cache entries ordered by ExpireAt (the
soonest-expiring entry sits at Items[0]), letting the expiry sweep find
expired keys cheaply. It implements heap.Interface through the
Len/Less/Swap/Push/Pop methods below; the length grows dynamically.
NOTE(review): the original comment mentioned limits of "1024 kb to
10,000 keys", but no size/memory enforcement is visible in this file —
confirm before relying on it.
*/
type priorityQueue struct {
	Items []*queueData `json:"queue_data"`
}
/*
SimpleCache is an expiring key/value store. Lookups go through the Data
map; eviction of expired keys is driven by Queue, a min-heap on expiry
time, swept by a background goroutine.

Fields:
  - FileName: target file for optional JSON persistence (see close).
  - MaxEntry: configured maximum number of entries.
    NOTE(review): no code in this file enforces MaxEntry — confirm.
  - Queue: expiry-ordered heap holding the same *queueData values as Data.
  - TTL: cache-wide default TTL, used whenever a caller passes ttl == -1.
  - Data: key -> entry lookup map.
  - Lock: a plain sync.Mutex guarding Data and Queue (not a read/write
    mutex, despite the original comment).
  - ExpiryChannel: shutdown handshake channel between close() and the
    background expiry goroutine.
  - SaveFile: when true, close() persists the cache to FileName.
*/
type SimpleCache struct {
	FileName string `json:"file_name"`
	MaxEntry uint64 `json:"max_entry"`
	Queue *priorityQueue `json:"queue"`
	TTL int64 `json:"cache_global_ttl"`
	Data map[string]*queueData `json:"data"`
	Lock *sync.Mutex `json:"lock"`
	ExpiryChannel chan bool `json:"-"`
	SaveFile bool `json:"-"`
}
/*
newQueueItem builds a cache entry for the given key/value pair and
stamps its expiry from ttl via addTimeStamp. The freshly built item is
not yet visible to any other goroutine, so no locking is required here.
*/
func newQueueItem(key string, data interface{}, ttl int64) *queueData {
	entry := &queueData{
		Key:  key,
		Data: data,
	}
	entry.addTimeStamp(ttl)
	return entry
}
// expired reports whether this entry's expiry timestamp lies strictly in
// the past (it does not remove the entry; processExpiry handles removal).
func (q *queueData) expired() bool {
	now := time.Now().Unix()
	return now > q.ExpireAt
}
/*
addTimeStamp sets the entry's absolute expiry to now + ttl, where ttl is
a lifetime in seconds. This matches every comparison site in the file:
expired, Set and processExpiry all work in time.Now().Unix() seconds.

Fix: the previous implementation used
time.Now().Add(time.Duration(ttl)).Unix(), and a bare time.Duration
conversion interprets the integer as NANOSECONDS — so an entry given a
ttl of 60 expired after 60ns (effectively immediately) instead of after
a minute.
*/
func (q *queueData) addTimeStamp(ttl int64) {
	q.ExpireAt = time.Now().Unix() + ttl
}
// update restores heap order after data's ExpireAt has been mutated in
// place; heap.Fix re-sinks/floats the item at its tracked QueueIndex.
func (p *priorityQueue) update(data *queueData) {
	heap.Fix(p, data.QueueIndex)
}
// push inserts data into the queue, maintaining heap order on ExpireAt.
func (p *priorityQueue) push(data *queueData) {
	heap.Push(p, data)
}
// pop removes and returns the soonest-expiring entry, or nil when the
// queue is empty.
func (p *priorityQueue) pop() *queueData {
	if len(p.Items) == 0 {
		return nil
	}
	popped := heap.Pop(p)
	return popped.(*queueData)
}
// remove deletes the given entry from the queue using its tracked heap
// index (kept current by Swap/Push).
func (p *priorityQueue) remove(queueData *queueData) {
	heap.Remove(p, queueData.QueueIndex)
}
// Len implements heap.Interface: the number of queued entries.
func (p *priorityQueue) Len() int {
	return len(p.Items)
}
// Less implements heap.Interface: earlier expiry sorts first (min-heap).
func (p *priorityQueue) Less(i, j int) bool {
	return p.Items[i].ExpireAt < p.Items[j].ExpireAt
}
// Swap implements heap.Interface, keeping each entry's QueueIndex in
// sync with its slice position so update/remove can locate it later.
func (p *priorityQueue) Swap(i, j int) {
	p.Items[i], p.Items[j] = p.Items[j], p.Items[i]
	p.Items[i].QueueIndex = i
	p.Items[j].QueueIndex = j
}
// Push implements heap.Interface: append x to the backing slice and
// record its index so the heap can track it.
func (p *priorityQueue) Push(x interface{}) {
	entry := x.(*queueData)
	entry.QueueIndex = len(p.Items)
	p.Items = append(p.Items, entry)
}
// Pop implements heap.Interface: detach and return the last element of
// the backing slice, marking it as no longer queued (QueueIndex = -1).
func (p *priorityQueue) Pop() interface{} {
	n := len(p.Items)
	last := p.Items[n-1]
	last.QueueIndex = -1
	p.Items = p.Items[:n-1]
	return last
}
// newPriorityQueue returns an empty, heap-initialized expiry queue.
func newPriorityQueue() *priorityQueue {
	pq := new(priorityQueue)
	heap.Init(pq)
	return pq
}
/*
CreateNewCache builds a SimpleCache and starts its background expiry
goroutine (registered with the package WaitGroup so close() can wait
for it).

The cache-wide TTL starts at -1; set a real TTL (see setTTL) before
relying on default expiries. fileName and save control the optional
JSON persistence performed by close().
*/
func CreateNewCache(fileName string, maxEntry uint64, save bool) *SimpleCache {
	c := &SimpleCache{
		FileName:      fileName,
		MaxEntry:      maxEntry,
		SaveFile:      save,
		TTL:           -1,
		Data:          make(map[string]*queueData),
		Queue:         newPriorityQueue(),
		Lock:          new(sync.Mutex),
		ExpiryChannel: make(chan bool),
	}
	wg.Add(1)
	go c.concurrentProcessChecks()
	return c
}
/*
Set stores v under k with the given ttl in seconds; ttl == -1 means
"use the cache-wide TTL". Always returns true. Both the lookup map and
the expiry heap are updated under the cache lock.

Fix: the previous version computed an absolute expiry timestamp (ttx =
now + ttl) and then passed it to newQueueItem as if it were a relative
ttl, so newly inserted keys received a nonsensical expiry. New entries
now get their absolute ExpireAt assigned directly, so the heap and the
expiry sweep agree on Unix seconds.
*/
func (c *SimpleCache) Set(k string, v interface{}, ttl int64) bool {
	c.Lock.Lock()
	defer c.Lock.Unlock()

	// Absolute expiry (Unix seconds) for this write.
	expireAt := time.Now().Unix() + ttl
	if ttl == -1 {
		expireAt = time.Now().Unix() + c.TTL
	}

	if data, present := c.getData(k, ttl); present {
		// Existing key: refresh its expiry and restore heap order.
		data.ExpireAt = expireAt
		c.Queue.update(data)
		c.Data[k] = data
		return true
	}

	newData := newQueueItem(k, v, ttl)
	// Overwrite whatever stamp newQueueItem computed with the absolute
	// expiry computed above.
	newData.ExpireAt = expireAt
	c.Queue.push(newData)
	c.Data[k] = newData
	return true
}
/*
getData looks up k and, when found, refreshes its expiry (sliding
expiration): the entry is re-stamped with ttl, or with the cache-wide
TTL when ttl == -1. The caller must hold c.Lock.

Fix: after mutating ExpireAt the entry's heap position is now repaired
via Queue.update; previously the priority queue was left violating its
ordering invariant on every read, so processExpiry could evict the
wrong keys.
*/
func (c *SimpleCache) getData(k string, ttl int64) (*queueData, bool) {
	data, present := c.Data[k]
	if !present {
		return nil, false
	}
	if ttl == -1 {
		ttl = c.TTL
	}
	data.addTimeStamp(ttl)
	// Restore heap order after changing the ordering key.
	c.Queue.update(data)
	return data, true
}
/*
Get returns the value stored under k, a nil error, and a boolean
reporting whether the key was present. The lookup refreshes the key's
expiry with the cache-wide TTL (getData with ttl == -1).
*/
func (c *SimpleCache) Get(k string) (interface{}, error, bool) {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	if item, ok := c.getData(k, -1); ok {
		return item.Data, nil, true
	}
	return nil, nil, false
}
/*
processExpiry evicts every entry whose expiry lies in the past,
removing it from both the lookup map and the heap. The heap root is
always the soonest-expiring entry, so the sweep stops at the first
still-live key.
*/
func (c *SimpleCache) processExpiry() {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	for c.Queue.Len() > 0 {
		head := c.Queue.Items[0]
		if head.ExpireAt >= time.Now().Unix() {
			break
		}
		delete(c.Data, head.Key)
		c.Queue.pop()
	}
}
/*
concurrentProcessChecks runs the background expiry loop until close()
signals shutdown via ExpiryChannel, echoing the signal back so close()
knows the goroutine has stopped before the channel is closed.

Fix: the previous loop had no pause between sweeps, so this goroutine
busy-spun on the cache lock at 100% CPU for the cache's whole lifetime.
Sweeps now run on a 1-second ticker, ample resolution for
second-granularity TTLs; the shutdown handshake is unchanged.
*/
func (c *SimpleCache) concurrentProcessChecks() {
	defer wg.Done()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-c.ExpiryChannel:
			c.ExpiryChannel <- true
			return
		case <-ticker.C:
			c.processExpiry()
		}
	}
}
/*
close shuts the cache down: it signals the expiry goroutine over
ExpiryChannel, waits for the echo confirming the goroutine observed the
signal, waits on the package WaitGroup, closes the channel, and finally
persists the cache to FileName when SaveFile is set.
NOTE(review): the send/echo handshake on the same unbuffered channel
relies on concurrentProcessChecks echoing exactly once — calling close
twice, or when no expiry goroutine is running, blocks forever; confirm
callers invoke it at most once.
*/
func (c *SimpleCache) close() {
	c.ExpiryChannel <- true
	<-c.ExpiryChannel
	wg.Wait()
	close(c.ExpiryChannel)
	if c.SaveFile {
		c.updatePersistentFile()
	}
}
// setTTL sets the cache-wide default TTL (seconds) used whenever a
// caller passes ttl == -1 to Set/Get.
func (c *SimpleCache) setTTL(ttl int64) {
	c.Lock.Lock()
	c.TTL = ttl
	c.Lock.Unlock()
}
/*
updatePersistentFile serializes the whole cache to JSON and writes it
to c.FileName via writeGob.

Fix: the json.Marshal error is no longer silently discarded, and the
garbled log message ("error occured while because") is replaced with
messages that identify which step failed.
NOTE(review): c.Lock is not held while marshaling; confirm this is only
reached from close() after the expiry goroutine has stopped.
*/
func (c *SimpleCache) updatePersistentFile() {
	payload, err := json.Marshal(c)
	if err != nil {
		log.Printf("marshaling cache for %q: %s\n", c.FileName, err.Error())
		return
	}
	if err := writeGob(c.FileName, string(payload)); err != nil {
		log.Printf("writing cache file %q: %s\n", c.FileName, err.Error())
	}
}
func writeGob(filePath string, data string) error {
file, err := os.Create(filePath)
_, _ = file.WriteString(data)
_ = file.Close()
return err
}
// letterRunes is the alphabet RandStringRunes draws from.
var letterRunes = []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#@!$%")

// RandStringRunes returns a random string of n runes, each chosen
// uniformly from letterRunes using the package-global rand source.
func RandStringRunes(n int) string {
	out := make([]rune, n)
	for i := 0; i < n; i++ {
		out[i] = letterRunes[rand.Intn(len(letterRunes))]
	}
	return string(out)
}