-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtrack.py
More file actions
458 lines (269 loc) · 15.3 KB
/
track.py
File metadata and controls
458 lines (269 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
import json
import math
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from utils import checkTTOBenchVersion, convertUnit
def importTuples(tuples, xLabel, yLabels):
    """
    Convert list of tuples (or lists) into pandas dataframe.

    - param tuples: list whose elements are tuples or lists of the form
      (position, y_1, ..., y_n), with one y value per entry of yLabels.
    - param xLabel: name of the dataframe index, which is built from the first
      element of every tuple.
    - param yLabels: a single column label or a list of column labels.
    - return: dataframe indexed by position with one float column per label.
    - raise ValueError: if the input is not a list, if an element has the wrong
      type or length, or if the positions are NaN, negative, infinite or not
      strictly increasing.
    """

    if not isinstance(yLabels, list):

        yLabels = [yLabels]

    if not isinstance(tuples, list):

        raise ValueError("Input must be a list (of tuples or lists)!")

    numFields = 1 + len(yLabels)

    if not all(isinstance(tup, (tuple, list)) and len(tup) == numFields for tup in tuples):

        raise ValueError("Error in list!")

    index = np.array([tup[0] for tup in tuples])

    # NaN compares False against everything, so it would slip through all the
    # checks below and end up in the index; reject it explicitly.
    if np.any(np.isnan(index)):

        raise ValueError("Position data cannot be NaN!")

    if np.any(index < 0):

        raise ValueError("Position data cannot be negative!")

    if np.any(np.isinf(index)):

        raise ValueError("Position data cannot be infinite!")

    if np.any(np.diff(index) <= 0):

        raise ValueError("Position data must monotonically increase!")

    df = pd.DataFrame({xLabel: index}).set_index(xLabel)

    # element 0 of every tuple is the position, elements 1..n are the y values
    for column, yLabel in enumerate(yLabels, start=1):

        df[yLabel] = [float(tup[column]) for tup in tuples]

    return df
def checkDataFrame(df, trackLength):
    """
    Check validity of initial and end position in pandas dataframe.

    - param df: dataframe indexed by position; its first column name is used
      to label error messages.
    - param trackLength: length of the track, in the same unit as the index.
    - return: True when both checks pass.
    - raise ValueError: if the first index value is not 0 or the last index
      value is greater than trackLength.
    """

    firstPosition = df.index[0]

    lastPosition = df.index[-1]

    if firstPosition != 0:

        template = "Error in '{}': First track section must start at 0 m (beginning of track)!"

        raise ValueError(template.format(df.columns[0]))

    if lastPosition > trackLength:

        template = "Error in '{}': Last track section must start before {} m (end of track)!"

        raise ValueError(template.format(df.columns[0], trackLength))

    return True
def computeAltitude(gradients, length, altitudeStart=0):
    """
    Calculate altitude profile from gradients.

    - param gradients: dataframe (first column is used) or series of gradients
      in permil, indexed by the position at which each section starts.
    - param length: position of the end of the track, used as the end of the
      last section.
    - param altitudeStart: altitude at the first position.
    - return: dataframe with column 'Altitude [m]', indexed by the section
      start positions followed by length; the index keeps the name of the
      index of gradients.
    """

    position = np.append(gradients.index.values, length)

    # Select the gradient column by position explicitly: integer keys on a
    # label-indexed Series (the former `.iloc[i][0]`) are deprecated in pandas.
    if isinstance(gradients, pd.DataFrame):

        slopes = gradients.iloc[:, 0].to_numpy()

    else:

        slopes = gradients.to_numpy()

    # height gained in each section: section length times gradient (permil -> ratio)
    heights = np.diff(position)*(slopes/1e3)

    # cumulative sum starting from altitudeStart, accumulated in the same order
    # as adding one section after the other
    altitude = np.cumsum(np.concatenate(([altitudeStart], heights)))

    df = pd.DataFrame({gradients.index.name: position, 'Altitude [m]': altitude}).set_index(gradients.index.name)

    return df
def computeDiscretizationPoints(track, numIntervals):
"""
Compute the space discretization points based on track characteristics and horizon length.
"""
df1 = track.mergeDataFrames()
pos = np.linspace(0, track.length, numIntervals + 1 - (len(df1) - 1))
df2 = pd.DataFrame({'position [m]':pos}).set_index('position [m]')
df3 = df2.join(df1, how='outer').ffill()
if len(df3) != numIntervals + 1:
raise ValueError("Wrong number of computed discretization intervals!")
return df3
class Track():
    """
    Track description read from a JSON file.

    After construction the object holds the track length (self.length), the
    initial altitude (self.altitude), a title (self.title) and three dataframes
    indexed by 'Position [m]': self.speedLimits ('Speed limit [m/s]'),
    self.gradients ('Gradient [permil]') and self.curvatures ('Curvature [1/m]').
    Every row gives the value that applies from its position onwards.
    """

    CURVATURE_THRESHOLD = 1/150  # absolute value of maximum allowed curvature [1/m]

    def __init__(self, config, pathJSON='tracks'):
        """
        Constructor of Track objects.

        - param config: dictionary with the mandatory key 'id' (name of the JSON
          file without extension) and the optional keys 'from' and 'to' (indices
          of the departure and destination stops) and 'clothoidSamplingInterval'
          (step passed to sampleClothoid).
        - param pathJSON: folder containing the JSON file.
        - raise ValueError: if the configuration or the track data are invalid.
        """

        # check config

        if not isinstance(config, dict):

            raise ValueError("Track configuration should be provided as a dictionary!")

        if 'id' not in config:

            raise ValueError("Track ID must be specified in configuration!")

        # open json file

        filename = os.path.join(pathJSON, config['id']+'.json')

        with open(filename, encoding='utf-8') as file:

            data = json.load(file)

        checkTTOBenchVersion(data, ['1.1', '1.2', '1.3'])

        # read data

        # the position of the last stop defines the length of the whole track
        self.length = convertUnit(data['stops']['values'][-1], data['stops']['unit'])

        self.altitude = convertUnit(data['altitude']['value'], data['altitude']['unit']) if 'altitude' in data else 0

        self.title = data['metadata']['id']

        self.importSpeedLimitTuples(data['speed limits']['values'], data['speed limits']['units']['velocity'])

        # gradients are optional: default to a flat track
        if 'gradients' in data:

            gradientValues = data['gradients']['values']

            gradientUnit = data['gradients']['units']['slope']

        else:

            gradientValues = [(0.0, 0.0)]

            gradientUnit = 'permil'

        self.importGradientTuples(gradientValues, gradientUnit)

        # curvatures are optional: default to a straight track (infinite radius)
        if 'curvatures' in data:

            curvatureValues = data['curvatures']['values']

            unitRadiusStart = data['curvatures']['units']['radius at start']

            unitRadiusEnd = data['curvatures']['units']['radius at end']

        else:

            curvatureValues = [(0.0, "infinity", "infinity")]

            unitRadiusStart = "m"

            unitRadiusEnd = "m"

        self.importCurvatureTuples(curvatureValues, unitRadiusStart, unitRadiusEnd,
                                   config.get('clothoidSamplingInterval'))

        # restrict the track to the part between the selected stops

        numStops = len(data['stops']['values'])

        indxDeparture = config.get('from', 0)

        indxDestination = config.get('to', numStops-1)

        if not 0 <= indxDeparture < numStops - 1:

            raise ValueError("Index of departure is out of bounds!")

        if not indxDeparture < indxDestination < numStops:

            raise ValueError("Index of destination is out of bounds!")

        posDeparture = convertUnit(data['stops']['values'][indxDeparture], data['stops']['unit'])

        posDestination = convertUnit(data['stops']['values'][indxDestination], data['stops']['unit'])

        self.updateLimits(posDeparture, posDestination)

        self.checkFields()

    def lengthOk(self):
        """
        Return True if the track length is a strictly positive, finite number.
        """

        return bool(self.length is not None and self.length > 0 and not np.isinf(self.length))

    def gradientsOk(self):
        """
        Return True if gradients are not empty and pass checkDataFrame (which raises ValueError on failure).
        """

        return self.gradients.shape[0] > 0 and checkDataFrame(self.gradients, self.length)

    def speedLimitsOk(self):
        """
        Return True if speed limits are not empty and pass checkDataFrame (which raises ValueError on failure).
        """

        return self.speedLimits.shape[0] > 0 and checkDataFrame(self.speedLimits, self.length)

    def curvaturesOk(self):
        """
        Return True if curvatures are not empty, do not exceed CURVATURE_THRESHOLD
        in absolute value and pass checkDataFrame (which raises ValueError on failure).
        """

        if (abs(self.curvatures['Curvature [1/m]']) > Track.CURVATURE_THRESHOLD).values.any():

            return False

        return self.curvatures.shape[0] > 0 and checkDataFrame(self.curvatures, self.length)

    def checkFields(self):
        """
        Check all fields of the track and raise ValueError for the first invalid one.
        """

        if not self.lengthOk():

            raise ValueError("Track length must be a strictly positive number, not {}!".format(self.length))

        if self.altitude is None or np.isinf(self.altitude):

            raise ValueError("Altitude must be a number, not {}!".format(self.altitude))

        if not self.gradientsOk():

            raise ValueError("Issue with track gradients!")

        if not self.speedLimitsOk():

            raise ValueError("Issue with track speed limits!")

        if not self.curvaturesOk():

            raise ValueError("Issue with track curvatures!")

    def importGradientTuples(self, tuples, unit='permil'):
        """
        Set self.gradients from a list of (position, gradient) pairs.

        - raise ValueError: if the track length is invalid, the unit is not
          'permil' or the data are invalid.
        """

        if not self.lengthOk():

            raise ValueError("Cannot import gradients without a valid track length!")

        if unit not in {'permil'}:

            raise ValueError("Specified gradient unit not supported!")

        self.gradients = importTuples(tuples, 'Position [m]', 'Gradient [permil]')

        checkDataFrame(self.gradients, self.length)

    def importSpeedLimitTuples(self, tuples, unit='km/h'):
        """
        Set self.speedLimits from a list of (position, speed limit) pairs.

        Speed limits are passed through convertUnit with the given unit before
        being stored in the column 'Speed limit [m/s]'.

        - raise ValueError: if the track length is invalid, the unit is neither
          'km/h' nor 'm/s' or the data are invalid.
        """

        if not self.lengthOk():

            raise ValueError("Cannot import speed limits without a valid track length!")

        if unit not in {'km/h', 'm/s'}:

            raise ValueError("Specified speed unit not supported!")

        tuples = [(p, convertUnit(v, unit)) for p, v in tuples]

        self.speedLimits = importTuples(tuples, 'Position [m]', 'Speed limit [m/s]')

        checkDataFrame(self.speedLimits, self.length)

    def importCurvatureTuples(self, tuples, unitRadiusStart='m', unitRadiusEnd='m', clothoidSamplingInterval=None):
        """
        Set self.curvatures from a list of (position, radius at start, radius at end) triples.

        Sections whose radius changes are approximated by sampleClothoid using
        clothoidSamplingInterval as step size.

        - raise ValueError: if the track length is invalid, a radius unit is
          neither 'm' nor 'km' or the data are invalid.
        """

        if not self.lengthOk():

            raise ValueError("Cannot import curvature without a valid track length!")

        if unitRadiusStart not in {'m', 'km'} or unitRadiusEnd not in {'m', 'km'}:

            raise ValueError("Specified curvature radius unit not supported!")

        # if radius is 'infinity', the casting to float produces the float inf

        tuples = [(p, convertUnit(float(radiusStart), unitRadiusStart), convertUnit(float(radiusEnd), unitRadiusEnd)) for p, radiusStart, radiusEnd in tuples]

        tuples = self.sampleClothoid(tuples, clothoidSamplingInterval)

        self.curvatures = importTuples(tuples, 'Position [m]', ['Curvature [1/m]'])

        checkDataFrame(self.curvatures, self.length)

    def sampleClothoid(self, tuples, ds=None):
        """
        Approximates clothoid transition curve with piecewise-constant function.

        Given an interval [s_i, s_i + ds] the approximation of K(s) on the interval
        is K_avg(s) = (K(s_i) + K(s_i + ds))/2. For the last interval [s_i, s_f] the
        approximation of K(s) is K_avg(s) = (K(s_i) + K(s_f))/2 where K(s_f) is the
        curvature at the end of the section. When ds is not specified, K(s) is
        approximated as K_approx(s) = (K(s_0) + K(s_f))/2.

        - param tuples: a list of triples of form (p, Rstart, Rend) where p is the
          coordinate [m] at the start of the track section; Rstart is the radius [m] at the
          start of the section and Rend the radius [m] at the end.
        - param ds: the step size [m] used to approximate the clothoid. Note that we cannot
          guarantee that all intervals have size ds. Hence, in
          general, the last interval has length L such that: ds <= L < 2*ds while all other
          intervals have size ds.
        - return: a list of pairs (p, K) where K [1/m] is the approximation
          of the clothoid curvature in the track section starting at position p.
        - raise ValueError: if a radius is 0, a position is negative, the positions are
          not strictly increasing or ds is not strictly positive.
        """

        radii = [trackSection[radiusType] for trackSection in tuples for radiusType in range(1, 3)]

        if any(radiusValue == 0 for radiusValue in radii):

            raise ValueError("Curvature radius cannot be 0!")

        positions = [trackSection[0] for trackSection in tuples]

        if any(position < 0 for position in positions):

            raise ValueError("Positions cannot be negative!")

        # Decreasing positions must be rejected as well as equal ones: a negative
        # section length would otherwise produce zero sampling intervals and the
        # section would silently disappear from the result.
        if any(nextPosition <= position for position, nextPosition in zip(positions, positions[1:])):

            raise ValueError("Positions must be monotonically increasing")

        if ds is not None and ds <= 0:

            raise ValueError("Discretization step must be greater than zero or None!")

        result = []

        epsilon = sys.float_info.epsilon

        for trackIndex, trackSection in enumerate(tuples):

            sectionStart = trackSection[0]

            curvatureStart = 1/trackSection[1]

            curvatureEnd = 1/trackSection[2]

            if abs(curvatureStart - curvatureEnd) <= epsilon:

                # constant curvature: nothing to sample
                result.append((sectionStart, curvatureStart))

                continue

            sectionEnd = tuples[trackIndex+1][0] if trackIndex < len(tuples)-1 else self.length

            sectionLength = sectionEnd - sectionStart

            nIntervals = 0 if ds is None else int(sectionLength/ds)

            if nIntervals <= 0:

                # No sampling requested or section shorter than ds: use one average
                # value. This also keeps a last section starting beyond the track
                # length in the result, so that later validation can report it.
                result.append((sectionStart, (curvatureStart + curvatureEnd)/2))

                continue

            # the curvature of a clothoid is K(s) = K_0 + (s-s_0)/alpha

            alpha = sectionLength/(curvatureEnd-curvatureStart)

            for intervalIndex in range(nIntervals):

                discretizationPoint = sectionStart+intervalIndex*ds

                curvatureAtDiscretizationPoint = curvatureStart + intervalIndex*ds/alpha

                # remark that the last interval has length L such that: ds <= L < 2*ds.

                if intervalIndex == nIntervals-1:

                    avgCurvature = (curvatureAtDiscretizationPoint + curvatureEnd)/2

                else:

                    avgCurvature = curvatureAtDiscretizationPoint + ds/(2*alpha)

                result.append((discretizationPoint, avgCurvature))

        return result

    def reverse(self):
        """
        Switch to the opposite trip: positions are mirrored, gradients and
        curvatures change sign and ' (reversed)' is appended to the title.

        - return: the track itself.
        - raise ValueError: if the track fields are not valid.
        """

        # switch to opposite trip

        try:

            self.checkFields()

        except ValueError as e:

            raise ValueError("Track cannot be reversed due to error: {}".format(str(e))) from e

        def flipData(df):

            # the end of each section (start of the next one, or end of track)
            # becomes the start of the mirrored section
            newIndex = np.flip(self.length - np.append(df.index[1:], self.length))

            newValues = np.flip(df[df.keys()[0]].values)

            return pd.DataFrame({df.index.name: newIndex, df.keys()[0]: newValues}).set_index(df.index.name)

        self.gradients = -flipData(self.gradients)

        self.speedLimits = flipData(self.speedLimits)

        self.curvatures = -flipData(self.curvatures)

        self.title = self.title + ' (reversed)'

        return self

    def mergeDataFrames(self):
        """
        Build dataframe with intervals of constant gradient, speed limit and curvature.

        - return: outer join of curvatures, gradients and speed limits on the
          position index, with missing values forward-filled.
        """

        # .ffill() replaces the deprecated fillna(method='ffill')
        joinedGradientsAndSpeedLimits = self.gradients.join(self.speedLimits, how='outer').ffill()

        return self.curvatures.join(joinedGradientsAndSpeedLimits, how='outer').ffill()

    def print(self):
        """
        Basic printing functionality.
        """

        df = self.mergeDataFrames()

        print(df)

    def plot(self, figSize=(12, 6)):
        """
        Basic plotting functionality.

        Shows the speed limits [km/h] as a step function and, on a second axis,
        the altitude profile computed from the gradients, both over position [km].

        - param figSize: figure size (width, height) passed to the pandas plot call.
        """

        speedLimits = self.speedLimits

        # extra row at the end of the track so that the last step is drawn up to
        # the track end; NaN (not None) keeps the column dtype float
        endOfTrack = pd.DataFrame({speedLimits.index.name: [self.length], speedLimits.keys()[0]: [np.nan]}).set_index(speedLimits.index.name)

        speedLimits = pd.concat([speedLimits, endOfTrack])

        speedLimits.index = speedLimits.index.map(lambda x: x/1e3)  # convert m to km

        speedLimits['Speed limit [m/s]'] *= 3.6  # convert m/s to km/h

        speedLimits.rename(columns={'Speed limit [m/s]': 'Speed limit'}, inplace=True)

        axV = speedLimits.plot(color='purple', drawstyle='steps-post', xlabel='Position [km]', ylabel='Velocity [km/h]', figsize=figSize)

        axV.legend(loc='lower left')

        axA = axV.twinx()

        altitude = computeAltitude(self.gradients, self.length)

        altitude.index = altitude.index.map(lambda x: x/1e3)  # convert m to km

        altitude.rename(columns={'Altitude [m]': 'Track profile'}, inplace=True)

        axA = altitude.plot(ax=axA, color='gray', title='Visualization of ' + self.title + ' track', grid=True, ylabel='Altitude [m]')

        axA.legend(loc='upper right')

        plt.show()

    def updateLimits(self, positionStart=None, positionEnd=None, unit='m'):
        """
        Truncate track to given positions.

        - param positionStart: new start of the track, expressed in unit
          (beginning of track if None).
        - param positionEnd: new end of the track, expressed in unit
          (end of track if None).
        - param unit: unit of the given positions.
        - raise ValueError: if the positions are outside the track or the start
          is not before the end.
        """

        # Convert the given positions before comparing them with self.length.
        # NOTE(review): assumes convertUnit returns metres, like the other
        # positions stored in this class - confirm against utils.
        positionStart = convertUnit(0 if positionStart is None else positionStart, unit)

        # the default end is self.length, which is already converted
        positionEnd = self.length if positionEnd is None else convertUnit(positionEnd, unit)

        if (not 0 <= positionStart < self.length) or (not 0 < positionEnd <= self.length):

            raise ValueError("Given positions must be between limits of track!")

        if positionStart >= positionEnd:

            raise ValueError("Start position must be smaller than end position!")

        newPos = pd.DataFrame({'Position [m]': [positionStart]}).set_index('Position [m]')

        def crop(dfIn):

            # insert a row at the new start carrying the value valid at that position
            dfOut = newPos.join(dfIn, how='outer').ffill()

            # copy the slice so that adding a column below does not act on a view
            dfOut = dfOut.loc[(dfOut.index >= positionStart) & (dfOut.index <= positionEnd)].copy()

            # shift positions so that the truncated track starts at 0
            dfOut['Position [m]'] = dfOut.index - dfOut.index[0]

            dfOut.set_index('Position [m]', inplace=True)

            return dfOut

        # Same arithmetic as in crop (position minus start), so a section starting
        # exactly at positionEnd cannot exceed the new length through rounding.
        self.length = positionEnd - positionStart

        self.speedLimits = crop(self.speedLimits)

        self.gradients = crop(self.gradients)

        self.curvatures = crop(self.curvatures)
if __name__ == '__main__':

    # Example on how to load and plot a track: the id selects the JSON file
    # that the Track constructor reads from its default folder.

    exampleConfig = {'id': 'CH_StGallen_Wil'}

    track = Track(config=exampleConfig)

    track.plot()