-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnotification_manager.py
More file actions
66 lines (60 loc) · 2.02 KB
/
notification_manager.py
File metadata and controls
66 lines (60 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from retrying import retry
from gql import gql
from logger import logger
class NotificationManager:
    """Run notification-related GraphQL queries through a shared client.

    Every query is sent via :meth:`execute`, which retries failed requests.
    """

    def __init__(self, client):
        """Store the GraphQL client.

        Args:
            client: Object exposing ``execute(document, variable_values=...)``;
                it is called once per attempt by :meth:`execute`.
        """
        self.client = client

    def execute(self, query, variables=None, attempt=1):
        """Parse and run a GraphQL query, retrying failed requests.

        A single retry layer (the ``retrying`` decorator) drives the attempts:
        at most three per call, with exponential backoff between them. The
        previous implementation combined the decorator with a recursive call
        to itself, so each recursion level started a new three-attempt retry
        and one failing request could reach the client dozens of times.

        Args:
            query: GraphQL query text.
            variables: Optional mapping passed to the client as
                ``variable_values``.
            attempt: Number used to label the first attempt in log messages.
                Kept for backward compatibility; callers normally omit it.

        Returns:
            Whatever ``self.client.execute`` returns for the query.

        Raises:
            Exception: Any error raised while parsing ``query`` (not retried),
                or the error from the final failed attempt once the retry
                budget is exhausted.
        """
        # Parse once, outside the retried section: a malformed query fails
        # the same way every time, so retrying it would only add delay.
        gql_query = gql(query)

        # Label of the most recently started attempt, for log messages.
        attempt_no = attempt - 1

        @retry(
            wait_exponential_multiplier=3000,
            wait_exponential_max=10000,
            stop_max_attempt_number=3,
            retry_on_exception=lambda e: True,
        )
        def _run_once():
            """Perform exactly one request; re-raise so ``retry`` can decide."""
            nonlocal attempt_no
            attempt_no += 1
            try:
                result = self.client.execute(gql_query, variable_values=variables)
            except Exception as e:
                logger.error(f"Error executing query on attempt {attempt_no}: {str(e)}")
                raise
            if attempt_no > 1:
                logger.info(f"Query succeeded on attempt {attempt_no}.")
            return result

        return _run_once()

    def has_new_notifications(self):
        """Query the ``hasNewNotes`` field.

        Returns:
            The result of :meth:`execute` for the query.
        """
        # Query text is kept flush-left so the string literal is unchanged.
        query = '''
{
hasNewNotes
}
'''
        return self.execute(query)

    def get_notifications(self, cursor=None, inc=None):
        """Query a page of notifications.

        Args:
            cursor: Value for the query's ``$cursor`` variable (optional).
            inc: Value for the query's ``$inc`` variable (optional).

        Returns:
            The result of :meth:`execute`; the query selects ``lastChecked``,
            ``cursor`` and the ``Reply`` / ``Votification`` notification
            fields listed below.
        """
        # Query text is kept flush-left so the string literal is unchanged.
        query = '''
query($cursor: String, $inc: String) {
notifications(cursor: $cursor, inc: $inc) {
lastChecked
cursor
notifications {
... on Reply {
id
item {
id
title
}
sortTime
}
... on Votification {
id
earnedSats
item {
id
title
}
sortTime
}
# Add other notification types here.
}
}
}
'''
        variables = {
            "cursor": cursor,
            "inc": inc,
        }
        return self.execute(query, variables)