1+ from ...utils import settings
2+ from ...state .vehicle import VehicleState ,ObjectFrameEnum
3+ from ...state .trajectory import Trajectory , Path
4+ from ...knowledge .vehicle .geometry import front2steer
5+ from ..component import Component
6+ import numpy as np
7+ import casadi
8+
9+ class MPCController (object ):
10+ """Model Predictive Controller for trajectory tracking."""
11+ def __init__ (self , T = None , dt = None ):
12+ self .T = T if T is not None else settings .get ('control.mpc.horizon' , 10 )
13+ self .dt = dt if dt is not None else settings .get ('control.mpc.dt' , 0.1 )
14+ self .L = settings .get ('vehicle.geometry.wheelbase' )
15+ self .v_bounds = [- settings .get ('vehicle.limits.max_reverse_speed' ), settings .get ('vehicle.limits.max_speed' )]
16+ self .delta_bounds = [settings .get ('vehicle.geometry.min_wheel_angle' ),settings .get ('vehicle.geometry.max_wheel_angle' )]
17+ self .steering_angle_range = [settings .get ('vehicle.geometry.min_steering_angle' ),settings .get ('vehicle.geometry.max_steering_angle' )]
18+ self .a_bounds = [- settings .get ('vehicle.limits.max_deceleration' ), settings .get ('vehicle.limits.max_acceleration' )]
19+ self .trajectory = None
20+ self .prev_x = None # Previous state trajectory
21+ self .prev_u = None # Previous control inputs
22+ self .path = None
23+ self .path_arg = None
24+
25+ # def set_path(self, trajectory: Trajectory):
26+ # """Set the trajectory for the MPC controller."""
27+ # # Assume the argument can only be trajectory
28+ # assert isinstance(trajectory, Trajectory), "Invalid trajectory type."
29+ # print("*"*10)
30+ # print("set_path")
31+ # self.trajectory = trajectory
32+
33+ def set_path (self , path : Path ):
34+ print ("Get new path!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" )
35+ if path == self .path_arg :
36+ return
37+ self .path_arg = path
38+ if len (path .points [0 ]) > 2 :
39+ path = path .get_dims ([0 ,1 ])
40+ self .path = path .arc_length_parameterize ()
41+ self .trajectory = self .path
42+ self .current_traj_parameter = self .trajectory .domain ()[0 ]
43+ self .current_path_parameter = 0.0
44+
45+ def compute (self , state : VehicleState , component : Component = None ):
46+ """Compute the control commands using MPC."""
47+
48+ if self .trajectory is not None :
49+ if self .trajectory .frame != state .pose .frame :
50+ print ("Transforming trajectory from" ,self .trajectory .frame .name ,"to" ,state .pose .frame .name )
51+ self .trajectory = self .trajectory .to_frame (state .pose .frame , current_pose = state .pose )
52+
53+ x0 = np .array ([state .pose .x , state .pose .y , state .pose .yaw , state .v ])
54+
55+ # Interpolate trajectory points to match MPC time horizon
56+ times = self .trajectory .times
57+ points = self .trajectory .points
58+ traj_points = []
59+ j = 0
60+ for i in range (self .T + 1 ):
61+ t_query = times [0 ] + i * self .dt
62+ if t_query <= times [0 ]:
63+ traj_points .append (points [0 ])
64+ elif t_query >= times [- 1 ]:
65+ traj_points .append (points [- 1 ])
66+ else :
67+ while j < len (times ) - 2 and times [j + 1 ] < t_query :
68+ j += 1
69+ alpha = (t_query - times [j ]) / (times [j + 1 ] - times [j ])
70+ pt = (1 - alpha ) * np .array (points [j ]) + alpha * np .array (points [j + 1 ])
71+ traj_points .append (pt )
72+
73+ # Optimization setup
74+ opti = casadi .Opti ()
75+ x = opti .variable (self .T + 1 , 4 ) # [x, y, theta, v]
76+ u = opti .variable (self .T , 2 ) # [a, delta]
77+
78+ def model (x , u ):
79+ """Dynamic model of the vehicle using kinematic bicycle model"""
80+ px , py , theta , v = x [0 ], x [1 ], x [2 ], x [3 ]
81+ a , delta = u [0 ], u [1 ]
82+ beta = casadi .atan (casadi .tan (delta ) / 2.0 )
83+ dx = v * casadi .cos (theta + beta )
84+ dy = v * casadi .sin (theta + beta )
85+ dtheta = v * casadi .tan (delta ) / self .L
86+ dv = a
87+ return casadi .vertcat (dx , dy , dtheta , dv )
88+
89+ obj = 0
90+ for t in range (self .T ):
91+ # Vehicle dynamics
92+ x_next = x [t ,:] + self .dt * model (x [t ,:], u [t ,:]).T
93+ opti .subject_to (x [t + 1 ,:] == x_next )
94+ target = traj_points [t + 1 ]
95+ # Cost function
96+ obj += casadi .sumsqr (x [t + 1 ,0 :2 ] - casadi .reshape (target [0 :2 ], 1 , 2 ))
97+ obj += 0.1 * casadi .sumsqr (u [t ,:])
98+
99+ # Initial condition
100+ opti .subject_to (x [0 , :] == casadi .reshape (x0 [:4 ], 1 , 4 ))
101+
102+ # Constraints
103+ for t in range (self .T ):
104+ opti .subject_to (opti .bounded (self .a_bounds [0 ], u [t ,0 ], self .a_bounds [1 ]))
105+ opti .subject_to (opti .bounded (self .delta_bounds [0 ], u [t ,1 ], self .delta_bounds [1 ]))
106+ opti .subject_to (opti .bounded (self .v_bounds [0 ], x [t + 1 ,3 ], self .v_bounds [1 ]))
107+
108+ # Initial guess
109+ if self .prev_x is not None and self .prev_u is not None :
110+ if len (self .prev_x ) == self .T + 1 and len (self .prev_u ) == self .T :
111+ opti .set_initial (x , np .vstack ((self .prev_x [1 :], self .prev_x [- 1 ])))
112+ opti .set_initial (u , np .vstack ((self .prev_u [1 :], self .prev_u [- 1 ])))
113+
114+ # Solver settings
115+ opti .minimize (obj )
116+ opti .solver ("ipopt" , {"expand" : True }, {"max_iter" : 100 })
117+
118+ try :
119+ # Solve the optimization problem
120+ sol = opti .solve ()
121+ acc = float (sol .value (u [0 ,0 ]))
122+ delta = float (sol .value (u [0 ,1 ]))
123+ self .prev_x = sol .value (x )
124+ self .prev_u = sol .value (u )
125+ print (acc , delta )
126+ return acc , delta
127+ except RuntimeError :
128+ # Handle optimization failure
129+ print ("MPC optimization failed." )
130+ return 0.0 , 0.0
131+
132+
133+ class MPCTrajectoryTracker (Component ):
134+ def __init__ (self , vehicle_interface = None , ** args ):
135+ self .mpc = MPCController (** args )
136+ self .vehicle_interface = vehicle_interface
137+ print (type (vehicle_interface ))
138+
139+ def rate (self ):
140+ return 5.0
141+
142+ def state_inputs (self ):
143+ return ['vehicle' , 'trajectory' ]
144+
145+ def state_outputs (self ):
146+ return []
147+
148+ def update (self , vehicle : VehicleState , trajectory : Trajectory ):
149+ self .mpc .set_path (trajectory )
150+ accel , delta = self .mpc .compute (vehicle , self )
151+
152+ # Clip acceleration and steering angle to vehicle limits
153+ accel = np .clip (accel , self .mpc .a_bounds [0 ], self .mpc .a_bounds [1 ])
154+ delta = np .clip (delta , self .mpc .delta_bounds [0 ], self .mpc .delta_bounds [1 ])
155+
156+ # Convert delta to steering angle
157+ steering_angle = np .clip (front2steer (delta ),
158+ self .mpc .steering_angle_range [0 ], self .mpc .steering_angle_range [1 ])
159+ self .vehicle_interface .send_command (self .vehicle_interface .simple_command (accel , steering_angle , vehicle ))
160+
161+ def healthy (self ):
162+ return self .mpc .path is not None
0 commit comments