forked from Graham277/NewDozer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAttendanceCodeCommunicator.py
More file actions
196 lines (164 loc) · 7.83 KB
/
Copy pathAttendanceCodeCommunicator.py
File metadata and controls
196 lines (164 loc) · 7.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import ipaddress
import logging
import json
import os
import socket
import sys
import threading
from enum import Enum
from json import JSONDecodeError
from dotenv import load_dotenv
from SheetsManager import SheetManager
load_dotenv()
class Status(Enum):
DISCONNECTED = 0
CONNECTING = 1
CONNECTED = 2
class AttendanceCodeCommunicator:
sheet_id = None
sheet_manager = None
received_codes = {}
claimed_codes = []
status: Status = Status.DISCONNECTED
thread: threading.Thread = None
def __init__(self):
"""
:raises ValueError: if no valid sheet exists
"""
secrets_location = os.getenv("secrets_json_path")
if not secrets_location:
secrets_location = "secrets.json"
self.sheet_manager = SheetManager(secrets_location)
self.sheet_id = self.sheet_manager.find_sheet("[attendance-bot]")
if self.sheet_id is None:
raise ValueError('No valid sheet exists to store attendance codes. Either create one '
'with the suffix "[attendance-bot]" (no quotes) or disable '
'attendance features with the command-line switch.')
def _discover(self, sock: socket.socket):
const_version = 1
logging.log(logging.DEBUG, "Discovering available endpoints")
while True:
try:
message = json.loads(sock.recv(1024))
except ValueError:
continue
if message['app'] == 'attendance' and message['type'] == 'discovery' and message['version'] == const_version:
logging.log(logging.DEBUG, "Found an available endpoint")
if ipaddress.ip_address(message['host']).is_loopback:
logging.log(logging.WARNING, "Received valid broadcast, but the host is a loopback address"
f" ({message['host']}). Check the network configuration on the client.")
continue
return message['host']
def _handshake(self, sock: socket.socket):
connect_message = json.loads(sock.recv(1024))
if connect_message['type'] != 'connect':
logging.log(logging.ERROR, f"Failed to connect with endpoint, wrong type {connect_message['type']}")
return False # let endpoint try again
response = {
'type': 'acknowledge',
'targeting': connect_message['type']
}
val1_tmp = json.dumps(response)
val2_tmp = val1_tmp.encode()
sock.send(val2_tmp)
return True
def _communicate_after_handshake(self, sock: socket.socket):
const_code_show_duration = 30 # seconds
const_code_valid_duration = 60 # seconds
counter = 0
while True:
data = sock.recv(1024)
if len(data) == 0:
logging.log(logging.WARN, "Remote endpoint disconnected")
return
message = json.loads(data)
if message['type'] == 'heartbeat':
response = {
'type': 'acknowledge',
'targeting': message['type'],
'counter': counter
}
counter += 1
sock.sendall(bytes(json.dumps(response), 'utf-8'))
elif message['type'] == 'heartbeat_error':
response = {
'type': 'acknowledge',
'targeting': message['type']
}
sock.sendall(bytes(json.dumps(response), 'utf-8'))
logging.log(logging.WARN, f"Logged heartbeat error: expected {counter}, told {message['counter']}")
counter = message['counter'] + 1
elif message['type'] == 'code':
code = int(message['code'])
generation_time = int(message['generation_time'])
response = {
'type': 'acknowledge',
'targeting': message['type'],
'code': code,
'valid_to': generation_time + const_code_show_duration
}
sock.sendall(bytes(json.dumps(response), 'utf-8'))
self.received_codes[response['code']] = generation_time + const_code_valid_duration
# check for expired codes
import datetime
codes_to_remove = []
for code_to_check, expiry in self.received_codes.items():
# check if the code is 3m past expiry - give a chance to see the expired message
if datetime.datetime.now(datetime.UTC).timestamp() > expiry + 180:
codes_to_remove.append(code_to_check)
for code_to_remove in codes_to_remove:
self.received_codes.pop(code_to_remove, None)
if code_to_remove in self.claimed_codes:
self.claimed_codes.remove(code_to_remove)
else:
logging.log(logging.WARN, f"Unknown message {message['type']}, ignoring")
def _communicate(self, should_fail_on_exception: bool):
"""
Attempt to communicate with the remote screen.
Handles both broadcasts and regular communication.
Intended to be run as a thread
:return: None
"""
while True:
try:
logging.log(logging.INFO, f"Communicator thread started")
broadcast_in_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_in_sock.bind(('', 5789))
logging.log(logging.INFO, "Now listening on all interfaces, port 5789")
host = self._discover(broadcast_in_sock)
logging.log(logging.INFO, f"Found an endpoint at {host}")
self.status = Status.CONNECTING
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, 5789))
if self._handshake(sock):
self.status = Status.CONNECTED
self._communicate_after_handshake(sock)
except socket.error | JSONDecodeError as e:
if should_fail_on_exception:
logging.log(logging.ERROR, f"Exception raised during communication!")
raise e # terminates thread
else:
logging.log(logging.ERROR, f"Exception raised during communication - {e} - continuing anyways!")
# loop again
def run(self):
# Specs:
#
# A screen sends out a 255.255.255.255 broadcast app: attendance, type: discovery, version: 1 (as of now), host matching local IP
# Server connects to host given in the parameter
#
# On connect, screen sends JSON object with type = "connect"
# Server sends type = "acknowledge", targeting = "connect"
# On every code generation event (screen): type = "code", code = <generated code>, generation_time = <generation Unix timestamp>
# Server acknowledges with type = "acknowledge", targeting = "code", code = <same>, valid_to = <expiry time>
# Every 10s, screen sends type = "heartbeat", counter = <counter that increments each heartbeat>
# Server sends type = "acknowledge", targeting = "heartbeat", counter = <counter>
#
# Errors:
# If the heartbeat counters do not match, server sends type = "heartbeat_error", counter = <corrected value>
# If connection fails, disconnect and try again after 10s
#
# Note that the "server" here is the discord bot, but the server is technically the tablet
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logging.log(logging.INFO, f"Starting attendance communicator")
self.thread = threading.Thread(target=lambda: self._communicate(False), daemon=True)
self.thread.start()