1+ # File: GEMstack/onboard/planning/creep_planning.py
2+ from typing import List , Tuple
3+ import math
4+ from ..component import Component
5+ from ...state import AllState , VehicleState , EntityRelation , EntityRelationEnum , Path , Trajectory , Route , \
6+ ObjectFrameEnum
7+ from ...utils import serialization
8+ from ...mathutils import transforms
9+ import numpy as np
10+ from .parking_motion_planning import longitudinal_plan , longitudinal_brake
11+
12+
13+ DEBUG = False # Set to False to disable debug output
14+
15+
16+ '''
17+ # OTHER CREEP CONTROL CODE
18+
19+ def inverse_speed_function(distance_to_object):
20+
21+ # max_creep_speed = 1.0 # Maximum speed when creeping (m/s)
22+ # min_creep_speed = 0.2 # Minimum speed to maintain (m/s)
23+ # safe_distance = 5.0 # Distance at which to start slowing down (m)
24+
25+ # # Almost stop when very close (≤0.25m)
26+ # if distance_to_object <= 0.25:
27+ # return 0.0 # Practically zero speed
28+
29+ # # Slow creep when close (≤1.5m)
30+ # if distance_to_object <= 3.0:
31+ # return min_creep_speed
32+ # speed = min_creep_speed + (max_creep_speed - min_creep_speed) * (distance_to_object / safe_distance)
33+ # return min(speed, max_creep_speed)
34+
35+ def pid_speed_control(distance_to_object, target_distance, current_speed, prev_error, integral, dt,
36+ kp=0.5, ki=0.1, kd=0.05, min_speed=0.2, max_speed=1.0):
37+ if distance_to_object <= 0.25:
38+ return 0.0, 0.0, 0.0
39+ error = distance_to_object - target_distance
40+ integral += error * dt
41+ integral = max(-5.0, min(integral, 5.0)) # Prevent integral windup
42+ derivative = (error - prev_error) / dt if dt > 0 else 0
43+ p_term = kp * error
44+ i_term = ki * integral
45+ d_term = kd * derivative
46+ speed_adjustment = p_term + i_term + d_term
47+ base_speed = 0.5 # Base creep speed
48+ target_speed = base_speed + speed_adjustment
49+ target_speed = max(min_speed, min(target_speed, max_speed))
50+ return target_speed, error, integral
51+
52+
53+ '''
54+
55+
56+
57+ class CreepTrajectoryPlanner (Component ):
58+ """Follows the given route. Brakes if the ego–vehicle must yield
59+ (e.g. to a pedestrian) or if the end of the route is near; otherwise,
60+ it accelerates (or cruises) toward a desired speed.
61+ """
62+
63+ def __init__ (self ):
64+ self .route_progress = None
65+ self .t_last = None
66+ self .acceleration = 5
67+ self .desired_speed = 2.0
68+ self .deceleration = 2.0
69+ self .emergency_brake = 8.0
70+ # Parameters for end-of-route linear deceleration
71+ self .end_stop_distance = 12.5 # Distance in meters to start linear deceleration
72+ # Did 15, 10, 7.5, 17.5, 20, 12.5
73+ def state_inputs (self ):
74+ return ['all' ]
75+
76+ def state_outputs (self ) -> List [str ]:
77+ return ['trajectory' ]
78+
79+ def rate (self ):
80+ return 10.0
81+
82+ def check_end_of_route (self , route , current_parameter ):
83+ """Check if vehicle is approaching the end of the route and calculate
84+ appropriate linear deceleration parameters.
85+
86+ Args:
87+ route: The complete route
88+ current_parameter: Current position along the route
89+
90+ Returns:
91+ (bool, float): Tuple containing:
92+ - Whether the vehicle should start decelerating
93+ - Adjusted speed if deceleration needed, otherwise desired_speed
94+ """
95+ route_length = route .length ()
96+
97+ distance_remaining = route_length - current_parameter
98+
99+ if DEBUG :
100+ print (f"[DEBUG] check_end_of_route: Route length = { route_length } , "
101+ f"Current position = { current_parameter } , "
102+ f"Distance remaining = { distance_remaining } " )
103+
104+ if distance_remaining <= self .end_stop_distance :
105+ if distance_remaining > 0.1 : # Avoid division by very small numbers
106+ required_decel = (self .desired_speed ** 2 ) / (2 * distance_remaining )
107+
108+ linear_factor = distance_remaining / self .end_stop_distance
109+ adjusted_speed = self .desired_speed * linear_factor
110+ if DEBUG :
111+ print (f"[DEBUG] End deceleration active: { distance_remaining :.2f} m remaining, "
112+ f"required deceleration = { required_decel :.2f} m/s², "
113+ f"speed adjusted to { adjusted_speed :.2f} m/s" )
114+
115+ return True , adjusted_speed
116+ else :
117+ return True , 0.0
118+
119+ return False , self .desired_speed
120+
121+ def update (self , state : AllState ):
122+ vehicle = state .vehicle # type: VehicleState
123+ route = state .route # type: Route
124+ t = state .t
125+
126+ if DEBUG :
127+ print ("[DEBUG] CreepTrajectoryPlanner.update: t =" , t )
128+
129+ if self .t_last is None :
130+ self .t_last = t
131+ dt = t - self .t_last
132+ if DEBUG :
133+ print ("[DEBUG] CreepTrajectoryPlanner.update: dt =" , dt )
134+
135+ curr_x = vehicle .pose .x
136+ curr_y = vehicle .pose .y
137+ curr_v = vehicle .v
138+ if DEBUG :
139+ print (f"[DEBUG] CreepTrajectoryPlanner.update: Vehicle position = ({ curr_x } , { curr_y } ), speed = { curr_v } , " )
140+
141+ # Determine progress along the route.
142+ if self .route_progress is None :
143+ self .route_progress = 0.0
144+ _ , closest_parameter = route .closest_point_local (
145+ [curr_x , curr_y ],
146+ (self .route_progress - 5.0 , self .route_progress + 5.0 )
147+ )
148+ if DEBUG :
149+ print ("[DEBUG] CreepTrajectoryPlanner.update: Closest parameter on route =" , closest_parameter )
150+ self .route_progress = closest_parameter
151+
152+ # Check if approaching end of route and get adjusted speed
153+ approaching_end , target_speed = self .check_end_of_route (route , closest_parameter )
154+ if DEBUG and approaching_end :
155+ print ("[DEBUG] CreepTrajectoryPlanner.update: Vehicle is approaching end of route" )
156+ print (f"[DEBUG] CreepTrajectoryPlanner.update: Target speed = { target_speed } " )
157+
158+ # Extract a 10 m segment of the route for planning lookahead.
159+ route_with_lookahead = route .trim (closest_parameter , closest_parameter + 10.0 )
160+ if DEBUG :
161+ print ("[DEBUG] CreepTrajectoryPlanner.update: Route Lookahead =" , route_with_lookahead )
162+
163+ print ("[DEBUG] state" , state .relations )
164+ # Check whether any yield relations (e.g. due to pedestrians) require braking.
165+ stay_braking = False
166+ pointSet = set ()
167+ for i in range (len (route_with_lookahead .points )):
168+ if tuple (route_with_lookahead .points [i ]) in pointSet :
169+ stay_braking = True
170+ break
171+ pointSet .add (tuple (route_with_lookahead .points [i ]))
172+
173+ should_brake = any (
174+ r .type == EntityRelationEnum .STOPPING_AT and r .obj1 == ''
175+ for r in state .relations
176+ )
177+ should_decelerate = any (
178+ r .type == EntityRelationEnum .YIELDING and r .obj1 == ''
179+ for r in state .relations
180+ ) if should_brake == False else False
181+
182+ # If approaching end of route, override the target speed
183+ if approaching_end :
184+ should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed )
185+ else :
186+ should_accelerate = (not should_brake and not should_decelerate and curr_v < self .desired_speed )
187+
188+ if DEBUG :
189+ print ("[DEBUG] CreepTrajectoryPlanner.update: stay_braking =" , stay_braking )
190+ print ("[DEBUG] CreepTrajectoryPlanner.update: should_brake =" , should_brake )
191+ print ("[DEBUG] CreepTrajectoryPlanner.update: should_accelerate =" , should_accelerate )
192+ print ("[DEBUG] CreepTrajectoryPlanner.update: should_decelerate =" , should_decelerate )
193+ print ("[DEBUG] CreepTrajectoryPlanner.update: target_speed =" , target_speed if approaching_end else self .desired_speed )
194+
195+ if stay_braking :
196+ traj = longitudinal_brake (route_with_lookahead , 0.0 , 0.0 , 0.0 )
197+ if DEBUG :
198+ print ("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake (stay braking)." )
199+ elif should_brake :
200+ traj = longitudinal_brake (route_with_lookahead , self .emergency_brake , curr_v )
201+ if DEBUG :
202+ print ("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake." )
203+ elif should_decelerate :
204+ traj = longitudinal_brake (route_with_lookahead , self .deceleration , curr_v )
205+ if DEBUG :
206+ print ("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake." )
207+ elif approaching_end :
208+ # Use linear deceleration to stop at end of route
209+ traj = longitudinal_plan (route_with_lookahead , self .acceleration ,
210+ self .deceleration , target_speed , curr_v )
211+ if DEBUG :
212+ print ("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan with end-of-route deceleration." )
213+ elif should_accelerate :
214+ traj = longitudinal_plan (route_with_lookahead , self .acceleration ,
215+ self .deceleration , self .desired_speed , curr_v )
216+ if DEBUG :
217+ print ("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan (accelerate)." )
218+ else :
219+ # Maintain current speed if not accelerating or braking.
220+ traj = longitudinal_plan (route_with_lookahead , 0.0 , self .deceleration , self .desired_speed , curr_v )
221+ if DEBUG :
222+ print (
223+ "[DEBUG] CreepTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel)." )
224+
225+ self .t_last = t
226+ if DEBUG :
227+ print (f'[DEBUG] Current Velocity: { curr_v } , Target Speed: { target_speed if approaching_end else self .desired_speed } ' )
228+ print ("[DEBUG] CreepTrajectoryPlanner.update: Returning trajectory with" , len (traj .points ), "points." )
229+ return traj
0 commit comments