1+ from ...mathutils .control import PID
2+ from ...utils import settings
3+ from ...mathutils import transforms
4+ from ...knowledge .vehicle .dynamics import acceleration_to_pedal_positions
5+ from ...state .vehicle import VehicleState ,ObjectFrameEnum
6+ from ...state .trajectory import Path ,Trajectory ,compute_headings
7+ from ...knowledge .vehicle .geometry import front2steer
8+ from ..interface .gem import GEMVehicleCommand
9+ from ..component import Component
10+ import numpy as np
11+
12+ class PurePursuit (object ):
13+ """Implements a pure pursuit controller on a second-order Dubins vehicle."""
14+ def __init__ (self , lookahead = None , lookahead_scale = None , crosstrack_gain = None , desired_speed = None ):
15+ self .look_ahead = lookahead if lookahead is not None else settings .get ('control.pure_pursuit.lookahead' ,4.0 )
16+ self .look_ahead_scale = lookahead_scale if lookahead_scale is not None else settings .get ('control.pure_pursuit.lookahead_scale' ,3.0 )
17+ self .crosstrack_gain = crosstrack_gain if crosstrack_gain is not None else settings .get ('control.pure_pursuit.crosstrack_gain' ,0.41 )
18+ self .front_wheel_angle_scale = 3.0
19+ self .wheelbase = settings .get ('vehicle.geometry.wheelbase' )
20+ self .wheel_angle_range = [settings .get ('vehicle.geometry.min_wheel_angle' ),settings .get ('vehicle.geometry.max_wheel_angle' )]
21+ self .steering_angle_range = [settings .get ('vehicle.geometry.min_steering_angle' ),settings .get ('vehicle.geometry.max_steering_angle' )]
22+
23+ if desired_speed is not None :
24+ self .desired_speed_source = desired_speed
25+ else :
26+ self .desired_speed_source = settings .get ('control.pure_pursuit.desired_speed' ,"path" )
27+ self .desired_speed = self .desired_speed_source if isinstance (self .desired_speed_source ,(int ,float )) else None
28+ self .speed_limit = settings .get ('vehicle.limits.max_speed' )
29+ self .max_accel = settings .get ('vehicle.limits.max_acceleration' ) # m/s^2
30+ self .max_decel = settings .get ('vehicle.limits.max_deceleration' ) # m/s^2
31+ self .pid_speed = PID (settings .get ('control.longitudinal_control.pid_p' ,0.5 ), settings .get ('control.longitudinal_control.pid_d' ,0.0 ), settings .get ('control.longitudinal_control.pid_i' ,0.1 ), windup_limit = 20 )
32+
33+ self .path_arg = None
34+ self .path = None # type: Trajectory
35+ #if trajectory = None, then only an untimed path was provided and we can't use the path velocity as the reference
36+ self .trajectory = None # type: Trajectory
37+
38+ self .current_path_parameter = 0.0
39+ self .current_traj_parameter = 0.0
40+ self .t_last = None
41+
42+ def set_path (self , path : Path ):
43+ if path == self .path_arg :
44+ return
45+ self .path_arg = path
46+ if len (path .points [0 ]) > 2 :
47+ path = path .get_dims ([0 ,1 ])
48+ if not isinstance (path ,Trajectory ):
49+ self .path = path .arc_length_parameterize ()
50+ self .trajectory = None
51+ self .current_traj_parameter = 0.0
52+ if self .desired_speed_source in ['path' ,'trajectory' ]:
53+ raise ValueError ("Can't provide an untimed path to PurePursuit and expect it to use the path velocity. Set control.pure_pursuit.desired_speed to a constant." )
54+ else :
55+ self .path = path .arc_length_parameterize ()
56+ self .trajectory = path
57+ self .current_traj_parameter = self .trajectory .domain ()[0 ]
58+ self .current_path_parameter = 0.0
59+
60+ def compute (self , state : VehicleState , component : Component = None ):
61+ assert state .pose .frame != ObjectFrameEnum .CURRENT
62+ t = state .pose .t
63+
64+ if self .t_last is None :
65+ self .t_last = t
66+ dt = t - self .t_last
67+
68+ curr_x = state .pose .x
69+ curr_y = state .pose .y
70+ curr_yaw = state .pose .yaw if state .pose .yaw is not None else 0.0
71+ speed = state .v
72+
73+ if self .path is None :
74+ #just stop
75+ accel = self .pid_speed .advance (0.0 , t )
76+ # TODO
77+ raise RuntimeError ("Behavior without path not implemented" )
78+
79+ if self .path .frame != state .pose .frame :
80+ print ("Transforming path from" ,self .path .frame .name ,"to" ,state .pose .frame .name )
81+ self .path = self .path .to_frame (state .pose .frame , current_pose = state .pose )
82+ if self .trajectory is not None :
83+ if self .trajectory .frame != state .pose .frame :
84+ print ("Transforming trajectory from" ,self .trajectory .frame .name ,"to" ,state .pose .frame .name )
85+ self .trajectory = self .trajectory .to_frame (state .pose .frame , current_pose = state .pose )
86+
87+ closest_dist ,closest_parameter = self .path .closest_point_local ((curr_x ,curr_y ),[self .current_path_parameter - 5.0 ,self .current_path_parameter + 5.0 ])
88+ self .current_path_parameter = closest_parameter
89+ self .current_traj_parameter += dt
90+ #TODO: calculate parameter that is look_ahead distance away from the closest point?
91+ #(rather than just advancing the parameter)
92+
93+ #Stanley
94+ des_parameter = closest_parameter + self .look_ahead
95+
96+ #Pure pursuit
97+ #des_parameter = closest_parameter + self.look_ahead + self.look_ahead_scale * speed
98+
99+ print ("Closest parameter: " + str (closest_parameter ),"distance to path" ,closest_dist )
100+ if closest_dist > 0.1 :
101+ print ("Closest point" ,self .path .eval (closest_parameter ),"vs" ,(curr_x ,curr_y ))
102+ if des_parameter >= self .path .domain ()[1 ]:
103+ #we're at the end of the path, calculate desired point by extrapolating from the end of the path
104+ end_pt = self .path .points [- 1 ]
105+ if len (self .path .points ) > 1 :
106+ end_dir = self .path .eval_tangent (self .path .domain ()[1 ])
107+ else :
108+ #path is just a single point, just look at current direction
109+ end_dir = (np .cos (curr_yaw ),np .sin (curr_yaw ))
110+ desired_x ,desired_y = transforms .vector_madd (end_pt ,end_dir ,(des_parameter - self .path .domain ()[1 ]))
111+ else :
112+ desired_x ,desired_y = self .path .eval (des_parameter )
113+ desired_yaw = np .arctan2 (desired_y - curr_y ,desired_x - curr_x )
114+ #print("Desired point",(desired_x,desired_y)," with lookahead distance",self.look_ahead + self.look_ahead_scale * speed)
115+ #print("Current yaw",curr_yaw,"desired yaw",desired_yaw)
116+
117+
118+ # distance between the desired point and the vehicle
119+ L = transforms .vector2_dist ((desired_x ,desired_y ),(curr_x ,curr_y ))
120+
121+ # find the curvature and the angle
122+ alpha = transforms .normalize_angle (desired_yaw - curr_yaw )
123+ if alpha > np .pi :
124+ alpha -= np .pi * 2
125+
126+ ## STANLEY
127+ closest_x ,closest_y = self .path .eval (closest_parameter )
128+
129+
130+ e_numerator = (desired_x - closest_x ) * (closest_y - curr_y ) - (closest_x - curr_x ) * (desired_y - closest_y )
131+
132+ e_denominator = np .sqrt ((desired_x - closest_x )** 2 + (desired_y - closest_y )** 2 )
133+
134+ e = e_numerator / e_denominator
135+
136+ k = self .crosstrack_gain
137+
138+ crosstrack_steering = np .arctan2 (k * e , speed )
139+ heading_error = alpha # change to (desired_to_closest_yaw - curr_yaw)
140+
141+ angle_i = alpha + crosstrack_steering
142+ angle = angle_i * self .front_wheel_angle_scale
143+
144+ ## Pure pursuit
145+ # ----------------- tuning this part as needed -----------------
146+
147+ # k = self.crosstrack_gain
148+ # angle_i = np.arctan((k * 2 * self.wheelbase * np.sin(alpha)) / L)
149+ # angle = angle_i*self.front_wheel_angle_scale
150+ # ----------------- tuning this part as needed -----------------
151+
152+ f_delta = np .clip (angle , self .wheel_angle_range [0 ], self .wheel_angle_range [1 ])
153+
154+ #print("Closest point distance: " + str(L))
155+ print ("Forward velocity: " + str (speed ))
156+ ct_error = np .sin (alpha ) * L
157+ print ("Crosstrack Error: " + str (round (ct_error ,3 )))
158+ print ("Front wheel angle: " + str (round (np .degrees (f_delta ),2 )) + " degrees" )
159+ steering_angle = np .clip (front2steer (f_delta ), self .steering_angle_range [0 ], self .steering_angle_range [1 ])
160+ print ("Steering wheel angle: " + str (round (np .degrees (steering_angle ),2 )) + " degrees" )
161+
162+ desired_speed = self .desired_speed
163+ feedforward_accel = 0.0
164+ if self .desired_speed_source in ['path' ,'trajectory' ]:
165+ #determine desired speed from trajectory
166+ if len (self .trajectory .points ) < 2 or self .current_path_parameter >= self .path .domain ()[1 ]:
167+ if component is not None :
168+ component .debug_event ('Past the end of trajectory' )
169+ #past the end, just stop
170+ desired_speed = 0.0
171+ feedforward_accel = - 2.0
172+ else :
173+ if self .desired_speed_source == 'path' :
174+ current_trajectory_time = self .trajectory .parameter_to_time (self .current_path_parameter )
175+ else :
176+ current_trajectory_time = self .current_traj_parameter
177+ deriv = self .trajectory .eval_derivative (current_trajectory_time )
178+ desired_speed = min (np .linalg .norm (deriv ),self .speed_limit )
179+ difference_dt = 0.1
180+ if current_trajectory_time >= self .trajectory .domain ()[1 ]:
181+ prev_deriv = self .trajectory .eval_derivative (current_trajectory_time - difference_dt )
182+ prev_desired_speed = min (np .linalg .norm (prev_deriv ),self .speed_limit )
183+ feedforward_accel = (desired_speed - prev_desired_speed )/ difference_dt
184+ print ("Desired speed" ,desired_speed ,"m/s" ,", from prior" ,prev_desired_speed ,"m/s" )
185+ else :
186+ next_deriv = self .trajectory .eval_derivative (current_trajectory_time + difference_dt )
187+ next_desired_speed = min (np .linalg .norm (next_deriv ),self .speed_limit )
188+ feedforward_accel = (next_desired_speed - desired_speed )/ difference_dt
189+ print ("Desired speed" ,desired_speed ,"m/s" ,", trying to reach desired" ,next_desired_speed ,"m/s" )
190+ feedforward_accel = np .clip (feedforward_accel , - self .max_decel , self .max_accel )
191+ print ("Feedforward accel: " + str (feedforward_accel ) + " m/s^2" )
192+ else :
193+ #decay speed when crosstrack error is high
194+ desired_speed *= np .exp (- abs (ct_error )* 0.4 )
195+ if desired_speed > self .speed_limit :
196+ desired_speed = self .speed_limit
197+ output_accel = self .pid_speed .advance (e = desired_speed - speed , t = t , feedforward_term = feedforward_accel )
198+ if component is not None :
199+ component .debug ('curr pt' ,(curr_x ,curr_y ))
200+ component .debug ('curr param' ,self .current_path_parameter )
201+ component .debug ('desired pt' ,(desired_x ,desired_y ))
202+ component .debug ('desired yaw (rad)' ,desired_yaw )
203+ component .debug ('crosstrack error' ,ct_error )
204+ component .debug ('front wheel angle (rad)' ,f_delta )
205+ component .debug ('desired speed (m/s)' ,desired_speed )
206+ component .debug ('feedforward accel (m/s^2)' ,feedforward_accel )
207+ component .debug ('output accel (m/s^2)' ,output_accel )
208+ print ("Output accel: " + str (output_accel ) + " m/s^2" )
209+
210+ if output_accel > self .max_accel :
211+ output_accel = self .max_accel
212+
213+ if output_accel < - self .max_decel :
214+ output_accel = - self .max_decel
215+
216+ self .t_last = t
217+ return (output_accel , f_delta )
218+
219+
220+ class PurePursuitTrajectoryTracker (Component ):
221+ def __init__ (self ,vehicle_interface = None , ** args ):
222+ self .pure_pursuit = PurePursuit (** args )
223+ self .vehicle_interface = vehicle_interface
224+
225+ def rate (self ):
226+ return 50.0
227+
228+ def state_inputs (self ):
229+ return ['vehicle' ,'trajectory' ]
230+
231+ def state_outputs (self ):
232+ return []
233+
234+ def update (self , vehicle : VehicleState , trajectory : Trajectory ):
235+ self .pure_pursuit .set_path (trajectory )
236+ accel ,wheel_angle = self .pure_pursuit .compute (vehicle , self )
237+ #print("Desired wheel angle",wheel_angle)
238+ steering_angle = np .clip (front2steer (wheel_angle ), self .pure_pursuit .steering_angle_range [0 ], self .pure_pursuit .steering_angle_range [1 ])
239+ #print("Desired steering angle",steering_angle)
240+ self .vehicle_interface .send_command (self .vehicle_interface .simple_command (accel ,steering_angle , vehicle ))
241+
242+ def healthy (self ):
243+ return self .pure_pursuit .path is not None
0 commit comments