-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
67 lines (60 loc) · 2.42 KB
/
main.py
File metadata and controls
67 lines (60 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import aerospike
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='[%(process)d] - %(asctime)s - %(levelname)s - %(message)s')
# if some owner is acquiring the lock should we reset the ttl?
class LoxAM:
"""
A custom context manager to manage locks accross distributed machines using aerospike
"""
def __init__(self, lock_string, owner, db_backend=None, timeout=60, retry=5, max_retries=1):
self.lock_string = lock_string
self.db_backend = db_backend
self.owner = owner
self.timeout = timeout
self.retry = retry
self.max_retries = max_retries
self.key = ('bbcache', 'lox', self.lock_string)
self.config = {
'hosts': [('127.0.0.1', 3000)]
}
def __enter__(self):
try:
self.client = aerospike.client(self.config).connect()
# self.client.udf_put("atomicity.lua")
logging.info("Trying to get lock")
except Exception as e:
logging.debug("Failed to connect to the cluster with {}\n {}".format(self.config['hosts'], e))
return self
def acquire(self):
locked = False
while not locked and self.max_retries:
try:
locked = self.client.apply(self.key, "atomicity", "get_or_create", [{'owner': self.owner}])
if locked:
logging.info("Acquired lock for lock_string {}".format(self.key))
else:
logging.info("Already acquired by someone else. Retrying in {}".format(self.retry))
except Exception as e:
logging.exception("Error while acquiring lock: {}".format(e))
locked = True
time.sleep(self.retry)
self.max_retries -= 1
if locked:
return
raise Exception("Unable to acquire lock")
def release(self):
try:
(key, metadata, record) = self.client.get(self.key)
if key and record.get("owner") == self.owner:
self.client.remove(self.key)
logging.info("Lock released and key{}".format(self.key))
except aerospike.exception.RecordNotFound:
pass
except Exception as e:
logging.exception(self.key)
logging.exception(e)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
self.client.close()
logging.info("Exiting context")