77from ...utils import serialization
88from ...mathutils import transforms
99import numpy as np
10- from .parking_motion_planning import longitudinal_plan , longitudinal_brake
11-
10+ from .parking_motion_planning import longitudinal_plan as lp_fn , longitudinal_brake as lb_fn
1211
1312DEBUG = False # Set to False to disable debug output
1413
1514
16- '''
17- # OTHER CREEP CONTROL CODE
18-
19- def inverse_speed_function(distance_to_object):
20-
21- # max_creep_speed = 1.0 # Maximum speed when creeping (m/s)
22- # min_creep_speed = 0.2 # Minimum speed to maintain (m/s)
23- # safe_distance = 5.0 # Distance at which to start slowing down (m)
24-
25- # # Almost stop when very close (≤0.25m)
26- # if distance_to_object <= 0.25:
27- # return 0.0 # Practically zero speed
28-
29- # # Slow creep when close (≤1.5m)
30- # if distance_to_object <= 3.0:
31- # return min_creep_speed
32- # speed = min_creep_speed + (max_creep_speed - min_creep_speed) * (distance_to_object / safe_distance)
33- # return min(speed, max_creep_speed)
34-
35- def pid_speed_control(distance_to_object, target_distance, current_speed, prev_error, integral, dt,
36- kp=0.5, ki=0.1, kd=0.05, min_speed=0.2, max_speed=1.0):
37- if distance_to_object <= 0.25:
38- return 0.0, 0.0, 0.0
39- error = distance_to_object - target_distance
40- integral += error * dt
41- integral = max(-5.0, min(integral, 5.0)) # Prevent integral windup
42- derivative = (error - prev_error) / dt if dt > 0 else 0
43- p_term = kp * error
44- i_term = ki * integral
45- d_term = kd * derivative
46- speed_adjustment = p_term + i_term + d_term
47- base_speed = 0.5 # Base creep speed
48- target_speed = base_speed + speed_adjustment
49- target_speed = max(min_speed, min(target_speed, max_speed))
50- return target_speed, error, integral
51-
52-
53- '''
54-
55-
56-
5715class CreepTrajectoryPlanner (Component ):
5816 """Follows the given route. Brakes if the ego–vehicle must yield
5917 (e.g. to a pedestrian) or if the end of the route is near; otherwise,
6018 it accelerates (or cruises) toward a desired speed.
6119 """
6220
6321 def __init__ (self ):
64- self .route_progress = None
65- self .t_last = None
66- self .acceleration = 5
67- self .desired_speed = 2.0
68- self .deceleration = 2.0
69- self .emergency_brake = 8.0
70- # Parameters for end-of-route linear deceleration
71- self .end_stop_distance = 12.5 # Distance in meters to start linear deceleration
72- # Did 15, 10, 7.5, 17.5, 20, 12.5
22+ self .route_index = None
23+ self .time_prev = None
24+ self .accel_gain = 5
25+ self .target_vel = 2.0
26+ self .decel_gain = 2.0
27+ self .full_brake = 8.0
28+ self .stop_dist_threshold = 10 # Distance in meters to start linear deceleration
29+
7330 def state_inputs (self ):
7431 return ['all' ]
7532
@@ -79,103 +36,91 @@ def state_outputs(self) -> List[str]:
7936 def rate (self ):
8037 return 10.0
8138
82- def check_end_of_route (self , route , current_parameter ):
83- """Check if vehicle is approaching the end of the route and calculate
84- appropriate linear deceleration parameters.
85-
86- Args:
87- route: The complete route
88- current_parameter: Current position along the route
89-
90- Returns:
91- (bool, float): Tuple containing:
92- - Whether the vehicle should start decelerating
93- - Adjusted speed if deceleration needed, otherwise desired_speed
94- """
95- route_length = route .length ()
96-
97- distance_remaining = route_length - current_parameter
98-
39+ def is_near_route_end (self , given_route , pos_param ):
40+ route_len = given_route .length ()
41+ remaining_dist = route_len - pos_param
42+
9943 if DEBUG :
100- print (f"[DEBUG] check_end_of_route: Route length = { route_length } , "
101- f"Current position = { current_parameter } , "
102- f"Distance remaining = { distance_remaining } " )
103-
104- if distance_remaining <= self .end_stop_distance :
105- if distance_remaining > 0.1 : # Avoid division by very small numbers
106- required_decel = (self .desired_speed ** 2 ) / (2 * distance_remaining )
107-
108- linear_factor = distance_remaining / self .end_stop_distance
109- adjusted_speed = self .desired_speed * linear_factor
44+ print (f"[DEBUG] is_near_route_end: Route length = { route_len } , "
45+ f"Current position = { pos_param } , "
46+ f"Distance remaining = { remaining_dist } " )
47+
48+ if remaining_dist <= self .stop_dist_threshold :
49+ if remaining_dist > 0.1 :
50+ req_decel = (self .target_vel ** 2 ) / (2 * remaining_dist )
51+ linear_scale = remaining_dist / self .stop_dist_threshold
52+ speed_mod = self .target_vel * linear_scale
11053 if DEBUG :
111- print (f"[DEBUG] End deceleration active: { distance_remaining :.2f} m remaining, "
112- f"required deceleration = { required_decel :.2f} m/s², "
113- f"speed adjusted to { adjusted_speed :.2f} m/s" )
114-
115- return True , adjusted_speed
54+ print (f"[DEBUG] Linear decel active: { remaining_dist :.2f} m left, "
55+ f"required deceleration = { req_decel :.2f} m/s², "
56+ f"adjusted speed = { speed_mod :.2f} m/s" )
57+ return True , speed_mod
11658 else :
11759 return True , 0.0
118-
119- return False , self .desired_speed
120-
121- def update (self , state : AllState ):
122- vehicle = state .vehicle # type: VehicleState
123- route = state .route # type: Route
124- t = state .t
125- if self .t_last is None :
126- self .t_last = t
127- dt = t - self .t_last
128- curr_x = vehicle .pose .x
129- curr_y = vehicle .pose .y
130- curr_v = vehicle .v
131- if self .route_progress is None :
132- self .route_progress = 0.0
133- _ , closest_parameter = route .closest_point_local (
134- [curr_x , curr_y ],
135- (self .route_progress - 5.0 , self .route_progress + 5.0 )
60+
61+ return False , self .target_vel
62+
63+ def update (self , full_state : AllState ):
64+ ego = full_state .vehicle # type: VehicleState
65+ nav_path = full_state .route # type: Route
66+ current_time = full_state .t
67+
68+ if self .time_prev is None :
69+ self .time_prev = current_time
70+ delta_time = current_time - self .time_prev
71+
72+ ego_x = ego .pose .x
73+ ego_y = ego .pose .y
74+ ego_v = ego .v
75+
76+ if self .route_index is None :
77+ self .route_index = 0.0
78+
79+ _ , new_param = nav_path .closest_point_local (
80+ [ego_x , ego_y ],
81+ (self .route_index - 5.0 , self .route_index + 5.0 )
13682 )
137- self .route_progress = closest_parameter
138- approaching_end , target_speed = self .check_end_of_route ( route , closest_parameter )
139- route_with_lookahead = route .trim (closest_parameter , closest_parameter + 10.0 )
140-
141- stay_braking = False
142- pointSet = set ()
143- for i in range (len (route_with_lookahead .points )):
144- if tuple (route_with_lookahead .points [i ]) in pointSet :
145- stay_braking = True
83+ self .route_index = new_param
84+ near_end , speed_setpoint = self .is_near_route_end ( nav_path , new_param )
85+ trimmed_path = nav_path .trim (new_param , new_param + 10.0 )
86+
87+ duplicate_detected = False
88+ unique_pts = set ()
89+ for j in range (len (trimmed_path .points )):
90+ if tuple (trimmed_path .points [j ]) in unique_pts :
91+ duplicate_detected = True
14692 break
147- pointSet .add (tuple (route_with_lookahead .points [i ]))
93+ unique_pts .add (tuple (trimmed_path .points [j ]))
14894
149- should_brake = any (
150- r .type == EntityRelationEnum .STOPPING_AT and r .obj1 == ''
151- for r in state .relations
95+ brake_trigger = any (
96+ rel .type == EntityRelationEnum .STOPPING_AT and rel .obj1 == ''
97+ for rel in full_state .relations
15298 )
153- should_decelerate = any (
154- r .type == EntityRelationEnum .YIELDING and r .obj1 == ''
155- for r in state .relations
156- ) if should_brake == False else False
99+ yield_trigger = any (
100+ rel .type == EntityRelationEnum .YIELDING and rel .obj1 == ''
101+ for rel in full_state .relations
102+ ) if not brake_trigger else False
157103
158- if approaching_end :
159- should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed )
104+ if near_end :
105+ accelerate = (not brake_trigger and not yield_trigger and ego_v < speed_setpoint )
160106 else :
161- should_accelerate = (not should_brake and not should_decelerate and curr_v < self .desired_speed )
162-
163- if stay_braking :
164- traj = longitudinal_brake (route_with_lookahead , 0.0 , 0.0 , 0.0 )
165- elif should_brake :
166- traj = longitudinal_brake (route_with_lookahead , self .emergency_brake , curr_v )
167- elif should_decelerate :
168- traj = longitudinal_brake (route_with_lookahead , self .deceleration , curr_v )
169- elif approaching_end :
170- # Use linear deceleration to stop at end of route
171- traj = longitudinal_plan (route_with_lookahead , self .acceleration ,
172- self .deceleration , target_speed , curr_v )
173- elif should_accelerate :
174- traj = longitudinal_plan (route_with_lookahead , self .acceleration ,
175- self .deceleration , self .desired_speed , curr_v )
107+ accelerate = (not brake_trigger and not yield_trigger and ego_v < self .target_vel )
108+
109+ if duplicate_detected :
110+ trajectory = lb_fn (trimmed_path , 0.0 , 0.0 , 0.0 )
111+ elif brake_trigger :
112+ trajectory = lb_fn (trimmed_path , self .full_brake , ego_v )
113+ elif yield_trigger :
114+ trajectory = lb_fn (trimmed_path , self .decel_gain , ego_v )
115+ elif near_end :
116+ trajectory = lp_fn (trimmed_path , self .accel_gain ,
117+ self .decel_gain , speed_setpoint , ego_v )
118+ elif accelerate :
119+ trajectory = lp_fn (trimmed_path , self .accel_gain ,
120+ self .decel_gain , self .target_vel , ego_v )
176121 else :
177- # Maintain current speed if not accelerating or braking.
178- traj = longitudinal_plan (route_with_lookahead , 0.0 , self .deceleration , self .desired_speed , curr_v )
122+ trajectory = lp_fn (trimmed_path , 0.0 , self .decel_gain , self .target_vel , ego_v )
123+
124+ self .time_prev = current_time
125+ return trajectory
179126
180- self .t_last = t
181- return traj
0 commit comments