|
7 | 7 | from ...utils import serialization |
8 | 8 | from ...mathutils import transforms |
9 | 9 | import numpy as np |
| 10 | +from .longitudinal_planning import longitudinal_plan, longitudinal_brake |
10 | 11 |
|
11 | | -DEBUG = False # Set to False to disable debug output |
12 | | - |
13 | | - |
14 | | -def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) -> List[Tuple[float, float]]: |
15 | | - if not points: |
16 | | - return [] |
17 | | - if len(points) == 1: |
18 | | - return points.copy() |
19 | | - |
20 | | - dense_points = [points[0]] |
21 | | - for i in range(len(points) - 1): |
22 | | - p0 = points[i] |
23 | | - p1 = points[i + 1] |
24 | | - dx = p1[0] - p0[0] |
25 | | - dy = p1[1] - p0[1] |
26 | | - seg_length = math.hypot(dx, dy) |
27 | | - |
28 | | - n_interp = int(round(seg_length * density)) |
29 | | - |
30 | | - for j in range(1, n_interp + 1): |
31 | | - fraction = j / (n_interp + 1) |
32 | | - x_interp = p0[0] + fraction * dx |
33 | | - y_interp = p0[1] + fraction * dy |
34 | | - dense_points.append((x_interp, y_interp)) |
35 | | - |
36 | | - dense_points.append(p1) |
37 | | - |
38 | | - return dense_points |
39 | | - |
40 | | - |
41 | | -def compute_cumulative_distances(points: List[List[float]]) -> List[float]: |
42 | | - s_vals = [0.0] |
43 | | - for i in range(1, len(points)): |
44 | | - dx = points[i][0] - points[i - 1][0] |
45 | | - dy = points[i][1] - points[i - 1][1] |
46 | | - ds = math.hypot(dx, dy) |
47 | | - s_vals.append(s_vals[-1] + ds) |
48 | | - |
49 | | - if DEBUG: |
50 | | - print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) |
51 | | - |
52 | | - return s_vals |
53 | | - |
54 | | - |
55 | | -def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed): |
56 | | - path_normalized = path.arc_length_parameterize() |
57 | | - points = list(path_normalized.points) |
58 | | - dense_points = generate_dense_points(points) |
59 | | - s_vals = compute_cumulative_distances(dense_points) |
60 | | - L = s_vals[-1] # Total path length |
61 | | - stopping_distance = (current_speed ** 2) / (2 * deceleration) |
62 | | - |
63 | | - if DEBUG: |
64 | | - print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) |
65 | | - print("[DEBUG] longitudinal_plan: Total path length L =", L) |
66 | | - print("[DEBUG] longitudinal_plan: Braking distance needed =", stopping_distance) |
67 | | - |
68 | | - if stopping_distance > L: # Case where there is not enough stopping distance to stop before path ends (calls emergency brake) |
69 | | - return longitudinal_brake(path, deceleration, current_speed) |
70 | | - |
71 | | - if current_speed > max_speed: # Case where car is exceeding the max speed so we need to slow down (do initial slowdown) |
72 | | - if DEBUG: |
73 | | - print(f"[DEBUG] Handling case where current_speed ({current_speed:.2f}) > max_speed ({max_speed:.2f})") |
74 | | - |
75 | | - # Initial deceleration phase to reach max_speed |
76 | | - initial_decel_distance = (current_speed ** 2 - max_speed ** 2) / (2 * deceleration) |
77 | | - initial_decel_time = (current_speed - max_speed) / deceleration |
78 | | - remaining_distance = L - initial_decel_distance |
79 | | - |
80 | | - if DEBUG: |
81 | | - print( |
82 | | - f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") |
83 | | - print(f"[DEBUG] Remaining distance after reaching max_speed: {remaining_distance:.2f}") |
84 | | - |
85 | | - # Calculate final deceleration distance needed to stop from max_speed |
86 | | - final_decel_distance = (max_speed ** 2) / (2 * deceleration) |
87 | | - cruise_distance = remaining_distance - final_decel_distance |
88 | | - |
89 | | - if DEBUG: |
90 | | - print(f"[DEBUG] Phase 2 - Cruise: distance = {cruise_distance:.2f}") |
91 | | - print(f"[DEBUG] Phase 3 - Final Decel: distance = {final_decel_distance:.2f}") |
92 | | - |
93 | | - times = [] |
94 | | - for s in s_vals: |
95 | | - if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed |
96 | | - v = math.sqrt(current_speed ** 2 - 2 * deceleration * s) |
97 | | - t = (current_speed - v) / deceleration |
98 | | - if DEBUG: # Print every 10m |
99 | | - print(f"[DEBUG] Initial Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") |
100 | | - |
101 | | - elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed |
102 | | - s_in_cruise = s - initial_decel_distance |
103 | | - t = initial_decel_time + s_in_cruise / max_speed |
104 | | - if DEBUG: # Print every 10m |
105 | | - print(f"[DEBUG] Cruise: s = {s:.2f}, v = {max_speed:.2f}, t = {t:.2f}") |
106 | | - |
107 | | - else: # Phase 3: Final deceleration to stop |
108 | | - s_in_final_decel = s - (initial_decel_distance + cruise_distance) |
109 | | - v = math.sqrt(max(max_speed ** 2 - 2 * deceleration * s_in_final_decel, 0.0)) |
110 | | - t = initial_decel_time + cruise_distance / max_speed + (max_speed - v) / deceleration |
111 | | - if DEBUG: # Print every 10m |
112 | | - print(f"[DEBUG] Final Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") |
113 | | - |
114 | | - times.append(t) |
115 | | - |
116 | | - if DEBUG: |
117 | | - print("[DEBUG] Trajectory complete: Three phases executed") |
118 | | - print(f"[DEBUG] Total time: {times[-1]:.2f}") |
119 | | - |
120 | | - return Trajectory(frame=path.frame, points=dense_points, times=times) |
121 | | - |
122 | | - if acceleration <= 0: |
123 | | - if DEBUG: |
124 | | - print(f"[DEBUG] No acceleration allowed. Current speed: {current_speed:.2f}") |
125 | | - |
126 | | - # Pure deceleration phase |
127 | | - s_decel = (current_speed ** 2) / (2 * deceleration) |
128 | | - T_decel = current_speed / deceleration |
129 | | - |
130 | | - if DEBUG: |
131 | | - print(f"[DEBUG] Will maintain speed until s_decel: {s_decel:.2f}") |
132 | | - print(f"[DEBUG] Total deceleration time will be: {T_decel:.2f}") |
133 | | - |
134 | | - times = [] |
135 | | - for s in s_vals: |
136 | | - if s <= L - s_decel: # Maintain current speed until deceleration point |
137 | | - t_point = s / current_speed |
138 | | - if DEBUG: |
139 | | - print(f"[DEBUG] Constant Speed Phase: s = {s:.2f}, v = {current_speed:.2f}, t = {t_point:.2f}") |
140 | | - else: # Deceleration phase |
141 | | - s_in_decel = s - (L - s_decel) |
142 | | - v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s_in_decel, 0.0)) |
143 | | - t_point = (L - s_decel) / current_speed + (current_speed - v) / deceleration |
144 | | - if DEBUG: |
145 | | - print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") |
146 | | - |
147 | | - times.append(t_point) |
148 | | - |
149 | | - return Trajectory(frame=path.frame, points=dense_points, times=times) |
150 | | - |
151 | | - # Determine max possible peak speed given distance |
152 | | - v_peak_possible = math.sqrt( |
153 | | - (2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) |
154 | | - v_target = min(max_speed, v_peak_possible) |
155 | | - |
156 | | - if DEBUG: |
157 | | - print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) |
158 | | - |
159 | | - # Compute acceleration phase |
160 | | - s_accel = max(0.0, (v_target ** 2 - current_speed ** 2) / (2 * acceleration)) |
161 | | - t_accel = max(0.0, (v_target - current_speed) / acceleration) |
162 | | - |
163 | | - # Compute deceleration phase |
164 | | - s_decel = max(0.0, (v_target ** 2) / (2 * deceleration)) |
165 | | - t_decel = max(0.0, v_target / deceleration) |
166 | | - |
167 | | - # Compute cruise phase |
168 | | - s_cruise = max(0.0, L - s_accel - s_decel) |
169 | | - t_cruise = s_cruise / v_target if v_target > 0 else 0.0 |
170 | | - |
171 | | - if DEBUG: |
172 | | - print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) |
173 | | - print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) |
174 | | - print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) |
175 | | - |
176 | | - times = [] |
177 | | - for s in s_vals: |
178 | | - if s <= s_accel: # Acceleration phase |
179 | | - v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) |
180 | | - t_point = (v - current_speed) / acceleration |
181 | | - |
182 | | - if DEBUG: |
183 | | - print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") |
184 | | - |
185 | | - elif s <= s_accel + s_cruise: # Cruise phase |
186 | | - t_point = t_accel + (s - s_accel) / v_target |
187 | | - |
188 | | - if DEBUG: |
189 | | - print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") |
190 | | - |
191 | | - else: # Deceleration phase |
192 | | - s_decel_phase = s - s_accel - s_cruise |
193 | | - v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) |
194 | | - t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration |
195 | | - |
196 | | - if t_point < times[-1]: # Ensure time always increases |
197 | | - t_point = times[-1] + 0.01 # Small time correction step |
198 | 12 |
|
199 | | - if DEBUG: |
200 | | - print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") |
201 | | - |
202 | | - times.append(t_point) |
203 | | - |
204 | | - if DEBUG: |
205 | | - print("[DEBUG] longitudinal_plan: Final times =", times) |
206 | | - |
207 | | - return Trajectory(frame=path.frame, points=dense_points, times=times) |
208 | | - |
209 | | - |
210 | | -def longitudinal_brake(path: Path, deceleration: float, current_speed: float, |
211 | | - emergency_decel: float = 8.0) -> Trajectory: |
212 | | - # Vehicle already stopped - maintain position |
213 | | - if current_speed <= 0: |
214 | | - print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) |
215 | | - return Trajectory( |
216 | | - frame=path.frame, |
217 | | - points=[path.points[0]] * len(path.points), |
218 | | - times=[float(i) for i in range(len(path.points))] |
219 | | - ) |
220 | | - |
221 | | - # Get total path length |
222 | | - path_length = sum( |
223 | | - np.linalg.norm(np.array(path.points[i + 1]) - np.array(path.points[i])) |
224 | | - for i in range(len(path.points) - 1) |
225 | | - ) |
226 | | - |
227 | | - # Calculate stopping distance with normal deceleration |
228 | | - T_stop_normal = current_speed / deceleration |
229 | | - s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) |
230 | | - |
231 | | - # Check if emergency braking is needed |
232 | | - if s_stop_normal > path_length: |
233 | | - if DEBUG: |
234 | | - print("[DEBUG] longitudinal_brake: Emergency braking needed!") |
235 | | - print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") |
236 | | - print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") |
237 | | - |
238 | | - # Calculate emergency braking parameters |
239 | | - T_stop = current_speed / emergency_decel |
240 | | - s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) |
241 | | - |
242 | | - if DEBUG: |
243 | | - print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") |
244 | | - print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") |
245 | | - |
246 | | - decel_to_use = emergency_decel |
247 | | - |
248 | | - else: |
249 | | - if DEBUG: |
250 | | - print("[DEBUG] longitudinal_brake: Normal braking sufficient") |
251 | | - T_stop = T_stop_normal |
252 | | - decel_to_use = deceleration |
253 | | - |
254 | | - # Generate time points (use more points for smoother trajectory) |
255 | | - num_points = max(len(path.points), 50) |
256 | | - times = np.linspace(0, T_stop, num_points) |
257 | | - |
258 | | - # Calculate distances at each time point using physics equation |
259 | | - distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) |
260 | | - |
261 | | - # Generate points along the path |
262 | | - points = [] |
263 | | - for d in distances: |
264 | | - if d <= path_length: |
265 | | - points.append(path.eval(d)) |
266 | | - else: |
267 | | - points.append(path.eval(d)) |
268 | | - |
269 | | - if DEBUG: |
270 | | - print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") |
271 | | - print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") |
272 | | - |
273 | | - return Trajectory(frame=path.frame, points=points, times=times.tolist()) |
274 | | - |
275 | | - times = [] |
276 | | - for s in s_vals: |
277 | | - if s <= s_accel: # Acceleration phase |
278 | | - v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) |
279 | | - t_point = (v - current_speed) / acceleration |
280 | | - |
281 | | - if DEBUG: |
282 | | - print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") |
283 | | - |
284 | | - elif s <= s_accel + s_cruise: # Cruise phase |
285 | | - t_point = t_accel + (s - s_accel) / v_target |
286 | | - |
287 | | - if DEBUG: |
288 | | - print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") |
289 | | - |
290 | | - else: # Deceleration phase |
291 | | - s_decel_phase = s - s_accel - s_cruise |
292 | | - v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) |
293 | | - t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration |
294 | | - |
295 | | - if t_point < times[-1]: # Ensure time always increases |
296 | | - t_point = times[-1] + 0.01 # Small time correction step |
297 | | - |
298 | | - if DEBUG: |
299 | | - print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") |
300 | | - |
301 | | - times.append(t_point) |
302 | | - |
303 | | - if DEBUG: |
304 | | - print("[DEBUG] longitudinal_plan: Final times =", times) |
305 | | - |
306 | | - return Trajectory(frame=path.frame, points=dense_points, times=times) |
307 | | - |
308 | | -def longitudinal_brake(path: Path, deceleration: float, current_speed: float, emergency_decel: float = 8.0) -> Trajectory: |
309 | | - # Vehicle already stopped - maintain position |
310 | | - if current_speed <= 0: |
311 | | - print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) |
312 | | - return Trajectory( |
313 | | - frame=path.frame, |
314 | | - points=[path.points[0]] * len(path.points), |
315 | | - times=[float(i) for i in range(len(path.points))] |
316 | | - ) |
317 | | - |
318 | | - # Get total path length |
319 | | - path_length = sum( |
320 | | - np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) |
321 | | - for i in range(len(path.points)-1) |
322 | | - ) |
323 | | - |
324 | | - # Calculate stopping distance with normal deceleration |
325 | | - T_stop_normal = current_speed / deceleration |
326 | | - s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) |
327 | | - |
328 | | - # Check if emergency braking is needed |
329 | | - if s_stop_normal > path_length: |
330 | | - if DEBUG: |
331 | | - print("[DEBUG] longitudinal_brake: Emergency braking needed!") |
332 | | - print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") |
333 | | - print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") |
334 | | - |
335 | | - # Calculate emergency braking parameters |
336 | | - T_stop = current_speed / emergency_decel |
337 | | - s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) |
338 | | - |
339 | | - if DEBUG: |
340 | | - print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") |
341 | | - print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") |
342 | | - |
343 | | - decel_to_use = emergency_decel |
344 | | - |
345 | | - else: |
346 | | - if DEBUG: |
347 | | - print("[DEBUG] longitudinal_brake: Normal braking sufficient") |
348 | | - T_stop = T_stop_normal |
349 | | - decel_to_use = deceleration |
350 | | - |
351 | | - # Generate time points (use more points for smoother trajectory) |
352 | | - num_points = max(len(path.points), 50) |
353 | | - times = np.linspace(0, T_stop, num_points) |
354 | | - |
355 | | - # Calculate distances at each time point using physics equation |
356 | | - distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) |
357 | | - |
358 | | - # Generate points along the path |
359 | | - points = [] |
360 | | - for d in distances: |
361 | | - if d <= path_length: |
362 | | - points.append(path.eval(d)) |
363 | | - else: |
364 | | - points.append(path.eval(d)) |
365 | | - |
366 | | - if DEBUG: |
367 | | - print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") |
368 | | - print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") |
369 | | - |
370 | | - return Trajectory(frame=path.frame, points=points, times=times.tolist()) |
| 13 | +DEBUG = False # Set to False to disable debug output |
371 | 14 |
|
372 | 15 |
|
373 | 16 | ''' |
@@ -425,7 +68,7 @@ def __init__(self): |
425 | 68 | self.deceleration = 2.0 |
426 | 69 | self.emergency_brake = 8.0 |
427 | 70 | # Parameters for end-of-route linear deceleration |
428 | | - self.end_stop_distance = 12.5 # Distance in meters to start linear deceleration |
| 71 | + self.end_stop_distance = 15 # Distance in meters to start linear deceleration |
429 | 72 | # Did 15, 10, 7.5, 17.5, 20, 12.5 |
430 | 73 | def state_inputs(self): |
431 | 74 | return ['all'] |
|
0 commit comments