-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlock.go
More file actions
117 lines (106 loc) · 2.68 KB
/
lock.go
File metadata and controls
117 lines (106 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package cachegrid
import (
"context"
"time"
)
// LockOptions configures lock acquisition behavior.
type LockOptions struct {
TTL time.Duration // lock auto-release time (required)
RetryCount int // number of retries (0 = no retry)
RetryDelay time.Duration // delay between retries
}
// LockHandle represents a held distributed lock.
type LockHandle struct {
cache *Cache
key string
token string
fencingToken uint64
ownerAddr string // "" if local
}
// Release releases the lock.
func (h *LockHandle) Release() error {
if h.ownerAddr != "" {
ok, err := h.cache.transport.RemoteLockRelease(context.Background(), h.ownerAddr, h.key, h.token)
if err != nil {
return err
}
if !ok {
return ErrLockNotHeld
}
return nil
}
if !h.cache.lockEngine.Release(h.key, h.token) {
return ErrLockNotHeld
}
return nil
}
// Extend extends the lock TTL.
func (h *LockHandle) Extend(ttl time.Duration) error {
if h.ownerAddr != "" {
ok, err := h.cache.transport.RemoteLockExtend(context.Background(), h.ownerAddr, h.key, h.token, ttl)
if err != nil {
return err
}
if !ok {
return ErrLockNotHeld
}
return nil
}
if !h.cache.lockEngine.Extend(h.key, h.token, ttl) {
return ErrLockNotHeld
}
return nil
}
// Token returns the fencing token for this lock.
func (h *LockHandle) Token() uint64 {
return h.fencingToken
}
// Lock acquires a distributed lock with retry logic.
func (c *Cache) Lock(key string, opts LockOptions) (*LockHandle, error) {
if key == "" {
return nil, ErrKeyEmpty
}
if c.closed.Load() {
return nil, ErrShutdown
}
lockKey := "__lock:" + key
for attempt := 0; attempt <= opts.RetryCount; attempt++ {
if addr := c.ownerAddr(lockKey); addr != "" {
token, fencing, acquired, err := c.transport.RemoteLockAcquire(
context.Background(), addr, lockKey, opts.TTL)
if err != nil {
return nil, err
}
if acquired {
return &LockHandle{
cache: c, key: lockKey, token: token,
fencingToken: fencing, ownerAddr: addr,
}, nil
}
} else {
owner := ""
if c.state != nil {
owner = c.state.SelfName()
}
token, fencing, acquired := c.lockEngine.Acquire(lockKey, opts.TTL, owner)
if acquired {
return &LockHandle{
cache: c, key: lockKey, token: token,
fencingToken: fencing,
}, nil
}
}
if attempt < opts.RetryCount && opts.RetryDelay > 0 {
time.Sleep(opts.RetryDelay)
}
}
return nil, ErrLockNotAcquired
}
// TryLock attempts a single non-blocking lock acquisition.
func (c *Cache) TryLock(key string, ttl time.Duration) (*LockHandle, bool) {
handle, err := c.Lock(key, LockOptions{TTL: ttl})
if err != nil {
return nil, false
}
return handle, true
}