import math
import re
import weakref
from time import time
import dateutil.parser
import humanize
from ._util import _formatDuration
# Handles data multi-page downloads (scalardata, rawdata, archivefiles)
class _MultiPage:
def __init__(self, parent: object):
self.parent = weakref.ref(parent)
self.result = None
def getAllPages(self, service: str, url: str, filters: dict):
"""
Requests all pages from the service, with the url and filters
Multiple pages will be downloaded until completed
@return: Service response with concatenated data for all pages obtained
"""
# pop archivefiles extension
extension = None
if service.startswith("archivefile") and "extension" in filters:
extension = filters["extension"]
del filters["extension"]
# download first page
start = time()
response, responseTime = self._doPageRequest(url, filters, service, extension)
rNext = response["next"]
if rNext is not None:
print(
"Data quantity is greater than the row limit and",
"will be downloaded in multiple pages.",
)
pageCount = 1
pageEstimate = self._estimatePages(response, service)
if pageEstimate > 0:
# Exclude the first page when calculating the time estimation
timeEstimate = _formatDuration((pageEstimate - 1) * responseTime)
print(
f"Downloading time for the first page: {humanize.naturaldelta(responseTime)}" # noqa: E501
)
print(f"Estimated approx. {pageEstimate} pages in total.")
print(
f"Estimated approx. {timeEstimate} to complete for the rest of the pages." # noqa: E501
)
# keep downloading pages until next is None
print("")
while rNext is not None:
pageCount += 1
rowCount = self._rowCount(response, service)
print(f" ({rowCount} samples) Downloading page {pageCount}...")
nextResponse, nextTime = self._doPageRequest(
url, rNext["parameters"], service, extension
)
rNext = nextResponse["next"]
# concatenate new data obtained
self._catenateData(response, nextResponse, service)
totalTime = _formatDuration(time() - start)
print(
f" ({self._rowCount(response, service):d} samples)"
f" Completed in {totalTime}."
)
response["next"] = None
return response
def _doPageRequest(
self, url: str, filters: dict, service: str, extension: str = None
):
"""
Wraps the _doRequest method
Performs additional processing of the response for certain services
@param extension: Only provide for archivefiles filtering
Returns a tuple (jsonResponse, duration)
"""
if service.startswith("archivefile"):
response, duration = self.parent()._doRequest(url, filters, getTime=True)
response = self.parent()._filterByExtension(response, extension)
else:
response, duration = self.parent()._doRequest(url, filters, getTime=True)
return response, duration
def _catenateData(self, response: object, nextResponse: object, service: str):
"""
Concatenates the data results from nextResponse into response
Compatible with the row structure of different services
"""
if service.startswith("scalardata"):
keys = response["sensorData"][0]["data"].keys()
for sensorData in response["sensorData"]:
sensorCode = sensorData["sensorCode"]
nextSensor = next(
ns
for ns in nextResponse["sensorData"]
if ns["sensorCode"] == sensorCode
)
for key in keys:
sensorData["data"][key] += nextSensor["data"][key]
elif service.startswith("rawdata"):
for key in response["data"]:
response["data"][key] += nextResponse["data"][key]
elif service.startswith("archivefile"):
response["files"] += nextResponse["files"]
def _estimatePages(self, response: object, service: str):
"""
Estimate the number of pages the request will require.
It is calculated from the first page's response and its duration.
Parameters
----------
responseTime : float
Request duration in seconds.
"""
# timespan covered by the data in the response
pageTimespan = self._responseTimespan(response, service)
if pageTimespan == 0:
return 0
# total timespan to cover in the next parameter excluding the first page
totalBegin = dateutil.parser.parse(
response["next"]["parameters"]["dateFrom"], ignoretz=True
)
totalEnd = dateutil.parser.parse(
response["next"]["parameters"]["dateTo"], ignoretz=True
)
totalTimespan = totalEnd - totalBegin
# handle cases of very small timeframes
pageSeconds = max(pageTimespan.total_seconds(), 1)
totalSeconds = totalTimespan.total_seconds()
# plus one for the first page
return math.ceil(totalSeconds / pageSeconds) + 1
def _rowCount(self, response, service: str):
"""
Returns the number of records in the response
"""
if service.startswith("scalardata"):
return len(response["sensorData"][0]["data"]["sampleTimes"])
elif service.startswith("rawdata"):
return len(response["data"]["times"])
elif service.startswith("archivefile"):
return len(response["files"])
return 0
def _responseTimespan(self, response, service: str):
"""
Determines the timespan the data in the response covers
Returns a timedelta object
"""
# grab the first and last sample times
if service.startswith("scalardata"):
first = response["sensorData"][0]["data"]["sampleTimes"][0]
last = response["sensorData"][0]["data"]["sampleTimes"][-1]
elif service.startswith("rawdata"):
first = response["data"]["times"][0]
last = response["data"]["times"][-1]
elif service.startswith("archivefile"):
row0 = response["files"][0]
if isinstance(row0, str):
regExp = r"\d{8}T\d{6}\.\d{3}Z"
reFirst = re.search(regExp, response["files"][0])
reLast = re.search(regExp, response["files"][-1])
first = reFirst.group()
last = reLast.group()
if (
reFirst is None
or reLast is None
or reFirst.group() == reLast.group()
):
return 0
else:
first = response["files"][0]["dateFrom"]
last = response["files"][-1]["dateFrom"]
# compute the timedelta
dateFirst = dateutil.parser.parse(first)
dateLast = dateutil.parser.parse(last)
return dateLast - dateFirst