From d6018aeb84da6437dc5bc3341e382af613eb4d2e Mon Sep 17 00:00:00 2001 From: "sanjay.pokkali99" Date: Wed, 12 Feb 2025 00:20:26 -0600 Subject: [PATCH 01/11] base implementation Part 1 --- .../onboard/planning/longitudinal_planning.py | 231 ++++++++++++++++++ 1 file changed, 231 insertions(+) create mode 100644 GEMstack/onboard/planning/longitudinal_planning.py diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py new file mode 100644 index 000000000..f6048a835 --- /dev/null +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -0,0 +1,231 @@ +# File: GEMstack/onboard/planning/longitudinal_planning.py +from typing import List +import math +from ..component import Component +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum +from ...utils import serialization +from ...mathutils import transforms +import numpy as np +DEBUG = True # Set to False to disable debug output + +def compute_cumulative_distances(points: List[List[float]]) -> List[float]: + s_vals = [0.0] + for i in range(1, len(points)): + dx = points[i][0] - points[i-1][0] + dy = points[i][1] - points[i-1][1] + ds = math.hypot(dx, dy) + s_vals.append(s_vals[-1] + ds) + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + return s_vals + +def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_speed: float, current_speed: float) -> Trajectory: + # Reparameterize the path by arc length (to get a timed trajectory) + path_normalized: Trajectory = path.arc_length_parameterize() + points = list(path_normalized.points) + s_vals = compute_cumulative_distances(points) + L = s_vals[-1] + if DEBUG: + print("[DEBUG] longitudinal_plan: Total path length L =", L) + print("[DEBUG] longitudinal_plan: Braking distance needed =", (current_speed**2) / (2 * deceleration)) + + if acceleration <= 0: + # Compute deceleration phase: the distance needed to decelerate to 0. + s_decel = (current_speed**2) / (2 * deceleration) + T_decel = current_speed / deceleration + times = [] + for s in s_vals: + if s <= s_decel: + # Compute the instantaneous speed at distance s: + v = math.sqrt(max(current_speed**2 - 2 * deceleration * s, 0.0)) + t_point = (current_speed - v) / deceleration + else: + t_point = T_decel + times.append(t_point) + if DEBUG: + print("[DEBUG] longitudinal_plan (accel==0): Final times =", times) + return Trajectory(frame=path.frame, points=points, times=times) + + # Otherwise, follow the full trapezoidal profile. + # Determine the highest achievable speed given the available distance. + v_peak_possible = math.sqrt((2 * acceleration * deceleration * L + deceleration * current_speed**2) / (acceleration + deceleration)) + v_target = min(max_speed, v_peak_possible) + if DEBUG: + print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) + + # Calculate acceleration phase (if any) + if v_target > current_speed: + s_accel = (v_target**2 - current_speed**2) / (2 * acceleration) + t_accel = (v_target - current_speed) / acceleration + else: + s_accel = 0.0 + t_accel = 0.0 + + # Calculate deceleration phase (from v_target to 0) + s_decel = (v_target**2) / (2 * deceleration) + t_decel = v_target / deceleration + + # Compute the cruise (constant-speed) segment, if any. + s_cruise = max(0.0, L - s_accel - s_decel) + t_cruise = s_cruise / v_target if v_target > 0 else 0.0 + + if DEBUG: + print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) + print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) + print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) + + # For each point along the path (by its cumulative distance s), compute its time stamp. + times = [] + for s in s_vals: + if s <= s_accel: + # Acceleration phase: v^2 = current_speed^2 + 2*a*s. + v = math.sqrt(current_speed**2 + 2 * acceleration * s) + t_point = (v - current_speed) / acceleration + if DEBUG: + print(f"[DEBUG] longitudinal_plan (accel): s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + elif s <= s_accel + s_cruise: + # Cruise phase. + t_point = t_accel + (s - s_accel) / v_target + if DEBUG: + print(f"[DEBUG] longitudinal_plan (cruise): s = {s:.2f}, t = {t_point:.2f}") + else: + # Deceleration phase. + s_decel_phase = s - s_accel - s_cruise + under_sqrt = max(v_target**2 - 2 * deceleration * s_decel_phase, 0.0) + t_dec = (v_target - math.sqrt(under_sqrt)) / deceleration + t_point = t_accel + t_cruise + t_dec + if DEBUG: + print(f"[DEBUG] longitudinal_plan (decel): s = {s:.2f}, t_dec = {t_dec:.2f}, t = {t_point:.2f}") + times.append(t_point) + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=points, times=times) + + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float) -> Trajectory: + # Vehicle already stopped - maintain position + if current_speed <= 0: + return Trajectory( + frame=path.frame, + points=[path.points[0]] * len(path.points), + times=[float(i) for i in range(len(path.points))] + ) + + # Calculate stopping time and distance + T_stop = current_speed / deceleration + s_stop = current_speed * T_stop - 0.5 * deceleration * (T_stop ** 2) + + # Get total path length + path_length = sum( + np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) + for i in range(len(path.points)-1) + ) + + # Generate time points (use more points for smoother trajectory) + num_points = max(len(path.points), 50) + times = np.linspace(0, T_stop, num_points) + + # Calculate distances at each time point using physics equation + distances = current_speed * times - 0.5 * deceleration * (times ** 2) + + # Generate points along the path + points = [] + for d in distances: + if d <= path_length: + # If within path length, evaluate path at distance + points.append(path.eval(d)) + else: + points.append(path.points[-1]) + + + return Trajectory(frame=path.frame, points=points, times=times.tolist()) + +class YieldTrajectoryPlanner(Component): + """Follows the given route. Brakes if the ego–vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. + """ + def __init__(self): + self.route_progress = None + self.t_last = None + self.acceleration = 0.5 + self.desired_speed = 1.0 + self.deceleration = 2.0 + + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['trajectory'] + + def rate(self): + return 10.0 + + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: t =", t) + + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: dt =", dt) + + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] YieldTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}") + + # Determine progress along the route. + if self.route_progress is None: + self.route_progress = 0.0 + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) + self.route_progress = closest_parameter + + # Extract a 10 m segment of the route for planning lookahead. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + should_brake = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: should_brake =", should_brake) + should_accelerate = (not should_brake and curr_v < self.desired_speed) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: should_accelerate =", should_accelerate) + + if should_accelerate: + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") + else: + # Maintain current speed if not accelerating or braking. + traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + + self.t_last = t + if DEBUG: + print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) + print("[DEBUG] YieldTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj From 6867f1a9b15c98d6ad17e66a5243b6587227a57d Mon Sep 17 00:00:00 2001 From: Yinan Zhao Date: Sun, 16 Feb 2025 02:10:05 -0600 Subject: [PATCH 02/11] update pedestrian_yield_logo.py --- .../perception/pedestrian_detection.py | 84 ++++ .../onboard/planning/longitudinal_planning.py | 62 +-- .../planning/pedestrian_yield_logic.py | 426 ++++++++++++++++++ homework/longitudinal_planning.py | 48 +- homework/pedestrian_detection.yaml | 2 +- homework/pedestrian_yield_logic.py | 418 ++++++++++++++++- homework/test_longitudinal_plan.py | 1 - launch/pedestrian_detection.yaml | 99 ++++ 8 files changed, 1083 insertions(+), 57 deletions(-) create mode 100644 GEMstack/onboard/perception/pedestrian_detection.py create mode 100644 GEMstack/onboard/planning/pedestrian_yield_logic.py create mode 100644 launch/pedestrian_detection.yaml diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py new file mode 100644 index 000000000..3134b1e35 --- /dev/null +++ b/GEMstack/onboard/perception/pedestrian_detection.py @@ -0,0 +1,84 @@ +from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum +from ..interface.gem import GEMInterface +from ..component import Component +#from ultralytics import YOLO +#import cv2 +from typing import Dict + +def box_to_fake_agent(box): + """Creates a fake agent state from an (x,y,w,h) bounding box. + + The location and size are pretty much meaningless since this is just giving a 2D location. + """ + x,y,w,h = box + pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) + dims = (w,h,0) + return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) + + +class PedestrianDetector2D(Component): + """Detects pedestrians.""" + def __init__(self,vehicle_interface : GEMInterface): + self.vehicle_interface = vehicle_interface + #self.detector = YOLO('../../knowledge/detection/yolov8n.pt') + self.last_person_boxes = [] + + def rate(self): + return 4.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def initialize(self): + #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat + #self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat) + pass + + #def image_callback(self, image : cv2.Mat): + # detection_result = self.detector(image) + # self.last_person_boxes = [] + # #uncomment if you want to debug the detector... + # #for bb in self.last_person_boxes: + # # x,y,w,h = bb + # # cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3) + # #cv2.imwrite("pedestrian_detections.png",image) + + def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + res = {} + for i,b in enumerate(self.last_person_boxes): + x,y,w,h = b + res['pedestrian'+str(i)] = box_to_fake_agent(b) + if len(res) > 0: + print("Detected",len(res),"pedestrians") + return res + + +class FakePedestrianDetector2D(Component): + """Triggers a pedestrian detection at some random time ranges""" + def __init__(self,vehicle_interface : GEMInterface): + self.vehicle_interface = vehicle_interface + self.times = [(5.0,20.0),(30.0,35.0)] + self.t_start = None + + def rate(self): + return 4.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + if self.t_start is None: + self.t_start = self.vehicle_interface.time() + t = self.vehicle_interface.time() - self.t_start + res = {} + for times in self.times: + if t >= times[0] and t <= times[1]: + res['pedestrian0'] = box_to_fake_agent((0,0,0,0)) + print("Detected a pedestrian") + return res diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index f6048a835..e522d7f7a 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -2,24 +2,29 @@ from typing import List import math from ..component import Component -from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ + ObjectFrameEnum from ...utils import serialization from ...mathutils import transforms import numpy as np + DEBUG = True # Set to False to disable debug output + def compute_cumulative_distances(points: List[List[float]]) -> List[float]: s_vals = [0.0] for i in range(1, len(points)): - dx = points[i][0] - points[i-1][0] - dy = points[i][1] - points[i-1][1] + dx = points[i][0] - points[i - 1][0] + dy = points[i][1] - points[i - 1][1] ds = math.hypot(dx, dy) s_vals.append(s_vals[-1] + ds) if DEBUG: print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) return s_vals -def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_speed: float, current_speed: float) -> Trajectory: + +def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_speed: float, + current_speed: float) -> Trajectory: # Reparameterize the path by arc length (to get a timed trajectory) path_normalized: Trajectory = path.arc_length_parameterize() points = list(path_normalized.points) @@ -27,17 +32,17 @@ def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_ L = s_vals[-1] if DEBUG: print("[DEBUG] longitudinal_plan: Total path length L =", L) - print("[DEBUG] longitudinal_plan: Braking distance needed =", (current_speed**2) / (2 * deceleration)) - + print("[DEBUG] longitudinal_plan: Braking distance needed =", (current_speed ** 2) / (2 * deceleration)) + if acceleration <= 0: # Compute deceleration phase: the distance needed to decelerate to 0. - s_decel = (current_speed**2) / (2 * deceleration) + s_decel = (current_speed ** 2) / (2 * deceleration) T_decel = current_speed / deceleration times = [] for s in s_vals: if s <= s_decel: # Compute the instantaneous speed at distance s: - v = math.sqrt(max(current_speed**2 - 2 * deceleration * s, 0.0)) + v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s, 0.0)) t_point = (current_speed - v) / deceleration else: t_point = T_decel @@ -45,24 +50,25 @@ def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_ if DEBUG: print("[DEBUG] longitudinal_plan (accel==0): Final times =", times) return Trajectory(frame=path.frame, points=points, times=times) - + # Otherwise, follow the full trapezoidal profile. # Determine the highest achievable speed given the available distance. - v_peak_possible = math.sqrt((2 * acceleration * deceleration * L + deceleration * current_speed**2) / (acceleration + deceleration)) + v_peak_possible = math.sqrt( + (2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) v_target = min(max_speed, v_peak_possible) if DEBUG: print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) - + # Calculate acceleration phase (if any) if v_target > current_speed: - s_accel = (v_target**2 - current_speed**2) / (2 * acceleration) + s_accel = (v_target ** 2 - current_speed ** 2) / (2 * acceleration) t_accel = (v_target - current_speed) / acceleration else: s_accel = 0.0 t_accel = 0.0 # Calculate deceleration phase (from v_target to 0) - s_decel = (v_target**2) / (2 * deceleration) + s_decel = (v_target ** 2) / (2 * deceleration) t_decel = v_target / deceleration # Compute the cruise (constant-speed) segment, if any. @@ -73,13 +79,13 @@ def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_ print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) - + # For each point along the path (by its cumulative distance s), compute its time stamp. times = [] for s in s_vals: if s <= s_accel: # Acceleration phase: v^2 = current_speed^2 + 2*a*s. - v = math.sqrt(current_speed**2 + 2 * acceleration * s) + v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) t_point = (v - current_speed) / acceleration if DEBUG: print(f"[DEBUG] longitudinal_plan (accel): s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") @@ -91,16 +97,16 @@ def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_ else: # Deceleration phase. s_decel_phase = s - s_accel - s_cruise - under_sqrt = max(v_target**2 - 2 * deceleration * s_decel_phase, 0.0) + under_sqrt = max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0) t_dec = (v_target - math.sqrt(under_sqrt)) / deceleration t_point = t_accel + t_cruise + t_dec if DEBUG: print(f"[DEBUG] longitudinal_plan (decel): s = {s:.2f}, t_dec = {t_dec:.2f}, t = {t_point:.2f}") times.append(t_point) - + if DEBUG: print("[DEBUG] longitudinal_plan: Final times =", times) - + return Trajectory(frame=path.frame, points=points, times=times) @@ -116,20 +122,20 @@ def longitudinal_brake(path: Path, deceleration: float, current_speed: float) -> # Calculate stopping time and distance T_stop = current_speed / deceleration s_stop = current_speed * T_stop - 0.5 * deceleration * (T_stop ** 2) - + # Get total path length path_length = sum( - np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) - for i in range(len(path.points)-1) + np.linalg.norm(np.array(path.points[i + 1]) - np.array(path.points[i])) + for i in range(len(path.points) - 1) ) # Generate time points (use more points for smoother trajectory) num_points = max(len(path.points), 50) times = np.linspace(0, T_stop, num_points) - + # Calculate distances at each time point using physics equation distances = current_speed * times - 0.5 * deceleration * (times ** 2) - + # Generate points along the path points = [] for d in distances: @@ -139,14 +145,15 @@ def longitudinal_brake(path: Path, deceleration: float, current_speed: float) -> else: points.append(path.points[-1]) - return Trajectory(frame=path.frame, points=points, times=times.tolist()) + class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if the ego–vehicle must yield (e.g. to a pedestrian) or if the end of the route is near; otherwise, it accelerates (or cruises) toward a desired speed. """ + def __init__(self): self.route_progress = None self.t_last = None @@ -165,7 +172,7 @@ def rate(self): def update(self, state: AllState): vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route + route = state.route # type: Route t = state.t if DEBUG: @@ -222,10 +229,11 @@ def update(self, state: AllState): # Maintain current speed if not accelerating or braking. traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) if DEBUG: - print("[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + print( + "[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") self.t_last = t if DEBUG: print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) print("[DEBUG] YieldTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") - return traj + return traj \ No newline at end of file diff --git a/GEMstack/onboard/planning/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py new file mode 100644 index 000000000..c533ebf94 --- /dev/null +++ b/GEMstack/onboard/planning/pedestrian_yield_logic.py @@ -0,0 +1,426 @@ +from ...state import AgentState, AgentEnum, EntityRelation, EntityRelationEnum, ObjectFrameEnum +from ..component import Component +from typing import List, Dict + +import numpy as np +from scipy.optimize import minimize_scalar + +DEBUG = True # Set to False to disable debug output + +class PedestrianYielder(Component): + """Yields for all pedestrians in the scene. + + Result is stored in the relations graph. + """ + + def rate(self): + return None + + def state_inputs(self): + return ['agents'] + + def state_outputs(self): + return ['relations'] + + def update(self, agents: Dict[str, AgentState]) -> List[EntityRelation]: + res = [] + for n, a in agents.items(): + if DEBUG: + print( + f"[DEBUG] PedestrianYielder.update: Agent:{n} frame:{a.pose.frame}, (x,y):{a.pose.x, a.pose.y}, velocity:{a.velocity}") + + """ collision estimation based on agent states in vehicle frame """ + a.to_frame(ObjectFrameEnum.CURRENT) # to_frame() is missing information to work with + # TODO: check how to convert to vehicle frame. May perception update agent state? + if a.type == AgentEnum.PEDESTRIAN: + is_collision, t_min, min_dist = check_collision_in_vehicle_frame(a) + if DEBUG: + print(f"[DEBUG] is_collision:{is_collision}, distance to buffer area:", shortest_distance_to_buffer_in_vehicle_frame((a.pose.x, a.pose.y), (2.53 / 2 + 3, 1.35 / 2 + 1))) + if is_collision: + # relation: ego-vehicle yields to pedestrian + res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) + + return res + + +# """ Vehicle Dimensions """ +# vehicle_model = "e2" # e2 or e4 +# # vehicle dimensions and buffer area +# if vehicle_model == 'e2': +# # e2 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e2.urdf +# size_x, size_y = 2.53, 1.35 # measured from base_link.STL +# lx_f2r = 0.88 + 0.87 # distance between front axle and rear axle, from gem_e2.urdf wheel_links +# ly_axle = 0.6 * 2 # length of axle +# l_rear_axle_to_front = 1.28 + 0.87 # measured from base_link.STL +# l_rear_axle_to_rear = size_y - l_rear_axle_to_front +# elif vehicle_model == 'e4': +# # e4 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e4.urdf +# size_x, size_y = 3.35, 1.35 # measured from base_link.STL +# lx_f2r = 1.2746 + 1.2904 # distance between front axle and rear axle, from gem_e4.urdf wheel_links +# ly_axle = 0.6545 * 2 # length of front and rear axles, from gem_e4.urdf wheel_links +# l_rear_axle_to_front = 1.68 + 1.29 # measured from base_link.STL +# l_rear_axle_to_rear = size_y - l_rear_axle_to_front +# # add buffer to outer dimensions, defined by half of the x, y buffer area dimensions +# buffer_x, buffer_y = 3, 1 +# buffer = size_x / 2 + buffer_x, size_y / 2 + buffer_y + + +""" Planning in vehicle frame without waypoints """ +def check_collision_in_vehicle_frame(agent_state: AgentState): + buffer = 2.53 / 2 + 3, 1.35 / 2 + 1 # e2 buffer area + time_scale = 100 # TODO: adjust if necessary + xp, yp = agent_state.pose.x, agent_state.pose.y + vx, vy = agent_state.velocity[:2] + # use optimization method to find minimum distance and the time at that point + t_min, min_dist = find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale) + # if the minimum distance between the position and the buffer area is less than 0, than a collision is expected + is_collision = False + if min_dist <= 0: + is_collision = True + return is_collision, t_min, min_dist + + +def find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale): + def pos_t(t): + xt, yt = xp + vx * t, yp + vy * t + return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) + + res = minimize_scalar(pos_t, bounds=(0, time_scale)) + if res.success: + t_min = res.x + min_dist = res.fun + return t_min, min_dist + else: + return None, None + + +# Calculate the shortest distance from an object to vehicle buffer area in vehicle frame +def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): + """ + Calculate the distance between a pedestrian's position and the vehicle with buffer + """ + x_buff, y_buff = buffer + # consider 2D geometry + x_p, y_p = position + # initiate distance and collision + dist = 0 + collision = False + + # calculate distance + # front + if -y_buff <= y_p <= y_buff and x_p > x_buff: + dist = x_p - x_buff + # left front + elif y_p > y_buff and x_p > x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) + # right front + elif y_p < -y_buff and x_p > x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) + # left + elif y_p > y_buff and -x_buff <= x_p <= x_buff: + dist = y_p - y_buff + # right + elif y_p < -y_buff and -x_buff <= x_p <= x_buff: + dist = y_p + y_buff + # rear + elif -y_buff <= y_p <= y_buff and x_p < -x_buff: + dist = x_p + x_buff + # left rear + elif y_p > y_buff and x_p < -x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) + # right rear + elif y_p < -y_buff and x_p < -x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) + # intersect + else: + dist = 0 + + return dist + + +""" Planning in start frame with waypoints """ +# # TODO: how to get further unreached waypoints only +# def get_waypoint_arc_and_new_yaw(curr_x, curr_y, curr_yaw, waypoint): +# """ +# Input: +# curr_x, curr_y, curr_yaw: vehicle current pose +# waypoint: next waypoint +# Output: +# len_arc, radius, (x_c, y_c): length, radius and the center of the arc-shaped path +# next_yaw: vehicle yaw at the waypoint. +# """ +# # TODO: check if it works when the angles cross zero or delta greater than pi +# wp_x, wp_y = waypoint +# dist_curr2wp = np.sqrt((wp_x - curr_x) ** 2 + (wp_y - curr_y) ** 2) +# alpha = np.arctan2(wp_y - curr_y, wp_x - curr_x) - curr_yaw +# delta = 2 * alpha +# radius = dist_curr2wp / 2 / np.sin(alpha) +# len_arc = delta * radius +# x_c, y_c = curr_x - radius * np.sin(curr_yaw), curr_y + radius * np.cos(curr_yaw) +# next_yaw = curr_yaw + delta +# return len_arc, radius, (x_c, y_c), next_yaw +# +# +# def check_collision_with_waypoints(pose, velocity, yaw_rate, radius, center, expand, +# start_point, end_point, time_vehicle_in, time_vehicle_out): +# """ +# Given an arc as a path and a point with velocity, check if the point is crossing the path. +# Input: +# pose, velocity, yaw_rate: pedestrian pose, velocity and angular velocity +# radius, center, start_point, end_point: parameters of the path as an arc +# expand: enlarge the arc as the vehicle moving area +# start_point, end_point: start point and end point of the path +# time_vehicle_in, time_vehicle_out: the time that the vehicle is in the section of the path +# Output: +# is_crossing: boolean, if the pedestrian is crossing the path +# arc_dist_collision: distance from start_point to the closest pedestrian enter or exit point mapped to the path +# """ +# # TODO: compute pedestrian path with yaw_rate? +# xp, yp = pose.x, pose.y +# vx, vy = velocity +# xc, yc = center +# x1, y1 = start_point +# x2, y2 = end_point +# theta1 = np.arctan2(y1 - yc, x1 - xc) +# theta2 = np.arctan2(y2 - yc, x2 - xc) +# r_inner = max(radius - expand, 0) +# r_outer = radius + expand +# +# def is_angle_between(pt, start_angle, end_angle): +# xp, yp = pt +# angle = np.arctan2(yp - yc, xp - xc) +# if start_angle < end_angle: +# return start_angle <= angle <= end_angle +# else: +# return end_angle <= angle <= start_angle +# +# def is_in_ring(pt, xc, yc, r_inner, r_outer): +# xp, yp = pt +# dist = np.sqrt((xp - xc) ** 2 + (yp - yc) ** 2) +# return r_inner <= dist <= r_outer +# +# def find_arc_intersection(xc, yc, xp, yp, vx, vy, r): +# # solve equations: (xt - xc)^2 + (yt - yc)^2 = (R+/-b)^2 +# A = vx ** 2 + vy ** 2 +# B = 2 * (vx * (xp - xc) + vy * (yp - yc)) +# C = (xp - xc) ** 2 + (yp - yc) ** 2 - r ** 2 +# root = B ** 2 - 4 * A * C +# t_list = [] # time +# pt_list = [] # intersection point +# if root < 0: +# return t_list, pt_list # no intersection +# else: +# t1 = (-B - np.sqrt(root)) / (2 * A) +# t2 = (-B + np.sqrt(root)) / (2 * A) +# # t should be larger than 0 +# if t1 > 0: +# pt1 = (xp + vx * t1, yp + vy * t1) +# if is_angle_between(pt1, theta1, theta2): +# t_list.append(t1) +# pt_list.append(pt1) +# if t2 > 0: +# pt2 = (xp + vx * t2, yp + vy * t2) +# if is_angle_between(pt2, theta1, theta2): +# t_list.append(t2) +# pt_list.append(pt2) +# return t_list, pt_list +# +# def find_edge_intersection(xc, yc, xp, yp, vx, vy, theta, r_inner, r_outer): +# x_inner, y_inner = xc + r_inner * np.cos(theta), yc + r_inner * np.sin(theta) +# x_outer, y_outer = xc + r_outer * np.cos(theta), yc + r_outer * np.sin(theta) +# t_list = [] # time +# pt_list = [] # intersection point +# +# dx = x_outer - x_inner +# dy = y_outer - y_inner +# # solve t from: xt = xp + vx * t, yt = xp + vy * t and dx * (yt - y_inner) = dy * (xt - x_inner) +# if dx * vy - dy * vx == 0: +# return t_list, pt_list # parallel, no intersection +# else: +# t = (dy * xp - dx * yp + dx * y_inner - dy * x_inner) / (dx * vy - dy * vx) +# pt = xp + vx * t, yp + vy * t +# if is_in_ring(pt, xc, yc, r_inner, r_outer): +# t_list.append(t) +# pt_list.append(pt) +# return t_list, pt_list +# +# # find the time and points a pedestrian in and out of the path +# t_list = [] +# # Case pedestrian in the path at the beginning +# if is_in_ring((xp, yp), xc, yc, r_inner, r_outer) and is_angle_between((xp, yp), theta1, theta2): +# t_list.append(0) +# # Case pedestrian cross the arcs and the start and end edges of the path section +# t_inner, _ = find_arc_intersection(xc, yc, xp, yp, vx, vy, r_inner) if r_inner > 0 else [], [] +# t_outer, _ = find_arc_intersection(xc, yc, xp, yp, vx, vy, r_outer) +# t_theta1, _ = find_edge_intersection(xc, yc, xp, yp, vx, vy, theta1, r_inner, r_outer) +# t_theta2, _ = find_edge_intersection(xc, yc, xp, yp, vx, vy, theta2, r_inner, r_outer) +# # Combine all points together, The elements in both lists correspond one by one in order +# t_list = t_list + t_inner + t_outer + t_theta1 + t_theta2 +# +# def arc_length_from_start_point(pt, center, radius, start_angle): +# """ +# Calculate the arc length between the point mapping to the arc and the start point as the distance along the path +# Assume angle difference of the arc is not greater than 180 degrees +# """ +# x, y = pt +# xc, yc = center +# angle = np.arctan2(y - yc, x - xc) +# delta_angle = abs(angle - start_angle) +# if delta_angle > np.pi: +# delta_angle = 2 * np.pi - delta_angle +# return radius * delta_angle +# +# is_collision = False +# arc_dist_collision = None +# # collision if there is intersection and the time is between time_vehicle_in and time_vehicle_out +# if min(t_list) <= time_vehicle_out or max(t_list) >= time_vehicle_in: +# is_collision = True +# t_min = max(time_vehicle_in, min(t_list)) +# t_max = min(time_vehicle_out, max(t_list)) +# # map the point to the arc of path for calculating distance from the vehicle follow the path +# pt_min = xp + vx * t_min, yp + vy * t_min +# pt_max = xp + vx * t_max, yp + vy * t_max +# arc_time_min = arc_length_from_start_point(pt_min, center, radius, theta1) +# arc_time_max = arc_length_from_start_point(pt_max, center, radius, theta2) +# arc_dist_collision = min(arc_time_min, arc_time_max) +# +# return is_collision, arc_dist_collision +# +# +# def yield_in_start_frame(path_further, state, agents: Dict[str,AgentState]): +# """ +# All calculations are in start frame, origin reference: rear_axle_center (refer to GEMstack/knowledge/calibration) +# """ +# # TODO: confirm origin reference with calibration team +# # current states +# vehicle = state.vehicle +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_yaw = vehicle.pose.yaw +# curr_v = vehicle.v +# +# # determine lookahead distance using current velocity and a decent deceleration +# decel_decent = 2.0 # TODO: adjust for a decent deceleration if necessary +# t_brake = curr_v / decel_decent # time to brake down to zero +# dist_lookahead = curr_v * t_brake +# +# # TODO: check if it works with straight line waypoints +# distance = 0 +# temp_x, temp_y, temp_yaw = curr_x, curr_y, curr_yaw +# is_collision = False +# dist_collision = None +# for waypoint in path_further: +# # the path to next waypoint and vehicle yaw at next waypoint +# len_arc, radius, center, next_yaw = get_waypoint_arc_and_new_yaw(temp_x, temp_y, temp_yaw, waypoint) +# +# # the time of vehicle go in and out of the section to next waypoint in current velocity +# time_vehicle_in = (distance - l_rear_axle_to_front) / curr_v # consider distance from the center of rear axle to the front +# time_vehicle_out = (distance + len_arc + l_rear_axle_to_rear) / curr_v # consider distance from the center of rear axle to the rear +# +# # check all the pedestrian, get their time and position if they are going to cross the path +# arc_dist_collision_list = [] +# for n, ped in agents.items(): +# if ped.type == AgentEnum.PEDESTRIAN: +# # check collision: pedestrian is in the section of the path between time_vehicle_in and time_vehicle_out +# is_collision, arc_dist_collision = check_collision_with_waypoints(ped.pose, ped.velocity,ped.yaw_rate, +# radius, center, buffer_y, +# (temp_x, temp_y), waypoint, +# time_vehicle_in, time_vehicle_out) +# if is_collision: +# arc_dist_collision_list.append(arc_dist_collision) +# +# if is_collision: +# # use the minimum collision distance to yield +# dist_collision = distance + min(arc_dist_collision_list) +# break +# # update total distance by add the arc length of the section to the next waypoint, update pose at next waypoint +# distance += len_arc +# temp_x, temp_y = waypoint +# temp_yaw = next_yaw +# # end the loop if total distance is larger than lookahead distance +# if distance > dist_lookahead: +# break +# +# return is_collision, dist_collision + + +""" Assume the vehicle move straight forward """ +# def compute_intersect(A1, B1, C1, A2, B2, C2): +# """ +# calculate intersection point of two lines +# A1*x + B1*y + C1 = 0 and A2*x + B2*y + C2 = 0 +# """ +# if A1*B2 - A2*B1 == 0: +# if C1 == C2: +# return "conincide" +# else: +# return "parallel" +# else: +# x_inter = (B1*C2 - B2*C1) / (A1*B2 - A2*B1) +# y_inter = (C1*A2 - C2*A1) / (A1*B2 - A2*B1) +# return x_inter, y_inter +# +# def yield_in_ego_frame(pedestrian_position_ego, pedestrian_velocity_ego, dist_lookahead, buffer): +# """ +# Calculate the intersection of the pedestrian's path and vehicle path with buffer +# Yield if the intersection is in front of the vehicle and within lookahead distance +# """ +# x_buff, y_buff = buffer +# # consider 2D geometry +# x_p, y_p = pedestrian_position_ego[:2] +# v_x, v_y = pedestrian_velocity_ego[:2] +# # initiate yielding and distance +# yielding = False +# distance = None +# +# # line expression: A*x + B*y + C = 0 +# # pedestrian's path: v_y * x - v_x * y + (v_x * y_p - v_y * x_p) = 0 +# A_p, B_p, C_p = v_y, -v_x, v_x * y_p - v_y * x_p +# # buffer area: y = y_buff, y = - y_buff +# A_left, B_left, C_left = 0, 1, -y_buff +# A_right, B_right, C_right = 0, 1, y_buff +# +# # deal with parallel first +# if A_p*B_left - A_left*B_p == 0: +# if -y_buff <= y_p <= y_buff: +# yielding = True +# distance = x_p +# else: +# yielding = False +# distance = None +# # compute intersection +# else: +# left_inter_x, left_inter_y = compute_intersect(A_p, B_p, C_p, A_left, B_left, C_left) +# right_inter_x, right_inter_y = compute_intersect(A_p, B_p, C_p, A_right, B_right, C_right) +# if -x_buff <= left_inter_x <= dist_lookahead or -x_buff <= right_inter_x <= dist_lookahead: +# yielding = True +# else: +# yielding = False +# distance = min(left_inter_x, right_inter_x) +# +# return yielding, distance +# +# vehicle = state.vehicle +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_yaw = vehicle.pose.yaw +# curr_v = vehicle.v +# +# # 2D transformation from start frame to ego frame +# R_start2ego = np.array([[np.cos(curr_yaw),-np.sin(curr_yaw)],[np.sin(curr_yaw),np.cos(curr_yaw)]]) +# t_start2ego = np.array([curr_x, curr_y]) +# +# # check collision distance and break if going to collide +# dist_min = np.inf +# for ped in pedestrians: +# # pedestrian states from perception +# pos_p = ped.state.pose # both vectors are x, y in vehicle frame +# v_p = ped.state.v +# pos_start2ego = R_start2ego @ pos_p + t_start2ego +# v_start2ego = v_p @ pos_p + t_start2ego +# yielding, distance = yield_in_ego_frame(pos_start2ego,v_start2ego, dist_lookahead, buffer) +# if yielding == True and distance is not None: +# if distance < dist_min: +# dist_min = distance + + diff --git a/homework/longitudinal_planning.py b/homework/longitudinal_planning.py index e862239fe..c44482b70 100644 --- a/homework/longitudinal_planning.py +++ b/homework/longitudinal_planning.py @@ -1,33 +1,36 @@ from typing import List from ..component import Component -from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ + ObjectFrameEnum from ...utils import serialization from ...mathutils.transforms import vector_madd -def longitudinal_plan(path : Path, acceleration : float, deceleration : float, max_speed : float, current_speed : float) -> Trajectory: + +def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_speed: float, + current_speed: float) -> Trajectory: """Generates a longitudinal trajectory for a path with a trapezoidal velocity profile. - + 1. accelerates from current speed toward max speed 2. travel along max speed 3. if at any point you can't brake before hitting the end of the path, decelerate with accel = -deceleration until velocity goes to 0. """ path_normalized = path.arc_length_parameterize() - #TODO: actually do something to points and times + # TODO: actually do something to points and times points = [p for p in path_normalized.points] times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame,points,times) + trajectory = Trajectory(path.frame, points, times) return trajectory -def longitudinal_brake(path : Path, deceleration : float, current_speed : float) -> Trajectory: +def longitudinal_brake(path: Path, deceleration: float, current_speed: float) -> Trajectory: """Generates a longitudinal trajectory for braking along a path.""" path_normalized = path.arc_length_parameterize() - #TODO: actually do something to points and times + # TODO: actually do something to points and times points = [p for p in path_normalized.points] times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame,points,times) + trajectory = Trajectory(path.frame, points, times) return trajectory @@ -36,6 +39,7 @@ class YieldTrajectoryPlanner(Component): you are at the end of the route, otherwise accelerates to the desired speed. """ + def __init__(self): self.route_progress = None self.t_last = None @@ -52,42 +56,44 @@ def state_outputs(self) -> List[str]: def rate(self): return 10.0 - def update(self, state : AllState): - vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route t = state.t if self.t_last is None: self.t_last = t dt = t - self.t_last - + curr_x = vehicle.pose.x curr_y = vehicle.pose.y curr_v = vehicle.v - #figure out where we are on the route + # figure out where we are on the route if self.route_progress is None: self.route_progress = 0.0 - closest_dist,closest_parameter = state.route.closest_point_local((curr_x,curr_y),[self.route_progress-5.0,self.route_progress+5.0]) + closest_dist, closest_parameter = state.route.closest_point_local((curr_x, curr_y), [self.route_progress - 5.0, + self.route_progress + 5.0]) self.route_progress = closest_parameter - #extract out a 10m segment of the route - route_with_lookahead = route.trim(closest_parameter,closest_parameter+10.0) + # extract out a 10m segment of the route + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) - #parse the relations indicated + # parse the relations indicated should_brake = False for r in state.relations: if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': - #yielding to something, brake + # yielding to something, brake should_brake = True should_accelerate = (not should_brake and curr_v < self.desired_speed) - #choose whether to accelerate, brake, or keep at current velocity + # choose whether to accelerate, brake, or keep at current velocity if should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v) + traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, + curr_v) elif should_brake: traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) else: traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) - return traj + return traj \ No newline at end of file diff --git a/homework/pedestrian_detection.yaml b/homework/pedestrian_detection.yaml index 1a00fa797..5d9a92813 100644 --- a/homework/pedestrian_detection.yaml +++ b/homework/pedestrian_detection.yaml @@ -95,5 +95,5 @@ variants: drive: perception: agent_detection : pedestrian_detection.FakePedestrianDetector2D - #agent_detection : OmniscientAgentDetector #this option reads agents from the simulator +# agent_detection : OmniscientAgentDetector #this option reads agents from the simulator state_estimation : OmniscientStateEstimator diff --git a/homework/pedestrian_yield_logic.py b/homework/pedestrian_yield_logic.py index 2567c0093..c533ebf94 100644 --- a/homework/pedestrian_yield_logic.py +++ b/homework/pedestrian_yield_logic.py @@ -1,22 +1,426 @@ -from ...state import AgentState,AgentEnum,EntityRelation,EntityRelationEnum +from ...state import AgentState, AgentEnum, EntityRelation, EntityRelationEnum, ObjectFrameEnum from ..component import Component -from typing import List,Dict +from typing import List, Dict + +import numpy as np +from scipy.optimize import minimize_scalar + +DEBUG = True # Set to False to disable debug output class PedestrianYielder(Component): """Yields for all pedestrians in the scene. - + Result is stored in the relations graph. """ + def rate(self): return None + def state_inputs(self): return ['agents'] + def state_outputs(self): return ['relations'] - def update(self,agents : Dict[str,AgentState]) -> List[EntityRelation]: + + def update(self, agents: Dict[str, AgentState]) -> List[EntityRelation]: res = [] - for n,a in agents.items(): + for n, a in agents.items(): + if DEBUG: + print( + f"[DEBUG] PedestrianYielder.update: Agent:{n} frame:{a.pose.frame}, (x,y):{a.pose.x, a.pose.y}, velocity:{a.velocity}") + + """ collision estimation based on agent states in vehicle frame """ + a.to_frame(ObjectFrameEnum.CURRENT) # to_frame() is missing information to work with + # TODO: check how to convert to vehicle frame. May perception update agent state? if a.type == AgentEnum.PEDESTRIAN: - #relation: ego-vehicle yields to pedestrian - res.append(EntityRelation(type=EntityRelationEnum.YIELDING,obj1='',obj2=n)) + is_collision, t_min, min_dist = check_collision_in_vehicle_frame(a) + if DEBUG: + print(f"[DEBUG] is_collision:{is_collision}, distance to buffer area:", shortest_distance_to_buffer_in_vehicle_frame((a.pose.x, a.pose.y), (2.53 / 2 + 3, 1.35 / 2 + 1))) + if is_collision: + # relation: ego-vehicle yields to pedestrian + res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) + return res + + +# """ Vehicle Dimensions """ +# vehicle_model = "e2" # e2 or e4 +# # vehicle dimensions and buffer area +# if vehicle_model == 'e2': +# # e2 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e2.urdf +# size_x, size_y = 2.53, 1.35 # measured from base_link.STL +# lx_f2r = 0.88 + 0.87 # distance between front axle and rear axle, from gem_e2.urdf wheel_links +# ly_axle = 0.6 * 2 # length of axle +# l_rear_axle_to_front = 1.28 + 0.87 # measured from base_link.STL +# l_rear_axle_to_rear = size_y - l_rear_axle_to_front +# elif vehicle_model == 'e4': +# # e4 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e4.urdf +# size_x, size_y = 3.35, 1.35 # measured from base_link.STL +# lx_f2r = 1.2746 + 1.2904 # distance between front axle and rear axle, from gem_e4.urdf wheel_links +# ly_axle = 0.6545 * 2 # length of front and rear axles, from gem_e4.urdf wheel_links +# l_rear_axle_to_front = 1.68 + 1.29 # measured from base_link.STL +# l_rear_axle_to_rear = size_y - l_rear_axle_to_front +# # add buffer to outer dimensions, defined by half of the x, y buffer area dimensions +# buffer_x, buffer_y = 3, 1 +# buffer = size_x / 2 + buffer_x, size_y / 2 + buffer_y + + +""" Planning in vehicle frame without waypoints """ +def check_collision_in_vehicle_frame(agent_state: AgentState): + buffer = 2.53 / 2 + 3, 1.35 / 2 + 1 # e2 buffer area + time_scale = 100 # TODO: adjust if necessary + xp, yp = agent_state.pose.x, agent_state.pose.y + vx, vy = agent_state.velocity[:2] + # use optimization method to find minimum distance and the time at that point + t_min, min_dist = find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale) + # if the minimum distance between the position and the buffer area is less than 0, than a collision is expected + is_collision = False + if min_dist <= 0: + is_collision = True + return is_collision, t_min, min_dist + + +def find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale): + def pos_t(t): + xt, yt = xp + vx * t, yp + vy * t + return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) + + res = minimize_scalar(pos_t, bounds=(0, time_scale)) + if res.success: + t_min = res.x + min_dist = res.fun + return t_min, min_dist + else: + return None, None + + +# Calculate the shortest distance from an object to vehicle buffer area in vehicle frame +def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): + """ + Calculate the distance between a pedestrian's position and the vehicle with buffer + """ + x_buff, y_buff = buffer + # consider 2D geometry + x_p, y_p = position + # initiate distance and collision + dist = 0 + collision = False + + # calculate distance + # front + if -y_buff <= y_p <= y_buff and x_p > x_buff: + dist = x_p - x_buff + # left front + elif y_p > y_buff and x_p > x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) + # right front + elif y_p < -y_buff and x_p > x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) + # left + elif y_p > y_buff and -x_buff <= x_p <= x_buff: + dist = y_p - y_buff + # right + elif y_p < -y_buff and -x_buff <= x_p <= x_buff: + dist = y_p + y_buff + # rear + elif -y_buff <= y_p <= y_buff and x_p < -x_buff: + dist = x_p + x_buff + # left rear + elif y_p > y_buff and x_p < -x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) + # right rear + elif y_p < -y_buff and x_p < -x_buff: + dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) + # intersect + else: + dist = 0 + + return dist + + +""" Planning in start frame with waypoints """ +# # TODO: how to get further unreached waypoints only +# def get_waypoint_arc_and_new_yaw(curr_x, curr_y, curr_yaw, waypoint): +# """ +# Input: +# curr_x, curr_y, curr_yaw: vehicle current pose +# waypoint: next waypoint +# Output: +# len_arc, radius, (x_c, y_c): length, radius and the center of the arc-shaped path +# next_yaw: vehicle yaw at the waypoint. +# """ +# # TODO: check if it works when the angles cross zero or delta greater than pi +# wp_x, wp_y = waypoint +# dist_curr2wp = np.sqrt((wp_x - curr_x) ** 2 + (wp_y - curr_y) ** 2) +# alpha = np.arctan2(wp_y - curr_y, wp_x - curr_x) - curr_yaw +# delta = 2 * alpha +# radius = dist_curr2wp / 2 / np.sin(alpha) +# len_arc = delta * radius +# x_c, y_c = curr_x - radius * np.sin(curr_yaw), curr_y + radius * np.cos(curr_yaw) +# next_yaw = curr_yaw + delta +# return len_arc, radius, (x_c, y_c), next_yaw +# +# +# def check_collision_with_waypoints(pose, velocity, yaw_rate, radius, center, expand, +# start_point, end_point, time_vehicle_in, time_vehicle_out): +# """ +# Given an arc as a path and a point with velocity, check if the point is crossing the path. +# Input: +# pose, velocity, yaw_rate: pedestrian pose, velocity and angular velocity +# radius, center, start_point, end_point: parameters of the path as an arc +# expand: enlarge the arc as the vehicle moving area +# start_point, end_point: start point and end point of the path +# time_vehicle_in, time_vehicle_out: the time that the vehicle is in the section of the path +# Output: +# is_crossing: boolean, if the pedestrian is crossing the path +# arc_dist_collision: distance from start_point to the closest pedestrian enter or exit point mapped to the path +# """ +# # TODO: compute pedestrian path with yaw_rate? +# xp, yp = pose.x, pose.y +# vx, vy = velocity +# xc, yc = center +# x1, y1 = start_point +# x2, y2 = end_point +# theta1 = np.arctan2(y1 - yc, x1 - xc) +# theta2 = np.arctan2(y2 - yc, x2 - xc) +# r_inner = max(radius - expand, 0) +# r_outer = radius + expand +# +# def is_angle_between(pt, start_angle, end_angle): +# xp, yp = pt +# angle = np.arctan2(yp - yc, xp - xc) +# if start_angle < end_angle: +# return start_angle <= angle <= end_angle +# else: +# return end_angle <= angle <= start_angle +# +# def is_in_ring(pt, xc, yc, r_inner, r_outer): +# xp, yp = pt +# dist = np.sqrt((xp - xc) ** 2 + (yp - yc) ** 2) +# return r_inner <= dist <= r_outer +# +# def find_arc_intersection(xc, yc, xp, yp, vx, vy, r): +# # solve equations: (xt - xc)^2 + (yt - yc)^2 = (R+/-b)^2 +# A = vx ** 2 + vy ** 2 +# B = 2 * (vx * (xp - xc) + vy * (yp - yc)) +# C = (xp - xc) ** 2 + (yp - yc) ** 2 - r ** 2 +# root = B ** 2 - 4 * A * C +# t_list = [] # time +# pt_list = [] # intersection point +# if root < 0: +# return t_list, pt_list # no intersection +# else: +# t1 = (-B - np.sqrt(root)) / (2 * A) +# t2 = (-B + np.sqrt(root)) / (2 * A) +# # t should be larger than 0 +# if t1 > 0: +# pt1 = (xp + vx * t1, yp + vy * t1) +# if is_angle_between(pt1, theta1, theta2): +# t_list.append(t1) +# pt_list.append(pt1) +# if t2 > 0: +# pt2 = (xp + vx * t2, yp + vy * t2) +# if is_angle_between(pt2, theta1, theta2): +# t_list.append(t2) +# pt_list.append(pt2) +# return t_list, pt_list +# +# def find_edge_intersection(xc, yc, xp, yp, vx, vy, theta, r_inner, r_outer): +# x_inner, y_inner = xc + r_inner * np.cos(theta), yc + r_inner * np.sin(theta) +# x_outer, y_outer = xc + r_outer * np.cos(theta), yc + r_outer * np.sin(theta) +# t_list = [] # time +# pt_list = [] # intersection point +# +# dx = x_outer - x_inner +# dy = y_outer - y_inner +# # solve t from: xt = xp + vx * t, yt = xp + vy * t and dx * (yt - y_inner) = dy * (xt - x_inner) +# if dx * vy - dy * vx == 0: +# return t_list, pt_list # parallel, no intersection +# else: +# t = (dy * xp - dx * yp + dx * y_inner - dy * x_inner) / (dx * vy - dy * vx) +# pt = xp + vx * t, yp + vy * t +# if is_in_ring(pt, xc, yc, r_inner, r_outer): +# t_list.append(t) +# pt_list.append(pt) +# return t_list, pt_list +# +# # find the time and points a pedestrian in and out of the path +# t_list = [] +# # Case pedestrian in the path at the beginning +# if is_in_ring((xp, yp), xc, yc, r_inner, r_outer) and is_angle_between((xp, yp), theta1, theta2): +# t_list.append(0) +# # Case pedestrian cross the arcs and the start and end edges of the path section +# t_inner, _ = find_arc_intersection(xc, yc, xp, yp, vx, vy, r_inner) if r_inner > 0 else [], [] +# t_outer, _ = find_arc_intersection(xc, yc, xp, yp, vx, vy, r_outer) +# t_theta1, _ = find_edge_intersection(xc, yc, xp, yp, vx, vy, theta1, r_inner, r_outer) +# t_theta2, _ = find_edge_intersection(xc, yc, xp, yp, vx, vy, theta2, r_inner, r_outer) +# # Combine all points together, The elements in both lists correspond one by one in order +# t_list = t_list + t_inner + t_outer + t_theta1 + t_theta2 +# +# def arc_length_from_start_point(pt, center, radius, start_angle): +# """ +# Calculate the arc length between the point mapping to the arc and the start point as the distance along the path +# Assume angle difference of the arc is not greater than 180 degrees +# """ +# x, y = pt +# xc, yc = center +# angle = np.arctan2(y - yc, x - xc) +# delta_angle = abs(angle - start_angle) +# if delta_angle > np.pi: +# delta_angle = 2 * np.pi - delta_angle +# return radius * delta_angle +# +# is_collision = False +# arc_dist_collision = None +# # collision if there is intersection and the time is between time_vehicle_in and time_vehicle_out +# if min(t_list) <= time_vehicle_out or max(t_list) >= time_vehicle_in: +# is_collision = True +# t_min = max(time_vehicle_in, min(t_list)) +# t_max = min(time_vehicle_out, max(t_list)) +# # map the point to the arc of path for calculating distance from the vehicle follow the path +# pt_min = xp + vx * t_min, yp + vy * t_min +# pt_max = xp + vx * t_max, yp + vy * t_max +# arc_time_min = arc_length_from_start_point(pt_min, center, radius, theta1) +# arc_time_max = arc_length_from_start_point(pt_max, center, radius, theta2) +# arc_dist_collision = min(arc_time_min, arc_time_max) +# +# return is_collision, arc_dist_collision +# +# +# def yield_in_start_frame(path_further, state, agents: Dict[str,AgentState]): +# """ +# All calculations are in start frame, origin reference: rear_axle_center (refer to GEMstack/knowledge/calibration) +# """ +# # TODO: confirm origin reference with calibration team +# # current states +# vehicle = state.vehicle +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_yaw = vehicle.pose.yaw +# curr_v = vehicle.v +# +# # determine lookahead distance using current velocity and a decent deceleration +# decel_decent = 2.0 # TODO: adjust for a decent deceleration if necessary +# t_brake = curr_v / decel_decent # time to brake down to zero +# dist_lookahead = curr_v * t_brake +# +# # TODO: check if it works with straight line waypoints +# distance = 0 +# temp_x, temp_y, temp_yaw = curr_x, curr_y, curr_yaw +# is_collision = False +# dist_collision = None +# for waypoint in path_further: +# # the path to next waypoint and vehicle yaw at next waypoint +# len_arc, radius, center, next_yaw = get_waypoint_arc_and_new_yaw(temp_x, temp_y, temp_yaw, waypoint) +# +# # the time of vehicle go in and out of the section to next waypoint in current velocity +# time_vehicle_in = (distance - l_rear_axle_to_front) / curr_v # consider distance from the center of rear axle to the front +# time_vehicle_out = (distance + len_arc + l_rear_axle_to_rear) / curr_v # consider distance from the center of rear axle to the rear +# +# # check all the pedestrian, get their time and position if they are going to cross the path +# arc_dist_collision_list = [] +# for n, ped in agents.items(): +# if ped.type == AgentEnum.PEDESTRIAN: +# # check collision: pedestrian is in the section of the path between time_vehicle_in and time_vehicle_out +# is_collision, arc_dist_collision = check_collision_with_waypoints(ped.pose, ped.velocity,ped.yaw_rate, +# radius, center, buffer_y, +# (temp_x, temp_y), waypoint, +# time_vehicle_in, time_vehicle_out) +# if is_collision: +# arc_dist_collision_list.append(arc_dist_collision) +# +# if is_collision: +# # use the minimum collision distance to yield +# dist_collision = distance + min(arc_dist_collision_list) +# break +# # update total distance by add the arc length of the section to the next waypoint, update pose at next waypoint +# distance += len_arc +# temp_x, temp_y = waypoint +# temp_yaw = next_yaw +# # end the loop if total distance is larger than lookahead distance +# if distance > dist_lookahead: +# break +# +# return is_collision, dist_collision + + +""" Assume the vehicle move straight forward """ +# def compute_intersect(A1, B1, C1, A2, B2, C2): +# """ +# calculate intersection point of two lines +# A1*x + B1*y + C1 = 0 and A2*x + B2*y + C2 = 0 +# """ +# if A1*B2 - A2*B1 == 0: +# if C1 == C2: +# return "conincide" +# else: +# return "parallel" +# else: +# x_inter = (B1*C2 - B2*C1) / (A1*B2 - A2*B1) +# y_inter = (C1*A2 - C2*A1) / (A1*B2 - A2*B1) +# return x_inter, y_inter +# +# def yield_in_ego_frame(pedestrian_position_ego, pedestrian_velocity_ego, dist_lookahead, buffer): +# """ +# Calculate the intersection of the pedestrian's path and vehicle path with buffer +# Yield if the intersection is in front of the vehicle and within lookahead distance +# """ +# x_buff, y_buff = buffer +# # consider 2D geometry +# x_p, y_p = pedestrian_position_ego[:2] +# v_x, v_y = pedestrian_velocity_ego[:2] +# # initiate yielding and distance +# yielding = False +# distance = None +# +# # line expression: A*x + B*y + C = 0 +# # pedestrian's path: v_y * x - v_x * y + (v_x * y_p - v_y * x_p) = 0 +# A_p, B_p, C_p = v_y, -v_x, v_x * y_p - v_y * x_p +# # buffer area: y = y_buff, y = - y_buff +# A_left, B_left, C_left = 0, 1, -y_buff +# A_right, B_right, C_right = 0, 1, y_buff +# +# # deal with parallel first +# if A_p*B_left - A_left*B_p == 0: +# if -y_buff <= y_p <= y_buff: +# yielding = True +# distance = x_p +# else: +# yielding = False +# distance = None +# # compute intersection +# else: +# left_inter_x, left_inter_y = compute_intersect(A_p, B_p, C_p, A_left, B_left, C_left) +# right_inter_x, right_inter_y = compute_intersect(A_p, B_p, C_p, A_right, B_right, C_right) +# if -x_buff <= left_inter_x <= dist_lookahead or -x_buff <= right_inter_x <= dist_lookahead: +# yielding = True +# else: +# yielding = False +# distance = min(left_inter_x, right_inter_x) +# +# return yielding, distance +# +# vehicle = state.vehicle +# curr_x = vehicle.pose.x +# curr_y = vehicle.pose.y +# curr_yaw = vehicle.pose.yaw +# curr_v = vehicle.v +# +# # 2D transformation from start frame to ego frame +# R_start2ego = np.array([[np.cos(curr_yaw),-np.sin(curr_yaw)],[np.sin(curr_yaw),np.cos(curr_yaw)]]) +# t_start2ego = np.array([curr_x, curr_y]) +# +# # check collision distance and break if going to collide +# dist_min = np.inf +# for ped in pedestrians: +# # pedestrian states from perception +# pos_p = ped.state.pose # both vectors are x, y in vehicle frame +# v_p = ped.state.v +# pos_start2ego = R_start2ego @ pos_p + t_start2ego +# v_start2ego = v_p @ pos_p + t_start2ego +# yielding, distance = yield_in_ego_frame(pos_start2ego,v_start2ego, dist_lookahead, buffer) +# if yielding == True and distance is not None: +# if distance < dist_min: +# dist_min = distance + + diff --git a/homework/test_longitudinal_plan.py b/homework/test_longitudinal_plan.py index 400f34fea..b81f0e329 100644 --- a/homework/test_longitudinal_plan.py +++ b/homework/test_longitudinal_plan.py @@ -35,7 +35,6 @@ def test_longitudinal_planning(): plt.ylabel('position') plt.show() - test_traj = longitudinal_plan(test_path, 1.0, 2.0, 3.0, 2.0) assert (t1 < t2 for (t1,t2) in zip(test_traj.times[:-1],test_traj.times[1:]) ) plt.plot(test_traj.times,[p[0] for p in test_traj.points]) diff --git a/launch/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml new file mode 100644 index 000000000..2aeeff364 --- /dev/null +++ b/launch/pedestrian_detection.yaml @@ -0,0 +1,99 @@ +description: "Run the yielding trajectory planner on the real vehicle with real perception" +mode: hardware +vehicle_interface: gem_hardware.GEMHardwareInterface +mission_execution: StandardExecutor + +# Recovery behavior after a component failure +recovery: + planning: + trajectory_tracking : recovery.StopTrajectoryTracker + +# Driving behavior for the GEM vehicle. Runs real pedestrian perception, yield planner, but does not send commands to real vehicle. +drive: + perception: + state_estimation : GNSSStateEstimator + agent_detection : pedestrian_detection.PedestrianDetector2D + perception_normalization : StandardPerceptionNormalizer + planning: + relations_estimation: pedestrian_yield_logic.PedestrianYielder + route_planning: + type: StaticRoutePlanner + args: [!relative_path '../GEMstack/knowledge/routes/forward_15m.csv','start'] + motion_planning: longitudinal_planning.YieldTrajectoryPlanner + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + print: False + + +log: + # Specify the top-level folder to save the log files. Default is 'logs' + #folder : 'logs' + # If prefix is specified, then the log folder will be named with the prefix followed by the date and time. Default no prefix + #prefix : 'fixed_route_' + # If suffix is specified, then logs will output to folder/prefix+suffix. Default uses date and time as the suffix + #suffix : 'test3' + # Specify which ros topics to record to vehicle.bag. Default records nothing. This records the "standard" ROS topics. + ros_topics : + # Specify options to pass to rosbag record. Default is no options. + #rosbag_options : '--split --size=1024' + # If True, then record all readings / commands of the vehicle interface. Default False + vehicle_interface : True + # Specify which components to record to behavior.json. Default records nothing + components : ['state_estimation','agent_detection','motion_planning'] + # Specify which components of state to record to state.json. Default records nothing + #state: ['all'] + # Specify the rate in Hz at which to record state to state.json. Default records at the pipeline's rate + #state_rate: 10 +replay: # Add items here to set certain topics / inputs to be replayed from logs + # Specify which log folder to replay from + log: + ros_topics : [] + components : [] + +#usually can keep this constant +computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" + +variants: + detector_only: + run: + description: "Run the pedestrian detection code" + drive: + planning: + trajectory_tracking: + real_sim: + run: + description: "Run the pedestrian detection code with real detection and fake simulation" + mode: hardware + vehicle_interface: + type: gem_mixed.GEMRealSensorsWithSimMotionInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + mission_execution: StandardExecutor + require_engaged: False + visualization: !include "klampt_visualization.yaml" + drive: + perception: + state_estimation : OmniscientStateEstimator + + + fake_real: + run: + description: "Run the yielding trajectory planner on the real vehicle with faked perception" + drive: + perception: + agent_detection : pedestrian_detection.FakePedestrianDetector2D + + fake_sim: + run: + description: "Run the yielding trajectory planner in simulation with faked perception" + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + visualization: !include "klampt_visualization.yaml" + drive: + perception: +# agent_detection : pedestrian_detection.FakePedestrianDetector2D + agent_detection : OmniscientAgentDetector #this option reads agents from the simulator + state_estimation : OmniscientStateEstimator From 4c3df3d60d0b836a80bf468f95a7fe92f8226267 Mon Sep 17 00:00:00 2001 From: derekzyn Date: Mon, 17 Feb 2025 06:07:47 -0600 Subject: [PATCH 03/11] Update minimum distance calculation --- .../planning/pedestrian_yield_logic.py | 239 +++++++++++++----- 1 file changed, 182 insertions(+), 57 deletions(-) diff --git a/GEMstack/onboard/planning/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py index c533ebf94..d2781ca69 100644 --- a/GEMstack/onboard/planning/pedestrian_yield_logic.py +++ b/GEMstack/onboard/planning/pedestrian_yield_logic.py @@ -33,10 +33,10 @@ def update(self, agents: Dict[str, AgentState]) -> List[EntityRelation]: a.to_frame(ObjectFrameEnum.CURRENT) # to_frame() is missing information to work with # TODO: check how to convert to vehicle frame. May perception update agent state? if a.type == AgentEnum.PEDESTRIAN: - is_collision, t_min, min_dist = check_collision_in_vehicle_frame(a) + check = check_collision_in_vehicle_frame(a) if DEBUG: print(f"[DEBUG] is_collision:{is_collision}, distance to buffer area:", shortest_distance_to_buffer_in_vehicle_frame((a.pose.x, a.pose.y), (2.53 / 2 + 3, 1.35 / 2 + 1))) - if is_collision: + if check in ['yield', 'stop']: # relation: ego-vehicle yields to pedestrian res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) @@ -71,71 +71,196 @@ def check_collision_in_vehicle_frame(agent_state: AgentState): time_scale = 100 # TODO: adjust if necessary xp, yp = agent_state.pose.x, agent_state.pose.y vx, vy = agent_state.velocity[:2] - # use optimization method to find minimum distance and the time at that point - t_min, min_dist = find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale) + t_min, min_dist, pt_min = find_min_distance_and_time(xp, yp, vx, vy, buffer) # if the minimum distance between the position and the buffer area is less than 0, than a collision is expected - is_collision = False + check = None if min_dist <= 0: - is_collision = True - return is_collision, t_min, min_dist + if t_min <= 3: + check = 'stop' + else: + check = 'yield' + else: + check = 'run' + return check + +def find_min_distance_and_time(xp, yp, vx, vy, buffer): + # path function: Ax + By + C = vy * x - vx * y + (yp * vx - xp * vy) = 0 + x_buff, y_buff = buffer + vx = vx if vx != 0 else 1e-6 + vy = vy if vy != 0 else 1e-6 + A = vy + B = -vx + C = yp * vx - xp * vy -def find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale): - def pos_t(t): - xt, yt = xp + vx * t, yp + vy * t - return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) + def point_to_line(x0, y0, A, B, C): + # calculate the shortest distance from a point (x0, y0) to the line Ax + By + C = 0 """ + numerator = abs(A * x0 + B * y0 + C) + denominator = np.sqrt(A ** 2 + B ** 2) + x_foot = x0 - (A * (A * x0 + B * y0 + C)) / denominator + y_foot = y0 - (B * (A * x0 + B * y0 + C)) / denominator + dist = numerator / denominator if denominator != 0 else np.inf + return dist, (x_foot, y_foot) - res = minimize_scalar(pos_t, bounds=(0, time_scale)) - if res.success: - t_min = res.x - min_dist = res.fun - return t_min, min_dist + def point_dist(x1, y1, x2, y2): + return np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) + + """ Compute intersections at the front, left and right edge """ + if xp >= x_buff: + # front edge intersection: x = x_buff = xt = xp + vx * t_f + t_f = (x_buff - xp) / vx + yt = yp + vy * t_f + if t_f < 0: # object moving away with higher speed than vehicle, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if -y_buff <= yp <= y_buff: + min_dist = xp - x_buff # the distance to the front edge + elif yt > y_buff: + min_dist = point_dist(xp, yp, x_buff, y_buff) # the distance to front left corner + else: + min_dist = point_dist(xp, yp, x_buff, -y_buff) # the distance to front right corner + else: + if -y_buff <= yt <= y_buff: # intersect at front edge, is collision + t_min = t_f + min_dist = 0 + pt_min = x_buff, yt + elif yt > y_buff: # intersect at front left + if yp <= yt: + min_dist, pt_min = point_to_line(x_buff, y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (y_buff - yp) / vy + xt = xp + vx * t_l + if -x_buff <= xt <= x_buff: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, y_buff + else: + min_dist, pt_min = point_to_line(-x_buff, y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # intersect at front right + if yp >= yt: + min_dist, pt_min = point_to_line(x_buff, -y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_r = (-y_buff - yp) / vy + xt = xp + vx * t_r + if -x_buff <= xt <= x_buff: # intersect at left edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, -y_buff + else: + min_dist, pt_min = point_to_line(-x_buff, -y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx else: - return None, None + if yp >= y_buff: + # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (y_buff - yp) / vy + xt = xp + vx * t_l + if t_l < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if -x_buff <= xp <= x_buff: + min_dist = yp - y_buff # the distance to the left edge + else: + min_dist = point_dist(xp, yp, -x_buff, y_buff) # the distance to rear right corner + else: + if -x_buff <= xt <= x_buff: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, y_buff + elif xt > x_buff: + min_dist, pt_min = point_to_line(x_buff, y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(-x_buff, y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + elif yp <= -y_buff: + # right edge interaction: y = -y_buff = yt = yp + vy * t_l + t_r = (-y_buff - yp) / vy + xt = xp + vx * t_r + if t_r < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if -x_buff <= xp <= x_buff: + min_dist = -yp - y_buff # the distance to the right edge + else: + min_dist = point_dist(xp, yp, -x_buff, y_buff) # the distance to rear right corner + else: + if -x_buff <= xt <= x_buff: # intersect at left edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, -y_buff + elif xt > x_buff: + min_dist, pt_min = point_to_line(x_buff, -y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(-x_buff, -y_buff, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # rear position, should not be seen by the front camera + t_min = None + min_dist = None + pt_min = None + return t_min, min_dist, pt_min -# Calculate the shortest distance from an object to vehicle buffer area in vehicle frame -def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): - """ - Calculate the distance between a pedestrian's position and the vehicle with buffer - """ - x_buff, y_buff = buffer - # consider 2D geometry - x_p, y_p = position - # initiate distance and collision - dist = 0 - collision = False - # calculate distance - # front - if -y_buff <= y_p <= y_buff and x_p > x_buff: - dist = x_p - x_buff - # left front - elif y_p > y_buff and x_p > x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) - # right front - elif y_p < -y_buff and x_p > x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) - # left - elif y_p > y_buff and -x_buff <= x_p <= x_buff: - dist = y_p - y_buff - # right - elif y_p < -y_buff and -x_buff <= x_p <= x_buff: - dist = y_p + y_buff - # rear - elif -y_buff <= y_p <= y_buff and x_p < -x_buff: - dist = x_p + x_buff - # left rear - elif y_p > y_buff and x_p < -x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) - # right rear - elif y_p < -y_buff and x_p < -x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) - # intersect - else: - dist = 0 +# def find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale): +# def pos_t(t): +# xt, yt = xp + vx * t, yp + vy * t +# return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) +# +# res = minimize_scalar(pos_t, bounds=(0, time_scale)) +# if res.success: +# t_min = res.x +# min_dist = res.fun +# return t_min, min_dist +# else: +# return None, None + - return dist +# # Calculate the shortest distance from an object to vehicle buffer area in vehicle frame +# def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): +# """ +# Calculate the distance between a pedestrian's position and the vehicle with buffer +# """ +# x_buff, y_buff = buffer +# # consider 2D geometry +# x_p, y_p = position +# # initiate distance and collision +# dist = 0 +# collision = False +# +# # calculate distance +# # front +# if -y_buff <= y_p <= y_buff and x_p > x_buff: +# dist = x_p - x_buff +# # left front +# elif y_p > y_buff and x_p > x_buff: +# dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) +# # right front +# elif y_p < -y_buff and x_p > x_buff: +# dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) +# # left +# elif y_p > y_buff and -x_buff <= x_p <= x_buff: +# dist = y_p - y_buff +# # right +# elif y_p < -y_buff and -x_buff <= x_p <= x_buff: +# dist = y_p + y_buff +# # rear +# elif -y_buff <= y_p <= y_buff and x_p < -x_buff: +# dist = x_p + x_buff +# # left rear +# elif y_p > y_buff and x_p < -x_buff: +# dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) +# # right rear +# elif y_p < -y_buff and x_p < -x_buff: +# dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) +# # intersect +# else: +# dist = 0 +# +# return dist """ Planning in start frame with waypoints """ From 887aefe19cc094ad0d0a2e3c35bdeaad96d77879 Mon Sep 17 00:00:00 2001 From: Brijesh Date: Tue, 18 Feb 2025 04:05:50 -0600 Subject: [PATCH 04/11] Updated longitudinal Planner --- .../onboard/planning/longitudinal_planning.py | 367 +++++++++++++++--- 1 file changed, 323 insertions(+), 44 deletions(-) diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index e862239fe..daecd909d 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -1,40 +1,276 @@ -from typing import List +# File: GEMstack/onboard/planning/longitudinal_planning.py +from typing import List, Tuple +import math from ..component import Component from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum from ...utils import serialization -from ...mathutils.transforms import vector_madd +from ...mathutils import transforms +import numpy as np +DEBUG = False # Set to False to disable debug output -def longitudinal_plan(path : Path, acceleration : float, deceleration : float, max_speed : float, current_speed : float) -> Trajectory: - """Generates a longitudinal trajectory for a path with a - trapezoidal velocity profile. +def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) -> List[Tuple[float, float]]: + if not points: + return [] + if len(points) == 1: + return points.copy() + + dense_points = [points[0]] - 1. accelerates from current speed toward max speed - 2. travel along max speed - 3. if at any point you can't brake before hitting the end of the path, - decelerate with accel = -deceleration until velocity goes to 0. - """ - path_normalized = path.arc_length_parameterize() - #TODO: actually do something to points and times - points = [p for p in path_normalized.points] - times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame,points,times) - return trajectory + for i in range(len(points) - 1): + p0 = points[i] + p1 = points[i + 1] + dx = p1[0] - p0[0] + dy = p1[1] - p0[1] + seg_length = math.hypot(dx, dy) + + n_interp = int(round(seg_length * density)) + + for j in range(1, n_interp + 1): + fraction = j / (n_interp + 1) + x_interp = p0[0] + fraction * dx + y_interp = p0[1] + fraction * dy + dense_points.append((x_interp, y_interp)) + + dense_points.append(p1) + + return dense_points + +def compute_cumulative_distances(points: List[List[float]]) -> List[float]: + s_vals = [0.0] + for i in range(1, len(points)): + dx = points[i][0]- points[i-1][0] + dy = points[i][1] -points[i-1][1] + ds = math.hypot(dx, dy) + s_vals.append(s_vals[-1] + ds) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + + return s_vals -def longitudinal_brake(path : Path, deceleration : float, current_speed : float) -> Trajectory: - """Generates a longitudinal trajectory for braking along a path.""" +def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed): path_normalized = path.arc_length_parameterize() - #TODO: actually do something to points and times - points = [p for p in path_normalized.points] - times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame,points,times) - return trajectory + points = list(path_normalized.points) + dense_points = generate_dense_points(points) + s_vals = compute_cumulative_distances(dense_points) + L = s_vals[-1] # Total path length + stopping_distance = (current_speed**2) / (2 * deceleration) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + print("[DEBUG] longitudinal_plan: Total path length L =", L) + print("[DEBUG] longitudinal_plan: Braking distance needed =", stopping_distance) + + + if stopping_distance > L: #Case where there is not enough stopping distance to stop before path ends (calls emergency brake) + return longitudinal_brake(path, deceleration, current_speed) + + if current_speed > max_speed: #Case where car is exceeding the max speed so we need to slow down (do initial slowdown) + if DEBUG: + print(f"[DEBUG] Handling case where current_speed ({current_speed:.2f}) > max_speed ({max_speed:.2f})") + + # Initial deceleration phase to reach max_speed + initial_decel_distance = (current_speed**2 - max_speed**2) / (2 * deceleration) + initial_decel_time = (current_speed - max_speed) / deceleration + remaining_distance = L - initial_decel_distance + + if DEBUG: + print(f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") + print(f"[DEBUG] Remaining distance after reaching max_speed: {remaining_distance:.2f}") + + # Calculate final deceleration distance needed to stop from max_speed + final_decel_distance = (max_speed**2) / (2 * deceleration) + cruise_distance = remaining_distance - final_decel_distance + + if DEBUG: + print(f"[DEBUG] Phase 2 - Cruise: distance = {cruise_distance:.2f}") + print(f"[DEBUG] Phase 3 - Final Decel: distance = {final_decel_distance:.2f}") + + times = [] + for s in s_vals: + if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed + v = math.sqrt(current_speed**2 - 2 * deceleration * s) + t = (current_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Initial Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + + elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed + s_in_cruise = s - initial_decel_distance + t = initial_decel_time + s_in_cruise / max_speed + if DEBUG: # Print every 10m + print(f"[DEBUG] Cruise: s = {s:.2f}, v = {max_speed:.2f}, t = {t:.2f}") + + else: # Phase 3: Final deceleration to stop + s_in_final_decel = s - (initial_decel_distance + cruise_distance) + v = math.sqrt(max(max_speed**2 - 2 * deceleration * s_in_final_decel, 0.0)) + t = initial_decel_time + cruise_distance/max_speed + (max_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Final Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + + times.append(t) + + if DEBUG: + print("[DEBUG] Trajectory complete: Three phases executed") + print(f"[DEBUG] Total time: {times[-1]:.2f}") + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + if acceleration <= 0: + if DEBUG: + print(f"[DEBUG] No acceleration allowed. Current speed: {current_speed:.2f}") + + # Pure deceleration phase + s_decel = (current_speed ** 2) / (2 * deceleration) + T_decel = current_speed / deceleration + + if DEBUG: + print(f"[DEBUG] Will maintain speed until s_decel: {s_decel:.2f}") + print(f"[DEBUG] Total deceleration time will be: {T_decel:.2f}") + + times = [] + for s in s_vals: + if s <= L - s_decel: # Maintain current speed until deceleration point + t_point = s / current_speed + if DEBUG: + print(f"[DEBUG] Constant Speed Phase: s = {s:.2f}, v = {current_speed:.2f}, t = {t_point:.2f}") + else: # Deceleration phase + s_in_decel = s - (L - s_decel) + v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s_in_decel, 0.0)) + t_point = (L - s_decel)/current_speed + (current_speed - v) / deceleration + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + + times.append(t_point) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + + # Determine max possible peak speed given distance + v_peak_possible = math.sqrt((2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) + v_target = min(max_speed, v_peak_possible) + + if DEBUG: + print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) + + # Compute acceleration phase + s_accel = max(0.0, (v_target ** 2 - current_speed ** 2) / (2 * acceleration)) + t_accel = max(0.0, (v_target - current_speed) / acceleration) + + # Compute deceleration phase + s_decel = max(0.0, (v_target ** 2) / (2 * deceleration)) + t_decel = max(0.0, v_target / deceleration) + + # Compute cruise phase + s_cruise = max(0.0, L - s_accel - s_decel) + t_cruise = s_cruise / v_target if v_target > 0 else 0.0 + + if DEBUG: + print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) + print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) + print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) + times = [] + for s in s_vals: + if s <= s_accel: # Acceleration phase + v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) + t_point = (v - current_speed) / acceleration + + if DEBUG: + print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + + elif s <= s_accel + s_cruise: # Cruise phase + t_point = t_accel + (s - s_accel) / v_target + + if DEBUG: + print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") + + else: # Deceleration phase + s_decel_phase = s - s_accel - s_cruise + v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) + t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration + + if t_point < times[-1]: # Ensure time always increases + t_point = times[-1] + 0.01 # Small time correction step + + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") + + times.append(t_point) + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float, emergency_decel: float = 8.0) -> Trajectory: + # Vehicle already stopped - maintain position + if current_speed <= 0: + print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) + return Trajectory( + frame=path.frame, + points=[path.points[0]] * len(path.points), + times=[float(i) for i in range(len(path.points))] + ) + + # Get total path length + path_length = sum( + np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) + for i in range(len(path.points)-1) + ) + + # Calculate stopping distance with normal deceleration + T_stop_normal = current_speed / deceleration + s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) + + # Check if emergency braking is needed + if s_stop_normal > path_length: + if DEBUG: + print("[DEBUG] longitudinal_brake: Emergency braking needed!") + print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") + print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") + + # Calculate emergency braking parameters + T_stop = current_speed / emergency_decel + s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") + print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") + + decel_to_use = emergency_decel + + else: + if DEBUG: + print("[DEBUG] longitudinal_brake: Normal braking sufficient") + T_stop = T_stop_normal + decel_to_use = deceleration + + # Generate time points (use more points for smoother trajectory) + num_points = max(len(path.points), 50) + times = np.linspace(0, T_stop, num_points) + + # Calculate distances at each time point using physics equation + distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) + + # Generate points along the path + points = [] + for d in distances: + if d <= path_length: + points.append(path.eval(d)) + else: + points.append(path.eval(d)) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") + print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") + + return Trajectory(frame=path.frame, points=points, times=times.tolist()) class YieldTrajectoryPlanner(Component): - """Follows the given route. Brakes if you have to yield or - you are at the end of the route, otherwise accelerates to - the desired speed. + """Follows the given route. Brakes if the ego–vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. """ def __init__(self): self.route_progress = None @@ -42,6 +278,7 @@ def __init__(self): self.acceleration = 0.5 self.desired_speed = 1.0 self.deceleration = 2.0 + self.emergency_brake = 8.0 def state_inputs(self): return ['all'] @@ -52,42 +289,84 @@ def state_outputs(self) -> List[str]: def rate(self): return 10.0 - def update(self, state : AllState): - vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route t = state.t + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: t =", t) + if self.t_last is None: self.t_last = t dt = t - self.t_last - + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: dt =", dt) + curr_x = vehicle.pose.x curr_y = vehicle.pose.y curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] YieldTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") - #figure out where we are on the route + # Determine progress along the route. if self.route_progress is None: self.route_progress = 0.0 - closest_dist,closest_parameter = state.route.closest_point_local((curr_x,curr_y),[self.route_progress-5.0,self.route_progress+5.0]) + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) self.route_progress = closest_parameter - #extract out a 10m segment of the route - route_with_lookahead = route.trim(closest_parameter,closest_parameter+10.0) + # Extract a 10 m segment of the route for planning lookahead. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead) + + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + stay_braking = False + pointSet = set() + for i in range(len(route_with_lookahead.points)): + if tuple(route_with_lookahead.points[i]) in pointSet: + stay_braking = True + break + pointSet.add(tuple(route_with_lookahead.points[i])) - #parse the relations indicated - should_brake = False - for r in state.relations: - if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': - #yielding to something, brake - should_brake = True + should_brake = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: stay_braking =", stay_braking) + print("[DEBUG] YieldTrajectoryPlanner.update: should_brake =", should_brake) should_accelerate = (not should_brake and curr_v < self.desired_speed) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: should_accelerate =", should_accelerate) - #choose whether to accelerate, brake, or keep at current velocity - if should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v) + if stay_braking: + traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") + elif should_accelerate: + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") elif should_brake: traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") else: + # Maintain current speed if not accelerating or braking. traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") - return traj + self.t_last = t + if DEBUG: + print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) + print("[DEBUG] YieldTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj \ No newline at end of file From 3c126f4db789f79e931fa4340e67027701985be1 Mon Sep 17 00:00:00 2001 From: Brijesh Date: Tue, 18 Feb 2025 04:50:44 -0600 Subject: [PATCH 05/11] Testing for 3 peds in simultion --- GEMstack/knowledge/routes/forward_15m.csv | 53 +++++++++++++++++++++-- launch/pedestrian_detection.yaml | 4 +- scenes/xyhead_demo.yaml | 13 +++++- 3 files changed, 63 insertions(+), 7 deletions(-) diff --git a/GEMstack/knowledge/routes/forward_15m.csv b/GEMstack/knowledge/routes/forward_15m.csv index 0d02ce2aa..77cabd376 100644 --- a/GEMstack/knowledge/routes/forward_15m.csv +++ b/GEMstack/knowledge/routes/forward_15m.csv @@ -1,7 +1,7 @@ -0.0,0,0 -1.0,0,0 -2.0,0,0 -3.0,0,0 +0,0,0 +1,0,0 +2,0,0 +3,0,0 4,0,0 5,0,0 6,0,0 @@ -14,3 +14,48 @@ 13,0,0 14,0,0 15,0,0 +16,0,0 +17,0,0 +18,0,0 +19,0,0 +20,0,0 +21,0,0 +22,0,0 +23,0,0 +24,0,0 +25,0,0 +26,0,0 +27,0,0 +28,0,0 +29,0,0 +30,0,0 +31,0,0 +32,0,0 +33,0,0 +34,0,0 +35,0,0 +36,0,0 +37,0,0 +38,0,0 +39,0,0 +40,0,0 +41,0,0 +42,0,0 +43,0,0 +44,0,0 +45,0,0 +46,0,0 +47,0,0 +48,0,0 +49,0,0 +50,0,0 +51,0,0 +52,0,0 +53,0,0 +54,0,0 +55,0,0 +56,0,0 +57,0,0 +58,0,0 +59,0,0 +60,0,0 \ No newline at end of file diff --git a/launch/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml index 1a00fa797..98f49a9aa 100644 --- a/launch/pedestrian_detection.yaml +++ b/launch/pedestrian_detection.yaml @@ -94,6 +94,6 @@ variants: visualization: !include "klampt_visualization.yaml" drive: perception: - agent_detection : pedestrian_detection.FakePedestrianDetector2D - #agent_detection : OmniscientAgentDetector #this option reads agents from the simulator + # agent_detection : pedestrian_detection.FakePedestrianDetector2D + agent_detection : OmniscientAgentDetector #this option reads agents from the simulator state_estimation : OmniscientStateEstimator diff --git a/scenes/xyhead_demo.yaml b/scenes/xyhead_demo.yaml index c6d97477a..70e922461 100644 --- a/scenes/xyhead_demo.yaml +++ b/scenes/xyhead_demo.yaml @@ -6,4 +6,15 @@ agents: nominal_velocity: 0.5 target: [15.0,10.0] behavior: loop - \ No newline at end of file + ped2: + type: pedestrian + position: [30.0, 2.0] + nominal_velocity: 0.4 + target: [30.0,10.0] + behavior: loop + ped3: + type: pedestrian + position: [50.0, 2.0] + nominal_velocity: 0.5 + target: [50.0,10.0] + behavior: loop \ No newline at end of file From 6caa8ff205bc89ebcbd7d86c1194bb4d5e944917 Mon Sep 17 00:00:00 2001 From: derekzyn Date: Wed, 26 Feb 2025 21:33:13 -0600 Subject: [PATCH 06/11] modified for simulation with pedestrian detection --- .../onboard/perception/agent_detection.py | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/GEMstack/onboard/perception/agent_detection.py b/GEMstack/onboard/perception/agent_detection.py index 5d600f792..4b17d10f3 100644 --- a/GEMstack/onboard/perception/agent_detection.py +++ b/GEMstack/onboard/perception/agent_detection.py @@ -5,29 +5,52 @@ import threading import copy +import numpy as np + class OmniscientAgentDetector(Component): """Obtains agent detections from a simulator""" - def __init__(self,vehicle_interface : GEMInterface): + + def __init__(self, vehicle_interface: GEMInterface): self.vehicle_interface = vehicle_interface self.agents = {} self.lock = threading.Lock() + self.start_pose = None + def rate(self): return 4.0 - + def state_inputs(self): - return [] - + return ['vehicle'] + def state_outputs(self): return ['agents'] def initialize(self): - self.vehicle_interface.subscribe_sensor('agent_detector',self.agent_callback, AgentState) - - def agent_callback(self, name : str, agent : AgentState): + self.vehicle_interface.subscribe_sensor('agent_detector', self.agent_callback, AgentState) + + def agent_callback(self, name: str, agent: AgentState): with self.lock: self.agents[name] = agent - def update(self) -> Dict[str,AgentState]: + def update(self, vehicle : VehicleState) -> Dict[str, AgentState]: with self.lock: - return copy.deepcopy(self.agents) + res = {} + ped_num = 0 + + if self.start_pose is None: + self.start_pose = vehicle.pose + # print("\nVehicle start state self.start_pose:", self.start_pose) + # print("\nVehicle current state:", vehicle.pose, vehicle.v) + + for n, a in self.agents.items(): + # print("\nBefore to_frame: Agent:", n, a.pose, a.velocity) + a = a.to_frame(ObjectFrameEnum.START, current_pose = a.pose, start_pose_abs = self.start_pose) + # print("\nAfter to_frame START: Agent:", n, a.pose, a.velocity) + # print('==============', a.pose.frame==ObjectFrameEnum.START) + res[n] = a + if a.type == AgentEnum.PEDESTRIAN: + ped_num += 1 + if ped_num > 0: + print("\nDetected", ped_num, "pedestrians") + return res From a3569c884cd934579ef839101965f4e649e99678 Mon Sep 17 00:00:00 2001 From: derekzyn Date: Wed, 26 Feb 2025 21:33:42 -0600 Subject: [PATCH 07/11] Update longitudinal_planning.py --- .../onboard/planning/longitudinal_planning.py | 147 ++++++++++-------- 1 file changed, 83 insertions(+), 64 deletions(-) diff --git a/GEMstack/onboard/planning/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py index daecd909d..27538bf50 100644 --- a/GEMstack/onboard/planning/longitudinal_planning.py +++ b/GEMstack/onboard/planning/longitudinal_planning.py @@ -2,12 +2,15 @@ from typing import List, Tuple import math from ..component import Component -from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, ObjectFrameEnum +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ + ObjectFrameEnum from ...utils import serialization from ...mathutils import transforms import numpy as np + DEBUG = False # Set to False to disable debug output + def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) -> List[Tuple[float, float]]: if not points: return [] @@ -15,31 +18,32 @@ def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) return points.copy() dense_points = [points[0]] - + for i in range(len(points) - 1): p0 = points[i] p1 = points[i + 1] dx = p1[0] - p0[0] dy = p1[1] - p0[1] seg_length = math.hypot(dx, dy) - + n_interp = int(round(seg_length * density)) - + for j in range(1, n_interp + 1): fraction = j / (n_interp + 1) x_interp = p0[0] + fraction * dx y_interp = p0[1] + fraction * dy dense_points.append((x_interp, y_interp)) - + dense_points.append(p1) - + return dense_points + def compute_cumulative_distances(points: List[List[float]]) -> List[float]: s_vals = [0.0] for i in range(1, len(points)): - dx = points[i][0]- points[i-1][0] - dy = points[i][1] -points[i-1][1] + dx = points[i][0] - points[i - 1][0] + dy = points[i][1] - points[i - 1][1] ds = math.hypot(dx, dy) s_vals.append(s_vals[-1] + ds) @@ -55,59 +59,59 @@ def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed dense_points = generate_dense_points(points) s_vals = compute_cumulative_distances(dense_points) L = s_vals[-1] # Total path length - stopping_distance = (current_speed**2) / (2 * deceleration) + stopping_distance = (current_speed ** 2) / (2 * deceleration) if DEBUG: print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) print("[DEBUG] longitudinal_plan: Total path length L =", L) print("[DEBUG] longitudinal_plan: Braking distance needed =", stopping_distance) - - - if stopping_distance > L: #Case where there is not enough stopping distance to stop before path ends (calls emergency brake) + + if stopping_distance > L: # Case where there is not enough stopping distance to stop before path ends (calls emergency brake) return longitudinal_brake(path, deceleration, current_speed) - if current_speed > max_speed: #Case where car is exceeding the max speed so we need to slow down (do initial slowdown) + if current_speed > max_speed: # Case where car is exceeding the max speed so we need to slow down (do initial slowdown) if DEBUG: print(f"[DEBUG] Handling case where current_speed ({current_speed:.2f}) > max_speed ({max_speed:.2f})") - + # Initial deceleration phase to reach max_speed - initial_decel_distance = (current_speed**2 - max_speed**2) / (2 * deceleration) + initial_decel_distance = (current_speed ** 2 - max_speed ** 2) / (2 * deceleration) initial_decel_time = (current_speed - max_speed) / deceleration remaining_distance = L - initial_decel_distance - + if DEBUG: - print(f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") + print( + f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") print(f"[DEBUG] Remaining distance after reaching max_speed: {remaining_distance:.2f}") # Calculate final deceleration distance needed to stop from max_speed - final_decel_distance = (max_speed**2) / (2 * deceleration) + final_decel_distance = (max_speed ** 2) / (2 * deceleration) cruise_distance = remaining_distance - final_decel_distance - + if DEBUG: print(f"[DEBUG] Phase 2 - Cruise: distance = {cruise_distance:.2f}") print(f"[DEBUG] Phase 3 - Final Decel: distance = {final_decel_distance:.2f}") - + times = [] for s in s_vals: - if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed - v = math.sqrt(current_speed**2 - 2 * deceleration * s) + if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed + v = math.sqrt(current_speed ** 2 - 2 * deceleration * s) t = (current_speed - v) / deceleration if DEBUG: # Print every 10m print(f"[DEBUG] Initial Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") - - elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed + + elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed s_in_cruise = s - initial_decel_distance t = initial_decel_time + s_in_cruise / max_speed if DEBUG: # Print every 10m print(f"[DEBUG] Cruise: s = {s:.2f}, v = {max_speed:.2f}, t = {t:.2f}") - - else: # Phase 3: Final deceleration to stop + + else: # Phase 3: Final deceleration to stop s_in_final_decel = s - (initial_decel_distance + cruise_distance) - v = math.sqrt(max(max_speed**2 - 2 * deceleration * s_in_final_decel, 0.0)) - t = initial_decel_time + cruise_distance/max_speed + (max_speed - v) / deceleration + v = math.sqrt(max(max_speed ** 2 - 2 * deceleration * s_in_final_decel, 0.0)) + t = initial_decel_time + cruise_distance / max_speed + (max_speed - v) / deceleration if DEBUG: # Print every 10m print(f"[DEBUG] Final Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") - + times.append(t) if DEBUG: @@ -119,35 +123,35 @@ def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed if acceleration <= 0: if DEBUG: print(f"[DEBUG] No acceleration allowed. Current speed: {current_speed:.2f}") - + # Pure deceleration phase s_decel = (current_speed ** 2) / (2 * deceleration) T_decel = current_speed / deceleration - + if DEBUG: print(f"[DEBUG] Will maintain speed until s_decel: {s_decel:.2f}") print(f"[DEBUG] Total deceleration time will be: {T_decel:.2f}") - + times = [] for s in s_vals: - if s <= L - s_decel: # Maintain current speed until deceleration point + if s <= L - s_decel: # Maintain current speed until deceleration point t_point = s / current_speed if DEBUG: print(f"[DEBUG] Constant Speed Phase: s = {s:.2f}, v = {current_speed:.2f}, t = {t_point:.2f}") - else: # Deceleration phase + else: # Deceleration phase s_in_decel = s - (L - s_decel) v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s_in_decel, 0.0)) - t_point = (L - s_decel)/current_speed + (current_speed - v) / deceleration + t_point = (L - s_decel) / current_speed + (current_speed - v) / deceleration if DEBUG: print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") - + times.append(t_point) return Trajectory(frame=path.frame, points=dense_points, times=times) - # Determine max possible peak speed given distance - v_peak_possible = math.sqrt((2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) + v_peak_possible = math.sqrt( + (2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) v_target = min(max_speed, v_peak_possible) if DEBUG: @@ -172,27 +176,27 @@ def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed times = [] for s in s_vals: - if s <= s_accel: # Acceleration phase + if s <= s_accel: # Acceleration phase v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) t_point = (v - current_speed) / acceleration if DEBUG: print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") - elif s <= s_accel + s_cruise: # Cruise phase + elif s <= s_accel + s_cruise: # Cruise phase t_point = t_accel + (s - s_accel) / v_target if DEBUG: print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") - else: # Deceleration phase + else: # Deceleration phase s_decel_phase = s - s_accel - s_cruise v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration if t_point < times[-1]: # Ensure time always increases t_point = times[-1] + 0.01 # Small time correction step - + if DEBUG: print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") @@ -203,7 +207,9 @@ def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed return Trajectory(frame=path.frame, points=dense_points, times=times) -def longitudinal_brake(path: Path, deceleration: float, current_speed: float, emergency_decel: float = 8.0) -> Trajectory: + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float, + emergency_decel: float = 8.0) -> Trajectory: # Vehicle already stopped - maintain position if current_speed <= 0: print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) @@ -215,31 +221,31 @@ def longitudinal_brake(path: Path, deceleration: float, current_speed: float, em # Get total path length path_length = sum( - np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) - for i in range(len(path.points)-1) + np.linalg.norm(np.array(path.points[i + 1]) - np.array(path.points[i])) + for i in range(len(path.points) - 1) ) # Calculate stopping distance with normal deceleration T_stop_normal = current_speed / deceleration s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) - + # Check if emergency braking is needed if s_stop_normal > path_length: if DEBUG: print("[DEBUG] longitudinal_brake: Emergency braking needed!") print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") - + # Calculate emergency braking parameters T_stop = current_speed / emergency_decel s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) - + if DEBUG: print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") - + decel_to_use = emergency_decel - + else: if DEBUG: print("[DEBUG] longitudinal_brake: Normal braking sufficient") @@ -249,10 +255,10 @@ def longitudinal_brake(path: Path, deceleration: float, current_speed: float, em # Generate time points (use more points for smoother trajectory) num_points = max(len(path.points), 50) times = np.linspace(0, T_stop, num_points) - + # Calculate distances at each time point using physics equation distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) - + # Generate points along the path points = [] for d in distances: @@ -267,16 +273,18 @@ def longitudinal_brake(path: Path, deceleration: float, current_speed: float, em return Trajectory(frame=path.frame, points=points, times=times.tolist()) + class YieldTrajectoryPlanner(Component): """Follows the given route. Brakes if the ego–vehicle must yield (e.g. to a pedestrian) or if the end of the route is near; otherwise, it accelerates (or cruises) toward a desired speed. """ + def __init__(self): self.route_progress = None self.t_last = None - self.acceleration = 0.5 - self.desired_speed = 1.0 + self.acceleration = 5 + self.desired_speed = 2.0 self.deceleration = 2.0 self.emergency_brake = 8.0 @@ -291,7 +299,7 @@ def rate(self): def update(self, state: AllState): vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route + route = state.route # type: Route t = state.t if DEBUG: @@ -336,37 +344,48 @@ def update(self, state: AllState): pointSet.add(tuple(route_with_lookahead.points[i])) should_brake = any( - r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' for r in state.relations ) + should_decelerate = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) if should_brake == False else False + + should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) + if DEBUG: print("[DEBUG] YieldTrajectoryPlanner.update: stay_braking =", stay_braking) print("[DEBUG] YieldTrajectoryPlanner.update: should_brake =", should_brake) - should_accelerate = (not should_brake and curr_v < self.desired_speed) - if DEBUG: print("[DEBUG] YieldTrajectoryPlanner.update: should_accelerate =", should_accelerate) + print("[DEBUG] YieldTrajectoryPlanner.update: should_decelerate =", should_decelerate) if stay_braking: traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) if DEBUG: print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_decelerate: + traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") elif should_accelerate: traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, curr_v) if DEBUG: print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") - elif should_brake: - traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) - if DEBUG: - print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") else: # Maintain current speed if not accelerating or braking. traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) if DEBUG: - print("[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + print( + "[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") self.t_last = t if DEBUG: print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) print("[DEBUG] YieldTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") - return traj \ No newline at end of file + return traj From 005b49cfab9d69423931cbb2fd4b5b62f1a62d42 Mon Sep 17 00:00:00 2001 From: derekzyn Date: Wed, 26 Feb 2025 21:34:42 -0600 Subject: [PATCH 08/11] Update pedestrian_yield_logic.py --- .../planning/pedestrian_yield_logic.py | 426 +++++++++--------- 1 file changed, 215 insertions(+), 211 deletions(-) diff --git a/GEMstack/onboard/planning/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py index 2b1b39544..647c863ef 100644 --- a/GEMstack/onboard/planning/pedestrian_yield_logic.py +++ b/GEMstack/onboard/planning/pedestrian_yield_logic.py @@ -1,18 +1,37 @@ -from ...state import AgentState, AgentEnum, EntityRelation, EntityRelationEnum, ObjectFrameEnum +from ...state import AgentState, AgentEnum, EntityRelation, EntityRelationEnum, ObjectFrameEnum, VehicleState from ..component import Component from typing import List, Dict import numpy as np -from scipy.optimize import minimize_scalar -##### Hardcoded for testing ##### -from ...state import ObjectPose -ped1_start_pose_abs = ObjectPose(frame=ObjectFrameEnum.ABSOLUTE_CARTESIAN, t=0, x=15, y=2) -ped2_start_pose_abs = ObjectPose(frame=ObjectFrameEnum.ABSOLUTE_CARTESIAN, t=0, x=15, y=2) # Not sure the transform ~! The exact pos is (30,2) -################################# -DEBUG = True # Set to False to disable debug output -# DEBUG = False +DEBUG = True # Set to False to disable debug output + +""" Vehicle Configuration """ +GEM_MODEL = 'e4' # e2 or e4 +buffer_x, buffer_y = 3, 1 +if GEM_MODEL == 'e2': + # e2 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e2.urdf + size_x, size_y = 2.53, 1.35 # measured from base_link.STL + lx_f2r = 0.88 + 0.87 # distance between front axle and rear axle, from gem_e2.urdf wheel_links + ly_axle = 0.6 * 2 # length of axle + l_rear_axle_to_front = 1.28 + 0.87 # measured from base_link.STL + l_rear_axle_to_rear = size_x - l_rear_axle_to_front +elif GEM_MODEL == 'e4': + # e4 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e4.urdf + size_x, size_y = 3.35, 1.35 # measured from base_link.STL + lx_f2r = 1.2746 + 1.2904 # distance between front axle and rear axle, from gem_e4.urdf wheel_links + ly_axle = 0.6545 * 2 # length of front and rear axles, from gem_e4.urdf wheel_links + l_rear_axle_to_front = 1.68 + 1.29 # measured from base_link.STL + l_rear_axle_to_rear = size_x - l_rear_axle_to_front +# add buffer to outer dimensions, defined by half of the x, y buffer area dimensions +buffer_front = l_rear_axle_to_front + buffer_x +buffer_rear = -(l_rear_axle_to_rear + buffer_x) +buffer_left = size_y / 2 + buffer_y +buffer_right = -(size_y / 2 + buffer_y) +# comfortable deceleration for lookahead time calculation +comfort_decel = 2.0 + class PedestrianYielder(Component): """Yields for all pedestrians in the scene. @@ -24,151 +43,219 @@ def rate(self): return None def state_inputs(self): - return ['agents'] + return ['agents', 'vehicle'] def state_outputs(self): return ['relations'] - def update(self, agents: Dict[str, AgentState]) -> List[EntityRelation]: + def update(self, agents: Dict[str, AgentState], vehicle: VehicleState) -> List[EntityRelation]: + if DEBUG: + print("PedestrianYielder vehicle pose:", vehicle.pose, vehicle.v) res = [] for n, a in agents.items(): if DEBUG: - print( - f"[DEBUG] PedestrianYielder.update: Agent:{n} frame:{a.pose.frame}, (x,y):{a.pose.x, a.pose.y}, velocity:{a.velocity}") + print(f"[DEBUG] PedestrianYielder.update: Agent:", a.pose, a.velocity) """ collision estimation based on agent states in vehicle frame """ - ##### Hardcoded for testing ##### - if n == 'ped1': - a = a.to_frame(ObjectFrameEnum.START, current_pose=a.pose, start_pose_abs=ped1_start_pose_abs) - elif n == 'ped2': - a = a.to_frame(ObjectFrameEnum.START, current_pose=a.pose, start_pose_abs=ped2_start_pose_abs) - ################################# - print(f"[SIM] PedestrianYielder.update: Agent:{n} frame:{a.pose.frame}, (x,y):{a.pose.x, a.pose.y}, velocity:{a.velocity}") - # TODO: check how to convert to vehicle frame. May perception update agent state? if a.type == AgentEnum.PEDESTRIAN: - check, t_min, min_dist = check_collision_in_vehicle_frame(a) + check, t_min, min_dist, pt_min = check_collision_in_vehicle_frame(a, vehicle) if DEBUG: - print(f"[DEBUG] is_collision:{check}, t_min:{t_min}, min_dist:{min_dist}, distance to buffer area:", shortest_distance_to_buffer_in_vehicle_frame((a.pose.x, a.pose.y), (2.53 / 2 + 3, 1.35 / 2 + 1))) - # if check == 'STOP': - # # relation: ego-vehicle yields to pedestrian - # res.append(EntityRelation(type=EntityRelationEnum.STOPPING_AT, obj1='', obj2=n)) - # elif check == 'YIELD': - # res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) - # else: - # continue - if check: + print( + f"[DEBUG] ID {n}, relation:{check}, minimum distance:{min_dist}, time to min_dist: {t_min}, point of min_dist:{pt_min}") + # collision may occur, slow down + if check == 'YIELD': res.append(EntityRelation(type=EntityRelationEnum.YIELDING, obj1='', obj2=n)) - return res - + # collision in a short time, emergency stop + elif check == 'STOP': + res.append(EntityRelation(type=EntityRelationEnum.STOPPING_AT, obj1='', obj2=n)) -# """ Vehicle Dimensions """ -# vehicle_model = "e2" # e2 or e4 -# # vehicle dimensions and buffer area -# if vehicle_model == 'e2': -# # e2 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e2.urdf -# size_x, size_y = 2.53, 1.35 # measured from base_link.STL -# lx_f2r = 0.88 + 0.87 # distance between front axle and rear axle, from gem_e2.urdf wheel_links -# ly_axle = 0.6 * 2 # length of axle -# l_rear_axle_to_front = 1.28 + 0.87 # measured from base_link.STL -# l_rear_axle_to_rear = size_y - l_rear_axle_to_front -# elif vehicle_model == 'e4': -# # e4 axle dimensions, refer to GEMstack/knowledge/vehicle/model/gem_e4.urdf -# size_x, size_y = 3.35, 1.35 # measured from base_link.STL -# lx_f2r = 1.2746 + 1.2904 # distance between front axle and rear axle, from gem_e4.urdf wheel_links -# ly_axle = 0.6545 * 2 # length of front and rear axles, from gem_e4.urdf wheel_links -# l_rear_axle_to_front = 1.68 + 1.29 # measured from base_link.STL -# l_rear_axle_to_rear = size_y - l_rear_axle_to_front -# # add buffer to outer dimensions, defined by half of the x, y buffer area dimensions -# buffer_x, buffer_y = 3, 1 -# buffer = size_x / 2 + buffer_x, size_y / 2 + buffer_y + return res """ Planning in vehicle frame without waypoints """ -def check_collision_in_vehicle_frame(agent_state: AgentState): - buffer = 2.53 / 2 + 3, 1.35 / 2 + 1 # e2 buffer area - time_scale = 100 # TODO: adjust if necessary - xp, yp = agent_state.pose.x, agent_state.pose.y - vx, vy = agent_state.velocity[:2] - - # print(f"[TEST] check_collision_in_vehicle_frame: xp:{xp}, yp:{yp}, vx:{vx}, vy:{vy}") +def check_collision_in_vehicle_frame(agent: AgentState, vehicle: VehicleState): + xp, yp = agent.pose.x, agent.pose.y + vx, vy = agent.velocity[:2] + xv = vehicle.pose.x + yv = vehicle.pose.y + yaw = vehicle.pose.yaw + vel = vehicle.v + # time to stop from current velocity with comfortable deceleration + t_look = vel / comfort_decel + # calculate relative pedestrian position and velocity in vehicle frame + if agent.pose.frame == ObjectFrameEnum.CURRENT: + # xp, yp, vx, vy are already in vehicle frame + pass + elif agent.pose.frame == ObjectFrameEnum.START: + # convert xp, yp, vx, vy to vehicle frame + R = np.array([[np.cos(yaw), np.sin(yaw)], [-np.sin(yaw), np.cos(yaw)]], dtype=np.float32) + dx, dy = xp - xv, yp - yv + dvx, dvy = vx - vel * np.cos(yaw), vy - vel * np.sin(yaw) + # pedestrian pose and velocity in vehicle frame + xp, yp = np.dot(R, np.array([dx, dy])) + vx, vy = np.dot(R, np.array([dvx, dvy])) - # use optimization method to find minimum distance and the time at that point - t_min, min_dist = find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale) + # If pedestrian already in buffer area + if buffer_rear <= xp <= buffer_front and buffer_right <= yp <= buffer_left: + return 'STOP', 0, 0, (xp, yp) + t_min, min_dist, pt_min = find_min_distance_and_time(xp, yp, vx, vy) # if the minimum distance between the position and the buffer area is less than 0, than a collision is expected - check = False - if min_dist <= 3: #TODO: adjust time threshold if necessary - check = True - # elif min_dist > 0 and t_min <= 5: #TODO: adjust time threshold if necessary - # check = 'YIELD' - # else: - # check = None - return check, t_min, min_dist - + if min_dist is not None: + if min_dist <= 0: + if t_min <= 0: + check = 'STOP' + elif t_min <= t_look: + check = 'YIELD' + else: + check = 'RUN' + else: + check = 'RUN' + else: + check = 'RUN' + return check, t_min, min_dist, pt_min -def find_min_distance_and_time(xp, yp, vx, vy, buffer, time_scale): - def pos_t(t): - xt, yt = xp + vx * t, yp + vy * t - # print(f"[TEST] post_t: xt:{xt}, yt:{yt}, t:{t}") - return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) - res = minimize_scalar(pos_t, bounds=(0, time_scale)) - # print(f"[TEST] find_min_distance_and_time: res:{res}") - if res.success: - t_min = res.x - min_dist = res.fun - return t_min, min_dist - else: - return None, None +def find_min_distance_and_time(xp, yp, vx, vy): + # path function: Ax + By + C = vy * x - vx * y + (yp * vx - xp * vy) = 0 + vx = vx if vx != 0 else 1e-6 + vy = vy if vy != 0 else 1e-6 + A = vy + B = -vx + C = yp * vx - xp * vy + def point_to_line(x0, y0, A, B, C): + # calculate the shortest distance from a point (x0, y0) to the line Ax + By + C = 0 """ + numerator = abs(A * x0 + B * y0 + C) + denominator = np.sqrt(A ** 2 + B ** 2) + x_foot = x0 - (A * (A * x0 + B * y0 + C)) / denominator + y_foot = y0 - (B * (A * x0 + B * y0 + C)) / denominator + dist = numerator / denominator if denominator != 0 else np.inf + return dist, (x_foot, y_foot) -# Calculate the shortest distance from an object to vehicle buffer area in vehicle frame -def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): - """ - Calculate the distance between a pedestrian's position and the vehicle with buffer - """ - x_buff, y_buff = buffer - # consider 2D geometry - x_p, y_p = position - # initiate distance - dist = 0 + def point_dist(x1, y1, x2, y2): + return np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) - # calculate distance - # front - if -y_buff <= y_p <= y_buff and x_p > x_buff: - dist = x_p - x_buff - # left front - elif y_p > y_buff and x_p > x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p - y_buff) ** 2) - # right front - elif y_p < -y_buff and x_p > x_buff: - dist = np.sqrt((x_p - x_buff) ** 2 + (y_p + y_buff) ** 2) - # left - elif y_p > y_buff and -x_buff <= x_p <= x_buff: - dist = y_p - y_buff - # right - elif y_p < -y_buff and -x_buff <= x_p <= x_buff: - dist = abs(y_p) + abs(y_buff) ### Modified - # rear - # elif -y_buff <= y_p <= y_buff and x_p < -x_buff: - # dist = abs(x_p) + abs(x_buff) ### Modified - # # left rear - # elif y_p > y_buff and x_p < -x_buff: - # dist = np.sqrt((x_p + x_buff) ** 2 + (y_p - y_buff) ** 2) - # # right rear - # elif y_p < -y_buff and x_p < -x_buff: - # dist = np.sqrt((x_p + x_buff) ** 2 + (y_p + y_buff) ** 2) - # intersect + """ Compute intersections at the front, left and right edge """ + if xp >= buffer_front: + # front edge intersection: x = x_buff = xt = xp + vx * t_f + t_f = (buffer_front - xp) / vx + yt = yp + vy * t_f + if t_f < 0: # object moving away with higher speed than vehicle, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_right <= yp <= buffer_left: + min_dist = xp - buffer_front # the distance to the front edge + elif yt > buffer_left: + min_dist = point_dist(xp, yp, buffer_front, buffer_left) # the distance to front left corner + else: + min_dist = point_dist(xp, yp, buffer_front, buffer_right) # the distance to front right corner + else: + if buffer_right <= yt <= buffer_left: # intersect at front edge, is collision + t_min = t_f + min_dist = 0 + pt_min = buffer_front, yt + elif yt > buffer_left: # intersect at front left + if yp <= yt: + min_dist, pt_min = point_to_line(buffer_front, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (buffer_left - yp) / vy + xt = xp + vx * t_l + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, buffer_left + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # intersect at front right + if yp >= yt: + min_dist, pt_min = point_to_line(buffer_front, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: # right edge interaction: y = y_buff = yt = yp + vy * t_l + t_r = (buffer_right - yp) / vy + xt = xp + vx * t_r + if buffer_rear <= xt <= buffer_front: # intersect at right edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, buffer_right + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx else: - dist = 100 ### Modified - # Note : Sometimes the minimize_scalar() is not returning correct value causing the car to keep stopping. - # So, Added a large value to make sure the car doesn't stop. + if yp >= buffer_left: + # left edge interaction: y = y_buff = yt = yp + vy * t_l + t_l = (buffer_left - yp) / vy + xt = xp + vx * t_l + if t_l < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_rear <= xp <= buffer_front: + min_dist = yp - buffer_left # the distance to the left edge + else: + min_dist = point_dist(xp, yp, buffer_rear, buffer_left) # the distance to rear right corner + else: + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_l + min_dist = 0 + pt_min = xt, buffer_left + elif xt > buffer_front: + min_dist, pt_min = point_to_line(buffer_front, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_left, A, B, C) + t_min = (pt_min[0] - xp) / vx + elif yp <= buffer_right: + # right edge interaction: y = -y_buff = yt = yp + vy * t_l + t_r = (buffer_right - yp) / vy + xt = xp + vx * t_r + if t_r < 0: # object moving away, start point has minimum distance + t_min = 0 + pt_min = xp, yp + if buffer_rear<= xp <= buffer_front: + min_dist = -yp - buffer_right # the distance to the right edge + else: + min_dist = point_dist(xp, yp, buffer_rear, buffer_right) # the distance to rear right corner + else: + if buffer_rear <= xt <= buffer_front: # intersect at left edge, is collision + t_min = t_r + min_dist = 0 + pt_min = xt, buffer_right + elif xt > buffer_front: + min_dist, pt_min = point_to_line(buffer_front, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + else: + min_dist, pt_min = point_to_line(buffer_rear, buffer_right, A, B, C) + t_min = (pt_min[0] - xp) / vx + elif xp >= buffer_rear: + t_min = 0 + min_dist = -1 + pt_min = xp, yp + else: # rear position, should not be seen by the front camera + t_min = None + min_dist = None + pt_min = None + + return t_min, min_dist, pt_min + - # print(f"[TEST] Shortest Dist: position:{position}, buffer:{buffer}, distance:{dist}") - return dist +# def find_min_distance_and_time_by_optimizer(xp, yp, vx, vy, buffer, time_scale): +# from scipy.optimize import minimize_scalar +# def pos_t(t): +# xt, yt = xp + vx * t, yp + vy * t +# return shortest_distance_to_buffer_in_vehicle_frame((xt, yt), buffer) +# +# res = minimize_scalar(pos_t, bounds=(0, time_scale)) +# if res.success: +# t_min = res.x +# min_dist = res.fun +# return t_min, min_dist +# else: +# return None, None """ Planning in start frame with waypoints """ -# # TODO: how to get further unreached waypoints only +# # TODO: to get further unreached waypoints only # def get_waypoint_arc_and_new_yaw(curr_x, curr_y, curr_yaw, waypoint): # """ # Input: @@ -204,7 +291,7 @@ def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): # is_crossing: boolean, if the pedestrian is crossing the path # arc_dist_collision: distance from start_point to the closest pedestrian enter or exit point mapped to the path # """ -# # TODO: compute pedestrian path with yaw_rate? +# # TODO: consider pedestrian path with yaw_rate # xp, yp = pose.x, pose.y # vx, vy = velocity # xc, yc = center @@ -320,7 +407,6 @@ def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): # """ # All calculations are in start frame, origin reference: rear_axle_center (refer to GEMstack/knowledge/calibration) # """ -# # TODO: confirm origin reference with calibration team # # current states # vehicle = state.vehicle # curr_x = vehicle.pose.x @@ -329,11 +415,9 @@ def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): # curr_v = vehicle.v # # # determine lookahead distance using current velocity and a decent deceleration -# decel_decent = 2.0 # TODO: adjust for a decent deceleration if necessary -# t_brake = curr_v / decel_decent # time to brake down to zero +# t_brake = curr_v / comfort_decel # time to brake down to zero # dist_lookahead = curr_v * t_brake # -# # TODO: check if it works with straight line waypoints # distance = 0 # temp_x, temp_y, temp_yaw = curr_x, curr_y, curr_yaw # is_collision = False @@ -371,83 +455,3 @@ def shortest_distance_to_buffer_in_vehicle_frame(position, buffer): # break # # return is_collision, dist_collision - - -""" Assume the vehicle move straight forward """ -# def compute_intersect(A1, B1, C1, A2, B2, C2): -# """ -# calculate intersection point of two lines -# A1*x + B1*y + C1 = 0 and A2*x + B2*y + C2 = 0 -# """ -# if A1*B2 - A2*B1 == 0: -# if C1 == C2: -# return "conincide" -# else: -# return "parallel" -# else: -# x_inter = (B1*C2 - B2*C1) / (A1*B2 - A2*B1) -# y_inter = (C1*A2 - C2*A1) / (A1*B2 - A2*B1) -# return x_inter, y_inter -# -# def yield_in_ego_frame(pedestrian_position_ego, pedestrian_velocity_ego, dist_lookahead, buffer): -# """ -# Calculate the intersection of the pedestrian's path and vehicle path with buffer -# Yield if the intersection is in front of the vehicle and within lookahead distance -# """ -# x_buff, y_buff = buffer -# # consider 2D geometry -# x_p, y_p = pedestrian_position_ego[:2] -# v_x, v_y = pedestrian_velocity_ego[:2] -# # initiate yielding and distance -# yielding = False -# distance = None -# -# # line expression: A*x + B*y + C = 0 -# # pedestrian's path: v_y * x - v_x * y + (v_x * y_p - v_y * x_p) = 0 -# A_p, B_p, C_p = v_y, -v_x, v_x * y_p - v_y * x_p -# # buffer area: y = y_buff, y = - y_buff -# A_left, B_left, C_left = 0, 1, -y_buff -# A_right, B_right, C_right = 0, 1, y_buff -# -# # deal with parallel first -# if A_p*B_left - A_left*B_p == 0: -# if -y_buff <= y_p <= y_buff: -# yielding = True -# distance = x_p -# else: -# yielding = False -# distance = None -# # compute intersection -# else: -# left_inter_x, left_inter_y = compute_intersect(A_p, B_p, C_p, A_left, B_left, C_left) -# right_inter_x, right_inter_y = compute_intersect(A_p, B_p, C_p, A_right, B_right, C_right) -# if -x_buff <= left_inter_x <= dist_lookahead or -x_buff <= right_inter_x <= dist_lookahead: -# yielding = True -# else: -# yielding = False -# distance = min(left_inter_x, right_inter_x) -# -# return yielding, distance -# -# vehicle = state.vehicle -# curr_x = vehicle.pose.x -# curr_y = vehicle.pose.y -# curr_yaw = vehicle.pose.yaw -# curr_v = vehicle.v -# -# # 2D transformation from start frame to ego frame -# R_start2ego = np.array([[np.cos(curr_yaw),-np.sin(curr_yaw)],[np.sin(curr_yaw),np.cos(curr_yaw)]]) -# t_start2ego = np.array([curr_x, curr_y]) -# -# # check collision distance and break if going to collide -# dist_min = np.inf -# for ped in pedestrians: -# # pedestrian states from perception -# pos_p = ped.state.pose # both vectors are x, y in vehicle frame -# v_p = ped.state.v -# pos_start2ego = R_start2ego @ pos_p + t_start2ego -# v_start2ego = v_p @ pos_p + t_start2ego -# yielding, distance = yield_in_ego_frame(pos_start2ego,v_start2ego, dist_lookahead, buffer) -# if yielding == True and distance is not None: -# if distance < dist_min: -# dist_min = distance From 8807b62a0e0b0e7a8b4dfaf8a1f2b2861b2c24cb Mon Sep 17 00:00:00 2001 From: brijesh2709 <52804384+brijesh2709@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:42:03 -0600 Subject: [PATCH 09/11] Delete GEMstack/onboard/perception/pedestrian_detection.py --- .../perception/pedestrian_detection.py | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 GEMstack/onboard/perception/pedestrian_detection.py diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py deleted file mode 100644 index e01087efe..000000000 --- a/GEMstack/onboard/perception/pedestrian_detection.py +++ /dev/null @@ -1,84 +0,0 @@ -from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum -from ..interface.gem import GEMInterface -from ..component import Component -#from ultralytics import YOLO -#import cv2 -from typing import Dict - -def box_to_fake_agent(box): - """Creates a fake agent state from an (x,y,w,h) bounding box. - - The location and size are pretty much meaningless since this is just giving a 2D location. - """ - x,y,w,h = box - pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) - dims = (w,h,0) - return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) - - -class PedestrianDetector2D(Component): - """Detects pedestrians.""" - def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface - #self.detector = YOLO('../../knowledge/detection/yolov8n.pt') - self.last_person_boxes = [] - - def rate(self): - return 4.0 - - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] - - def initialize(self): - #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat - #self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat) - pass - - #def image_callback(self, image : cv2.Mat): - # detection_result = self.detector(image) - # self.last_person_boxes = [] - # #uncomment if you want to debug the detector... - # #for bb in self.last_person_boxes: - # # x,y,w,h = bb - # # cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3) - # #cv2.imwrite("pedestrian_detections.png",image) - - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - res = {} - for i,b in enumerate(self.last_person_boxes): - x,y,w,h = b - res['pedestrian'+str(i)] = box_to_fake_agent(b) - if len(res) > 0: - print("Detected",len(res),"pedestrians") - return res - - -class FakePedestrianDetector2D(Component): - """Triggers a pedestrian detection at some random time ranges""" - def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface - self.times = [(5.0,20.0),(30.0,35.0)] - self.t_start = None - - def rate(self): - return 4.0 - - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] - - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - if self.t_start is None: - self.t_start = self.vehicle_interface.time() - t = self.vehicle_interface.time() - self.t_start - res = {} - for times in self.times: - if t >= times[0] and t <= times[1]: - res['pedestrian0'] = box_to_fake_agent((0,0,0,0)) - print("Detected a pedestrian") - return res \ No newline at end of file From 8120d99f4f2720c9a9f95652234f255d825ed54a Mon Sep 17 00:00:00 2001 From: brijesh2709 <52804384+brijesh2709@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:43:42 -0600 Subject: [PATCH 10/11] Delete GEMstack/knowledge/routes/forward_15m.csv --- GEMstack/knowledge/routes/forward_15m.csv | 61 ----------------------- 1 file changed, 61 deletions(-) delete mode 100644 GEMstack/knowledge/routes/forward_15m.csv diff --git a/GEMstack/knowledge/routes/forward_15m.csv b/GEMstack/knowledge/routes/forward_15m.csv deleted file mode 100644 index 77cabd376..000000000 --- a/GEMstack/knowledge/routes/forward_15m.csv +++ /dev/null @@ -1,61 +0,0 @@ -0,0,0 -1,0,0 -2,0,0 -3,0,0 -4,0,0 -5,0,0 -6,0,0 -7,0,0 -8,0,0 -9,0,0 -10,0,0 -11,0,0 -12,0,0 -13,0,0 -14,0,0 -15,0,0 -16,0,0 -17,0,0 -18,0,0 -19,0,0 -20,0,0 -21,0,0 -22,0,0 -23,0,0 -24,0,0 -25,0,0 -26,0,0 -27,0,0 -28,0,0 -29,0,0 -30,0,0 -31,0,0 -32,0,0 -33,0,0 -34,0,0 -35,0,0 -36,0,0 -37,0,0 -38,0,0 -39,0,0 -40,0,0 -41,0,0 -42,0,0 -43,0,0 -44,0,0 -45,0,0 -46,0,0 -47,0,0 -48,0,0 -49,0,0 -50,0,0 -51,0,0 -52,0,0 -53,0,0 -54,0,0 -55,0,0 -56,0,0 -57,0,0 -58,0,0 -59,0,0 -60,0,0 \ No newline at end of file From c769c83a112c371a59c8676722ed9565613de441 Mon Sep 17 00:00:00 2001 From: brijesh2709 <52804384+brijesh2709@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:44:08 -0600 Subject: [PATCH 11/11] Delete homework/longitudinal_planning.py --- homework/longitudinal_planning.py | 99 ------------------------------- 1 file changed, 99 deletions(-) delete mode 100644 homework/longitudinal_planning.py diff --git a/homework/longitudinal_planning.py b/homework/longitudinal_planning.py deleted file mode 100644 index 873e3a95a..000000000 --- a/homework/longitudinal_planning.py +++ /dev/null @@ -1,99 +0,0 @@ -from typing import List -from ..component import Component -from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ - ObjectFrameEnum -from ...utils import serialization -from ...mathutils.transforms import vector_madd - - -def longitudinal_plan(path: Path, acceleration: float, deceleration: float, max_speed: float, - current_speed: float) -> Trajectory: - """Generates a longitudinal trajectory for a path with a - trapezoidal velocity profile. - - 1. accelerates from current speed toward max speed - 2. travel along max speed - 3. if at any point you can't brake before hitting the end of the path, - decelerate with accel = -deceleration until velocity goes to 0. - """ - path_normalized = path.arc_length_parameterize() - # TODO: actually do something to points and times - points = [p for p in path_normalized.points] - times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame, points, times) - return trajectory - - -def longitudinal_brake(path: Path, deceleration: float, current_speed: float) -> Trajectory: - """Generates a longitudinal trajectory for braking along a path.""" - path_normalized = path.arc_length_parameterize() - # TODO: actually do something to points and times - points = [p for p in path_normalized.points] - times = [t for t in path_normalized.times] - trajectory = Trajectory(path.frame, points, times) - return trajectory - - -class YieldTrajectoryPlanner(Component): - """Follows the given route. Brakes if you have to yield or - you are at the end of the route, otherwise accelerates to - the desired speed. - """ - - def __init__(self): - self.route_progress = None - self.t_last = None - self.acceleration = 0.5 - self.desired_speed = 1.0 - self.deceleration = 2.0 - - def state_inputs(self): - return ['all'] - - def state_outputs(self) -> List[str]: - return ['trajectory'] - - def rate(self): - return 10.0 - - def update(self, state: AllState): - vehicle = state.vehicle # type: VehicleState - route = state.route # type: Route - t = state.t - - if self.t_last is None: - self.t_last = t - dt = t - self.t_last - - curr_x = vehicle.pose.x - curr_y = vehicle.pose.y - curr_v = vehicle.v - - # figure out where we are on the route - if self.route_progress is None: - self.route_progress = 0.0 - closest_dist, closest_parameter = state.route.closest_point_local((curr_x, curr_y), [self.route_progress - 5.0, - self.route_progress + 5.0]) - self.route_progress = closest_parameter - - # extract out a 10m segment of the route - route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) - - # parse the relations indicated - should_brake = False - for r in state.relations: - if r.type == EntityRelationEnum.YIELDING and r.obj1 == '': - # yielding to something, brake - should_brake = True - should_accelerate = (not should_brake and curr_v < self.desired_speed) - - # choose whether to accelerate, brake, or keep at current velocity - if should_accelerate: - traj = longitudinal_plan(route_with_lookahead, self.acceleration, self.deceleration, self.desired_speed, - curr_v) - elif should_brake: - traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) - else: - traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) - - return traj