11import os
2- from typing import List
2+ from typing import Dict , List
33
44import numpy as np
55from GEMstack .onboard .component import Component
6+ from GEMstack .state .agent import AgentState
67from GEMstack .state .all import AllState
78from GEMstack .state .physical_object import ObjectFrameEnum
89from GEMstack .state .route import PlannerEnum , Route
10+ from .rrt_star import RRTStar
11+ import rospy
12+ from sensor_msgs .msg import Image
13+ from cv_bridge import CvBridge
914
1015
16+ import cv2
17+
18+ ORIGIN_PX = (190 , 80 )
19+ SCALE_PX_PER_M = 6.5 # px per meter
1120
1221class RoutePlanningComponent (Component ):
1322 """Reads a route from disk and returns it as the desired route."""
1423 def __init__ (self ):
1524 print ("Route Planning Component init" )
25+ self .planner = None
26+ self .route = None
27+ self .bridge = CvBridge ()
28+ self .img_pub = rospy .Publisher ("/image_with_car_xy" , Image , queue_size = 1 )
1629
1730 def state_inputs (self ):
1831 return ["all" ]
@@ -22,27 +35,151 @@ def state_outputs(self) -> List[str]:
2235
2336 def rate (self ):
2437 return 10.0
38+
39+ def _car_to_pixel (self , x , y , img_w , img_h ):
40+ # (x,y)[m] → (u,v)[px]
41+ u0 , v0 = ORIGIN_PX
42+ u = int (round (u0 + SCALE_PX_PER_M * x ))
43+ v = int (round (v0 - SCALE_PX_PER_M * y ))
44+
45+ # clamp to image bounds
46+ u = max (0 , min (u , img_w - 1 ))
47+ v = max (0 , min (v , img_h - 1 ))
48+ return u , v
49+
50+ def _pixel_to_car (self , u , v , img_w , img_h ):
51+ # clamp to image bounds
52+ u = max (0 , min (u , img_w - 1 ))
53+ v = max (0 , min (v , img_h - 1 ))
54+
55+ # origin and scale (same as in _car_to_pixel)
56+ u0 , v0 = ORIGIN_PX
57+ scale = SCALE_PX_PER_M
58+
59+ # invert:
60+ # u = u0 + scale * x → x = (u - u0) / scale
61+ # v = v0 - scale * y → y = (v0 - v) / scale
62+ x = (u - u0 ) / scale
63+ y = (v0 - v ) / scale
64+
65+ return x , y
66+
67+ def visualize_route_pixels (self , route_pts , start_pt , goal_pt ):
68+ """
69+ route_pts: list of (u, v) in pixels
70+ start_pt: (u, v) in pixels
71+ goal_pt: (u, v) in pixels
72+ """
73+ # 1. Copy the base image
74+ script_dir = os .path .dirname (os .path .abspath (__file__ ))
75+ frame_path = os .path .join (script_dir , "out.pgm" )
76+ frame = cv2 .imread (frame_path , cv2 .IMREAD_COLOR )
77+ img_h , img_w = frame .shape [:2 ]
78+ print ("frame shape" , frame .shape )
79+ print ("frame dtype" , frame .dtype )
80+
81+ # 2. Optionally clamp all points to image bounds
82+ def clamp (pt ):
83+ u , v = pt
84+ u = max (0 , min (int (round (u )), img_w - 1 ))
85+ v = max (0 , min (int (round (v )), img_h - 1 ))
86+ return (u , v )
87+
88+ pts = np .array ([clamp (p ) for p in route_pts ], dtype = np .int32 ).reshape (- 1 , 1 , 2 )
89+
90+ # 3. Draw the route polyline in green
91+ cv2 .polylines (
92+ frame ,
93+ [pts ],
94+ isClosed = False ,
95+ color = (0 , 255 , 0 ),
96+ thickness = 2
97+ )
98+
99+ # 4. Draw start marker in blue (star)
100+ u_s , v_s = clamp (start_pt )
101+ cv2 .drawMarker (
102+ frame ,
103+ (u_s , v_s ),
104+ color = (255 , 0 , 0 ),
105+ markerType = cv2 .MARKER_STAR ,
106+ markerSize = 20 ,
107+ thickness = 3
108+ )
109+
110+ # 5. Draw goal marker in red (tilted cross)
111+ u_g , v_g = clamp (goal_pt )
112+ cv2 .drawMarker (
113+ frame ,
114+ (u_g , v_g ),
115+ color = (0 , 0 , 255 ),
116+ markerType = cv2 .MARKER_TILTED_CROSS ,
117+ markerSize = 20 ,
118+ thickness = 3
119+ )
120+
121+ # 6. Publish via ROS
122+ out = self .bridge .cv2_to_imgmsg (frame , "bgr8" )
123+ print ("Publishing out: " , out )
124+ out .header .stamp = rospy .Time .now ()
125+ self .img_pub .publish (out )
126+
127+
128+
25129
26130 def update (self , state : AllState ):
27- print ("Route Planner's mission:" , state .mission_plan )
131+ print ("Route Planner's mission:" , state .mission_plan .planner_type .value )
132+ print ("type of mission plan:" , type (PlannerEnum .RRT_STAR ))
133+ print ("Route Planner's mission:" , state .mission_plan .planner_type .value == PlannerEnum .RRT_STAR .value )
134+ print ("Route Planner's mission:" , state .mission_plan .planner_type .value == PlannerEnum .PARKING .value )
135+ print ("Mission plan:" , state .mission_plan )
28136 print ("Vehicle x:" , state .vehicle .pose .x )
29137 print ("Vehicle y:" , state .vehicle .pose .y )
30138 print ("Vehicle yaw:" , state .vehicle .pose .yaw )
31- if state .mission_plan .planner_type == PlannerEnum .PARKING :
32- print ("Parking mode" )
33- elif state .mission_plan .planner_type == PlannerEnum .RRT_STAR :
139+ if state .mission_plan .planner_type .value == PlannerEnum .PARKING .value :
140+ print ("I am in PARKING mode" )
141+ # Return a route after doing some processing based on mission plan REMOVE ONCE OTHER PLANNERS ARE IMPLEMENTED
142+ base_path = os .path .dirname (__file__ )
143+ file_path = os .path .join (base_path , "../../knowledge/routes/forward_15m_extra.csv" )
144+
145+ waypoints = np .loadtxt (file_path , delimiter = ',' , dtype = float )
146+ if waypoints .shape [1 ] == 3 :
147+ waypoints = waypoints [:,:2 ]
148+ print ("waypoints" , waypoints )
149+ self .route = Route (frame = ObjectFrameEnum .START ,points = waypoints .tolist ())
150+ elif state .mission_plan .planner_type .value == PlannerEnum .RRT_STAR .value :
151+ print ("I am in RRT mode" )
152+ # start = (state.vehicle.pose.x, state.vehicle.pose.y)
153+ script_dir = os .path .dirname (os .path .abspath (__file__ ))
154+ map_path = os .path .join (script_dir , "out.pgm" )
155+
156+ print ("map_path" , map_path )
157+
158+ map_img = cv2 .imread (map_path , cv2 .IMREAD_UNCHANGED )
159+ occupancy_grid = (map_img > 0 ).astype (np .uint8 ) # (590, 1656)
160+ x_bounds = (0 ,occupancy_grid .shape [1 ])
161+ y_bounds = (0 ,occupancy_grid .shape [0 ])
162+ start_u_x , start_u_y = self ._car_to_pixel (state .vehicle .pose .x , state .vehicle .pose .y , occupancy_grid .shape [1 ], occupancy_grid .shape [0 ])
163+ start = (start_u_x , start_u_y )
164+ goal_u_x , goal_u_y = self ._car_to_pixel (state .mission_plan .goal_x , state .mission_plan .goal_y , occupancy_grid .shape [1 ], occupancy_grid .shape [0 ])
165+ goal = (goal_u_x , goal_u_y ) # When we implement kinodynamic, we need to include target_yaw also
166+ step_size = 1.0
167+ max_iter = 2000
168+
169+ self .planner = RRTStar (start , goal , x_bounds , y_bounds , max_iter = max_iter , step_size = step_size , vehicle_width = 1 , occupancy_grid = occupancy_grid )
34170 print ("RRT mode" )
171+ rrt_resp = self .planner .plan ()
172+ self .visualize_route_pixels (rrt_resp , start , goal )
173+ for i in range (len (rrt_resp )):
174+ x , y = rrt_resp [i ]
175+ # Convert to car coordinates
176+ car_x , car_y = self ._pixel_to_car (x , y , occupancy_grid .shape [1 ], occupancy_grid .shape [0 ])
177+ rrt_resp [i ] = (car_x , car_y )
178+
179+ self .route = Route (frame = ObjectFrameEnum .START , points = rrt_resp )
180+ # print("Route planning complete")
181+
35182 else :
36183 print ("Unknown mode" )
37184
38- ## Return a route after doing some processing based on mission plan REMOVE ONCE OTHER PLANNERS ARE IMPLEMENTED
39- base_path = os .path .dirname (__file__ )
40- file_path = os .path .join (base_path , "../../knowledge/routes/forward_15m.csv" )
41-
42- waypoints = np .loadtxt (file_path , delimiter = ',' , dtype = float )
43- if waypoints .shape [1 ] == 3 :
44- waypoints = waypoints [:,:2 ]
45- print ("waypoints" , waypoints )
46- route = Route (frame = ObjectFrameEnum .START ,points = waypoints .tolist ())
47- print ("route" , route )
48- return route
185+ return self .route
0 commit comments