-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathll2.py
More file actions
299 lines (261 loc) · 14.1 KB
/
ll2.py
File metadata and controls
299 lines (261 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import errno
import gc
import json
import requests
import time
from machine import Timer
import medea
from utils import iso8601_to_unix, log_exc, unix_to_iso8601, wrap_timer
from web import connect
class LL2Sync:
    """ Keeps a locally cached list of upcoming launches from the Launch Library 2 (LL2) API.

    Fetches are rate-limited to roughly <API_throttle> requests per hour, results are cached
    to <cachefile>, and data is re-fetched around T-0 (via Threshold) to detect holds/scrubs.
    Designed for MicroPython (machine.Timer, medea streaming JSON parser).
    """

    def __init__(self, API_throttle: int = 15, keep_seconds: int = 3600, cachefile: str = "llcache.json", dev: bool = False):
        connect()  # Ensure WIFI is up before the first request
        self.API_throttle = API_throttle  # Request at most <API_throttle> requests per hour
        self.keep_seconds = keep_seconds  # Launch will be displayed until at most T+<keep_seconds>
        self.dev = dev  # Whether to use lldev or ll, for testing purposes.
        self.queue = []  # Pending request callables, executed one per tick when rate limit allows
        self.thresholds = Threshold([180, 60, -60, -180])  # Seconds until launch (<0 is T+) when we will re-fetch data (to detect HOLD HOLD HOLD)
        self._t_min = 0  # Earliest time when we want to know a launch (used in get_upcoming)
        self.cachefile = cachefile
        self.cache_load()
        self.timer_tick = Timer()
        self.timer_tick.init(period=10_000, mode=Timer.PERIODIC, callback=lambda timer: wrap_timer(self.tick))  # Period in ms
        self.tick()

    @property
    def t_min(self):
        """ Earliest launch time we care about; adjusts <self._t_min> appropriately as a side effect. """
        self._t_min = max(self._t_min, time.time() - self.keep_seconds)  # At most keep_seconds ago
        if len(self.launches) >= 2:  # Check if launch 1 is closer than launch 0
            dt_last = time.time() - self.launches[0]["net_epoch"]  # Seconds since launch 0
            dt_next = self.launches[1]["net_epoch"] - time.time()  # Seconds until launch 1
            if dt_next < dt_last:  # Launch 1 is closer: stop caring about launch 0
                self._t_min = self.launches[0]["net_epoch"] + 1
        return self._t_min

    @property
    def request_dt(self):
        """ Minimum number of seconds to wait between periodic requests, derived from the hourly budget. """
        # Two launches within 1 hour only happened twice in 2024. Probably more frequent in the future, but still rare.
        n = max(1, (self.API_throttle - len(self.thresholds))/2)  # So allow room for Thresholds to be triggered once per hour, and then some
        dt = int(3600/n) + 1
        return min(dt, 600)  # Wait at most 10 minutes

    def cache_save(self):
        """ Writes the current launch list and last-request timestamp to <self.cachefile>. """
        with open(self.cachefile, "w") as llcache:
            json.dump({"launches": self.launches, "lastfetch": self.lastrequesttime}, llcache)

    def cache_load(self):
        """ Loads launches from <self.cachefile>; on a missing/corrupt file, starts with an empty state. """
        try:
            with open(self.cachefile, "r") as llcache:
                llc = json.load(llcache)
            self.launches = llc["launches"]
            self.lastrequesttime = llc["lastfetch"]
            for launch in self.launches:
                if not launch.get("detailed", False):  # Launch was not yet fetched in detailed mode
                    self.queue_details(launch["id"])
        except (OSError, KeyError, ValueError) as e:  # File not found, invalid schema, or corrupt JSON (BUGFIX: ValueError was uncaught)
            log_exc(e)
            self.launches = []
            self.lastrequesttime = 0
            self.cache_save()  # Should create or overwrite file

    @property
    def NETepoch(self):
        """ Unix epoch of the next launch's NET, or 0 when no launches are known. """
        if len(self.launches) == 0: return 0
        # TODO: return -1 if API issue, but somehow have to store that we had an issue then.
        return self.launches[0]["net_epoch"]

    @property
    def dt(self):
        """ Seconds until the next launch (negative after T-0). """
        return self.NETepoch - time.time()

    @property
    def dt_tuple(self):
        """ Returns a 4-tuple (sign, H, M, S), where sign is truthy for T-, falsy for T+. """
        dt = self.dt  # BUGFIX: read once so the sign cannot disagree with H/M/S (time advances between reads)
        S = abs(dt)
        H = S // 3600
        S -= H*3600
        M = S // 60
        S -= M*60
        return (dt >= 0, H, M, S)

    def tick(self):
        """ Performs all the checks and requests information when needed. Should be run every few seconds or so. """
        gc.collect()
        # TODO: Must make sure that we do not empty this queue too rapidly, otherwise threshold requests might fail.
        #  Idea: we could keep track of <i>, the number of requests performed, and when it exceeds
        #  <API_throttle>-5, we do not request anymore unless it is a threshold. When we hit this
        #  amount, request our quota from the API after every request and update <i> accordingly.
        if self.thresholds.pass_check(self.dt):  # Threshold passed: should definitely re-fetch NETs
            self.get_upcoming()
        if time.time() - self.lastrequesttime > self.request_dt:  # Sufficient time has passed since last request
            if len(self.queue) == 0:  # No special requests
                self.get_upcoming()
                if self.launches:  # BUGFIX: guard against an empty launch list (e.g. failed or empty fetch)
                    self.queue_details(self.launches[0]["id"])
            else:
                self.queue[0]()
                self.queue.pop(0)
        # Remove launches from before <self.t_min>
        n = len(self.launches)
        self.launches = list(filter(lambda launch: launch["net_epoch"] > self.t_min, self.launches))
        if len(self.launches) < n:  # A launch has been removed, so update everything.
            self.get_upcoming()

    def request(self, endpoint) -> "medea.LazyRequest | None":
        """ Performs a rate-limited GET to the LL2 API; returns a LazyRequest on HTTP 200, else None.

        On HTTP 429 the API's own throttle endpoint is consulted to back off. On a lost WIFI
        connection (EHOSTUNREACH) we reconnect and retry once via recursion.
        """
        api = "lldev" if self.dev else "ll"
        url = f"https://{api}.thespacedevs.com/2.3.0/" + endpoint.lstrip("/")
        if self.lastrequesttime > time.time(): return  # Happens if 429 status happened recently
        self.lastrequesttime = time.time()
        try:
            print(gc.mem_alloc(), gc.mem_free())
            gc.collect()
            print(gc.mem_alloc(), gc.mem_free())
            print(url)
            response = medea.LazyRequest(url, timeout=10.)
            print("Response status code:", response.status_code)
            if response.status_code == 429:  # Too many requests
                response_throttle = requests.get("https://ll.thespacedevs.com/2.3.0/api-throttle/")  # Just use requests lib, this is a small JSON
                self.lastrequesttime = time.time() + response_throttle.json()["next_use_secs"]
                return
            elif response.status_code == 200: return response
            else: return
        except OSError as e:
            if e.errno == errno.EHOSTUNREACH:
                connect()  # WIFI connection likely lost
                return self.request(endpoint)
            else:
                log_exc(e)
                raise e
        except StopIteration as e:  # medea's tokenizer exhausted unexpectedly (truncated response)
            log_exc(e)
            connect()

    def update_launch_data(self, lazyreq: "medea.LazyRequest", detailed: bool = False):
        """ Puts relevant information from an LL2 launch response into self.launches.

        When <detailed> is True, the ["detailed"] field of affected launches is set to True,
        preventing further detailed requests.
        """
        new = [{}]  # List of launches in the response. We will build this up during the JSONgen, and merge with self.launches later.
        JSONgen = lazyreq.tokenize()
        path = []
        i = d = 0
        def check_field(*keypath): return path[d:] == list(keypath)
        for tok, val in JSONgen:
            if medea.extendpath(path, tok, val): continue  # Not at a key-value pair
            # API either returns a pure launch, or an object like {<request_metadata>, "results": [<launch(es)>]}
            if path[0] == "results":  # Response includes some metadata, so ignore that
                i = int(path[1])  # Number of launch we are at in the response (for indexing <new>)
                d = 2  # Offset for indexing (because we have to skip ["results"][i])
            if len(new) <= i: new.append({})
            l = new[i]
            if check_field("id"):
                l["id"] = val
            elif check_field("net"):
                l["net"] = val
                l["net_epoch"] = iso8601_to_unix(l["net"])
            elif check_field("net_precision", "id"):
                l["net_precision_id"] = val  # >2: Uncertainty >1h, so probably not interesting to show on clock
            elif path[d] == "status":  # Dict with "id", "name", "abbrev" and "description"
                l.setdefault("status", {})  # Setting a dict requires some extra effort
                l["status"][path[d+1]] = val
            elif check_field("image", "thumbnail_url"):
                l["image_thumbnail_url"] = val
            elif check_field("name"):
                split = val.split(" | ")  # Failsafe when not using detailed mode
                if len(split) != 2: continue
                l.setdefault("rocket_name", split[0])
                l.setdefault("payload_name", split[1])
            elif check_field("rocket", "configuration", "full_name"):
                l["rocket_name"] = val
            elif check_field("mission", "name"):
                l["payload_name"] = val
            elif check_field("pad", "name"):
                l["pad"] = val
            elif check_field("pad", "location", "name"):
                l["pad_location"] = val
            elif check_field("launch_service_provider", "name"):
                l["lsp"] = val
            elif "country" in path:  # Country codes are found in many places. Set country in increasing order of importance.
                priorities = [  # First has highest priority
                    ("rocket", "configuration", "manufacturer", "country", 0, "alpha_2_code"),
                    ("launch_service_provider", "country", 0, "alpha_2_code"),
                    ("mission", "agencies", 0, "country", 0, "alpha_2_code"),
                    ("pad", "country", "alpha_2_code"),
                    ("pad", "agencies", 0, "country", 0, "alpha_2_code")
                ]
                # BUGFIX: use a dedicated loop variable; the original reused <i>,
                # clobbering the launch index used to index <new>.
                for prio, p in enumerate(priorities):
                    if check_field(*p):
                        if l.get("country_priority", 100) >= prio:
                            l["country"] = val
                            l["country_priority"] = prio
                        break
        # Update <self.launches> with <new>
        for launch in new:
            if (ID := launch.get("id")) is None: continue
            launch.pop("country_priority", None)
            # Have we requested this ID yet?
            ls = [l for l in self.launches if l["id"] == ID]
            if ls:  # Known launch
                l = ls[0]
            else:  # New launch: add and fetch details
                l = {"id": ID, "detailed": False}
                self.launches.append(l)
                self.queue_details(ID)
            merge(l, launch)  # <launch> will overwrite fields in <l>
            if detailed: l["detailed"] = True
        # Remove launches that are not in the upcoming
        if not detailed:  # Only do this when we are making an "upcoming" request
            IDs = [l["id"] for l in new if "id" in l]  # BUGFIX: skip placeholder dicts (KeyError on empty responses)
            self.launches = list(filter(lambda launch: launch["id"] in IDs, self.launches))
        # Sort and save
        self.launches.sort(key=lambda launch: launch["net_epoch"])  # Keep ordered if times would have changed
        self.lastrequesttime = time.time()  # Just to be safe, because update_launch_data() can take a while to run
        self.cache_save()

    def get_upcoming(self, n=10):
        """ Fetches the next <n> upcoming launches (list mode) and updates self.launches. """
        t_min = unix_to_iso8601(self.t_min)
        endpoint = f"/launches/upcoming/?limit={n:d}&mode=list&include_suborbital=false&ordering=net&net__gt={t_min}"
        response = self.request(endpoint)
        if response is None: return
        self.update_launch_data(response, detailed=False)

    def get_details(self, ID):
        """ Fetches launch <ID> in detailed mode and updates self.launches. """
        endpoint = f"/launches/upcoming/?id={ID}&mode=normal"
        response = self.request(endpoint)
        if response is None: return
        self.update_launch_data(response, detailed=True)

    def queue_details(self, ID):
        """ Queues a detailed fetch for launch <ID>, followed by an upcoming refresh. """
        self.queue.append(lambda: self.get_details(ID))
        self.queue.append(self.get_upcoming)  # After fetching details, make sure to update NETs before next element in queue
class Threshold:
    """ Detects when a monotonically-sampled value crosses any of a fixed set of thresholds. """

    def __init__(self, thresholds: list[float], start_value=0):
        """ Given a list of <thresholds>, calling self.pass_check(value) will check if <value> passes any of
        those thresholds when coming from the <old_value> of the previous self.pass_check(old_value) call
        (the most recently passed threshold is disabled until another is passed, so <len(thresholds)> should be >= 2).
        """
        self.thresholds = thresholds  # List of thresholds where self.pass_check() returns True if passed
        self.value = start_value  # Last seen value
        self._last_th_passed = None  # Threshold VALUE most recently passed; suppresses re-triggering the same one twice in a row

    def pass_check(self, value):
        """ Returns True if a threshold was passed/reached when going from <self.value> to <value>. """
        f = lambda threshold: (threshold - value)*(threshold - self.value) <= 0 and self.value != threshold
        thresholds_passed = list(filter(f, self.thresholds))
        self.value = value
        if len(thresholds_passed) == 0: return False  # No thresholds passed
        if self._last_th_passed is not None:
            if len(thresholds_passed) == 1:
                if self._last_th_passed == thresholds_passed[0]:  # The only passed threshold is the last passed one
                    return False
        # Find the nearest passed threshold and return True
        deltas = [abs(threshold - value) for threshold in self.thresholds]
        nearest_th_i = deltas.index(min(deltas))  # Get argmin of <deltas> without numpy
        # BUGFIX: store the threshold VALUE; the original stored the index, which never
        # compared equal to a threshold value above, so suppression never worked.
        self._last_th_passed = self.thresholds[nearest_th_i]
        return True

    def __len__(self): return len(self.thresholds)
def merge(a, b, path=None, update=True):
    """ Recursively merges dict <b> into dict <a> in place and returns <a>.

    Nested dicts are merged key-by-key; lists are merged element-by-element
    (extra elements in <b> are appended). With <update> False, conflicting
    leaf values raise instead of being overwritten.
    Based on https://stackoverflow.com/a/25270947
    """
    if path is None: path = []
    for key in b:
        if key not in a:
            a[key] = b[key]
        elif isinstance(a[key], dict) and isinstance(b[key], dict):
            merge(a[key], b[key], path + [str(key)], update=update)  # BUGFIX: propagate <update> into nested merges
        elif a[key] == b[key]:
            pass  # same leaf value
        elif isinstance(a[key], list) and isinstance(b[key], list):
            for idx, val in enumerate(b[key]):
                if idx >= len(a[key]):
                    a[key].append(val)  # BUGFIX: previously IndexError when b's list was longer than a's
                elif isinstance(a[key][idx], dict) and isinstance(val, dict):
                    merge(a[key][idx], val, path + [str(key), str(idx)], update=update)
                elif a[key][idx] == val:
                    pass  # same element
                elif update:
                    a[key][idx] = val  # BUGFIX: previously recursed into non-dict elements (TypeError)
                else:
                    raise Exception('Conflict at %s' % '.'.join(path + [str(key), str(idx)]))
        elif update:
            a[key] = b[key]
        else:
            raise Exception('Conflict at %s' % '.'.join(path + [str(key)]))
    return a
if __name__ == "__main__":
    # Manual smoke test: start a sync and, when any launch is known, pull its details.
    LL2 = LL2Sync()
    if LL2.launches:
        LL2.get_details(LL2.launches[0]["id"])